Browse Source

nmad: send size with data when datatype is unknown

And don't use one request for size and then one request for data
Philippe SWARTVAGHER 5 years ago
parent
commit
e9384af91b

+ 2 - 0
mpi/src/Makefile.am

@@ -75,6 +75,7 @@ noinst_HEADERS =					\
 	mpi/starpu_mpi_driver.h				\
 	mpi/starpu_mpi_mpi_backend.h			\
 	nmad/starpu_mpi_nmad_backend.h			\
+	nmad/starpu_mpi_nmad_unknown_datatype.h		\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h
@@ -95,6 +96,7 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_fortran.c				\
 	starpu_mpi_task_insert_fortran.c		\
 	starpu_mpi_init.c				\
+	nmad/starpu_mpi_nmad_unknown_datatype.c		\
 	nmad/starpu_mpi_nmad.c				\
 	nmad/starpu_mpi_nmad_backend.c			\
 	mpi/starpu_mpi_mpi.c				\

+ 15 - 87
mpi/src/nmad/starpu_mpi_nmad.c

@@ -39,13 +39,14 @@
 #include <nm_sendrecv_interface.h>
 #include <nm_mpi_nmad.h>
 #include "starpu_mpi_nmad_backend.h"
+#include "starpu_mpi_nmad_unknown_datatype.h"
 
-static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
+void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
 
-static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
+void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_add_sync_point_in_fxt(void);
 
 /* Condition to wake up waiting for all current MPI requests to finish */
@@ -88,6 +89,8 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
+	STARPU_ASSERT_MSG(req->registered_datatype == 1, "Datatype is not registered, it cannot be sent through this way !");
+
 	_STARPU_MPI_DEBUG(30, "post NM isend request %p type %s tag %ld src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
 
 	_starpu_mpi_comm_amounts_inc(req->node_tag.node.comm, req->node_tag.node.rank, req->datatype, req->count);
@@ -124,49 +127,15 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 	if (req->registered_datatype == 1)
 	{
-		req->backend->waited = 1;
 		req->count = 1;
 		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+
+		_starpu_mpi_isend_data_func(req);
 	}
 	else
 	{
-		starpu_ssize_t psize = -1;
-		int ret;
-		req->backend->waited =2;
-
-		// Do not pack the data, just try to find out the size
-		starpu_data_pack(req->data_handle, NULL, &psize);
-
-		if (psize != -1)
-		{
-			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", psize, sizeof(req->count), "MPI_BYTE", req->node_tag.node.rank);
-			req->count = psize;
-			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->backend->size_req);
-			ret = nm_sr_isend(req->backend->session,req->backend->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->backend->size_req);
-
-			//	ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->backend->size_req);
-			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
-		}
-
-		// Pack the data
-		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
-		if (psize == -1)
-		{
-			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %ld to node %d (second call to pack)\n", req->count, sizeof(req->count), "MPI_BYTE", req->node_tag.data_tag, req->node_tag.node.rank);
-			ret = nm_sr_isend(req->backend->session,req->backend->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->backend->size_req);
-			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
-		}
-		else
-		{
-			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT_MSG(req->count == psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, psize);
-		}
-
-		// We can send the data now
+		_starpu_mpi_isend_unknown_datatype(req);
 	}
-	_starpu_mpi_isend_data_func(req);
 }
 
 /********************************************************/
@@ -179,11 +148,12 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
+	STARPU_ASSERT_MSG(req->registered_datatype == 1, "Datatype is not registered, it cannot be received through this way !");
+
 	_STARPU_MPI_DEBUG(20, "post NM irecv request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
 
-	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	struct nm_data_s data;
 	nm_mpi_nmad_data_get(&data, (void*)req->ptr, req->datatype, req->count);
 	nm_sr_recv_init(req->backend->session, &(req->backend->data_request));
@@ -197,23 +167,6 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-struct _starpu_mpi_irecv_size_callback
-{
-	starpu_data_handle_t handle;
-	struct _starpu_mpi_req *req;
-};
-
-static void _starpu_mpi_irecv_size_callback(void *arg)
-{
-	struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
-
-	starpu_data_unregister(callback->handle);
-	callback->req->ptr = malloc(callback->req->count);
-	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld", callback->req->count);
-	_starpu_mpi_irecv_data_func(callback->req);
-	free(callback);
-}
-
 void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
@@ -227,11 +180,7 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 	}
 	else
 	{
-		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
-		callback->req = req;
-		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
-		_STARPU_MPI_DEBUG(4, "Receiving size with tag %ld from node %d\n", req->node_tag.data_tag, req->node_tag.node.rank);
-		_starpu_mpi_irecv_common(callback->handle, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1,0,0);
+		_starpu_mpi_irecv_unknown_datatype(req);
 	}
 
 }
@@ -347,7 +296,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 }
 #endif
 
-static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event)
+void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event)
 {
 	_STARPU_MPI_LOG_IN();
 
@@ -356,22 +305,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 
 	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
 	{
+		nm_mpi_nmad_data_release(req->datatype);
+
 		if (req->registered_datatype == 0)
 		{
-			if(req->backend->waited == 1)
-			        nm_mpi_nmad_data_release(req->datatype);
-			if (req->request_type == SEND_REQ)
-			{
-				req->backend->waited--;
-				// We need to make sure the communication for sending the size
-				// has completed, as MPI can re-order messages, let's count
-				// recerived message.
-				// FIXME concurent access.
-				STARPU_ASSERT_MSG(event == NM_SR_EVENT_FINALIZED, "Callback with event %d", event);
-				if(req->backend->waited>0)
-					return;
-
-			}
 			if (req->request_type == RECV_REQ)
 				// req->ptr is freed by starpu_data_unpack
 				starpu_data_unpack(req->data_handle, req->ptr, req->count);
@@ -380,7 +317,6 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 		}
 		else
 		{
-		        nm_mpi_nmad_data_release(req->datatype);
 			_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
 		}
 	}
@@ -425,17 +361,9 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const
 	_starpu_mpi_handle_request_termination(ref,event);
 }
 
-static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
+void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 {
-	if(req->request_type == SEND_REQ && req->backend->waited>1)
-	{
-		nm_sr_request_set_ref(&(req->backend->size_req), req);
-		nm_sr_request_monitor(req->backend->session, &(req->backend->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
-	}
-	/* the if must be before, because the first callback can directly free
-	* a detached request (the second callback free if req->backend->waited>1). */
 	nm_sr_request_set_ref(&(req->backend->data_request), req);
-
 	nm_sr_request_monitor(req->backend->session, &(req->backend->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 

+ 1 - 0
mpi/src/nmad/starpu_mpi_nmad_backend.c

@@ -46,6 +46,7 @@ void _starpu_mpi_nmad_backend_request_init(struct _starpu_mpi_req *req)
 
 void _starpu_mpi_nmad_backend_request_fill(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req)
 {
+	/* nm_mpi_nmad_dest() fills req->backend->session and req->backend->gate for the peer req->node_tag.node.rank in comm; is_internal_req is not used here. */
 	nm_mpi_nmad_dest(&req->backend->session, &req->backend->gate, comm, req->node_tag.node.rank);
 }
 

+ 5 - 1
mpi/src/nmad/starpu_mpi_nmad_backend.h

@@ -37,9 +37,13 @@ struct _starpu_mpi_req_backend
 	nm_gate_t gate;
 	nm_session_t session;
 	nm_sr_request_t data_request;
-	int waited;
 	piom_cond_t req_cond;
 	nm_sr_request_t size_req;
+
+	// When datatype is unknown:
+	struct nm_data_s unknown_datatype_body;
+	struct nm_data_s unknown_datatype_data;
+	struct nm_data_s unknown_datatype_size;
 };
 
 #endif // STARPU_USE_MPI_NMAD

+ 169 - 0
mpi/src/nmad/starpu_mpi_nmad_unknown_datatype.c

@@ -0,0 +1,169 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+*
+* Copyright (C) 2019                                     Inria
+*
+* StarPU is free software; you can redistribute it and/or modify
+* it under the terms of the GNU Lesser General Public License as published by
+* the Free Software Foundation; either version 2.1 of the License, or (at
+* your option) any later version.
+*
+* StarPU is distributed in the hope that it will be useful, but
+* WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+*
+* See the GNU Lesser General Public License in COPYING.LGPL for more details.
+*/
+
+
+#include <common/config.h>
+
+#ifdef STARPU_USE_MPI_NMAD
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_stats.h>
+#include <starpu_mpi_datatype.h>
+#include <nm_sendrecv_interface.h>
+#include <nm_mpi_nmad.h>
+#include "starpu_mpi_nmad_backend.h"
+#include "starpu_mpi_nmad_unknown_datatype.h"
+
+#if defined(STARPU_VERBOSE) || defined(STARPU_MPI_VERBOSE)
+extern char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
+#endif
+
+extern void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
+extern void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
+
+struct starpu_nm_datatype_unknown	/* content of the custom nmad data type: a size, then a body */
+{
+	starpu_ssize_t* count;		/* size; the traversal hands it over first, as sizeof(starpu_ssize_t) bytes */
+	const struct nm_data_s* body;	/* descriptor of the payload; traversed right after the size */
+};
+
+/* Traversal callback of the "unknown datatype" nmad data type.
+ * _content points to a struct starpu_nm_datatype_unknown: the size is handed
+ * to apply first, then the body is delegated to nm_data_traversal_apply(),
+ * so the size always precedes the payload. */
+static void starpu_nm_datatype_unknown_traversal(const void* _content, nm_data_apply_t apply, void* _context)
+{
+	const struct starpu_nm_datatype_unknown* unknown = _content;
+
+	/* first chunk: the size itself */
+	apply(unknown->count, sizeof(starpu_ssize_t), _context);
+
+	/* next chunks: whatever the body descriptor yields */
+	nm_data_traversal_apply(unknown->body, apply, _context);
+}
+
+/* Operations table of the data type; it comes after the callback so that no
+ * forward declaration is needed. */
+const struct nm_data_ops_s starpu_nm_datatype_unknown_ops =
+{
+	.p_traversal = &starpu_nm_datatype_unknown_traversal
+};
+
+/* Declare the datatype_unknown data type on top of the operations table
+ * (presumably what provides nm_data_datatype_unknown_set() -- confirm in nm_data.h). */
+NM_DATA_TYPE(datatype_unknown, struct starpu_nm_datatype_unknown, &starpu_nm_datatype_unknown_ops);
+
+/* Fill datatype_unknown_data so that it describes "size, then body".
+ * Warning: the content handed to nmad only holds the count and body pointers,
+ * not what they point to, so both must stay valid for as long as
+ * datatype_unknown_data is used. */
+void starpu_nm_datatype_unknown_build(struct nm_data_s* datatype_unknown_data, starpu_ssize_t* count, const struct nm_data_s* body)
+{
+	struct starpu_nm_datatype_unknown content;
+
+	content.count = count;
+	content.body = body;
+
+	nm_data_datatype_unknown_set(datatype_unknown_data, content);
+}
+
+/**********************************************
+* Send
+**********************************************/
+
+/* Post the non-blocking send of a request whose datatype is not registered.
+ * The handle is packed into req->ptr / req->count, then the size and the
+ * packed bytes are sent through a single nmad request
+ * (req->backend->data_request); the first sizeof(starpu_ssize_t) bytes are
+ * declared to nmad with nm_sr_send_header().
+ * req must not have a registered datatype (asserted). A synchronous send is
+ * used when req->sync is non-zero. */
+void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	STARPU_ASSERT_MSG(req->registered_datatype != 1, "Datatype is registered, no need to send it through this way !");
+
+	_STARPU_MPI_DEBUG(30, "post NM isend (unknown datatype) request %p type %s tag %ld src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
+
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag, 0);
+
+	starpu_data_pack(req->data_handle, &req->ptr, &req->count);
+
+	// Account for the communication only now: req->count is filled in by the packing above
+	_starpu_mpi_comm_amounts_inc(req->node_tag.node.comm, req->node_tag.node.rank, req->datatype, req->count);
+
+	nm_mpi_nmad_data_get(&(req->backend->unknown_datatype_body), (void*)req->ptr, req->datatype, req->count);
+
+	// warning: this function requires valid pointers for future usage,
+	// hence the descriptors stored in req->backend and the size in req->count
+	starpu_nm_datatype_unknown_build(&(req->backend->unknown_datatype_data), &(req->count), &(req->backend->unknown_datatype_body));
+
+	nm_sr_send_init(req->backend->session, &(req->backend->data_request));
+	nm_sr_send_pack_data(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_data));
+	nm_sr_send_set_priority(req->backend->session, &(req->backend->data_request), req->prio);
+	// the size comes first in the message, see starpu_nm_datatype_unknown_traversal()
+	nm_sr_send_header(req->backend->session, &(req->backend->data_request), sizeof(starpu_ssize_t));
+
+	if (req->sync == 0)
+	{
+		req->ret = nm_sr_send_isend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "nm_sr_send_isend returning %d", req->ret);
+	}
+	else
+	{
+		req->ret = nm_sr_send_issend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "nm_sr_send_issend returning %d", req->ret);
+	}
+
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
+
+	_starpu_mpi_handle_pending_request(req);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+
+/**********************************************
+ * Receive
+ **********************************************/
+
+/* nmad callback monitoring the receive of a request of unknown datatype.
+ * ref is the struct _starpu_mpi_req attached to the nmad request; p_info is
+ * not used.
+ * - NM_SR_EVENT_RECV_DATA: peek the size sent in front of the payload into
+ *   req->count, allocate req->ptr accordingly, then unpack the message
+ *   (size followed by body).
+ * - NM_SR_EVENT_FINALIZED: run the common request termination.
+ * Both events are not expected in the same call (asserted). */
+static void _starpu_mpi_unknown_datatype_recv_callback(nm_sr_event_t event, const nm_sr_event_info_t* p_info, void* ref)
+{
+	STARPU_ASSERT_MSG(!((event & NM_SR_EVENT_FINALIZED) && (event & NM_SR_EVENT_RECV_DATA)), "Both events can't be triggered at the same time !");
+
+	struct _starpu_mpi_req* req = (struct _starpu_mpi_req*) ref;
+
+	if (event & NM_SR_EVENT_RECV_DATA)
+	{
+		// The sender emits the size as sizeof(starpu_ssize_t) bytes, so the
+		// same width must be peeked: with sizeof(int), only a part of
+		// req->count would be written
+		nm_data_contiguous_build(&(req->backend->unknown_datatype_size), &(req->count), sizeof(starpu_ssize_t));
+
+		int ret = nm_sr_recv_peek(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_size));
+		STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "nm_sr_recv_peek returned %d", ret);
+
+		// The size is known now, the buffer for the packed data can be allocated
+		req->ptr = malloc(req->count);
+		STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld", req->count);
+
+		nm_mpi_nmad_data_get(&(req->backend->unknown_datatype_body), (void*) req->ptr, req->datatype, req->count);
+
+		// warning: this function requires valid pointers for future usage
+		starpu_nm_datatype_unknown_build(&(req->backend->unknown_datatype_data), &(req->count), &(req->backend->unknown_datatype_body));
+		nm_sr_recv_unpack_data(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_data));
+	}
+	else if (event & NM_SR_EVENT_FINALIZED)
+	{
+		_starpu_mpi_handle_request_termination(req, event);
+	}
+}
+
+/* Post the non-blocking receive of a request whose datatype is not registered.
+ * Only the nmad request is initialised and posted here, with
+ * _starpu_mpi_unknown_datatype_recv_callback monitoring both the arrival of
+ * data and the finalization; the reception buffer is set up by that callback.
+ * req must not have a registered datatype (asserted). */
+void _starpu_mpi_irecv_unknown_datatype(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req_backend *backend;
+
+	_STARPU_MPI_LOG_IN();
+
+	STARPU_ASSERT_MSG(req->registered_datatype != 1, "Datatype is registered, no need to receive it through this way !");
+
+	_STARPU_MPI_DEBUG(20, "post NM irecv (datatype unknown) request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
+
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
+
+	backend = req->backend;
+
+	nm_sr_recv_init(backend->session, &backend->data_request);
+	/* the callback gets req back through the reference attached to the request */
+	nm_sr_request_set_ref(&backend->data_request, req);
+	nm_sr_request_monitor(backend->session, &backend->data_request,
+				NM_SR_EVENT_FINALIZED | NM_SR_EVENT_RECV_DATA,
+				&_starpu_mpi_unknown_datatype_recv_callback);
+	nm_sr_recv_irecv(backend->session, &backend->data_request, backend->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
+
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+#endif //  STARPU_USE_MPI_NMAD

+ 43 - 0
mpi/src/nmad/starpu_mpi_nmad_unknown_datatype.h

@@ -0,0 +1,43 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2019                                     Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_NMAD_UNKNOWN_DATATYPE_H__
+#define __STARPU_MPI_NMAD_UNKNOWN_DATATYPE_H__
+
+#include <common/config.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#ifdef STARPU_USE_MPI_NMAD
+
+#include <nm_sendrecv_interface.h>
+#include <nm_mpi_nmad.h>
+
+
+/* Forward declaration, so that the prototypes below do not depend on the
+ * definition of the request structure having been included first. */
+struct _starpu_mpi_req;
+
+/* Post the non-blocking send of req, whose datatype must not be registered. */
+void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req);
+/* Post the non-blocking receive of req, whose datatype must not be registered. */
+void _starpu_mpi_irecv_unknown_datatype(struct _starpu_mpi_req *req);
+
+
+#endif // STARPU_USE_MPI_NMAD
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_NMAD_UNKNOWN_DATATYPE_H__