Browse Source

Rework the MPI lib

Cédric Augonnet 15 years ago
parent
commit
8bc9c5b571
5 changed files with 206 additions and 117 deletions
  1. 173 97
      mpi/starpu_mpi.c
  2. 29 14
      mpi/starpu_mpi.h
  3. 1 2
      mpi/tests/mpi_irecv.c
  4. 1 2
      mpi/tests/mpi_isend.c
  5. 2 2
      mpi/tests/ring_async.c

+ 173 - 97
mpi/starpu_mpi.c

@@ -17,171 +17,252 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
 #include <starpu_mpi_datatype.h>
 
 
-static void submit_mpi_req(struct starpu_mpi_req_s *req);
-void handle_request_termination(struct starpu_mpi_req_s *req);
+static void submit_mpi_req(void *arg);
+static void handle_request_termination(struct starpu_mpi_req_s *req);
 
 
+/* The list of requests that have been newly submitted by the application */
 static starpu_mpi_req_list_t new_requests; 
 static starpu_mpi_req_list_t new_requests; 
-static starpu_mpi_req_list_t pending_requests; 
 
 
 static pthread_cond_t cond;
 static pthread_cond_t cond;
 static pthread_mutex_t mutex;
 static pthread_mutex_t mutex;
 static pthread_t progress_thread;
 static pthread_t progress_thread;
 static int running = 0;
 static int running = 0;
 
 
-static void _handle_new_mpi_isend(struct starpu_mpi_req_s *req)
+/*
+ *	Isend
+ */
+
+static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
 {
 {
 	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
 	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
 	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
 	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
 
 
-	MPI_Isend(ptr, 1, req->datatype, req->dst, req->mpi_tag, req->comm, &req->request);
+	MPI_Isend(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+
+	/* somebody is perhaps waiting for the MPI request to be posted */
+	pthread_mutex_lock(&req->req_mutex);
+	req->submitted = 1;
+	pthread_cond_broadcast(&req->req_cond);
+	pthread_mutex_unlock(&req->req_mutex);
 }
 }
 
 
-int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
-		int dest, int mpi_tag, MPI_Comm comm,
-		void (*callback)(void *))
+int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm)
 {
 {
+	STARPU_ASSERT(req);
+
+	/* Initialize the request structure */
 	req->submitted = 0;
 	req->submitted = 0;
+	req->completed = 0;
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
 
 	req->data_handle = data_handle;
 	req->data_handle = data_handle;
-	req->dst = dest;
+	req->srcdst = dest;
 	req->mpi_tag = mpi_tag;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
 	req->comm = comm;
-	req->mode = STARPU_R;
+	req->detached = 0;
+	req->func = starpu_mpi_isend_func;
 
 
-	req->handle_new = _handle_new_mpi_isend;
+	/* Asynchronously request StarPU to fetch the data in main memory: when
+	 * it is available in main memory, submit_mpi_req(req) is called and
+	 * the request is actually submitted  */
+	starpu_sync_data_with_mem_non_blocking(data_handle, STARPU_W,
+			submit_mpi_req, (void *)req);
+
+	return 0;
+}
 
 
-	submit_mpi_req(req);
+/*
+ *	Isend (detached)
+ */
 
 
+int starpu_mpi_isend_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	/* TODO */
 	return 0;
 	return 0;
 }
 }
 
 
-static void _handle_new_mpi_irecv(struct starpu_mpi_req_s *req)
+/*
+ *	Irecv
+ */
+
+static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
 {
 {
 	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
 	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
 	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
 	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
 
 
-	MPI_Irecv(ptr, 1, req->datatype, req->src, req->mpi_tag, req->comm, &req->request);
-}
+	MPI_Irecv(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 
 
+	/* somebody is perhaps waiting for the MPI request to be posted */
+	pthread_mutex_lock(&req->req_mutex);
+	req->submitted = 1;
+	pthread_cond_broadcast(&req->req_cond);
+	pthread_mutex_unlock(&req->req_mutex);
+}
 
 
-int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
-		int source, int mpi_tag, MPI_Comm comm,
-		void (*callback)(void *))
+int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm)
 {
 {
+	STARPU_ASSERT(req);
+
+	/* Initialize the request structure */
 	req->submitted = 0;
 	req->submitted = 0;
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
 
 	req->data_handle = data_handle;
 	req->data_handle = data_handle;
-	req->mode = STARPU_W;
-	req->src = source;
+	req->srcdst = source;
 	req->mpi_tag = mpi_tag;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
 	req->comm = comm;
+	req->detached = 0;
 
 
-	req->handle_new = _handle_new_mpi_irecv;
+	req->func = starpu_mpi_irecv_func;
 
 
-	submit_mpi_req(req);
+	/* Asynchronously request StarPU to fetch the data in main memory: when
+	 * it is available in main memory, submit_mpi_req(req) is called and
+	 * the request is actually submitted  */
+	starpu_sync_data_with_mem_non_blocking(data_handle, STARPU_W,
+			submit_mpi_req, (void *)req);
 
 
 	return 0;
 	return 0;
 }
 }
 
 
+/*
+ *	Recv
+ */
+
 int starpu_mpi_recv(starpu_data_handle data_handle,
 int starpu_mpi_recv(starpu_data_handle data_handle,
 		int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
 		int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
 {
 {
-	/* test if we are blocking in a callback .. */
-	int ret = starpu_sync_data_with_mem(data_handle, STARPU_W);
-	if (ret)
-		return ret;
+	struct starpu_mpi_req_s req;
 
 
-	void *ptr = starpu_mpi_handle_to_ptr(data_handle);
-	
-	MPI_Datatype datatype;
-	starpu_mpi_handle_to_datatype(data_handle, &datatype);
-	MPI_Recv(ptr, 1, datatype, source, mpi_tag, comm, status);
-
-	starpu_release_data_from_mem(data_handle);
+	starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
+	starpu_mpi_wait(&req, status);
 
 
 	return 0;
 	return 0;
 }
 }
 
 
+/*
+ *	Send
+ */
+
 int starpu_mpi_send(starpu_data_handle data_handle,
 int starpu_mpi_send(starpu_data_handle data_handle,
 		int dest, int mpi_tag, MPI_Comm comm)
 		int dest, int mpi_tag, MPI_Comm comm)
 {
 {
-	/* test if we are blocking in a callback .. */
-	int ret = starpu_sync_data_with_mem(data_handle, STARPU_R);
-	if (ret)
-		return ret;
-
-	void *ptr = starpu_mpi_handle_to_ptr(data_handle);
-	
+	struct starpu_mpi_req_s req;
 	MPI_Status status;
 	MPI_Status status;
-	MPI_Datatype datatype;
-	starpu_mpi_handle_to_datatype(data_handle, &datatype);
-	MPI_Send(ptr, 1, datatype, dest,  mpi_tag, comm);
 
 
-	starpu_release_data_from_mem(data_handle);
+	starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
+	starpu_mpi_wait(&req, &status);
 
 
 	return 0;
 	return 0;
 }
 }
 
 
-static void _handle_new_mpi_wait(struct starpu_mpi_req_s *req)
+/*
+ *	Wait
+ */
+
+static void starpu_mpi_wait_func(struct starpu_mpi_req_s *waiting_req)
 {
 {
-	req->ret = MPI_Wait(&req->request, req->status);
+	/* Which is the mpi request we are waiting for ? */
+	struct starpu_mpi_req_s *req = waiting_req->other_request;
+
+	req->ret = MPI_Wait(&req->request, waiting_req->status);
 	handle_request_termination(req);
 	handle_request_termination(req);
 }
 }
 
 
-
-
 int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
 int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
 {
 {
 	int ret;
 	int ret;
+	struct starpu_mpi_req_s waiting_req;
 
 
+	/* We cannot try to complete a MPI request that was not actually posted
+	 * to MPI yet. */
 	pthread_mutex_lock(&req->req_mutex);
 	pthread_mutex_lock(&req->req_mutex);
-
-	req->status = status;
-
-	/* we don't submit a wait request until the previous mpi request was
-	 * actually submitted */
 	while (!req->submitted)
 	while (!req->submitted)
 		pthread_cond_wait(&req->req_cond, &req->req_mutex);
 		pthread_cond_wait(&req->req_cond, &req->req_mutex);
+	pthread_mutex_unlock(&req->req_mutex);
 
 
-	req->submitted = 0;
-	req->handle_new = _handle_new_mpi_wait;
-	req->status = status;
-
-	submit_mpi_req(req);
+	/* Initialize the request structure */
+	pthread_mutex_init(&waiting_req.req_mutex, NULL);
+	pthread_cond_init(&waiting_req.req_cond, NULL);
+	waiting_req.status = status;
+	waiting_req.other_request = req;
+	waiting_req.func = starpu_mpi_wait_func;
+	
+	submit_mpi_req(&waiting_req);
 
 
-	while (!req->submitted)
+	/* We wait for the MPI request to finish */
+	pthread_mutex_lock(&req->req_mutex);
+	while (!req->completed)
 		pthread_cond_wait(&req->req_cond, &req->req_mutex);
 		pthread_cond_wait(&req->req_cond, &req->req_mutex);
+	pthread_mutex_unlock(&req->req_mutex);
 
 
 	ret = req->ret;
 	ret = req->ret;
 
 
-	pthread_mutex_unlock(&req->req_mutex);
-
 	return ret;
 	return ret;
 }
 }
 
 
+/*
+ * 	Test
+ */
+
+static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
+{
+	/* Which is the mpi request we are testing for ? */
+	struct starpu_mpi_req_s *req = testing_req->other_request;
+
+	int ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
+
+	if (*testing_req->flag)
+	{
+		testing_req->ret = ret;
+		handle_request_termination(req);
+	}
+
+	pthread_mutex_lock(&testing_req->req_mutex);
+	testing_req->completed = 1;
+	pthread_cond_signal(&testing_req->req_cond);
+	pthread_mutex_unlock(&testing_req->req_mutex);
+}
+
 int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
 int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
 {
 {
 	int ret = 0;
 	int ret = 0;
+	struct starpu_mpi_req_s testing_req;
 
 
 	pthread_mutex_lock(&req->req_mutex);
 	pthread_mutex_lock(&req->req_mutex);
 
 
-	if (req->submitted)
-	{
-		ret = MPI_Test(&req->request, flag, status);
+	STARPU_ASSERT(!req->detached);
 
 
-		if (*flag)
-			handle_request_termination(req);
+	pthread_mutex_lock(&req->req_mutex);
+	unsigned submitted = req->submitted;
+	pthread_mutex_unlock(&req->req_mutex);
+
+	if (submitted)
+	{
+		/* Initialize the request structure */
+		pthread_mutex_init(&testing_req.req_mutex, NULL);
+		pthread_cond_init(&testing_req.req_cond, NULL);
+		testing_req.flag = flag;
+		testing_req.status = status;
+		testing_req.other_request = req;
+		testing_req.func = starpu_mpi_wait_func;
+		testing_req.completed = 0;
+		
+		submit_mpi_req(&testing_req);
+	
+		/* We wait for the test request to finish */
+		pthread_mutex_lock(&testing_req.req_mutex);
+		while (!testing_req.completed)
+			pthread_cond_wait(&testing_req.req_cond, &testing_req.req_mutex);
+		pthread_mutex_unlock(&testing_req.req_mutex);
+	
+		ret = testing_req.ret;
 	}
 	}
 	else {
 	else {
 		*flag = 0;
 		*flag = 0;
 	}
 	}
 
 
-	pthread_mutex_unlock(&req->req_mutex);
-
 	return ret;
 	return ret;
 }
 }
 
 
@@ -193,37 +274,23 @@ void handle_request_termination(struct starpu_mpi_req_s *req)
 {
 {
 	MPI_Type_free(&req->datatype);
 	MPI_Type_free(&req->datatype);
 	starpu_release_data_from_mem(req->data_handle);
 	starpu_release_data_from_mem(req->data_handle);
-}
-
-void handle_request(struct starpu_mpi_req_s *req)
-{
-	STARPU_ASSERT(req);
 
 
+	/* tell anyone potentiallly waiting on the request that it is
+	 * terminated now */
 	pthread_mutex_lock(&req->req_mutex);
 	pthread_mutex_lock(&req->req_mutex);
-
-	starpu_sync_data_with_mem(req->data_handle, req->mode);
-
-	/* submit the request to MPI */
-	req->handle_new(req);
-
-	/* perhaps somebody is waiting or trying to test */
-	req->submitted = 1;
+	req->completed = 1;
 	pthread_cond_broadcast(&req->req_cond);
 	pthread_cond_broadcast(&req->req_cond);
-
 	pthread_mutex_unlock(&req->req_mutex);
 	pthread_mutex_unlock(&req->req_mutex);
+	
 }
 }
 
 
-static void submit_mpi_req(struct starpu_mpi_req_s *req)
+void submit_mpi_req(void *arg)
 {
 {
-	pthread_mutex_lock(&mutex);
-	pthread_mutex_lock(&req->req_mutex);
+	struct starpu_mpi_req_s *req = arg;
 
 
+	pthread_mutex_lock(&mutex);
 	starpu_mpi_req_list_push_front(new_requests, req);
 	starpu_mpi_req_list_push_front(new_requests, req);
-
 	pthread_cond_broadcast(&cond);
 	pthread_cond_broadcast(&cond);
-	pthread_cond_broadcast(&req->req_cond);
-
-	pthread_mutex_unlock(&req->req_mutex);
 	pthread_mutex_unlock(&mutex);
 	pthread_mutex_unlock(&mutex);
 }
 }
 
 
@@ -231,6 +298,14 @@ static void submit_mpi_req(struct starpu_mpi_req_s *req)
  *	Progression loop
  *	Progression loop
  */
  */
 
 
+void handle_new_request(struct starpu_mpi_req_s *req)
+{
+	STARPU_ASSERT(req);
+
+	/* submit the request to MPI */
+	req->func(req);
+}
+
 void *progress_thread_func(void *arg __attribute__((unused)))
 void *progress_thread_func(void *arg __attribute__((unused)))
 {
 {
 	/* notify the main thread that the progression thread is ready */
 	/* notify the main thread that the progression thread is ready */
@@ -241,14 +316,18 @@ void *progress_thread_func(void *arg __attribute__((unused)))
 
 
 	pthread_mutex_lock(&mutex);
 	pthread_mutex_lock(&mutex);
 	while (running) {
 	while (running) {
+		/* TODO test if there is some "detached request" and progress if this is the case */
 		pthread_cond_wait(&cond, &mutex);
 		pthread_cond_wait(&cond, &mutex);
 		if (!running)
 		if (!running)
 			break;		
 			break;		
 
 
+		/* Handle new requests  */
+	//	while (req = starpu_mpi_req_list_pop_back(new_requests))
+
+		/* get one request */
+		struct starpu_mpi_req_s *req;
 		while (!starpu_mpi_req_list_empty(new_requests))
 		while (!starpu_mpi_req_list_empty(new_requests))
 		{
 		{
-			/* get one request */
-			struct starpu_mpi_req_s *req;
 			req = starpu_mpi_req_list_pop_back(new_requests);
 			req = starpu_mpi_req_list_pop_back(new_requests);
 
 
 			/* handling a request is likely to block for a while
 			/* handling a request is likely to block for a while
@@ -256,9 +335,7 @@ void *progress_thread_func(void *arg __attribute__((unused)))
 			 * application submit requests in the meantime, so we
 			 * application submit requests in the meantime, so we
 			 * release the lock.  */
 			 * release the lock.  */
 			pthread_mutex_unlock(&mutex);
 			pthread_mutex_unlock(&mutex);
-
-			handle_request(req);
-
+			handle_new_request(req);
 			pthread_mutex_lock(&mutex);
 			pthread_mutex_lock(&mutex);
 		}
 		}
 
 
@@ -278,10 +355,7 @@ int starpu_mpi_initialize(void)
 	pthread_mutex_init(&mutex, NULL);
 	pthread_mutex_init(&mutex, NULL);
 	pthread_cond_init(&cond, NULL);
 	pthread_cond_init(&cond, NULL);
 
 
-	/* requests that have not be submitted to MPI yet */
 	new_requests = starpu_mpi_req_list_new();
 	new_requests = starpu_mpi_req_list_new();
-	/* requests that are already submitted and which are not completed yet */
-	pending_requests = starpu_mpi_req_list_new();
 
 
 	int ret = pthread_create(&progress_thread, NULL, progress_thread_func, NULL);
 	int ret = pthread_create(&progress_thread, NULL, progress_thread_func, NULL);
 
 
@@ -304,5 +378,7 @@ int starpu_mpi_shutdown(void)
 	void *value;
 	void *value;
 	pthread_join(progress_thread, &value);
 	pthread_join(progress_thread, &value);
 
 
+	/* TODO liberate the queues */
+
 	return 0;
 	return 0;
 }
 }

+ 29 - 14
mpi/starpu_mpi.h

@@ -23,30 +23,45 @@
 #include <pthread.h>
 #include <pthread.h>
 
 
 LIST_TYPE(starpu_mpi_req,
 LIST_TYPE(starpu_mpi_req,
-	void *ptr;
+	/* description of the data at StarPU level */
 	starpu_data_handle data_handle;
 	starpu_data_handle data_handle;
-	starpu_access_mode mode;
+
+	/* description of the data to be sent/received */
+	void *ptr;
 	MPI_Datatype datatype;
 	MPI_Datatype datatype;
-	MPI_Request request;
-	MPI_Status *status;
-	void (*handle_new)(struct starpu_mpi_req_s *);
-	void (*handle_pending)(struct starpu_mpi_req_s *);
-	unsigned submitted;
-	int dst;
-	int src;
+
+	/* who are we talking to ? */
+	int srcdst;
 	int mpi_tag;
 	int mpi_tag;
-	int ret;
 	MPI_Comm comm;
 	MPI_Comm comm;
+
+	void (*func)(struct starpu_mpi_req_s *);
+
+	MPI_Status *status;
+	MPI_Request request;
+	int *flag;
+
+	int ret;
 	pthread_mutex_t req_mutex;
 	pthread_mutex_t req_mutex;
 	pthread_cond_t req_cond;
 	pthread_cond_t req_cond;
+
+	unsigned submitted;
+	unsigned completed;
+
+	/* In the case of a Wait/Test request, we are going to post a request
+	 * to test the completion of another request */
+	struct starpu_mpi_req_s *other_request;
+
+	/* in the case of detached requests */
+	unsigned detached;
+	void *arg;
+	void (*callback)(void *);
 );
 );
 
 
 int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
-		int dest, int mpi_tag, MPI_Comm comm,
-		void (*callback)(void *));
+		int dest, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
-		int source, int mpi_tag, MPI_Comm comm,
-		void (*callback)(void *));
+		int source, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_send(starpu_data_handle data_handle,
 int starpu_mpi_send(starpu_data_handle data_handle,
 		int dest, int mpi_tag, MPI_Comm comm);
 		int dest, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_recv(starpu_data_handle data_handle,
 int starpu_mpi_recv(starpu_data_handle data_handle,

+ 1 - 2
mpi/tests/mpi_irecv.c

@@ -61,8 +61,7 @@ int main(int argc, char **argv)
 		else {
 		else {
 			MPI_Status status;
 			MPI_Status status;
 			struct starpu_mpi_req_s req;
 			struct starpu_mpi_req_s req;
-			starpu_mpi_irecv(tab_handle, &req, other_rank, loop,
-					MPI_COMM_WORLD, NULL);
+			starpu_mpi_irecv(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD);
 			starpu_mpi_wait(&req, &status);
 			starpu_mpi_wait(&req, &status);
 		}
 		}
 	}
 	}

+ 1 - 2
mpi/tests/mpi_isend.c

@@ -58,8 +58,7 @@ int main(int argc, char **argv)
 		{
 		{
 			MPI_Status status;
 			MPI_Status status;
 			struct starpu_mpi_req_s req;
 			struct starpu_mpi_req_s req;
-			starpu_mpi_isend(tab_handle, &req, other_rank, loop,
-						MPI_COMM_WORLD, NULL);
+			starpu_mpi_isend(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD);
 			starpu_mpi_wait(&req, &status);
 			starpu_mpi_wait(&req, &status);
 		}
 		}
 		else {
 		else {

+ 2 - 2
mpi/tests/ring_async.c

@@ -92,7 +92,7 @@ int main(int argc, char **argv)
 			token = 0;
 			token = 0;
 			MPI_Status status;
 			MPI_Status status;
 			struct starpu_mpi_req_s req;
 			struct starpu_mpi_req_s req;
-			starpu_mpi_irecv(token_handle, &req, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL);
+			starpu_mpi_irecv(token_handle, &req, (rank+size-1)%size, tag, MPI_COMM_WORLD);
 			starpu_mpi_wait(&req, &status);
 			starpu_mpi_wait(&req, &status);
 		}
 		}
 		else {
 		else {
@@ -106,7 +106,7 @@ int main(int argc, char **argv)
 		{
 		{
 			struct starpu_mpi_req_s req;
 			struct starpu_mpi_req_s req;
 			MPI_Status status;
 			MPI_Status status;
-			starpu_mpi_isend(token_handle, &req, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL);
+			starpu_mpi_isend(token_handle, &req, (rank+1)%size, tag+1, MPI_COMM_WORLD);
 			starpu_mpi_wait(&req, &status);
 			starpu_mpi_wait(&req, &status);
 		}
 		}
 		else {
 		else {