Browse Source

Adds prio (priority) to CP (checkpoint) submission

Romain LION 4 years ago
parent
commit
a458918f7d

+ 6 - 5
mpi/examples/matrix_decomposition/mpi_cholesky_codelets.c

@@ -85,7 +85,8 @@ starpu_mpi_checkpoint_template_t* checkpoint_p;
 
 int backup_function(int rank)
 {
-	return (rank+1)%_nodes;
+	return (rank/dblockx)*dblockx +(rank+1)%dblockx;
+//	return (rank+1)%_nodes;
 }
 
 
@@ -188,7 +189,7 @@ static void run_cholesky(starpu_data_handle_t **data_handles, int rank, int node
 			if (my_distrib(n, k, nodes) == rank)
 				starpu_data_wont_use(data_handles[n][k]);
 		}
-		starpu_mpi_submit_checkpoint_template(*checkpoint_p);
+		starpu_mpi_submit_checkpoint_template(*checkpoint_p, -2*k);
 		starpu_iteration_pop();
 	}
 }
@@ -242,7 +243,7 @@ static void run_cholesky_column(starpu_data_handle_t **data_handles, int rank, i
 						       0);
 			}
 		}
-		starpu_mpi_submit_checkpoint_template(*checkpoint_p);
+		starpu_mpi_submit_checkpoint_template(*checkpoint_p, (int)(nblocks - 2*n));
 		starpu_iteration_pop();
 	}
 
@@ -353,7 +354,7 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 					       0);
 		}
 
-		starpu_mpi_submit_checkpoint_template(*checkpoint_p);
+		starpu_mpi_submit_checkpoint_template(*checkpoint_p, (int)(2*nblocks -4*a));
 		starpu_iteration_pop();
 	}
 
@@ -441,7 +442,7 @@ static void run_cholesky_prio(starpu_data_handle_t **data_handles, int rank, int
 
 		}
 
-		starpu_mpi_submit_checkpoint_template(*checkpoint_p);
+		starpu_mpi_submit_checkpoint_template(*checkpoint_p, (int)(2*nblocks - a));
 		starpu_iteration_pop();
 	}
 

+ 1 - 1
mpi/include/starpu_mpi_ft.h

@@ -48,7 +48,7 @@ int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp
 int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain);
 int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...);
 int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template);
-int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template);
+int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template, int prio);
 int starpu_mpi_ft_turn_on(void);
 int starpu_mpi_ft_turn_off(void);
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);

+ 16 - 5
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -111,7 +111,6 @@ void _send_cp_internal_data_cb(void* _args) {
 
 		//TODO: check cp_domain!
 		struct _starpu_mpi_checkpoint_tracker* tracker = _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(0, arg->msg.checkpoint_instance);
-		fprintf(stderr, "inst: %d tracker:%p\n", arg->msg.checkpoint_instance, tracker);
 		if(!tracker->first_msg_sent_flag)
 		{
 			tracker->first_msg_sent_flag = 1;
@@ -150,9 +149,10 @@ void _recv_internal_data_stats(STARPU_ATTRIBUTE_UNUSED struct _starpu_mpi_cp_ack
 }
 #endif
 
-int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
+int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template, int prio)
 {
 	starpu_data_handle_t handle;
+	struct _starpu_mpi_data* mpi_data;
 	struct _starpu_mpi_cp_ack_arg_cb* arg;
 	void* cpy_ptr;
 	struct _starpu_mpi_checkpoint_template_item* item;
@@ -183,7 +183,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
 					arg->rank = item->backupped_by;
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
-					starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
+					starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, prio, MPI_COMM_WORLD,
 												   &_send_cp_external_data_cb, (void*)arg);
 					// The callback needs to free the handle specially created for the send, and post ack recv
 				}
@@ -200,8 +200,15 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				break;
 			case STARPU_R:
 				handle = (starpu_data_handle_t)item->ptr;
+				mpi_data = _starpu_mpi_data_get(handle);
 				if (starpu_mpi_data_get_rank(handle)==my_rank)
 				{
+					if (!mpi_data->modified)
+					{
+						_starpu_mpi_checkpoint_tracker_update(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
+						//TODO: check if the data are all acknowledged
+						break; // We don't want to CP a data that is still at initial state.
+					}
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
 					arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = item->backupped_by;
@@ -211,13 +218,17 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_id;
 					arg->msg.checkpoint_instance = current_instance;
-					_starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
+					_starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, prio,
 					                              &_send_cp_internal_data_cb, (void*)arg, 1, &arg->cache_flag);
 					// the callbacks need to post ack recv. The cache one needs to release the handle.
 					_send_internal_data_stats(arg);
 				}
 				else if (item->backup_of == starpu_mpi_data_get_rank(handle))
 				{
+					if (!mpi_data->modified)
+					{
+						break; // We don't want to CP a data that is still at initial state.
+					}
 					_STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
 					arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = item->backup_of;
@@ -232,7 +243,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					// The callback needs to do nothing. The cached one must release the handle.
 					_recv_internal_data_stats(arg);
 					starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
-					//starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
+					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
 					// The callback need to store the data and post ack send.
 				}
 				break;

+ 2 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -146,10 +146,12 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 	{
 		int mpi_rank = starpu_mpi_data_get_rank(data);
 		starpu_mpi_tag_t data_tag = starpu_mpi_data_get_tag(data);
+		struct _starpu_mpi_data* mpi_data = _starpu_mpi_data_get(data);
 		if(mpi_rank == -1)
 		{
 			_STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
 		}
+		mpi_data->modified=1;
 		if (mpi_rank == STARPU_MPI_PER_NODE)
 		{
 			mpi_rank = me;

+ 2 - 2
mpi/tests/checkpoints.c

@@ -161,7 +161,7 @@ int test_checkpoint_submit(int argc, char* argv[])
 			break;
 	}
 	FPRINTF_MPI(stderr, "Submitting\n");
-	starpu_mpi_submit_checkpoint_template(cp_template);
+	starpu_mpi_submit_checkpoint_template(cp_template,0);
 
 	FPRINTF_MPI(stderr, "Submitted\n");
 
@@ -185,7 +185,7 @@ int test_checkpoint_submit(int argc, char* argv[])
 	}
 
 	FPRINTF_MPI(stderr, "Submitting\n");
-	starpu_mpi_submit_checkpoint_template(cp_template);
+	starpu_mpi_submit_checkpoint_template(cp_template, 0);
 
 	FPRINTF_MPI(stderr, "Submitted\n");