|
@@ -2,7 +2,7 @@
|
|
*
|
|
*
|
|
* Copyright (C) 2017 Inria
|
|
* Copyright (C) 2017 Inria
|
|
* Copyright (C) 2017 Guillaume Beauchamp
|
|
* Copyright (C) 2017 Guillaume Beauchamp
|
|
- * Copyright (C) 2010-2015,2017 CNRS
|
|
|
|
|
|
+ * Copyright (C) 2010-2015,2017,2018 CNRS
|
|
* Copyright (C) 2009-2014,2017-2018 Université de Bordeaux
|
|
* Copyright (C) 2009-2014,2017-2018 Université de Bordeaux
|
|
*
|
|
*
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
@@ -39,20 +39,15 @@
|
|
#include <nm_sendrecv_interface.h>
|
|
#include <nm_sendrecv_interface.h>
|
|
#include <nm_mpi_nmad.h>
|
|
#include <nm_mpi_nmad.h>
|
|
|
|
|
|
|
|
+
|
|
static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
|
|
static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
|
|
#ifdef STARPU_VERBOSE
|
|
#ifdef STARPU_VERBOSE
|
|
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
|
|
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
|
|
#endif
|
|
#endif
|
|
-static void _starpu_mpi_handle_new_request(void *arg);
|
|
|
|
|
|
|
|
static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
|
|
static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
|
|
static void _starpu_mpi_add_sync_point_in_fxt(void);
|
|
static void _starpu_mpi_add_sync_point_in_fxt(void);
|
|
|
|
|
|
-static int mpi_thread_cpuid = -1;
|
|
|
|
-static int use_prio = 1;
|
|
|
|
-int _starpu_mpi_fake_world_size = -1;
|
|
|
|
-int _starpu_mpi_fake_world_rank = -1;
|
|
|
|
-
|
|
|
|
/* Condition to wake up waiting for all current MPI requests to finish */
|
|
/* Condition to wake up waiting for all current MPI requests to finish */
|
|
static starpu_pthread_t progress_thread;
|
|
static starpu_pthread_t progress_thread;
|
|
static starpu_pthread_cond_t progress_cond;
|
|
static starpu_pthread_cond_t progress_cond;
|
|
@@ -72,74 +67,6 @@ static callback_lfstack_t callback_stack = NULL;
|
|
|
|
|
|
static starpu_sem_t callback_sem;
|
|
static starpu_sem_t callback_sem;
|
|
|
|
|
|
-void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
|
-{
|
|
|
|
- _STARPU_MPI_CALLOC(*req, 1, sizeof(struct _starpu_mpi_req));
|
|
|
|
-
|
|
|
|
- /* Initialize the request structure */
|
|
|
|
- (*req)->data_handle = NULL;
|
|
|
|
- (*req)->prio = 0;
|
|
|
|
- (*req)->completed = 0;
|
|
|
|
-
|
|
|
|
- (*req)->datatype = 0;
|
|
|
|
- (*req)->datatype_name = NULL;
|
|
|
|
- (*req)->ptr = NULL;
|
|
|
|
- (*req)->count = -1;
|
|
|
|
- (*req)->registered_datatype = -1;
|
|
|
|
-
|
|
|
|
- (*req)->node_tag.rank = -1;
|
|
|
|
- (*req)->node_tag.data_tag = -1;
|
|
|
|
- (*req)->node_tag.comm = 0;
|
|
|
|
-
|
|
|
|
- (*req)->func = NULL;
|
|
|
|
-
|
|
|
|
- (*req)->status = NULL;
|
|
|
|
- // (*req)->data_request = 0;
|
|
|
|
- (*req)->flag = NULL;
|
|
|
|
-
|
|
|
|
- (*req)->ret = -1;
|
|
|
|
- piom_cond_init(&((*req)->req_cond), 0);
|
|
|
|
- //STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
|
|
|
|
- //STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
|
|
|
|
- // STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
|
|
|
|
- //STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
|
|
|
|
-
|
|
|
|
- (*req)->request_type = UNKNOWN_REQ;
|
|
|
|
-
|
|
|
|
- (*req)->submitted = 0;
|
|
|
|
- (*req)->completed = 0;
|
|
|
|
- (*req)->posted = 0;
|
|
|
|
-
|
|
|
|
- //(*req)->other_request = NULL;
|
|
|
|
-
|
|
|
|
- (*req)->sync = 0;
|
|
|
|
- (*req)->detached = -1;
|
|
|
|
- (*req)->callback = NULL;
|
|
|
|
- (*req)->callback_arg = NULL;
|
|
|
|
-
|
|
|
|
- // (*req)->size_req = 0;
|
|
|
|
- //(*req)->internal_req = NULL;
|
|
|
|
- //(*req)->is_internal_req = 0;
|
|
|
|
- //(*req)->to_destroy = 1;
|
|
|
|
- //(*req)->early_data_handle = NULL;
|
|
|
|
- //(*req)->envelope = NULL;
|
|
|
|
- (*req)->sequential_consistency = 1;
|
|
|
|
- (*req)->pre_sync_jobid = -1;
|
|
|
|
- (*req)->post_sync_jobid = -1;
|
|
|
|
-
|
|
|
|
-#ifdef STARPU_SIMGRID
|
|
|
|
- starpu_pthread_queue_init(&((*req)->queue));
|
|
|
|
- starpu_pthread_queue_register(&wait, &((*req)->queue));
|
|
|
|
- (*req)->done = 0;
|
|
|
|
-#endif
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
|
|
|
|
-{
|
|
|
|
- piom_cond_destroy(&(req->req_cond));
|
|
|
|
- free(req);
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
/********************************************************/
|
|
/********************************************************/
|
|
/* */
|
|
/* */
|
|
/* Send/Receive functionalities */
|
|
/* Send/Receive functionalities */
|
|
@@ -151,53 +78,9 @@ static void nop_acquire_cb(void *arg)
|
|
starpu_data_release(arg);
|
|
starpu_data_release(arg);
|
|
}
|
|
}
|
|
|
|
|
|
-struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
|
|
|
|
- int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
|
|
|
|
- unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
|
|
|
|
- enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
|
- enum starpu_data_access_mode mode,
|
|
|
|
- int sequential_consistency,
|
|
|
|
- int is_internal_req,
|
|
|
|
- starpu_ssize_t count)
|
|
|
|
|
|
+void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req STARPU_ATTRIBUTE_UNUSED)
|
|
{
|
|
{
|
|
-
|
|
|
|
- struct _starpu_mpi_req *req;
|
|
|
|
-
|
|
|
|
- if (_starpu_mpi_fake_world_size != -1)
|
|
|
|
- {
|
|
|
|
- /* Don't actually do the communication */
|
|
|
|
- starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
|
|
|
|
- return NULL;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
|
STARPU_ATOMIC_ADD( &pending_request, 1);
|
|
STARPU_ATOMIC_ADD( &pending_request, 1);
|
|
-
|
|
|
|
- /* Initialize the request structure */
|
|
|
|
- _starpu_mpi_request_init(&req);
|
|
|
|
- req->request_type = request_type;
|
|
|
|
- /* prio_list is sorted by increasing values */
|
|
|
|
- if (use_prio)
|
|
|
|
- req->prio = prio;
|
|
|
|
- req->data_handle = data_handle;
|
|
|
|
- req->node_tag.rank = srcdst;
|
|
|
|
- req->node_tag.data_tag = data_tag;
|
|
|
|
- req->node_tag.comm = comm;
|
|
|
|
- req->detached = detached;
|
|
|
|
- req->sync = sync;
|
|
|
|
- req->callback = callback;
|
|
|
|
- req->callback_arg = arg;
|
|
|
|
- req->func = func;
|
|
|
|
- req->sequential_consistency = sequential_consistency;
|
|
|
|
- nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);
|
|
|
|
-
|
|
|
|
- /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
|
- * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
|
- * the request is actually submitted */
|
|
|
|
- starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_handle_new_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
|
|
|
|
-
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
|
- return req;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/********************************************************/
|
|
/********************************************************/
|
|
@@ -505,7 +388,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
nm_mpi_nmad_data_release(req->datatype);
|
|
nm_mpi_nmad_data_release(req->datatype);
|
|
_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
|
|
_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
|
|
}
|
|
}
|
|
- starpu_data_release(req->data_handle);
|
|
|
|
|
|
+ _starpu_mpi_release_req_data(req);
|
|
}
|
|
}
|
|
|
|
|
|
/* Execute the specified callback, if any */
|
|
/* Execute the specified callback, if any */
|
|
@@ -560,13 +443,34 @@ static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
|
|
nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
|
|
nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
|
|
}
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_handle_new_request(void *arg)
|
|
|
|
|
|
+void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends)
|
|
|
|
+{
|
|
|
|
+ /* TODO: turn them into redirects & forwards */
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_redirects, int submit_data)
|
|
|
|
+{
|
|
|
|
+ unsigned i, n = coop_sends->n;
|
|
|
|
+
|
|
|
|
+ /* Note: coop_sends might disappear very very soon after last request is submitted */
|
|
|
|
+ for (i = 0; i < n; i++)
|
|
|
|
+ {
|
|
|
|
+ if (coop_sends->reqs_array[i]->request_type == SEND_REQ && submit_data)
|
|
|
|
+ {
|
|
|
|
+ _STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, coop_sends->reqs_array[i]->node_tag.rank);
|
|
|
|
+ _starpu_mpi_submit_ready_request(coop_sends->reqs_array[i]);
|
|
|
|
+ }
|
|
|
|
+ /* TODO: handle redirect requests */
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void _starpu_mpi_submit_ready_request(void *arg)
|
|
{
|
|
{
|
|
_STARPU_MPI_LOG_IN();
|
|
_STARPU_MPI_LOG_IN();
|
|
struct _starpu_mpi_req *req = arg;
|
|
struct _starpu_mpi_req *req = arg;
|
|
STARPU_ASSERT_MSG(req, "Invalid request");
|
|
STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
|
|
|
- /* submit the request to MPI */
|
|
|
|
|
|
+ /* submit the request to MPI directly from submitter */
|
|
_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
|
|
req->func(req);
|
|
req->func(req);
|
|
@@ -581,16 +485,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
starpu_pthread_setname("MPI");
|
|
starpu_pthread_setname("MPI");
|
|
|
|
|
|
#ifndef STARPU_SIMGRID
|
|
#ifndef STARPU_SIMGRID
|
|
- if (mpi_thread_cpuid >= 0)
|
|
|
|
- _starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
|
|
|
|
+ if (_starpu_mpi_thread_cpuid >= 0)
|
|
|
|
+ _starpu_bind_thread_on_cpu(_starpu_mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
_starpu_mpi_do_initialize(argc_argv);
|
|
_starpu_mpi_do_initialize(argc_argv);
|
|
- if (mpi_thread_cpuid >= 0)
|
|
|
|
|
|
+ if (_starpu_mpi_thread_cpuid >= 0)
|
|
/* In case MPI changed the binding */
|
|
/* In case MPI changed the binding */
|
|
- _starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
|
|
|
|
+ _starpu_bind_thread_on_cpu(_starpu_mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- _starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
|
|
|
|
- _starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
|
|
|
|
|
|
+ _starpu_mpi_env_init();
|
|
|
|
|
|
#ifdef STARPU_SIMGRID
|
|
#ifdef STARPU_SIMGRID
|
|
/* Now that MPI is set up, let the rest of simgrid get initialized */
|
|
/* Now that MPI is set up, let the rest of simgrid get initialized */
|
|
@@ -636,7 +539,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
int err=0;
|
|
int err=0;
|
|
|
|
|
|
if(running || pending_request>0)
|
|
if(running || pending_request>0)
|
|
- {/* shall we block ? */
|
|
|
|
|
|
+ {
|
|
|
|
+ /* shall we block ? */
|
|
err = starpu_sem_wait(&callback_sem);
|
|
err = starpu_sem_wait(&callback_sem);
|
|
//running pending_request can change while waiting
|
|
//running pending_request can change while waiting
|
|
}
|
|
}
|
|
@@ -740,8 +644,6 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
|
|
|
|
|
|
starpu_sem_init(&callback_sem, 0, 0);
|
|
starpu_sem_init(&callback_sem, 0, 0);
|
|
running = 0;
|
|
running = 0;
|
|
- mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
|
|
|
|
- use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
|
|
|
|
|
|
|
|
STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
|
|
STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
|
|
|
|
|
|
@@ -753,7 +655,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_progress_shutdown(void *value)
|
|
|
|
|
|
+void _starpu_mpi_progress_shutdown(void **value)
|
|
{
|
|
{
|
|
/* kill the progression thread */
|
|
/* kill the progression thread */
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|