ソースを参照

Adding trace management of MPI communication threads.

Marc Sergent 12 年 前
コミット
958544f09e
共有3 個のファイルを変更した280 個の追加15 個の削除を含む
  1. 246 6
      src/debug/traces/starpu_fxt.c
  2. 6 6
      src/debug/traces/starpu_fxt_mpi.c
  3. 28 3
      src/debug/traces/starpu_paje.c

+ 246 - 6
src/debug/traces/starpu_fxt.c

@@ -207,6 +207,12 @@ static char *worker_container_alias(char *output, int len, const char *prefix, l
 	return output;
 }
 
+/*
+ * Write the Paje alias of the MPI communication thread container,
+ * "<prefix>mpict", into output (at most len bytes, through snprintf) and
+ * return output so that the call can be used directly as an argument.
+ */
+static char *mpicommthread_container_alias(char *output, int len, const char *prefix)
+{
+	snprintf(output, len, "%smpict", prefix);
+	return output;
+}
+
 static char *program_container_alias(char *output, int len, const char *prefix)
 {
 	snprintf(output, len, "%sp", prefix);
@@ -242,6 +248,17 @@ static void worker_set_state(double time, const char *prefix, long unsigned int
 #endif
 }
 
+/*
+ * Record that the MPI communication thread container of the trace identified
+ * by prefix enters the "CtS" state value `name` at `time`.
+ * With STARPU_HAVE_POTI the state is emitted through poti_SetState();
+ * otherwise a "10" record is written to out_paje_file.
+ * out_paje_file is not tested here: the handlers added in this change check
+ * it before calling.
+ */
+static void mpicommthread_set_state(double time, const char *prefix, const char *name)
+{
+#ifdef STARPU_HAVE_POTI
+	char container[STARPU_POTI_STR_LEN];
+	mpicommthread_container_alias(container, STARPU_POTI_STR_LEN, prefix);
+	poti_SetState(time, container, "CtS", name);
+#else
+	fprintf(out_paje_file, "10	%.9f	%smpict	CtS 	%s\n", time, prefix, name);
+#endif
+}
+
 
 /*
  *	Initialization
@@ -960,25 +977,184 @@ static void handle_mpi_barrier(struct fxt_ev_64 *ev, struct starpu_fxt_options *
 	}
 }
 
-static void handle_mpi_isend(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+/*
+ * Handle a FUT_MPI_START event: create the "MPICt" container of the MPI
+ * communication thread under the program container of this trace, then put
+ * it in the "Sl" state. Nothing is emitted when out_paje_file is not set.
+ */
+static void handle_mpi_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+	{
+#ifdef STARPU_HAVE_POTI
+		char program_container[STARPU_POTI_STR_LEN];
+		program_container_alias(program_container, STARPU_POTI_STR_LEN, prefix);
+		char new_mpicommthread_container_alias[STARPU_POTI_STR_LEN], new_mpicommthread_container_name[STARPU_POTI_STR_LEN];
+		mpicommthread_container_alias(new_mpicommthread_container_alias, STARPU_POTI_STR_LEN, prefix);
+		/* Fill the name buffer declared above: it is the one handed to poti_CreateContainer(). */
+		snprintf(new_mpicommthread_container_name, STARPU_POTI_STR_LEN, "%smpict", prefix);
+		poti_CreateContainer(date, new_mpicommthread_container_alias, "MPICt", program_container, new_mpicommthread_container_name);
+#else
+		fprintf(out_paje_file, "7	%.9f	%smpict		MPICt	%sp	%smpict\n", date, prefix, prefix, prefix);
+#endif
+		mpicommthread_set_state(date, prefix, "Sl");
+	}
+}
+
+/*
+ * Handle a FUT_MPI_STOP event: destroy the "MPICt" container of the MPI
+ * communication thread of this trace (poti_DestroyContainer() with
+ * STARPU_HAVE_POTI, an "8" record otherwise). Nothing is emitted when
+ * out_paje_file is not set.
+ */
+static void handle_mpi_stop(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+	{
+#ifdef STARPU_HAVE_POTI
+		char mpicommthread_container[STARPU_POTI_STR_LEN];
+		mpicommthread_container_alias(mpicommthread_container, STARPU_POTI_STR_LEN, prefix);
+		poti_DestroyContainer(date, "MPICt", mpicommthread_container);
+#else
+		fprintf(out_paje_file, "8	%.9f	%smpict		MPICt\n",
+			date, prefix);
+#endif
+	}
+}
+
+/* FUT_MPI_ISEND_SUBMIT_BEGIN: put the MPI communication thread in the "SdS" (SendSubmitted) state. */
+static void handle_mpi_isend_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "SdS");
+}
+
+/*
+ * FUT_MPI_ISEND_SUBMIT_END: put the MPI communication thread back in the
+ * "P" (Processing) state and register the send. The event parameters are
+ * read as destination, MPI tag and size, in that order. The transfer is
+ * registered whether or not out_paje_file is set.
+ */
+static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int dest = ev->param[0];
 	int mpi_tag = ev->param[1];
 	size_t size = ev->param[2];
 	double date = get_event_time_stamp(ev, options);
 
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+
 	_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date);
 }
 
-static void handle_mpi_irecv_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+/* FUT_MPI_IRECV_SUBMIT_BEGIN: put the MPI communication thread in the "RvS" (RecieveSubmitted) state. */
+static void handle_mpi_irecv_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "RvS");
+}
+
+/* FUT_MPI_IRECV_SUBMIT_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_irecv_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/* FUT_MPI_ISEND_COMPLETE_BEGIN: put the MPI communication thread in the "SdC" (SendCompleted) state. */
+static void handle_mpi_isend_complete_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "SdC");
+}
+
+/* FUT_MPI_ISEND_COMPLETE_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_isend_complete_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/*
+ * FUT_MPI_IRECV_COMPLETE_BEGIN: put the MPI communication thread in the
+ * "RvC" (RecieveCompleted) state and register the reception. The event
+ * parameters are read as source and MPI tag, in that order. The transfer is
+ * registered whether or not out_paje_file is set.
+ */
+static void handle_mpi_irecv_complete_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int src = ev->param[0];
 	int mpi_tag = ev->param[1];
 	double date = get_event_time_stamp(ev, options);
 
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "RvC");
+
 	_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date);
 }
 
+/* FUT_MPI_IRECV_COMPLETE_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_irecv_complete_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/* FUT_MPI_SLEEP_BEGIN: put the MPI communication thread in the "Sl" (Sleeping) state. */
+static void handle_mpi_sleep_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "Sl");
+}
+
+/* FUT_MPI_SLEEP_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/*
+ * FUT_MPI_DTESTING_BEGIN: put the MPI communication thread in the "DT" state.
+ * NOTE(review): the Paje header hunks of this change define no "DT" value
+ * for the "CtS" state type -- confirm that it is declared elsewhere.
+ */
+static void handle_mpi_dtesting_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "DT");
+}
+
+/* FUT_MPI_DTESTING_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_dtesting_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/* FUT_MPI_UTESTING_BEGIN: put the MPI communication thread in the "UT" (UserTesting) state. */
+static void handle_mpi_utesting_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "UT");
+}
+
+/* FUT_MPI_UTESTING_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_utesting_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+/* FUT_MPI_UWAIT_BEGIN: put the MPI communication thread in the "UW" (UserWaiting) state. */
+static void handle_mpi_uwait_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "UW");
+}
+
+/* FUT_MPI_UWAIT_END: put the MPI communication thread back in the "P" (Processing) state. */
+static void handle_mpi_uwait_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
 static void handle_set_profiling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int status = ev->param[0];
@@ -1251,16 +1427,80 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				handle_user_event(&ev, options);
 				break;
 
+			case FUT_MPI_START:
+				handle_mpi_start(&ev, options);
+				break;
+
+			case FUT_MPI_STOP:
+				handle_mpi_stop(&ev, options);
+				break;
+
 			case FUT_MPI_BARRIER:
 				handle_mpi_barrier(&ev, options);
 				break;
 
-			case FUT_MPI_ISEND:
-				handle_mpi_isend(&ev, options);
+			case FUT_MPI_ISEND_SUBMIT_BEGIN:
+				handle_mpi_isend_submit_begin(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_SUBMIT_END:
+				handle_mpi_isend_submit_end(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_SUBMIT_BEGIN:
+				handle_mpi_irecv_submit_begin(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_SUBMIT_END:
+				handle_mpi_irecv_submit_end(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_COMPLETE_BEGIN:
+				handle_mpi_isend_complete_begin(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_COMPLETE_END:
+				handle_mpi_isend_complete_end(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_COMPLETE_BEGIN:
+				handle_mpi_irecv_complete_begin(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_COMPLETE_END:
+				handle_mpi_irecv_complete_end(&ev, options);
+				break;
+
+			case FUT_MPI_SLEEP_BEGIN:
+				handle_mpi_sleep_begin(&ev, options);
+				break;
+
+			case FUT_MPI_SLEEP_END:
+				handle_mpi_sleep_end(&ev, options);
+				break;
+
+			case FUT_MPI_DTESTING_BEGIN:
+				handle_mpi_dtesting_begin(&ev, options);
+				break;
+
+			case FUT_MPI_DTESTING_END:
+				handle_mpi_dtesting_end(&ev, options);
+				break;
+
+			case FUT_MPI_UTESTING_BEGIN:
+				handle_mpi_utesting_begin(&ev, options);
+				break;
+
+			case FUT_MPI_UTESTING_END:
+				handle_mpi_utesting_end(&ev, options);
+				break;
+
+			case FUT_MPI_UWAIT_BEGIN:
+				handle_mpi_uwait_begin(&ev, options);
 				break;
 
-			case FUT_MPI_IRECV_END:
-				handle_mpi_irecv_end(&ev, options);
+			case FUT_MPI_UWAIT_END:
+				handle_mpi_uwait_end(&ev, options);
 				break;
 
 			case _STARPU_FUT_SET_PROFILING:

+ 6 - 6
src/debug/traces/starpu_fxt_mpi.c

@@ -229,13 +229,13 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 				snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
 				snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
 				char mpi_container[STARPU_POTI_STR_LEN];
-				snprintf(mpi_container, sizeof(mpi_container), "%d_p", /* XXX */src);
-				poti_StartLink(start_date, "MPIroot", "MPIL", mpi_container, paje_value, paje_key);
-				snprintf(mpi_container, sizeof(mpi_container), "%d_p", /* XXX */dst);
-				poti_EndLink(end_date, "MPIroot", "MPIL", mpi_container, paje_value, paje_key);
+				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */src);
+				poti_StartLink(start_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
+				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */dst);
+				poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
 #else
-				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%ld	%d_p	mpicom_%lu\n", start_date, size, /* XXX */src, id);
-				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%ld	%d_p	mpicom_%lu\n", end_date, size, /* XXX */dst, id);
+				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%ld	%d_mpict	mpicom_%lu\n", start_date, size, /* XXX */src, id);
+				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%ld	%d_mpict	mpicom_%lu\n", end_date, size, /* XXX */dst, id);
 #endif
 			}
 		}

+ 28 - 3
src/debug/traces/starpu_paje.c

@@ -138,6 +138,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineContainerType("Mn", "P", "Memory Node");
 	poti_DefineContainerType("T", "Mn", "Thread");
 	poti_DefineContainerType("W", "T", "Worker");
+	poti_DefineContainerType("MPICt", "T", "MPI Communication Thread");
 	poti_DefineContainerType("Sc", "P", "Scheduler");
 
 	/* Types for the memory node */
@@ -162,6 +163,18 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 
+	/* Types for the MPI Communication Thread of the Memory Node */
+	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
+	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
+	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
+	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
+	poti_DefineEntityValue("UT", "CtS", "UserTesting", ".2 .1 .6");
+	poti_DefineEntityValue("UW", "CtS", "UserWaiting", ".4 .1 .3");
+	poti_DefineEntityValue("SdS", "CtS", "SendSubmitted", "1.0 .1 1.0");
+	poti_DefineEntityValue("RvS", "CtS", "RecieveSubmitted", "0.1 1.0 1.0");
+	poti_DefineEntityValue("SdC", "CtS", "SendCompleted", "1.0 .5 1.0");
+	poti_DefineEntityValue("RvC", "CtS", "RecieveCompleted", "0.5 1.0 1.0");
+
 	for (i=1; i<=10; i++)
 	{
 		char inctx[8];
@@ -182,7 +195,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineVariableType("ntask", "Sc", "Number of tasks", "0 0 0");
 
 	/* Link types */
-	poti_DefineLinkType("MPIL", "MPIP", "P", "P", "Links between two MPI programs");
+	poti_DefineLinkType("MPIL", "P", "MPICt", "MPICt", "Links between two MPI Communication Threads");
 	poti_DefineLinkType("L", "P", "Mn", "Mn", "Links between two Memory Nodes");
 
 	/* Creating the MPI Program */
@@ -194,9 +207,12 @@ void _starpu_fxt_write_paje_header(FILE *file)
 1       Mn      P       \"Memory Node\"                         \n\
 1       T      Mn       \"Thread\"                               \n\
 1       W      T       \"Worker\"                               \n\
+1       MPICt   T       \"MPI Communication Thread\"              \n\
 1       Sc       P       \"Scheduler State\"                        \n\
 2       event   T       \"event type\"				\n\
-3       S       T       \"Thread State\"                        \n");
+2       MPIev   MPICt    \"MPI event type\"			\n\
+3       S       T       \"Thread State\"                        \n\
+3       CtS     MPICt    \"Communication Thread State\"          \n");
 	for (i=1; i<=10; i++)
 		fprintf(file, "3       Ctx%u      T     \"InCtx%u\"         		\n", i, i);
 	fprintf(file, "\
@@ -211,6 +227,15 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       B       S       Blocked         \".9 .1 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n");
+	fprintf(file, "\
+6       P       CtS       Processing         \"0 0 0\"		\n\
+6       Sl       CtS      Sleeping         \".9 .1 .0\"		\n\
+6       UT       CtS      UserTesting        \".2 .1 .6\"	\n\
+6       UW       CtS      UserWaiting        \".4 .1 .3\"	\n\
+6       SdS       CtS      SendSubmitted     \"1.0 .1 1.0\"	\n\
+6       RvS       CtS      RecieveSubmitted  \"0.1 1.0 1.0\"	\n\
+6       SdC       CtS      SendCompleted     \"1.0 .5 1.0\"	\n\
+6       RvC       CtS      RecieveCompleted  \"0.5 1.0 1.0\"	\n");
 	for (i=1; i<=10; i++)
 		fprintf(file, "\
 6       I       Ctx%u      Initializing       \"0.0 .7 1.0\"            \n\
@@ -229,7 +254,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Co       MS     DriverCopy         \".3 .5 .1\"		\n\
 6       CoA      MS     DriverCopyAsync         \".1 .3 .1\"		\n\
 6       No       MS     Nothing         \".0 .0 .0\"		\n\
-5       MPIL     MPIP	P	P      MPIL\n\
+5       MPIL     P	MPICt	MPICt   MPIL			\n\
 5       L       P	Mn	Mn      L\n");
 
 	fprintf(file, "7      0.0 MPIroot      MPIP      0       root\n");