Prechádzať zdrojové kódy

Having a "request" associated to a detached call does not make much sense as it
is forbidden to synchronize with it explicitely, so we remove this field from
the interface of the detached calls, and the lib automatically allocates a
request internally.

Cédric Augonnet 15 rokov pred
rodič
commit
e86adf65df

+ 9 - 6
mpi/starpu_mpi.c

@@ -86,13 +86,12 @@ int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *re
  *	Isend (detached)
  */
 
-int starpu_mpi_isend_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
+int starpu_mpi_isend_detached(starpu_data_handle data_handle,
 				int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
+	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
 	STARPU_ASSERT(req);
 
-	memset(req, 0, sizeof(struct starpu_mpi_req_s));
-
 	/* Initialize the request structure */
 	req->submitted = 0;
 	req->completed = 0;
@@ -168,12 +167,11 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *re
  *	Irecv (detached)
  */
 
-int starpu_mpi_irecv_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
+	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
 	STARPU_ASSERT(req);
 
-	memset(req, 0, sizeof(struct starpu_mpi_req_s));
-
 	/* Initialize the request structure */
 	req->submitted = 0;
 	pthread_mutex_init(&req->req_mutex, NULL);
@@ -424,6 +422,11 @@ void test_detached_requests(void)
 
 		if (flag)
 			starpu_mpi_req_list_erase(detached_requests, req);
+
+#warning TODO fix memleak
+		/* Detached requests are automatically allocated by the lib */
+//		if (req->detached)
+//			free(req);
 	}
 	
 	pthread_mutex_unlock(&detached_requests_mutex);

+ 4 - 4
mpi/starpu_mpi.h

@@ -66,8 +66,8 @@ int starpu_mpi_send(starpu_data_handle data_handle,
 		int dest, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_recv(starpu_data_handle data_handle,
 		int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
-int starpu_mpi_isend_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
-int starpu_mpi_irecv_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_isend_detached(starpu_data_handle data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status);
 int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status);
 int starpu_mpi_initialize(void);
@@ -76,7 +76,7 @@ int starpu_mpi_shutdown(void);
 /* Some helper functions */
 
 /* When the transfer is completed, the tag is unlocked */
-int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
-int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
+int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle data_handle, int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
+int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
 
 #endif // __STARPU_MPI_H__

+ 4 - 4
mpi/starpu_mpi_helper.c

@@ -25,22 +25,22 @@ static void starpu_mpi_unlock_tag_callback(void *arg)
 	free(tagptr);
 }
 
-int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
+int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle data_handle,
 				int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
 {
 	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
 	*tagptr = tag;
 	
-	return starpu_mpi_isend_detached(data_handle, req, dest, mpi_tag, comm,
+	return starpu_mpi_isend_detached(data_handle, dest, mpi_tag, comm,
 						starpu_mpi_unlock_tag_callback, tagptr);
 }
 
 
-int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
 {
 	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
 	*tagptr = tag;
 	
-	return starpu_mpi_irecv_detached(data_handle, req, source, mpi_tag, comm,
+	return starpu_mpi_irecv_detached(data_handle, source, mpi_tag, comm,
 						starpu_mpi_unlock_tag_callback, tagptr);
 }

+ 2 - 3
mpi/tests/mpi_detached_tag.c

@@ -54,15 +54,14 @@ int main(int argc, char **argv)
 
 	for (loop = 0; loop < nloops; loop++)
 	{
-		struct starpu_mpi_req_s req;
 		starpu_tag_t tag = (starpu_tag_t)loop;
 
 		if ((loop % 2) == rank)
 		{
-			starpu_mpi_isend_detached_unlock_tag(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD, tag);
+			starpu_mpi_isend_detached_unlock_tag(tab_handle, other_rank, loop, MPI_COMM_WORLD, tag);
 		}
 		else {
-			starpu_mpi_irecv_detached_unlock_tag(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD, tag);
+			starpu_mpi_irecv_detached_unlock_tag(tab_handle, other_rank, loop, MPI_COMM_WORLD, tag);
 		}
 
 		starpu_tag_wait(tag);

+ 1 - 3
mpi/tests/mpi_irecv_detached.c

@@ -74,10 +74,8 @@ int main(int argc, char **argv)
 		}
 		else {
 			MPI_Status status;
-			struct starpu_mpi_req_s req;
-
 			int received = 0;
-			starpu_mpi_irecv_detached(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD, callback, &received);
+			starpu_mpi_irecv_detached(tab_handle, other_rank, loop, MPI_COMM_WORLD, callback, &received);
 
 			pthread_mutex_lock(&mutex);
 			while (!received)

+ 1 - 2
mpi/tests/mpi_isend_detached.c

@@ -71,10 +71,9 @@ int main(int argc, char **argv)
 		if (rank == 0)
 		{
 			MPI_Status status;
-			struct starpu_mpi_req_s req;
 
 			int sent = 0;
-			starpu_mpi_isend_detached(tab_handle, &req, other_rank, loop, MPI_COMM_WORLD, callback, &sent);
+			starpu_mpi_isend_detached(tab_handle, other_rank, loop, MPI_COMM_WORLD, callback, &sent);
 
 			pthread_mutex_lock(&mutex);
 			while (!sent)