|
@@ -22,6 +22,8 @@
|
|
|
#include <starpu_profiling.h>
|
|
#include <starpu_profiling.h>
|
|
|
#include <starpu_mpi_stats.h>
|
|
#include <starpu_mpi_stats.h>
|
|
|
#include <starpu_mpi_task_insert.h>
|
|
#include <starpu_mpi_task_insert.h>
|
|
|
|
|
+#include <starpu_mpi_early_data.h>
|
|
|
|
|
+#include <starpu_mpi_early_request.h>
|
|
|
#include <common/config.h>
|
|
#include <common/config.h>
|
|
|
#include <common/thread.h>
|
|
#include <common/thread.h>
|
|
|
#include <datawizard/interfaces/data_interface.h>
|
|
#include <datawizard/interfaces/data_interface.h>
|
|
@@ -65,234 +67,52 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
|
|
|
|
|
|
|
|
#define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
|
|
#define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
|
|
|
|
|
|
|
|
-LIST_TYPE(_starpu_mpi_copy_handle,
|
|
|
|
|
- starpu_data_handle_t handle;
|
|
|
|
|
- struct _starpu_mpi_envelope *env;
|
|
|
|
|
- struct _starpu_mpi_req *req;
|
|
|
|
|
- void *buffer;
|
|
|
|
|
- int mpi_tag;
|
|
|
|
|
- int source;
|
|
|
|
|
- int req_ready;
|
|
|
|
|
- starpu_pthread_mutex_t req_mutex;
|
|
|
|
|
- starpu_pthread_cond_t req_cond;
|
|
|
|
|
-);
|
|
|
|
|
-
|
|
|
|
|
-struct _starpu_mpi_copy_handle_hashlist
|
|
|
|
|
|
|
+static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
{
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle_list *list;
|
|
|
|
|
- UT_hash_handle hh;
|
|
|
|
|
- int mpi_tag;
|
|
|
|
|
-};
|
|
|
|
|
-
|
|
|
|
|
-/********************************************************/
|
|
|
|
|
-/* */
|
|
|
|
|
-/* Hashmap's requests functionalities */
|
|
|
|
|
-/* */
|
|
|
|
|
-/********************************************************/
|
|
|
|
|
-
|
|
|
|
|
-/** stores application requests for which data have not been received yet */
|
|
|
|
|
-static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
|
|
|
|
|
-static int _starpu_mpi_app_req_hashmap_count = 0;
|
|
|
|
|
-/** stores data which have been received by MPI but have not been requested by the application */
|
|
|
|
|
-static struct _starpu_mpi_copy_handle_hashlist **_starpu_mpi_copy_handle_hashmap = NULL;
|
|
|
|
|
-static int _starpu_mpi_copy_handle_hashmap_count = 0;
|
|
|
|
|
-
|
|
|
|
|
-static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
|
|
|
|
|
-{
|
|
|
|
|
- struct _starpu_mpi_req* req;
|
|
|
|
|
-
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
|
|
|
|
|
-
|
|
|
|
|
- return req;
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static void add_app_req(struct _starpu_mpi_req *req)
|
|
|
|
|
-{
|
|
|
|
|
- struct _starpu_mpi_req *test_req;
|
|
|
|
|
-
|
|
|
|
|
- test_req = find_app_req(req->mpi_tag, req->srcdst);
|
|
|
|
|
-
|
|
|
|
|
- if (test_req == NULL)
|
|
|
|
|
- {
|
|
|
|
|
- HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
|
|
|
|
|
- _starpu_mpi_app_req_hashmap_count ++;
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
|
|
- int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
|
|
|
|
|
- if (seq_const && req->sequential_consistency)
|
|
|
|
|
- {
|
|
|
|
|
- STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static void delete_app_req(struct _starpu_mpi_req *req)
|
|
|
|
|
-{
|
|
|
|
|
- struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
|
+ *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
|
|
+ STARPU_ASSERT_MSG(*req, "Invalid request");
|
|
|
|
|
|
|
|
- test_req = find_app_req(req->mpi_tag, req->srcdst);
|
|
|
|
|
-
|
|
|
|
|
- if (test_req != NULL)
|
|
|
|
|
- {
|
|
|
|
|
- HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
|
|
|
|
|
- _starpu_mpi_app_req_hashmap_count --;
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-#ifdef STARPU_VERBOSE
|
|
|
|
|
-static void _starpu_mpi_copy_handle_display_hash(int source, int tag)
|
|
|
|
|
-{
|
|
|
|
|
- struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &tag, hashlist);
|
|
|
|
|
-
|
|
|
|
|
- if (hashlist == NULL)
|
|
|
|
|
- {
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
|
|
|
|
|
- }
|
|
|
|
|
- else if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
|
|
|
|
|
- {
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- struct _starpu_mpi_copy_handle *cur;
|
|
|
|
|
- for (cur = _starpu_mpi_copy_handle_list_begin(hashlist->list) ;
|
|
|
|
|
- cur != _starpu_mpi_copy_handle_list_end(hashlist->list);
|
|
|
|
|
- cur = _starpu_mpi_copy_handle_list_next(cur))
|
|
|
|
|
- {
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-#endif
|
|
|
|
|
-
|
|
|
|
|
-static struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete)
|
|
|
|
|
-{
|
|
|
|
|
- struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
|
|
- struct _starpu_mpi_copy_handle *chandle;
|
|
|
|
|
-
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, hashlist);
|
|
|
|
|
- if (hashlist == NULL)
|
|
|
|
|
- {
|
|
|
|
|
- chandle = NULL;
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
|
|
|
|
|
- {
|
|
|
|
|
- chandle = NULL;
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- if (delete == 1)
|
|
|
|
|
- {
|
|
|
|
|
- chandle = _starpu_mpi_copy_handle_list_pop_front(hashlist->list);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- chandle = _starpu_mpi_copy_handle_list_front(hashlist->list);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Found chandle %p with tag %d in the hashmap[%d]\n", chandle, mpi_tag, source);
|
|
|
|
|
- return chandle;
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source)
|
|
|
|
|
-{
|
|
|
|
|
- return pop_chandle(mpi_tag, source, 0);
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
|
|
-{
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
|
|
-
|
|
|
|
|
- struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], &chandle->mpi_tag, hashlist);
|
|
|
|
|
- if (hashlist == NULL)
|
|
|
|
|
- {
|
|
|
|
|
- hashlist = malloc(sizeof(struct _starpu_mpi_copy_handle_hashlist));
|
|
|
|
|
- hashlist->list = _starpu_mpi_copy_handle_list_new();
|
|
|
|
|
- hashlist->mpi_tag = chandle->mpi_tag;
|
|
|
|
|
- HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, hashlist);
|
|
|
|
|
- }
|
|
|
|
|
- _starpu_mpi_copy_handle_list_push_back(hashlist->list, chandle);
|
|
|
|
|
- _starpu_mpi_copy_handle_hashmap_count ++;
|
|
|
|
|
-#ifdef STARPU_VERBOSE
|
|
|
|
|
- _starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
|
|
|
|
|
-#endif
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
|
|
-{
|
|
|
|
|
- _STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
|
|
- struct _starpu_mpi_copy_handle *found = pop_chandle(chandle->mpi_tag, chandle->source, 1);
|
|
|
|
|
-
|
|
|
|
|
- STARPU_ASSERT_MSG(found == chandle,
|
|
|
|
|
- "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
|
|
|
|
|
-
|
|
|
|
|
- _starpu_mpi_copy_handle_hashmap_count --;
|
|
|
|
|
-#ifdef STARPU_VERBOSE
|
|
|
|
|
- _starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
|
|
|
|
|
-#endif
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
|
|
-{
|
|
|
|
|
/* Initialize the request structure */
|
|
/* Initialize the request structure */
|
|
|
- req->data_handle = NULL;
|
|
|
|
|
|
|
+ (*req)->data_handle = NULL;
|
|
|
|
|
|
|
|
- req->datatype = 0;
|
|
|
|
|
- req->ptr = NULL;
|
|
|
|
|
- req->count = -1;
|
|
|
|
|
- req->user_datatype = -1;
|
|
|
|
|
|
|
+ (*req)->datatype = 0;
|
|
|
|
|
+ (*req)->ptr = NULL;
|
|
|
|
|
+ (*req)->count = -1;
|
|
|
|
|
+ (*req)->user_datatype = -1;
|
|
|
|
|
|
|
|
- req->srcdst = -1;
|
|
|
|
|
- req->mpi_tag = -1;
|
|
|
|
|
- req->comm = 0;
|
|
|
|
|
|
|
+ (*req)->srcdst = -1;
|
|
|
|
|
+ (*req)->mpi_tag = -1;
|
|
|
|
|
+ (*req)->comm = 0;
|
|
|
|
|
|
|
|
- req->func = NULL;
|
|
|
|
|
|
|
+ (*req)->func = NULL;
|
|
|
|
|
|
|
|
- req->status = NULL;
|
|
|
|
|
- req->request = 0;
|
|
|
|
|
- req->flag = NULL;
|
|
|
|
|
|
|
+ (*req)->status = NULL;
|
|
|
|
|
+ (*req)->request = 0;
|
|
|
|
|
+ (*req)->flag = NULL;
|
|
|
|
|
|
|
|
- req->ret = -1;
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
|
|
- STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&req->posted_mutex, NULL);
|
|
|
|
|
- STARPU_PTHREAD_COND_INIT(&req->posted_cond, NULL);
|
|
|
|
|
|
|
+ (*req)->ret = -1;
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
|
|
|
|
|
+ STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
|
|
|
|
|
+ STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
|
|
|
|
|
|
|
|
- req->request_type = UNKNOWN_REQ;
|
|
|
|
|
|
|
+ (*req)->request_type = UNKNOWN_REQ;
|
|
|
|
|
|
|
|
- req->submitted = 0;
|
|
|
|
|
- req->completed = 0;
|
|
|
|
|
- req->posted = 0;
|
|
|
|
|
|
|
+ (*req)->submitted = 0;
|
|
|
|
|
+ (*req)->completed = 0;
|
|
|
|
|
+ (*req)->posted = 0;
|
|
|
|
|
|
|
|
- req->other_request = NULL;
|
|
|
|
|
|
|
+ (*req)->other_request = NULL;
|
|
|
|
|
|
|
|
- req->detached = -1;
|
|
|
|
|
- req->callback = NULL;
|
|
|
|
|
- req->callback_arg = NULL;
|
|
|
|
|
|
|
+ (*req)->detached = -1;
|
|
|
|
|
+ (*req)->callback = NULL;
|
|
|
|
|
+ (*req)->callback_arg = NULL;
|
|
|
|
|
|
|
|
- req->size_req = 0;
|
|
|
|
|
- req->internal_req = NULL;
|
|
|
|
|
- req->is_internal_req = 0;
|
|
|
|
|
- req->envelope = NULL;
|
|
|
|
|
- req->sequential_consistency = 1;
|
|
|
|
|
|
|
+ (*req)->size_req = 0;
|
|
|
|
|
+ (*req)->internal_req = NULL;
|
|
|
|
|
+ (*req)->is_internal_req = 0;
|
|
|
|
|
+ (*req)->envelope = NULL;
|
|
|
|
|
+ (*req)->sequential_consistency = 1;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/********************************************************/
|
|
/********************************************************/
|
|
@@ -310,35 +130,33 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
int is_internal_req,
|
|
int is_internal_req,
|
|
|
ssize_t count)
|
|
ssize_t count)
|
|
|
{
|
|
{
|
|
|
|
|
+ struct _starpu_mpi_req *req;
|
|
|
|
|
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
|
|
- struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
|
|
- STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
|
|
-
|
|
|
|
|
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
|
|
-
|
|
|
|
|
- /* Initialize the request structure */
|
|
|
|
|
- _starpu_mpi_request_init(req);
|
|
|
|
|
- req->request_type = request_type;
|
|
|
|
|
- req->data_handle = data_handle;
|
|
|
|
|
- req->srcdst = srcdst;
|
|
|
|
|
- req->mpi_tag = mpi_tag;
|
|
|
|
|
- req->comm = comm;
|
|
|
|
|
- req->detached = detached;
|
|
|
|
|
- req->callback = callback;
|
|
|
|
|
- req->callback_arg = arg;
|
|
|
|
|
- req->func = func;
|
|
|
|
|
- req->sequential_consistency = sequential_consistency;
|
|
|
|
|
- req->is_internal_req = is_internal_req;
|
|
|
|
|
- req->count = count;
|
|
|
|
|
-
|
|
|
|
|
- /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
|
|
- * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
|
|
- * the request is actually submitted */
|
|
|
|
|
- starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
|
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
|
|
+ _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
|
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
|
|
- return req;
|
|
|
|
|
|
|
+ /* Initialize the request structure */
|
|
|
|
|
+ _starpu_mpi_request_init(&req);
|
|
|
|
|
+ req->request_type = request_type;
|
|
|
|
|
+ req->data_handle = data_handle;
|
|
|
|
|
+ req->srcdst = srcdst;
|
|
|
|
|
+ req->mpi_tag = mpi_tag;
|
|
|
|
|
+ req->comm = comm;
|
|
|
|
|
+ req->detached = detached;
|
|
|
|
|
+ req->callback = callback;
|
|
|
|
|
+ req->callback_arg = arg;
|
|
|
|
|
+ req->func = func;
|
|
|
|
|
+ req->sequential_consistency = sequential_consistency;
|
|
|
|
|
+ req->is_internal_req = is_internal_req;
|
|
|
|
|
+ req->count = count;
|
|
|
|
|
+
|
|
|
|
|
+ /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
|
|
+ * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
|
|
+ * the request is actually submitted */
|
|
|
|
|
+ starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
|
|
|
|
|
+
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
|
|
+ return req;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/********************************************************/
|
|
/********************************************************/
|
|
@@ -608,15 +426,11 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
|
|
|
|
|
|
|
|
int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
{
|
|
{
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
|
|
int ret;
|
|
int ret;
|
|
|
-
|
|
|
|
|
- struct _starpu_mpi_req *waiting_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
|
|
- _starpu_mpi_request_init(waiting_req);
|
|
|
|
|
- STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
|
|
|
|
|
-
|
|
|
|
|
struct _starpu_mpi_req *req = *public_req;
|
|
struct _starpu_mpi_req *req = *public_req;
|
|
|
|
|
+ struct _starpu_mpi_req *waiting_req;
|
|
|
|
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
|
|
|
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
@@ -627,7 +441,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
|
|
|
|
|
|
|
|
/* Initialize the request structure */
|
|
/* Initialize the request structure */
|
|
|
- _starpu_mpi_request_init(waiting_req);
|
|
|
|
|
|
|
+ _starpu_mpi_request_init(&waiting_req);
|
|
|
waiting_req->status = status;
|
|
waiting_req->status = status;
|
|
|
waiting_req->other_request = req;
|
|
waiting_req->other_request = req;
|
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
@@ -704,9 +518,8 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
|
|
if (submitted)
|
|
if (submitted)
|
|
|
{
|
|
{
|
|
|
- struct _starpu_mpi_req *testing_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
|
|
- STARPU_ASSERT_MSG(testing_req, "allocation failed");
|
|
|
|
|
- _starpu_mpi_request_init(testing_req);
|
|
|
|
|
|
|
+ struct _starpu_mpi_req *testing_req;
|
|
|
|
|
+ _starpu_mpi_request_init(&testing_req);
|
|
|
|
|
|
|
|
/* Initialize the request structure */
|
|
/* Initialize the request structure */
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
|
|
STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
|
|
@@ -768,11 +581,11 @@ static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
|
|
|
|
|
|
|
|
int starpu_mpi_barrier(MPI_Comm comm)
|
|
int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
{
|
|
{
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
|
|
int ret;
|
|
int ret;
|
|
|
- struct _starpu_mpi_req *barrier_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
|
|
- STARPU_ASSERT_MSG(barrier_req, "allocation failed");
|
|
|
|
|
- _starpu_mpi_request_init(barrier_req);
|
|
|
|
|
|
|
+ struct _starpu_mpi_req *barrier_req;
|
|
|
|
|
+
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
|
|
+ _starpu_mpi_request_init(&barrier_req);
|
|
|
|
|
|
|
|
/* First wait for *both* all tasks and MPI requests to finish, in case
|
|
/* First wait for *both* all tasks and MPI requests to finish, in case
|
|
|
* some tasks generate MPI requests, MPI requests generate tasks, etc.
|
|
* some tasks generate MPI requests, MPI requests generate tasks, etc.
|
|
@@ -855,11 +668,11 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
|
|
|
|
|
if (req->internal_req)
|
|
if (req->internal_req)
|
|
|
{
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
|
|
- STARPU_ASSERT_MSG(chandle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
|
|
|
|
|
- delete_chandle(chandle);
|
|
|
|
|
- free(chandle);
|
|
|
|
|
|
|
+ struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
|
|
|
|
|
+ STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
|
|
|
|
|
+ _starpu_mpi_early_data_delete(early_data_handle);
|
|
|
|
|
+ free(early_data_handle);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
@@ -911,17 +724,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-struct _starpu_mpi_copy_cb_args
|
|
|
|
|
|
|
+struct _starpu_mpi_early_data_cb_args
|
|
|
{
|
|
{
|
|
|
starpu_data_handle_t data_handle;
|
|
starpu_data_handle_t data_handle;
|
|
|
- starpu_data_handle_t copy_handle;
|
|
|
|
|
|
|
+ starpu_data_handle_t early_handle;
|
|
|
struct _starpu_mpi_req *req;
|
|
struct _starpu_mpi_req *req;
|
|
|
void *buffer;
|
|
void *buffer;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-static void _starpu_mpi_copy_cb(void* arg)
|
|
|
|
|
|
|
+static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
{
|
|
{
|
|
|
- struct _starpu_mpi_copy_cb_args *args = arg;
|
|
|
|
|
|
|
+ struct _starpu_mpi_early_data_cb_args *args = arg;
|
|
|
|
|
|
|
|
// We store in the application request the internal MPI
|
|
// We store in the application request the internal MPI
|
|
|
// request so that it can be used by starpu_mpi_wait
|
|
// request so that it can be used by starpu_mpi_wait
|
|
@@ -931,16 +744,16 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
if (args->buffer)
|
|
if (args->buffer)
|
|
|
{
|
|
{
|
|
|
/* Data has been received as a raw memory, it has to be unpacked */
|
|
/* Data has been received as a raw memory, it has to be unpacked */
|
|
|
- struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
|
|
|
|
+ struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
|
|
|
struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
|
STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
|
|
STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
|
|
|
- itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->copy_handle));
|
|
|
|
|
|
|
+ itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
|
|
|
free(args->buffer);
|
|
free(args->buffer);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
|
|
- void* itf_src = starpu_data_get_interface_on_node(args->copy_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
|
|
|
|
|
+ void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
|
|
|
void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
|
|
void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
|
if (!itf->copy_methods->ram_to_ram)
|
|
if (!itf->copy_methods->ram_to_ram)
|
|
@@ -955,11 +768,11 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
|
|
|
|
|
- starpu_data_release(args->copy_handle);
|
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
|
|
|
|
|
+ starpu_data_release(args->early_handle);
|
|
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Done, handling unregister of copy_handle..\n");
|
|
|
|
|
- starpu_data_unregister_submit(args->copy_handle);
|
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
|
|
|
|
|
+ starpu_data_unregister_submit(args->early_handle);
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
|
// If the request is detached, we need to call _starpu_mpi_handle_request_termination
|
|
// If the request is detached, we need to call _starpu_mpi_handle_request_termination
|
|
@@ -1019,41 +832,41 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
|
+ struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
|
|
/* Case : the request has already been submitted internally by StarPU.
|
|
/* Case : the request has already been submitted internally by StarPU.
|
|
|
* We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
* We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
- * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
|
|
|
|
|
|
|
+ * the internal receive will be over, the _starpu_mpi_early_data_cb function will be called to
|
|
|
* bring the data back to the original data handle associated to the request.*/
|
|
* bring the data back to the original data handle associated to the request.*/
|
|
|
- if (chandle)
|
|
|
|
|
|
|
+ if (early_data_handle)
|
|
|
{
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req_mutex));
|
|
|
|
|
- while (!(chandle->req_ready))
|
|
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(chandle->req_cond), &(chandle->req_mutex));
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req_mutex));
|
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
|
|
|
|
|
+ while (!(early_data_handle->req_ready))
|
|
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
- STARPU_ASSERT(req->data_handle != chandle->handle);
|
|
|
|
|
|
|
+ STARPU_ASSERT(req->data_handle != early_data_handle->handle);
|
|
|
|
|
|
|
|
- req->internal_req = chandle->req;
|
|
|
|
|
|
|
+ req->internal_req = early_data_handle->req;
|
|
|
|
|
|
|
|
- struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
|
|
|
|
+ struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
|
|
|
cb_args->data_handle = req->data_handle;
|
|
cb_args->data_handle = req->data_handle;
|
|
|
- cb_args->copy_handle = chandle->handle;
|
|
|
|
|
- cb_args->buffer = chandle->buffer;
|
|
|
|
|
|
|
+ cb_args->early_handle = early_data_handle->handle;
|
|
|
|
|
+ cb_args->buffer = early_data_handle->buffer;
|
|
|
cb_args->req = req;
|
|
cb_args->req = req;
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
- starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
|
|
|
|
|
|
|
+ starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
|
|
|
}
|
|
}
|
|
|
/* Case : a classic receive request with no send received earlier than expected.
|
|
/* Case : a classic receive request with no send received earlier than expected.
|
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
|
|
_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
|
|
|
- add_app_req(req);
|
|
|
|
|
|
|
+ _starpu_mpi_early_request_add(req);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1252,14 +1065,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
_starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
|
|
|
|
|
|
- {
|
|
|
|
|
- int nb_nodes, k;
|
|
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
|
|
|
|
|
- _starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
|
|
|
|
|
- for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
|
|
|
|
|
- _starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle_hash_list *));
|
|
|
|
|
- for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ _starpu_mpi_early_request_init(worldsize);
|
|
|
|
|
+ _starpu_mpi_early_data_init(worldsize);
|
|
|
|
|
|
|
|
/* notify the main thread that the progression thread is ready */
|
|
/* notify the main thread that the progression thread is ready */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
@@ -1276,7 +1083,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
{
|
|
|
/* shall we block ? */
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(new_requests) && (_starpu_mpi_app_req_hashmap_count == 0);
|
|
|
|
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(new_requests) && _starpu_mpi_early_request_count() == 0;
|
|
|
|
|
|
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
@@ -1316,7 +1123,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
* requests in our side, we resubmit a header request. */
|
|
* requests in our side, we resubmit a header request. */
|
|
|
MPI_Request header_req;
|
|
MPI_Request header_req;
|
|
|
- if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
|
|
|
|
+ if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
@@ -1342,14 +1149,14 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
|
|
_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
|
|
|
|
|
|
|
|
- struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
|
+ struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
|
|
/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
* We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
* We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
|
* on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
* on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
|
if (!found_req)
|
|
if (!found_req)
|
|
|
{
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a copy_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
starpu_data_handle_t data_handle = NULL;
|
|
|
|
|
|
|
@@ -1357,19 +1164,19 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
|
|
- struct _starpu_mpi_copy_handle* chandle = calloc(1, sizeof(struct _starpu_mpi_copy_handle));
|
|
|
|
|
- STARPU_ASSERT(chandle);
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&chandle->req_mutex, NULL);
|
|
|
|
|
- STARPU_PTHREAD_COND_INIT(&chandle->req_cond, NULL);
|
|
|
|
|
- chandle->mpi_tag = recv_env->mpi_tag;
|
|
|
|
|
- chandle->env = recv_env;
|
|
|
|
|
- chandle->source = status.MPI_SOURCE;
|
|
|
|
|
|
|
+ struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
|
|
|
|
|
+ STARPU_ASSERT(early_data_handle);
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
|
|
|
|
|
+ STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
|
|
|
|
|
+ early_data_handle->mpi_tag = recv_env->mpi_tag;
|
|
|
|
|
+ early_data_handle->env = recv_env;
|
|
|
|
|
+ early_data_handle->source = status.MPI_SOURCE;
|
|
|
|
|
|
|
|
if (data_handle)
|
|
if (data_handle)
|
|
|
{
|
|
{
|
|
|
- chandle->buffer = NULL;
|
|
|
|
|
- starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
|
|
- add_chandle(chandle);
|
|
|
|
|
|
|
+ early_data_handle->buffer = NULL;
|
|
|
|
|
+ starpu_data_register_same(&early_data_handle->handle, data_handle);
|
|
|
|
|
+ _starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
@@ -1377,15 +1184,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* we are going to receive the data as a raw memory, and give it
|
|
* we are going to receive the data as a raw memory, and give it
|
|
|
* to the application when it post a receive for this tag
|
|
* to the application when it post a receive for this tag
|
|
|
*/
|
|
*/
|
|
|
- _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)chandle->env->size);
|
|
|
|
|
- chandle->buffer = malloc(chandle->env->size);
|
|
|
|
|
- starpu_vector_data_register(&chandle->handle, STARPU_MAIN_RAM, (uintptr_t) chandle->buffer, chandle->env->size, 1);
|
|
|
|
|
- add_chandle(chandle);
|
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
|
|
|
|
|
+ early_data_handle->buffer = malloc(early_data_handle->env->size);
|
|
|
|
|
+ starpu_vector_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size, 1);
|
|
|
|
|
+ _starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->size);
|
|
|
|
|
|
|
+ early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
|
|
|
+ early_data_handle->mpi_tag, MPI_COMM_WORLD, 1,
|
|
|
|
|
+ NULL, NULL, 1, 1, recv_env->size);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
// We wait until the request is pushed in the
|
|
@@ -1394,15 +1203,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
// on the request and post the corresponding mpi_irecv,
|
|
// on the request and post the corresponding mpi_irecv,
|
|
|
// otherwise, it may lead to read data as envelop
|
|
// otherwise, it may lead to read data as envelop
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req->posted_mutex));
|
|
|
|
|
- while (!(chandle->req->posted))
|
|
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
|
|
|
|
|
-
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&chandle->req_mutex);
|
|
|
|
|
- chandle->req_ready = 1;
|
|
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&chandle->req_cond);
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&chandle->req_mutex);
|
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
|
|
|
|
|
+ while (!(early_data_handle->req->posted))
|
|
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
|
|
|
|
|
+
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
|
|
|
|
|
+ early_data_handle->req_ready = 1;
|
|
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
@@ -1411,7 +1220,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
|
|
_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
|
|
|
|
|
|
|
|
- delete_app_req(found_req);
|
|
|
|
|
|
|
+ _starpu_mpi_early_request_delete(found_req);
|
|
|
|
|
|
|
|
_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
|
|
_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
|
|
|
if (found_req->user_datatype == 0)
|
|
if (found_req->user_datatype == 0)
|
|
@@ -1448,8 +1257,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
|
STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
- STARPU_ASSERT_MSG(_starpu_mpi_app_req_hashmap_count == 0, "Number of receive requests left is not zero");
|
|
|
|
|
- STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap_count == 0, "Number of copy requests left is not zero");
|
|
|
|
|
|
|
+ _starpu_mpi_early_request_check_termination();
|
|
|
|
|
+ _starpu_mpi_early_data_check_termination();
|
|
|
|
|
|
|
|
if (argc_argv->initialize_mpi)
|
|
if (argc_argv->initialize_mpi)
|
|
|
{
|
|
{
|
|
@@ -1459,27 +1268,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
|
|
- {
|
|
|
|
|
- int n;
|
|
|
|
|
- struct _starpu_mpi_copy_handle_hashlist *hashlist;
|
|
|
|
|
-
|
|
|
|
|
- for(n=0 ; n<worldsize; n++)
|
|
|
|
|
- {
|
|
|
|
|
- for(hashlist=_starpu_mpi_copy_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
|
|
|
|
|
- {
|
|
|
|
|
- _starpu_mpi_copy_handle_list_delete(hashlist->list);
|
|
|
|
|
- }
|
|
|
|
|
- struct _starpu_mpi_copy_handle_hashlist *current, *tmp;
|
|
|
|
|
- HASH_ITER(hh, _starpu_mpi_copy_handle_hashmap[n], current, tmp)
|
|
|
|
|
- {
|
|
|
|
|
- HASH_DEL(_starpu_mpi_copy_handle_hashmap[n], current);
|
|
|
|
|
- free(current);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- free(_starpu_mpi_app_req_hashmap);
|
|
|
|
|
- free(_starpu_mpi_copy_handle_hashmap);
|
|
|
|
|
|
|
+ _starpu_mpi_early_data_free(worldsize);
|
|
|
|
|
+ _starpu_mpi_early_request_free();
|
|
|
free(argc_argv);
|
|
free(argc_argv);
|
|
|
free(recv_env);
|
|
free(recv_env);
|
|
|
|
|
|