Forráskód Böngészése

fixed bug in mpi dags

JUVEN Alexis 4 éve
szülő
commit
6006b5e5fe

+ 16 - 1
mpi/src/starpu_mpi.c

@@ -53,7 +53,15 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 		}
 	}
 
-	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
+	if (sequential_consistency)
+	{
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, 1 /*sequential consistency*/, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
+	}
+	else
+	{
+		/* post_sync_jobid has already been filled */
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, 0 /*sequential consistency*/, 1, &req->pre_sync_jobid, NULL);
+	}
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency)
@@ -185,6 +193,13 @@ struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handl
 
 	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _mpi_backend._starpu_mpi_backend_irecv_size_func, sequential_consistency, is_internal_req, count);
 	_starpu_mpi_req_willpost(req);
+
+	if (sequential_consistency == 0)
+	{
+		/* Synchronization task jobid from redux is used */
+		_starpu_mpi_redux_fill_post_sync_jobid(arg, &(req->post_sync_jobid));
+	}
+
 	_starpu_mpi_isend_irecv_common(req, STARPU_W, sequential_consistency);
 	return req;
 }

+ 3 - 0
mpi/src/starpu_mpi_coop_sends.c

@@ -269,6 +269,9 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 	{
 		/* We were first, we are responsible for acquiring the data for everybody */
 		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &req->pre_sync_jobid, NULL);
+		coop_sends->pre_sync_jobid = req->pre_sync_jobid;
 	}
+	else
+		req->pre_sync_jobid = coop_sends->pre_sync_jobid;
 }
 

+ 8 - 0
mpi/src/starpu_mpi_private.h

@@ -196,6 +196,9 @@ struct _starpu_mpi_coop_sends
 	struct _starpu_mpi_req **reqs_array;
 	unsigned n;
 	unsigned redirects_sent;
+
+	/* Used to trace dependencies */
+	long pre_sync_jobid;
 };
 
 /** Initialized in starpu_mpi_data_register_comm */
@@ -299,6 +302,11 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
  */
 void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_control, int submit_data);
 
+/*
+ * Fills post_sync_jobid with the reduction synchronization task jobid
+ */
+void _starpu_mpi_redux_fill_post_sync_jobid(const void * const redux_data_args, long * const post_sync_jobid);
+
 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
 void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
 struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_handle,

+ 14 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -726,6 +726,7 @@ struct _starpu_mpi_redux_data_args
 	int node;
 	MPI_Comm comm;
 	struct starpu_task *taskB;
+	long taskC_jobid;
 };
 
 void _starpu_mpi_redux_data_dummy_func(void *buffers[], void *cl_arg)
@@ -792,6 +793,13 @@ void _starpu_mpi_redux_data_recv_callback(void *callback_arg)
 	starpu_mpi_irecv_detached_sequential_consistency(args->new_handle, args->node, args->data_tag, args->comm, _starpu_mpi_redux_data_detached_callback, args, 0);
 }
 
+/* Store the taskC_jobid field of the struct _starpu_mpi_redux_data_args pointed to by redux_data_args into *post_sync_jobid; the pointer is cast unchecked, so it must really point to such a struct. */
+void _starpu_mpi_redux_fill_post_sync_jobid(const void * const redux_data_args, long * const post_sync_jobid)
+{
+	*post_sync_jobid = ((const struct _starpu_mpi_redux_data_args *) redux_data_args)->taskC_jobid;
+}
+
+
 /* TODO: this should rather be implicitly called by starpu_mpi_task_insert when
  * a data previously accessed in REDUX mode gets accessed in R mode. */
 void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
@@ -815,6 +823,10 @@ void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle,
 
 	_STARPU_MPI_DEBUG(1, "Doing reduction for data %p on node %d with %d nodes ...\n", data_handle, rank, nb_nodes);
 
+	// Create the synchronization task and use its jobid for tracing
+	struct starpu_task *taskC = starpu_task_create();
+	const long taskC_jobid = starpu_task_get_job_id(taskC);
+
 	// need to count how many nodes have the data in redux mode
 	if (me == rank)
 	{
@@ -855,6 +867,8 @@ void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle,
 				args->node = i;
 				args->comm = comm;
 
+				args->taskC_jobid = taskC_jobid;
+
 				// We need to create taskB early as
 				// taskC declares a dependency on it
 				args->taskB = starpu_task_create();