Browse Source

Pave the way for computing a diffusion tree

Samuel Thibault 7 years ago
parent
commit
225ea854c5

+ 21 - 0
mpi/src/mpi/starpu_mpi_mpi.c

@@ -124,6 +124,27 @@ void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
 	_starpu_mpi_submit_ready_request(req);
 }
 
+void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends)
+{
+	/* TODO: turn them into redirects & forwards.
+	 * Placeholder for now: the body is empty, so the requests of coop_sends
+	 * are left exactly as the caller prepared them. */
+}
+
+void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_redirects, int submit_data)
+{
+	/* The number of requests is read once, before anything is submitted.
+	 * Note: coop_sends might disappear very very soon after last request is submitted */
+	const unsigned count = coop_sends->n;
+	unsigned idx;
+
+	for (idx = 0; idx < count; idx++)
+	{
+		struct _starpu_mpi_req *req = coop_sends->reqs_array[idx];
+
+		/* TODO: handle redirect requests (submit_redirects is not acted upon yet) */
+
+		/* Only plain sends are submitted, and only when the data is being submitted */
+		if (!submit_data || req->request_type != SEND_REQ)
+			continue;
+
+		_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, req->node_tag.rank);
+		_starpu_mpi_submit_ready_request(req);
+	}
+}
+
 void _starpu_mpi_submit_ready_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();

+ 21 - 0
mpi/src/nmad/starpu_mpi_nmad.c

@@ -443,6 +443,27 @@ static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 	nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 
+void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends)
+{
+	/* TODO: turn them into redirects & forwards.
+	 * Placeholder for now: the body is empty, so the requests of coop_sends
+	 * are left exactly as the caller prepared them. */
+}
+
+void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_redirects, int submit_data)
+{
+	/* The number of requests is read once, before anything is submitted.
+	 * Note: coop_sends might disappear very very soon after last request is submitted */
+	const unsigned count = coop_sends->n;
+	unsigned idx;
+
+	for (idx = 0; idx < count; idx++)
+	{
+		struct _starpu_mpi_req *req = coop_sends->reqs_array[idx];
+
+		/* TODO: handle redirect requests (submit_redirects is not acted upon yet) */
+
+		/* Only plain sends are submitted, and only when the data is being submitted */
+		if (!submit_data || req->request_type != SEND_REQ)
+			continue;
+
+		_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, req->node_tag.rank);
+		_starpu_mpi_submit_ready_request(req);
+	}
+}
+
 void _starpu_mpi_submit_ready_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();

+ 58 - 34
mpi/src/starpu_mpi_coop_sends.c

@@ -47,8 +47,9 @@ void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
 		if (last)
 		{
 			/* We were last, release data */
-			starpu_data_release(req->data_handle);
+			free(coop_sends->reqs_array);
 			free(coop_sends);
+			starpu_data_release(req->data_handle);
 		}
 	}
 	else
@@ -58,8 +59,45 @@ void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
 	}
 }
 
+/* Comparison function for getting qsort to put requests with high priority first.
+ * a and b point to array elements, i.e. to pointers to requests.
+ * Returns a negative value when *a has the higher priority, a positive value
+ * when *b has the higher priority, and 0 when both priorities are equal.
+ * The priorities are compared rather than subtracted: a difference can
+ * overflow when the two priorities are far apart, which would hand qsort an
+ * inconsistent ordering. */
+static int _starpu_mpi_reqs_prio_compare(const void *a, const void *b)
+{
+	const struct _starpu_mpi_req * const *ra = a;
+	const struct _starpu_mpi_req * const *rb = b;
+	return ((*rb)->prio > (*ra)->prio) - ((*rb)->prio < (*ra)->prio);
+}
+
+/* Sort the requests by priority and build a diffusion tree. Actually does something only once per coop_sends bag:
+ * coop_sends->reqs_array doubles as the "already done" marker. It is tested and filled while holding
+ * coop_sends->lock, so a later call finds it non-NULL and returns without redoing the work. */
+static void _starpu_mpi_coop_sends_optimize(struct _starpu_mpi_coop_sends *coop_sends)
+{
+	_starpu_spin_lock(&coop_sends->lock);
+	if (!coop_sends->reqs_array)
+	{
+		unsigned n = coop_sends->n, i;
+		struct _starpu_mpi_req *cur;
+		struct _starpu_mpi_req **reqs;

+		_STARPU_MPI_DEBUG(0, "handling cooperative sends %p for %u neighbours\n", coop_sends, n);

+		/* Store them in an array.
+		 * NOTE(review): the array has coop_sends->n slots but the copy loop is
+		 * bounded only by the end of the list; this assumes no request can
+		 * join the bag once optimization starts - TODO confirm against callers. */
+		_STARPU_CALLOC(reqs, n, sizeof(*reqs));
+		for (cur  = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs), i = 0;
+		     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
+		     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur), i++)
+			reqs[i] = cur;
+		coop_sends->reqs_array = reqs;

+		/* Sort them, using the priority comparison function */
+		qsort(reqs, n, sizeof(*reqs), _starpu_mpi_reqs_prio_compare);

+		/* And build the diffusion tree (coop_sends->lock is still held here) */
+		_starpu_mpi_coop_sends_build_tree(coop_sends);
+	}
+	_starpu_spin_unlock(&coop_sends->lock);
+}
+
 /* This is called on completion of acquisition of data for a cooperative send */
-static void _starpu_mpi_submit_coop_sends(void *arg)
+static void _starpu_mpi_coop_sends_data_ready(void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_coop_sends *coop_sends = arg;
@@ -74,38 +112,14 @@ static void _starpu_mpi_submit_coop_sends(void *arg)
 		_starpu_spin_unlock(&mpi_data->coop_lock);
 	}
 
-	struct _starpu_mpi_req *cur;
-	unsigned n;
-
-	/* Count their number */
-	n = 0;
-	for (cur  = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs);
-	     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
-	     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur))
-		n++;
-
-	_STARPU_MPI_DEBUG(0, "handling cooperative sends %p for %u neighbours\n", coop_sends, n);
-
-	/* Store them in an array */
-	struct _starpu_mpi_req *reqs[n];
-	memset(reqs, 0, n * sizeof(*reqs));
-	unsigned i;
-	for (cur  = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs), i = 0;
-	     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
-	     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur), i++)
-		reqs[i] = cur;
-
-	/* TODO: sort reqs by priority */
-
-	/* TODO: turn them into redirects & forwards */
+	/* Build diffusion tree */
+	_starpu_mpi_coop_sends_optimize(coop_sends);
 
 	/* And submit them */
-	for (i = 0; i < n; i++)
-	{
-		/* Prefetch next request, since once we submit cur we may not be able to read it.  */
-		_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, reqs[i]->node_tag.rank);
-		_starpu_mpi_submit_ready_request(reqs[i]);
-	}
+	if (STARPU_TEST_AND_SET(&coop_sends->redirects_sent, 1) == 0)
+		_starpu_mpi_submit_coop_sends(coop_sends, 1, 1);
+	else
+		_starpu_mpi_submit_coop_sends(coop_sends, 0, 1);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -116,7 +130,13 @@ static void _starpu_mpi_coop_send_flush(struct _starpu_mpi_coop_sends *coop_send
 {
 	if (!coop_sends)
 		return;
-	/* TODO: send the redirects already */
+
+	/* Build diffusion tree */
+	_starpu_mpi_coop_sends_optimize(coop_sends);
+
+	/* And submit them */
+	if (STARPU_TEST_AND_SET(&coop_sends->redirects_sent, 1) == 0)
+		_starpu_mpi_submit_coop_sends(coop_sends, 1, 0);
 }
 
 /* This is called when a write to the data was just submitted, which means we
@@ -178,6 +198,7 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 				coop_sends = mpi_data->coop_sends;
 				_STARPU_MPI_DEBUG(0, "%p: add to cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
 				_starpu_mpi_req_multilist_push_back_coop_sends(&coop_sends->reqs, req);
+				coop_sends->n++;
 				req->coop_sends_head = coop_sends;
 				first = 0;
 				done = 1;
@@ -208,8 +229,11 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 		{
 			/* Didn't find something to join, create one out of critical section */
 			_STARPU_MPI_CALLOC(coop_sends, 1, sizeof(*coop_sends));
+			coop_sends->redirects_sent = 0;
+			coop_sends->n = 1;
 			_starpu_mpi_req_multilist_head_init_coop_sends(&coop_sends->reqs);
 			_starpu_mpi_req_multilist_push_back_coop_sends(&coop_sends->reqs, req);
+			_starpu_spin_init(&coop_sends->lock);
 			req->coop_sends_head = coop_sends;
 			coop_sends->mpi_data = mpi_data;
 		}
@@ -223,7 +247,7 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 	if (first)
 	{
 		/* We were first, we are responsible for acquiring the data for everybody */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_coop_sends, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
 	}
 }
 

+ 18 - 0
mpi/src/starpu_mpi_private.h

@@ -206,6 +206,12 @@ struct _starpu_mpi_coop_sends
 	/* List of send requests */
 	struct _starpu_mpi_req_multilist_coop_sends reqs;
 	struct _starpu_mpi_data *mpi_data;
+
+	/* Array of send requests, after sorting out */
+	struct _starpu_spinlock lock;
+	struct _starpu_mpi_req **reqs_array;
+	unsigned n;
+	unsigned redirects_sent;
 };
 
 /* Initialized in starpu_mpi_data_register_comm */
@@ -322,9 +328,21 @@ void _starpu_mpi_submit_ready_request(void *arg);
 /* To be called when request is completed */
 void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req);
 
+/* Build a communication tree. Called before _starpu_mpi_submit_coop_sends is ever called. coop_sends->lock is held. */
+void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends);
 /* Try to merge with send request with other send requests */
 void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency);
 
+/* Actually submit the coop_sends bag to MPI.
+ * At least one of submit_redirects or submit_data is true.
+ * _starpu_mpi_submit_coop_sends may be called either
+ * - just once with both parameters being true,
+ * - or once with submit_redirects being true (data is not available yet, but we
+ * can send the redirects), and a second time with submit_data being true. Or
+ * the converse, possibly on different threads, etc.
+ */
+void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_redirects, int submit_data);
+
 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
 void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
 struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_handle,