瀏覽代碼

Adding trace management of MPI communication threads.

Marc Sergent 12 年之前
父節點
當前提交
ff8397c470
共有 2 個文件被更改,包括 124 次插入19 次删除
  1. 50 9
      mpi/src/starpu_mpi.c
  2. 74 10
      mpi/src/starpu_mpi_fxt.h

+ 50 - 9
mpi/src/starpu_mpi.c

@@ -127,10 +127,12 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
+	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+
 	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
-	TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
+	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -232,9 +234,13 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_DEBUG("post MPI irecv tag %d src %d data %p ptr %p datatype %p count %d req %p \n", req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, &req->request);
 
+	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
+
 	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
+
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
 	req->submitted = 1;
@@ -367,9 +373,13 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	/* Which is the mpi request we are waiting for ? */
 	struct _starpu_mpi_req *req = waiting_req->other_request;
 
+	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
+
 	req->ret = MPI_Wait(&req->request, waiting_req->status);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
+
 	_starpu_mpi_handle_request_termination(req);
 	_STARPU_MPI_LOG_OUT();
 }
@@ -431,9 +441,14 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
 	_STARPU_MPI_DEBUG("Test request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+
+	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
+	
 	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
 	STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
+	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
+	
 	if (*testing_req->flag)
 	{
 		testing_req->ret = req->ret;
@@ -634,11 +649,6 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		starpu_data_release(req->data_handle);
 	}
 
-	if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
-	{
-		TRACE_MPI_IRECV_END(req->srcdst, req->mpi_tag);
-	}
-
 	/* Execute the specified callback, if any */
 	if (req->callback)
 		req->callback(req->callback_arg);
@@ -716,7 +726,25 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		if (flag)
 		{
+			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			{
+				TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
+			}
+			else if (req->request_type == SEND_REQ)
+			{
+				TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
+			}
+
 			_starpu_mpi_handle_request_termination(req);
+
+			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			{
+				TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
+			}
+			else if (req->request_type == SEND_REQ)
+			{
+				TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
+			}
 		}
 
 		_STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -832,10 +860,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		if (block)
 		{
 			_STARPU_MPI_DEBUG("NO MORE REQUESTS TO HANDLE\n");
+
+			TRACE_MPI_SLEEP_BEGIN();
+			
 			if (barrier_running)
 				/* Tell mpi_barrier */
 				_STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
 			_STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+
+			TRACE_MPI_SLEEP_END();
 		}
 
 		/* test whether there are some terminated "detached request" */
@@ -933,6 +966,14 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	argc_argv->initialize_mpi = initialize_mpi;
 	argc_argv->argc = argc;
 	argc_argv->argv = argv;
+
+	int rank, worldsize;
+
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
+
+	TRACE_MPI_START(rank,worldsize);
+
 	_STARPU_PTHREAD_CREATE("MPI progress", &progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -941,9 +982,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 #ifdef STARPU_USE_FXT
-	int prank;
-	MPI_Comm_rank(MPI_COMM_WORLD, &prank);
-	starpu_set_profiling_id(prank);
+	starpu_set_profiling_id(rank);
 #endif //STARPU_USE_FXT
 
 #ifdef USE_STARPU_ACTIVITY
@@ -1002,6 +1041,8 @@ int starpu_mpi_shutdown(void)
 	starpu_progression_hook_deregister(hookid);
 #endif
 
+	TRACE_MPI_STOP(rank, world_size);
+
 	/* free the request queues */
 	_starpu_mpi_req_list_delete(detached_requests);
 	_starpu_mpi_req_list_delete(new_requests);

+ 74 - 10
mpi/src/starpu_mpi_fxt.h

@@ -26,22 +26,86 @@
 extern "C" {
 #endif
 
-#define FUT_MPI_BARRIER		0x5201
-#define FUT_MPI_ISEND		0x5202
-#define FUT_MPI_IRECV_END	0x5203
+#define FUT_MPI_START				0x5201
+#define FUT_MPI_STOP				0x5202
+#define FUT_MPI_BARRIER				0x5203
+#define FUT_MPI_ISEND_SUBMIT_BEGIN		0x5204
+#define FUT_MPI_ISEND_SUBMIT_END		0x5205
+#define FUT_MPI_IRECV_SUBMIT_BEGIN		0x5206
+#define FUT_MPI_IRECV_SUBMIT_END		0x5207
+#define FUT_MPI_ISEND_COMPLETE_BEGIN		0x5208
+#define FUT_MPI_ISEND_COMPLETE_END		0x5209
+#define FUT_MPI_IRECV_COMPLETE_BEGIN		0x5210
+#define FUT_MPI_IRECV_COMPLETE_END		0x5211
+#define FUT_MPI_SLEEP_BEGIN			0x5212
+#define FUT_MPI_SLEEP_END			0x5213
+#define FUT_MPI_DTESTING_BEGIN			0x5214
+#define FUT_MPI_DTESTING_END			0x5215
+#define FUT_MPI_UTESTING_BEGIN			0x5216
+#define FUT_MPI_UTESTING_END			0x5217
+#define FUT_MPI_UWAIT_BEGIN			0x5218
+#define FUT_MPI_UWAIT_END			0x5219
 
 #ifdef STARPU_USE_FXT
+#define TRACE_MPI_START(rank, worldsize)	\
+	FUT_DO_PROBE3(FUT_MPI_START, (rank), (worldsize), _starpu_gettid());
+#define TRACE_MPI_STOP(rank, worldsize)	\
+	FUT_DO_PROBE3(FUT_MPI_STOP, (rank), (worldsize), _starpu_gettid());
 #define TRACE_MPI_BARRIER(rank, worldsize, key)	\
 	FUT_DO_PROBE4(FUT_MPI_BARRIER, (rank), (worldsize), (key), _starpu_gettid());
-#define TRACE_MPI_ISEND(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND, (dest), (mpi_tag), (size), _starpu_gettid());
-#define TRACE_MPI_IRECV_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_ISEND_SUBMIT_BEGIN(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_ISEND_SUBMIT_END(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_IRECV_SUBMIT_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_BEGIN, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_IRECV_SUBMIT_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
+	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_IRECV_COMPLETE_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_SLEEP_BEGIN()	\
+	FUT_DO_PROBE1(FUT_MPI_SLEEP_BEGIN, _starpu_gettid());
+#define TRACE_MPI_SLEEP_END()	\
+	FUT_DO_PROBE1(FUT_MPI_SLEEP_END, _starpu_gettid());
+#define TRACE_MPI_DTESTING_BEGIN()	\
+	FUT_DO_PROBE1(FUT_MPI_DTESTING_BEGIN,  _starpu_gettid());
+#define TRACE_MPI_DTESTING_END()	\
+	FUT_DO_PROBE1(FUT_MPI_DTESTING_END, _starpu_gettid());
+#define TRACE_MPI_UTESTING_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UTESTING_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+#define TRACE_MPI_UTESTING_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UTESTING_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_UWAIT_BEGIN(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UWAIT_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+#define TRACE_MPI_UWAIT_END(src, mpi_tag)	\
+	FUT_DO_PROBE3(FUT_MPI_UWAIT_END, (src), (mpi_tag), _starpu_gettid());
 #define TRACE
 #else
-#define TRACE_MPI_BARRIER(a, b, c)	do {} while(0);
-#define TRACE_MPI_ISEND(a, b, c)	do {} while(0);
-#define TRACE_MPI_IRECV_END(a, b)	do {} while(0);
+#define TRACE_MPI_START(a, b)				do {} while(0);
+#define TRACE_MPI_STOP(a, b)				do {} while(0);
+#define TRACE_MPI_BARRIER(a, b, c)			do {} while(0);
+#define TRACE_MPI_ISEND_SUBMIT_BEGIN(a, b, c)		do {} while(0);
+#define TRACE_MPI_ISEND_SUBMIT_END(a, b, c)		do {} while(0);
+#define TRACE_MPI_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
+#define TRACE_MPI_IRECV_SUBMIT_END(a, b)		do {} while(0);
+#define TRACE_MPI_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define TRACE_MPI_ISEND_COMPLETE_END(a, b, c)		do {} while(0);
+#define TRACE_MPI_IRECV_COMPLETE_BEGIN(a, b)		do {} while(0);
+#define TRACE_MPI_IRECV_COMPLETE_END(a, b)		do {} while(0);
+#define TRACE_MPI_SLEEP_BEGIN()				do {} while(0);
+#define TRACE_MPI_SLEEP_END()				do {} while(0);
+#define TRACE_MPI_DTESTING_BEGIN()			do {} while(0);
+#define TRACE_MPI_DTESTING_END()			do {} while(0);
+#define TRACE_MPI_UTESTING_BEGIN(a, b)			do {} while(0);
+#define TRACE_MPI_UTESTING_END(a, b)			do {} while(0);
+#define TRACE_MPI_UWAIT_BEGIN(a, b)			do {} while(0);
+#define TRACE_MPI_UWAIT_END(a, b)			do {} while(0);
 #endif
 
 #ifdef __cplusplus