|
@@ -2,7 +2,7 @@
|
|
*
|
|
*
|
|
* Copyright (C) 2012-2013,2016-2017 Inria
|
|
* Copyright (C) 2012-2013,2016-2017 Inria
|
|
* Copyright (C) 2009-2018 Université de Bordeaux
|
|
* Copyright (C) 2009-2018 Université de Bordeaux
|
|
- * Copyright (C) 2010-2017 CNRS
|
|
|
|
|
|
+ * Copyright (C) 2010-2018 CNRS
|
|
*
|
|
*
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
@@ -29,11 +29,13 @@
|
|
*/
|
|
*/
|
|
|
|
|
|
/* This is called after a request is finished processing, to release the data */
|
|
/* This is called after a request is finished processing, to release the data */
|
|
-void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req) {
|
|
|
|
|
|
+void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
|
|
|
|
+{
|
|
if (!req->data_handle)
|
|
if (!req->data_handle)
|
|
return;
|
|
return;
|
|
|
|
|
|
- if (_starpu_mpi_req_multilist_queued_coop_sends(req)) {
|
|
|
|
|
|
+ if (_starpu_mpi_req_multilist_queued_coop_sends(req))
|
|
|
|
+ {
|
|
struct _starpu_mpi_coop_sends *coop_sends = req->coop_sends_head;
|
|
struct _starpu_mpi_coop_sends *coop_sends = req->coop_sends_head;
|
|
struct _starpu_mpi_data *mpi_data = coop_sends->mpi_data;
|
|
struct _starpu_mpi_data *mpi_data = coop_sends->mpi_data;
|
|
int last;
|
|
int last;
|
|
@@ -42,12 +44,15 @@ void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req) {
|
|
_starpu_mpi_req_multilist_erase_coop_sends(&coop_sends->reqs, req);
|
|
_starpu_mpi_req_multilist_erase_coop_sends(&coop_sends->reqs, req);
|
|
last = _starpu_mpi_req_multilist_empty_coop_sends(&coop_sends->reqs);
|
|
last = _starpu_mpi_req_multilist_empty_coop_sends(&coop_sends->reqs);
|
|
_starpu_spin_unlock(&mpi_data->coop_lock);
|
|
_starpu_spin_unlock(&mpi_data->coop_lock);
|
|
- if (last) {
|
|
|
|
|
|
+ if (last)
|
|
|
|
+ {
|
|
/* We were last, release data */
|
|
/* We were last, release data */
|
|
starpu_data_release(req->data_handle);
|
|
starpu_data_release(req->data_handle);
|
|
free(coop_sends);
|
|
free(coop_sends);
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
/* Trivial request */
|
|
/* Trivial request */
|
|
starpu_data_release(req->data_handle);
|
|
starpu_data_release(req->data_handle);
|
|
}
|
|
}
|
|
@@ -95,7 +100,8 @@ static void _starpu_mpi_submit_coop_sends(void *arg)
|
|
/* TODO: turn them into redirects & forwards */
|
|
/* TODO: turn them into redirects & forwards */
|
|
|
|
|
|
/* And submit them */
|
|
/* And submit them */
|
|
- for (i = 0; i < n; i++) {
|
|
|
|
|
|
+ for (i = 0; i < n; i++)
|
|
|
|
+ {
|
|
/* Prefetch next request, since once we submit cur we may not be able to read it. */
|
|
/* Prefetch next request, since once we submit cur we may not be able to read it. */
|
|
_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, reqs[i]->node_tag.rank);
|
|
_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, reqs[i]->node_tag.rank);
|
|
_starpu_mpi_submit_ready_request(reqs[i]);
|
|
_starpu_mpi_submit_ready_request(reqs[i]);
|
|
@@ -146,7 +152,8 @@ static int _starpu_mpi_coop_send_compatible(struct _starpu_mpi_req *req, struct
|
|
&& prevreq->sequential_consistency == req->sequential_consistency;
|
|
&& prevreq->sequential_consistency == req->sequential_consistency;
|
|
}
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency) {
|
|
|
|
|
|
+void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
|
|
|
|
+{
|
|
struct _starpu_mpi_data *mpi_data = _starpu_mpi_data_get(data_handle);
|
|
struct _starpu_mpi_data *mpi_data = _starpu_mpi_data_get(data_handle);
|
|
struct _starpu_mpi_coop_sends *coop_sends = NULL, *tofree = NULL;
|
|
struct _starpu_mpi_coop_sends *coop_sends = NULL, *tofree = NULL;
|
|
int done = 0, queue, first = 1;
|
|
int done = 0, queue, first = 1;
|
|
@@ -174,7 +181,9 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
|
|
req->coop_sends_head = coop_sends;
|
|
req->coop_sends_head = coop_sends;
|
|
first = 0;
|
|
first = 0;
|
|
done = 1;
|
|
done = 1;
|
|
- } else {
|
|
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
/* Nope, incompatible, put ours instead */
|
|
/* Nope, incompatible, put ours instead */
|
|
_STARPU_MPI_DEBUG(0, "%p: new cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
|
|
_STARPU_MPI_DEBUG(0, "%p: new cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
|
|
mpi_data->coop_sends = coop_sends;
|
|
mpi_data->coop_sends = coop_sends;
|
|
@@ -211,7 +220,8 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
|
|
/* In case we created one for nothing after all */
|
|
/* In case we created one for nothing after all */
|
|
free(tofree);
|
|
free(tofree);
|
|
|
|
|
|
- if (first) {
|
|
|
|
|
|
+ if (first)
|
|
|
|
+ {
|
|
/* We were first, we are responsible for acquiring the data for everybody */
|
|
/* We were first, we are responsible for acquiring the data for everybody */
|
|
starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_coop_sends, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
|
|
starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_coop_sends, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
|
|
}
|
|
}
|