소스 검색

mpi: Also make the early data buffer not hardcode STARPU_MAIN_RAM

Samuel Thibault 4년 전
부모
커밋
9c6e4756e3
2개의 변경된 파일, 16줄 추가 및 10줄 삭제
  1. 1 0
      mpi/src/mpi/starpu_mpi_early_data.h
  2. 15 10
      mpi/src/mpi/starpu_mpi_mpi.c

+ 1 - 0
mpi/src/mpi/starpu_mpi_early_data.h

@@ -39,6 +39,7 @@ LIST_TYPE(_starpu_mpi_early_data_handle,
 	  struct _starpu_mpi_req *req;
 	  void *buffer;
 	  size_t size;
+	  unsigned buffer_node;
 	  int req_ready;
 	  struct _starpu_mpi_node_tag node_tag;
 	  starpu_pthread_mutex_t req_mutex;

+ 15 - 10
mpi/src/mpi/starpu_mpi_mpi.c

@@ -132,6 +132,7 @@ struct _starpu_mpi_early_data_cb_args
 	struct _starpu_mpi_req *req;
 	void *buffer;
 	size_t size;
+	unsigned buffer_node;
 };
 
 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
@@ -249,11 +250,12 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				cb_args->early_handle = early_data_handle->handle;
 				cb_args->buffer = early_data_handle->buffer;
 				cb_args->size = early_data_handle->size;
+				cb_args->buffer_node = early_data_handle->buffer_node;
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
 				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-				starpu_data_acquire_on_node_cb(early_data_handle->handle,req->node,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
+				starpu_data_acquire_on_node_cb(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			else
@@ -937,14 +939,14 @@ static void _starpu_mpi_early_data_cb(void* arg)
 			int position=0;
 			void *ptr = starpu_data_handle_to_pointer(args->data_handle, args->req->node);
 			MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
-			starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) args->buffer, args->size, 0);
+			starpu_free_on_node_flags(args->buffer_node, (uintptr_t) args->buffer, args->size, 0);
 			args->buffer = NULL;
 			_starpu_mpi_datatype_free(args->data_handle, &datatype);
 		}
 		else
 		{
 			STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
-			// FIXME: args->buffers is in STARPU_MAIN_RAM, not req->node
+			// FIXME: args->buffer is in args->buffer_node, not req->node
 			// There is conflation between the memory node for the handle and the memory node for the buffer
 			// Actually we may not want unpack_data to free the buffer, for the case when we are participating to a collective send
 			itf_dst->unpack_data(args->data_handle, args->req->node, args->buffer, itf_src->get_size(args->early_handle));
@@ -954,23 +956,23 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	else
 	{
 		struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
-		void* itf_src = starpu_data_get_interface_on_node(args->early_handle, args->req->node);
+		void* itf_src = starpu_data_get_interface_on_node(args->early_handle, args->buffer_node);
 		void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, args->req->node);
 
 		if (!itf->copy_methods->ram_to_ram)
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
-			itf->copy_methods->any_to_any(itf_src, args->req->node, itf_dst, args->req->node, NULL);
+			itf->copy_methods->any_to_any(itf_src, args->buffer_node, itf_dst, args->req->node, NULL);
 		}
 		else
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
-			itf->copy_methods->ram_to_ram(itf_src, args->req->node, itf_dst, args->req->node);
+			itf->copy_methods->ram_to_ram(itf_src, args->buffer_node, itf_dst, args->req->node);
 		}
 	}
 
 	_STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
-	starpu_data_release_on_node(args->early_handle, args->req->node);
+	starpu_data_release_on_node(args->early_handle, args->buffer_node);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
 	/* XXX: note that we have already freed the registered buffer above. In
@@ -1150,10 +1152,13 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	data_handle = _starpu_mpi_tag_get_data_handle_from_tag(envelope->data_tag);
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
+	// TODO: rather select some memory node next to the NIC
+	unsigned buffer_node = STARPU_MAIN_RAM;
 	if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
 	{
 		/* We know which data will receive it and we won't have to unpack, use just the same kind of data.  */
 		early_data_handle->buffer = NULL;
+		early_data_handle->buffer_node = buffer_node;
 		starpu_data_register_same(&early_data_handle->handle, data_handle);
 		//_starpu_mpi_early_data_add(early_data_handle);
 	}
@@ -1164,10 +1169,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 		 * to the application when it post a receive for this tag
 		 */
 		_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
-		// TODO: rather allocate on some memory node next to NIC
-		early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, envelope->size, 0);
+		early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(buffer_node, envelope->size, 0);
 		early_data_handle->size = envelope->size;
-		starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, envelope->size);
+		early_data_handle->buffer_node = buffer_node;
+		starpu_variable_data_register(&early_data_handle->handle, buffer_node, (uintptr_t) early_data_handle->buffer, envelope->size);
 		//_starpu_mpi_early_data_add(early_data_handle);
 	}