Browse Source

fix asynchronous request submission (aka modprobe brain)

Cédric Augonnet 15 years ago
parent
commit
52c311463b
1 changed files with 20 additions and 5 deletions
  1. 20 5
      mpi/starpu_mpi.c

+ 20 - 5
mpi/starpu_mpi.c

@@ -18,6 +18,7 @@
 #include <starpu_mpi_datatype.h>
 
 static void submit_mpi_req(struct starpu_mpi_req_s *req);
+void handle_request_termination(struct starpu_mpi_req_s *req);
 
 static starpu_mpi_req_list_t new_requests; 
 static starpu_mpi_req_list_t pending_requests; 
@@ -29,6 +30,11 @@ static int running = 0;
 
 static void _handle_new_mpi_isend(struct starpu_mpi_req_s *req)
 {
+	//int rank;
+	//MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	//fprintf(stdout, "Rank %d _handle_new_mpi_isend\n", rank);
+	//fflush(stdout);
+
 	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
 	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
 
@@ -43,9 +49,11 @@ int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *re
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->data_handle = data_handle;
 	req->dst = dest;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
+	req->mode = STARPU_R;
 
 	req->handle_new = _handle_new_mpi_isend;
 
@@ -73,6 +81,8 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *re
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->data_handle = data_handle;
+	req->mode = STARPU_W;
 	req->src = source;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
@@ -134,7 +144,7 @@ int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
 
 	ret = MPI_Wait(&req->request, status);
 
-	MPI_Type_free(&req->datatype);
+	handle_request_termination(req);
 
 	pthread_mutex_unlock(&req->req_mutex);
 
@@ -152,7 +162,7 @@ int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
 		ret = MPI_Test(&req->request, flag, status);
 
 		if (*flag)
-			MPI_Type_free(&req->datatype);
+			handle_request_termination(req);
 	}
 	else {
 		*flag = 0;
@@ -167,6 +177,12 @@ int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
  *	Requests
  */
 
+void handle_request_termination(struct starpu_mpi_req_s *req)
+{
+	MPI_Type_free(&req->datatype);
+	starpu_release_data_from_mem(req->data_handle);
+}
+
 void handle_request(struct starpu_mpi_req_s *req)
 {
 	STARPU_ASSERT(req);
@@ -192,6 +208,7 @@ static void submit_mpi_req(struct starpu_mpi_req_s *req)
 
 	starpu_mpi_req_list_push_front(new_requests, req);
 
+	pthread_cond_broadcast(&cond);
 	pthread_cond_broadcast(&req->req_cond);
 
 	pthread_mutex_unlock(&req->req_mutex);
@@ -228,9 +245,7 @@ void *progress_thread_func(void *arg __attribute__((unused)))
 			 * release the lock.  */
 			pthread_mutex_unlock(&mutex);
 
-			/* handle that request */
-			STARPU_ASSERT(req);
-			req->handle_new(req);
+			handle_request(req);
 
 			pthread_mutex_lock(&mutex);
 		}