Quellcode durchsuchen

mpi: Make more code use req->node

Samuel Thibault vor 4 Jahren
Ursprung
Commit
a3caa96a0b
1 geänderte Datei mit 12 neuen und 8 gelöschten Zeilen
  1. 12 8
      mpi/src/mpi/starpu_mpi_mpi.c

+ 12 - 8
mpi/src/mpi/starpu_mpi_mpi.c

@@ -253,7 +253,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
 				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-				starpu_data_acquire_on_node_cb(early_data_handle->handle,STARPU_MAIN_RAM,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
+				starpu_data_acquire_on_node_cb(early_data_handle->handle,req->node,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			else
@@ -944,6 +944,9 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		else
 		{
 			STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
+			// FIXME: args->buffer is in STARPU_MAIN_RAM, not req->node
+			// There is conflation between the memory node for the handle and the memory node for the buffer
+			// Actually we may not want unpack_data to free the buffer, for the case when we are participating in a collective send
 			itf_dst->unpack_data(args->data_handle, args->req->node, args->buffer, itf_src->get_size(args->early_handle));
 			args->buffer = NULL;
 		}
@@ -951,23 +954,23 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	else
 	{
 		struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
-		void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
+		void* itf_src = starpu_data_get_interface_on_node(args->early_handle, args->req->node);
 		void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, args->req->node);
 
 		if (!itf->copy_methods->ram_to_ram)
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
-			itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, args->req->node, NULL);
+			itf->copy_methods->any_to_any(itf_src, args->req->node, itf_dst, args->req->node, NULL);
 		}
 		else
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
-			itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, args->req->node);
+			itf->copy_methods->ram_to_ram(itf_src, args->req->node, itf_dst, args->req->node);
 		}
 	}
 
 	_STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
-	starpu_data_release_on_node(args->early_handle, STARPU_MAIN_RAM);
+	starpu_data_release_on_node(args->early_handle, args->req->node);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
 	/* XXX: note that we have already freed the registered buffer above. In
@@ -1161,6 +1164,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 		 * to the application when it posts a receive for this tag
 		 */
 		_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
+		// TODO: rather allocate on some memory node next to NIC
 		early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, envelope->size, 0);
 		early_data_handle->size = envelope->size;
 		starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, envelope->size);
@@ -1456,13 +1460,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						if (early_request->registered_datatype == 1)
 						{
 							early_request->count = 1;
-							early_request->ptr = starpu_data_handle_to_pointer(early_request->data_handle, STARPU_MAIN_RAM);
+							early_request->ptr = starpu_data_handle_to_pointer(early_request->data_handle, early_request->node);
 						}
 						else
 						{
 							early_request->count = envelope->size;
-							early_request->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_request->count, 0);
-							starpu_memory_allocate(STARPU_MAIN_RAM, early_request->count, STARPU_MEMORY_OVERFLOW);
+							early_request->ptr = (void *)starpu_malloc_on_node_flags(early_request->node, early_request->count, 0);
+							starpu_memory_allocate(early_request->node, early_request->count, STARPU_MEMORY_OVERFLOW);
 
 							STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
 						}