Browse Source

mpi: New functionality starpu_mpi_irecv_probe_detached which first tests if the message is available before calling MPI_Recv.

Nathalie Furmento 12 years ago
parent
commit
4b7f11209c
6 changed files with 159 additions and 5 deletions
  1. 2 0
      ChangeLog
  2. 1 0
      mpi/include/starpu_mpi.h
  3. 50 4
      mpi/src/starpu_mpi.c
  4. 2 1
      mpi/src/starpu_mpi_private.h
  5. 4 0
      mpi/tests/Makefile.am
  6. 100 0
      mpi/tests/mpi_probe.c

+ 2 - 0
ChangeLog

@@ -56,6 +56,8 @@ New features:
         - When exchanging user-defined data interfaces, the size of
 	  the data is the size returned by the pack operation, i.e
 	  data with dynamic size can now be exchanged with StarPU-MPI.
+        - New functionality starpu_mpi_irecv_probe_detached which
+  	  first tests if the message is available before calling MPI_Recv.
   * Add experimental simgrid support, to simulate execution with various
     number of CPUs, GPUs, amount of memory, etc.
   * Add support for OpenCL simulators (which provide simulated execution time)

+ 1 - 0
mpi/include/starpu_mpi.h

@@ -36,6 +36,7 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_irecv_probe_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_barrier(MPI_Comm comm);

+ 50 - 4
mpi/src/starpu_mpi.c

@@ -322,6 +322,33 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 	return 0;
 }
 
+static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+#ifdef STARPU_DEVEL
+#warning TODO: release that assert
+#endif
+	assert(req->user_datatype == 0);
+	req->count = 1;
+	req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+
+	_STARPU_MPI_DEBUG("MPI probe tag %d dst %d ptr %p datatype %p count %d req %p\n", req->mpi_tag, req->srcdst, req->ptr, req->datatype, (int)req->count, &req->request);
+
+	_starpu_mpi_handle_detached_request(req);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+int starpu_mpi_irecv_probe_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+	_starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, PROBE_REQ, _starpu_mpi_probe_func, STARPU_W);
+	_STARPU_MPI_LOG_OUT();
+	return 0;
+}
+
 /********************************************************/
 /*                                                      */
 /*  Wait functionalities                                */
@@ -563,6 +590,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 		case WAIT_REQ: return "WAIT_REQ";
 		case TEST_REQ: return "TEST_REQ";
 		case BARRIER_REQ: return "BARRIER_REQ";
+		case PROBE_REQ: return "PROBE_REQ";
 		default: return "unknown request type";
 		}
 }
@@ -573,7 +601,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_IN();
 
 	_STARPU_MPI_DEBUG("complete MPI (%s %d) data %p req %p - tag %d\n", _starpu_mpi_request_type(req->request_type), req->srcdst, req->data_handle, &req->request, req->mpi_tag);
-	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
+	if (req->request_type == PROBE_REQ)
+	{
+#ifdef STARPU_DEVEL
+#warning TODO: instead of calling MPI_Recv, we should post a starpu mpi recv request
+#endif
+		MPI_Status status;
+		memset(&status, 0, sizeof(MPI_Status));
+		req->ret = MPI_Recv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &status);
+	}
+
+	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ || req->request_type == PROBE_REQ)
 	{
 		if (req->user_datatype == 1)
 		{
@@ -590,7 +628,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		starpu_data_release(req->data_handle);
 	}
 
-	if (req->request_type == RECV_REQ)
+	if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
 	{
 		TRACE_MPI_IRECV_END(req->srcdst, req->mpi_tag);
 	}
@@ -599,7 +637,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	if (req->callback)
 		req->callback(req->callback_arg);
 
-	/* tell anyone potentiallly waiting on the request that it is
+	/* tell anyone potentially waiting on the request that it is
 	 * terminated now */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
 	req->completed = 1;
@@ -659,7 +697,15 @@ static void _starpu_mpi_test_detached_requests(void)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		//_STARPU_MPI_DEBUG("Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
-		req->ret = MPI_Test(&req->request, &flag, &status);
+		if (req->request_type == PROBE_REQ)
+		{
+			req->ret = MPI_Iprobe(req->srcdst, req->mpi_tag, req->comm, &flag, &status);
+		}
+		else
+		{
+			req->ret = MPI_Test(&req->request, &flag, &status);
+		}
+
 		STARPU_ASSERT(req->ret == MPI_SUCCESS);
 
 		if (flag)

+ 2 - 1
mpi/src/starpu_mpi_private.h

@@ -66,7 +66,8 @@ enum _starpu_mpi_request_type
 	RECV_REQ=1,
 	WAIT_REQ=2,
 	TEST_REQ=3,
-	BARRIER_REQ=4
+	BARRIER_REQ=4,
+	PROBE_REQ=5
 };
 
 LIST_TYPE(_starpu_mpi_req,

+ 4 - 0
mpi/tests/Makefile.am

@@ -80,6 +80,7 @@ starpu_mpi_TESTS =				\
 	mpi_irecv				\
 	mpi_isend_detached			\
 	mpi_irecv_detached			\
+	mpi_probe				\
 	mpi_detached_tag			\
 	ring					\
 	ring_async				\
@@ -104,6 +105,7 @@ noinst_PROGRAMS =				\
 	mpi_irecv				\
 	mpi_isend_detached			\
 	mpi_irecv_detached			\
+	mpi_probe				\
 	mpi_detached_tag			\
 	ring					\
 	ring_async				\
@@ -129,6 +131,8 @@ mpi_isend_detached_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_irecv_detached_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+mpi_probe_LDADD =			\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_detached_tag_LDADD =				\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 pingpong_LDADD =					\

+ 100 - 0
mpi/tests/mpi_probe.c

@@ -0,0 +1,100 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <common/utils.h>
+#include "helper.h"
+
+#ifdef STARPU_QUICK_CHECK
+#  define NITER	16
+#else
+#  define NITER	2048
+#endif
+#define SIZE	16
+
+float *tab;
+starpu_data_handle_t tab_handle;
+
+static _starpu_pthread_mutex_t mutex = _STARPU_PTHREAD_MUTEX_INITIALIZER;
+static _starpu_pthread_cond_t cond = _STARPU_PTHREAD_COND_INITIALIZER;
+
+void callback(void *arg __attribute__((unused)))
+{
+	unsigned *received = arg;
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	*received = 1;
+	_STARPU_PTHREAD_COND_SIGNAL(&cond);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+}
+
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size;
+
+	MPI_Init(NULL, NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	if (size%2 != 0)
+	{
+		if (rank == 0)
+			FPRINTF(stderr, "We need a even number of processes.\n");
+
+		MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	tab = malloc(SIZE*sizeof(float));
+
+	starpu_vector_data_register(&tab_handle, 0, (uintptr_t)tab, SIZE, sizeof(float));
+
+	unsigned nloops = NITER;
+	unsigned loop;
+	int other_rank = rank%2 == 0 ? rank+1 : rank-1;
+
+	for (loop = 0; loop < nloops; loop++)
+	{
+		if ((loop % 2) == (rank%2))
+		{
+			starpu_mpi_send(tab_handle, other_rank, loop, MPI_COMM_WORLD);
+		}
+		else
+		{
+			int received = 0;
+			starpu_mpi_irecv_probe_detached(tab_handle, other_rank, loop, MPI_COMM_WORLD, callback, &received);
+
+			_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			while (!received)
+				_STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		}
+	}
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+
+	return 0;
+}