Romain LION 5 роки тому
батько
коміт
ed76a1d82e

+ 4 - 2
mpi/src/Makefile.am

@@ -87,7 +87,8 @@ if STARPU_USE_MPI_FT
 noinst_HEADERS +=       \
 	mpi_failure_tolerance/starpu_mpi_ft.h   \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.h    \
-	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h
+	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h \
+	mpi_failure_tolerance/starpu_mpi_ft_service_comms.h
 #	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h
 endif STARPU_USE_MPI_FT
 
@@ -126,6 +127,7 @@ if STARPU_USE_MPI_FT
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES +=    \
 	mpi_failure_tolerance/starpu_mpi_ft.c   \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.c    \
-	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c
+	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c   \
+	mpi_failure_tolerance/starpu_mpi_ft_service_comms.c
  #	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c
 endif STARPU_USE_MPI_FT

+ 11 - 248
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -21,30 +21,28 @@
 
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <sys/param.h>
 #include <starpu_mpi_private.h>
 #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
 #include <mpi/starpu_mpi_mpi.h>
 #include "starpu_mpi_cache.h"
+#include "starpu_mpi_ft_service_comms.h"
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
 starpu_pthread_mutex_t           cp_lib_mutex;
 int                              my_rank;
 
-static struct _starpu_mpi_req_list detached_ft_service_requests;
-static unsigned detached_send_n_ft_service_requests;
-static starpu_pthread_mutex_t detached_ft_service_requests_mutex;
-
 void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg);
 void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg);
 void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg);
-void _starpu_checkpoint_data_send_copy_and_ack(void* _arg);
+void _starpu_checkpoint_data_send_copy_and_ack(void* _args);
 
 void _starpu_mpi_push_cp_ack_recv_cb(void* _args);
 void _starpu_mpi_push_cp_ack_send_cb(void* _args);
-void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args);
+void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args);
 
 //extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
 //extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
@@ -126,15 +124,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 //	starpu_pthread_mutex_unlock(&cp_template->mutex);
 //}
 
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
-{
-	va_list varg_list;
-	va_start(varg_list, cp_id);
-	int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, varg_list);
-	va_end(varg_list);
-	return ret;
-}
-
 void _print_ack_sent_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
@@ -165,16 +154,16 @@ void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
 	starpu_data_release(arg->handle);
 }
 
-void _starpu_checkpoint_data_send_copy_and_ack(void* _arg)
+void _starpu_checkpoint_data_send_copy_and_ack(void* _args)
 {
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg);
+	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args);
 }
 
-void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args)
+void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args)
 {
-	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)args;
+	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args;
 	cp_template->remaining_ack_awaited--;
 }
 
@@ -188,243 +177,17 @@ void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 
 void _starpu_mpi_push_cp_ack_send_cb(void* _args)
 {
-	struct _starpu_mpi_req* req;
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 
 	fprintf(stderr, "Send cb\n");
 
-	//starpu_data_acquire(arg->copy_handle, STARPU_R); //Kept in R mode until use or when checkpoint becomes out-of-date
-
-	/* Initialize the request structure */
-	_starpu_mpi_request_init(&req);
-	req->request_type = SEND_REQ;
-	/* prio_list is sorted by increasing values */
-	//TODO: Check compatibility with prio
-	if (_starpu_mpi_use_prio)
-		req->prio = 0;
-	req->data_handle = NULL;
-	req->node_tag.node.rank = arg->rank;
-	req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
-	req->node_tag.node.comm = MPI_COMM_WORLD;
-	req->detached = 1;
-	req->ptr = (void*)&arg->msg;
-	req->sync = 0;
-	req->datatype = MPI_BYTE;
-	req->callback = _print_ack_sent_cb;
-	req->callback_arg = arg;
-	req->func = NULL;
-	req->sequential_consistency = 1;
-	req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
-
-	_mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
+	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _print_ack_sent_cb, _args);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
-
-	MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
-	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
-	fprintf(stderr, "pushed send: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
-	detached_send_n_ft_service_requests++;
-	req->submitted = 1;
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-
-	_starpu_mpi_wake_up_progress_thread();
 }
 
 void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 {
-	struct _starpu_mpi_req* req;
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	/* Initialize the request structure */
-	_starpu_mpi_request_init(&req);
-	req->request_type = RECV_REQ;
-	/* prio_list is sorted by increasing values */
-	if (_starpu_mpi_use_prio)
-		req->prio = 0;
-	req->data_handle = NULL;
-	req->node_tag.node.rank = arg->rank;
-	req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
-	req->node_tag.node.comm = MPI_COMM_WORLD;
-	req->detached = 1;
-	req->ptr = malloc(sizeof(struct _starpu_mpi_cp_ack_msg));
-	req->sync = 0;
-	req->datatype = MPI_BYTE;
-	req->callback = _starpu_mpi_treat_ack_receipt_cb;
-	req->callback_arg = req->ptr;
-	req->func = NULL;
-	req->sequential_consistency = 1;
-	req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
-
-	_mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
-
-	MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
-	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
-	fprintf(stderr, "pushed recv: %p in list %p - prev: %p - next: %p - src:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
-	req->submitted = 1;
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-
-	_starpu_mpi_wake_up_progress_thread();
-}
-
-static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
-
-	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
-	                  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
-	                  req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
-
-	if (req->backend->internal_req)
-	{
-		free(req->backend->early_data_handle);
-		req->backend->early_data_handle = NULL;
-	}
-	else
-	{
-		if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
-		{
-			if (req->registered_datatype == 0)
-			{
-				if (req->request_type == SEND_REQ)
-				{
-					// We need to make sure the communication for sending the size
-					// has completed, as MPI can re-order messages, let's call
-					// MPI_Wait to make sure data have been sent
-					int ret;
-					ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
-					STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
-					starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
-					req->ptr = NULL;
-				}
-				else if (req->request_type == RECV_REQ)
-				{
-					// req->ptr is freed by starpu_data_unpack
-					starpu_data_unpack(req->data_handle, req->ptr, req->count);
-					starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
-				}
-			}
-			else
-			{
-				//_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
-			}
-		}
-		_STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.node.rank, req->node_tag.data_tag);
-	}
-
-	_starpu_mpi_release_req_data(req);
 
-	if (req->backend->envelope)
-	{
-		free(req->backend->envelope);
-		req->backend->envelope = NULL;
-	}
-
-	/* Execute the specified callback, if any */
-	if (req->callback)
-		req->callback(req->callback_arg);
-
-	/* tell anyone potentially waiting on the request that it is
-	 * terminated now */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
-	req->completed = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
-	_STARPU_MPI_LOG_OUT();
+	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
 }
-
-void starpu_mpi_test_ft_detached_requests(void)
-{
-	//_STARPU_MPI_LOG_IN();
-	int flag;
-	struct _starpu_mpi_req *req;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
-
-	if (_starpu_mpi_req_list_empty(&detached_ft_service_requests))
-	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-		//_STARPU_MPI_LOG_OUT();
-		return;
-	}
-
-	_STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
-	req = _starpu_mpi_req_list_begin(&detached_ft_service_requests);
-	while (req != _starpu_mpi_req_list_end(&detached_ft_service_requests))
-	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-
-		_STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
-		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.node.rank);
-#ifdef STARPU_SIMGRID
-		req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
-#else
-		STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
-		req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
-#endif
-
-		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
-		_STARPU_MPI_TRACE_TEST_END(req->node_tag.node.rank, req->node_tag.data_tag);
-
-		if (!flag)
-		{
-			req = _starpu_mpi_req_list_next(req);
-		}
-		else
-		{
-			fprintf(stderr, "req success: %d\n", detached_send_n_ft_service_requests);
-			_STARPU_MPI_TRACE_POLLING_END();
-			struct _starpu_mpi_req *next_req;
-			next_req = _starpu_mpi_req_list_next(req);
-
-			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
-
-			STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
-			if (req->request_type == SEND_REQ)
-				detached_send_n_ft_service_requests--;
-			_starpu_mpi_req_list_erase(&detached_ft_service_requests, req);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-			_starpu_mpi_handle_request_termination(req);
-
-			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
-
-			STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
-			/* We don't want to free internal non-detached
-			   requests, we need to get their MPI request before
-			   destroying them */
-			if (req->backend->is_internal_req && !req->backend->to_destroy)
-			{
-				/* We have completed the request, let the application request destroy it */
-				req->backend->to_destroy = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
-			}
-			else
-			{
-				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
-				_starpu_mpi_request_destroy(req);
-			}
-
-			req = next_req;
-			_STARPU_MPI_TRACE_POLLING_BEGIN();
-		}
-
-		STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
-	}
-	_STARPU_MPI_TRACE_TESTING_DETACHED_END();
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
-	//_STARPU_MPI_LOG_OUT();
-}
-
-int starpu_mpi_checkpoint_lib_init() {
-	_starpu_mpi_req_list_init(&detached_ft_service_requests);
-	starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
-
-	return 0;
-}
-
-int starpu_mpi_checkpoint_lib_busy() {
-	return !_starpu_mpi_req_list_empty(&detached_ft_service_requests);
-}

+ 0 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.h

@@ -27,8 +27,6 @@ extern "C"
 #endif
 
 void starpu_mpi_test_ft_detached_requests(void);
-int starpu_mpi_checkpoint_lib_init();
-int starpu_mpi_checkpoint_lib_busy();
 
 struct _starpu_mpi_cp_ack_msg
 {

+ 10 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -20,6 +20,7 @@
 
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <sys/param.h>
 #include <starpu_mpi_private.h>
@@ -116,6 +117,15 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 	return 0;
 }
 
+/* Variadic front end: packs the arguments following cp_id into a va_list,
+ * hands them to _starpu_mpi_checkpoint_template_register() and returns its result. */
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
+{
+	va_list ap;
+	int status;
+
+	va_start(ap, cp_id);
+	status = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, ap);
+	va_end(ap);
+
+	return status;
+}
+
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
 	starpu_pthread_mutex_lock(&cp_template_mutex);
 	for (int i=0 ; i<cp_template_number ; i++)

+ 4 - 4
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -17,6 +17,7 @@
 #include <starpu_mpi_private.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 
 
 starpu_pthread_mutex_t           ft_mutex;
@@ -25,7 +26,7 @@ int                              my_rank;
 int starpu_mpi_ft_turn_on(void)
 {
 	starpu_pthread_mutex_init(&ft_mutex, NULL);
-	starpu_mpi_checkpoint_lib_init();
+	starpu_mpi_ft_service_lib_init();
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	checkpoint_template_lib_init();
 	return 0;
@@ -41,11 +42,10 @@ int starpu_mpi_ft_turn_off(void)
 
 void starpu_mpi_ft_progress(void)
 {
-	starpu_mpi_test_ft_detached_requests();
+	starpu_mpi_test_ft_detached_service_requests();
 }
 
-
 int starpu_mpi_ft_busy()
 {
-	starpu_mpi_checkpoint_lib_busy();
+	return starpu_mpi_ft_service_lib_busy();
 }

+ 259 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.c

@@ -0,0 +1,259 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+
+#include <stdarg.h>
+#include <stdlib.h>
+#include <common/utils.h>
+
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
+#include <sys/param.h>
+#include <starpu_mpi_private.h>
+#include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
+#include <mpi/starpu_mpi_mpi.h>
+#include "starpu_mpi_cache.h"
+
+
+static struct _starpu_mpi_req_list detached_ft_service_requests;
+static unsigned detached_send_n_ft_service_requests;
+static starpu_pthread_mutex_t detached_ft_service_requests_mutex;
+
+/* Post a detached, non-blocking FT service message and queue it for polling.
+ *
+ * msg:      buffer to send from (SEND_REQ) or receive into (RECV_REQ). Only the
+ *           pointer is stored, so it must stay valid until the request completes.
+ * count:    size of the message in bytes (the transfer uses MPI_BYTE).
+ * rank:     peer rank in comm.
+ * tag:      MPI tag; only _STARPU_MPI_TAG_CP_ACK is accepted.
+ * req_type: SEND_REQ posts an MPI_Isend, RECV_REQ posts an MPI_Irecv.
+ * comm:     communicator used for the transfer.
+ * callback: invoked with arg when the request is completed by
+ *           starpu_mpi_test_ft_detached_service_requests(); may be NULL.
+ *
+ * Returns 0; invalid arguments and MPI errors are caught by assertions.
+ */
+int _ft_service_msg_recv_send_common(void* msg, int count, int rank, int tag, int req_type, MPI_Comm comm, void (*callback)(void *), void* arg)
+{
+	struct _starpu_mpi_req* req;
+	int ret = MPI_SUCCESS;
+
+	/* Check if the tag is a service message */
+	STARPU_ASSERT_MSG(tag==_STARPU_MPI_TAG_CP_ACK, "Only _STARPU_MPI_TAG_CP_ACK are service msgs.");
+	/* Any other type would be queued without an MPI operation behind it */
+	STARPU_ASSERT_MSG(req_type==SEND_REQ || req_type==RECV_REQ, "Service msgs must be SEND_REQ or RECV_REQ.");
+
+	/* Initialize the request structure */
+	_starpu_mpi_request_init(&req);
+	req->request_type = req_type;
+	//TODO: Check compatibility with prio (req->prio is not set here)
+	req->data_handle = NULL;
+	req->node_tag.node.rank = rank;
+	req->node_tag.data_tag = tag;
+	req->node_tag.node.comm = comm;
+	req->detached = 1;
+	req->ptr = msg;
+	req->sync = 0;
+	req->datatype = MPI_BYTE;
+	req->callback = callback;
+	req->callback_arg = arg;
+	req->func = NULL;
+	req->sequential_consistency = 1;
+	/* Honour the size given by the caller instead of assuming an ack message */
+	req->count = count;
+
+	_mpi_backend._starpu_mpi_backend_request_fill(req, comm, 0);
+
+	/* The MPI call and the list insertion are done under the same lock */
+	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+	if (req_type==SEND_REQ)
+	{
+		ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag,
+		                req->node_tag.node.comm, &req->backend->data_request);
+		detached_send_n_ft_service_requests++;
+	}
+	else if (req_type==RECV_REQ)
+	{
+		ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag,
+		                req->node_tag.node.comm, &req->backend->data_request);
+	}
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Isend/MPI_Irecv returning %s", _starpu_mpi_get_mpi_error_code(ret));
+	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
+	fprintf(stderr, "pushed service req: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
+	req->submitted = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+
+	_starpu_mpi_wake_up_progress_thread();
+
+	/* The function is declared int and its value is returned by the wrappers */
+	return 0;
+}
+
+/* Post a detached non-blocking send of a service message.
+ *
+ * Thin wrapper around _ft_service_msg_recv_send_common() with SEND_REQ; all
+ * arguments are forwarded unchanged and its return value is returned.
+ * Deliberately not declared inline: the function is called from other
+ * translation units, and a plain C99 inline definition would not provide
+ * the external symbol they link against.
+ */
+int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
+{
+	return _ft_service_msg_recv_send_common(msg, count, rank, tag, SEND_REQ, comm, callback, arg);
+}
+
+/* Post a detached non-blocking receive of a service message.
+ *
+ * Thin wrapper around _ft_service_msg_recv_send_common(); all arguments are
+ * forwarded unchanged and its return value is returned. The request type
+ * must be RECV_REQ so that an MPI_Irecv (not an MPI_Isend) is posted on msg.
+ */
+int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
+{
+	return _ft_service_msg_recv_send_common(msg, count, rank, tag, RECV_REQ, comm, callback, arg);
+}
+/* Finish a service request whose MPI operation has completed.
+ *
+ * If req->backend->internal_req is set, only early_data_handle is freed.
+ * Otherwise, for SEND_REQ or RECV_REQ requests whose registered_datatype is 0,
+ * a send waits on size_req and frees req->ptr, while a receive unpacks
+ * req->ptr into req->data_handle. In every case the request data is then
+ * released, the envelope (if any) is freed, the callback (if any) is run with
+ * callback_arg, and the request is flagged completed under req_mutex with a
+ * broadcast on req_cond.
+ *
+ * NOTE(review): requests queued by _ft_service_msg_recv_send_common() have a
+ * NULL data_handle and a caller-owned ptr, so the registered_datatype == 0
+ * branch looks unsuitable for them -- presumably _starpu_mpi_request_init()
+ * leaves registered_datatype non-zero; confirm.
+ */
+static void _starpu_mpi_handle_ft_request_termination(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
+			req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
+			req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
+
+	if (req->backend->internal_req)
+	{
+		free(req->backend->early_data_handle);
+		req->backend->early_data_handle = NULL;
+	}
+	else
+	{
+		if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
+		{
+			if (req->registered_datatype == 0)
+			{
+				if (req->request_type == SEND_REQ)
+				{
+					// We need to make sure the communication for sending the size
+					// has completed, as MPI can re-order messages, let's call
+					// MPI_Wait to make sure data have been sent
+					int ret;
+					ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
+					STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
+					starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
+					req->ptr = NULL;
+				}
+				else if (req->request_type == RECV_REQ)
+				{
+					// req->ptr is freed by starpu_data_unpack
+					starpu_data_unpack(req->data_handle, req->ptr, req->count);
+					starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
+				}
+			}
+			else
+			{
+				//_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
+			}
+		}
+		_STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.node.rank, req->node_tag.data_tag);
+	}
+
+	_starpu_mpi_release_req_data(req);
+
+	if (req->backend->envelope)
+	{
+		free(req->backend->envelope);
+		req->backend->envelope = NULL;
+	}
+
+	/* Execute the specified callback, if any */
+	if (req->callback)
+		req->callback(req->callback_arg);
+
+	/* tell anyone potentially waiting on the request that it is
+	 * terminated now */
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
+	req->completed = 1;
+	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+	_STARPU_MPI_LOG_OUT();
+}
+/* Poll every queued FT service request once.
+ *
+ * Each request in detached_ft_service_requests is tested with MPI_Test (or
+ * _starpu_mpi_simgrid_mpi_test under STARPU_SIMGRID). A completed request is
+ * erased from the list, handed to _starpu_mpi_handle_ft_request_termination()
+ * and then destroyed, unless it is an internal request not yet marked
+ * to_destroy, in which case it is only marked. Requests that have not
+ * completed stay in the list.
+ *
+ * The list mutex is dropped around the test and the completion handling and
+ * re-taken before the loop condition is evaluated again.
+ * NOTE(review): _starpu_mpi_req_list_next() is therefore called without the
+ * list mutex held -- confirm no other thread can modify the list meanwhile.
+ */
+void starpu_mpi_test_ft_detached_service_requests(void)
+{
+	//_STARPU_MPI_LOG_IN();
+	int flag;
+	struct _starpu_mpi_req *req;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+
+	if (_starpu_mpi_req_list_empty(&detached_ft_service_requests))
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+		//_STARPU_MPI_LOG_OUT();
+		return;
+	}
+
+	_STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
+	req = _starpu_mpi_req_list_begin(&detached_ft_service_requests);
+	while (req != _starpu_mpi_req_list_end(&detached_ft_service_requests))
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+
+		_STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.node.rank);
+#ifdef STARPU_SIMGRID
+		req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
+#else
+		STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
+		req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
+#endif
+
+		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
+		_STARPU_MPI_TRACE_TEST_END(req->node_tag.node.rank, req->node_tag.data_tag);
+
+		if (!flag)
+		{
+			req = _starpu_mpi_req_list_next(req);
+		}
+		else
+		{
+			fprintf(stderr, "req success: %d\n", detached_send_n_ft_service_requests);
+			_STARPU_MPI_TRACE_POLLING_END();
+			struct _starpu_mpi_req *next_req;
+			next_req = _starpu_mpi_req_list_next(req);
+
+			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
+
+			STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+			if (req->request_type == SEND_REQ)
+				detached_send_n_ft_service_requests--;
+			_starpu_mpi_req_list_erase(&detached_ft_service_requests, req);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+			_starpu_mpi_handle_ft_request_termination(req);
+
+			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
+
+			STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
+			/* We don't want to free internal non-detached
+			   requests, we need to get their MPI request before
+			   destroying them */
+			if (req->backend->is_internal_req && !req->backend->to_destroy)
+			{
+				/* We have completed the request, let the application request destroy it */
+				req->backend->to_destroy = 1;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+			}
+			else
+			{
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+				_starpu_mpi_request_destroy(req);
+			}
+
+			req = next_req;
+			_STARPU_MPI_TRACE_POLLING_BEGIN();
+		}
+
+		STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+	}
+	_STARPU_MPI_TRACE_TESTING_DETACHED_END();
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+	//_STARPU_MPI_LOG_OUT();
+}
+
+/* Set up the FT service layer state: initialises the mutex protecting the
+ * detached service request list and the list itself. Always returns 0. */
+int starpu_mpi_ft_service_lib_init()
+{
+	starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
+	_starpu_mpi_req_list_init(&detached_ft_service_requests);
+	return 0;
+}
+
+/* Tell whether service requests are still queued: returns 1 while
+ * detached_ft_service_requests is not empty, 0 otherwise. */
+int starpu_mpi_ft_service_lib_busy()
+{
+	if (_starpu_mpi_req_list_empty(&detached_ft_service_requests))
+		return 0;
+	return 1;
+}

+ 36 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.h

@@ -0,0 +1,36 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef FT_STARPU_STARPU_MPI_FT_SERVICE_COMMS_H
+#define FT_STARPU_STARPU_MPI_FT_SERVICE_COMMS_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg);
+int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg);
+
+void starpu_mpi_test_ft_detached_service_requests(void);
+int starpu_mpi_ft_service_lib_init();
+int starpu_mpi_ft_service_lib_busy();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //FT_STARPU_STARPU_MPI_FT_SERVICE_COMMS_H