Browse Source

/nmad/ : Fix nmad module

The MPI request is now freed after a successful wait/test.
Added a timeout to the tests (they hang when an assertion is triggered).
The callback thread is now awakened when StarPU-MPI shuts down (and then exits).
The main thread now marks the callback thread as runnable in order to avoid a race condition
  (if the main thread finishes before the callback thread initialises).
Normal requests are now merged with detached requests
(with nmad, all requests are detached).
Guillaume Beauchamp 8 years ago
parent
commit
e57895fbbc
5 changed files with 93 additions and 65 deletions
  1. 1 1
      nmad/src/Makefile.am
  2. 89 56
      nmad/src/starpu_mpi.c
  3. 0 5
      nmad/src/starpu_mpi_private.h
  4. 2 2
      nmad/tests/Makefile.am
  5. 1 1
      nmad/tests/datatypes.c

+ 1 - 1
nmad/src/Makefile.am

@@ -31,7 +31,7 @@ lib_LTLIBRARIES = libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = $(top_builddir)/src/libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = $(top_builddir)/src/libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LDFLAGS = $(ldflags) -no-undefined					\
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LDFLAGS = $(ldflags) -no-undefined					\
   -version-info $(LIBSTARPUMPI_INTERFACE_CURRENT):$(LIBSTARPUMPI_INTERFACE_REVISION):$(LIBSTARPUMPI_INTERFACE_AGE) \
   -version-info $(LIBSTARPUMPI_INTERFACE_CURRENT):$(LIBSTARPUMPI_INTERFACE_REVISION):$(LIBSTARPUMPI_INTERFACE_AGE) \
-  $(MPICC_LDFLAGS) $(FXT_LDFLAGS) $(NMAD_LDFLAGS)
+  $(MPICC_LDFLAGS) $(FXT_LDFLAGS)
 noinst_HEADERS =					\
 noinst_HEADERS =					\
 	starpu_mpi_private.h				\
 	starpu_mpi_private.h				\
 	starpu_mpi_fxt.h				\
 	starpu_mpi_fxt.h				\

+ 89 - 56
nmad/src/starpu_mpi.c

@@ -29,9 +29,6 @@
 #include <datawizard/coherency.h>
 #include <datawizard/coherency.h>
 #include <nm_sendrecv_interface.h>
 #include <nm_sendrecv_interface.h>
 
 
-#define nm_mpi_communicator_get(c) NULL 
-#define nm_mpi_datatype_get(c) NULL
-
 static void _starpu_mpi_submit_new_mpi_request(void *arg);
 static void _starpu_mpi_submit_new_mpi_request(void *arg);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
 #ifdef STARPU_VERBOSE
@@ -39,10 +36,10 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 #endif
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg);
+							unsigned sync, void (*callback)(void *), void *arg);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int mpi_tag, MPI_Comm comm,
 							int source, int mpi_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
+							unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 
 
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
@@ -72,11 +69,17 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 
 
 
 
 
-PUK_LFSTACK_TYPE(callback,	void *callback_arg; void (*callback)(void *););
-static callback_lfstack_t callback_stack = NULL;
+PUK_LFSTACK_TYPE(callback,	void *callback_arg; void (*callback)(void *);unsigned* completed;piom_cond_t*req_cond;);
+static volatile callback_lfstack_t callback_stack = NULL;
 
 
 static starpu_sem_t callback_sem;
 static starpu_sem_t callback_sem;
 
 
+static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
+{
+	piom_cond_destroy(&(req->req_cond));
+	free(req);
+}
+
 /********************************************************/
 /********************************************************/
 /*                                                      */
 /*                                                      */
 /*  Send/Receive functionalities                        */
 /*  Send/Receive functionalities                        */
@@ -85,7 +88,7 @@ static starpu_sem_t callback_sem;
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 							      int srcdst, int mpi_tag, MPI_Comm comm,
 							      int srcdst, int mpi_tag, MPI_Comm comm,
-							      unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							      unsigned sync, void (*callback)(void *), void *arg,
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum starpu_data_access_mode mode, 
 							      enum starpu_data_access_mode mode, 
 							      int sequential_consistency)
 							      int sequential_consistency)
@@ -96,11 +99,11 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	STARPU_ASSERT_MSG(req, "Invalid request");
 	STARPU_ASSERT_MSG(req, "Invalid request");
 
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
-	nm_mpi_communicator_t*p_comm = nm_mpi_communicator_get(comm);
+	nm_mpi_communicator_t*p_comm;
+	p_comm = nm_mpi_communicator_get(comm);
 
 
 	/* Initialize the request structure */
 	/* Initialize the request structure */
 	req->completed = 0;
 	req->completed = 0;
-	STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
 	piom_cond_init(&req->req_cond, 0);
 	piom_cond_init(&req->req_cond, 0);
 
 
 	req->request_type = request_type;
 	req->request_type = request_type;
@@ -113,7 +116,6 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	req->session = nm_mpi_communicator_get_session(p_comm);
 	req->session = nm_mpi_communicator_get_session(p_comm);
 	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
 	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
 
 
-	req->detached = detached;
 	req->sync = sync;
 	req->sync = sync;
 	req->callback = callback;
 	req->callback = callback;
 	req->callback_arg = arg;
 	req->callback_arg = arg;
@@ -221,9 +223,9 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg)
+							 unsigned sync, void (*callback)(void *), void *arg)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
 }
 }
 
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -232,7 +234,7 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 	*public_req = req;
@@ -245,7 +247,7 @@ int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, callback, arg);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -272,7 +274,7 @@ int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_r
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, NULL, NULL);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 	*public_req = req;
@@ -285,7 +287,7 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int m
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
 
 
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 1, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -310,7 +312,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	nm_mpi_data_build(&data, (void*)req->ptr,  nm_mpi_datatype_get(req->datatype), req->count);
 	nm_mpi_data_build(&data, (void*)req->ptr,  nm_mpi_datatype_get(req->datatype), req->count);
 	nm_sr_recv_init(req->session, &(req->request));
 	nm_sr_recv_init(req->session, &(req->request));
 	nm_sr_recv_unpack_data(req->session, &(req->request), &data);
 	nm_sr_recv_unpack_data(req->session, &(req->request), &data);
-	req->ret = nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,0);
+	req->ret = nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,NM_TAG_MASK_FULL);
 
 
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 
@@ -356,14 +358,14 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 		callback->req = req;
 		callback->req = req;
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
 		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
 		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
-		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1);
+		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 0, _starpu_mpi_irecv_size_callback, callback,1);
 	}
 	}
 
 
 }
 }
 
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
 }
 }
 
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -373,7 +375,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
 	TRACE_MPI_IRECV_COMPLETE_BEGIN(source, mpi_tag);
 	TRACE_MPI_IRECV_COMPLETE_BEGIN(source, mpi_tag);
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL,1);
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL,1);
 	TRACE_MPI_IRECV_COMPLETE_END(source, mpi_tag);
 	TRACE_MPI_IRECV_COMPLETE_END(source, mpi_tag);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
@@ -386,7 +388,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg,1);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, callback, arg,1);
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
 }
 }
@@ -402,7 +404,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 //	if (tag == -1)
 //	if (tag == -1)
 //		starpu_data_set_tag(data_handle, data_tag);
 //		starpu_data_set_tag(data_handle, data_tag);
 
 
-	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, callback, arg, sequential_consistency);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -430,7 +432,7 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 // {
 // {
 // 	_STARPU_MPI_LOG_IN();
 // 	_STARPU_MPI_LOG_IN();
 // 	/* Which is the mpi request we are waiting for ? */
 // 	/* Which is the mpi request we are waiting for ? */
-// 	struct _starpu_mpi_req *req = waiting_req->other_request;
+// 	struct _starpu_mpi_req *req = waiting_req->other_requestreq_mutex;
 
 
 // 	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
 // 	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
 
 
@@ -447,12 +449,21 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 {
 	struct _starpu_mpi_req *req = *public_req;
 	struct _starpu_mpi_req *req = *public_req;
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	while (!req->completed)
+
+/* we must do a test_locked to avoid race condition : 
+ * without req_cond could still be used and couldn't be freed)*/
+
+	while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED)){
 		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
 		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
+	}
+
 	if (status!=MPI_STATUS_IGNORE)
 	if (status!=MPI_STATUS_IGNORE)
 		_starpu_mpi_req_status(req,status);
 		_starpu_mpi_req_status(req,status);
+
+	_starpu_mpi_request_destroy(req);
+	*public_req = NULL;
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
-	return req->ret; //FIXME May have already been freed ?
+	return MPI_SUCCESS;
 }
 }
 
 
 /********************************************************/
 /********************************************************/
@@ -471,11 +482,17 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 
 	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
 	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
 
 
-	*flag = req->completed;
+/* we must do a test_locked to avoid race condition : 
+ * without req_cond could still be used and couldn't be freed)*/
+
+	*flag = req->completed && piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED);
 	if (*flag && status!=MPI_STATUS_IGNORE)
 	if (*flag && status!=MPI_STATUS_IGNORE)
 		_starpu_mpi_req_status(req,status);
 		_starpu_mpi_req_status(req,status);
 	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
 	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
-
+	if(*flag){
+		_starpu_mpi_request_destroy(req);
+		*public_req = NULL;
+	}
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return MPI_SUCCESS;
 	return MPI_SUCCESS;
 }
 }
@@ -562,14 +579,18 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
 		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
 		c->callback = req->callback;
 		c->callback = req->callback;
 		c->callback_arg = req->callback_arg;
 		c->callback_arg = req->callback_arg;
+		c->completed=&req->completed;
+		c->req_cond = &req->req_cond;
 		callback_lfstack_push(&callback_stack, c);
 		callback_lfstack_push(&callback_stack, c);
 		starpu_sem_post(&callback_sem);
 		starpu_sem_post(&callback_sem);
 	}
 	}
-
+	else
+	{
 	/* tell anyone potentially waiting on the request that it is
 	/* tell anyone potentially waiting on the request that it is
-	 * terminated now */
-	req->completed = 1;
-	piom_cond_signal(&req->req_cond, REQ_FINALIZED)
+	 * terminated now (should be done after the callback)*/
+		req->completed = 1;
+		piom_cond_signal(&req->req_cond, REQ_FINALIZED);
+	}
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 }
 }
@@ -672,16 +693,13 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const
 
 
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 {
 {
-	if (req->detached)
-	{
-		nm_sr_request_set_ref(&(req->request), req);
+	nm_sr_request_set_ref(&(req->request), req);
 
 
-		nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
-		if(req->request_type == SEND_REQ && req->waited>1){
-			nm_sr_request_set_ref(&(req->size_req), req);
+	nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+	if(req->request_type == SEND_REQ && req->waited>1){
+		nm_sr_request_set_ref(&(req->size_req), req);
 
 
-			nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
-		}
+		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 	}
 	}
 }
 }
 
 
@@ -750,16 +768,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 
 	/* notify the main thread that the progression thread is ready */ //Why?
 	/* notify the main thread that the progression thread is ready */ //Why?
 //	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 //	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	running = 1;
 /*	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 /*	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	printf("CALLBACK T START\n");
 	*/
 	*/
-	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
-	{//FIXME how do i know if it there won't be any other callback?
-		printf("CALLBACK T IT\n");
+	while (1)// || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
+	{
 
 
 		/* shall we block ? */
 		/* shall we block ? */
 // 		unsigned block = _starpu_mpi_req_list_empty(new_requests);
 // 		unsigned block = _starpu_mpi_req_list_empty(new_requests);
@@ -767,7 +782,24 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 // #ifndef STARPU_MPI_ACTIVITY
 // #ifndef STARPU_MPI_ACTIVITY
 // 		block = block && _starpu_mpi_req_list_empty(detached_requests);
 // 		block = block && _starpu_mpi_req_list_empty(detached_requests);
 // #endif /* STARPU_MPI_ACTIVITY */
 // #endif /* STARPU_MPI_ACTIVITY */
-		starpu_sem_wait(&callback_sem);
+		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
+		int err=0;
+
+		if(running)
+		{
+			err = starpu_sem_wait(&callback_sem);
+			//running can be changed while waiting
+		}
+		if(c==NULL)
+		{
+			if(running)
+			{
+				c = callback_lfstack_pop(&callback_stack);
+				STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
+			}
+			else
+				break;//what if there is some pending request ?
+		}
 
 
 		// if (block)
 		// if (block)
 		// {
 		// {
@@ -783,9 +815,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		// 	TRACE_MPI_SLEEP_END();
 		// 	TRACE_MPI_SLEEP_END();
 		// }
 		// }
 
 
-		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
-		STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready.");
 		c->callback(c->callback_arg);
 		c->callback(c->callback_arg);
+		*(c->completed)=1;
+		piom_cond_signal(c->req_cond, REQ_FINALIZED);
+		/* we signal that the request is completed.*/
+
 		free(c);
 		free(c);
 
 
 		// /* test whether there are some terminated "detached request" */
 		// /* test whether there are some terminated "detached request" */
@@ -808,11 +842,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		// 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		// 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		// }
 		// }
 	}
 	}
-	printf("CALLBACK IT END\n");
-
-	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
-	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
-	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
+		STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
+	//STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
+	//STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
+	//STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 
 
 	if (argc_argv->initialize_mpi)
 	if (argc_argv->initialize_mpi)
 	{
 	{
@@ -893,7 +926,6 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 
 
 	if (initialize_mpi)
 	if (initialize_mpi)
 	{
 	{
-		printf("Init MPI\n");
 		int thread_support;
 		int thread_support;
 		_STARPU_DEBUG("Calling MPI_Init_thread\n");
 		_STARPU_DEBUG("Calling MPI_Init_thread\n");
 		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
 		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
@@ -902,7 +934,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 		}
 		}
 		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
 		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
 	}
 	}
-
+	running = 1;
 
 
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
 
@@ -960,6 +992,7 @@ int starpu_mpi_shutdown(void)
 	/* kill the progression thread */
 	/* kill the progression thread */
 //	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 //	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 0;
 	running = 0;
+	starpu_sem_post(&callback_sem);
 //	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 //	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 //	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 //	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 

+ 0 - 5
nmad/src/starpu_mpi_private.h

@@ -142,7 +142,6 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned sync;
 	unsigned sync;
 
 
 	int ret;
 	int ret;
-	starpu_pthread_mutex_t req_mutex;
 	piom_cond_t req_cond;
 	piom_cond_t req_cond;
 
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
@@ -150,12 +149,8 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned submitted;
 	unsigned submitted;
 	unsigned completed;
 	unsigned completed;
 
 
-	/* In the case of a Wait/Test request, we are going to post a request
-	 * to test the completion of another request */
-	struct _starpu_mpi_req *other_request;
 
 
 	/* in the case of detached requests */
 	/* in the case of detached requests */
-	unsigned detached;
 	void *callback_arg;
 	void *callback_arg;
 	void (*callback)(void *);
 	void (*callback)(void *);
 
 

+ 2 - 2
nmad/tests/Makefile.am

@@ -30,9 +30,9 @@ endif
 
 
 # we always test on 4 processes, the execution time is not that bigger
 # we always test on 4 processes, the execution time is not that bigger
 if STARPU_QUICK_CHECK
 if STARPU_QUICK_CHECK
-MPI			=	$(MPIEXEC) $(MPIEXEC_ARGS) -np 4
+MPI			=	$(MPIEXEC) $(MPIEXEC_ARGS) -np 4 --timeout 50
 else
 else
-MPI			=	$(MPIEXEC) $(MPIEXEC_ARGS) -np 4
+MPI			=	$(MPIEXEC) $(MPIEXEC_ARGS) -np 4 --timeout 50
 endif
 endif
 
 
 if STARPU_HAVE_AM111
 if STARPU_HAVE_AM111

+ 1 - 1
nmad/tests/datatypes.c

@@ -352,7 +352,7 @@ int main(int argc, char **argv)
 		MPI_Finalize();
 		MPI_Finalize();
 		return STARPU_TEST_SKIPPED;
 		return STARPU_TEST_SKIPPED;
 	}
 	}
-
+	
 	exchange_void(rank, &error);
 	exchange_void(rank, &error);
 	exchange_variable(rank, &error);
 	exchange_variable(rank, &error);
 	exchange_vector(rank, &error);
 	exchange_vector(rank, &error);