Преглед изворни кода

Add mpi communications in dag.dot

Samuel Thibault пре 8 година
родитељ
комит
c7f166c27c

+ 8 - 0
doc/doxygen/chapters/api/data_management.doxy

@@ -360,6 +360,14 @@ memory.
 ::STARPU_ACQUIRE_NO_NODE and ::STARPU_ACQUIRE_NO_NODE_LOCK_ALL can be used instead of an
 explicit node number.
 
+\fn int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, long *pre_sync_jobid, long *post_sync_jobid)
+\ingroup API_Data_Management
+This is the same as starpu_data_acquire_on_node_cb_sequential_consistency(),
+except that the \e pre_sync_jobid and \e post_sync_jobid parameters can be used
+to retrieve the job ids of the synchronization tasks. The task identified by
+\e pre_sync_jobid runs just before the acquisition, and the task identified by
+\e post_sync_jobid runs just after the release.
+
 \fn int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
 \ingroup API_Data_Management
 This is the same as starpu_data_acquire_try(), except that the

+ 1 - 0
include/starpu_data.h

@@ -73,6 +73,7 @@ int starpu_data_acquire_cb(starpu_data_handle_t handle, enum starpu_data_access_
 int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);
 int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
+int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, long *pre_sync_jobid, long *post_sync_jobid);
 
 int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode);

+ 6 - 3
mpi/src/starpu_mpi.c

@@ -148,6 +148,8 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->early_data_handle = NULL;
 	(*req)->envelope = NULL;
 	(*req)->sequential_consistency = 1;
+	(*req)->pre_sync_jobid = -1;
+	(*req)->post_sync_jobid = -1;
 
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_init(&((*req)->queue));
@@ -333,7 +335,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 
 	if (_starpu_mpi_fake_world_size != -1)
 	{
-		starpu_data_acquire_cb_sequential_consistency(data_handle, mode, nop_acquire_cb, data_handle, sequential_consistency);
+		/* Don't actually do the communication */
+		starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
 		return NULL;
 	}
 
@@ -361,7 +364,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
 	 * the request is actually submitted */
-	starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
+	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
 
 	_STARPU_MPI_LOG_OUT();
 	return req;
@@ -449,7 +452,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 	_starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
 #endif
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle));
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);

+ 4 - 4
mpi/src/starpu_mpi_fxt.h

@@ -58,8 +58,8 @@ extern "C" {
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_BARRIER, (rank), (worldsize), (key), _starpu_gettid());
 #define _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_SUBMIT_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
-#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), _starpu_gettid());
+#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(dest, mpi_tag, size, jobid)	\
+	FUT_DO_PROBE5(_STARPU_MPI_FUT_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), (jobid), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_BEGIN, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(src, mpi_tag)	\
@@ -77,7 +77,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, mpi_tag)		\
 	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_TERMINATED(req, rank, mpi_tag)		\
-	if ((req)->request_type == RECV_REQ) FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_TERMINATED, (rank), (mpi_tag), _starpu_gettid()); else \
+	if ((req)->request_type == RECV_REQ) FUT_DO_PROBE4(_STARPU_MPI_FUT_IRECV_TERMINATED, (rank), (mpi_tag), (req)->post_sync_jobid, _starpu_gettid()); else \
 	if ((req)->request_type == SEND_REQ) FUT_DO_PROBE3(_STARPU_MPI_FUT_ISEND_TERMINATED, (rank), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
@@ -103,7 +103,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_STOP(a, b)				do {} while(0);
 #define _STARPU_MPI_TRACE_BARRIER(a, b, c)			do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(a, b, c)		do {} while(0);
-#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(a, b, c)		do {} while(0);
+#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(a, b, c, d)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);

+ 3 - 0
mpi/src/starpu_mpi_private.h

@@ -250,6 +250,9 @@ LIST_TYPE(_starpu_mpi_req,
 
 	int sequential_consistency;
 
+	long pre_sync_jobid;
+	long post_sync_jobid;
+
      	UT_hash_handle hh;
 
 #ifdef STARPU_SIMGRID

+ 18 - 2
src/datawizard/user_interactions.c

@@ -173,9 +173,10 @@ static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 }
 
 /* The data must be released by calling starpu_data_release later on */
-int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
+int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
-							  int sequential_consistency)
+							  int sequential_consistency,
+							  long *pre_sync_jobid, long *post_sync_jobid)
 {
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
@@ -202,10 +203,14 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 		wrapper->pre_sync_task->detach = 1;
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
 		wrapper->pre_sync_task->callback_arg = wrapper;
+		if (pre_sync_jobid)
+			*pre_sync_jobid = _starpu_get_job_associated_to_task(wrapper->pre_sync_task)->job_id;
 
 		wrapper->post_sync_task = starpu_task_create();
 		wrapper->post_sync_task->name = "_starpu_data_acquire_cb_post";
 		wrapper->post_sync_task->detach = 1;
+		if (post_sync_jobid)
+			*post_sync_jobid = _starpu_get_job_associated_to_task(wrapper->post_sync_task)->job_id;
 
 		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
@@ -222,6 +227,10 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 	}
 	else
 	{
+		if (pre_sync_jobid)
+			*pre_sync_jobid = -1;
+		if (post_sync_jobid)
+			*post_sync_jobid = -1;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 		starpu_data_acquire_cb_pre_sync_callback(wrapper);
@@ -231,6 +240,13 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 	return 0;
 }
 
+int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
+							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
+							  int sequential_consistency)
+{ /* Thin wrapper around the _sync_jobids variant: forwards every argument unchanged and returns its result. */
+	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, callback, arg, sequential_consistency, NULL, NULL); /* NULL, NULL: the caller does not want the pre/post synchronization job ids. */
+}
+
 
 int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node,
 				   enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)

+ 4 - 2
src/debug/traces/starpu_fxt.c

@@ -2479,6 +2479,7 @@ static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_
 	int dest = ev->param[0];
 	int mpi_tag = ev->param[1];
 	size_t size = ev->param[2];
+	long jobid = ev->param[3];
 	double date = get_event_time_stamp(ev, options);
 
 	if (out_paje_file)
@@ -2495,7 +2496,7 @@ static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_
 		}
 	}
 	else
-		_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date);
+		_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date, jobid);
 }
 
 static void handle_mpi_irecv_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -2562,6 +2563,7 @@ static void handle_mpi_irecv_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_
 {
 	int src = ev->param[0];
 	int mpi_tag = ev->param[1];
+	long jobid = ev->param[2];
 	double date = get_event_time_stamp(ev, options);
 
 	if (options->file_rank < 0)
@@ -2573,7 +2575,7 @@ static void handle_mpi_irecv_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_
 		}
 	}
 	else
-		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date);
+		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date, jobid);
 }
 
 static void handle_mpi_sleep_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)

+ 4 - 2
src/debug/traces/starpu_fxt.h

@@ -47,6 +47,8 @@ void _starpu_fxt_dag_add_tag_deps(const char *prefix, uint64_t child, uint64_t f
 void _starpu_fxt_dag_set_tag_done(const char *prefix, uint64_t tag, const char *color);
 void _starpu_fxt_dag_add_task_deps(const char *prefix, unsigned long dep_prev, unsigned long dep_succ);
 void _starpu_fxt_dag_set_task_done(const char *prefix, unsigned long job_id, const char *label, const char *color);
+void _starpu_fxt_dag_add_send(int src, unsigned long dep_prev, unsigned long tag, unsigned long id);
+void _starpu_fxt_dag_add_receive(int dst, unsigned long dep_prev, unsigned long tag, unsigned long id);
 void _starpu_fxt_dag_add_sync_point(void);
 
 /*
@@ -54,8 +56,8 @@ void _starpu_fxt_dag_add_sync_point(void);
  */
 
 int _starpu_fxt_mpi_find_sync_point(char *filename_in, uint64_t *offset, int *key, int *rank);
-void _starpu_fxt_mpi_add_send_transfer(int src, int dst, int mpi_tag, size_t size, float date);
-void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, int mpi_tag, float date);
+void _starpu_fxt_mpi_add_send_transfer(int src, int dst, int mpi_tag, size_t size, float date, long jobid);
+void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, int mpi_tag, float date, long jobid);
 void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks, FILE *out_paje_file);
 
 void _starpu_fxt_write_paje_header(FILE *file);

+ 12 - 0
src/debug/traces/starpu_fxt_dag.c

@@ -97,6 +97,18 @@ void _starpu_fxt_dag_set_task_done(const char *prefix, unsigned long job_id, con
 		fprintf(out_file, "\t \"task_%s%lu\" [ style=filled, label=\"%s\", fillcolor=\"%s\"]\n", prefix, job_id, label, color);
 }
 
+void _starpu_fxt_dag_add_send(int src, unsigned long dep_prev, unsigned long tag, unsigned long id)
+{ /* Write the edge "task_<src>_<dep_prev>" -> "mpi_<tag>_<id>" to out_file. */
+	if (out_file) /* Nothing is written when out_file is NULL. */
+		fprintf(out_file, "\t \"task_%d_%lu\"->\"mpi_%lu_%lu\"\n", src, dep_prev, tag, id);
+}
+
+void _starpu_fxt_dag_add_receive(int dst, unsigned long dep_prev, unsigned long tag, unsigned long id)
+{ /* Write the edge "mpi_<tag>_<id>" -> "task_<dst>_<dep_prev>" to out_file; despite its name, dep_prev is on the destination side of the edge here. */
+	if (out_file) /* Nothing is written when out_file is NULL. */
+		fprintf(out_file, "\t \"mpi_%lu_%lu\"->\"task_%d_%lu\"\n", tag, id, dst, dep_prev);
+}
+
 void _starpu_fxt_dag_add_sync_point(void)
 {
 	if (!out_file)

+ 9 - 2
src/debug/traces/starpu_fxt_mpi.c

@@ -35,6 +35,7 @@ LIST_TYPE(mpi_transfer,
 	int mpi_tag;
 	size_t size;
 	float date;
+	long jobid;
 	double bandwidth;
 );
 
@@ -120,7 +121,7 @@ unsigned mpi_recvs_used[MAX_MPI_NODES] = {0};
 unsigned mpi_recvs_matched[MAX_MPI_NODES][MAX_MPI_NODES] = { {0} };
 unsigned mpi_sends_matched[MAX_MPI_NODES][MAX_MPI_NODES] = { {0} };
 
-void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, size_t size, float date)
+void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, size_t size, float date, long jobid)
 {
 	STARPU_ASSERT(src >= 0);
 	if (src >= MAX_MPI_NODES)
@@ -147,9 +148,10 @@ void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED,
 	mpi_sends[src][slot].mpi_tag = mpi_tag;
 	mpi_sends[src][slot].size = size;
 	mpi_sends[src][slot].date = date;
+	mpi_sends[src][slot].jobid = jobid;
 }
 
-void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, int mpi_tag, float date)
+void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, int mpi_tag, float date, long jobid)
 {
 	if (dst >= MAX_MPI_NODES)
 		return;
@@ -174,6 +176,7 @@ void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst,
 	mpi_recvs[dst][slot].dst = dst;
 	mpi_recvs[dst][slot].mpi_tag = mpi_tag;
 	mpi_recvs[dst][slot].date = date;
+	mpi_recvs[dst][slot].jobid = jobid;
 }
 
 static
@@ -318,6 +321,10 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 			}
 
 			unsigned long id = mpi_com_id++;
+			if (cur->jobid != -1)
+				_starpu_fxt_dag_add_send(src, cur->jobid, mpi_tag, id);
+			if (match->jobid != -1)
+				_starpu_fxt_dag_add_receive(dst, match->jobid, mpi_tag, id);
 #ifdef STARPU_HAVE_POTI
 			char paje_value[STARPU_POTI_STR_LEN], paje_key[STARPU_POTI_STR_LEN];
 			snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);