|
|
@@ -21,7 +21,7 @@
|
|
|
#include <starpu_mpi_private.h>
|
|
|
#include <starpu_profiling.h>
|
|
|
#include <starpu_mpi_stats.h>
|
|
|
-#include <starpu_mpi_insert_task.h>
|
|
|
+#include <starpu_mpi_task_insert.h>
|
|
|
#include <common/config.h>
|
|
|
#include <common/thread.h>
|
|
|
|
|
|
@@ -38,7 +38,7 @@ static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t dat
|
|
|
int source, int mpi_tag, MPI_Comm comm,
|
|
|
unsigned detached, void (*callback)(void *), void *arg,
|
|
|
int sequential_consistency, int is_internal_req,
|
|
|
- ssize_t psize);
|
|
|
+ ssize_t count);
|
|
|
static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
|
|
|
|
|
|
/* The list of requests that have been newly submitted by the application */
|
|
|
@@ -62,31 +62,43 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
|
|
|
|
|
|
#define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
|
|
|
|
|
|
-struct _starpu_mpi_copy_handle
|
|
|
+LIST_TYPE(_starpu_mpi_copy_handle,
|
|
|
+ starpu_data_handle_t handle;
|
|
|
+ struct _starpu_mpi_envelope *env;
|
|
|
+ struct _starpu_mpi_req *req;
|
|
|
+ void *buffer;
|
|
|
+ int mpi_tag;
|
|
|
+ int source;
|
|
|
+ int req_ready;
|
|
|
+ starpu_pthread_mutex_t req_mutex;
|
|
|
+ starpu_pthread_cond_t req_cond;
|
|
|
+);
|
|
|
+
|
|
|
+struct _starpu_mpi_copy_handle_hashlist
|
|
|
{
|
|
|
- starpu_data_handle_t handle;
|
|
|
- struct _starpu_mpi_envelope *env;
|
|
|
- int mpi_tag;
|
|
|
+ struct _starpu_mpi_copy_handle_list *list;
|
|
|
UT_hash_handle hh;
|
|
|
- struct _starpu_mpi_req *req;
|
|
|
+ int mpi_tag;
|
|
|
};
|
|
|
|
|
|
- /********************************************************/
|
|
|
- /* */
|
|
|
- /* Hashmap's requests functionalities */
|
|
|
- /* */
|
|
|
- /********************************************************/
|
|
|
+/********************************************************/
|
|
|
+/* */
|
|
|
+/* Hashmap's requests functionalities */
|
|
|
+/* */
|
|
|
+/********************************************************/
|
|
|
|
|
|
/** stores application requests for which data have not been received yet */
|
|
|
-static struct _starpu_mpi_req *_starpu_mpi_app_req_hashmap = NULL;
|
|
|
+static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
|
|
|
+static int _starpu_mpi_app_req_hashmap_count = 0;
|
|
|
/** stores data which have been received by MPI but have not been requested by the application */
|
|
|
-static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
|
|
|
+static struct _starpu_mpi_copy_handle_hashlist **_starpu_mpi_copy_handle_hashmap = NULL;
|
|
|
+static int _starpu_mpi_copy_handle_hashmap_count = 0;
|
|
|
|
|
|
-static struct _starpu_mpi_req* find_app_req(int mpi_tag)
|
|
|
+static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
|
|
|
{
|
|
|
struct _starpu_mpi_req* req;
|
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_app_req_hashmap, &mpi_tag, req);
|
|
|
+ HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
|
|
|
|
|
|
return req;
|
|
|
}
|
|
|
@@ -95,24 +107,25 @@ static void add_app_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_app_req(req->mpi_tag);
|
|
|
+ test_req = find_app_req(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
if (test_req == NULL)
|
|
|
{
|
|
|
- HASH_ADD_INT(_starpu_mpi_app_req_hashmap, mpi_tag, req);
|
|
|
- _STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
+ HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
|
|
|
+ _starpu_mpi_app_req_hashmap_count ++;
|
|
|
+ _STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
|
|
|
if (seq_const && req->sequential_consistency)
|
|
|
{
|
|
|
- STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
|
|
|
+ STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
|
|
|
+ STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -121,61 +134,115 @@ static void delete_app_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_app_req(req->mpi_tag);
|
|
|
+ test_req = find_app_req(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
if (test_req != NULL)
|
|
|
{
|
|
|
- HASH_DEL(_starpu_mpi_app_req_hashmap, req);
|
|
|
- _STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap. \n", req, req->mpi_tag);
|
|
|
+ HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
|
|
|
+ _starpu_mpi_app_req_hashmap_count --;
|
|
|
+ _STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag)
|
|
|
+static void _starpu_mpi_copy_handle_display_hash(int source, int tag)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle* chandle;
|
|
|
-
|
|
|
- HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap, &mpi_tag, chandle);
|
|
|
+ struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
+ HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &tag, hashlist);
|
|
|
|
|
|
- return chandle;
|
|
|
+ if (hashlist == NULL)
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
|
|
|
+ }
|
|
|
+ else if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ struct _starpu_mpi_copy_handle *cur;
|
|
|
+ for (cur = _starpu_mpi_copy_handle_list_begin(hashlist->list) ;
|
|
|
+ cur != _starpu_mpi_copy_handle_list_end(hashlist->list);
|
|
|
+ cur = _starpu_mpi_copy_handle_list_next(cur))
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
+static struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *test_chandle;
|
|
|
+ struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
+ struct _starpu_mpi_copy_handle *chandle;
|
|
|
|
|
|
- test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
-
|
|
|
- if (test_chandle == NULL)
|
|
|
+ _STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
|
|
|
+ HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, hashlist);
|
|
|
+ if (hashlist == NULL)
|
|
|
{
|
|
|
- HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap, mpi_tag, chandle);
|
|
|
- _STARPU_MPI_DEBUG(3, "Adding copied handle %p with tag %d in the hashmap. \n", chandle, chandle->mpi_tag);
|
|
|
+ chandle = NULL;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Error add_chandle : copied handle %p with tag %d already in the hashmap. \n", chandle, chandle->mpi_tag);
|
|
|
- STARPU_ASSERT(test_chandle != NULL);
|
|
|
+ if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
|
|
|
+ {
|
|
|
+ chandle = NULL;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (delete == 1)
|
|
|
+ {
|
|
|
+ chandle = _starpu_mpi_copy_handle_list_pop_front(hashlist->list);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ chandle = _starpu_mpi_copy_handle_list_front(hashlist->list);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ _STARPU_MPI_DEBUG(60, "Found chandle %p with tag %d in the hashmap[%d]\n", chandle, mpi_tag, source);
|
|
|
+ return chandle;
|
|
|
}
|
|
|
|
|
|
-static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
+static struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *test_chandle;
|
|
|
+ return pop_chandle(mpi_tag, source, 0);
|
|
|
+}
|
|
|
|
|
|
- test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
+static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
+{
|
|
|
+ _STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
|
|
|
- if (test_chandle != NULL)
|
|
|
+ struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
+ HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], &chandle->mpi_tag, hashlist);
|
|
|
+ if (hashlist == NULL)
|
|
|
{
|
|
|
- HASH_DEL(_starpu_mpi_copy_handle_hashmap, chandle);
|
|
|
- _STARPU_MPI_DEBUG(3, "Deleting copied handle %p with tag %d from the hashmap. \n", chandle, chandle->mpi_tag);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- _STARPU_MPI_DEBUG(3, "Warning delete_chandle : copied handle %p with tag %d isn't in the hashmap. \n", chandle, chandle->mpi_tag);
|
|
|
+ hashlist = malloc(sizeof(struct _starpu_mpi_copy_handle_hashlist));
|
|
|
+ hashlist->list = _starpu_mpi_copy_handle_list_new();
|
|
|
+ hashlist->mpi_tag = chandle->mpi_tag;
|
|
|
+ HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, hashlist);
|
|
|
}
|
|
|
+ _starpu_mpi_copy_handle_list_push_back(hashlist->list, chandle);
|
|
|
+ _starpu_mpi_copy_handle_hashmap_count ++;
|
|
|
+#ifdef STARPU_VERBOSE
|
|
|
+ _starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
+static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
+{
|
|
|
+ _STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
+ struct _starpu_mpi_copy_handle *found = pop_chandle(chandle->mpi_tag, chandle->source, 1);
|
|
|
+
|
|
|
+ STARPU_ASSERT_MSG(found == chandle,
|
|
|
+ "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
+
|
|
|
+ _starpu_mpi_copy_handle_hashmap_count --;
|
|
|
+#ifdef STARPU_VERBOSE
|
|
|
+ _starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
|
|
|
+#endif
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
@@ -183,7 +250,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
/* Initialize the request structure */
|
|
|
req->data_handle = NULL;
|
|
|
|
|
|
- req->datatype = NULL;
|
|
|
+ req->datatype = 0;
|
|
|
req->ptr = NULL;
|
|
|
req->count = -1;
|
|
|
req->user_datatype = -1;
|
|
|
@@ -195,7 +262,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
req->func = NULL;
|
|
|
|
|
|
req->status = NULL;
|
|
|
- req->request = NULL;
|
|
|
+ req->request = 0;
|
|
|
req->flag = NULL;
|
|
|
|
|
|
req->ret = -1;
|
|
|
@@ -216,7 +283,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
req->callback = NULL;
|
|
|
req->callback_arg = NULL;
|
|
|
|
|
|
- req->size_req = NULL;
|
|
|
+ req->size_req = 0;
|
|
|
req->internal_req = NULL;
|
|
|
req->is_internal_req = 0;
|
|
|
req->envelope = NULL;
|
|
|
@@ -236,7 +303,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
enum starpu_data_access_mode mode,
|
|
|
int sequential_consistency,
|
|
|
int is_internal_req,
|
|
|
- ssize_t psize)
|
|
|
+ ssize_t count)
|
|
|
{
|
|
|
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
@@ -258,7 +325,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
req->func = func;
|
|
|
req->sequential_consistency = sequential_consistency;
|
|
|
req->is_internal_req = is_internal_req;
|
|
|
- req->count = psize;
|
|
|
+ req->count = count;
|
|
|
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
* it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
@@ -312,11 +379,12 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
|
|
|
if (req->user_datatype == 0)
|
|
|
{
|
|
|
+ int size;
|
|
|
req->count = 1;
|
|
|
req->ptr = starpu_data_get_local_ptr(req->data_handle);
|
|
|
|
|
|
- req->envelope->psize = (ssize_t)req->count;
|
|
|
-
|
|
|
+ MPI_Type_size(req->datatype, &size);
|
|
|
+ req->envelope->size = (ssize_t)req->count * size;
|
|
|
_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
|
|
|
MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
}
|
|
|
@@ -325,30 +393,30 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
int ret;
|
|
|
|
|
|
// Do not pack the data, just try to find out the size
|
|
|
- starpu_data_pack(req->data_handle, NULL, &(req->envelope->psize));
|
|
|
+ starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
|
|
|
|
|
|
- if (req->envelope->psize != -1)
|
|
|
+ if (req->envelope->size != -1)
|
|
|
{
|
|
|
// We already know the size of the data, let's send it to overlap with the packing of the data
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
- req->count = req->envelope->psize;
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
+ req->count = req->envelope->size;
|
|
|
ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
|
|
|
// Pack the data
|
|
|
starpu_data_pack(req->data_handle, &req->ptr, &req->count);
|
|
|
- if (req->envelope->psize == -1)
|
|
|
+ if (req->envelope->size == -1)
|
|
|
{
|
|
|
// We know the size now, let's send it
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
// We check the size returned with the 2 calls to pack is the same
|
|
|
- STARPU_ASSERT_MSG(req->count == req->envelope->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->psize);
|
|
|
+ STARPU_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
|
|
|
}
|
|
|
// We can send the data now
|
|
|
}
|
|
|
@@ -415,7 +483,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
STARPU_ASSERT_MSG(req->ptr, "Invalid pointer to receive data");
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(2, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ _STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
|
|
|
TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
|
|
|
|
|
|
@@ -435,9 +503,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, ssize_t psize)
|
|
|
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, ssize_t count)
|
|
|
{
|
|
|
- return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, psize);
|
|
|
+ return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
|
|
|
@@ -445,12 +513,12 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
|
|
|
|
|
|
- // We check if a tag is defined for the data handle, if not,
|
|
|
- // we define the one given for the communication.
|
|
|
- // A tag is necessary for the internal mpi engine.
|
|
|
- int tag = starpu_data_get_tag(data_handle);
|
|
|
- if (tag == -1)
|
|
|
- starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+// // We check if a tag is defined for the data handle, if not,
|
|
|
+// // we define the one given for the communication.
|
|
|
+// // A tag is necessary for the internal mpi engine.
|
|
|
+// int tag = starpu_data_get_tag(data_handle);
|
|
|
+// if (tag == -1)
|
|
|
+// starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
|
|
|
struct _starpu_mpi_req *req;
|
|
|
req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0, 0);
|
|
|
@@ -466,12 +534,12 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- // We check if a tag is defined for the data handle, if not,
|
|
|
- // we define the one given for the communication.
|
|
|
- // A tag is necessary for the internal mpi engine.
|
|
|
- int tag = starpu_data_get_tag(data_handle);
|
|
|
- if (tag == -1)
|
|
|
- starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+// // We check if a tag is defined for the data handle, if not,
|
|
|
+// // we define the one given for the communication.
|
|
|
+// // A tag is necessary for the internal mpi engine.
|
|
|
+// int tag = starpu_data_get_tag(data_handle);
|
|
|
+// if (tag == -1)
|
|
|
+// starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
|
|
|
_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0, 0);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
@@ -482,6 +550,13 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
+// // We check if a tag is defined for the data handle, if not,
|
|
|
+// // we define the one given for the communication.
|
|
|
+// // A tag is necessary for the internal mpi engine.
|
|
|
+// int tag = starpu_data_get_tag(data_handle);
|
|
|
+// if (tag == -1)
|
|
|
+// starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+
|
|
|
_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0, 0);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
@@ -493,12 +568,12 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
|
|
|
starpu_mpi_req req;
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- // We check if a tag is defined for the data handle, if not,
|
|
|
- // we define the one given for the communication.
|
|
|
- // A tag is necessary for the internal mpi engine.
|
|
|
- int tag = starpu_data_get_tag(data_handle);
|
|
|
- if (tag == -1)
|
|
|
- starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+// // We check if a tag is defined for the data handle, if not,
|
|
|
+// // we define the one given for the communication.
|
|
|
+// // A tag is necessary for the internal mpi engine.
|
|
|
+// int tag = starpu_data_get_tag(data_handle);
|
|
|
+// if (tag == -1)
|
|
|
+// starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
|
|
|
starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
|
|
|
starpu_mpi_wait(&req, status);
|
|
|
@@ -779,7 +854,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
|
|
|
if (req->internal_req)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
|
|
|
+ struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
+ STARPU_ASSERT_MSG(chandle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
|
|
|
_STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
|
|
|
delete_chandle(chandle);
|
|
|
free(chandle);
|
|
|
@@ -839,6 +915,7 @@ struct _starpu_mpi_copy_cb_args
|
|
|
starpu_data_handle_t data_handle;
|
|
|
starpu_data_handle_t copy_handle;
|
|
|
struct _starpu_mpi_req *req;
|
|
|
+ void *buffer;
|
|
|
};
|
|
|
|
|
|
static void _starpu_mpi_copy_cb(void* arg)
|
|
|
@@ -850,19 +927,30 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
args->req->request = args->req->internal_req->request;
|
|
|
args->req->submitted = 1;
|
|
|
|
|
|
- struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
- void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
|
|
|
- void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
|
|
|
-
|
|
|
- if (!itf->copy_methods->ram_to_ram)
|
|
|
+ if (args->buffer)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
|
|
|
- itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
|
|
|
+ /* Data has been received as a raw memory, it has to be unpacked */
|
|
|
+ struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
+ struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
|
+ itf_dst->unpack_data(args->data_handle, 0, args->buffer, itf_src->get_size(args->copy_handle));
|
|
|
+ free(args->buffer);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
|
|
|
- itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
|
|
|
+ struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
+ void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
|
|
|
+ void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
|
|
|
+
|
|
|
+ if (!itf->copy_methods->ram_to_ram)
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
|
|
|
+ itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
|
|
|
+ itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
|
|
|
@@ -889,7 +977,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(-1);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p tag %d and type %s\n", req, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
|
|
|
+ _STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
@@ -929,7 +1017,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
else
|
|
|
{
|
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
|
|
|
+ struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
/* Case : the request has already been submitted internally by StarPU.
|
|
|
* We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
@@ -937,6 +1025,13 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
* bring the data back to the original data handle associated to the request.*/
|
|
|
if (chandle)
|
|
|
{
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req_mutex));
|
|
|
+ while (!(chandle->req_ready))
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(chandle->req_cond), &(chandle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
+
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
STARPU_ASSERT(req->data_handle != chandle->handle);
|
|
|
|
|
|
@@ -945,6 +1040,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
cb_args->data_handle = req->data_handle;
|
|
|
cb_args->copy_handle = chandle->handle;
|
|
|
+ cb_args->buffer = chandle->buffer;
|
|
|
cb_args->req = req;
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
@@ -954,6 +1050,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
|
else
|
|
|
{
|
|
|
+ _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
|
|
|
add_app_req(req);
|
|
|
}
|
|
|
}
|
|
|
@@ -1119,6 +1216,7 @@ static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
|
|
|
static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
|
struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
|
|
|
+ int rank, worldsize;
|
|
|
|
|
|
if (argc_argv->initialize_mpi)
|
|
|
{
|
|
|
@@ -1137,10 +1235,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
|
|
|
}
|
|
|
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
|
|
+ MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
|
|
|
+ MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
|
|
|
+
|
|
|
{
|
|
|
- int rank, worldsize;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
|
|
|
TRACE_MPI_START(rank, worldsize);
|
|
|
#ifdef STARPU_USE_FXT
|
|
|
starpu_profiling_set_id(rank);
|
|
|
@@ -1162,7 +1261,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0);
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(new_requests) && (_starpu_mpi_app_req_hashmap_count == 0);
|
|
|
|
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
@@ -1202,7 +1301,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
* requests in our side, we resubmit a header request. */
|
|
|
MPI_Request header_req;
|
|
|
- if ((HASH_COUNT(_starpu_mpi_app_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
+ if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
|
@@ -1226,37 +1325,53 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
if (flag)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
|
|
|
|
|
|
- struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag);
|
|
|
+ struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
* We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
|
* on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
|
if (!found_req)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a copy_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
|
|
|
|
- while(!(data_handle))
|
|
|
- {
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- }
|
|
|
- STARPU_ASSERT(data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
- struct _starpu_mpi_copy_handle* chandle = malloc(sizeof(struct _starpu_mpi_copy_handle));
|
|
|
+ struct _starpu_mpi_copy_handle* chandle = calloc(1, sizeof(struct _starpu_mpi_copy_handle));
|
|
|
STARPU_ASSERT(chandle);
|
|
|
-
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&chandle->req_mutex, NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&chandle->req_cond, NULL);
|
|
|
chandle->mpi_tag = recv_env->mpi_tag;
|
|
|
chandle->env = recv_env;
|
|
|
- starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
- add_chandle(chandle);
|
|
|
+ chandle->source = status.MPI_SOURCE;
|
|
|
+
|
|
|
+ if (data_handle)
|
|
|
+ {
|
|
|
+ chandle->buffer = NULL;
|
|
|
+ starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
+ add_chandle(chandle);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* The application has not yet registered a data handle with this tag,
|
|
|
+ * we are going to receive the data as raw memory, and give it
|
|
|
+ * to the application when it posts a receive for this tag
|
|
|
+ */
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)chandle->env->size);
|
|
|
+ chandle->buffer = malloc(chandle->env->size);
|
|
|
+ starpu_vector_data_register(&chandle->handle, 0, (uintptr_t) chandle->buffer, chandle->env->size, 1);
|
|
|
+ add_chandle(chandle);
|
|
|
+ }
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
- chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->psize);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->size);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
|
// new_request list, that ensures that the next loop
|
|
|
@@ -1268,6 +1383,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
while (!(chandle->req->posted))
|
|
|
STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&chandle->req_mutex);
|
|
|
+ chandle->req_ready = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&chandle->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&chandle->req_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
|
@@ -1286,7 +1406,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- found_req->count = recv_env->psize;
|
|
|
+ found_req->count = recv_env->size;
|
|
|
found_req->ptr = malloc(found_req->count);
|
|
|
|
|
|
STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
|
|
|
@@ -1313,8 +1433,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
|
STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
- STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0, "Number of receive requests left is not zero");
|
|
|
- STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0, "Number of copy requests left is not zero");
|
|
|
+ STARPU_ASSERT_MSG(_starpu_mpi_app_req_hashmap_count == 0, "Number of receive requests left is not zero");
|
|
|
+ STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap_count == 0, "Number of copy requests left is not zero");
|
|
|
+
|
|
|
if (argc_argv->initialize_mpi)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
|
|
|
@@ -1323,6 +1444,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
+ {
|
|
|
+ int n;
|
|
|
+ struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
+
|
|
|
+ for(n=0 ; n<worldsize; n++)
|
|
|
+ {
|
|
|
+ for(hashlist=_starpu_mpi_copy_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
|
|
|
+ {
|
|
|
+ _starpu_mpi_copy_handle_list_delete(hashlist->list);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ free(_starpu_mpi_app_req_hashmap);
|
|
|
+ free(_starpu_mpi_copy_handle_hashmap);
|
|
|
free(argc_argv);
|
|
|
free(recv_env);
|
|
|
|
|
|
@@ -1406,6 +1542,16 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
|
|
|
_starpu_mpi_add_sync_point_in_fxt();
|
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
|
+
|
|
|
+ {
|
|
|
+ int nb_nodes, k;
|
|
|
+ MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
|
|
|
+ _starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
|
|
|
+ for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
|
|
|
+ _starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle_hash_list *));
|
|
|
+ for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|