Browse Source

mpi: make more code use the req->node memory node

Samuel Thibault 4 years ago
parent
commit
5587862f9f

+ 16 - 16
mpi/src/mpi/starpu_mpi_mpi.c

@@ -197,12 +197,12 @@ void _starpu_mpi_submit_ready_request(void *arg)
 			if (req->registered_datatype == 1)
 			{
 				req->count = 1;
-				req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+				req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
 			}
 			else
 			{
 				STARPU_ASSERT(req->count);
-				req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
+				req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
 			}
 
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
@@ -268,13 +268,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
 					if (req->registered_datatype == 1)
 					{
 						req->count = 1;
-						req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+						req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
 					}
 					else
 					{
 						req->count = sync_req->count;
 						STARPU_ASSERT(req->count);
-						req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
+						req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
 					}
 					_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 					_STARPU_MPI_INC_READY_REQUESTS(+1);
@@ -431,7 +431,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	{
 		int size, ret;
 		req->count = 1;
-		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+		req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
 
 		MPI_Type_size(req->datatype, &size);
 		req->backend->envelope->size = (starpu_ssize_t)req->count * size;
@@ -445,7 +445,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		int ret;
 
  		// Do not pack the data, just try to find out the size
-		starpu_data_pack(req->data_handle, NULL, &(req->backend->envelope->size));
+		starpu_data_pack_node(req->data_handle, req->node, NULL, &(req->backend->envelope->size));
 
 		if (req->backend->envelope->size != -1)
  		{
@@ -458,7 +458,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
  		}
 
  		// Pack the data
- 		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
+		starpu_data_pack_node(req->data_handle, req->node, &req->ptr, &req->count);
 		if (req->backend->envelope->size == -1)
  		{
  			// We know the size now, let's send it
@@ -879,14 +879,14 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 					int ret;
 					ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
 					STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
-					starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
+					starpu_free_on_node_flags(req->node, (uintptr_t)req->ptr, req->count, 0);
 					req->ptr = NULL;
 				}
 				else if (req->request_type == RECV_REQ)
 				{
 					// req->ptr is freed by starpu_data_unpack_node
-					starpu_data_unpack(req->data_handle, req->ptr, req->count);
-					starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
+					starpu_data_unpack_node(req->data_handle, req->node, req->ptr, req->count);
+					starpu_memory_deallocate(req->node, req->count);
 				}
 			}
 			else
@@ -930,12 +930,12 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		/* Data has been received as raw memory; it has to be unpacked */
 		struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
 		struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
-		MPI_Datatype datatype = _starpu_mpi_datatype_get_user_defined_datatype(args->data_handle, STARPU_MAIN_RAM);
+		MPI_Datatype datatype = _starpu_mpi_datatype_get_user_defined_datatype(args->data_handle, args->req->node);
 
 		if (datatype)
 		{
 			int position=0;
-			void *ptr = starpu_data_get_local_ptr(args->data_handle);
+			void *ptr = starpu_data_handle_to_pointer(args->data_handle, args->req->node);
 			MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
 			starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) args->buffer, args->size, 0);
 			args->buffer = NULL;
@@ -944,7 +944,7 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		else
 		{
 			STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
-			itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
+			itf_dst->unpack_data(args->data_handle, args->req->node, args->buffer, itf_src->get_size(args->early_handle));
 			args->buffer = NULL;
 		}
 	}
@@ -952,17 +952,17 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	{
 		struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
 		void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
-		void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
+		void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, args->req->node);
 
 		if (!itf->copy_methods->ram_to_ram)
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
-			itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
+			itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, args->req->node, NULL);
 		}
 		else
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
-			itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
+			itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, args->req->node);
 		}
 	}
 

+ 4 - 4
mpi/src/nmad/starpu_mpi_nmad.c

@@ -138,7 +138,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	if (req->registered_datatype == 1)
 	{
 		req->count = 1;
-		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+		req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
 
 		_starpu_mpi_isend_data_func(req);
 	}
@@ -185,7 +185,7 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 	if (req->registered_datatype == 1)
 	{
 		req->count = 1;
-		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+		req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
 		_starpu_mpi_irecv_data_func(req);
 	}
 	else
@@ -344,9 +344,9 @@ void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req* req)
 		{
 			if (req->request_type == RECV_REQ)
 				// req->ptr is freed by starpu_data_unpack_node
-				starpu_data_unpack(req->data_handle, req->ptr, req->count);
+				starpu_data_unpack_node(req->data_handle, req->node, req->ptr, req->count);
 			else
-				starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) req->ptr, req->count, 0);
+				starpu_free_on_node_flags(req->node, (uintptr_t) req->ptr, req->count, 0);
 		}
 		else
 		{

+ 2 - 2
mpi/src/nmad/starpu_mpi_nmad_unknown_datatype.c

@@ -50,7 +50,7 @@ void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag, 0);
 
-	starpu_data_pack(req->data_handle, &req->ptr, &req->count);
+	starpu_data_pack_node(req->data_handle, req->node, &req->ptr, &req->count);
 
 	req->backend->unknown_datatype_v[0].iov_base = &req->count;
 	req->backend->unknown_datatype_v[0].iov_len = sizeof(starpu_ssize_t);
@@ -102,7 +102,7 @@ static void _starpu_mpi_unknown_datatype_recv_callback(nm_sr_event_t event, cons
 		nm_sr_recv_peek(req->backend->session, &req->backend->data_request, &data_header);
 
 		// Now we know the size, allocate the buffer:
-		req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
+		req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
 		STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld", req->count);
 
 		/* Last step: give this buffer to NewMadeleine to receive data

+ 1 - 1
mpi/src/starpu_mpi.c

@@ -36,7 +36,7 @@
 
 static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
 {
-	unsigned node = STARPU_MAIN_RAM; // For now
+	unsigned node = STARPU_MAIN_RAM; // XXX For now
 
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and

+ 5 - 3
mpi/src/starpu_mpi_coop_sends.c

@@ -47,13 +47,13 @@ void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
 			/* We were last, release data */
 			free(coop_sends->reqs_array);
 			free(coop_sends);
-			starpu_data_release_on_node(req->data_handle, STARPU_MAIN_RAM);
+			starpu_data_release_on_node(req->data_handle, req->node);
 		}
 	}
 	else
 	{
 		/* Trivial request */
-		starpu_data_release_on_node(req->data_handle, STARPU_MAIN_RAM);
+		starpu_data_release_on_node(req->data_handle, req->node);
 	}
 }
 
@@ -265,9 +265,11 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 	/* In case we created one for nothing after all */
 	free(tofree);
 
+	unsigned node = STARPU_MAIN_RAM; // XXX For now
+
 	if (first)
 		/* We were first, we are responsible for acquiring the data for everybody */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL);
 	else
 		req->pre_sync_jobid = coop_sends->pre_sync_jobid;
 }