Browse Source

display the MPI transfers between multiple nodes

Cédric Augonnet 15 years ago
parent
commit
e64bb38448
6 changed files with 221 additions and 7 deletions
  1. 15 0
      mpi/starpu_mpi.c
  2. 10 1
      mpi/starpu_mpi_fxt.h
  3. 5 0
      mpi/starpu_mpi_private.h
  4. 117 1
      tools/fxt-tool-mpi.c
  5. 58 4
      tools/fxt-tool.c
  6. 16 1
      tools/fxt-tool.h

+ 15 - 0
mpi/starpu_mpi.c

@@ -49,6 +49,8 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
 
 	MPI_Isend(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 
+	TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
+
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	pthread_mutex_lock(&req->req_mutex);
 	req->submitted = 1;
@@ -72,6 +74,8 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->request_type = SEND_REQ;
+
 	req->data_handle = data_handle;
 	req->srcdst = dest;
 	req->mpi_tag = mpi_tag;
@@ -104,6 +108,8 @@ int starpu_mpi_isend_detached(starpu_data_handle data_handle,
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->request_type = SEND_REQ;
+
 	req->data_handle = data_handle;
 	req->srcdst = dest;
 	req->mpi_tag = mpi_tag;
@@ -158,6 +164,8 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->request_type = RECV_REQ;
+
 	req->data_handle = data_handle;
 	req->srcdst = source;
 	req->mpi_tag = mpi_tag;
@@ -189,6 +197,8 @@ int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mp
 	pthread_mutex_init(&req->req_mutex, NULL);
 	pthread_cond_init(&req->req_cond, NULL);
 
+	req->request_type = RECV_REQ;
+
 	req->data_handle = data_handle;
 	req->srcdst = source;
 	req->mpi_tag = mpi_tag;
@@ -381,6 +391,11 @@ static void handle_request_termination(struct starpu_mpi_req_s *req)
 	MPI_Type_free(&req->datatype);
 	starpu_release_data_from_mem(req->data_handle);
 
+	if (req->request_type == RECV_REQ)
+	{
+		TRACE_MPI_IRECV_END(req->srcdst, req->mpi_tag);
+	}
+
 	/* Execute the specified callback, if any */
 	if (req->callback)
 		req->callback(req->callback_arg);

+ 10 - 1
mpi/starpu_mpi_fxt.h

@@ -22,12 +22,21 @@
 #include <common/fxt.h>
 
 #define FUT_MPI_BARRIER		0x5201
+#define FUT_MPI_ISEND		0x5202
+#define FUT_MPI_IRECV_END	0x5203
 
 #ifdef USE_FXT
 #define TRACE_MPI_BARRIER(rank, worldsize, key)	\
-	FUT_DO_PROBE4(FUT_MPI_BARRIER, rank, worldsize, key, syscall(SYS_gettid));
+	FUT_DO_PROBE4(FUT_MPI_BARRIER, (rank), (worldsize), (key), syscall(SYS_gettid));
+#define TRACE_MPI_ISEND(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND, (dest), (mpi_tag), (size), syscall(SYS_gettid));
+#define TRACE_MPI_IRECV_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_END, (src), (mpi_tag), syscall(SYS_gettid));
+#define TRACE
 #else
 #define TRACE_MPI_BARRIER(a, b, c)	do {} while(0);
+#define TRACE_MPI_ISEND(a, b, c)	do {} while(0);
+#define TRACE_MPI_IRECV_END(a, b)	do {} while(0);
 #endif
 
 

+ 5 - 0
mpi/starpu_mpi_private.h

@@ -24,6 +24,9 @@
 #include <common/list.h>
 #include <pthread.h>
 
+#define SEND_REQ	0
+#define RECV_REQ	1
+
 LIST_TYPE(starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle data_handle;
@@ -47,6 +50,8 @@ LIST_TYPE(starpu_mpi_req,
 	pthread_mutex_t req_mutex;
 	pthread_cond_t req_cond;
 
+	unsigned request_type; /* 0 send, 1 recv */
+
 	unsigned submitted;
 	unsigned completed;
 

+ 117 - 1
tools/fxt-tool-mpi.c

@@ -18,7 +18,7 @@
 
 /* Returns 0 if a barrier is found, -1 otherwise. In case of success, offset is
  * filled with the timestamp of the barrier */
-int find_sync_point(char *filename_in, uint64_t *offset, int *key)
+int find_sync_point(char *filename_in, uint64_t *offset, int *key, int *rank)
 {
 	STARPU_ASSERT(offset);
 
@@ -55,6 +55,7 @@ int find_sync_point(char *filename_in, uint64_t *offset, int *key)
 		{
 			/* We found the sync point */
 			*offset = ev.time;
+			*rank = ev.param[0];
 			*key = ev.param[2];
 			found = 1;
 			func_ret = 0;
@@ -71,4 +72,119 @@ int find_sync_point(char *filename_in, uint64_t *offset, int *key)
 	return func_ret;
 }
 
+/*
+ *	Deal with the actual MPI transfers performed with the MPI lib
+ */
+
+/* the list of MPI transfers found in the different traces */
+static struct mpi_transfer *mpi_sends[64] = {NULL};
+static struct mpi_transfer *mpi_recvs[64] = {NULL};
+
+/* number of available slots in the lists  */
+unsigned mpi_sends_list_size[64] = {0};
+unsigned mpi_recvs_list_size[64] = {0};
+
+/* number of slots actually used in the list  */
+unsigned mpi_sends_used[64] = {0};
+unsigned mpi_recvs_used[64] = {0};
+
+void add_mpi_send_transfer(int src, int dst, int mpi_tag, size_t size, float date)
+{
+	unsigned slot = mpi_sends_used[src]++;
+
+	if (mpi_sends_used[src] > mpi_sends_list_size[src])
+	{
+		if (mpi_sends_list_size[src] > 0)
+		{
+			mpi_sends_list_size[src] *= 2;
+		}
+		else {
+			mpi_sends_list_size[src] = 1;
+		}
+
+		mpi_sends[src] = realloc(mpi_sends[src], mpi_sends_list_size[src]*sizeof(struct mpi_transfer));
+	}
+
+	mpi_sends[src][slot].matched = 0;
+	mpi_sends[src][slot].other_rank = dst;
+	mpi_sends[src][slot].mpi_tag = mpi_tag;
+	mpi_sends[src][slot].size = size;
+	mpi_sends[src][slot].date = date;
+}
+
+void add_mpi_recv_transfer(int src, int dst, int mpi_tag, float date)
+{
+	unsigned slot = mpi_recvs_used[dst]++;
+
+	if (mpi_recvs_used[dst] > mpi_recvs_list_size[dst])
+	{
+		if (mpi_recvs_list_size[dst] > 0)
+		{
+			mpi_recvs_list_size[dst] *= 2;
+		}
+		else {
+			mpi_recvs_list_size[dst] = 1;
+		}
+
+		mpi_recvs[dst] = realloc(mpi_recvs[dst], mpi_recvs_list_size[dst]*sizeof(struct mpi_transfer));
+	}
+
+	mpi_recvs[dst][slot].matched = 0;
+	mpi_recvs[dst][slot].other_rank = dst;
+	mpi_recvs[dst][slot].mpi_tag = mpi_tag;
+	mpi_recvs[dst][slot].date = date;
+}
+
+struct mpi_transfer *try_to_match_send_transfer(int src, int dst, int mpi_tag)
+{
+	unsigned slot;
+#warning TODO improve !! this creates a quadratic complexity
+	for (slot = 0; slot < mpi_recvs_used[dst]; slot++)
+	{
+		if (!mpi_recvs[dst][slot].matched)
+		{
+			if (mpi_recvs[dst][slot].mpi_tag == mpi_tag)
+			{
+				/* we found a match ! */
+				mpi_recvs[dst][slot].matched = 1;
+				return &mpi_recvs[dst][slot];
+			}
+		}
+	}
+
+	/* If we reached that point, we could not find a match */
+	return NULL;
+}
+
+static unsigned long mpi_com_id = 0;
 
+void display_all_transfers_from_trace(FILE *out_paje_file, int src)
+{
+	unsigned slot;
+	for (slot = 0; slot < mpi_sends_used[src]; slot++)
+	{
+		int dst = mpi_sends[src][slot].other_rank;
+		int mpi_tag = mpi_sends[src][slot].mpi_tag;
+		float start_date = mpi_sends[src][slot].date;
+		size_t size = mpi_sends[src][slot].size;
+
+		struct mpi_transfer *match;
+		match = try_to_match_send_transfer(src, dst, mpi_tag);
+
+		if (match)
+		{
+			float end_date = match->date;
+
+			unsigned long id = mpi_com_id++;
+			/* TODO replace 0 by a MPI program ? */
+			fprintf(out_paje_file, "18	%f	MPIL	MPIroot   %d	mpi_%d_p	mpicom_%ld\n", start_date, size, /* XXX */src, id);
+			fprintf(out_paje_file, "19	%f	MPIL	MPIroot	  %d	mpi_%d_p	mpicom_%ld\n", end_date, size, /* XXX */dst, id);
+		}
+		else
+		{
+			fprintf(stderr, "Warning, could not match MPI transfer from %d to %d (tag %x) starting at %f\n",
+												src, dst, mpi_tag, start_date);
+		}
+
+	}
+}

+ 58 - 4
tools/fxt-tool.c

@@ -31,6 +31,7 @@ struct fxt_ev_64 ev;
  * processes), we may need to prefix the name of the containers. */
 char *prefix = "";
 uint64_t offset = 0;
+int rank = -1;
 
 static uint64_t start_time = 0;
 static uint64_t end_time = 0;
@@ -75,7 +76,8 @@ static void paje_output_file_init(void)
 	write_paje_header(out_paje_file);
 
 	fprintf(out_paje_file, "                                        \n \
-	1       P      0       \"Program\"                      	\n \
+	1       MPIP      0       \"MPI Program\"                      	\n \
+	1       P      MPIP       \"Program\"                      	\n \
 	1       Mn      P       \"Memory Node\"                         \n \
 	1       T      Mn       \"Worker\"                               \n \
 	1       Sc       P       \"Scheduler State\"                        \n \
@@ -97,7 +99,10 @@ static void paje_output_file_init(void)
 	6       R       MS      Reclaiming         \".0 .1 .4\"		\n \
 	6       Co       MS     DriverCopy         \".3 .5 .1\"		\n \
 	6       No       MS     Nothing         \".0 .0 .0\"		\n \
+	5       MPIL     MPIP	P	P      MPIL\n \
 	5       L       P	Mn	Mn      L\n");
+
+	fprintf(out_paje_file, "7      0.0 MPIroot      MPIP      0       root\n");
 }
 
 /*
@@ -495,6 +500,33 @@ static void handle_task_done(void)
 	dot_set_tag_done(tag_id, colour);
 }
 
+static void handle_mpi_barrier(void)
+{
+	rank = ev.param[0];
+
+	/* Add an event in the trace */
+	fprintf(out_paje_file, "9       %f     event      %sp      %d\n", get_event_time_stamp(), prefix, rank);
+}
+
+static void handle_mpi_isend(void)
+{
+	int dest = ev.param[0];
+	int mpi_tag = ev.param[1];
+	size_t size = ev.param[2];
+	float date = get_event_time_stamp();
+
+	add_mpi_send_transfer(rank, dest, mpi_tag, size, date);
+}
+
+static void handle_mpi_irecv_end(void)
+{
+	int src = ev.param[0];
+	int mpi_tag = ev.param[1];
+	float date = get_event_time_stamp();
+
+	add_mpi_recv_transfer(src, rank, mpi_tag, date);
+}
+
 static void parse_args(int argc, char **argv)
 {
 	int i;
@@ -562,7 +594,7 @@ void parse_new_file(char *filename_in, char *file_prefix, uint64_t file_offset)
 
 	/* TODO starttime ...*/
 	/* create the "program" container */
-	fprintf(out_paje_file, "7      0.0 %sp      P      0       program%s \n", prefix, prefix);
+	fprintf(out_paje_file, "7      0.0 %sp      P      MPIroot       program%s \n", prefix, prefix);
 	/* create a variable with the number of tasks */
 	if (!no_counter)
 	{
@@ -709,6 +741,18 @@ void parse_new_file(char *filename_in, char *file_prefix, uint64_t file_offset)
 				handle_user_event();
 				break;
 
+			case FUT_MPI_BARRIER:
+				handle_mpi_barrier();
+				break;
+
+			case FUT_MPI_ISEND:
+				handle_mpi_isend();
+				break;
+
+			case FUT_MPI_IRECV_END:
+				handle_mpi_irecv_end();
+				break;
+
 			default:
 				fprintf(stderr, "unknown event.. %x at time %llx WITH OFFSET %llx\n",
 					(unsigned)ev.code, (long long unsigned)ev.time, (long long unsigned)(ev.time-offset));
@@ -769,6 +813,7 @@ int main(int argc, char **argv)
 		 */
 		
 		int unique_keys[64];
+		int rank_k[64];
 		uint64_t start_k[64];
 		uint64_t sync_k[64];
 		unsigned sync_k_exists[64];
@@ -790,7 +835,8 @@ int main(int argc, char **argv)
 		{
 			int ret = find_sync_point(filenames[inputfile],
 							&sync_k[inputfile],
-							&unique_keys[inputfile]);
+							&unique_keys[inputfile],
+							&rank_k[inputfile]);
 			if (ret == -1)
 			{
 				/* There was no sync point, we assume there is no offset */
@@ -831,11 +877,19 @@ int main(int argc, char **argv)
 		/* generate the Paje trace for the different files */
 		for (inputfile = 0; inputfile < ninputfiles; inputfile++)
 		{
+			int filerank = rank_k[inputfile];
 
 			char file_prefix[32];
-			snprintf(file_prefix, 32, "file_%d_", inputfile);
+			snprintf(file_prefix, 32, "mpi_%d_", filerank);
 			parse_new_file(filenames[inputfile], file_prefix, offsets[inputfile]);
 		}
+
+		/* display the MPI transfers if possible */
+		for (inputfile = 0; inputfile < ninputfiles; inputfile++)
+		{
+			int filerank = rank_k[inputfile];
+			display_all_transfers_from_trace(out_paje_file, filerank);
+		}
 	}
 
 	display_bandwith_evolution();

+ 16 - 1
tools/fxt-tool.h

@@ -51,7 +51,22 @@ unsigned get_colour_symbol_blue(char *name);
 
 void reinit_colors(void);
 
-int find_sync_point(char *filename_in, uint64_t *offset, int *key);
+/*
+ *	MPI
+ */
+
+int find_sync_point(char *filename_in, uint64_t *offset, int *key, int *rank);
 uint64_t find_start_time(char *filename_in);
 
+struct mpi_transfer {
+	unsigned matched;
+	int other_rank; /* src for a recv, dest for a send */
+	int mpi_tag;
+	size_t size;
+	float date;
+};
+
+void add_mpi_send_transfer(int src, int dst, int mpi_tag, size_t size, float date);
+void add_mpi_recv_transfer(int src, int dst, int mpi_tag, float date);
+
 #endif // __FXT_TOOL_H__