Quellcode durchsuchen

Fix send times in comms.rec and dump matched comms

Times in comms.rec are now the same as in the Paje trace.
Before this commit, comms.rec contains one entry for the send
and one for the recv, now an entry is a communication (send and recv).
Philippe SWARTVAGHER vor 5 Jahren
Ursprung
Commit
1ee7ea8593

+ 1 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -382,7 +382,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 	_starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
 #endif
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid, req->data_handle);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);

+ 1 - 1
mpi/src/nmad/starpu_mpi_nmad.c

@@ -114,7 +114,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
 	}
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid, req->data_handle);
 
 	_starpu_mpi_handle_pending_request(req);
 

+ 1 - 1
mpi/src/nmad/starpu_mpi_nmad_unknown_datatype.c

@@ -105,7 +105,7 @@ void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req)
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "nm_sr_send_issend returning %d", req->ret);
 	}
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid, req->data_handle);
 
 	_starpu_mpi_handle_pending_request(req);
 

+ 4 - 4
mpi/src/starpu_mpi_fxt.h

@@ -72,8 +72,8 @@ extern "C"
 } while (0)
 #define _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(dest, data_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_SUBMIT_BEGIN, (dest), (data_tag), (size), _starpu_gettid());
-#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(dest, data_tag, size, jobid)	\
-	FUT_DO_PROBE5(_STARPU_MPI_FUT_ISEND_SUBMIT_END, (dest), (data_tag), (size), (jobid), _starpu_gettid());
+#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(dest, data_tag, size, jobid, handle)	\
+	FUT_DO_PROBE6(_STARPU_MPI_FUT_ISEND_SUBMIT_END, (dest), (data_tag), (size), (jobid), _starpu_gettid(), (handle));
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(src, data_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_BEGIN, (src), (data_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(src, data_tag)	\
@@ -92,7 +92,7 @@ extern "C"
 	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (data_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (data_tag), 0); }
 #define _STARPU_MPI_TRACE_TERMINATED(req, rank, data_tag)		\
 	if ((req)->request_type == RECV_REQ) FUT_DO_PROBE5(_STARPU_MPI_FUT_IRECV_TERMINATED, (rank), (data_tag), (req)->post_sync_jobid, _starpu_gettid(), (req)->data_handle); else \
-	if ((req)->request_type == SEND_REQ) FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_TERMINATED, (rank), (data_tag), _starpu_gettid(), (req)->data_handle);
+	if ((req)->request_type == SEND_REQ) FUT_DO_PROBE3(_STARPU_MPI_FUT_ISEND_TERMINATED, (rank), (data_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\
@@ -149,7 +149,7 @@ extern "C"
 #define _STARPU_MPI_TRACE_STOP(a, b)				do {} while(0);
 #define _STARPU_MPI_TRACE_BARRIER(a, b, c)			do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(a, b, c)		do {} while(0);
-#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(a, b, c, d)		do {} while(0);
+#define _STARPU_MPI_TRACE_ISEND_SUBMIT_END(a, b, c, d, e)	do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);

+ 4 - 23
src/debug/traces/starpu_fxt.c

@@ -2999,6 +2999,7 @@ static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_
 	int mpi_tag = ev->param[1];
 	size_t size = ev->param[2];
 	long jobid = ev->param[3];
+	unsigned long handle = ev->param[4];
 	double date = get_event_time_stamp(ev, options);
 
 	if (out_paje_file)
@@ -3015,26 +3016,7 @@ static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_
 		}
 	}
 	else
-		_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date, jobid);
-}
-
-static void handle_mpi_isend_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
-{
-	int dest = ev->param[0];
-	int mpi_tag = ev->param[1];
-	unsigned long handle = ev->param[3];
-	double date = get_event_time_stamp(ev, options);
-
-	if (options->file_rank < 0)
-	{
-		if (!mpi_warned)
-		{
-			_STARPU_MSG("Warning : Only one trace file is given. MPI transfers will not be displayed. Add all trace files to show them ! \n");
-			mpi_warned = 1;
-		}
-	}
-	else
-		_starpu_fxt_mpi_dump_send_comm(options->file_rank, dest, mpi_tag, date, handle, comms_file);
+		_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date, jobid, handle);
 }
 
 static void handle_mpi_irecv_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -3114,7 +3096,7 @@ static void handle_mpi_irecv_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_
 		}
 	}
 	else
-		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date, jobid, handle, comms_file);
+		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date, jobid, handle);
 }
 
 static void handle_mpi_sleep_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -3896,7 +3878,6 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 				break;
 
 			case _STARPU_MPI_FUT_ISEND_TERMINATED:
-				handle_mpi_isend_terminated(&ev, options);
 				break;
 
 			case _STARPU_MPI_FUT_IRECV_TERMINATED:
@@ -4588,7 +4569,7 @@ void starpu_fxt_generate_trace(struct starpu_fxt_options *options)
 
 		/* display the MPI transfers if possible */
 		if (display_mpi)
-			_starpu_fxt_display_mpi_transfers(options, rank_k, out_paje_file);
+			_starpu_fxt_display_mpi_transfers(options, rank_k, out_paje_file, comms_file);
 	}
 
 	/* close the different files */

+ 3 - 4
src/debug/traces/starpu_fxt.h

@@ -59,10 +59,9 @@ void _starpu_fxt_dag_add_sync_point(void);
  */
 
 int _starpu_fxt_mpi_find_sync_point(char *filename_in, uint64_t *offset, int *key, int *rank);
-void _starpu_fxt_mpi_add_send_transfer(int src, int dst, long mpi_tag, size_t size, float date, long jobid);
-void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, long mpi_tag, float date, long jobid, unsigned long handle, FILE* comms_file);
-void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks, FILE *out_paje_file);
-void _starpu_fxt_mpi_dump_send_comm(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, float date, unsigned long handle, FILE* comms_file);
+void _starpu_fxt_mpi_add_send_transfer(int src, int dst, long mpi_tag, size_t size, float date, long jobid, unsigned long handle);
+void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, long mpi_tag, float date, long jobid, unsigned long handle);
+void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks, FILE *out_paje_file, FILE* out_comms_file);
 
 void _starpu_fxt_write_paje_header(FILE *file, struct starpu_fxt_options *options);
 

+ 26 - 32
src/debug/traces/starpu_fxt_mpi.c

@@ -39,6 +39,7 @@ LIST_TYPE(mpi_transfer,
 	float date;
 	long jobid;
 	double bandwidth;
+	unsigned long handle;
 );
 
 /* Returns 0 if a barrier is found, -1 otherwise. In case of success, offset is
@@ -123,7 +124,7 @@ unsigned mpi_recvs_used[MAX_MPI_NODES] = {0};
 unsigned mpi_recvs_matched[MAX_MPI_NODES][MAX_MPI_NODES] = { {0} };
 unsigned mpi_sends_matched[MAX_MPI_NODES][MAX_MPI_NODES] = { {0} };
 
-void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, long mpi_tag, size_t size, float date, long jobid)
+void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, long mpi_tag, size_t size, float date, long jobid, unsigned long handle)
 {
 	STARPU_ASSERT(src >= 0);
 	if (src >= MAX_MPI_NODES)
@@ -151,37 +152,11 @@ void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED,
 	mpi_sends[src][slot].size = size;
 	mpi_sends[src][slot].date = date;
 	mpi_sends[src][slot].jobid = jobid;
+	mpi_sends[src][slot].handle = handle;
 }
 
-void _starpu_fxt_mpi_dump_send_comm(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, float date, unsigned long handle, FILE* comms_file)
+void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, long mpi_tag, float date, long jobid, unsigned long handle)
 {
-	STARPU_ASSERT(src >= 0);
-
-	if (comms_file != NULL)
-	{
-		fprintf(comms_file, "Type: send\n");
-		fprintf(comms_file, "Src: %d\n", src);
-		fprintf(comms_file, "Dst: %d\n", dst);
-		fprintf(comms_file, "Tag: %d\n", mpi_tag);
-		fprintf(comms_file, "Time: %f\n", date);
-		fprintf(comms_file, "Handle: %lx\n", handle);
-		fprintf(comms_file, "\n");
-	}
-}
-
-void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, long mpi_tag, float date, long jobid, unsigned long handle, FILE* comms_file)
-{
-	if (comms_file != NULL)
-	{
-		fprintf(comms_file, "Type: recv\n");
-		fprintf(comms_file, "Src: %d\n", src);
-		fprintf(comms_file, "Dst: %d\n", dst);
-		fprintf(comms_file, "Tag: %ld\n", mpi_tag);
-		fprintf(comms_file, "Time: %f\n", date);
-		fprintf(comms_file, "Handle: %lx\n", handle);
-		fprintf(comms_file, "\n");
-	}
-
 	if (dst >= MAX_MPI_NODES)
 		return;
 	unsigned slot = mpi_recvs_used[dst]++;
@@ -206,6 +181,7 @@ void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst,
 	mpi_recvs[dst][slot].mpi_tag = mpi_tag;
 	mpi_recvs[dst][slot].date = date;
 	mpi_recvs[dst][slot].jobid = jobid;
+	mpi_recvs[dst][slot].handle = handle;
 }
 
 static
@@ -246,7 +222,7 @@ struct mpi_transfer *try_to_match_send_transfer(int src STARPU_ATTRIBUTE_UNUSED,
 
 static unsigned long mpi_com_id = 0;
 
-static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
+static void display_all_transfers_from_trace(FILE *out_paje_file, FILE *out_comms_file, unsigned n)
 {
 	unsigned slot[MAX_MPI_NODES] = { 0 }, node;
 	struct mpi_transfer_list pending_receives; /* Sorted list of matches which have not happened yet */
@@ -309,6 +285,7 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 		int dst = cur->dst;
 		long mpi_tag = cur->mpi_tag;
 		size_t size = cur->size;
+		unsigned long send_handle = cur->handle;
 
 		if (dst < MAX_MPI_NODES)
 			match = try_to_match_send_transfer(src, dst, mpi_tag);
@@ -318,8 +295,12 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 		if (match)
 		{
 			float end_date = match->date;
+			unsigned long recv_handle = match->handle;
 			struct mpi_transfer *prev;
 
+			if (end_date <= start_date)
+				_STARPU_MSG("Warning: a communication finished before it started !\n");
+
 			match->bandwidth = (0.001*size)/(end_date - start_date);
 			current_out_bandwidth[src] += match->bandwidth;
 			current_in_bandwidth[dst] += match->bandwidth;
@@ -365,6 +346,19 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 			fprintf(out_paje_file, "23	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu	%ld\n", start_date, (unsigned long)size, src, id, mpi_tag);
 			fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, dst, id);
 #endif
+
+			if (out_comms_file != NULL)
+			{
+				fprintf(out_comms_file, "Src: %d\n", src);
+				fprintf(out_comms_file, "Dst: %d\n", dst);
+				fprintf(out_comms_file, "Tag: %ld\n", mpi_tag);
+				fprintf(out_comms_file, "SendTime: %.9f\n", start_date);
+				fprintf(out_comms_file, "RecvTime: %.9f\n", end_date);
+				fprintf(out_comms_file, "SendHandle: %lx\n", send_handle);
+				fprintf(out_comms_file, "RecvHandle: %lx\n", recv_handle);
+				fprintf(out_comms_file, "Size: %ld\n", size);
+				fprintf(out_comms_file, "\n");
+			}
 		}
 		else
 		{
@@ -375,7 +369,7 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 	}
 }
 
-void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks STARPU_ATTRIBUTE_UNUSED, FILE *out_paje_file)
+void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks STARPU_ATTRIBUTE_UNUSED, FILE *out_paje_file, FILE* out_comms_file)
 {
 	if (options->ninputfiles > MAX_MPI_NODES)
 	{
@@ -385,7 +379,7 @@ void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *
 
 	/* display the MPI transfers if possible */
 	if (out_paje_file)
-		display_all_transfers_from_trace(out_paje_file, options->ninputfiles);
+		display_all_transfers_from_trace(out_paje_file, out_comms_file, options->ninputfiles);
 }
 
 #endif // STARPU_USE_FXT

+ 31 - 29
tools/starpu_send_recv_data_use.py

@@ -65,7 +65,12 @@ working_directory = sys.argv[1]
 comms = convert_rec_file(os.path.join(working_directory, "comms.rec"))
 tasks = [t for t in convert_rec_file(os.path.join(working_directory, "tasks.rec")) if "control" not in t and "starttime" in t]
 
-def plot_graph(comm_type, match, filename, title, xlabel):
+if len(tasks) == 0:
+    print("There is no task using data after communication.")
+    sys.exit(0)
+
+
+def plot_graph(comm_time_key, match, filename, title, xlabel):
     delays = []
     workers = dict()
     nb = 0
@@ -74,32 +79,28 @@ def plot_graph(comm_type, match, filename, title, xlabel):
     max_time = 0.
 
     for c in comms:
-        if c["type"] == comm_type:
-            t_matched = None
-            for t in tasks:
-                if match(t, c):
-                    t_matched = t
-                    break
-
-            if t_matched is None:
-                if comm_type == "recv":
-                    print("No match found")
-            else:
-                worker = str(t_matched['mpirank']) + "-" + str(t_matched['workerid'])
-                if worker not in workers:
-                    workers[worker] = []
+        t_matched = None
+        for t in tasks:
+            if match(t, c):
+                t_matched = t
+                break
+
+        if t_matched is not None:
+            worker = str(t_matched['mpirank']) + "-" + str(t_matched['workerid'])
+            if worker not in workers:
+                workers[worker] = []
 
-                eps = t["starttime"] - c["time"]
-                assert(eps > 0)
-                durations.append(eps)
-                workers[worker].append((c["time"], eps))
+            eps = t["starttime"] - c[comm_time_key]
+            assert(eps > 0)
+            durations.append(eps)
+            workers[worker].append((c[comm_time_key], eps))
 
-                if min_time == 0 or c["time"] < min_time:
-                    min_time = c["time"]
-                if max_time == 0 or c["time"] > max_time:
-                    max_time = c["time"]
+            if min_time == 0 or c[comm_time_key] < min_time:
+                min_time = c[comm_time_key]
+            if max_time == 0 or c[comm_time_key] > max_time:
+                max_time = c[comm_time_key]
 
-                nb += 1
+            nb += 1
 
 
     fig = plt.figure(constrained_layout=True)
@@ -122,14 +123,15 @@ def plot_graph(comm_type, match, filename, title, xlabel):
     axs[0].set_yticklabels(list(workers))
     axs[0].set(xlabel="Time (ms) - Duration: " + str(max_time - min_time) + "ms", ylabel="Worker [mpi]-[*pu]", title=title)
 
-    axs[2].hist(durations, bins=np.logspace(np.log10(1), np.log10(max(durations)), 50), rwidth=0.8)
-    axs[2].set_xscale("log")
-    axs[2].set(xlabel=xlabel, ylabel="Number of occurences", title="Histogramm")
+    if len(durations) != 0:
+        axs[2].hist(durations, bins=np.logspace(np.log10(1), np.log10(max(durations)), 50), rwidth=0.8)
+        axs[2].set_xscale("log")
+        axs[2].set(xlabel=xlabel, ylabel="Number of occurences", title="Histogramm")
 
     fig.set_size_inches(15, 9)
 
     plt.savefig(os.path.join(working_directory, filename), dpi=100)
     plt.show()
 
-plot_graph("recv", lambda t, c: (t["mpirank"] == c["dst"] and t["starttime"] >= c["time"] and c["handle"] in t["handles"]), "recv_use.png", "Elapsed time between recv and use (ms)", "Time between data reception and its use by a task")
-plot_graph("send", lambda t, c: (t["mpirank"] == c["src"] and t["starttime"] >= c["time"] and c["handle"] in t["handles"]), "send_use.png", "Elapsed time between send and use (ms)", "Time between data sending and its use by a task")
+plot_graph("recvtime", lambda t, c: (t["mpirank"] == c["dst"] and t["starttime"] >= c["recvtime"] and str(c["recvhandle"]) in t["handles"]), "recv_use.png", "Elapsed time between recv and use (ms)", "Time between data reception and its use by a task")
+plot_graph("sendtime", lambda t, c: (t["mpirank"] == c["src"] and t["starttime"] >= c["sendtime"] and str(c["sendhandle"]) in t["handles"]), "send_use.png", "Elapsed time between send and use (ms)", "Time between data sending and its use by a task")