Преглед на файлове

mpi: defer choosing the target node to when data acquisition is complete semantically

i.e. "execution time", not "submission time", so that StarPU-MPI can now
choose on which memory node to make the read or write.
Samuel Thibault преди 4 години
родител
ревизия
bf516bbc43
променени са 7 файла, в които са добавени 85 реда и са изтрити 11 реда
  1. 7 1
      include/starpu_data.h
  2. 1 1
      mpi/src/mpi/starpu_mpi_mpi.c
  3. 27 3
      mpi/src/starpu_mpi.c
  4. 31 3
      mpi/src/starpu_mpi_coop_sends.c
  5. 5 0
      mpi/src/starpu_mpi_private.h
  6. 1 1
      mpi/src/starpu_mpi_req.c
  7. 13 2
      src/datawizard/user_interactions.c

+ 7 - 1
include/starpu_data.h

@@ -298,8 +298,14 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_hand
    to retrieve the jobid of the synchronization tasks. \e pre_sync_jobid happens
    just before the acquisition, and \e post_sync_jobid happens just after the
    release.
+
+   \e callback_acquired is called once the data is acquired semantically (i.e.
+   in terms of dependencies), but before the data is fetched. It is given a
+   pointer to the node, which it can modify if it wishes.
+
+   This is a very internal interface, subject to changes, do not use this.
 */
-int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, int quick, long *pre_sync_jobid, long *post_sync_jobid);
+int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode), void (*callback)(void *arg), void *arg, int sequential_consistency, int quick, long *pre_sync_jobid, long *post_sync_jobid);
 
 /**
    The application can call this function instead of starpu_data_acquire() so as to

+ 1 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -256,7 +256,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
 				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				// FIXME: when buffer == NULL, do not hardcode acquiring on early_data_handle->buffer_node, to just acquire where the data happens to have been stored by MPI
-				starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args,  1, 0, NULL, NULL);
+				starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,NULL,_starpu_mpi_early_data_cb,(void*) cb_args,  1, 0, NULL, NULL);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			else

+ 27 - 3
mpi/src/starpu_mpi.c

@@ -34,9 +34,30 @@
 #include <core/task.h>
 #include <core/topology.h>
 
+int _starpu_mpi_choose_node(starpu_data_handle_t data_handle, enum starpu_data_access_mode mode)
+{
+	/* TODO: actually choose a node; for now data_handle and mode are ignored and main RAM is always returned */
+	return STARPU_MAIN_RAM;
+}
+
+static void _starpu_mpi_acquired_callback(void *arg, int *nodep, enum starpu_data_access_mode mode)
+{
+	struct _starpu_mpi_req *req = arg;
+	int node = *nodep;
+
+	/* The data was acquired in terms of dependencies: we can now look at the
+	 * current state of the handle and decide which node we prefer for the
+	 * data fetch. A negative node means that no node was chosen yet. */
+
+	if (node < 0)
+		node = _starpu_mpi_choose_node(req->data_handle, mode);
+
+	req->node = *nodep = node;
+}
+
 static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
 {
-	unsigned node = STARPU_MAIN_RAM; // XXX For now
+	int node = -1;
 
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
@@ -48,6 +69,9 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 		size_t size = starpu_data_get_size(req->data_handle);
 		if (size)
 		{
+			/* FIXME: rather take the less-loaded NUMA node */
+			node = STARPU_MAIN_RAM;
+
 			/* This will potentially block */
 			starpu_memory_allocate(node, size, STARPU_MEMORY_WAIT);
 			req->reserved_size = size;
@@ -58,12 +82,12 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 
 	if (sequential_consistency)
 	{
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_submit_ready_request, (void *)req, 1 /*sequential consistency*/, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 1 /*sequential consistency*/, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
 	}
 	else
 	{
 		/* post_sync_job_id has already been filled */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_submit_ready_request, (void *)req, 0 /*sequential consistency*/, 1, &req->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 0 /*sequential consistency*/, 1, &req->pre_sync_jobid, NULL);
 	}
 }
 

+ 31 - 3
mpi/src/starpu_mpi_coop_sends.c

@@ -57,6 +57,23 @@ void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
 	}
 }
 
+/* The data was acquired in terms of dependencies: we can now look at the
+ * current state of the handle and decide which node we prefer for the data
+ * fetch. A negative node means that no node was chosen yet. */
+static void _starpu_mpi_coop_send_acquired_callback(void *arg, int *nodep, enum starpu_data_access_mode mode)
+{
+	struct _starpu_mpi_coop_sends *coop_sends = arg;
+	int node = *nodep;
+
+	if (node < 0)
+		node = _starpu_mpi_choose_node(coop_sends->data_handle, mode);
+
+	/* Record the node in the first req only; _starpu_mpi_coop_sends_data_ready() copies it over to the other reqs */
+	_starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs)->node = node;
+
+	*nodep = node;
+}
+
 /* Comparison function for getting qsort to put requests with high priority first */
 static int _starpu_mpi_reqs_prio_compare(const void *a, const void *b)
 {
@@ -109,6 +126,8 @@ static void _starpu_mpi_coop_sends_data_ready(void *arg)
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_coop_sends *coop_sends = arg;
 	struct _starpu_mpi_data *mpi_data = coop_sends->mpi_data;
+	struct _starpu_mpi_req *cur;
+	unsigned node;
 
 	/* Take the cooperative send bag out from more submissions */
 	if (mpi_data->coop_sends == coop_sends)
@@ -119,6 +138,15 @@ static void _starpu_mpi_coop_sends_data_ready(void *arg)
 		_starpu_spin_unlock(&mpi_data->coop_lock);
 	}
 
+	/* Copy over the memory node number */
+	cur = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs);
+	node = cur->node;
+
+	for ( ;
+	     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
+	     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur))
+		cur->node = node;
+
 	if (coop_sends->n == 1)
 	{
 		/* Trivial case, just submit it */
@@ -187,6 +215,7 @@ static int _starpu_mpi_coop_send_compatible(struct _starpu_mpi_req *req, struct
 	       && prevreq->sequential_consistency == req->sequential_consistency;
 }
 
+
 void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
 {
 	struct _starpu_mpi_data *mpi_data = _starpu_mpi_data_get(data_handle);
@@ -250,6 +279,7 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 		{
 			/* Didn't find something to join, create one out of critical section */
 			_STARPU_MPI_CALLOC(coop_sends, 1, sizeof(*coop_sends));
+			coop_sends->data_handle = data_handle;
 			coop_sends->redirects_sent = 0;
 			coop_sends->n = 1;
 			_starpu_mpi_req_multilist_head_init_coop_sends(&coop_sends->reqs);
@@ -265,11 +295,9 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 	/* In case we created one for nothing after all */
 	free(tofree);
 
-	unsigned node = STARPU_MAIN_RAM; // XXX For now
-
 	if (first)
 		/* We were first, we are responsible for acquiring the data for everybody */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, -1, mode, _starpu_mpi_coop_send_acquired_callback, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL);
 	else
 		req->pre_sync_jobid = coop_sends->pre_sync_jobid;
 }

+ 5 - 0
mpi/src/starpu_mpi_private.h

@@ -187,6 +187,8 @@ MULTILIST_CREATE_TYPE(_starpu_mpi_req, coop_sends)
 /** One bag of cooperative sends */
 struct _starpu_mpi_coop_sends
 {
+	starpu_data_handle_t data_handle;
+
 	/** List of send requests */
 	struct _starpu_mpi_req_multilist_coop_sends reqs;
 	struct _starpu_mpi_data *mpi_data;
@@ -319,6 +321,9 @@ struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_hand
 						  starpu_ssize_t count);
 
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
+
+int _starpu_mpi_choose_node(starpu_data_handle_t data_handle, enum starpu_data_access_mode mode);
+
 void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
 
 struct _starpu_mpi_argc_argv

+ 1 - 1
mpi/src/starpu_mpi_req.c

@@ -25,7 +25,7 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	/* Initialize the request structure */
 	//(*req)->data_handle = NULL;
 	//(*req)->prio = 0;
-	(*req)->node = STARPU_MAIN_RAM; // XXX For now
+	(*req)->node = (unsigned) -1;
 
 	//(*req)->datatype = 0;
 	//(*req)->datatype_name = NULL;

+ 13 - 2
src/datawizard/user_interactions.c

@@ -71,6 +71,7 @@ struct user_interaction_wrapper
 	enum starpu_is_prefetch prefetch;
 	unsigned async;
 	int prio;
+	void (*callback_acquired)(void *, int *node, enum starpu_data_access_mode mode);
 	void (*callback)(void *);
 	void *callback_arg;
 	struct starpu_task *pre_sync_task;
@@ -152,6 +153,12 @@ static void _starpu_data_acquire_fetch_data_callback(void *arg)
 /* Called when the data acquisition is done, launch the fetch into target memory */
 static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 {
+	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
+
+	if (wrapper->callback_acquired)
+		/* Let the callback change the target node at will, according to the current data situation, before the fetch is launched */
+		wrapper->callback_acquired(wrapper->callback_arg, &wrapper->node, wrapper->mode);
+
 	_starpu_data_acquire_launch_fetch(arg, 1, _starpu_data_acquire_fetch_data_callback, arg);
 }
 
@@ -173,7 +180,10 @@ static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 
 /* The data must be released by calling starpu_data_release later on */
 int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
-							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
+							  enum starpu_data_access_mode mode,
+							  void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode),
+							  void (*callback)(void *arg),
+							  void *arg,
 							  int sequential_consistency, int quick,
 							  long *pre_sync_jobid, long *post_sync_jobid)
 {
@@ -190,6 +200,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	_starpu_data_acquire_wrapper_init(wrapper, handle, node, mode);
 	wrapper->async = 1;
 
+	wrapper->callback_acquired = callback_acquired;
 	wrapper->callback = callback;
 	wrapper->callback_arg = arg;
 	wrapper->pre_sync_task = NULL;
@@ -263,7 +274,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_hand
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
 							  int sequential_consistency, int quick)
 {
-	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, callback, arg, sequential_consistency, quick, NULL, NULL);
+	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, NULL, callback, arg, sequential_consistency, quick, NULL, NULL);
 }
 
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,