Parcourir la source

Factorize files. Compile ok

Romain LION il y a 5 ans
Parent
commit
d5d14b97f4

+ 4 - 4
mpi/include/starpu_mpi_ft.h

@@ -27,7 +27,7 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t
 
 /**
  * Registers a checkpoint template \p cp_template with the given arguments.
- * It is then ready to use with ::starpu_mpi_checkpoint_template_submit during the program execution.
+ * It is then ready to use with ::starpu_mpi_submit_checkpoint_template during the program execution.
  * A unique checkpoint id \p cp_id is requested from the user in order to
  * match with a corresponding ::starpu_mpi_init_from_checkpoint.
  *
@@ -42,9 +42,9 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t
  * </ul>
  */
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...);
-int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template);
-int starpu_mpi_checkpoint_turn_on(void);
-int starpu_mpi_checkpoint_turn_off(void);
+int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template);
+int starpu_mpi_ft_turn_on(void);
+int starpu_mpi_ft_turn_off(void);
 
 #ifdef __cplusplus
 }

+ 8 - 2
mpi/src/Makefile.am

@@ -85,7 +85,10 @@ noinst_HEADERS =					\
 
 if STARPU_USE_MPI_FT
 noinst_HEADERS +=       \
-	starpu_mpi_checkpoint.h
+	mpi_failure_tolerance/starpu_mpi_ft.h   \
+	mpi_failure_tolerance/starpu_mpi_checkpoint.h    \
+	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h
+#	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h
 endif STARPU_USE_MPI_FT
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -121,5 +124,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 
 if STARPU_USE_MPI_FT
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES +=    \
-	starpu_mpi_checkpoint.c
+	mpi_failure_tolerance/starpu_mpi_ft.c   \
+	mpi_failure_tolerance/starpu_mpi_checkpoint.c    \
+	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c
+ #	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c
 endif STARPU_USE_MPI_FT

+ 1 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -43,7 +43,7 @@
 #include <core/topology.h>
 #include <core/workers.h>
 #ifdef STARPU_USE_MPI_FT
-#include <starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft.h>
 #endif // STARPU_USE_MPI_FT
 
 #ifdef STARPU_USE_MPI_MPI

+ 20 - 212
mpi/src/starpu_mpi_checkpoint.c

@@ -19,9 +19,9 @@
 #include <stdlib.h>
 #include <common/utils.h>
 
-#include <starpu_mpi_checkpoint.h>
-#include <starpu_mpi_checkpoint_template.h>
-#include <starpu_mpi_checkpoint_package.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <sys/param.h>
 #include <starpu_mpi_private.h>
 #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
@@ -30,10 +30,8 @@
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
-starpu_pthread_mutex_t           cp_template_mutex;
-starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
+starpu_pthread_mutex_t           cp_lib_mutex;
 int                              my_rank;
-int cp_template_number = 0;
 
 static struct _starpu_mpi_req_list detached_ft_service_requests;
 static unsigned detached_send_n_ft_service_requests;
@@ -48,114 +46,13 @@ void _starpu_mpi_push_cp_ack_recv_cb(void* _args);
 void _starpu_mpi_push_cp_ack_send_cb(void* _args);
 void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args);
 
-extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
-extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
+//extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
+//extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
+extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency);
+extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
 
 
-static int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
-{
-	int arg_type;
-	//void* useless;
-	void* ptr;
-	int count;
-	int backup_rank;
-	int backup_of;
-//	int (*_backup_of)(int);
-//	int (*_backuped_by)(int);
-
-	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
-
-	va_list varg_list_copy;
-	va_copy(varg_list_copy, varg_list);
-
-	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
-	{
-		STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
-
-		switch(arg_type)
-		{
-			case STARPU_R:
-				ptr         = va_arg(varg_list_copy, void*);
-				count       = 1;
-				backup_rank = va_arg(varg_list_copy, int);
-				backup_of   = -1;
-				break;
-			case STARPU_VALUE:
-				ptr         = va_arg(varg_list_copy, void*);
-				count       = va_arg(varg_list_copy, int);
-				backup_rank = va_arg(varg_list_copy, int);
-				backup_of   = va_arg(varg_list_copy, int);
-				break;
-//			case STARPU_DATA_ARRAY:
-//				ptr         = va_arg(varg_list_copy, void*);
-//				count       = va_arg(varg_list_copy, int);
-//				backup_rank = va_arg(varg_list_copy, int);
-//				backup_of   = -1;
-//				break;
-			default:
-				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
-				break;
-		}
-		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank, backup_of);
-	};
-	va_end(varg_list_copy);
-
-	_starpu_mpi_checkpoint_template_freeze(_cp_template);
-
-	starpu_pthread_mutex_lock(&cp_template_mutex);
-	for (int i=0 ; i<cp_template_number ; i++)
-	{
-		STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != _cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", _cp_template->cp_template_id);
-	}
-
-	cp_template_array[cp_template_number] = _cp_template;
-	cp_template_number++;
-	starpu_pthread_mutex_unlock(&cp_template_mutex);
-
-	*cp_template = _cp_template;
-
-	return 0;
-}
-
-struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
-{
-	struct _starpu_mpi_req* req = NULL;
-	int already_received = _starpu_mpi_cache_received_data_set(data_handle);
-	if (already_received == 0)
-	{
-		if (data_tag == -1)
-			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
-		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, source);
-		req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, _arg, sequential_consistency, is_internal_req, count);
-	}
-	else
-	{
-		fprintf(stderr, "STARPU CACHE: Data already received\n");
-		starpu_data_acquire_cb(data_handle, STARPU_R, alt_callback, _alt_arg);
-	}
-	return req;
-}
-
-struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency)
-{
-	struct _starpu_mpi_req* req = NULL;
-	int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, dest);
-	if (already_sent == 0)
-	{
-		if (data_tag == -1)
-			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
-		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, mpi_rank);
-		req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, _arg, sequential_consistency);
-	}
-	else
-	{
-		fprintf(stderr, "STARPU CACHE: Data already sent\n");
-		starpu_data_acquire_cb(data_handle, STARPU_R, alt_callback, _alt_arg);
-	}
-	return req;
-}
-
-int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
+int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
 {
 	// TODO: For now checkpoint are not taken asynchronously. It will be later, and then we will have to acquire READ permissions to StarPU in order to not have the data potentially corrupted.
 	starpu_data_handle_t handle;
@@ -229,72 +126,6 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
 //	starpu_pthread_mutex_unlock(&cp_template->mutex);
 //}
 
-// For test purpose
-int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
-{
-	int val;
-	int i = 0;
-	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
-
-	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
-	{
-		fprintf(stderr,"Item %2d: ", i);
-		if (item->type == STARPU_VALUE)
-		{
-			fprintf(stderr, "STARPU_VALUE - ");
-			fprintf(stderr, "Value=%d\n", (*(int *)(item->ptr)));
-		}
-		else if (item->type == STARPU_R)
-		{
-			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
-			fprintf(stderr, "STARPU_R - Value=%d\n", val);
-		}
-		else if (item->type == STARPU_DATA_ARRAY)
-		{
-			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
-
-			for (int j=1 ; j<MIN(item->count, 5) ; j++)
-			{
-				fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
-			}
-			fprintf(stderr, "...\n");
-		}
-		else
-		{
-			printf("Unrecognized type.\n");
-		}
-
-		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
-		i++;
-	};
-	return 0;
-}
-
-int starpu_mpi_checkpoint_turn_on(void)
-{
-	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
-	_starpu_mpi_req_list_init(&detached_ft_service_requests);
-	starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
-	return 0;
-}
-
-int starpu_mpi_checkpoint_turn_off(void)
-{
-	for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
-	{
-		if (cp_template_array[i] == NULL)
-		{
-			break;
-		}
-		_starpu_checkpoint_template_free(cp_template_array[i]);
-		cp_template_array[i] = NULL;
-	}
-	starpu_pthread_mutex_destroy(&cp_template_mutex);
-
-	return 0;
-}
-
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
 {
 	va_list varg_list;
@@ -304,11 +135,6 @@ int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp
 	return ret;
 }
 
-int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
-{
-	return _starpu_mpi_checkpoint_template_submit(cp_template);
-}
-
 void _print_ack_sent_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
@@ -355,27 +181,9 @@ void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args)
 void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_msg* msg = (struct _starpu_mpi_cp_ack_msg*) _args;
-	starpu_pthread_mutex_lock(&cp_template_mutex);
-	for (int i=0 ; i<cp_template_number ; i++)
-	{
-		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
-		if (cp_template_array[i]->cp_template_id == msg->checkpoint_id && cp_template_array[i]->cp_template_current_instance == msg->checkpoint_instance)
-		{
-			cp_template_array[i]->remaining_ack_awaited--;
-			if (cp_template_array[i]->remaining_ack_awaited == 0)
-			{
-				// TODO: share info about cp integrity
-				fprintf(stderr, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", msg->checkpoint_id, msg->checkpoint_instance);
-				cp_template_array[i]->pending=0;
-			}
-			free(msg);
-			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
-			starpu_pthread_mutex_unlock(&cp_template_mutex);
-			return;
-		}
-		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+	if (_checkpoint_template_digest_ack_reception(msg->checkpoint_id, msg->checkpoint_instance) == 0) {
+		free(msg);
 	}
-	starpu_pthread_mutex_unlock(&cp_template_mutex);
 }
 
 void _starpu_mpi_push_cp_ack_send_cb(void* _args)
@@ -527,7 +335,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_test_ft_detached_requests(void)
+void starpu_mpi_test_ft_detached_requests(void)
 {
 	//_STARPU_MPI_LOG_IN();
 	int flag;
@@ -610,13 +418,13 @@ static void _starpu_mpi_test_ft_detached_requests(void)
 	//_STARPU_MPI_LOG_OUT();
 }
 
-void starpu_mpi_ft_progress(void)
-{
-	_starpu_mpi_test_ft_detached_requests();
-}
-
+int starpu_mpi_checkpoint_lib_init() {
+	_starpu_mpi_req_list_init(&detached_ft_service_requests);
+	starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
 
-int starpu_mpi_ft_busy()
-{
-	return ! _starpu_mpi_req_list_empty(&detached_ft_service_requests);
+	return 0;
 }
+
+int starpu_mpi_checkpoint_lib_busy() {
+	return !_starpu_mpi_req_list_empty(&detached_ft_service_requests);
+}

+ 4 - 2
mpi/src/starpu_mpi_checkpoint.h

@@ -26,6 +26,10 @@ extern "C"
 {
 #endif
 
+void starpu_mpi_test_ft_detached_requests(void);
+int starpu_mpi_checkpoint_lib_init();
+int starpu_mpi_checkpoint_lib_busy();
+
 struct _starpu_mpi_cp_ack_msg
 {
 	int checkpoint_id;
@@ -40,8 +44,6 @@ struct _starpu_mpi_cp_ack_arg_cb
 	struct _starpu_mpi_cp_ack_msg msg;
 };
 
-void starpu_mpi_ft_progress(void);
-int starpu_mpi_ft_busy();
 
 #ifdef __cplusplus
 }

mpi/src/starpu_mpi_checkpoint_package.h → mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.h


+ 182 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -0,0 +1,182 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdarg.h>
+#include <stdlib.h>
+#include <common/utils.h>
+
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
+#include <sys/param.h>
+#include <starpu_mpi_private.h>
+#include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
+#include <mpi/starpu_mpi_mpi.h>
+#include <starpu_mpi_cache.h>
+
+
+#define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
+
+starpu_pthread_mutex_t           cp_template_mutex;
+starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
+int                              my_rank;
+int cp_template_number = 0;
+
+void checkpoint_template_lib_init(void) {
+	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
+}
+
+void checkpoint_template_lib_quit(void) {
+	for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
+	{
+		if (cp_template_array[i] == NULL)
+		{
+			break;
+		}
+		_starpu_checkpoint_template_free(cp_template_array[i]);
+		cp_template_array[i] = NULL;
+	}
+}
+
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
+{
+	int arg_type;
+	//void* useless;
+	void* ptr;
+	int count;
+	int backup_rank;
+	int backup_of;
+//	int (*_backup_of)(int);
+//	int (*_backuped_by)(int);
+
+	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+
+	va_list varg_list_copy;
+	va_copy(varg_list_copy, varg_list);
+
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
+	{
+		STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
+
+		switch(arg_type)
+		{
+			case STARPU_R:
+				ptr         = va_arg(varg_list_copy, void*);
+				count       = 1;
+				backup_rank = va_arg(varg_list_copy, int);
+				backup_of   = -1;
+				break;
+			case STARPU_VALUE:
+				ptr         = va_arg(varg_list_copy, void*);
+				count       = va_arg(varg_list_copy, int);
+				backup_rank = va_arg(varg_list_copy, int);
+				backup_of   = va_arg(varg_list_copy, int);
+				break;
+//			case STARPU_DATA_ARRAY:
+//				ptr         = va_arg(varg_list_copy, void*);
+//				count       = va_arg(varg_list_copy, int);
+//				backup_rank = va_arg(varg_list_copy, int);
+//				backup_of   = -1;
+//				break;
+			default:
+				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
+				break;
+		}
+		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank, backup_of);
+	};
+	va_end(varg_list_copy);
+
+	_starpu_mpi_checkpoint_template_freeze(_cp_template);
+
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	for (int i=0 ; i<cp_template_number ; i++)
+	{
+		STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != _cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", _cp_template->cp_template_id);
+	}
+
+	cp_template_array[cp_template_number] = _cp_template;
+	cp_template_number++;
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+
+	*cp_template = _cp_template;
+
+	return 0;
+}
+
+int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	for (int i=0 ; i<cp_template_number ; i++)
+	{
+		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
+		if (cp_template_array[i]->cp_template_id == checkpoint_id && cp_template_array[i]->cp_template_current_instance == checkpoint_instance)
+		{
+			cp_template_array[i]->remaining_ack_awaited--;
+			if (cp_template_array[i]->remaining_ack_awaited == 0)
+			{
+				// TODO: share info about cp integrity
+				fprintf(stderr, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", checkpoint_id, checkpoint_instance);
+				cp_template_array[i]->pending=0;
+			}
+			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+			starpu_pthread_mutex_unlock(&cp_template_mutex);
+			return 0;
+		}
+		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+	}
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+	return -1;
+}
+
+// For test purpose
+int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
+{
+	int val;
+	int i = 0;
+	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
+	{
+		fprintf(stderr,"Item %2d: ", i);
+		if (item->type == STARPU_VALUE)
+		{
+			fprintf(stderr, "STARPU_VALUE - ");
+			fprintf(stderr, "Value=%d\n", (*(int *)(item->ptr)));
+		}
+		else if (item->type == STARPU_R)
+		{
+			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
+			fprintf(stderr, "STARPU_R - Value=%d\n", val);
+		}
+		else if (item->type == STARPU_DATA_ARRAY)
+		{
+			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
+
+			for (int j=1 ; j<MIN(item->count, 5) ; j++)
+			{
+				fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
+			}
+			fprintf(stderr, "...\n");
+		}
+		else
+		{
+			printf("Unrecognized type.\n");
+		}
+
+		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+		i++;
+	};
+	return 0;
+}

+ 7 - 0
mpi/src/starpu_mpi_checkpoint_template.h

@@ -20,12 +20,19 @@
 #include <starpu_mpi.h>
 #include <common/list.h>
 #include <starpu_mpi_private.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 
 #ifdef __cplusplus
 extern "C"
 {
 #endif
 
+void checkpoint_template_lib_init(void);
+void checkpoint_template_lib_quit(void);
+int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance);
+
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list);
+
 LIST_TYPE(_starpu_mpi_checkpoint_template_item,
 int type;
 void* ptr;

+ 51 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -0,0 +1,51 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi_private.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+
+
+starpu_pthread_mutex_t           ft_mutex;
+int                              my_rank;
+
+int starpu_mpi_ft_turn_on(void)
+{
+	starpu_pthread_mutex_init(&ft_mutex, NULL);
+	starpu_mpi_checkpoint_lib_init();
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
+	checkpoint_template_lib_init();
+	return 0;
+}
+
+int starpu_mpi_ft_turn_off(void)
+{
+	checkpoint_template_lib_quit();
+	starpu_pthread_mutex_destroy(&ft_mutex);
+
+	return 0;
+}
+
+void starpu_mpi_ft_progress(void)
+{
+	starpu_mpi_test_ft_detached_requests();
+}
+
+
+int starpu_mpi_ft_busy()
+{
+	starpu_mpi_checkpoint_lib_busy();
+}

+ 32 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.h

@@ -0,0 +1,32 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef _STARPU_MPI_FT_H
+#define _STARPU_MPI_FT_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+void starpu_mpi_ft_progress(void);
+int starpu_mpi_ft_busy();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //_STARPU_MPI_FT_H

+ 38 - 0
mpi/src/starpu_mpi.c

@@ -175,6 +175,25 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, starp
 	return starpu_mpi_issend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
 }
 
+struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency)
+{
+	struct _starpu_mpi_req* req = NULL;
+	int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, dest);
+	if (already_sent == 0)
+	{
+		if (data_tag == -1)
+			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
+		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, mpi_rank);
+		req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, _arg, sequential_consistency);
+	}
+	else
+	{
+		fprintf(stderr, "STARPU CACHE: Data already sent\n");
+		starpu_data_acquire_cb(data_handle, STARPU_R, alt_callback, _alt_arg);
+	}
+	return req;
+}
+
 struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
 	if (_starpu_mpi_fake_world_size != -1)
@@ -238,6 +257,25 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, starpu_mpi_tag
 	return 0;
 }
 
+struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
+{
+	struct _starpu_mpi_req* req = NULL;
+	int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+	if (already_received == 0)
+	{
+		if (data_tag == -1)
+			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
+		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, source);
+		req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, _arg, sequential_consistency, is_internal_req, count);
+	}
+	else
+	{
+		fprintf(stderr, "STARPU CACHE: Data already received\n");
+		starpu_data_acquire_cb(data_handle, STARPU_R, alt_callback, _alt_arg);
+	}
+	return req;
+}
+
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	return _mpi_backend._starpu_mpi_backend_wait(public_req, status);

+ 2 - 2
mpi/src/starpu_mpi_init.c

@@ -91,7 +91,7 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 #endif
 
 #ifdef STARPU_USE_MPI_FT
-	starpu_mpi_checkpoint_turn_on();
+	starpu_mpi_ft_turn_on();
 #endif // STARPU_USE_MPI_FT
 }
 
@@ -266,7 +266,7 @@ int starpu_mpi_shutdown(void)
 	int rank, world_size;
 
 #ifdef STARPU_USE_MPI_FT
-	starpu_mpi_checkpoint_turn_off();
+	starpu_mpi_ft_turn_off();
 #endif // STARPU_USE_MPI_FT
 
 	/* Make sure we do not have MPI communications pending in the task graph

+ 2 - 2
mpi/tests/checkpoints.c

@@ -17,7 +17,7 @@
 #include <starpu_mpi.h>
 #include "helper.h"
 
-#include <starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 
 #define ARRAY_SIZE 12
 
@@ -137,7 +137,7 @@ int test_checkpoint_submit(int argc, char* argv[])
 			break;
 	}
 	FPRINTF_MPI(stderr, "Submitting\n");
-	starpu_mpi_checkpoint_template_submit(cp_template);
+	starpu_mpi_submit_checkpoint_template(cp_template);
 
 	FPRINTF_MPI(stderr, "Submitted\n");
 	sleep(10);