浏览代码

nmad/: detached requests are now freed after their callback.
The data callback is now added after the size callback (a detached request can be freed during its data callback).

Guillaume Beauchamp 8 年之前
父节点
当前提交
2b70b6179e
共有 2 个文件被更改,包括 85 次插入50 次删除
  1. 84 50
      nmad/src/starpu_mpi.c
  2. 1 0
      nmad/src/starpu_mpi_private.h

+ 84 - 50
nmad/src/starpu_mpi.c

@@ -36,13 +36,13 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 #endif
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned sync, void (*callback)(void *), void *arg);
+							unsigned detached, unsigned sync, void (*callback)(void *), void *arg);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int mpi_tag, MPI_Comm comm,
 							int source, int mpi_tag, MPI_Comm comm,
-							unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
+							unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 
 
-static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
+static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
 
 
 /* The list of requests that have been newly submitted by the application */
 /* The list of requests that have been newly submitted by the application */
 static struct _starpu_mpi_req_list *new_requests;
 static struct _starpu_mpi_req_list *new_requests;
@@ -62,6 +62,7 @@ static volatile int running = 0;
 /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
 /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
 static starpu_pthread_mutex_t mutex_posted_requests;
 static starpu_pthread_mutex_t mutex_posted_requests;
 static int posted_requests = 0, newer_requests, barrier_running = 0;
 static int posted_requests = 0, newer_requests, barrier_running = 0;
+static int pending_detached = 0;
 
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 //TODO remove (we no longer need to count them.)
 //TODO remove (we no longer need to count them.)
@@ -69,8 +70,8 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 
 
 
 
 
-PUK_LFSTACK_TYPE(callback,	void *callback_arg; void (*callback)(void *);unsigned* completed;piom_cond_t*req_cond;);
-static volatile callback_lfstack_t callback_stack = NULL;
+PUK_LFSTACK_TYPE(callback,	struct _starpu_mpi_req *req;);
+static callback_lfstack_t callback_stack = NULL;
 
 
 static starpu_sem_t callback_sem;
 static starpu_sem_t callback_sem;
 
 
@@ -88,7 +89,7 @@ static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 							      int srcdst, int mpi_tag, MPI_Comm comm,
 							      int srcdst, int mpi_tag, MPI_Comm comm,
-							      unsigned sync, void (*callback)(void *), void *arg,
+							      unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum starpu_data_access_mode mode, 
 							      enum starpu_data_access_mode mode, 
 							      int sequential_consistency)
 							      int sequential_consistency)
@@ -116,6 +117,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	req->session = nm_mpi_communicator_get_session(p_comm);
 	req->session = nm_mpi_communicator_get_session(p_comm);
 	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
 	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
 
 
+	req->detached = detached;
 	req->sync = sync;
 	req->sync = sync;
 	req->callback = callback;
 	req->callback = callback;
 	req->callback_arg = arg;
 	req->callback_arg = arg;
@@ -166,7 +168,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, starpu_data_get_size(req->data_handle));
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, starpu_data_get_size(req->data_handle));
 
 
-	_starpu_mpi_handle_detached_request(req);
+	_starpu_mpi_handle_pending_request(req);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 }
 }
@@ -223,9 +225,9 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							 unsigned sync, void (*callback)(void *), void *arg)
+							 unsigned detached, unsigned sync, void (*callback)(void *), void *arg)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
 }
 }
 
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -234,7 +236,7 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 	*public_req = req;
@@ -247,7 +249,7 @@ int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -274,7 +276,7 @@ int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_r
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 	*public_req = req;
@@ -287,7 +289,7 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int m
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
 
 
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 1, callback, arg);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -319,7 +321,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
 
 
 
 
-	_starpu_mpi_handle_detached_request(req);
+	_starpu_mpi_handle_pending_request(req);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 }
 }
@@ -358,14 +360,14 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 		callback->req = req;
 		callback->req = req;
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
 		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
 		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
-		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 0, _starpu_mpi_irecv_size_callback, callback,1);
+		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1);
 	}
 	}
 
 
 }
 }
 
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
 }
 }
 
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -375,7 +377,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
 	TRACE_MPI_IRECV_COMPLETE_BEGIN(source, mpi_tag);
 	TRACE_MPI_IRECV_COMPLETE_BEGIN(source, mpi_tag);
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL,1);
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL,1);
 	TRACE_MPI_IRECV_COMPLETE_END(source, mpi_tag);
 	TRACE_MPI_IRECV_COMPLETE_END(source, mpi_tag);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
@@ -388,7 +390,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, callback, arg,1);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg,1);
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
 }
 }
@@ -404,7 +406,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 //	if (tag == -1)
 //	if (tag == -1)
 //		starpu_data_set_tag(data_handle, data_tag);
 //		starpu_data_set_tag(data_handle, data_tag);
 
 
-	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, callback, arg, sequential_consistency);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -447,8 +449,11 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 
 
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 {
-	struct _starpu_mpi_req *req = *public_req;
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_wait needs a valid starpu_mpi_req");
+	struct _starpu_mpi_req *req = *public_req;
+	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Wait cannot be called on a detached request");
+
 
 
 /* we must do a test_locked to avoid race condition : 
 /* we must do a test_locked to avoid race condition : 
  * without req_cond could still be used and couldn't be freed)*/
  * without req_cond could still be used and couldn't be freed)*/
@@ -475,8 +480,11 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 
 
 int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
 {
-	struct _starpu_mpi_req *req = *public_req;
+
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
+	struct _starpu_mpi_req *req = *public_req;
+	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
 	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
 	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
 			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 
@@ -577,19 +585,30 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 	/* Execute the specified callback, if any */
 	/* Execute the specified callback, if any */
 	if (req->callback){
 	if (req->callback){
 		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
 		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
-		c->callback = req->callback;
-		c->callback_arg = req->callback_arg;
-		c->completed=&req->completed;
-		c->req_cond = &req->req_cond;
+		c->req = req;
+		if(req->detached)
+			STARPU_ATOMIC_ADD(&pending_detached, 1);
+		/* The main thread can exit without waiting
+		* the end of the detached request. Callback thread
+		* must then be kept alive if they have a callback.*/
+
 		callback_lfstack_push(&callback_stack, c);
 		callback_lfstack_push(&callback_stack, c);
 		starpu_sem_post(&callback_sem);
 		starpu_sem_post(&callback_sem);
 	}
 	}
 	else
 	else
 	{
 	{
+		if(req->detached)
+		{
+			_starpu_mpi_request_destroy(req);
+			// a detached request wont be wait/test (and freed inside).
+		}
+			else
+		{
 	/* tell anyone potentially waiting on the request that it is
 	/* tell anyone potentially waiting on the request that it is
 	 * terminated now (should be done after the callback)*/
 	 * terminated now (should be done after the callback)*/
-		req->completed = 1;
-		piom_cond_signal(&req->req_cond, REQ_FINALIZED);
+			req->completed = 1;
+			piom_cond_signal(&req->req_cond, REQ_FINALIZED);
+		}
 	}
 	}
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
@@ -611,7 +630,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 // 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 // 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 // 	_STARPU_MPI_LOG_OUT();
 // 	_STARPU_MPI_LOG_OUT();
 // }
 // }
-
+/*
 #ifdef STARPU_MPI_ACTIVITY
 #ifdef STARPU_MPI_ACTIVITY
 static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
 static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
 {
 {
@@ -627,7 +646,8 @@ static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNU
 
 
 	return may_block;
 	return may_block;
 }
 }
-#endif /* STARPU_MPI_ACTIVITY */
+*/
+//#endif /* STARPU_MPI_ACTIVITY */
 
 
 // static void _starpu_mpi_test_detached_requests(void)
 // static void _starpu_mpi_test_detached_requests(void)
 // {
 // {
@@ -691,16 +711,18 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const
 	_starpu_mpi_handle_request_termination(ref,event);
 	_starpu_mpi_handle_request_termination(ref,event);
 }
 }
 
 
-static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
+static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 {
 {
-	nm_sr_request_set_ref(&(req->request), req);
-
-	nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 	if(req->request_type == SEND_REQ && req->waited>1){
 	if(req->request_type == SEND_REQ && req->waited>1){
 		nm_sr_request_set_ref(&(req->size_req), req);
 		nm_sr_request_set_ref(&(req->size_req), req);
 
 
 		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 	}
 	}
+	/* the if must be before, because the first callback can directly free 
+	* a detached request (the second callback free if req->waited>1). */
+	nm_sr_request_set_ref(&(req->request), req);
+
+	nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 }
 
 
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
@@ -782,24 +804,29 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 // #ifndef STARPU_MPI_ACTIVITY
 // #ifndef STARPU_MPI_ACTIVITY
 // 		block = block && _starpu_mpi_req_list_empty(detached_requests);
 // 		block = block && _starpu_mpi_req_list_empty(detached_requests);
 // #endif /* STARPU_MPI_ACTIVITY */
 // #endif /* STARPU_MPI_ACTIVITY */
+		fprintf(stderr,"pop begin");
 		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
 		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
 		int err=0;
 		int err=0;
 
 
-		if(running)
+		if(running || pending_detached)
 		{
 		{
 			err = starpu_sem_wait(&callback_sem);
 			err = starpu_sem_wait(&callback_sem);
-			//running can be changed while waiting
+			//running/pending_detached can change while waiting
 		}
 		}
 		if(c==NULL)
 		if(c==NULL)
 		{
 		{
-			if(running)
+			if(running || pending_detached)
 			{
 			{
 				c = callback_lfstack_pop(&callback_stack);
 				c = callback_lfstack_pop(&callback_stack);
 				STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
 				STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
 			}
 			}
 			else
 			else
+			{
+				fprintf(stderr,"pop break");
 				break;//what if there is some pending request ?
 				break;//what if there is some pending request ?
+			}
 		}
 		}
+		fprintf(stderr,"pop done");
 
 
 		// if (block)
 		// if (block)
 		// {
 		// {
@@ -815,11 +842,18 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		// 	TRACE_MPI_SLEEP_END();
 		// 	TRACE_MPI_SLEEP_END();
 		// }
 		// }
 
 
-		c->callback(c->callback_arg);
-		*(c->completed)=1;
-		piom_cond_signal(c->req_cond, REQ_FINALIZED);
+		c->req->callback(c->req->callback_arg);
+		if (c->req->detached){
+			_starpu_mpi_request_destroy(c->req);
+			STARPU_ATOMIC_ADD(&pending_detached, -1);
+		}
+		else{
+			c->req->completed=1;
+			piom_cond_signal(&(c->req->req_cond), REQ_FINALIZED);
+		}
 		/* we signal that the request is completed.*/
 		/* we signal that the request is completed.*/
 
 
+
 		free(c);
 		free(c);
 
 
 		// /* test whether there are some terminated "detached request" */
 		// /* test whether there are some terminated "detached request" */
@@ -865,9 +899,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 /*                                                      */
 /*                                                      */
 /********************************************************/
 /********************************************************/
 
 
-#ifdef STARPU_MPI_ACTIVITY
-static int hookid = - 1;
-#endif /* STARPU_MPI_ACTIVITY */
+// #ifdef STARPU_MPI_ACTIVITY
+// static int hookid = - 1;
+// #endif /* STARPU_MPI_ACTIVITY */
 
 
 static void _starpu_mpi_add_sync_point_in_fxt(void)
 static void _starpu_mpi_add_sync_point_in_fxt(void)
 {
 {
@@ -943,10 +977,10 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	//	STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
 	//	STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
 	// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 
-#ifdef STARPU_MPI_ACTIVITY
-	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
-	STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
-#endif /* STARPU_MPI_ACTIVITY */
+// #ifdef STARPU_MPI_ACTIVITY
+// 	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
+// 	STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
+// #endif /* STARPU_MPI_ACTIVITY */
 
 
 	_starpu_mpi_add_sync_point_in_fxt();
 	_starpu_mpi_add_sync_point_in_fxt();
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
@@ -998,9 +1032,9 @@ int starpu_mpi_shutdown(void)
 
 
 	starpu_pthread_join(progress_thread, &value);
 	starpu_pthread_join(progress_thread, &value);
 
 
-#ifdef STARPU_MPI_ACTIVITY
-	starpu_progression_hook_deregister(hookid);
-#endif /* STARPU_MPI_ACTIVITY */
+// #ifdef STARPU_MPI_ACTIVITY
+// 	starpu_progression_hook_deregister(hookid);
+// #endif /* STARPU_MPI_ACTIVITY */
 
 
 	TRACE_MPI_STOP(rank, world_size);
 	TRACE_MPI_STOP(rank, world_size);
 
 

+ 1 - 0
nmad/src/starpu_mpi_private.h

@@ -151,6 +151,7 @@ LIST_TYPE(_starpu_mpi_req,
 
 
 
 
 	/* in the case of detached requests */
 	/* in the case of detached requests */
+	unsigned detached;
 	void *callback_arg;
 	void *callback_arg;
 	void (*callback)(void *);
 	void (*callback)(void *);