Browse Source

Adding trace management of MPI communication threads.

Marc Sergent 12 years ago
parent
commit
700c3bcd91

+ 52 - 11
mpi/src/starpu_mpi.c

@@ -127,10 +127,12 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
+	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+
 	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
-	TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
+	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -232,9 +234,13 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_DEBUG("post MPI irecv tag %d src %d data %p ptr %p datatype %p count %d req %p \n", req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, &req->request);
 
+	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
+
 	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
+
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
 	req->submitted = 1;
@@ -367,9 +373,13 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	/* Which is the mpi request we are waiting for ? */
 	struct _starpu_mpi_req *req = waiting_req->other_request;
 
+	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
+
 	req->ret = MPI_Wait(&req->request, waiting_req->status);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
+
 	_starpu_mpi_handle_request_termination(req);
 	_STARPU_MPI_LOG_OUT();
 }
@@ -431,9 +441,14 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
 	_STARPU_MPI_DEBUG("Test request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+
+	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
+	
 	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
+	
 	if (*testing_req->flag)
 	{
 		testing_req->ret = req->ret;
@@ -633,12 +648,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		}
 		starpu_data_release(req->data_handle);
 	}
-
-	if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
-	{
-		TRACE_MPI_IRECV_END(req->srcdst, req->mpi_tag);
-	}
-
+	
 	/* Execute the specified callback, if any */
 	if (req->callback)
 		req->callback(req->callback_arg);
@@ -703,6 +713,7 @@ static void _starpu_mpi_test_detached_requests(void)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		//_STARPU_MPI_DEBUG("Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+
 		if (req->request_type == PROBE_REQ)
 		{
 			req->ret = MPI_Iprobe(req->srcdst, req->mpi_tag, req->comm, &flag, &status);
@@ -716,7 +727,25 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		if (flag)
 		{
+			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			{
+				TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
+			}
+			else if (req->request_type == SEND_REQ)
+			{
+				TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
+			}
+
 			_starpu_mpi_handle_request_termination(req);
+
+			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			{
+				TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
+			}
+			else if (req->request_type == SEND_REQ)
+			{
+				TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
+			}
 		}
 
 		_STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -726,7 +755,6 @@ static void _starpu_mpi_test_detached_requests(void)
 			_starpu_mpi_req_list_erase(detached_requests, req);
 			free(req);
 		}
-
 	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
@@ -832,10 +860,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		if (block)
 		{
 			_STARPU_MPI_DEBUG("NO MORE REQUESTS TO HANDLE\n");
+
+			TRACE_MPI_SLEEP_BEGIN();
+			
 			if (barrier_running)
 				/* Tell mpi_barrier */
 				_STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
 			_STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+
+			TRACE_MPI_SLEEP_END();
 		}
 
 		/* test whether there are some terminated "detached request" */
@@ -933,6 +966,14 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	argc_argv->initialize_mpi = initialize_mpi;
 	argc_argv->argc = argc;
 	argc_argv->argv = argv;
+
+	int rank, worldsize;
+
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
+
+	TRACE_MPI_START(rank,worldsize);
+
 	_STARPU_PTHREAD_CREATE("MPI progress", &progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -941,9 +982,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 #ifdef STARPU_USE_FXT
-	int prank;
-	MPI_Comm_rank(MPI_COMM_WORLD, &prank);
-	starpu_set_profiling_id(prank);
+	starpu_set_profiling_id(rank);
 #endif //STARPU_USE_FXT
 
 #ifdef USE_STARPU_ACTIVITY
@@ -1002,6 +1041,8 @@ int starpu_mpi_shutdown(void)
 	starpu_progression_hook_deregister(hookid);
 #endif
 
+	TRACE_MPI_STOP(rank, world_size);
+
 	/* free the request queues */
 	_starpu_mpi_req_list_delete(detached_requests);
 	_starpu_mpi_req_list_delete(new_requests);

+ 74 - 10
mpi/src/starpu_mpi_fxt.h

@@ -26,22 +26,86 @@
 extern "C" {
 #endif
 
-#define FUT_MPI_BARRIER		0x5201
-#define FUT_MPI_ISEND		0x5202
-#define FUT_MPI_IRECV_END	0x5203
+#define FUT_MPI_START				0x5201
+#define FUT_MPI_STOP				0x5202
+#define FUT_MPI_BARRIER				0x5203
+#define FUT_MPI_ISEND_SUBMIT_BEGIN		0x5204
+#define FUT_MPI_ISEND_SUBMIT_END		0x5205
+#define FUT_MPI_IRECV_SUBMIT_BEGIN		0x5206
+#define FUT_MPI_IRECV_SUBMIT_END		0x5207
+#define FUT_MPI_ISEND_COMPLETE_BEGIN		0x5208
+#define FUT_MPI_ISEND_COMPLETE_END		0x5209
+#define FUT_MPI_IRECV_COMPLETE_BEGIN		0x5210
+#define FUT_MPI_IRECV_COMPLETE_END		0x5211
+#define FUT_MPI_SLEEP_BEGIN			0x5212
+#define FUT_MPI_SLEEP_END			0x5213
+#define FUT_MPI_DTESTING_BEGIN			0x5214
+#define FUT_MPI_DTESTING_END			0x5215
+#define FUT_MPI_UTESTING_BEGIN			0x5216
+#define FUT_MPI_UTESTING_END			0x5217
+#define FUT_MPI_UWAIT_BEGIN			0x5218
+#define FUT_MPI_UWAIT_END			0x5219
 
 #ifdef STARPU_USE_FXT
+#define TRACE_MPI_START(rank, worldsize)	\
+	FUT_DO_PROBE3(FUT_MPI_START, (rank), (worldsize), _starpu_gettid());
+#define TRACE_MPI_STOP(rank, worldsize)	\
+	FUT_DO_PROBE3(FUT_MPI_STOP, (rank), (worldsize), _starpu_gettid());
 #define TRACE_MPI_BARRIER(rank, worldsize, key)	\
 	FUT_DO_PROBE4(FUT_MPI_BARRIER, (rank), (worldsize), (key), _starpu_gettid());
-#define TRACE_MPI_ISEND(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND, (dest), (mpi_tag), (size), _starpu_gettid());
-#define TRACE_MPI_IRECV_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_ISEND_SUBMIT_BEGIN(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_ISEND_SUBMIT_END(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_IRECV_SUBMIT_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_BEGIN, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_IRECV_SUBMIT_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_IRECV_COMPLETE_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_SLEEP_BEGIN()	\
+	FUT_DO_PROBE1(FUT_MPI_SLEEP_BEGIN, _starpu_gettid());
+#define TRACE_MPI_SLEEP_END()	\
+	FUT_DO_PROBE1(FUT_MPI_SLEEP_END, _starpu_gettid());
+#define TRACE_MPI_DTESTING_BEGIN()	\
+	FUT_DO_PROBE1(FUT_MPI_DTESTING_BEGIN,  _starpu_gettid());
+#define TRACE_MPI_DTESTING_END()	\
+	FUT_DO_PROBE1(FUT_MPI_DTESTING_END, _starpu_gettid());
+#define TRACE_MPI_UTESTING_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UTESTING_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+#define TRACE_MPI_UTESTING_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UTESTING_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_UWAIT_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UWAIT_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+#define TRACE_MPI_UWAIT_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UWAIT_END, (src), (mpi_tag), _starpu_gettid());
 #define TRACE
 #else
-#define TRACE_MPI_BARRIER(a, b, c)	do {} while(0);
-#define TRACE_MPI_ISEND(a, b, c)	do {} while(0);
-#define TRACE_MPI_IRECV_END(a, b)	do {} while(0);
+#define TRACE_MPI_START(a, b)				do {} while(0);
+#define TRACE_MPI_STOP(a, b)				do {} while(0);
+#define TRACE_MPI_BARRIER(a, b, c)			do {} while(0);
+#define TRACE_MPI_ISEND_SUBMIT_BEGIN(a, b, c)		do {} while(0);
+#define TRACE_MPI_ISEND_SUBMIT_END(a, b, c)		do {} while(0);
+#define TRACE_MPI_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
+#define TRACE_MPI_IRECV_SUBMIT_END(a, b)		do {} while(0);
+#define TRACE_MPI_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define TRACE_MPI_ISEND_COMPLETE_END(a, b, c)		do {} while(0);
+#define TRACE_MPI_IRECV_COMPLETE_BEGIN(a, b)		do {} while(0);
+#define TRACE_MPI_IRECV_COMPLETE_END(a, b)		do {} while(0);
+#define TRACE_MPI_SLEEP_BEGIN()				do {} while(0);
+#define TRACE_MPI_SLEEP_END()				do {} while(0);
+#define TRACE_MPI_DTESTING_BEGIN()			do {} while(0);
+#define TRACE_MPI_DTESTING_END()			do {} while(0);
+#define TRACE_MPI_UTESTING_BEGIN(a, b)			do {} while(0);
+#define TRACE_MPI_UTESTING_END(a, b)			do {} while(0);
+#define TRACE_MPI_UWAIT_BEGIN(a, b)			do {} while(0);
+#define TRACE_MPI_UWAIT_END(a, b)			do {} while(0);
 #endif
 
 #ifdef __cplusplus

+ 246 - 6
src/debug/traces/starpu_fxt.c

@@ -207,6 +207,12 @@ static char *worker_container_alias(char *output, int len, const char *prefix, l
 	return output;
 }
 
+static char *mpicommthread_container_alias(char *output, int len, const char *prefix)
+{
+	snprintf(output, len, "%smpict", prefix);
+	return output;
+}
+
 static char *program_container_alias(char *output, int len, const char *prefix)
 {
 	snprintf(output, len, "%sp", prefix);
@@ -242,6 +248,17 @@ static void worker_set_state(double time, const char *prefix, long unsigned int
 #endif
 }
 
+static void mpicommthread_set_state(double time, const char *prefix, const char *name)
+{
+#ifdef STARPU_HAVE_POTI
+	char container[STARPU_POTI_STR_LEN];
+	mpicommthread_container_alias(container, STARPU_POTI_STR_LEN, prefix);
+	poti_SetState(time, container, "CtS", name);
+#else
+	fprintf(out_paje_file, "10	%.9f	%smpict	CtS 	%s\n", time, prefix, name);
+#endif
+}
+
 
 /*
  *	Initialization
@@ -960,25 +977,184 @@ static void handle_mpi_barrier(struct fxt_ev_64 *ev, struct starpu_fxt_options *
 	}
 }
 
-static void handle_mpi_isend(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_mpi_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+	{
+#ifdef STARPU_HAVE_POTI
+		char program_container[STARPU_POTI_STR_LEN];
+		program_container_alias(program_container, STARPU_POTI_STR_LEN, prefix);
+		char new_mpicommthread_container_alias[STARPU_POTI_STR_LEN], new_mpicommthread_container_name[STARPU_POTI_STR_LEN];
+		mpicommthread_container_alias(new_mpicommthread_container_alias, STARPU_POTI_STR_LEN, prefix);
+		snprintf(new_memnode_container_name, STARPU_POTI_STR_LEN, "%smpict", prefix);
+		poti_CreateContainer(date, new_mpicommthread_container_alias, "MPICt", program_container, new_mpicommthread_container_name);
+#else
+		fprintf(out_paje_file, "7	%.9f	%smpict		MPICt	%sp	%smpict\n", date, prefix, prefix, prefix);
+#endif
+		mpicommthread_set_state(date, prefix, "Sl");
+	}
+}
+
+static void handle_mpi_stop(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+	{
+#ifdef STARPU_HAVE_POTI
+		char mpicommthread_container[STARPU_POTI_STR_LEN];
+		mpicommthread_container_alias(mpicommthread_container, STARPU_POTI_STR_LEN, prefix);
+		poti_DestroyContainer(date, "MPICt", mpicommthread_container);
+#else
+		fprintf(out_paje_file, "8	%.9f	%smpict		MPICt\n",
+			date, prefix);
+#endif
+	}
+}
+
+static void handle_mpi_isend_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "SdS");
+}
+
+static void handle_mpi_isend_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int dest = ev->param[0];
 	int mpi_tag = ev->param[1];
 	size_t size = ev->param[2];
 	double date = get_event_time_stamp(ev, options);
 
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+
 	_starpu_fxt_mpi_add_send_transfer(options->file_rank, dest, mpi_tag, size, date);
 }
 
-static void handle_mpi_irecv_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_mpi_irecv_submit_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "RvS");
+}
+
+static void handle_mpi_irecv_submit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_isend_complete_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "SdC");
+}
+
+static void handle_mpi_isend_complete_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_irecv_complete_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int src = ev->param[0];
 	int mpi_tag = ev->param[1];
 	double date = get_event_time_stamp(ev, options);
 
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "RvC");
+
 	_starpu_fxt_mpi_add_recv_transfer(src, options->file_rank, mpi_tag, date);
 }
 
+static void handle_mpi_irecv_complete_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_sleep_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "Sl");
+}
+
+static void handle_mpi_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_dtesting_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "DT");
+}
+
+static void handle_mpi_dtesting_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_utesting_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "UT");
+}
+
+static void handle_mpi_utesting_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_uwait_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "UW");
+}
+
+static void handle_mpi_uwait_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
 static void handle_set_profiling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int status = ev->param[0];
@@ -1251,16 +1427,80 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				handle_user_event(&ev, options);
 				break;
 
+			case FUT_MPI_START:
+				handle_mpi_start(&ev, options);
+				break;
+
+			case FUT_MPI_STOP:
+				handle_mpi_stop(&ev, options);
+				break;
+
 			case FUT_MPI_BARRIER:
 				handle_mpi_barrier(&ev, options);
 				break;
 
-			case FUT_MPI_ISEND:
-				handle_mpi_isend(&ev, options);
+			case FUT_MPI_ISEND_SUBMIT_BEGIN:
+				handle_mpi_isend_submit_begin(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_SUBMIT_END:
+				handle_mpi_isend_submit_end(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_SUBMIT_BEGIN:
+				handle_mpi_irecv_submit_begin(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_SUBMIT_END:
+				handle_mpi_irecv_submit_end(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_COMPLETE_BEGIN:
+				handle_mpi_isend_complete_begin(&ev, options);
+				break;
+
+			case FUT_MPI_ISEND_COMPLETE_END:
+				handle_mpi_isend_complete_end(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_COMPLETE_BEGIN:
+				handle_mpi_irecv_complete_begin(&ev, options);
+				break;
+
+			case FUT_MPI_IRECV_COMPLETE_END:
+				handle_mpi_irecv_complete_end(&ev, options);
+				break;
+
+			case FUT_MPI_SLEEP_BEGIN:
+				handle_mpi_sleep_begin(&ev, options);
+				break;
+
+			case FUT_MPI_SLEEP_END:
+				handle_mpi_sleep_end(&ev, options);
+				break;
+
+			case FUT_MPI_DTESTING_BEGIN:
+				handle_mpi_dtesting_begin(&ev, options);
+				break;
+
+			case FUT_MPI_DTESTING_END:
+				handle_mpi_dtesting_end(&ev, options);
+				break;
+
+			case FUT_MPI_UTESTING_BEGIN:
+				handle_mpi_utesting_begin(&ev, options);
+				break;
+
+			case FUT_MPI_UTESTING_END:
+				handle_mpi_utesting_end(&ev, options);
+				break;
+
+			case FUT_MPI_UWAIT_BEGIN:
+				handle_mpi_uwait_begin(&ev, options);
 				break;
 
-			case FUT_MPI_IRECV_END:
-				handle_mpi_irecv_end(&ev, options);
+			case FUT_MPI_UWAIT_END:
+				handle_mpi_uwait_end(&ev, options);
 				break;
 
 			case _STARPU_FUT_SET_PROFILING:

+ 6 - 6
src/debug/traces/starpu_fxt_mpi.c

@@ -229,13 +229,13 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, int src)
 				snprintf(paje_value, STARPU_POTI_STR_LEN, "%lu", (long unsigned) size);
 				snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
 				char mpi_container[STARPU_POTI_STR_LEN];
-				snprintf(mpi_container, sizeof(mpi_container), "%d_p", /* XXX */src);
-				poti_StartLink(start_date, "MPIroot", "MPIL", mpi_container, paje_value, paje_key);
-				snprintf(mpi_container, sizeof(mpi_container), "%d_p", /* XXX */dst);
-				poti_EndLink(end_date, "MPIroot", "MPIL", mpi_container, paje_value, paje_key);
+				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */src);
+				poti_StartLink(start_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
+				snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", /* XXX */dst);
+				poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
 #else
-				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%ld	%d_p	mpicom_%lu\n", start_date, size, /* XXX */src, id);
-				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%ld	%d_p	mpicom_%lu\n", end_date, size, /* XXX */dst, id);
+				fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%ld	%d_mpict	mpicom_%lu\n", start_date, size, /* XXX */src, id);
+				fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%ld	%d_mpict	mpicom_%lu\n", end_date, size, /* XXX */dst, id);
 #endif
 			}
 		}

+ 28 - 3
src/debug/traces/starpu_paje.c

@@ -138,6 +138,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineContainerType("Mn", "P", "Memory Node");
 	poti_DefineContainerType("T", "Mn", "Thread");
 	poti_DefineContainerType("W", "T", "Worker");
+	poti_DefineContainerType("MPICt", "T", "MPI Communication Thread");
 	poti_DefineContainerType("Sc", "P", "Scheduler");
 
 	/* Types for the memory node */
@@ -162,6 +163,18 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 
+	/* Types for the MPI Communication Thread of the Memory Node */
+	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
+	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
+	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
+	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
+	poti_DefineEntityValue("UT", "CtS", "UserTesting", ".2 .1 .6");
+	poti_DefineEntityValue("UW", "CtS", "UserWaiting", ".4 .1 .3");
+	poti_DefineEntityValue("SdS", "CtS", "SendSubmitted", "1.0 .1 1.0");
+	poti_DefineEntityValue("RvS", "CtS", "RecieveSubmitted", "0.1 1.0 1.0");
+	poti_DefineEntityValue("SdC", "CtS", "SendCompleted", "1.0 .5 1.0");
+	poti_DefineEntityValue("RvC", "CtS", "RecieveCompleted", "0.5 1.0 1.0");
+
 	for (i=1; i<=10; i++)
 	{
 		char inctx[8];
@@ -182,7 +195,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineVariableType("ntask", "Sc", "Number of tasks", "0 0 0");
 
 	/* Link types */
-	poti_DefineLinkType("MPIL", "MPIP", "P", "P", "Links between two MPI programs");
+	poti_DefineLinkType("MPIL", "P", "MPICt", "MPICt", "Links between two MPI Communication Threads");
 	poti_DefineLinkType("L", "P", "Mn", "Mn", "Links between two Memory Nodes");
 
 	/* Creating the MPI Program */
@@ -194,9 +207,12 @@ void _starpu_fxt_write_paje_header(FILE *file)
 1       Mn      P       \"Memory Node\"                         \n\
 1       T      Mn       \"Thread\"                               \n\
 1       W      T       \"Worker\"                               \n\
+1       MPICt   T       \"MPI Communication Thread\"              \n\
 1       Sc       P       \"Scheduler State\"                        \n\
 2       event   T       \"event type\"				\n\
-3       S       T       \"Thread State\"                        \n");
+2       MPIev   MPICt    \"MPI event type\"			\n\
+3       S       T       \"Thread State\"                        \n\
+3       CtS     MPICt    \"Communication Thread State\"          \n");
 	for (i=1; i<=10; i++)
 		fprintf(file, "3       Ctx%u      T     \"InCtx%u\"         		\n", i, i);
 	fprintf(file, "\
@@ -211,6 +227,15 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       B       S       Blocked         \".9 .1 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n");
+	fprintf(file, "\
+6       P       CtS       Processing         \"0 0 0\"		\n\
+6       Sl       CtS      Sleeping         \".9 .1 .0\"		\n\
+6       UT       CtS      UserTesting        \".2 .1 .6\"	\n\
+6       UW       CtS      UserWaiting        \".4 .1 .3\"	\n\
+6       SdS       CtS      SendSubmitted     \"1.0 .1 1.0\"	\n\
+6       RvS       CtS      RecieveSubmitted  \"0.1 1.0 1.0\"	\n\
+6       SdC       CtS      SendCompleted     \"1.0 .5 1.0\"	\n\
+6       RvC       CtS      RecieveCompleted  \"0.5 1.0 1.0\"	\n");
 	for (i=1; i<=10; i++)
 		fprintf(file, "\
 6       I       Ctx%u      Initializing       \"0.0 .7 1.0\"            \n\
@@ -229,7 +254,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Co       MS     DriverCopy         \".3 .5 .1\"		\n\
 6       CoA      MS     DriverCopyAsync         \".1 .3 .1\"		\n\
 6       No       MS     Nothing         \".0 .0 .0\"		\n\
-5       MPIL     MPIP	P	P      MPIL\n\
+5       MPIL     P	MPICt	MPICt   MPIL			\n\
 5       L       P	Mn	Mn      L\n");
 
 	fprintf(file, "7      0.0 MPIroot      MPIP      0       root\n");