Переглянути джерело

mpi/src: fix when receiving too early data which have not yet been registered, the data is received as as a raw memory, and will be given it to the application when it post a equivalent receive

Nathalie Furmento 11 роки тому
батько
коміт
29bb94205b
1 змінених файлів з 48 додано та 22 видалено
  1. 48 22
      mpi/src/starpu_mpi.c

+ 48 - 22
mpi/src/starpu_mpi.c

@@ -69,6 +69,7 @@ struct _starpu_mpi_copy_handle
 	int mpi_tag;
 	UT_hash_handle hh;
 	struct _starpu_mpi_req *req;
+	void *buffer;
 };
 
  /********************************************************/
@@ -840,6 +841,7 @@ struct _starpu_mpi_copy_cb_args
 	starpu_data_handle_t data_handle;
 	starpu_data_handle_t copy_handle;
 	struct _starpu_mpi_req *req;
+	void *buffer;
 };
 
 static void _starpu_mpi_copy_cb(void* arg)
@@ -851,19 +853,30 @@ static void _starpu_mpi_copy_cb(void* arg)
 	args->req->request = args->req->internal_req->request;
 	args->req->submitted = 1;
 
-	struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
-	void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
-	void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
-
-	if (!itf->copy_methods->ram_to_ram)
+	if (args->buffer)
 	{
-		_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
-		itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
+		/* Data has been received as a raw memory, it has to be unpacked */
+		struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->copy_handle);
+		struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
+		itf_dst->unpack_data(args->data_handle, 0, args->buffer, itf_src->get_size(args->copy_handle));
+		free(args->buffer);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
-		itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
+		struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
+		void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
+		void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
+
+		if (!itf->copy_methods->ram_to_ram)
+		{
+			_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
+			itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
+		}
+		else
+		{
+			_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
+			itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
+		}
 	}
 
 	_STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
@@ -946,6 +959,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 				struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
 				cb_args->data_handle = req->data_handle;
 				cb_args->copy_handle = chandle->handle;
+				cb_args->buffer = chandle->buffer;
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
@@ -1237,27 +1251,39 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				if (!found_req)
 				{
 					_STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
-
 					starpu_data_handle_t data_handle = NULL;
 
-					while(!(data_handle))
-					{
-						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-						data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
-						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-					}
-					STARPU_ASSERT(data_handle);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
+					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 					struct _starpu_mpi_copy_handle* chandle = malloc(sizeof(struct _starpu_mpi_copy_handle));
 					STARPU_ASSERT(chandle);
-
 					chandle->mpi_tag = recv_env->mpi_tag;
 					chandle->env = recv_env;
-					starpu_data_register_same(&chandle->handle, data_handle);
-					add_chandle(chandle);
 
-					_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
-					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->psize);
+					if (data_handle)
+					{
+						chandle->buffer = NULL;
+						starpu_data_register_same(&chandle->handle, data_handle);
+						add_chandle(chandle);
+					}
+					else
+					{
+						/* The application has not registered yet a data with the tag,
+						 * we are going to receive the data as a raw memory, and give it
+						 * to the application when it post a receive for this tag
+						 */
+						_STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)chandle->env->size);
+						chandle->buffer = malloc(chandle->env->size);
+						starpu_vector_data_register(&chandle->handle, 0, (uintptr_t) chandle->buffer, chandle->env->size, 1);
+						add_chandle(chandle);
+					}
+
+					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->size);
+					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 					// We wait until the request is pushed in the
 					// new_request list, that ensures that the next loop