Quellcode durchsuchen

Also show MPI inbound bandwidth

Samuel Thibault vor 8 Jahren
Ursprung
Commit
ebe2de03df
1 geänderte Dateien mit 86 neuen und 64 gelöschten Zeilen
  1. 86 64
      src/debug/traces/starpu_fxt_mpi.c

+ 86 - 64
src/debug/traces/starpu_fxt_mpi.c

@@ -30,7 +30,8 @@
 
 LIST_TYPE(mpi_transfer,
 	unsigned matched;
-	int other_rank; /* src for a recv, dest for a send */
+	int src;
+	int dst;
 	int mpi_tag;
 	size_t size;
 	float date;
@@ -141,7 +142,8 @@ void _starpu_fxt_mpi_add_send_transfer(int src, int dst STARPU_ATTRIBUTE_UNUSED,
 	}
 
 	mpi_sends[src][slot].matched = 0;
-	mpi_sends[src][slot].other_rank = dst;
+	mpi_sends[src][slot].src = src;
+	mpi_sends[src][slot].dst = dst;
 	mpi_sends[src][slot].mpi_tag = mpi_tag;
 	mpi_sends[src][slot].size = size;
 	mpi_sends[src][slot].date = date;
@@ -168,7 +170,8 @@ void _starpu_fxt_mpi_add_recv_transfer(int src STARPU_ATTRIBUTE_UNUSED, int dst,
 	}
 
 	mpi_recvs[dst][slot].matched = 0;
-	mpi_recvs[dst][slot].other_rank = dst;
+	mpi_recvs[dst][slot].src = src;
+	mpi_recvs[dst][slot].dst = dst;
 	mpi_recvs[dst][slot].mpi_tag = mpi_tag;
 	mpi_recvs[dst][slot].date = date;
 }
@@ -211,51 +214,76 @@ struct mpi_transfer *try_to_match_send_transfer(int src STARPU_ATTRIBUTE_UNUSED,
 
 static unsigned long mpi_com_id = 0;
 
-static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
+static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 {
-	unsigned slot;
-	struct mpi_transfer_list pending_matches; /* Sorted list of matches which have not happened yet */
-	double current_bandwidth = 0.;
+	unsigned slot[MAX_MPI_NODES] = { 0 }, node, src;
+	struct mpi_transfer_list pending_receives; /* Sorted list of matches which have not happened yet */
+	double current_out_bandwidth[MAX_MPI_NODES] = { 0. };
+	double current_in_bandwidth[MAX_MPI_NODES] = { 0. };
+#ifdef STARPU_HAVE_POTI
+	char mpi_container[STARPU_POTI_STR_LEN];
+#endif
 
+	for (node = 0; node < n ; node++)
+	{
 #ifdef STARPU_HAVE_POTI
-	char mpi_local_container[STARPU_POTI_STR_LEN];
-	snprintf(mpi_local_container, sizeof(mpi_local_container), "%d_mpict", /* XXX */src);
-	poti_SetVariable(0., mpi_local_container, "bwo", current_bandwidth);
+		poti_SetVariable(0., mpi_local_container, "bwi", 0.);
+		poti_SetVariable(0., mpi_local_container, "bwo", 0.);
 #else
-	fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", 0., src, current_bandwidth);
+		fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi	%f\n", 0., node, 0.);
+		fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", 0., node, 0.);
 #endif
+	}
 
-	mpi_transfer_list_init(&pending_matches);
+	mpi_transfer_list_init(&pending_receives);
 
-	slot = 0;
-	/* Parse sends to display communications and compute outbound bandwidth */
-	while (slot < mpi_sends_used[src] || !mpi_transfer_list_empty(&pending_matches))
+	while (1)
 	{
-		float start_date = INFINITY;
-		struct mpi_transfer *match;
-		
-		if (slot < mpi_sends_used[src])
-			start_date = mpi_sends[src][slot].date;
-
-		if (!mpi_transfer_list_empty(&pending_matches) &&
-			mpi_transfer_list_front(&pending_matches)->date < start_date)
-		{
-			match = mpi_transfer_list_pop_front(&pending_matches);
-			current_bandwidth -= match->bandwidth;
-			if (out_paje_file)
+		float start_date;
+		struct mpi_transfer *cur, *match;
+
+		/* Find out which event comes first: a pending receive, or a new send */
+
+		if (mpi_transfer_list_empty(&pending_receives))
+			start_date = INFINITY;
+		else
+			start_date = mpi_transfer_list_front(&pending_receives)->date;
+
+		src = MAX_MPI_NODES;
+		for (node = 0; node < n; node++) {
+			if (slot[node] < mpi_sends_used[node] && mpi_sends[node][slot[node]].date < start_date)
 			{
+				/* next send for node is earlier than others */
+				src = node;
+				start_date = mpi_sends[src][slot[src]].date;
+			}
+		}
+		if (start_date == INFINITY)
+			/* No event any more, we're finished! */
+			break;
+
+		if (src == MAX_MPI_NODES)
+		{
+			/* Pending match is earlier than all new sends, finish its communication */
+			match = mpi_transfer_list_pop_front(&pending_receives);
+			current_out_bandwidth[match->src] -= match->bandwidth;
+			current_in_bandwidth[match->dst] -= match->bandwidth;
 #ifdef STARPU_HAVE_POTI
-				poti_SetVariable(match->date, mpi_local_container, "bwo", current_bandwidth);
+			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", src);
+			poti_SetVariable(match->date, mpi_container, "bwo", current_out_bandwidth[match->src]);
+			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", dst);
+			poti_SetVariable(match->date, mpi_container, "bwi", current_in_bandwidth[match->dst]);
 #else
-				fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", match->date, src, current_bandwidth);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", match->date, match->src, current_out_bandwidth[match->src]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi	%f\n", match->date, match->dst, current_in_bandwidth[match->dst]);
 #endif
-			}
 			continue;
 		}
 
-		int dst = mpi_sends[src][slot].other_rank;
-		int mpi_tag = mpi_sends[src][slot].mpi_tag;
-		size_t size = mpi_sends[src][slot].size;
+		cur = &mpi_sends[src][slot[src]];
+		int dst = cur->dst;
+		int mpi_tag = cur->mpi_tag;
+		size_t size = cur->size;
 
 		if (dst < MAX_MPI_NODES)
 			match = try_to_match_send_transfer(src, dst, mpi_tag);
@@ -268,57 +296,54 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 			struct mpi_transfer *prev;
 
 			match->bandwidth = (0.001*size)/(end_date - start_date);
-			current_bandwidth += match->bandwidth;
+			current_out_bandwidth[src] += match->bandwidth;
+			current_in_bandwidth[dst] += match->bandwidth;
 
-			/* Insert in sorted list, most probably at the end so a mere insertion sort */
-			for (prev = mpi_transfer_list_last(&pending_matches);
-				prev != mpi_transfer_list_alpha(&pending_matches);
+			/* Insert in sorted list, most probably at the end so let's use a mere insertion sort */
+			for (prev = mpi_transfer_list_last(&pending_receives);
+				prev != mpi_transfer_list_alpha(&pending_receives);
 				prev = mpi_transfer_list_prev(prev))
 				if (prev->date <= end_date)
 				{
 					/* Found its place */
-					mpi_transfer_list_insert_after(&pending_matches, match, prev);
+					mpi_transfer_list_insert_after(&pending_receives, match, prev);
 					break;
 				}
-			if (prev == mpi_transfer_list_alpha(&pending_matches))
+			if (prev == mpi_transfer_list_alpha(&pending_receives))
 			{
 				/* No element earlier than this one, put it at the head */
-				mpi_transfer_list_push_front(&pending_matches, match);
+				mpi_transfer_list_push_front(&pending_receives, match);
 			}
 
 			unsigned long id = mpi_com_id++;
-			/* TODO replace 0 by a MPI program ? */
-			if (out_paje_file)
-			{
 #ifdef STARPU_HAVE_POTI
-				char paje_value[STARPU_POTI_STR_LEN], paje_key[STARPU_POTI_STR_LEN];
-				snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
-				snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
-				poti_StartLink(start_date, "MPICt", "MPIL", mpi_local_container, paje_value, paje_key);
-				char mpi_container[STARPU_POTI_STR_LEN];
-				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */dst);
-				poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
-				poti_SetVariable(start_date, mpi_local_container, "bwo", current_bandwidth);
+			char paje_value[STARPU_POTI_STR_LEN], paje_key[STARPU_POTI_STR_LEN];
+			snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
+			snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
+			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", src);
+			poti_StartLink(start_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
+			poti_SetVariable(start_date, mpi_container, "bwo", current_out_bandwidth[src]);
+			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", dst);
+			poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
+			poti_SetVariable(start_date, mpi_container, "bwo", current_in_bandwidth[dst]);
 #else
-				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", start_date, (unsigned long)size, /* XXX */src, id);
-				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, /* XXX */dst, id);
-				fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", start_date, src, current_bandwidth);
+			fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", start_date, (unsigned long)size, src, id);
+			fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, dst, id);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", start_date, src, current_out_bandwidth[src]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi	%f\n", start_date, dst, current_in_bandwidth[dst]);
 #endif
-			}
 		}
 		else
 		{
 			_STARPU_DISP("Warning, could not match MPI transfer from %d to %d (tag %x) starting at %f\n", src, dst, mpi_tag, start_date);
 		}
 
-		slot++;
+		slot[src]++;
 	}
 }
 
-void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks, FILE *out_paje_file)
+void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *ranks STARPU_ATTRIBUTE_UNUSED, FILE *out_paje_file)
 {
-	unsigned inputfile;
-
 	if (options->ninputfiles > MAX_MPI_NODES)
 	{
 		_STARPU_DISP("Warning: %u files given, maximum %u supported, truncating to %u\n", options->ninputfiles, MAX_MPI_NODES, MAX_MPI_NODES);
@@ -326,11 +351,8 @@ void _starpu_fxt_display_mpi_transfers(struct starpu_fxt_options *options, int *
 	}
 
 	/* display the MPI transfers if possible */
-	for (inputfile = 0; inputfile < options->ninputfiles; inputfile++)
-	{
-		int filerank = ranks[inputfile];
-		display_all_transfers_from_trace(out_paje_file, filerank);
-	}
+	if (out_paje_file)
+		display_all_transfers_from_trace(out_paje_file, options->ninputfiles);
 }
 
 #endif // STARPU_USE_FXT