|
@@ -39,7 +39,7 @@ extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_
|
|
|
void _starpu_mpi_treat_ack_receipt_cb(void* _args)
|
|
|
{
|
|
|
struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
|
|
|
- fprintf(stderr, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
|
|
|
+ _STARPU_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
|
|
|
if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
|
|
|
free(arg);
|
|
|
}
|
|
@@ -55,9 +55,8 @@ void _print_ack_sent_cb(void* _args)
|
|
|
void _starpu_mpi_push_cp_ack_send_cb(void* _args)
|
|
|
{
|
|
|
struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
|
|
|
- fprintf(stderr, "Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
|
|
|
+ _STARPU_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
|
|
|
_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _print_ack_sent_cb, _args);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
void _starpu_mpi_store_data_and_push_cp_ack_send_cb(void* _args)
|
|
@@ -70,7 +69,12 @@ void _starpu_mpi_store_data_and_push_cp_ack_send_cb(void* _args)
|
|
|
void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
|
|
|
{
|
|
|
struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
|
|
|
- fprintf(stderr, "Posting ack recv cb from %d\n", arg->rank);
|
|
|
+ if (STARPU_VALUE == arg->type)
|
|
|
+ {
|
|
|
+ free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
|
|
|
+ starpu_data_unregister(arg->handle);
|
|
|
+ }
|
|
|
+ _STARPU_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
|
|
|
_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
|
|
|
}
|
|
|
|
|
@@ -86,6 +90,14 @@ void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
|
|
|
void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
|
|
|
{
|
|
|
struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
|
|
|
+
|
|
|
+ if (STARPU_VALUE == arg->type)
|
|
|
+ {
|
|
|
+ // an handle as specificaly been created, no need to copy the data. Call directly the Callback
|
|
|
+ arg->copy_handle = arg->handle;
|
|
|
+ return _starpu_mpi_store_data_and_push_cp_ack_send_cb(_arg);
|
|
|
+ }
|
|
|
+
|
|
|
starpu_data_register_same(&arg->copy_handle, arg->handle);
|
|
|
starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
|
|
|
}
|
|
@@ -93,18 +105,19 @@ void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
|
|
|
int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
|
|
|
{
|
|
|
// TODO: For now checkpoint are not taken asynchronously. It will be later, and then we will have to acquire READ permissions to StarPU in order to not have the data potentially corrupted.
|
|
|
- starpu_data_handle_t handle;
|
|
|
+ starpu_data_handle_t* handle;
|
|
|
+ struct _starpu_mpi_cp_ack_arg_cb* arg;
|
|
|
+ void* cpy_ptr;
|
|
|
struct _starpu_mpi_checkpoint_template_item* item;
|
|
|
//MPI_Comm comm;
|
|
|
starpu_pthread_mutex_lock(&cp_template->mutex);
|
|
|
- fprintf(stderr, "Mutex taken\n");
|
|
|
set_pending_checkpoint_template(cp_template);
|
|
|
- fprintf(stderr, "Checkpoint now pending\n");
|
|
|
STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
|
|
|
|
|
|
cp_template->pending = 1;
|
|
|
cp_template->cp_template_current_instance++;
|
|
|
- cp_template->remaining_ack_awaited = cp_template->message_number;
|
|
|
+ cp_template->remaining_ack_awaited = cp_template->sent_message_number;
|
|
|
+ starpu_pthread_mutex_unlock(&cp_template->mutex);
|
|
|
|
|
|
item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
|
|
|
|
|
@@ -115,40 +128,63 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
|
|
|
switch (item->type)
|
|
|
{
|
|
|
case STARPU_VALUE:
|
|
|
-// starpu_data_handle_t send_handle;
|
|
|
-// starpu_variable_data_register(&send_handle, STARPU_MAIN_RAM, (uintptr_t)item->ptr, item->count);
|
|
|
-// starpu_mpi_data_register(send_handle, )
|
|
|
-// starpu_mpi_send
|
|
|
+ // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
|
|
|
+ arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
|
|
|
+ handle = &arg->handle;
|
|
|
+ arg->tag = item->tag;
|
|
|
+ arg->type = STARPU_VALUE;
|
|
|
+ arg->count = item->count;
|
|
|
+ arg->msg.checkpoint_id = cp_template->cp_template_id;
|
|
|
+ arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
|
|
|
+ if (item->backupped_by != -1)
|
|
|
+ {
|
|
|
+ cpy_ptr = malloc(item->count);
|
|
|
+ memcpy(cpy_ptr, item->ptr, item->count);
|
|
|
+ starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
|
|
|
+ arg->rank = item->backupped_by;
|
|
|
+ _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
|
|
|
+ starpu_mpi_isend_detached_prio(*handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
|
|
|
+ &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
|
|
|
+ }
|
|
|
+ else if (item->backup_of != -1)
|
|
|
+ {
|
|
|
+ cpy_ptr = malloc(item->count);
|
|
|
+ starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
|
|
|
+ arg->rank = item->backup_of;
|
|
|
+ _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
|
|
|
+ starpu_mpi_irecv_detached(*handle, arg->rank, arg->tag, MPI_COMM_WORLD,
|
|
|
+ &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg);
|
|
|
+ }
|
|
|
break;
|
|
|
case STARPU_R:
|
|
|
- handle = *(starpu_data_handle_t*)item->ptr;
|
|
|
- if (starpu_mpi_data_get_rank(handle)==my_rank)
|
|
|
+ handle = (starpu_data_handle_t*)item->ptr;
|
|
|
+ if (starpu_mpi_data_get_rank(*handle)==my_rank)
|
|
|
{
|
|
|
- fprintf(stderr, "sending to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
|
|
|
- struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
|
|
|
+ _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(*handle));
|
|
|
+ arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
|
|
|
arg->rank = item->backupped_by;
|
|
|
- arg->handle = handle;
|
|
|
- arg->tag = starpu_mpi_data_get_tag(handle);
|
|
|
+ arg->handle = *handle;
|
|
|
+ arg->tag = starpu_mpi_data_get_tag(*handle);
|
|
|
arg->type = STARPU_R;
|
|
|
arg->count = item->count;
|
|
|
arg->msg.checkpoint_id = cp_template->cp_template_id;
|
|
|
arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
|
|
|
- _starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
|
|
|
+ _starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
|
|
|
&_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, 1);
|
|
|
}
|
|
|
- else if (item->backup_of == starpu_mpi_data_get_rank(handle))
|
|
|
+ else if (item->backup_of == starpu_mpi_data_get_rank(*handle))
|
|
|
{
|
|
|
- fprintf(stderr,"recving from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
|
|
|
- struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
|
|
|
+ _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
|
|
|
+ arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
|
|
|
arg->rank = item->backup_of;
|
|
|
- arg->handle = handle;
|
|
|
- arg->tag = starpu_mpi_data_get_tag(handle);
|
|
|
+ arg->handle = *handle;
|
|
|
+ arg->tag = starpu_mpi_data_get_tag(*handle);
|
|
|
arg->type = STARPU_R;
|
|
|
arg->count = item->count;
|
|
|
arg->msg.checkpoint_id = cp_template->cp_template_id;
|
|
|
arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
|
|
|
- _starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,
|
|
|
- &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg, &_starpu_checkpoint_cached_data_recv_copy_and_ack, (void*)arg, 1, 1, 1);
|
|
|
+ _starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
|
|
|
+ &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg, &_starpu_checkpoint_cached_data_recv_copy_and_ack, (void*)arg, 1, 0, 1);
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
@@ -156,8 +192,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
|
|
|
item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
|
|
|
};
|
|
|
|
|
|
- starpu_pthread_mutex_unlock(&cp_template->mutex);
|
|
|
-
|
|
|
return 0;
|
|
|
}
|
|
|
|