Explorar el Código

/nmad : Count all pending request instead of only detached request with callback.
Request are conted before the datawizard call(MPI Finalise could be called before the callback happen).

Guillaume Beauchamp hace 8 años
padre
commit
b4df0b4e99
Se han modificado 2 ficheros con 25 adiciones y 50 borrados
  1. 1 1
      nmad/examples/Makefile.am
  2. 24 49
      nmad/src/starpu_mpi.c

+ 1 - 1
nmad/examples/Makefile.am

@@ -24,7 +24,7 @@ LOADER_BIN		=
 else
 loader_CPPFLAGS 	= 	$(AM_CFLAGS) $(AM_CPPFLAGS) -I$(top_builddir)/src/
 LOADER			=	loader
-LOADER_BIN		=	$(abs_top_builddir)/mpi/tests/$(LOADER)
+LOADER_BIN		=	$(abs_top_builddir)/nmad/tests/$(LOADER)
 loader_SOURCES		=	../../tests/loader.c
 endif
 

+ 24 - 49
nmad/src/starpu_mpi.c

@@ -29,7 +29,6 @@
 #include <datawizard/coherency.h>
 #include <nm_sendrecv_interface.h>
 
-static void _starpu_mpi_submit_new_mpi_request(void *arg);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
@@ -44,28 +43,14 @@ static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 
 static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
 
-/* The list of requests that have been newly submitted by the application */
-static struct _starpu_mpi_req_list *new_requests;
-
-/* The list of detached requests that have already been submitted to MPI */
-static struct _starpu_mpi_req_list *detached_requests;
-static starpu_pthread_mutex_t detached_requests_mutex;
-
-/* Condition to wake up progression thread */
-static starpu_pthread_cond_t cond_progression;
 /* Condition to wake up waiting for all current MPI requests to finish */
-static starpu_pthread_cond_t cond_finished;//FIXME no longer working nor usefull.
-static starpu_pthread_mutex_t mutex;
 static starpu_pthread_t progress_thread;
 static volatile int running = 0;
 
 /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
-static starpu_pthread_mutex_t mutex_posted_requests;
-static int posted_requests = 0, newer_requests, barrier_running = 0;
-static int pending_detached = 0;
 
-#define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
-//TODO remove (we no longer need to count them.)
+static volatile int pending_request = 0;
+
 #define REQ_FINALIZED 0x1
 
 
@@ -99,7 +84,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
 	STARPU_ASSERT_MSG(req, "Invalid request");
 
-	_STARPU_MPI_INC_POSTED_REQUESTS(1);
+	STARPU_ATOMIC_ADD( &pending_request, 1);
 	nm_mpi_communicator_t*p_comm;
 	p_comm = nm_mpi_communicator_get(comm);
 
@@ -586,8 +571,6 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 	if (req->callback){
 		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
 		c->req = req;
-		if(req->detached)
-			STARPU_ATOMIC_ADD(&pending_detached, 1);
 		/* The main thread can exit without waiting
 		* the end of the detached request. Callback thread
 		* must then be kept alive if they have a callback.*/
@@ -609,8 +592,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 			req->completed = 1;
 			piom_cond_signal(&req->req_cond, REQ_FINALIZED);
 		}
+		int pending_remaining = STARPU_ATOMIC_ADD(&pending_request, -1);
+		if (!running && !pending_remaining)
+			starpu_sem_post(&callback_sem);
 	}
-
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -713,6 +698,7 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const
 
 static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 {
+
 	if(req->request_type == SEND_REQ && req->waited>1){
 		nm_sr_request_set_ref(&(req->size_req), req);
 
@@ -804,29 +790,30 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 // #ifndef STARPU_MPI_ACTIVITY
 // 		block = block && _starpu_mpi_req_list_empty(detached_requests);
 // #endif /* STARPU_MPI_ACTIVITY */
-		fprintf(stderr,"pop begin");
 		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
 		int err=0;
 
-		if(running || pending_detached)
+		if(running || pending_request>0)
 		{
 			err = starpu_sem_wait(&callback_sem);
-			//running/pending_detached can change while waiting
+			//running pending_request can change while waiting
 		}
 		if(c==NULL)
 		{
-			if(running || pending_detached)
-			{
-				c = callback_lfstack_pop(&callback_stack);
-				STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
-			}
-			else
+			c = callback_lfstack_pop(&callback_stack);
+			if (c == NULL)
 			{
-				fprintf(stderr,"pop break");
-				break;//what if there is some pending request ?
+				if(running && pending_request>0){
+					STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);	
+				}
+				else
+				{
+					if  (pending_request==0)
+						break;
+				}
+				continue;
 			}
 		}
-		fprintf(stderr,"pop done");
 
 		// if (block)
 		// {
@@ -845,12 +832,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		c->req->callback(c->req->callback_arg);
 		if (c->req->detached){
 			_starpu_mpi_request_destroy(c->req);
-			STARPU_ATOMIC_ADD(&pending_detached, -1);
 		}
 		else{
 			c->req->completed=1;
 			piom_cond_signal(&(c->req->req_cond), REQ_FINALIZED);
 		}
+		STARPU_ATOMIC_ADD( &pending_request, -1);
 		/* we signal that the request is completed.*/
 
 
@@ -887,7 +874,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		MPI_Finalize();
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	starpu_sem_destroy(&callback_sem);
 	free(argc_argv);
 	return NULL;
@@ -940,17 +927,6 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 static
 int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 {
-	STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
-	new_requests = _starpu_mpi_req_list_new();
-
-	STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
-	detached_requests = _starpu_mpi_req_list_new();
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
-
-	starpu_sem_init(&callback_sem, 0, 0);
 
 	struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
 	argc_argv->initialize_mpi = initialize_mpi;
@@ -968,6 +944,8 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 		}
 		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
 	}
+
+	starpu_sem_init(&callback_sem, 0, 0);
 	running = 1;
 
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
@@ -1038,9 +1016,6 @@ int starpu_mpi_shutdown(void)
 
 	TRACE_MPI_STOP(rank, world_size);
 
-	/* free the request queues */
-	_starpu_mpi_req_list_delete(detached_requests);
-	_starpu_mpi_req_list_delete(new_requests);
 
 	_starpu_mpi_comm_amounts_display(rank);
 	_starpu_mpi_comm_amounts_free();