浏览代码

Factorize some code

Cédric Augonnet 15 年之前
父节点
当前提交
5c4864f8d5
共有 1 个文件被更改,包括 45 次插入73 次删除
  1. 45 73
      mpi/starpu_mpi.c

+ 45 - 73
mpi/starpu_mpi.c

@@ -19,11 +19,11 @@
 #include <starpu_mpi_datatype.h>
 #include <starpu_mpi_private.h>
 
-//#define VERBOSE_STARPU_MPI	1
+#define VERBOSE_STARPU_MPI	1
 
 /* TODO find a better way to select the polling method (perhaps during the
  * configuration) */
-#define USE_STARPU_ACTIVITY	1
+//#define USE_STARPU_ACTIVITY	1
 
 static void submit_mpi_req(void *arg);
 static void handle_request_termination(struct starpu_mpi_req_s *req);
@@ -69,20 +69,17 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
 	PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
 }
 
-int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data_handle,
+				int dest, int mpi_tag, MPI_Comm comm,
+				unsigned detached, void (*callback)(void *), void *arg)
 {
-	STARPU_ASSERT(public_req);
-
-	struct starpu_mpi_req_s *req = starpu_mpi_req_new();
+	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
 	STARPU_ASSERT(req);
-	*public_req = req;
-
-	memset(req, 0, sizeof(struct starpu_mpi_req_s));
 
 	/* Initialize the request structure */
 	req->submitted = 0;
 	req->completed = 0;
-	pthread_mutex_init(&req->req_mutex, NULL);
+	PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
 	PTHREAD_COND_INIT(&req->req_cond, NULL);
 
 	req->request_type = SEND_REQ;
@@ -91,15 +88,31 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 	req->srcdst = dest;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
-	req->detached = 0;
 	req->func = starpu_mpi_isend_func;
 
+	req->detached = detached;
+	req->callback = callback;
+	req->callback_arg = arg;
+
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, submit_mpi_req(req) is called and
 	 * the request is actually submitted  */
 	starpu_data_sync_with_mem_non_blocking(data_handle, STARPU_R,
 			submit_mpi_req, (void *)req);
 
+	return req;
+}
+
+int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+{
+	STARPU_ASSERT(public_req);
+
+	struct starpu_mpi_req_s *req;
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
+
+	STARPU_ASSERT(req);
+	*public_req = req;
+
 	return 0;
 }
 
@@ -110,32 +123,7 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 int starpu_mpi_isend_detached(starpu_data_handle data_handle,
 				int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
-	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
-	STARPU_ASSERT(req);
-
-	/* Initialize the request structure */
-	req->submitted = 0;
-	req->completed = 0;
-	PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
-	PTHREAD_COND_INIT(&req->req_cond, NULL);
-
-	req->request_type = SEND_REQ;
-
-	req->data_handle = data_handle;
-	req->srcdst = dest;
-	req->mpi_tag = mpi_tag;
-	req->comm = comm;
-	req->func = starpu_mpi_isend_func;
-
-	req->detached = 1;
-	req->callback = callback;
-	req->callback_arg = arg;
-
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, submit_mpi_req(req) is called and
-	 * the request is actually submitted  */
-	starpu_data_sync_with_mem_non_blocking(data_handle, STARPU_R,
-			submit_mpi_req, (void *)req);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
 
 	return 0;
 }
@@ -167,18 +155,11 @@ static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
 	PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
 }
 
-int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
+static struct starpu_mpi_req_s *_starpu_mpi_irecv_common(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
 {
-	STARPU_ASSERT(public_req);
-
-	struct starpu_mpi_req_s *req = starpu_mpi_req_new();
-	STARPU_ASSERT(req);
-	*public_req = req;
-
+	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
 	STARPU_ASSERT(req);
 
-	memset(req, 0, sizeof(struct starpu_mpi_req_s));
-
 	/* Initialize the request structure */
 	req->submitted = 0;
 	PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
@@ -190,7 +171,10 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 	req->srcdst = source;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
-	req->detached = 0;
+
+	req->detached = detached;
+	req->callback = callback;
+	req->callback_arg = arg;
 
 	req->func = starpu_mpi_irecv_func;
 
@@ -200,6 +184,19 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 	starpu_data_sync_with_mem_non_blocking(data_handle, STARPU_W,
 			submit_mpi_req, (void *)req);
 
+	return req;
+}
+
+int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
+{
+	STARPU_ASSERT(public_req);
+
+	struct starpu_mpi_req_s *req;
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
+
+	STARPU_ASSERT(req);
+	*public_req = req;
+
 	return 0;
 }
 
@@ -209,32 +206,7 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 
 int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
-	struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
-	STARPU_ASSERT(req);
-
-	/* Initialize the request structure */
-	req->submitted = 0;
-	PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
-	PTHREAD_COND_INIT(&req->req_cond, NULL);
-
-	req->request_type = RECV_REQ;
-
-	req->data_handle = data_handle;
-	req->srcdst = source;
-	req->mpi_tag = mpi_tag;
-	req->comm = comm;
-
-	req->detached = 1;
-	req->callback = callback;
-	req->callback_arg = arg;
-
-	req->func = starpu_mpi_irecv_func;
-
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, submit_mpi_req(req) is called and
-	 * the request is actually submitted  */
-	starpu_data_sync_with_mem_non_blocking(data_handle, STARPU_W,
-			submit_mpi_req, (void *)req);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
 
 	return 0;
 }