Просмотр исходного кода

Wrong commit fully reverted in follow-up commit #2590

Nathalie Furmento лет назад: 14
Родитель
Сommit
346314a1b9

+ 20 - 1
mpi/Makefile.am

@@ -16,7 +16,8 @@
 
 CC=$(MPICC)
 
-TESTS = $(check_PROGRAMS)
+TESTS_ENVIRONMENT	=	mpiexec -np 2
+TESTS			=	$(check_PROGRAMS)
 
 check_PROGRAMS =
 
@@ -78,6 +79,10 @@ libstarpumpi_la_SOURCES =				\
 
 if !NO_BLAS_LIB
 
+check_PROGRAMS += 				\
+	examples/mpi_lu/plu_example_float	\
+	examples/mpi_lu/plu_example_double
+
 examplebin_PROGRAMS += 				\
 	examples/mpi_lu/plu_example_float	\
 	examples/mpi_lu/plu_example_double
@@ -103,6 +108,20 @@ examples_mpi_lu_plu_example_double_SOURCES =	\
 	$(top_srcdir)/examples/common/blas.c
 endif
 
+check_PROGRAMS +=				\
+	tests/pingpong					\
+	tests/mpi_test					\
+	tests/mpi_isend					\
+	tests/mpi_irecv					\
+	tests/mpi_isend_detached			\
+	tests/mpi_irecv_detached			\
+	tests/mpi_detached_tag				\
+	tests/ring					\
+	tests/ring_async				\
+	tests/ring_async_implicit			\
+	tests/block_interface				\
+	tests/block_interface_pinned
+
 mpiexamplebin_PROGRAMS =				\
 	tests/pingpong					\
 	tests/mpi_test					\

+ 6 - 3
mpi/starpu_mpi.c

@@ -76,6 +76,7 @@ static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data
 				int dest, int mpi_tag, MPI_Comm comm,
 				unsigned detached, void (*callback)(void *), void *arg)
 {
+        _STARPU_MPI_DEBUG("--> starpu_mpi_isend_common\n");
 	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
 	STARPU_ASSERT(req);
 
@@ -103,6 +104,7 @@ static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data
 	starpu_data_acquire_cb(data_handle, STARPU_R,
 			submit_mpi_req, (void *)req);
 
+        _STARPU_MPI_DEBUG("<-- starpu_mpi_isend_common\n");
 	return req;
 }
 
@@ -126,7 +128,9 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 int starpu_mpi_isend_detached(starpu_data_handle data_handle,
 				int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
+        _STARPU_MPI_DEBUG("--> starpu_mpi_isend_detached\n");
 	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
+        _STARPU_MPI_DEBUG("<-- starpu_mpi_isend_detached\n");
 
 	return 0;
 }
@@ -304,7 +308,7 @@ static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
 	/* Which is the mpi request we are testing for ? */
 	struct starpu_mpi_req_s *req = testing_req->other_request;
 
-        _STARPU_MPI_DEBUG("Test request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
+        //_STARPU_MPI_DEBUG("Test request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
 	int ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
 
 	if (*testing_req->flag)
@@ -453,8 +457,7 @@ static void test_detached_requests(void)
 		int ret = MPI_Test(&req->request, &flag, &status);
 		STARPU_ASSERT(ret == MPI_SUCCESS);
 
-
-                _STARPU_MPI_DEBUG("Test request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
+                //_STARPU_MPI_DEBUG("Test request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
 
 		if (flag)
 		{

+ 15 - 8
mpi/tests/ring_async_implicit.c

@@ -16,7 +16,7 @@
 
 #include <starpu_mpi.h>
 
-#define NITER	2048
+#define NITER	2//048
 
 unsigned token = 42;
 starpu_data_handle token_handle;
@@ -32,7 +32,7 @@ void increment_cpu(void *descr[], __attribute__ ((unused)) void *_args)
 }
 
 static starpu_codelet increment_cl = {
-	.where = STARPU_CPU|STARPU_CUDA,
+        .where = STARPU_CPU,//|STARPU_CUDA,
 #ifdef STARPU_USE_CUDA
 	.cuda_func = increment_cuda,
 #endif
@@ -87,8 +87,9 @@ int main(int argc, char **argv)
 
 		if (!((loop == 0) && (rank == 0)))
 		{
-			token = 0;
-			starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL, NULL);
+                  //			token = 0;
+                        fprintf(stderr,"[%d] RECV %d\n", rank, tag);
+                        starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL, NULL);
 		}
 		else {
 			token = 0;
@@ -96,19 +97,24 @@ int main(int argc, char **argv)
 		}
 
 		increment_token();
-		
+
 		if (!((loop == last_loop) && (rank == last_rank)))
 		{
-			starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL, NULL);
+                        fprintf(stderr,"[%d] SEND %d\n", rank, tag+1);
+                        starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL, NULL);
 		}
 		else {
-
+                        fprintf(stderr,"[%d] ACQUIRE %d\n", rank, tag);
+                        starpu_task_wait_for_all();
 			starpu_data_acquire(token_handle, STARPU_R);
 			fprintf(stdout, "Finished : token value %d\n", token);
+                        starpu_task_wait_for_all();
 			starpu_data_release(token_handle);
+                        starpu_task_wait_for_all();
 		}
 	}
 
+        fprintf(stderr, "[%d] wait for all\n", rank);
 	starpu_task_wait_for_all();
 
 	starpu_mpi_shutdown();
@@ -118,7 +124,8 @@ int main(int argc, char **argv)
 
 	if (rank == last_rank)
 	{
-		STARPU_ASSERT(token == nloops*size);
+                fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
+//		STARPU_ASSERT(token == nloops*size);
 	}
 
 	return 0;

+ 2 - 0
src/core/dependencies/implicit_data_deps.c

@@ -166,6 +166,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 {
 	STARPU_ASSERT(task->cl);
 
+        fprintf(stderr, "---------------> _starpu_detect_implicit_data_deps %p\n", task);
 	unsigned nbuffers = task->cl->nbuffers;
 
 	unsigned buffer;
@@ -182,6 +183,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 		_starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
 		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 	}
+        fprintf(stderr, "<--------------- _starpu_detect_implicit_data_deps %p\n", task);
 }
 
 /* This function is called when a task has been executed so that we don't

+ 14 - 5
src/core/task.c

@@ -188,12 +188,15 @@ int starpu_task_submit(struct starpu_task *task)
 	int ret;
 	unsigned is_sync = task->synchronous;
 
+        fprintf(stderr, "---------------> starpu_task_submit %p\n", task);
 	if (is_sync)
 	{
 		/* Perhaps it is not possible to submit a synchronous
 		 * (blocking) task */
-		if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+                if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls())) {
+                        fprintf(stderr, "---------------> starpu_task_submit DEADLK %p\n", task);
 			return -EDEADLK;
+                }
 
 		task->detach = 0;
 	}
@@ -203,15 +206,18 @@ int starpu_task_submit(struct starpu_task *task)
 	if (task->cl)
 	{
 		uint32_t where = task->cl->where;
-		if (!_starpu_worker_exists(where))
+		if (!_starpu_worker_exists(where)) {
+                        fprintf(stderr, "---------------> starpu_task_submit ENODEV %p\n", task);
 			return -ENODEV;
+                }
 
 		/* In case we require that a task should be explicitely
 		 * executed on a specific worker, we make sure that the worker
 		 * is able to execute this task.  */
-		if (task->execute_on_a_specific_worker 
-			&& !_starpu_worker_may_execute_task(task->workerid, where))
+		if (task->execute_on_a_specific_worker && !_starpu_worker_may_execute_task(task->workerid, where)) {
+                        fprintf(stderr, "---------------> starpu_task_submit ENODEV %p\n", task);
 			return -ENODEV;
+                }
 
 		_starpu_detect_implicit_data_deps(task);
 	}
@@ -245,9 +251,12 @@ int starpu_task_submit(struct starpu_task *task)
 
 	ret = _starpu_submit_job(j, 0);
 
-	if (is_sync)
+	if (is_sync) {
+                fprintf(stderr, "---------------> starpu_task_submit WAIT %p\n", task);
 		_starpu_wait_job(j);
+        }
 
+        fprintf(stderr, "<--------------- starpu_task_submit %p\n", task);
 	return ret;
 }
 

+ 8 - 0
src/datawizard/user_interactions.c

@@ -22,6 +22,8 @@
 #include <datawizard/write_back.h>
 #include <core/dependencies/data_concurrency.h>
 
+#  define _STARPU_DEBUG(fmt, args ...) { fprintf(stderr, "[starpu][%s] " fmt , __func__ ,##args); fflush(stderr); }
+
 /* Explicitly ask StarPU to allocate room for a piece of data on the specified
  * memory node. */
 int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
@@ -90,6 +92,8 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = arg;
+        _STARPU_DEBUG("-----------------> starpu_data_acquire_cb_pre_sync_callback\n");
+
 
 	/* we try to get the data, if we do not succeed immediately, we set a
  	* callback function that will be executed automatically when the data is
@@ -100,6 +104,7 @@ static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 		/* no one has locked this data yet, so we proceed immediately */
 		_starpu_data_acquire_continuation_non_blocking(wrapper);
 	}
+        _STARPU_DEBUG("<----------------- starpu_data_acquire_cb_pre_sync_callback\n");
 }
 
 /* The data must be released by calling starpu_data_release later on */
@@ -108,6 +113,8 @@ int starpu_data_acquire_cb(starpu_data_handle handle,
 {
 	STARPU_ASSERT(handle);
 
+        _STARPU_DEBUG("-----------------> starpu_data_acquire_cb\n");
+
 	struct user_interaction_wrapper *wrapper = malloc(sizeof(struct user_interaction_wrapper));
 	STARPU_ASSERT(wrapper);
 
@@ -149,6 +156,7 @@ int starpu_data_acquire_cb(starpu_data_handle handle,
 		starpu_data_acquire_cb_pre_sync_callback(wrapper);
 	}
 
+        _STARPU_DEBUG("<----------------- starpu_data_acquire_cb\n");
 	return 0;
 }
 

+ 6 - 1
tests/datawizard/acquire_release.c

@@ -30,7 +30,7 @@ static starpu_codelet increment_cl = {
 	.nbuffers = 1
 };
 
-unsigned token = 42;
+unsigned token = 0;
 starpu_data_handle token_handle;
 
 void increment_token()
@@ -54,6 +54,8 @@ int main(int argc, char **argv)
         starpu_init(NULL);
 	starpu_variable_data_register(&token_handle, 0, (uintptr_t)&token, sizeof(unsigned));
 
+        fprintf(stderr, "Token: %d\n", token);
+
 	for(i=0; i<ntasks; i++)
 	{
                 starpu_data_acquire(token_handle, STARPU_RW);
@@ -63,7 +65,10 @@ int main(int argc, char **argv)
                 starpu_data_acquire_cb(token_handle, STARPU_RW, callback, NULL);
 	}
 
+        starpu_task_wait_for_all();
 	starpu_shutdown();
+        fprintf(stderr, "Token: %d\n", token);
+        STARPU_ASSERT(token==ntasks);
 
 	return 0;
 }