Explorar o código

mpi/nmad: fix formatting and add comments

Philippe SWARTVAGHER %!s(int64=4) %!d(string=hai) anos
pai
achega
29e210dd15

+ 22 - 18
mpi/src/nmad/starpu_mpi_nmad.c

@@ -553,8 +553,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		/* we signal that the request is completed.*/
 
 		free(c);
-
 	}
+
+
+	/** Now, shutting down MPI **/
+
+
 	STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
 	STARPU_ASSERT_MSG(nb_pending_requests==0, "Request still pending.");
 
@@ -615,11 +619,11 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 
 int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 {
-        STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
-        STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
 
-        STARPU_PTHREAD_MUTEX_INIT(&mpi_wait_for_all_running_mutex, NULL);
-        STARPU_PTHREAD_COND_INIT(&mpi_wait_for_all_running_cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&mpi_wait_for_all_running_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&mpi_wait_for_all_running_cond, NULL);
 
 	starpu_sem_init(&callback_sem, 0, 0);
 	running = 0;
@@ -696,21 +700,21 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 	/* Launch thread used for nmad callbacks */
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
-        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
-        while (!running)
-                STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
-        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	while (!running)
+		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-        return 0;
+	return 0;
 }
 
 void _starpu_mpi_progress_shutdown(void **value)
 {
 	/* kill the progression thread */
-        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
-        running = 0;
-        STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
-        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	running = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	starpu_sem_post(&callback_sem);
 
@@ -718,11 +722,11 @@ void _starpu_mpi_progress_shutdown(void **value)
 
 	callback_lfstack_destroy(&callback_stack);
 
-        STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
-        STARPU_PTHREAD_COND_DESTROY(&progress_cond);
+	STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&progress_cond);
 
-        STARPU_PTHREAD_MUTEX_DESTROY(&mpi_wait_for_all_running_mutex);
-        STARPU_PTHREAD_COND_DESTROY(&mpi_wait_for_all_running_cond);
+	STARPU_PTHREAD_MUTEX_DESTROY(&mpi_wait_for_all_running_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&mpi_wait_for_all_running_cond);
 }
 
 static int64_t _starpu_mpi_tag_max = INT64_MAX;

+ 1 - 0
mpi/src/nmad/starpu_mpi_nmad_backend.c

@@ -52,6 +52,7 @@ void _starpu_mpi_nmad_backend_request_init(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_CALLOC(req->backend, 1, sizeof(struct _starpu_mpi_req_backend));
 	piom_cond_init(&req->backend->req_cond, 0);
+	req->backend->data_request = NM_SR_REQUEST_NULL;
 }
 
 void _starpu_mpi_nmad_backend_request_fill(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req)

+ 3 - 3
mpi/src/nmad/starpu_mpi_nmad_backend.h

@@ -41,9 +41,9 @@ struct _starpu_mpi_req_backend
 	nm_sr_request_t size_req;
 
 	/** When datatype is unknown */
-	struct nm_data_s unknown_datatype_body;
-	struct nm_data_s unknown_datatype_data;
-	struct nm_data_s unknown_datatype_size;
+	struct nm_data_s unknown_datatype_body; // part of unknown_datatype_data
+	struct nm_data_s unknown_datatype_data; // will contain size of the datatype and data itself (represented by unknown_datatype_body)
+	struct nm_data_s unknown_datatype_size; // to fetch the size of the datatype
 };
 
 #endif // STARPU_USE_MPI_NMAD

+ 8 - 4
mpi/src/nmad/starpu_mpi_nmad_unknown_datatype.c

@@ -127,17 +127,18 @@ static void _starpu_mpi_unknown_datatype_recv_callback(nm_sr_event_t event, cons
 
 	if (event & NM_SR_EVENT_RECV_DATA)
 	{
-		nm_data_contiguous_build(&(req->backend->unknown_datatype_size), &(req->count), sizeof(int));
-
+		// Header arrived, so get the size of the datatype and store it in req->count:
+		nm_data_contiguous_build(&(req->backend->unknown_datatype_size), &(req->count), sizeof(starpu_ssize_t));
 		int ret = nm_sr_recv_peek(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_size));
 		STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "nm_sr_recv_peek returned %d", ret);
 
+		// Now we know the size of the datatype, allocate the buffer:
 		req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
 		STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld", req->count);
 
+		// Last step: give this buffer to NewMadeleine to receive data:
 		nm_mpi_nmad_data_get(&(req->backend->unknown_datatype_body), (void*) req->ptr, req->datatype, req->count);
-
-		// warning: this function requires valid pointers for future usage
+		// warning: this function requires valid pointers for future usage:
 		starpu_nm_datatype_unknown_build(&(req->backend->unknown_datatype_data), &(req->count), &(req->backend->unknown_datatype_body));
 		nm_sr_recv_unpack_data(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_data));
 	}
@@ -157,6 +158,9 @@ void _starpu_mpi_irecv_unknown_datatype(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
 
+	/* we post a recv without giving a buffer because we don't know the required size of this buffer,
+	 * the buffer will be allocated and provided to nmad when the header of data will be received,
+	 * in _starpu_mpi_unknown_datatype_recv_callback() */
 	nm_sr_recv_init(req->backend->session, &(req->backend->data_request));
 	nm_sr_request_set_ref(&(req->backend->data_request), req);
 	nm_sr_request_monitor(req->backend->session, &(req->backend->data_request), NM_SR_EVENT_FINALIZED | NM_SR_EVENT_RECV_DATA,

+ 1 - 1
mpi/src/starpu_mpi_private.c

@@ -61,7 +61,7 @@ char *_starpu_mpi_get_mpi_error_code(int code)
 
 void _starpu_mpi_env_init(void)
 {
-        _starpu_mpi_comm_debug = starpu_getenv("STARPU_MPI_COMM") != NULL;
+	_starpu_mpi_comm_debug = starpu_getenv("STARPU_MPI_COMM") != NULL;
 	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
 	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
 	_starpu_mpi_nobind = starpu_get_env_number_default("STARPU_MPI_NOBIND", 0);

+ 1 - 1
mpi/src/starpu_mpi_private.h

@@ -231,7 +231,7 @@ LIST_TYPE(_starpu_mpi_req,
 	char *datatype_name;
 	void *ptr;
 	starpu_ssize_t count;
-	int registered_datatype;
+	int registered_datatype; // = 0: datatype is not predefined by StarPU; = 1: otherwise; initialized with -1
 
 	struct _starpu_mpi_req_backend *backend;