瀏覽代碼

Separate filling the request from submitting it: factor the filling out into _starpu_mpi_request_fill, so that submission can be handled more flexibly

Samuel Thibault 7 年之前
父節點
當前提交
99d174ad07
共有 5 個文件被更改，包括 96 次插入和 110 次刪除
  1. 5 54
      mpi/src/mpi/starpu_mpi_mpi.c
  2. 4 48
      mpi/src/nmad/starpu_mpi_nmad.c
  3. 32 4
      mpi/src/starpu_mpi.c
  4. 10 4
      mpi/src/starpu_mpi_private.h
  5. 45 0
      mpi/src/starpu_mpi_req.c

+ 5 - 54
mpi/src/mpi/starpu_mpi_mpi.c

@@ -51,10 +51,9 @@ static unsigned nready_process;
 static unsigned ndetached_send;
 
 static int mpi_thread_cpuid = -1;
-static int use_prio = 1;
+int _starpu_mpi_use_prio = 1;
 
 static void _starpu_mpi_add_sync_point_in_fxt(void);
-static void _starpu_mpi_submit_ready_request(void *arg);
 static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
 #ifdef STARPU_MPI_VERBOSE
@@ -130,7 +129,7 @@ void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
 	_starpu_mpi_submit_ready_request(req);
 }
 
-static void _starpu_mpi_submit_ready_request(void *arg)
+void _starpu_mpi_submit_ready_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_req *req = arg;
@@ -271,58 +270,10 @@ static void nop_acquire_cb(void *arg)
 	starpu_data_release(arg);
 }
 
-struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-						       int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
-						       unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
-						       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-						       enum starpu_data_access_mode mode,
-						       int sequential_consistency,
-						       int is_internal_req,
-						       starpu_ssize_t count)
+void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req STARPU_ATTRIBUTE_UNUSED)
 {
-	struct _starpu_mpi_req *req;
-
-	if (_starpu_mpi_fake_world_size != -1)
-	{
-		/* Don't actually do the communication */
-		starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
-		return NULL;
-	}
-
-	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
-
-	_starpu_mpi_comm_register(comm);
-
-	/* Initialize the request structure */
-	_starpu_mpi_request_init(&req);
-	req->request_type = request_type;
-	/* prio_list is sorted by increasing values */
-	if (use_prio)
-		req->prio = prio;
-	req->data_handle = data_handle;
-	req->node_tag.rank = srcdst;
-	req->node_tag.data_tag = data_tag;
-	req->node_tag.comm = comm;
-	req->detached = detached;
-	req->sync = sync;
-	req->callback = callback;
-	req->callback_arg = arg;
-	req->func = func;
-	req->sequential_consistency = sequential_consistency;
-	req->is_internal_req = is_internal_req;
-	/* For internal requests, we wait for both the request completion and the matching application request completion */
-	req->to_destroy = !is_internal_req;
-	req->count = count;
-
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
-	 * the request is actually submitted */
-	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
-
-	_STARPU_MPI_LOG_OUT();
-	return req;
- }
+}
 
 #ifdef STARPU_SIMGRID
 int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag)
@@ -1507,7 +1458,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 	nready_process = starpu_get_env_number_default("STARPU_MPI_NREADY_PROCESS", 10);
 	ndetached_send = starpu_get_env_number_default("STARPU_MPI_NDETACHED_SEND", 10);
 	mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
-	use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
+	_starpu_mpi_use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
 
 #ifdef STARPU_SIMGRID
 	STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);

+ 4 - 48
mpi/src/nmad/starpu_mpi_nmad.c

@@ -39,7 +39,6 @@
 #include <nm_sendrecv_interface.h>
 #include <nm_mpi_nmad.h>
 
-static void _starpu_mpi_submit_ready_request(void *arg);
 
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
@@ -50,7 +49,7 @@ static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_add_sync_point_in_fxt(void);
 
 static int mpi_thread_cpuid = -1;
-static int use_prio = 1;
+int _starpu_mpi_use_prio = 1;
 int _starpu_mpi_fake_world_size = -1;
 int _starpu_mpi_fake_world_rank = -1;
 
@@ -84,52 +83,9 @@ static void nop_acquire_cb(void *arg)
 	starpu_data_release(arg);
 }
 
-struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-						       int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
-						       unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
-						       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-						       enum starpu_data_access_mode mode,
-						       int sequential_consistency,
-						       int is_internal_req,
-						       starpu_ssize_t count)
+void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req STARPU_ATTRIBUTE_UNUSED)
 {
-	struct _starpu_mpi_req *req;
-
-	if (_starpu_mpi_fake_world_size != -1)
-	{
-		/* Don't actually do the communication */
-		starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
-		return NULL;
-	}
-
-	_STARPU_MPI_LOG_IN();
 	STARPU_ATOMIC_ADD( &pending_request, 1);
-
-	/* Initialize the request structure */
-	_starpu_mpi_request_init(&req);
-	req->request_type = request_type;
-	/* prio_list is sorted by increasing values */
-	if (use_prio)
-		req->prio = prio;
-	req->data_handle = data_handle;
-	req->node_tag.rank = srcdst;
-	req->node_tag.data_tag = data_tag;
-	req->node_tag.comm = comm;
-	req->detached = detached;
-	req->sync = sync;
-	req->callback = callback;
-	req->callback_arg = arg;
-	req->func = func;
-	req->sequential_consistency = sequential_consistency;
-	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);
-
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
-	 * the request is actually submitted */
-	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
-
-	_STARPU_MPI_LOG_OUT();
-	return req;
 }
 
 /********************************************************/
@@ -492,7 +448,7 @@ static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 	nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 
-static void _starpu_mpi_submit_ready_request(void *arg)
+void _starpu_mpi_submit_ready_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_req *req = arg;
@@ -673,7 +629,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 	starpu_sem_init(&callback_sem, 0, 0);
 	running = 0;
 	mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
-	use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
+	_starpu_mpi_use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
 
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 

+ 32 - 4
mpi/src/starpu_mpi.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012-2013,2016-2017                      Inria
- * Copyright (C) 2009-2017                                Université de Bordeaux
+ * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2010-2017                                CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -41,18 +41,37 @@
 #include <mpi/starpu_mpi_tag.h>
 #endif
 
+static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency) {
+	/* Asynchronously request StarPU to fetch the data in main memory: when
+	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
+	 * the request is actually submitted */
+	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
+}
+
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 							int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		/* Don't actually do the communication */
+		return NULL;
+	}
+
+	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(
+	                                      data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
+					      sequential_consistency, 0, 0);
+	_starpu_mpi_req_willpost(req);
+	_starpu_mpi_isend_irecv_common(req,
 #ifdef STARPU_MPI_PEDANTIC_ISEND
 					      STARPU_RW,
 #else
 					      STARPU_R,
 #endif
-					      sequential_consistency, 0, 0);
+					      sequential_consistency
+			);
+	return req;
 }
 
 int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, starpu_mpi_tag_t data_tag, int prio, MPI_Comm comm)
@@ -147,7 +166,16 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, starp
 
 struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W, sequential_consistency, is_internal_req, count);
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		/* Don't actually do the communication */
+		return NULL;
+	}
+
+	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, sequential_consistency, is_internal_req, count);
+	_starpu_mpi_req_willpost(req);
+	_starpu_mpi_isend_irecv_common(req, STARPU_W, sequential_consistency);
+	return req;
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm)

+ 10 - 4
mpi/src/starpu_mpi_private.h

@@ -66,6 +66,7 @@ void _starpu_mpi_set_debug_level_max(int level);
 #endif
 extern int _starpu_mpi_fake_world_size;
 extern int _starpu_mpi_fake_world_rank;
+extern int _starpu_mpi_use_prio;
 
 #ifdef STARPU_NO_ASSERT
 #  define STARPU_MPI_ASSERT_MSG(x, msg, ...)	do { if (0) { (void) (x); }} while(0)
@@ -290,17 +291,22 @@ LIST_TYPE(_starpu_mpi_req,
 );
 PRIO_LIST_TYPE(_starpu_mpi_req, prio)
 
-struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+/* To be called before actually queueing a request, so the communication layer knows it has something to look at */
+void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req);
+
+void _starpu_mpi_submit_ready_request(void *arg);
+
+void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
+void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
+struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_handle,
 						       int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
 						       unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 						       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-						       enum starpu_data_access_mode mode,
 						       int sequential_consistency,
 						       int is_internal_req,
 						       starpu_ssize_t count);
 
-void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
-void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
+
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
 void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
 void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);

+ 45 - 0
mpi/src/starpu_mpi_req.c

@@ -19,6 +19,9 @@
 
 #include <starpu.h>
 #include <starpu_mpi_private.h>
+#if defined(STARPU_USE_MPI_MPI)
+#include <mpi/starpu_mpi_comm.h>
+#endif
 #if defined(STARPU_USE_MPI_NMAD)
 #include <pioman.h>
 #endif
@@ -93,6 +96,48 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 #endif
 }
 
+struct _starpu_mpi_req *_starpu_mpi_request_fill(starpu_data_handle_t data_handle,
+						 int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
+						 unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
+						 enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+						 int sequential_consistency,
+						 int is_internal_req,
+						 starpu_ssize_t count)
+{
+	struct _starpu_mpi_req *req;
+
+#ifdef STARPU_USE_MPI_MPI
+	_starpu_mpi_comm_register(comm);
+#endif
+
+	/* Initialize the request structure */
+	_starpu_mpi_request_init(&req);
+	req->request_type = request_type;
+	/* prio_list is sorted by increasing values */
+	if (_starpu_mpi_use_prio)
+		req->prio = prio;
+	req->data_handle = data_handle;
+	req->node_tag.rank = srcdst;
+	req->node_tag.data_tag = data_tag;
+	req->node_tag.comm = comm;
+	req->detached = detached;
+	req->sync = sync;
+	req->callback = callback;
+	req->callback_arg = arg;
+	req->func = func;
+	req->sequential_consistency = sequential_consistency;
+#ifdef STARPU_USE_MPI_NMAD
+	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);
+#elif defined(STARPU_USE_MPI_MPI)
+	req->is_internal_req = is_internal_req;
+	/* For internal requests, we wait for both the request completion and the matching application request completion */
+	req->to_destroy = !is_internal_req;
+	req->count = count;
+#endif
+
+	return req;
+}
+
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
 {
 #ifdef STARPU_USE_MPI_NMAD