Browse Source

Show MPI outbound bandwidth

Samuel Thibault 8 years ago
parent
commit
147405f1c5
2 changed files with 70 additions and 8 deletions
  1. +66 -8
      src/debug/traces/starpu_fxt_mpi.c
  2. +4 -0
      src/debug/traces/starpu_paje.c

+ 66 - 8
src/debug/traces/starpu_fxt_mpi.c

@@ -28,14 +28,14 @@
 
 
 #define MAX_MPI_NODES 64
 #define MAX_MPI_NODES 64
 
 
-struct mpi_transfer
-{
+LIST_TYPE(mpi_transfer,
 	unsigned matched;
 	unsigned matched;
 	int other_rank; /* src for a recv, dest for a send */
 	int other_rank; /* src for a recv, dest for a send */
 	int mpi_tag;
 	int mpi_tag;
 	size_t size;
 	size_t size;
 	float date;
 	float date;
-};
+	double bandwidth;
+);
 
 
 /* Returns 0 if a barrier is found, -1 otherwise. In case of success, offset is
 /* Returns 0 if a barrier is found, -1 otherwise. In case of success, offset is
  * filled with the timestamp of the barrier */
  * filled with the timestamp of the barrier */
@@ -117,6 +117,7 @@ unsigned mpi_recvs_used[MAX_MPI_NODES] = {0};
  * going through the lists from the beginning to match each and every
  * going through the lists from the beginning to match each and every
  * transfer, thus avoiding a quadratic complexity. */
  * transfer, thus avoiding a quadratic complexity. */
 unsigned mpi_recvs_matched[MAX_MPI_NODES][MAX_MPI_NODES] = {0};
 unsigned mpi_recvs_matched[MAX_MPI_NODES][MAX_MPI_NODES] = {0};
+unsigned mpi_sends_matched[MAX_MPI_NODES][MAX_MPI_NODES] = {0};
 
 
 void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, size_t size, float date)
 void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED, int mpi_tag, size_t size, float date)
 {
 {
@@ -213,14 +214,49 @@ static unsigned long mpi_com_id = 0;
 static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 {
 {
 	unsigned slot;
 	unsigned slot;
-	for (slot = 0; slot < mpi_sends_used[src]; slot++)
+	struct mpi_transfer_list pending_matches; /* Sorted list of matches which have not happened yet */
+	double current_bandwidth = 0.;
+
+#ifdef STARPU_HAVE_POTI
+	char mpi_local_container[STARPU_POTI_STR_LEN];
+	snprintf(mpi_local_container, sizeof(mpi_local_container), "%d_mpict", /* XXX */src);
+	poti_SetVariable(0., mpi_local_container, "bwo", current_bandwidth);
+#else
+	fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", 0., src, current_bandwidth);
+#endif
+
+	mpi_transfer_list_init(&pending_matches);
+
+	slot = 0;
+	/* Parse sends to display communications and compute outbound bandwidth */
+	while (slot < mpi_sends_used[src] || !mpi_transfer_list_empty(&pending_matches))
 	{
 	{
+		float start_date = INFINITY;
+		struct mpi_transfer *match;
+		
+		if (slot < mpi_sends_used[src])
+			start_date = mpi_sends[src][slot].date;
+
+		if (!mpi_transfer_list_empty(&pending_matches) &&
+			mpi_transfer_list_front(&pending_matches)->date < start_date)
+		{
+			match = mpi_transfer_list_pop_front(&pending_matches);
+			current_bandwidth -= match->bandwidth;
+			if (out_paje_file)
+			{
+#ifdef STARPU_HAVE_POTI
+				poti_SetVariable(match->date, mpi_local_container, "bwo", current_bandwidth);
+#else
+				fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", match->date, src, current_bandwidth);
+#endif
+			}
+			continue;
+		}
+
 		int dst = mpi_sends[src][slot].other_rank;
 		int dst = mpi_sends[src][slot].other_rank;
 		int mpi_tag = mpi_sends[src][slot].mpi_tag;
 		int mpi_tag = mpi_sends[src][slot].mpi_tag;
-		float start_date = mpi_sends[src][slot].date;
 		size_t size = mpi_sends[src][slot].size;
 		size_t size = mpi_sends[src][slot].size;
 
 
-		struct mpi_transfer *match;
 		if (dst < MAX_MPI_NODES)
 		if (dst < MAX_MPI_NODES)
 			match = try_to_match_send_transfer(src, dst, mpi_tag);
 			match = try_to_match_send_transfer(src, dst, mpi_tag);
 		else
 		else
@@ -229,6 +265,26 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 		if (match)
 		if (match)
 		{
 		{
 			float end_date = match->date;
 			float end_date = match->date;
+			struct mpi_transfer *prev;
+
+			match->bandwidth = (0.001*size)/(end_date - start_date);
+			current_bandwidth += match->bandwidth;
+
+			/* Insert in sorted list, most probably at the end so a mere insertion sort */
+			for (prev = mpi_transfer_list_last(&pending_matches);
+				prev != mpi_transfer_list_alpha(&pending_matches);
+				prev = mpi_transfer_list_prev(prev))
+				if (prev->date <= end_date)
+				{
+					/* Found its place */
+					mpi_transfer_list_insert_after(&pending_matches, match, prev);
+					break;
+				}
+			if (prev == mpi_transfer_list_alpha(&pending_matches))
+			{
+				/* No element earlier than this one, put it at the head */
+				mpi_transfer_list_push_front(&pending_matches, match);
+			}
 
 
 			unsigned long id = mpi_com_id++;
 			unsigned long id = mpi_com_id++;
 			/* TODO replace 0 by a MPI program ? */
 			/* TODO replace 0 by a MPI program ? */
@@ -238,14 +294,15 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 				char paje_value[STARPU_POTI_STR_LEN], paje_key[STARPU_POTI_STR_LEN];
 				char paje_value[STARPU_POTI_STR_LEN], paje_key[STARPU_POTI_STR_LEN];
 				snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
 				snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
 				snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
 				snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
+				poti_StartLink(start_date, "MPICt", "MPIL", mpi_local_container, paje_value, paje_key);
 				char mpi_container[STARPU_POTI_STR_LEN];
 				char mpi_container[STARPU_POTI_STR_LEN];
-				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */src);
-				poti_StartLink(start_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
 				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */dst);
 				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */dst);
 				poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
 				poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
+				poti_SetVariable(start_date, mpi_local_container, "bwo", current_bandwidth);
 #else
 #else
 				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", start_date, (unsigned long)size, /* XXX */src, id);
 				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", start_date, (unsigned long)size, /* XXX */src, id);
 				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, /* XXX */dst, id);
 				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, /* XXX */dst, id);
+				fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", start_date, src, current_bandwidth);
 #endif
 #endif
 			}
 			}
 		}
 		}
@@ -254,6 +311,7 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 			_STARPU_DISP("Warning, could not match MPI transfer from %d to %d (tag %x) starting at %f\n", src, dst, mpi_tag, start_date);
 			_STARPU_DISP("Warning, could not match MPI transfer from %d to %d (tag %x) starting at %f\n", src, dst, mpi_tag, start_date);
 		}
 		}
 
 
+		slot++;
 	}
 	}
 }
 }
 
 

+ 4 - 0
src/debug/traces/starpu_paje.c

@@ -216,6 +216,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 
 
 	/* Types for the MPI Communication Thread of the Memory Node */
 	/* Types for the MPI Communication Thread of the Memory Node */
 	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
 	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
+	poti_DefineVariableType("bwi", "MPICt", "Bandwidth In (MB/s)", "0 0 0");
+	poti_DefineVariableType("bwo", "MPICt", "Bandwidth Out (MB/s)", "0 0 0");
 	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
 	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
 	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
 	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
 	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
@@ -305,6 +307,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 4       use     Mm       \"Used (MB)\"                        \n\
 4       use     Mm       \"Used (MB)\"                        \n\
 4       bwi     Mm       \"Bandwidth In (MB/s)\"                        \n\
 4       bwi     Mm       \"Bandwidth In (MB/s)\"                        \n\
 4       bwo     Mm       \"Bandwidth Out (MB/s)\"                        \n\
 4       bwo     Mm       \"Bandwidth Out (MB/s)\"                        \n\
+4       bwi     MPICt       \"Bandwidth In (MB/s)\"                        \n\
+4       bwo     MPICt       \"Bandwidth Out (MB/s)\"                        \n\
 4       gf      T       \"GFlops\"                        \n\
 4       gf      T       \"GFlops\"                        \n\
 6       I       S       Idle         \".9 .1 .0\"		\n\
 6       I       S       Idle         \".9 .1 .0\"		\n\
 6       In       S      Initializing       \"0.0 .7 1.0\"            \n\
 6       In       S      Initializing       \"0.0 .7 1.0\"            \n\