
Implements the CP (checkpoint) message acknowledgment mechanism

Romain LION 5 years ago
parent
current commit
632844ae83
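
This commit adds an acknowledgment (ACK) protocol for checkpoint (CP) data transfers. When a node has sent a piece of checkpoint data to its backup, the send completion callback (_starpu_mpi_post_cp_ack_recv_cb) posts a detached receive for an ACK message; on the backup side, the receive completion callback (_starpu_mpi_post_cp_ack_send_cb) sends that ACK back. ACK messages carry the checkpoint id and instance; _starpu_mpi_treat_ack_receipt_cb matches them against the registered templates and clears the pending flag once remaining_ack_awaited reaches zero. Transfers are cache-aware: data already sent to the backup is not re-sent and the corresponding ACK counter is decremented on the spot (_starpu_mpi_treat_cache_ack_no_lock_cb). The MPI progress thread drives these service requests through starpu_mpi_ft_progress() and refrains from blocking while starpu_mpi_ft_busy() reports outstanding ones.

The sketch below is illustrative only and not part of the commit; it shows how the new path is expected to be driven from an application built with STARPU_USE_MPI_FT. The exact argument list accepted by starpu_mpi_checkpoint_template_register() (including the trailing 0 terminator) and the public include path of the checkpoint header are assumptions; see mpi/tests/checkpoints.c for the actual test code.

#include <starpu_mpi.h>
#include <starpu_mpi_checkpoint.h>   /* assumed include path of the checkpoint API */

static void checkpoint_example(starpu_data_handle_t handle)
{
	starpu_mpi_checkpoint_template_t cp_template;
	int rank, size;

	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);

	/* Initializes the rank and the FT service request list used for the ACKs. */
	starpu_mpi_checkpoint_turn_on();

	/* One STARPU_R item backed up on the next rank; the argument layout and
	 * the trailing 0 terminator are assumptions. */
	starpu_mpi_checkpoint_template_register(&cp_template, 1 /* cp_id */,
	                                        STARPU_R, &handle, (rank + 1) % size,
	                                        0);
	starpu_mpi_checkpoint_template_submit(cp_template);

	/* The progress thread now exchanges the CP data and the ACK messages;
	 * the template becomes non-pending once every awaited ACK has arrived,
	 * after which a new instance may be submitted. */

	starpu_mpi_checkpoint_turn_off();
}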

+ 9 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -42,6 +42,9 @@
 #include <core/task.h>
 #include <core/topology.h>
 #include <core/workers.h>
+#ifdef STARPU_USE_MPI_FT
+#include <starpu_mpi_checkpoint.h>
+#endif // STARPU_USE_MPI_FT
 
 #ifdef STARPU_USE_MPI_MPI
 
@@ -1268,7 +1271,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif
 		/* shall we block ? */
 		unsigned block = _starpu_mpi_req_list_empty(&ready_recv_requests) && _starpu_mpi_req_prio_list_empty(&ready_send_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
-
+#ifdef STARPU_USE_MPI_FT
+		block = block && !starpu_mpi_ft_busy();
+#endif // STARPU_USE_MPI_FT
 		if (block)
 		{
 			_STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
@@ -1478,6 +1483,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				//_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
 			}
 		}
+#ifdef STARPU_USE_MPI_FT
+		starpu_mpi_ft_progress();
+#endif // STARPU_USE_MPI_FT
 #ifdef STARPU_SIMGRID
 		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		starpu_pthread_wait_wait(&_starpu_mpi_thread_wait);

+ 6 - 0
mpi/src/mpi/starpu_mpi_mpi_backend.h

@@ -32,6 +32,12 @@ extern int _starpu_mpi_tag;
 #define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
 #define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
 
+#ifdef STARPU_USE_MPI_FT
+#define _STARPU_MPI_TAG_CP_ACK    _starpu_mpi_tag+3
+#define _STARPU_MPI_TAG_CP_RCVRY  _starpu_mpi_tag+4
+#define _STARPU_MPI_TAG_EXT_DATA  _starpu_mpi_tag+5
+#endif // STARPU_USE_MPI_FT
+
 enum _starpu_envelope_mode
 {
 	_STARPU_MPI_ENVELOPE_DATA=0,

+ 350 - 19
mpi/src/starpu_mpi_checkpoint.c

@@ -22,6 +22,8 @@
 #include <starpu_mpi_checkpoint.h>
 #include <sys/param.h>
 #include <starpu_mpi_private.h>
+#include <mpi/starpu_mpi_mpi_backend.h> // TODO: should be selected at preprocessing time depending on the backend (Nmad vs MPI)
+#include "starpu_mpi_cache.h"
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
@@ -30,6 +32,14 @@ starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
 int                              my_rank;
 int cp_template_number = 0;
 
+static struct _starpu_mpi_req_list detached_ft_service_requests;
+static unsigned detached_send_n_ft_service_requests;
+static starpu_pthread_mutex_t detached_ft_service_requests_mutex;
+
+void _starpu_mpi_post_cp_ack_recv_cb(void* _args);
+void _starpu_mpi_post_cp_ack_send_cb(void* _args);
+void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args);
+
 extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
 extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
 
@@ -37,7 +47,7 @@ extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 static int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
 {
 	int arg_type;
-	void* useless;
+	//void* useless;
 	void* ptr;
 	int count;
 	int backup_rank;
@@ -99,9 +109,42 @@ static int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_templa
 	return 0;
 }
 
-void print_received_value(void* handle)
+struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, void (*alt_callback)(void *), void *alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
-	fprintf(stderr, "Node %d - I received backup value:%d\n", my_rank, *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)handle, STARPU_MAIN_RAM));
+	struct _starpu_mpi_req* req = NULL;
+	int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+	if (already_received == 0)
+	{
+		if (data_tag == -1)
+			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
+		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, source);
+		req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, (void*)arg, sequential_consistency, is_internal_req, count);
+	}
+	else
+	{
+		fprintf(stderr, "STARPU CACHE: Data already received\n");
+		if (alt_callback)
+			alt_callback(alt_arg);
+	}
+	return req;
+}
+
+struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, void (*alt_callback)(void *), void *alt_arg, int sequential_consistency)
+{
+	struct _starpu_mpi_req* req = NULL;
+	int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, dest);
+	if (already_sent == 0)
+	{
+		if (data_tag == -1)
+			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
+		_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, dest);
+		req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, (void*)arg, sequential_consistency);
+	}
+	else
+	{
+		fprintf(stderr, "STARPU CACHE: Data already sent\n");
+		alt_callback(alt_arg);
+	}
+	return req;
 }
 
 int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
@@ -113,7 +156,8 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
 	starpu_pthread_mutex_lock(&cp_template->mutex);
 	STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
 
-	cp_template->pending = 1;
+	cp_template->pending               = 1;
+	cp_template->remaining_ack_awaited = cp_template->message_number;
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
 	fprintf(stderr, "begin iter\n");
@@ -133,12 +177,20 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
 				if (starpu_mpi_data_get_rank(*handle)==my_rank)
 				{
 					fprintf(stderr,"sending to %d (tag %d)\n", item->backup_rank, (int)starpu_mpi_data_get_tag(*handle));
-					_starpu_mpi_isend_common(*handle, item->backup_rank, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0, NULL, NULL, 1);
+					struct _starpu_mpi_cp_ack_arg_cb* arg;
+					_STARPU_MPI_CALLOC(arg, 1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
+					arg->rank = item->backup_rank;
+					arg->msg.checkpoint_id = cp_template->cp_template_id;
+					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
+					_starpu_mpi_isend_cache_aware(*handle, item->backup_rank, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0, &_starpu_mpi_post_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_treat_cache_ack_no_lock_cb, (void*)cp_template, 1);
 				}
 				else if (item->backup_rank==my_rank)
 				{
 					fprintf(stderr,"recving from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
-					_starpu_mpi_irecv_common(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, &print_received_value, (void*)handle, 1, 1, 1);
+					struct _starpu_mpi_cp_ack_arg_cb* arg;
+					_STARPU_MPI_CALLOC(arg, 1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
+					arg->rank = starpu_mpi_data_get_rank(*handle);
+					arg->msg.checkpoint_id = cp_template->cp_template_id;
+					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
+					_starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, &_starpu_mpi_post_cp_ack_send_cb, (void*)arg, NULL, NULL, 1, 1, 1);
 				}
 				break;
 		}
@@ -151,18 +203,19 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
 	return 0;
 }
 
-/**
- * receives param of type starpu_mpi_checkpoint_template_t
- * @param args
- * @return
- */
-void* _starpu_mpi_checkpoint_ack_send_cb(void* args)
-{
-	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
-	starpu_pthread_mutex_lock(&cp_template->mutex);
-	cp_template->current_send_number--;
-	starpu_pthread_mutex_unlock(&cp_template->mutex);
-}
+//
+///**
+// * receives param of type starpu_mpi_checkpoint_template_t
+// * @param args
+// * @return
+// */
+//void _starpu_mpi_checkpoint_ack_send_cb(void* args)
+//{
+//	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
+//	starpu_pthread_mutex_lock(&cp_template->mutex);
+//	cp_template->remaining_ack_awaited--;
+//	starpu_pthread_mutex_unlock(&cp_template->mutex);
+//}
 
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
@@ -183,7 +236,6 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 		{
 			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
 			fprintf(stderr, "STARPU_R - Value=%d\n", val);
-
 		}
 		else if (item->type == STARPU_DATA_ARRAY)
 		{
@@ -209,6 +261,8 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 int starpu_mpi_checkpoint_turn_on(void)
 {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
+	_starpu_mpi_req_list_init(&detached_ft_service_requests);
+	starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	return 0;
 }
@@ -225,6 +279,8 @@ int starpu_mpi_checkpoint_turn_off(void)
 		cp_template_array[i] = NULL;
 	}
 	starpu_pthread_mutex_destroy(&cp_template_mutex);
+
+	return 0;
 }
 
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
@@ -240,3 +296,278 @@ int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_te
 {
 	return _starpu_mpi_checkpoint_template_submit(cp_template);
 }
+
+void _print_ack_sent_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	fprintf(stderr, "ACK send succeeded: cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
+	free(_args);
+}
+
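+/* Alternate callback used on the send side when the cache reports that the
+ * data has already been sent to the backup: no ACK will come back for it, so
+ * the expected-ACK counter is decremented directly. The caller already holds
+ * the checkpoint template mutex, hence "no_lock". */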
+void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args)
+{
+	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)args;
+	cp_template->remaining_ack_awaited--;
+}
+
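+/* Called when a CP ACK message has been received: match it against the
+ * registered checkpoint templates by id and instance, decrement the number of
+ * ACKs still awaited, and mark the instance as no longer pending once all of
+ * them have arrived. */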
+void _starpu_mpi_treat_ack_receipt_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_msg* msg = (struct _starpu_mpi_cp_ack_msg*) _args;
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	for (int i=0 ; i<cp_template_number ; i++)
+	{
+		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
+		if (cp_template_array[i]->cp_template_id == msg->checkpoint_id && cp_template_array[i]->cp_template_current_instance == msg->checkpoint_instance)
+		{
+			cp_template_array[i]->remaining_ack_awaited--;
+			if (cp_template_array[i]->remaining_ack_awaited == 0)
+			{
+				// TODO: share info about cp integrity
+				fprintf(stderr, "All CP material for cpid:%d, cpinst:%d has been sent and acknowledged.\n", msg->checkpoint_id, msg->checkpoint_instance);
+				cp_template_array[i]->pending=0;
+			}
+			free(msg);
+			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+			starpu_pthread_mutex_unlock(&cp_template_mutex);
+			return;
+		}
+		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+	}
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+}
+
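+/* Completion callback of the checkpoint-data receive on the backup node:
+ * post a detached send of the ACK message (checkpoint id and instance) back
+ * to the data owner. The request bypasses the regular data path and is
+ * tracked in the FT service request list. */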
+void _starpu_mpi_post_cp_ack_send_cb(void* _args)
+{
+	struct _starpu_mpi_req* req;
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+
+	fprintf(stderr, "Send cb\n");
+
+	/* Initialize the request structure */
+	_starpu_mpi_request_init(&req);
+	req->request_type = SEND_REQ;
+	/* prio_list is sorted by increasing values */
+	if (_starpu_mpi_use_prio)
+		req->prio = 0;
+	req->data_handle = NULL;
+	req->node_tag.node.rank = arg->rank;
+	req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
+	req->node_tag.node.comm = MPI_COMM_WORLD;
+	req->detached = 1;
+	req->ptr = (void*)&arg->msg;
+	req->sync = 0;
+	req->datatype = MPI_BYTE;
+	req->callback = _print_ack_sent_cb;
+	req->callback_arg = arg;
+	req->func = NULL;
+	req->sequential_consistency = 1;
+	req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
+
+	_mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+
+	MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
+	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
+	fprintf(stderr, "pushed send: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
+	detached_send_n_ft_service_requests++;
+	req->submitted = 1;
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+}
+
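+/* Completion callback of the checkpoint-data send on the owner node: post a
+ * detached receive for the ACK message the backup will send back once the
+ * data has been stored. */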
+void _starpu_mpi_post_cp_ack_recv_cb(void* _args)
+{
+	struct _starpu_mpi_req* req;
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	/* Initialize the request structure */
+	_starpu_mpi_request_init(&req);
+	req->request_type = RECV_REQ;
+	/* prio_list is sorted by increasing values */
+	if (_starpu_mpi_use_prio)
+		req->prio = 0;
+	req->data_handle = NULL;
+	req->node_tag.node.rank = arg->rank;
+	req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
+	req->node_tag.node.comm = MPI_COMM_WORLD;
+	req->detached = 1;
+	req->ptr = malloc(sizeof(struct _starpu_mpi_cp_ack_msg));
+	req->sync = 0;
+	req->datatype = MPI_BYTE;
+	req->callback = _starpu_mpi_treat_ack_receipt_cb;
+	req->callback_arg = req->ptr;
+	req->func = NULL;
+	req->sequential_consistency = 1;
+	req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
+
+	_mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+
+	MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
+	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
+	fprintf(stderr, "pushed recv: %p in list %p - prev: %p - next: %p - src:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
+	req->submitted = 1;
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+
+	/* The callback argument was only needed for the peer rank; release it now
+	 * that the ACK receive has been posted. */
+	free(_args);
+}
+
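+/* Termination handler for the FT service requests, mirroring the one used by
+ * the regular progress engine: release the request's resources, run the
+ * registered callback and signal any waiter that the request has completed. */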
+static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
+	                  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
+	                  req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
+
+	if (req->backend->internal_req)
+	{
+		free(req->backend->early_data_handle);
+		req->backend->early_data_handle = NULL;
+	}
+	else
+	{
+		if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
+		{
+			if (req->registered_datatype == 0)
+			{
+				if (req->request_type == SEND_REQ)
+				{
+					// We need to make sure the communication for sending the size
+					// has completed, as MPI can re-order messages, let's call
+					// MPI_Wait to make sure data have been sent
+					int ret;
+					ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
+					STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
+					starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
+					req->ptr = NULL;
+				}
+				else if (req->request_type == RECV_REQ)
+				{
+					// req->ptr is freed by starpu_data_unpack
+					starpu_data_unpack(req->data_handle, req->ptr, req->count);
+					starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
+				}
+			}
+			else
+			{
+				//_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
+			}
+		}
+		_STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.node.rank, req->node_tag.data_tag);
+	}
+
+	_starpu_mpi_release_req_data(req);
+
+	if (req->backend->envelope)
+	{
+		free(req->backend->envelope);
+		req->backend->envelope = NULL;
+	}
+
+	/* Execute the specified callback, if any */
+	if (req->callback)
+		req->callback(req->callback_arg);
+
+	/* tell anyone potentially waiting on the request that it is
+	 * terminated now */
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
+	req->completed = 1;
+	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+	_STARPU_MPI_LOG_OUT();
+}
+
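+/* Poll the detached FT service requests with MPI_Test, mirroring the regular
+ * detached-request loop of the progress engine; completed requests are handed
+ * to the termination handler above. */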
+static void _starpu_mpi_test_ft_detached_requests(void)
+{
+	//_STARPU_MPI_LOG_IN();
+	int flag;
+	struct _starpu_mpi_req *req;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+
+	if (_starpu_mpi_req_list_empty(&detached_ft_service_requests))
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+		//_STARPU_MPI_LOG_OUT();
+		return;
+	}
+
+	_STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
+	req = _starpu_mpi_req_list_begin(&detached_ft_service_requests);
+	while (req != _starpu_mpi_req_list_end(&detached_ft_service_requests))
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+
+		_STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.node.rank);
+#ifdef STARPU_SIMGRID
+		req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
+#else
+		STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
+		req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
+#endif
+
+		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
+		_STARPU_MPI_TRACE_TEST_END(req->node_tag.node.rank, req->node_tag.data_tag);
+
+		if (!flag)
+		{
+			req = _starpu_mpi_req_list_next(req);
+		}
+		else
+		{
+			fprintf(stderr, "req success: %d\n", detached_send_n_ft_service_requests);
+			_STARPU_MPI_TRACE_POLLING_END();
+			struct _starpu_mpi_req *next_req;
+			next_req = _starpu_mpi_req_list_next(req);
+
+			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
+
+			STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+			if (req->request_type == SEND_REQ)
+				detached_send_n_ft_service_requests--;
+			_starpu_mpi_req_list_erase(&detached_ft_service_requests, req);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+			_starpu_mpi_handle_request_termination(req);
+
+			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
+
+			STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
+			/* We don't want to free internal non-detached
+			   requests, we need to get their MPI request before
+			   destroying them */
+			if (req->backend->is_internal_req && !req->backend->to_destroy)
+			{
+				/* We have completed the request, let the application request destroy it */
+				req->backend->to_destroy = 1;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+			}
+			else
+			{
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
+				_starpu_mpi_request_destroy(req);
+			}
+
+			req = next_req;
+			_STARPU_MPI_TRACE_POLLING_BEGIN();
+		}
+
+		STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
+	}
+	_STARPU_MPI_TRACE_TESTING_DETACHED_END();
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
+	//_STARPU_MPI_LOG_OUT();
+}
+
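+/* Entry points used by the MPI progress thread (see the starpu_mpi_mpi.c hunk
+ * above): starpu_mpi_ft_progress() polls the FT service requests, and
+ * starpu_mpi_ft_busy() keeps the thread from blocking while some of them are
+ * still in flight. */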
+void starpu_mpi_ft_progress(void)
+{
+	_starpu_mpi_test_ft_detached_requests();
+}
+
+int starpu_mpi_ft_busy(void)
+{
+	return ! _starpu_mpi_req_list_empty(&detached_ft_service_requests);
+}

+ 28 - 12
mpi/src/starpu_mpi_checkpoint.h

@@ -36,13 +36,14 @@ LIST_TYPE(_starpu_mpi_checkpoint_template_item,
 
 struct _starpu_mpi_checkpoint_template{
     struct _starpu_mpi_checkpoint_template_item_list list;
-    int size;
-    int cp_template_id;
-	int send_number;
-	int current_send_number;
-    int pending;
-    int frozen;
-    starpu_pthread_mutex_t mutex;
+    int                                              size;
+    int                                              cp_template_id;
+    int                                              cp_template_current_instance;
+    int                                              message_number;
+    int                                              remaining_ack_awaited;
+    int                                              pending;
+    int                                              frozen;
+    starpu_pthread_mutex_t                           mutex;
 };
 
 static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of)
@@ -63,6 +64,7 @@ static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_n
 	starpu_mpi_checkpoint_template_t _cp_template;
 	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
 	_cp_template->cp_template_id = cp_id;
+	_cp_template->cp_template_current_instance = 0;
 	starpu_pthread_mutex_init(&_cp_template->mutex, NULL);
 	return _cp_template;
 }
@@ -97,9 +99,9 @@ static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_t
 {
 	starpu_pthread_mutex_lock(&cp_template->mutex);
 
-	cp_template->frozen = 1;
-	cp_template->send_number = 0;
-	cp_template->size   = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
+	cp_template->frozen         = 1;
+	cp_template->message_number = 0;
+	cp_template->size           = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
 
 	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
 
@@ -108,12 +110,12 @@ static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_t
 		switch (item->type)
 		{
 			case STARPU_VALUE:
-				cp_template->send_number++;
+				cp_template->message_number++;
 				break;
 			case STARPU_R:
 				if (starpu_mpi_data_get_rank(*(starpu_data_handle_t *) item->ptr))
 				{
-					cp_template->send_number++;
+					cp_template->message_number++;
 				}
 				break;
 			case STARPU_DATA_ARRAY:
@@ -127,6 +129,18 @@ static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_t
 	return cp_template->size;
 }
 
+struct _starpu_mpi_cp_ack_msg
+{
+	int checkpoint_id;
+	int checkpoint_instance;
+};
+
+struct _starpu_mpi_cp_ack_arg_cb
+{
+	int rank;
+	struct _starpu_mpi_cp_ack_msg msg;
+};
+
 static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template)
 {
 	struct _starpu_mpi_checkpoint_template_item* item;
@@ -148,6 +162,8 @@ static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_templat
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);
 
+void starpu_mpi_ft_progress(void);
+int starpu_mpi_ft_busy(void);
 
 #ifdef __cplusplus
 }

+ 1 - 1
mpi/tests/checkpoints.c

@@ -141,7 +141,7 @@ int test_checkpoint_submit(int argc, char* argv[])
 
 	FPRINTF_MPI(stderr, "Submitted\n");
 	sleep(10);
-
+	FPRINTF_MPI(stderr, "Bye!\n");
 	starpu_shutdown();
 
 	return 0;