Browse Source

Add comms.rec generation from traces and script to plot graph

Philippe SWARTVAGHER 5 years ago
parent
commit
3669c8a4d7

+ 1 - 0
include/starpu_fxt.h

@@ -65,6 +65,7 @@ struct starpu_fxt_options
 	char *dag_path;
 	char *tasks_path;
 	char *data_path;
+	char *comms_path;
 	char *anim_path;
 	char *states_path;
 

+ 2 - 2
mpi/src/starpu_mpi_fxt.h

@@ -91,8 +91,8 @@ extern "C"
 #define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, data_tag)		\
 	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (data_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (data_tag), 0); }
 #define _STARPU_MPI_TRACE_TERMINATED(req, rank, data_tag)		\
-	if ((req)->request_type == RECV_REQ) FUT_DO_PROBE4(_STARPU_MPI_FUT_IRECV_TERMINATED, (rank), (data_tag), (req)->post_sync_jobid, _starpu_gettid()); else \
-	if ((req)->request_type == SEND_REQ) FUT_DO_PROBE3(_STARPU_MPI_FUT_ISEND_TERMINATED, (rank), (data_tag), _starpu_gettid());
+	if ((req)->request_type == RECV_REQ) FUT_DO_PROBE5(_STARPU_MPI_FUT_IRECV_TERMINATED, (rank), (data_tag), (req)->post_sync_jobid, _starpu_gettid(), (req)->data_handle); else \
+	if ((req)->request_type == SEND_REQ) FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_TERMINATED, (rank), (data_tag), _starpu_gettid(), (req)->data_handle);
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\

+ 43 - 1
src/debug/traces/starpu_fxt.c

@@ -82,6 +82,7 @@ static FILE *anim_file;
 static FILE *tasks_file;
 static FILE *data_file;
 static FILE *trace_file;
+static FILE *comms_file;
 
 struct data_parameter_info
 {
@@ -2977,6 +2978,25 @@ static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_
 		_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date, jobid);
 }
 
+static void handle_mpi_isend_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	int dest = ev->param[0];
+	int mpi_tag = ev->param[1];
+	unsigned long handle = ev->param[3];
+	double date = get_event_time_stamp(ev, options);
+
+	if (options->file_rank < 0)
+	{
+		if (!mpi_warned)
+		{
+			_STARPU_MSG("Warning : Only one trace file is given. MPI transfers will not be displayed. Add all trace files to show them ! \n");
+			mpi_warned = 1;
+		}
+	}
+	else
+		_starpu_fxt_mpi_dump_send_comm(options->file_rank, dest, mpi_tag, date, handle, comms_file);
+}
+
 static void handle_mpi_irecv_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	double date = get_event_time_stamp(ev, options);
@@ -3042,6 +3062,7 @@ static void handle_mpi_irecv_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_
 	int src = ev->param[0];
 	int mpi_tag = ev->param[1];
 	long jobid = ev->param[2];
+	unsigned long handle = ev->param[4];
 	double date = get_event_time_stamp(ev, options);
 
 	if (options->file_rank < 0)
@@ -3053,7 +3074,7 @@ static void handle_mpi_irecv_terminated(struct fxt_ev_64 *ev, struct starpu_fxt_
 		}
 	}
 	else
-		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date, jobid);
+		_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date, jobid, handle, comms_file);
 }
 
 static void handle_mpi_sleep_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -3831,6 +3852,7 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 				break;
 
 			case _STARPU_MPI_FUT_ISEND_TERMINATED:
+				handle_mpi_isend_terminated(&ev, options);
 				break;
 
 			case _STARPU_MPI_FUT_IRECV_TERMINATED:
@@ -4127,6 +4149,7 @@ void starpu_fxt_options_init(struct starpu_fxt_options *options)
 	options->out_paje_path = "paje.trace";
 	options->dag_path = "dag.dot";
 	options->tasks_path = "tasks.rec";
+	options->comms_path = "comms.rec";
 	options->data_path = "data.rec";
 	options->anim_path = "trace.html";
 	options->states_path = "trace.rec";
@@ -4204,6 +4227,15 @@ void _starpu_fxt_data_file_init(struct starpu_fxt_options *options)
 }
 
 static
+void _starpu_fxt_comms_file_init(struct starpu_fxt_options *options)
+{
+	if (options->comms_path)
+		comms_file = fopen(options->comms_path, "w+");
+	else
+		comms_file = NULL;
+}
+
+static
 void _starpu_fxt_write_trace_header(FILE *f)
 {
 	fprintf(f, "#\n");
@@ -4255,6 +4287,14 @@ void _starpu_fxt_tasks_file_close(void)
 }
 
 static
+void _starpu_fxt_comms_file_close(void)
+{
+	if (comms_file)
+		fclose(comms_file);
+}
+
+
+static
 void _starpu_fxt_data_file_close(void)
 {
 	if (data_file)
@@ -4361,6 +4401,7 @@ void starpu_fxt_generate_trace(struct starpu_fxt_options *options)
 	_starpu_fxt_anim_file_init(options);
 	_starpu_fxt_tasks_file_init(options);
 	_starpu_fxt_data_file_init(options);
+	_starpu_fxt_comms_file_init(options);
 	_starpu_fxt_trace_file_init(options);
 
 	_starpu_fxt_paje_file_init(options);
@@ -4491,6 +4532,7 @@ void starpu_fxt_generate_trace(struct starpu_fxt_options *options)
 	_starpu_fxt_anim_file_close();
 	_starpu_fxt_tasks_file_close();
 	_starpu_fxt_data_file_close();
+	_starpu_fxt_comms_file_close();
 	_starpu_fxt_trace_file_close();
 
 	_starpu_fxt_dag_terminate();

+ 2 - 1
src/debug/traces/starpu_fxt.h

@@ -60,8 +60,9 @@ void _starpu_fxt_dag_add_sync_point(void);
 
 int _starpu_fxt_mpi_find_sync_point(char *filename_in, uint64_t *offset, int *key, int *rank);
 void _starpu_fxt_mpi_add_send_transfer(int src, int dst, long mpi_tag, size_t size, float date, long jobid);
-void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, long mpi_tag, float date, long jobid);
+void _starpu_fxt_mpi_add_recv_transfer(int src, int dst, long mpi_tag, float date, long jobid, unsigned long handle, FILE* comms_file);
 void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks, FILE *out_paje_file);
+void _starpu_fxt_mpi_dump_send_comm(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, float date, unsigned long handle, FILE* comms_file);
 
 void _starpu_fxt_write_paje_header(FILE *file, struct starpu_fxt_options *options);
 

+ 28 - 1
src/debug/traces/starpu_fxt_mpi.c

@@ -153,8 +153,35 @@ void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED,
 	mpi_sends[src][slot].jobid = jobid;
 }
 
-void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, long mpi_tag, float date, long jobid)
+void _starpu_fxt_mpi_dump_send_comm(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, float date, unsigned long handle, FILE* comms_file)
 {
+	STARPU_ASSERT(src >= 0);
+
+	if (comms_file != NULL)
+	{
+		fprintf(comms_file, "Type: send\n");
+		fprintf(comms_file, "Src: %d\n", src);
+		fprintf(comms_file, "Dst: %d\n", dst);
+		fprintf(comms_file, "Tag: %d\n", mpi_tag);
+		fprintf(comms_file, "Time: %f\n", date);
+		fprintf(comms_file, "Handle: %lx\n", handle);
+		fprintf(comms_file, "\n");
+	}
+}
+
+void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst, long mpi_tag, float date, long jobid, unsigned long handle, FILE* comms_file)
+{
+	if (comms_file != NULL)
+	{
+		fprintf(comms_file, "Type: recv\n");
+		fprintf(comms_file, "Src: %d\n", src);
+		fprintf(comms_file, "Dst: %d\n", dst);
+		fprintf(comms_file, "Tag: %ld\n", mpi_tag);
+		fprintf(comms_file, "Time: %f\n", date);
+		fprintf(comms_file, "Handle: %lx\n", handle);
+		fprintf(comms_file, "\n");
+	}
+
 	if (dst >= MAX_MPI_NODES)
 		return;
 	unsigned slot = mpi_recvs_used[dst]++;

+ 1 - 0
tools/Makefile.am

@@ -431,6 +431,7 @@ dist_bin_SCRIPTS +=			\
 	starpu_mlr_analysis		\
 	starpu_mlr_analysis.Rmd		\
 	starpu_paje_state_stats		\
+	starpu_send_recv_data_use.py \
 	starpu_trace_state_stats.py
 
 if STARPU_USE_AYUDAME2

+ 140 - 0
tools/starpu_send_recv_data_use.py

@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2019                                      INRIA
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+
+import re
+import numpy as np
+import matplotlib.pyplot as plt
+from matplotlib.gridspec import GridSpec
+import sys
+import os
+
+PROGNAME = sys.argv[0]
+
+
+def convert_rec_file(filename):
+    lines = []
+    item = dict()
+
+    with open(filename, "r") as f:
+        for l in f.readlines():
+            if l == "\n":
+                lines.append(item)
+                item = dict()
+            else:
+                ls = l.split(":")
+                key = ls[0].lower()
+                value = ls[1].strip()
+
+                if key in item:
+                    print("Warning: duplicated key '" + key + "'")
+                else:
+                    if re.match('^\d+$', value) != None:
+                        item[key] = int(value)
+                    elif re.match("^\d+\.\d+$", value) != None:
+                        item[key] = float(value)
+                    else:
+                        item[key] = value
+
+    return lines
+
+
+def usage():
+    print("Offline tool to draw graph about elapsed between sent or received data and their use by tasks")
+    print("")
+    print("Usage: %s <folder containing comms.rec and tasks.rec files>" % PROGNAME)
+
+
+if len(sys.argv) != 2:
+    print("Provide folder where *.rec files are located !")
+    sys.exit(1)
+
+working_directory = sys.argv[1]
+
+comms = convert_rec_file(os.path.join(working_directory, "comms.rec"))
+tasks = [t for t in convert_rec_file(os.path.join(working_directory, "tasks.rec")) if "control" not in t and "starttime" in t]
+
+def plot_graph(comm_type, match, filename, title, xlabel):
+    delays = []
+    workers = dict()
+    nb = 0
+    durations = []
+    min_time = 0.
+    max_time = 0.
+
+    for c in comms:
+        if c["type"] == comm_type:
+            t_matched = None
+            for t in tasks:
+                if match(t, c):
+                    t_matched = t
+                    break
+
+            if t_matched is None:
+                if comm_type == "recv":
+                    print("No match found")
+            else:
+                worker = str(t_matched['mpirank']) + "-" + str(t_matched['workerid'])
+                if worker not in workers:
+                    workers[worker] = []
+
+                eps = t["starttime"] - c["time"]
+                assert(eps > 0)
+                durations.append(eps)
+                workers[worker].append((c["time"], eps))
+
+                if min_time == 0 or c["time"] < min_time:
+                    min_time = c["time"]
+                if max_time == 0 or c["time"] > max_time:
+                    max_time = c["time"]
+
+                nb += 1
+
+
+    fig = plt.figure(constrained_layout=True)
+
+    gs = GridSpec(2, 2, figure=fig)
+    axs = [fig.add_subplot(gs[0, :-1]), fig.add_subplot(gs[1, :-1]), fig.add_subplot(gs[0:, -1])]
+    i = 0
+    for y, x in workers.items():
+        # print(y, x)
+        axs[0].broken_barh(x, [i*10, 8], facecolors=(0.1, 0.2, 0.5, 0.2)) 
+        i += 1
+
+    i = 0
+    for y, x in workers.items(): 
+        for xx in x:
+            axs[1].broken_barh([xx], [i, 1])
+            i += 1
+
+
+    axs[0].set_yticks([i*10+4 for i in range(len(workers))])
+    axs[0].set_yticklabels(list(workers))
+    axs[0].set(xlabel="Time (ms) - Duration: " + str(max_time - min_time) + "ms", ylabel="Worker [mpi]-[*pu]", title=title)
+
+
+    axs[2].hist(durations, bins=np.logspace(np.log10(1), np.log10(max(durations)), 50), rwidth=0.8)
+    axs[2].set_xscale("log")
+    axs[2].set(xlabel=xlabel, ylabel="Number of occurences", title="Histogramm")
+
+    fig.set_size_inches(15, 9)
+
+    plt.savefig(os.path.join(working_directory, filename), dpi=100)
+    plt.show()
+
+plot_graph("recv", lambda t, c: (t["mpirank"] == c["dst"] and t["starttime"] >= c["time"] and c["handle"] in t["handles"]), "recv_use.png", "Elapsed time between recv and use (ms)", "Time between data reception and its use by a task")
+plot_graph("send", lambda t, c: (t["mpirank"] == c["src"] and t["starttime"] >= c["time"] and c["handle"] in t["handles"]), "send_use.png", "Elapsed time between send and use (ms)", "Time between data sending and its use by a task")