Browse Source

Unset COMMUTE restrictions.
complete FT stats.
Solve warnings

Romain LION 5 years ago
parent
commit
7aedec9988

+ 2 - 4
mpi/examples/matrix_decomposition/mpi_cholesky_codelets.c

@@ -75,8 +75,7 @@ static struct starpu_codelet cl22 =
 #endif
 	.cuda_flags = {STARPU_CUDA_ASYNC},
 	.nbuffers = 3,
-    .modes = {STARPU_R, STARPU_R, STARPU_RW},
-//     .modes = {STARPU_R, STARPU_R, STARPU_RW | STARPU_COMMUTE},
+    .modes = {STARPU_R, STARPU_R, STARPU_RW | STARPU_COMMUTE},
 	.model = &chol_model_22,
 	.color = 0x00ff00,
 };
@@ -119,8 +118,7 @@ static void run_cholesky(starpu_data_handle_t **data_handles, int rank, int node
 							       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m - n) : ((n == k+1) && (m == k+1))?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
 							       STARPU_R, data_handles[n][k],
 							       STARPU_R, data_handles[m][k],
-					               STARPU_RW, data_handles[m][n],
-//							               STARPU_RW | STARPU_COMMUTE, data_handles[m][n],
+					               STARPU_RW | STARPU_COMMUTE, data_handles[m][n],
 							       0);
 				}
 			}

+ 8 - 9
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -37,17 +37,19 @@ extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_
 
 
 
-void _arg_free(void* _args)
+void _ack_msg_send_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	_STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
 	_STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
 	free(arg);
 }
 
-void _starpu_mpi_treat_ack_receipt_cb(void* _args)
+void _ack_msg_recv_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 	int ret;
+	_STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
 	_STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	if (ret == 0) {
@@ -63,15 +65,15 @@ void _starpu_mpi_store_data_and_send_ack_cb(struct _starpu_mpi_cp_ack_arg_cb* ar
 {
 	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
 	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, arg);
-	_STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
+	_ft_service_msg_isend_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
+	                         _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_send_cb, arg);
 }
 
 void _starpu_mpi_push_cp_ack_recv_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
 {
 	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
-	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, arg);
-	_STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
+	_ft_service_msg_irecv_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
+	                         _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_recv_cb, arg);
 }
 
 void _recv_internal_dup_ro_cb(void* _args)
@@ -93,7 +95,6 @@ void _recv_cp_external_data_cb(void* _args)
 
 void _recv_cp_internal_data_cb(void* _args)
 {
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 	_STARPU_MPI_FT_STATS_RECV_CP_DATA(
 			arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle) : -1);
 }
@@ -135,14 +136,12 @@ void _send_cached_cp_internal_data_cb(void* _args)
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
 {
 	starpu_data_handle_t* handle;
-	struct _starpu_mpi_checkpoint_tracker* tracker;
 	struct _starpu_mpi_cp_ack_arg_cb* arg;
 	void* cpy_ptr;
 	struct _starpu_mpi_checkpoint_template_item* item;
 	int current_instance;
 
 	current_instance = increment_current_instance();
-//	_starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
 	_starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);

+ 31 - 9
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -110,7 +110,7 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
 	starpu_mpi_tag_t tag;
 	backup_of_fn     _backup_of;
 
-	STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
+	arg_type = arg_type & ~STARPU_COMMUTE;
 
 	switch(arg_type)
 	{
@@ -168,10 +168,11 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
 	}
 }
 
-void checkpoint_discard(void* _args)
+void _cp_discard_message_recv_cb(void* _args)
 {
 	// TODO: store the information of the new CP, for restart purpose
 	struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
+	_STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
 	_STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
 }
@@ -192,13 +193,15 @@ int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t
 		starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
 		arg->rank = cp_template->backup_of_array[i];
 		_STARPU_MPI_DEBUG(10, "Post DISCARD msg reception from %d\n", arg->rank);
-		_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
+		_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
+		                         MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void *) arg);
 	}
 	return i;
 }
 
-void free_arg(void* _args)
+void _cp_discard_message_send_cb(void* _args)
 {
+	_STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
 	starpu_free(_args);
 }
 
@@ -221,12 +224,31 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 		arg->msg.validation=0;
 		arg->msg.checkpoint_id = cp_id;
 		arg->msg.checkpoint_instance = cp_instance;
-		_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
+		_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
+		                         MPI_COMM_WORLD, _cp_discard_message_send_cb, (void *) arg);
 	}
 
 	return 0;
 }
 
+starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
+{
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	for (int i=0 ; i < cp_template_array_size ; i++)
+	{
+//		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
+		if (cp_template_array[i]->cp_id == checkpoint_id)
+		{
+//			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+			starpu_pthread_mutex_unlock(&cp_template_mutex);
+			return cp_template_array[i];
+		}
+//		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+	}
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+	return NULL;
+}
+
 
 //int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
 //{
@@ -244,7 +266,7 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 //		starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
 //		arg->rank = cp_template->backup_of_array[i];
 //		_STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
-//		_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
+//		_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
 //	}
 //	if (last_valid_checkpoint.checkpoint_id == -1)
 //	{
@@ -269,7 +291,7 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 //				starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
 //				arg->rank = old_template->backup_of_array[i];
 //				_STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
-//				_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
+//				_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
 //			}
 //		}
 //	}
@@ -294,7 +316,7 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 //		_STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
 //		arg->msg.checkpoint_id = cp_id;
 //		arg->msg.checkpoint_instance = cp_instance;
-//		_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
+//		_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
 //	}
 //	if (last_valid_checkpoint.checkpoint_id == -1)
 //	{
@@ -321,7 +343,7 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 //				_STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
 //				arg->msg.checkpoint_id = cp_id;
 //				arg->msg.checkpoint_instance = cp_instance;
-//				_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
+//				_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
 //			}
 //		}
 //	}

+ 1 - 21
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -36,9 +36,6 @@ extern starpu_pthread_mutex_t           cp_template_mutex;
 extern int                              cp_template_array_size;
 extern starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
 
-extern struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
-extern struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
-
 int increment_current_instance();
 int get_current_instance();
 
@@ -48,6 +45,7 @@ void checkpoint_template_lib_quit(void);
 
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance);
 
+starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id);
 int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template);
 
 int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t *cp_template, int cp_id, int cp_domain, va_list varg_list);
@@ -89,24 +87,6 @@ struct _starpu_mpi_checkpoint_template
 
 };
 
-static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
-{
-	starpu_pthread_mutex_lock(&cp_template_mutex);
-	for (int i=0 ; i < cp_template_array_size ; i++)
-	{
-//		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
-		if (cp_template_array[i]->cp_id == checkpoint_id)
-		{
-//			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
-			starpu_pthread_mutex_unlock(&cp_template_mutex);
-			return cp_template_array[i];
-		}
-//		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
-	}
-	starpu_pthread_mutex_unlock(&cp_template_mutex);
-	return NULL;
-}
-
 
 
 

+ 3 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c

@@ -119,6 +119,7 @@ static inline int _clear_domain_tracker_index(struct _starpu_mpi_checkpoint_doma
 		HASH_DEL(index->tracked_inst_hash_table, entry);
 		free(entry);
 	}
+	return 0;
 }
 
 static inline int _domain_tracker_delete_all()
@@ -141,12 +142,14 @@ static inline int _domain_tracker_delete_all()
 int _starpu_mpi_checkpoint_tracker_init()
 {
 	domain_tracker_list = _starpu_mpi_checkpoint_domain_tracker_index_list_new();
+	return 0;
 }
 
 int _starpu_mpi_checkpoint_tracker_shutdown()
 {
 	_domain_tracker_delete_all();
 	free(domain_tracker_list);
+	return 0;
 }
 
 struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(int cp_domain, int cp_inst)