Przeglądaj źródła

merge branch nmad

Nathalie Furmento 7 lat temu
rodzic
commit
6180e9dd5d

+ 1 - 0
ChangeLog

@@ -30,6 +30,7 @@ New features:
   * Add starpu_replay tool to replay tasks.rec files with Simgrid.
   * Add experimental support of NUMA nodes. Use STARPU_USE_NUMA to activate it.
   * Add a new set of functions to make Out-of-Core based on HDF5 Library.
+  * Add a new implementation of StarPU-MPI on top of NewMadeleine
 
 Small features:
   * Scheduling contexts may now be associated a user data pointer at creation

+ 1 - 1
Makefile.am

@@ -35,7 +35,7 @@ endif
 
 SUBDIRS += doc
 
-if USE_MPI
+if STARPU_USE_MPI
 SUBDIRS += mpi
 endif
 

+ 50 - 3
configure.ac

@@ -404,6 +404,44 @@ AC_ARG_ENABLE(maxmpidev, [AS_HELP_STRING([--enable-maxmpidev=<number>],
 AC_MSG_RESULT($nmaxmpidev)
 AC_DEFINE_UNQUOTED(STARPU_MAXMPIDEVS, [$nmaxmpidev], [maximum number of MPI devices])
 
+
+###############################################################################
+#                                                                             #
+#                                NEW MADELEINE                                #
+#                                                                             #
+###############################################################################
+
+AC_ARG_ENABLE(nmad, [AS_HELP_STRING([--enable-nmad],
+                              [Enable StarPU MPI library generation using new madeleine instead of mpi])],
+            [enable_nmad=$enableval],
+            [enable_nmad=no])
+
+if test x$use_mpi = xyes -a \( x$enable_nmad \) ; then
+    cc_or_mpicc=$mpicc_path
+        # For some reason, libtool uses gcc instead of mpicc when linking
+        # libstarpumpi.
+        # On Darwin (and maybe other systems ?) the linker will fail (undefined
+        # references to MPI_*). We manually add the required flags to fix this
+        # issue.
+        AC_SUBST(MPICC_LDFLAGS, `$mpicc_path --showme:link`)
+else
+    cc_or_mpicc=$CC
+fi
+
+build_nmad_lib=no
+AC_SUBST(CC_OR_MPICC, $cc_or_mpicc)
+#We can only build StarPU MPI Library if User wants it and MPI is available
+if test x$use_mpi = xyes -a x$enable_nmad = xyes ; then
+    build_nmad_lib=yes
+    enable_mpi=no
+    PKG_CHECK_MODULES([NMAD],[nmad pioman tbx])
+else
+    build_nmad_lib=no
+fi
+
+AC_MSG_CHECKING(whether the StarPU MPI nmad library should be generated)
+AC_MSG_RESULT($build_nmad_lib)
+
 ###############################################################################
 #                                                                             #
 #                                    DSM                                      #
@@ -572,14 +610,22 @@ fi
 AC_MSG_CHECKING(whether the StarPU MPI library should be generated)
 AC_MSG_RESULT($build_mpi_lib)
 
-AC_SUBST(USE_MPI, $build_mpi_lib)
-AM_CONDITIONAL(USE_MPI, test x$build_mpi_lib = xyes)
-if test x$build_mpi_lib = xyes; then
+AM_CONDITIONAL(USE_MPI, test x$build_mpi_lib = xyes -o x$build_nmad_lib = xyes)
+if test x$build_mpi_lib = xyes -o x$build_nmad_lib = xyes ; then
 	AC_DEFINE(STARPU_USE_MPI,[1],[whether the StarPU MPI library is available])
+	if test x$build_mpi_lib = xyes ; then
+		AC_DEFINE(STARPU_USE_MPI_MPI,[1],[whether the StarPU MPI library (with a native MPI implementation) is available])
+	else
+		AC_DEFINE(STARPU_USE_MPI_NMAD,[1],[whether the StarPU MPI library (with a NewMadeleine implementation) is available])
+	fi
 else
 	running_mpi_check=no
 fi
 
+AM_CONDITIONAL(STARPU_USE_MPI_MPI, test x$build_mpi_lib = xyes)
+AM_CONDITIONAL(STARPU_USE_MPI_NMAD, test x$build_nmad_lib = xyes)
+AM_CONDITIONAL(STARPU_USE_MPI, test x$build_nmad_lib = xyes -o x$build_mpi_lib = xyes)
+
 AC_ARG_WITH(mpiexec-args, [AS_HELP_STRING([--with-mpiexec-args[=<arguments to give when running mpiexec>]],
 			[Arguments for mpiexec])],
 	[
@@ -3467,6 +3513,7 @@ AC_MSG_NOTICE([
 
 	StarPU Extensions:
 	       StarPU MPI enabled:                            $build_mpi_lib
+	       StarPU MPI(nmad) enabled:                      $build_nmad_lib
 	       MPI test suite:                                $running_mpi_check
 	       Master-Slave MPI enabled:                      $use_mpi_master_slave
 	       StarPU DSM enabled:                            $enable_dsm

+ 12 - 0
doc/doxygen/chapters/410_mpi_support.doxy

@@ -874,6 +874,18 @@ data transfers and supports data matrices which do not fit in memory (out-of-cor
 </li>
 </ul>
 
+\section MPIImplementation Notes about the Implementation
+
+StarPU-MPI is implemented directly on top of MPI.
+
+Since the release 1.3.0, an implementation on top of NewMadeleine, an
+optimizing communication library for high-performance networks, is
+also provided. To use it, one needs to install NewMadeleine (see
+http://pm2.gforge.inria.fr/newmadeleine/) and enable the configure
+option \ref enable-nmad "--enable-nmad".
+
+Both implementations provide the same public API.
+
 \section MPIMasterSlave MPI Master Slave Support
 
 StarPU provides an other way to execute applications across many

+ 7 - 0
doc/doxygen/chapters/510_configure_options.doxy

@@ -395,6 +395,13 @@ $ STARPU_SILENT=1 mpirun -np 2 ./insert_task
 \endverbatim
 </dd>
 
+<dt>--enable-nmad</dt>
+<dd>
+\anchor enable-nmad
+\addindex __configure__--enable-nmad
+Enable the NewMadeleine implementation for StarPU-MPI.
+</dd>
+
 <dt>--disable-fortran</dt>
 <dd>
 \anchor disable-fortran

+ 2 - 0
include/starpu_config.h.in

@@ -50,6 +50,8 @@
 #undef STARPU_HAVE_ICC
 
 #undef STARPU_USE_MPI
+#undef STARPU_USE_MPI_MPI
+#undef STARPU_USE_MPI_NMAD
 
 #undef STARPU_ATLAS
 #undef STARPU_GOTO

+ 8 - 4
mpi/examples/Makefile.am

@@ -120,13 +120,17 @@ AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(FXT_LDFLAGS) $(ST
 # Stencil example #
 ###################
 if BUILD_EXAMPLES
-examplebin_PROGRAMS +=				\
-	stencil/stencil5			\
-	stencil/stencil5_lb
+examplebin_PROGRAMS +=		\
+	stencil/stencil5
+starpu_mpi_EXAMPLES	+=	\
+	stencil/stencil5
 
+if STARPU_USE_MPI_MPI
+examplebin_PROGRAMS +=		\
+	stencil/stencil5_lb
 starpu_mpi_EXAMPLES	+=	\
-	stencil/stencil5	\
 	stencil/stencil5_lb
+endif
 
 endif
 

+ 2 - 0
mpi/include/starpu_mpi_lb.h

@@ -18,6 +18,8 @@
 #ifndef __STARPU_MPI_LOAD_BALANCER_H__
 #define __STARPU_MPI_LOAD_BALANCER_H__
 
+#include <starpu.h>
+
 #ifdef __cplusplus
 extern "C"
 {

+ 15 - 13
mpi/src/Makefile.am

@@ -21,10 +21,10 @@ BUILT_SOURCES =
 
 CLEANFILES = *.gcno *.gcda *.linkinfo
 
-AM_CFLAGS = -Wall $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(FXT_CFLAGS) $(MAGMA_CFLAGS) $(HWLOC_CFLAGS) $(GLOBAL_AM_CFLAGS)
-LIBS = $(top_builddir)/src/@LIBSTARPU_LINK@ @LIBS@ $(FXT_LIBS) $(MAGMA_LIBS)
+AM_CFLAGS = -Wall $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(FXT_CFLAGS) $(MAGMA_CFLAGS) $(HWLOC_CFLAGS) $(GLOBAL_AM_CFLAGS) $(NMAD_CFLAGS)
+LIBS = $(top_builddir)/src/@LIBSTARPU_LINK@ @LIBS@ $(FXT_LIBS) $(MAGMA_LIBS) $(NMAD_LIBS)
 AM_CPPFLAGS = -I$(top_srcdir)/include/ -I$(top_srcdir)/src/ -I$(top_builddir)/src -I$(top_builddir)/include -I$(top_srcdir)/mpi/include -I$(top_srcdir)/mpi/src
-AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(STARPU_COI_LDFLAGS) $(STARPU_SCIF_LDFLAGS)
+AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(STARPU_COI_LDFLAGS) $(STARPU_SCIF_LDFLAGS) $(NMAD_LDFLAGS)
 
 ldflags =
 
@@ -64,13 +64,13 @@ noinst_HEADERS =					\
 	starpu_mpi_cache.h				\
 	starpu_mpi_select_node.h			\
 	starpu_mpi_cache_stats.h			\
-	starpu_mpi_early_data.h				\
-	starpu_mpi_early_request.h			\
-	starpu_mpi_sync_data.h				\
-	starpu_mpi_comm.h				\
-	starpu_mpi_tag.h				\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_init.h				\
+	mpi/starpu_mpi_early_data.h			\
+	mpi/starpu_mpi_early_request.h			\
+	mpi/starpu_mpi_sync_data.h			\
+	mpi/starpu_mpi_comm.h				\
+	mpi/starpu_mpi_tag.h				\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h
@@ -86,14 +86,16 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_cache.c				\
 	starpu_mpi_select_node.c			\
 	starpu_mpi_cache_stats.c			\
-	starpu_mpi_early_data.c				\
-	starpu_mpi_early_request.c			\
-	starpu_mpi_sync_data.c				\
-	starpu_mpi_comm.c				\
-	starpu_mpi_tag.c				\
 	starpu_mpi_fortran.c				\
 	starpu_mpi_task_insert_fortran.c		\
 	starpu_mpi_init.c				\
+	nmad/starpu_mpi_nmad.c				\
+	mpi/starpu_mpi_mpi.c				\
+	mpi/starpu_mpi_early_data.c			\
+	mpi/starpu_mpi_early_request.c			\
+	mpi/starpu_mpi_sync_data.c			\
+	mpi/starpu_mpi_comm.c				\
+	mpi/starpu_mpi_tag.c				\
 	load_balancer/policy/data_movements_interface.c	\
 	load_balancer/policy/load_data_interface.c	\
 	load_balancer/policy/load_heat_propagation.c	\

+ 5 - 0
mpi/src/load_balancer/load_balancer.c

@@ -21,10 +21,13 @@
 #include <starpu_mpi.h>
 #include <starpu_scheduler.h>
 #include <common/utils.h>
+#include <common/config.h>
 
 #include <starpu_mpi_lb.h>
 #include "policy/load_balancer_policy.h"
 
+#if defined(STARPU_USE_MPI_MPI)
+
 static struct load_balancer_policy *defined_policy = NULL;
 typedef void (*_post_exec_hook_func_t)(struct starpu_task *task, unsigned sched_ctx_id);
 static _post_exec_hook_func_t saved_post_exec_hook[STARPU_NMAX_SCHED_CTXS];
@@ -154,3 +157,5 @@ void starpu_mpi_lb_shutdown()
 	}
 	defined_policy = NULL;
 }
+
+#endif /* STARPU_USE_MPI_MPI */

+ 5 - 0
mpi/src/load_balancer/policy/data_movements_interface.c

@@ -18,9 +18,12 @@
 #include <starpu.h>
 #include <stdlib.h>
 #include <starpu_mpi_private.h>
+#include <common/config.h>
 
 #include "data_movements_interface.h"
 
+#if defined(STARPU_USE_MPI_MPI)
+
 int **data_movements_get_ref_tags_table(starpu_data_handle_t handle)
 {
 	struct data_movements_interface *dm_interface =
@@ -279,3 +282,5 @@ void data_movements_data_register(starpu_data_handle_t *handleptr, unsigned home
 
 	starpu_data_register(handleptr, home_node, &data_movements, &interface_data_movements_ops);
 }
+
+#endif

+ 5 - 0
mpi/src/load_balancer/policy/load_data_interface.c

@@ -17,9 +17,12 @@
 
 #include <starpu.h>
 #include <stdlib.h>
+#include <common/config.h>
 
 #include "load_data_interface.h"
 
+#if defined(STARPU_USE_MPI_MPI)
+
 int load_data_get_sleep_threshold(starpu_data_handle_t handle)
 {
 	struct load_data_interface *ld_interface =
@@ -267,3 +270,5 @@ void load_data_data_register(starpu_data_handle_t *handleptr, unsigned home_node
 
 	starpu_data_register(handleptr, home_node, &load_data, &interface_load_data_ops);
 }
+
+#endif

+ 6 - 1
mpi/src/load_balancer/policy/load_heat_propagation.c

@@ -16,7 +16,7 @@
  */
 
 #include <starpu_mpi.h>
-#include <starpu_mpi_tag.h>
+#include <mpi/starpu_mpi_tag.h>
 #include <common/uthash.h>
 #include <common/utils.h>
 #include <math.h>
@@ -24,6 +24,9 @@
 #include "load_balancer_policy.h"
 #include "data_movements_interface.h"
 #include "load_data_interface.h"
+#include <common/config.h>
+
+#if defined(STARPU_USE_MPI_MPI)
 
 static int TAG_LOAD(int n)
 {
@@ -636,3 +639,5 @@ struct load_balancer_policy load_heat_propagation_policy =
 	.finished_task_entry_point = finished_task_heat,
 	.policy_name = "heat"
 };
+
+#endif

+ 6 - 2
mpi/src/starpu_mpi_comm.c

@@ -19,9 +19,11 @@
 #include <starpu.h>
 #include <starpu_mpi.h>
 #include <starpu_mpi_private.h>
-#include <starpu_mpi_comm.h>
+#include <mpi/starpu_mpi_comm.h>
 #include <common/list.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 struct _starpu_mpi_comm
 {
 	MPI_Comm comm;
@@ -137,7 +139,7 @@ void _starpu_mpi_comm_post_recv()
 		if (_comm->posted == 0)
 		{
 			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop on comm %d %ld\n", i, (long int)_comm->comm);
-			_STARPU_MPI_COMM_FROM_DEBUG(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, (starpu_mpi_tag_t)_STARPU_MPI_TAG_ENVELOPE, _comm->comm);
+			_STARPU_MPI_COMM_FROM_DEBUG(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, (int64_t)_STARPU_MPI_TAG_ENVELOPE, _comm->comm);
 			MPI_Irecv(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _comm->comm, &_comm->request);
 #ifdef STARPU_SIMGRID
 			_starpu_mpi_simgrid_wait_req(&_comm->request, &_comm->status, &_comm->queue, &_comm->done);
@@ -218,3 +220,5 @@ void _starpu_mpi_comm_cancel_recv()
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
 }
+
+#endif /* STARPU_USE_MPI_MPI */

+ 3 - 0
mpi/src/starpu_mpi_comm.h

@@ -21,6 +21,8 @@
 #include <stdlib.h>
 #include <mpi.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -37,4 +39,5 @@ void _starpu_mpi_comm_cancel_recv();
 }
 #endif
 
+#endif // STARPU_USE_MPI_MPI
 #endif // __STARPU_MPI_COMM_H__

+ 4 - 1
mpi/src/starpu_mpi_early_data.c

@@ -17,10 +17,12 @@
 
 #include <stdlib.h>
 #include <starpu_mpi.h>
-#include <starpu_mpi_early_data.h>
+#include <mpi/starpu_mpi_early_data.h>
 #include <starpu_mpi_private.h>
 #include <common/uthash.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 struct _starpu_mpi_early_data_handle_hashlist
 {
 	struct _starpu_mpi_early_data_handle_list list;
@@ -127,3 +129,4 @@ void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_early_data_handle_mutex);
 }
 
+#endif // STARPU_USE_MPI_MPI

+ 3 - 0
mpi/src/starpu_mpi_early_data.h

@@ -25,6 +25,8 @@
 #include <common/list.h>
 #include <starpu_mpi_private.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -53,4 +55,5 @@ void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data
 }
 #endif
 
+#endif /*  STARPU_USE_MPI_MPI */
 #endif /* __STARPU_MPI_EARLY_DATA_H__ */

+ 5 - 1
mpi/src/starpu_mpi_early_request.c

@@ -18,9 +18,11 @@
 #include <stdlib.h>
 #include <starpu_mpi.h>
 #include <starpu_mpi_private.h>
-#include <starpu_mpi_early_request.h>
+#include <mpi/starpu_mpi_early_request.h>
 #include <common/uthash.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 /** stores application requests for which data have not been received yet */
 struct _starpu_mpi_early_request_hashlist
 {
@@ -115,3 +117,5 @@ void _starpu_mpi_early_request_enqueue(struct _starpu_mpi_req *req)
 	_starpu_mpi_early_request_hash_count ++;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_early_request_mutex);
 }
+
+#endif // STARPU_USE_MPI_MPI

+ 3 - 0
mpi/src/starpu_mpi_early_request.h

@@ -24,6 +24,8 @@
 #include <common/config.h>
 #include <common/list.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -41,4 +43,5 @@ struct _starpu_mpi_req* _starpu_mpi_early_request_dequeue(starpu_mpi_tag_t data_
 }
 #endif
 
+#endif /* STARPU_USE_MPI_MPI */
 #endif /* __STARPU_MPI_EARLY_REQUEST_H__ */

Plik diff jest za duży
+ 1657 - 0
mpi/src/mpi/starpu_mpi_mpi.c


+ 4 - 1
mpi/src/starpu_mpi_sync_data.c

@@ -16,10 +16,12 @@
 
 #include <stdlib.h>
 #include <starpu_mpi.h>
-#include <starpu_mpi_sync_data.h>
+#include <mpi/starpu_mpi_sync_data.h>
 #include <starpu_mpi_private.h>
 #include <common/uthash.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 struct _starpu_mpi_sync_data_handle_hashlist
 {
 	struct _starpu_mpi_req_list list;
@@ -148,3 +150,4 @@ void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *sync_req)
 #endif
 }
 
+#endif // STARPU_USE_MPI_MPI

+ 3 - 0
mpi/src/starpu_mpi_sync_data.h

@@ -23,6 +23,8 @@
 #include <common/config.h>
 #include <common/list.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -40,4 +42,5 @@ int _starpu_mpi_sync_data_count();
 }
 #endif
 
+#endif /* STARPU_USE_MPI_MPI */
 #endif /* __STARPU_MPI_SYNC_DATA_H__ */

+ 5 - 1
mpi/src/starpu_mpi_tag.c

@@ -23,6 +23,8 @@
 #include <common/starpu_spinlock.h>
 #include <datawizard/coherency.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 /* Entry in the `registered_tag_handles' hash table.  */
 struct handle_tag_entry
 {
@@ -85,7 +87,7 @@ void _starpu_mpi_tag_data_register(starpu_data_handle_t handle, starpu_mpi_tag_t
 	_STARPU_MPI_MALLOC(entry, sizeof(*entry));
 
 	STARPU_ASSERT_MSG(!(_starpu_mpi_tag_get_data_handle_from_tag(data_tag)),
-			  "There is already a data handle %p registered with the tag %"PRIi64"d\n", _starpu_mpi_tag_get_data_handle_from_tag(data_tag), data_tag);
+			  "There is already a data handle %p registered with the tag %ld\n", _starpu_mpi_tag_get_data_handle_from_tag(data_tag), data_tag);
 
 	_STARPU_MPI_DEBUG(42, "Adding handle %p with tag %"PRIi64"d in hashtable\n", handle, data_tag);
 
@@ -119,3 +121,5 @@ int _starpu_mpi_tag_data_release(starpu_data_handle_t handle)
 	}
 	return 0;
 }
+
+#endif // STARPU_USE_MPI_MPI

+ 3 - 0
mpi/src/starpu_mpi_tag.h

@@ -21,6 +21,8 @@
 #include <stdlib.h>
 #include <mpi.h>
 
+#ifdef STARPU_USE_MPI_MPI
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -37,4 +39,5 @@ starpu_data_handle_t _starpu_mpi_tag_get_data_handle_from_tag(starpu_mpi_tag_t d
 }
 #endif
 
+#endif // STARPU_USE_MPI_MPI
 #endif // __STARPU_MPI_TAG_H__

+ 789 - 0
mpi/src/nmad/starpu_mpi_nmad.c

@@ -0,0 +1,789 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2014, 2017  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <limits.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_datatype.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_profiling.h>
+#include <starpu_mpi_stats.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_mpi_select_node.h>
+#include <starpu_mpi_init.h>
+#include <common/config.h>
+#include <common/thread.h>
+#include <datawizard/coherency.h>
+#include <core/task.h>
+#include <core/topology.h>
+
+#ifdef STARPU_USE_MPI_NMAD
+
+#include <nm_sendrecv_interface.h>
+#include <nm_mpi_nmad.h>
+
+static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
+#ifdef STARPU_VERBOSE
+static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
+#endif
+static void _starpu_mpi_handle_new_request(void *arg);
+
+static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
+static void _starpu_mpi_add_sync_point_in_fxt(void);
+
+static int mpi_thread_cpuid = -1;
+int _starpu_mpi_fake_world_size = -1;
+int _starpu_mpi_fake_world_rank = -1;
+
+/* Condition to wake up waiting for all current MPI requests to finish */
+static starpu_pthread_t progress_thread;
+static starpu_pthread_cond_t progress_cond;
+static starpu_pthread_mutex_t progress_mutex;
+static volatile int running = 0;
+
+extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
+
+/* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
+
+static volatile int pending_request = 0;
+
+#define REQ_FINALIZED 0x1
+
+PUK_LFSTACK_TYPE(callback,	struct _starpu_mpi_req *req;);
+static callback_lfstack_t callback_stack = NULL;
+
+static starpu_sem_t callback_sem;
+
+void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
+{
+	_STARPU_MPI_CALLOC(*req, 1, sizeof(struct _starpu_mpi_req));
+
+	/* Initialize the request structure */
+	(*req)->data_handle = NULL;
+	(*req)->prio = 0;
+	(*req)->completed = 0;
+
+	(*req)->datatype = 0;
+	(*req)->datatype_name = NULL;
+	(*req)->ptr = NULL;
+	(*req)->count = -1;
+	(*req)->registered_datatype = -1;
+
+	(*req)->node_tag.rank = -1;
+	(*req)->node_tag.data_tag = -1;
+	(*req)->node_tag.comm = 0;
+
+	(*req)->func = NULL;
+
+	(*req)->status = NULL;
+	//	(*req)->data_request = 0;
+	(*req)->flag = NULL;
+
+	(*req)->ret = -1;
+	piom_cond_init(&((*req)->req_cond), 0);
+	//STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
+	//STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
+	//	STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
+	//STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
+
+	(*req)->request_type = UNKNOWN_REQ;
+
+	(*req)->submitted = 0;
+	(*req)->completed = 0;
+	(*req)->posted = 0;
+
+	//(*req)->other_request = NULL;
+
+	(*req)->sync = 0;
+	(*req)->detached = -1;
+	(*req)->callback = NULL;
+	(*req)->callback_arg = NULL;
+
+	//	(*req)->size_req = 0;
+	//(*req)->internal_req = NULL;
+	//(*req)->is_internal_req = 0;
+	//(*req)->to_destroy = 1;
+	//(*req)->early_data_handle = NULL;
+	//(*req)->envelope = NULL;
+	(*req)->sequential_consistency = 1;
+	(*req)->pre_sync_jobid = -1;
+	(*req)->post_sync_jobid = -1;
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_init(&((*req)->queue));
+	starpu_pthread_queue_register(&wait, &((*req)->queue));
+	(*req)->done = 0;
+#endif
+}
+
+void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
+{
+	piom_cond_destroy(&(req->req_cond));
+	free(req);
+}
+
+/********************************************************/
+/*                                                      */
+/*  Send/Receive functionalities                        */
+/*                                                      */
+/********************************************************/
+
+static void nop_acquire_cb(void *arg)
+{
+	starpu_data_release(arg);
+}
+
+struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+						       int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
+						       unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
+						       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+						       enum starpu_data_access_mode mode,
+						       int sequential_consistency,
+						       int is_internal_req,
+						       starpu_ssize_t count)
+{
+
+	struct _starpu_mpi_req *req;
+
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		/* Don't actually do the communication */
+		starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
+		return NULL;
+	}
+
+	_STARPU_MPI_LOG_IN();
+	STARPU_ATOMIC_ADD( &pending_request, 1);
+
+	/* Initialize the request structure */
+	_starpu_mpi_request_init(&req);
+	req->request_type = request_type;
+	/* prio_list is sorted by increasing values */
+	req->prio = prio;
+	req->data_handle = data_handle;
+	req->node_tag.rank = srcdst;
+	req->node_tag.data_tag = data_tag;
+	req->node_tag.comm = comm;
+	req->detached = detached;
+	req->sync = sync;
+	req->callback = callback;
+	req->callback_arg = arg;
+	req->func = func;
+	req->sequential_consistency = sequential_consistency;
+	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);
+
+	/* Asynchronously request StarPU to fetch the data in main memory: when
+	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
+	 * the request is actually submitted */
+	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_handle_new_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
+
+	_STARPU_MPI_LOG_OUT();
+	return req;
+}
+
+/********************************************************/
+/*                                                      */
+/*  Send functionalities                                */
+/*                                                      */
+/********************************************************/
+
+static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_STARPU_MPI_DEBUG(30, "post NM isend request %p type %s tag %ld src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
+
+	_starpu_mpi_comm_amounts_inc(req->node_tag.comm, req->node_tag.rank, req->datatype, req->count);
+
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
+
+	struct nm_data_s data;
+	nm_mpi_nmad_data_get(&data, (void*)req->ptr, req->datatype, req->count);
+	nm_sr_send_init(req->session, &(req->data_request));
+	nm_sr_send_pack_data(req->session, &(req->data_request), &data);
+	nm_sr_send_set_priority(req->session, &req->data_request, req->prio);
+
+	if (req->sync == 0)
+	{
+		req->ret = nm_sr_send_isend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Isend returning %d", req->ret);
+	}
+	else
+	{
+		req->ret = nm_sr_send_issend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
+	}
+
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
+
+	_starpu_mpi_handle_pending_request(req);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
+{
+	_starpu_mpi_datatype_allocate(req->data_handle, req);
+
+	if (req->registered_datatype == 1)
+	{
+		req->waited = 1;
+		req->count = 1;
+		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+	}
+	else
+	{
+		starpu_ssize_t psize = -1;
+		int ret;
+		req->waited =2;
+
+		// Do not pack the data, just try to find out the size
+		starpu_data_pack(req->data_handle, NULL, &psize);
+
+		if (psize != -1)
+		{
+			// We already know the size of the data, let's send it to overlap with the packing of the data
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", psize, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
+			req->count = psize;
+			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);
+
+			//	ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
+		}
+
+		// Pack the data
+		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
+		if (psize == -1)
+		{
+			// We know the size now, let's send it
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %ld to node %d (second call to pack)\n", req->count, sizeof(req->count), "MPI_BYTE", req->node_tag.data_tag, req->node_tag.rank);
+			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);
+			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
+		}
+		else
+		{
+			// We check the size returned with the 2 calls to pack is the same
+			STARPU_ASSERT_MSG(req->count == psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, psize);
+		}
+
+		// We can send the data now
+	}
+	_starpu_mpi_isend_data_func(req);
+}
+
+/********************************************************/
+/*                                                      */
+/*  Receive functionalities                             */
+/*                                                      */
+/********************************************************/
+
+static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_STARPU_MPI_DEBUG(20, "post NM irecv request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
+
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
+
+	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	struct nm_data_s data;
+	nm_mpi_nmad_data_get(&data, (void*)req->ptr, req->datatype, req->count);
+	nm_sr_recv_init(req->session, &(req->data_request));
+	nm_sr_recv_unpack_data(req->session, &(req->data_request), &data);
+	nm_sr_recv_irecv(req->session, &(req->data_request), req->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
+
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
+
+	_starpu_mpi_handle_pending_request(req);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+struct _starpu_mpi_irecv_size_callback
+{
+	starpu_data_handle_t handle;
+	struct _starpu_mpi_req *req;
+};
+
+static void _starpu_mpi_irecv_size_callback(void *arg)
+{
+	struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
+
+	starpu_data_unregister(callback->handle);
+	callback->req->ptr = malloc(callback->req->count);
+	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld", callback->req->count);
+	_starpu_mpi_irecv_data_func(callback->req);
+	free(callback);
+}
+
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_starpu_mpi_datatype_allocate(req->data_handle, req);
+	if (req->registered_datatype == 1)
+	{
+		req->count = 1;
+		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
+		_starpu_mpi_irecv_data_func(req);
+	}
+	else
+	{
+		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
+		callback->req = req;
+		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
+		_STARPU_MPI_DEBUG(4, "Receiving size with tag %ld from node %d\n", req->node_tag.data_tag, req->node_tag.rank);
+		_starpu_mpi_irecv_common(callback->handle, req->node_tag.rank, req->node_tag.data_tag, req->node_tag.comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1,0,0);
+	}
+
+}
+
+/********************************************************/
+/*                                                      */
+/*  Wait functionalities                                */
+/*                                                      */
+/********************************************************/
+
+#define _starpu_mpi_req_status(PUBLIC_REQ,STATUS) do {			\
+	STATUS->MPI_SOURCE=PUBLIC_REQ->node_tag.rank; /**< field name mandatory by spec */ \
+	STATUS->MPI_TAG=PUBLIC_REQ->node_tag.data_tag;    /**< field name mandatory by spec */ \
+	STATUS->MPI_ERROR=PUBLIC_REQ->ret;  /**< field name mandatory by spec */ \
+	STATUS->size=PUBLIC_REQ->count;       /**< size of data received */ \
+	STATUS->cancelled=0;  /**< whether request was cancelled */	\
+} while(0)
+
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
+{
+	_STARPU_MPI_LOG_IN();
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_wait needs a valid starpu_mpi_req");
+	struct _starpu_mpi_req *req = *public_req;
+	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Wait cannot be called on a detached request");
+
+	/* we must do a test_locked to avoid race condition :
+	 * without req_cond could still be used and couldn't be freed)*/
+	while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED))
+	{
+		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
+	}
+
+	if (status!=MPI_STATUS_IGNORE)
+		_starpu_mpi_req_status(req,status);
+
+	_starpu_mpi_request_destroy(req);
+	*public_req = NULL;
+	_STARPU_MPI_LOG_OUT();
+	return MPI_SUCCESS;
+}
+
+/********************************************************/
+/*                                                      */
+/*  Test functionalities                                */
+/*                                                      */
+/********************************************************/
+
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
+{
+	_STARPU_MPI_LOG_IN();
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
+	struct _starpu_mpi_req *req = *public_req;
+	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
+	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
+
+	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
+
+	/* we must do a test_locked to avoid race condition :
+	 * without req_cond could still be used and couldn't be freed)*/
+	*flag = req->completed && piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED);
+	if (*flag && status!=MPI_STATUS_IGNORE)
+		_starpu_mpi_req_status(req,status);
+
+	_STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
+
+	if(*flag)
+	{
+		_starpu_mpi_request_destroy(req);
+		*public_req = NULL;
+	}
+	_STARPU_MPI_LOG_OUT();
+	return MPI_SUCCESS;
+}
+
+/********************************************************/
+/*                                                      */
+/*  Barrier functionalities                             */
+/*                                                      */
+/********************************************************/
+
+int _starpu_mpi_barrier(MPI_Comm comm)
+{
+	_STARPU_MPI_LOG_IN();
+	int ret;
+	//	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
+	ret = MPI_Barrier(comm);
+
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
+
+	_STARPU_MPI_LOG_OUT();
+	return ret;
+}
+
+/********************************************************/
+/*                                                      */
+/*  Progression                                         */
+/*                                                      */
+/********************************************************/
+
+#ifdef STARPU_VERBOSE
+static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
+{
+	switch (request_type)
+	{
+		case SEND_REQ: return "SEND_REQ";
+		case RECV_REQ: return "RECV_REQ";
+		case WAIT_REQ: return "WAIT_REQ";
+		case TEST_REQ: return "TEST_REQ";
+		case BARRIER_REQ: return "BARRIER_REQ";
+		default: return "unknown request type";
+	}
+}
+#endif
+
+static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
+
+	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
+	{
+		if (req->registered_datatype == 0)
+		{
+			if(req->waited == 1)
+			        nm_mpi_nmad_data_release(req->datatype);
+			if (req->request_type == SEND_REQ)
+			{
+				req->waited--;
+				// We need to make sure the communication for sending the size
+				// has completed, as MPI can re-order messages, let's count
+				// recerived message.
+				// FIXME concurent access.
+				STARPU_ASSERT_MSG(event == NM_SR_EVENT_FINALIZED, "Callback with event %d", event);
+				if(req->waited>0)
+					return;
+
+			}
+			if (req->request_type == RECV_REQ)
+				// req->ptr is freed by starpu_data_unpack
+				starpu_data_unpack(req->data_handle, req->ptr, req->count);
+			else
+				free(req->ptr);
+		}
+		else
+		{
+		        nm_mpi_nmad_data_release(req->datatype);
+			_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
+		}
+		starpu_data_release(req->data_handle);
+	}
+
+	/* Execute the specified callback, if any */
+	if (req->callback)
+	{
+		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
+		c->req = req;
+		/* The main thread can exit without waiting
+		* the end of the detached request. Callback thread
+		* must then be kept alive if they have a callback.*/
+
+		callback_lfstack_push(&callback_stack, c);
+		starpu_sem_post(&callback_sem);
+	}
+	else
+	{
+		if(req->detached)
+		{
+			_starpu_mpi_request_destroy(req);
+			// a detached request wont be wait/test (and freed inside).
+		}
+		else
+		{
+			/* tell anyone potentially waiting on the request that it is
+			 * terminated now (should be done after the callback)*/
+			req->completed = 1;
+			piom_cond_signal(&req->req_cond, REQ_FINALIZED);
+		}
+		int pending_remaining = STARPU_ATOMIC_ADD(&pending_request, -1);
+		if (!running && !pending_remaining)
+			starpu_sem_post(&callback_sem);
+	}
+	_STARPU_MPI_LOG_OUT();
+}
+
+void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const nm_sr_event_info_t*event_info, void*ref)
+{
+	_starpu_mpi_handle_request_termination(ref,event);
+}
+
+static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
+{
+	if(req->request_type == SEND_REQ && req->waited>1)
+	{
+		nm_sr_request_set_ref(&(req->size_req), req);
+		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+	}
+	/* the if must be before, because the first callback can directly free
+	* a detached request (the second callback free if req->waited>1). */
+	nm_sr_request_set_ref(&(req->data_request), req);
+
+	nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+}
+
+static void _starpu_mpi_handle_new_request(void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+	struct _starpu_mpi_req *req = arg;
+	STARPU_ASSERT_MSG(req, "Invalid request");
+
+	/* submit the request to MPI */
+	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
+	req->func(req);
+
+	_STARPU_MPI_LOG_OUT();
+}
+
+static void *_starpu_mpi_progress_thread_func(void *arg)
+{
+	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
+
+	starpu_pthread_setname("MPI");
+
+#ifndef STARPU_SIMGRID
+	if (mpi_thread_cpuid >= 0)
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
+	_starpu_mpi_do_initialize(argc_argv);
+	if (mpi_thread_cpuid >= 0)
+		/* In case MPI changed the binding */
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
+#endif
+
+	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
+	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
+
+#ifdef STARPU_SIMGRID
+	/* Now that MPI is set up, let the rest of simgrid get initialized */
+	char **argv_cpy;
+	_STARPU_MPI_MALLOC(argv_cpy, *(argc_argv->argc) * sizeof(char*));
+	int i;
+	for (i = 0; i < *(argc_argv->argc); i++)
+		argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
+	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+	/* And set TSD for us */
+	void **tsd;
+	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
+	if (!smpi_process_set_user_data)
+	{
+		_STARPU_ERROR("Your version of simgrid does not provide smpi_process_set_user_data, we can not continue without it\n");
+	}
+	smpi_process_set_user_data(tsd);
+#endif
+
+#ifdef STARPU_USE_FXT
+	_starpu_fxt_wait_initialisation();
+#endif //STARPU_USE_FXT
+
+	{
+		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
+#ifdef STARPU_USE_FXT
+		starpu_profiling_set_id(argc_argv->rank);
+#endif //STARPU_USE_FXT
+	}
+
+	_starpu_mpi_add_sync_point_in_fxt();
+	_starpu_mpi_comm_amounts_init(argc_argv->comm);
+	_starpu_mpi_cache_init(argc_argv->comm);
+	_starpu_mpi_select_node_init();
+	_starpu_mpi_datatype_init();
+
+	/* notify the main thread that the progression thread is ready */
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	running = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+
+	while (1)
+	{
+		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
+		int err=0;
+
+		if(running || pending_request>0)
+		{/* shall we block ? */
+			err = starpu_sem_wait(&callback_sem);
+			//running pending_request can change while waiting
+		}
+		if(c==NULL)
+		{
+			c = callback_lfstack_pop(&callback_stack);
+			if (c == NULL)
+			{
+				if(running && pending_request>0)
+				{
+					STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
+				}
+				else
+				{
+					if (pending_request==0)
+						break;
+				}
+				continue;
+			}
+		}
+
+
+		c->req->callback(c->req->callback_arg);
+		if (c->req->detached)
+		{
+			_starpu_mpi_request_destroy(c->req);
+		}
+		else
+		{
+			c->req->completed=1;
+			piom_cond_signal(&(c->req->req_cond), REQ_FINALIZED);
+		}
+		STARPU_ATOMIC_ADD( &pending_request, -1);
+		/* we signal that the request is completed.*/
+
+		free(c);
+
+	}
+	STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
+	STARPU_ASSERT_MSG(pending_request==0, "Request still pending.");
+
+	if (argc_argv->initialize_mpi)
+	{
+		_STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
+		MPI_Finalize();
+	}
+
+	starpu_sem_destroy(&callback_sem);
+	free(argc_argv);
+	return NULL;
+}
+
+/********************************************************/
+/*                                                      */
+/*  (De)Initialization methods                          */
+/*                                                      */
+/********************************************************/
+
+// #ifdef STARPU_MPI_ACTIVITY
+// static int hookid = - 1;
+// #endif /* STARPU_MPI_ACTIVITY */
+
+static void _starpu_mpi_add_sync_point_in_fxt(void)
+{
+#ifdef STARPU_USE_FXT
+	int rank;
+	int worldsize;
+	int ret;
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
+
+	ret = MPI_Barrier(MPI_COMM_WORLD);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(ret));
+
+	/* We generate a "unique" key so that we can make sure that different
+	 * FxT traces come from the same MPI run. */
+	int random_number;
+
+	/* XXX perhaps we don't want to generate a new seed if the application
+	 * specified some reproductible behaviour ? */
+	if (rank == 0)
+	{
+		srand(time(NULL));
+		random_number = rand();
+	}
+
+	ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_error_code(ret));
+
+	_STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);
+
+	_STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
+#endif
+}
+
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
+{
+        STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+        STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
+
+	starpu_sem_init(&callback_sem, 0, 0);
+	running = 0;
+	mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
+
+	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
+
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        while (!running)
+                STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+
+        return 0;
+}
+
+void _starpu_mpi_progress_shutdown(int *value)
+{
+	/* kill the progression thread */
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        running = 0;
+        STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+
+	starpu_sem_post(&callback_sem);
+
+	starpu_pthread_join(progress_thread, &value);
+
+        STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+        STARPU_PTHREAD_COND_DESTROY(&progress_cond);
+}
+
+
+int starpu_mpi_comm_get_attr(MPI_Comm comm, int keyval, void *attribute_val, int *flag)
+{
+	(void) comm;
+	if (keyval == STARPU_MPI_TAG_UB)
+	{
+		const int64_t starpu_tag_max = INT64_MAX;
+		const nm_tag_t nm_tag_max = NM_TAG_MAX;
+		/* manage case where nmad max tag causes overflow if represented as starpu tag */
+		*(int64_t *)attribute_val = (nm_tag_max > starpu_tag_max) ? starpu_tag_max : nm_tag_max;
+		*flag = 1;
+	}
+	else
+	{
+		*flag = 0;
+	}
+	return 0;
+}
+
+#endif /* STARPU_USE_MPI_NMAD*/

Plik diff jest za duży
+ 18 - 1620
mpi/src/starpu_mpi.c


+ 11 - 6
mpi/src/starpu_mpi_init.c

@@ -24,12 +24,7 @@
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_cache.h>
-#include <starpu_mpi_sync_data.h>
-#include <starpu_mpi_early_data.h>
-#include <starpu_mpi_early_request.h>
 #include <starpu_mpi_select_node.h>
-#include <starpu_mpi_tag.h>
-#include <starpu_mpi_comm.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -37,6 +32,11 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 
+#if defined(STARPU_USE_MPI_MPI)
+#include <mpi/starpu_mpi_comm.h>
+#include <mpi/starpu_mpi_tag.h>
+#endif
+
 #ifdef STARPU_SIMGRID
 static int _mpi_world_size;
 static int _mpi_world_rank;
@@ -69,6 +69,10 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 	if (argc_argv->initialize_mpi)
 	{
 		int thread_support;
+#ifdef STARPU_USE_MPI_NMAD
+		/* strat_prio is preferred for StarPU instead of default strat_aggreg */
+		setenv("NMAD_STRATEGY", "prio", 0 /* do not overwrite user-supplied value, if set */);
+#endif /* STARPU_USE_MPI_NMAD */
 		_STARPU_DEBUG("Calling MPI_Init_thread\n");
 		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
 		{
@@ -182,8 +186,10 @@ int starpu_mpi_shutdown(void)
 	_starpu_mpi_comm_amounts_display(stderr, rank);
 	_starpu_mpi_comm_amounts_shutdown();
 	_starpu_mpi_cache_shutdown(world_size);
+#if defined(STARPU_USE_MPI_MPI)
 	_starpu_mpi_tag_shutdown();
 	_starpu_mpi_comm_shutdown();
+#endif
 
 	return 0;
 }
@@ -233,4 +239,3 @@ int starpu_mpi_world_rank(void)
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 	return rank;
 }
-

+ 61 - 21
mpi/src/starpu_mpi_private.h

@@ -26,6 +26,11 @@
 #include <common/list.h>
 #include <common/prio_list.h>
 #include <core/simgrid.h>
+#if defined(STARPU_USE_MPI_NMAD)
+#include <pioman.h>
+#include <nm_sendrecv_interface.h>
+#include <nm_session_interface.h>
+#endif
 
 #ifdef __cplusplus
 extern "C"
@@ -152,22 +157,12 @@ int _starpu_debug_rank;
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
+#if defined(STARPU_USE_MPI_MPI)
 extern int _starpu_mpi_tag;
 #define _STARPU_MPI_TAG_ENVELOPE  _starpu_mpi_tag
 #define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
 #define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
 
-enum _starpu_mpi_request_type
-{
-	SEND_REQ=0,
-	RECV_REQ=1,
-	WAIT_REQ=2,
-	TEST_REQ=3,
-	BARRIER_REQ=4,
-	PROBE_REQ=5,
-	UNKNOWN_REQ=6,
-};
-
 #define _STARPU_MPI_ENVELOPE_DATA       0
 #define _STARPU_MPI_ENVELOPE_SYNC_READY 1
 
@@ -178,8 +173,18 @@ struct _starpu_mpi_envelope
 	starpu_mpi_tag_t data_tag;
 	unsigned sync;
 };
+#endif /* STARPU_USE_MPI_MPI */
 
-struct _starpu_mpi_req;
+enum _starpu_mpi_request_type
+{
+	SEND_REQ=0,
+	RECV_REQ=1,
+	WAIT_REQ=2,
+	TEST_REQ=3,
+	BARRIER_REQ=4,
+	PROBE_REQ=5,
+	UNKNOWN_REQ=6,
+};
 
 struct _starpu_mpi_node_tag
 {
@@ -196,6 +201,7 @@ struct _starpu_mpi_data
 	int cache_received;
 };
 
+struct _starpu_mpi_req;
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -211,20 +217,36 @@ LIST_TYPE(_starpu_mpi_req,
 
 	/* who are we talking to ? */
 	struct _starpu_mpi_node_tag node_tag;
+#if defined(STARPU_USE_MPI_NMAD)
+	nm_gate_t gate;
+	nm_session_t session;
+#endif
 
 	void (*func)(struct _starpu_mpi_req *);
 
 	MPI_Status *status;
+#if defined(STARPU_USE_MPI_NMAD)
+	nm_sr_request_t data_request;
+	int waited;
+#elif defined(STARPU_USE_MPI_MPI)
 	MPI_Request data_request;
+#endif
+
 	int *flag;
 	unsigned sync;
 
 	int ret;
+#if defined(STARPU_USE_MPI_NMAD)
+	piom_cond_t req_cond;
+#elif defined(STARPU_USE_MPI_MPI)
 	starpu_pthread_mutex_t req_mutex;
 	starpu_pthread_cond_t req_cond;
-
 	starpu_pthread_mutex_t posted_mutex;
 	starpu_pthread_cond_t posted_cond;
+	/* In the case of a Wait/Test request, we are going to post a request
+	 * to test the completion of another request */
+	struct _starpu_mpi_req *other_request;
+#endif
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
@@ -232,41 +254,59 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned completed;
 	unsigned posted;
 
-	/* In the case of a Wait/Test request, we are going to post a request
-	 * to test the completion of another request */
-	struct _starpu_mpi_req *other_request;
-
 	/* in the case of detached requests */
 	int detached;
 	void *callback_arg;
 	void (*callback)(void *);
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
+#if defined(STARPU_USE_MPI_NMAD)
+	nm_sr_request_t size_req;
+#elif defined(STARPU_USE_MPI_MPI)
 	MPI_Request size_req;
+#endif
 
-        struct _starpu_mpi_envelope* envelope;
+#if defined(STARPU_USE_MPI_MPI)
+	struct _starpu_mpi_envelope* envelope;
 
 	unsigned is_internal_req:1;
 	unsigned to_destroy:1;
 	struct _starpu_mpi_req *internal_req;
 	struct _starpu_mpi_early_data_handle *early_data_handle;
+     	UT_hash_handle hh;
+#endif
 
 	int sequential_consistency;
 
 	long pre_sync_jobid;
 	long post_sync_jobid;
 
-     	UT_hash_handle hh;
-
 #ifdef STARPU_SIMGRID
         MPI_Status status_store;
 	starpu_pthread_queue_t queue;
 	unsigned done;
 #endif
-
 );
 PRIO_LIST_TYPE(_starpu_mpi_req, prio)
 
+struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+						       int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
+						       unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
+						       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+						       enum starpu_data_access_mode mode,
+						       int sequential_consistency,
+						       int is_internal_req,
+						       starpu_ssize_t count);
+
+void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
+void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
+void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
+void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
+int _starpu_mpi_barrier(MPI_Comm comm);
+
 struct _starpu_mpi_argc_argv
 {
 	int initialize_mpi;

+ 5 - 2
mpi/tests/Makefile.am

@@ -157,8 +157,12 @@ starpu_mpi_TESTS +=				\
 	tags_checking				\
 	sync					\
 	gather					\
-	gather2					\
+	gather2
+
+if STARPU_USE_MPI_MPI
+starpu_mpi_TESTS +=				\
 	load_balancer
+endif
 
 # Expected to fail
 starpu_mpi_TESTS +=				\
@@ -224,7 +228,6 @@ noinst_PROGRAMS =				\
 	starpu_redefine				\
 	load_balancer
 
-
 XFAIL_TESTS=					\
 	policy_register_toomany			\
 	policy_unregister			\

+ 1 - 1
mpi/tests/load_balancer.c

@@ -18,7 +18,7 @@
 #include <starpu_mpi_lb.h>
 #include "helper.h"
 
-#if !defined(STARPU_HAVE_UNSETENV)
+#if !defined(STARPU_HAVE_UNSETENV) || !defined(STARPU_USE_MPI_MPI)
 
 #warning unsetenv is not defined. Skipping test
 int main(int argc, char **argv)