|
@@ -19,6 +19,7 @@
|
|
|
#include <starpu_mpi.h>
|
|
|
#include <starpu_mpi_datatype.h>
|
|
|
#include <starpu_mpi_private.h>
|
|
|
+#include <starpu_mpi_cache.h>
|
|
|
#include <starpu_profiling.h>
|
|
|
#include <starpu_mpi_stats.h>
|
|
|
#include <starpu_mpi_cache.h>
|
|
@@ -26,6 +27,7 @@
|
|
|
#include <starpu_mpi_early_data.h>
|
|
|
#include <starpu_mpi_early_request.h>
|
|
|
#include <starpu_mpi_select_node.h>
|
|
|
+#include <starpu_mpi_tag.h>
|
|
|
#include <common/config.h>
|
|
|
#include <common/thread.h>
|
|
|
#include <datawizard/interfaces/data_interface.h>
|
|
@@ -324,27 +326,27 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
if (req->sync == 0)
|
|
|
{
|
|
|
_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->data_tag);
|
|
|
- req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
|
|
|
- STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_code(req->ret));
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
+ req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
|
|
|
+ STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_code(req->ret));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->data_tag);
|
|
|
- req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
|
|
|
- STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_code(req->ret));
|
|
|
- }
|
|
|
+ req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
|
|
|
+ STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_code(req->ret));
|
|
|
+ }
|
|
|
|
|
|
- _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
|
|
|
+ _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
|
|
|
|
|
|
- /* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
- req->submitted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ /* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ req->submitted = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
|
|
|
- _starpu_mpi_handle_detached_request(req);
|
|
|
+ _starpu_mpi_handle_detached_request(req);
|
|
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
@@ -1148,7 +1150,7 @@ static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status)
|
|
|
+static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
|
|
|
_STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
|
|
@@ -1157,7 +1159,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- data_handle = _starpu_data_get_data_handle_from_tag(envelope->data_tag);
|
|
|
+ data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
|
|
@@ -1182,7 +1184,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
|
- early_data_handle->data_tag, MPI_COMM_WORLD, 1, 0,
|
|
|
+ early_data_handle->data_tag, comm, 1, 0,
|
|
|
NULL, NULL, 1, 1, envelope->size);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
@@ -1248,6 +1250,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_select_node_init();
|
|
|
+ _starpu_mpi_tag_init();
|
|
|
|
|
|
_starpu_mpi_early_request_init(worldsize);
|
|
|
_starpu_mpi_early_data_init(worldsize);
|
|
@@ -1384,7 +1387,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _starpu_mpi_receive_early_data(envelope, status);
|
|
|
+ _starpu_mpi_receive_early_data(envelope, status, MPI_COMM_WORLD);
|
|
|
}
|
|
|
}
|
|
|
/* Case: a matching application request has been found for
|
|
@@ -1626,29 +1629,65 @@ int starpu_mpi_shutdown(void)
|
|
|
_starpu_mpi_comm_amounts_display(rank);
|
|
|
_starpu_mpi_comm_amounts_free();
|
|
|
_starpu_mpi_cache_free(world_size);
|
|
|
+ _starpu_mpi_tag_free();
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
/*
 * Release the MPI-side state attached to a data handle.
 *
 * This function is installed as the handle's unregister hook by
 * starpu_mpi_data_register_comm(). It releases the tag registration,
 * flushes the cache for the communicator stored in the handle's MPI data,
 * and frees that MPI data.
 *
 * data_handle: handle whose mpi_data field was set up by
 *              starpu_mpi_data_register_comm().
 */
void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
{
	struct _starpu_mpi_data *mpi_data;

	_starpu_mpi_data_release_tag(data_handle);

	mpi_data = data_handle->mpi_data;
	_starpu_mpi_cache_flush(mpi_data->comm, data_handle);

	free(mpi_data);
	/* starpu_mpi_data_register_comm() reuses mpi_data whenever it is
	 * non-NULL, so the freed pointer must not stay reachable. */
	data_handle->mpi_data = NULL;
}
|
|
|
|
|
|
-void starpu_mpi_data_register(starpu_data_handle_t data_handle, int tag, int rank)
|
|
|
+void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, int rank, MPI_Comm comm)
|
|
|
{
|
|
|
-#ifdef STARPU_DEVEL
|
|
|
-#warning see if the following code is really needed, it deadlocks some applications
|
|
|
-#if 0
|
|
|
- int my;
|
|
|
- starpu_mpi_comm_rank(MPI_COMM_WORLD, &my);
|
|
|
- if (my != rank)
|
|
|
- STARPU_MPI_ASSERT_MSG(data_handle->home_node == -1, "Data does not belong to node %d, it should be assigned a home node -1", my);
|
|
|
-#endif
|
|
|
-#endif
|
|
|
- _starpu_data_set_rank(data_handle, rank);
|
|
|
- _starpu_data_set_tag(data_handle, tag);
|
|
|
- _starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
|
|
|
+ struct _starpu_mpi_data *mpi_data;
|
|
|
+ if (data_handle->mpi_data)
|
|
|
+ {
|
|
|
+ mpi_data = data_handle->mpi_data;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ mpi_data = malloc(sizeof(struct _starpu_mpi_data));
|
|
|
+ data_handle->mpi_data = mpi_data;
|
|
|
+ _starpu_mpi_data_register_tag(data_handle, tag);
|
|
|
+ _starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (tag != -1)
|
|
|
+ {
|
|
|
+ mpi_data->tag = tag;
|
|
|
+ }
|
|
|
+ if (rank != -1)
|
|
|
+ {
|
|
|
+ mpi_data->rank = rank;
|
|
|
+ mpi_data->comm = comm;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void starpu_mpi_data_set_rank_comm(starpu_data_handle_t handle, int rank, MPI_Comm comm)
|
|
|
+{
|
|
|
+ starpu_mpi_data_register_comm(handle, -1, rank, comm);
|
|
|
+}
|
|
|
+
|
|
|
+void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag)
|
|
|
+{
|
|
|
+ starpu_mpi_data_register_comm(handle, tag, -1, MPI_COMM_WORLD);
|
|
|
+}
|
|
|
+
|
|
|
+int starpu_mpi_data_get_rank(starpu_data_handle_t data)
|
|
|
+{
|
|
|
+ STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
|
|
|
+ return ((struct _starpu_mpi_data *)(data->mpi_data))->rank;
|
|
|
+}
|
|
|
+
|
|
|
+int starpu_mpi_data_get_tag(starpu_data_handle_t data)
|
|
|
+{
|
|
|
+ STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
|
|
|
+ return ((struct _starpu_mpi_data *)(data->mpi_data))->tag;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_comm_size(MPI_Comm comm, int *size)
|