Pārlūkot izejas kodu

Begin to add a MPI driver

Corentin Salingue 8 gadi atpakaļ
vecāks
revīzija
9e1a892b56

+ 2 - 1
include/schedulers/starpu_heteroprio.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015  INRIA
+ * Copyright (C) 2015, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -54,6 +54,7 @@ static const unsigned starpu_heteroprio_types_to_arch[STARPU_NB_TYPES+1] =
 	STARPU_OPENCL,
 	STARPU_MIC,
 	STARPU_SCC,
+    STARPU_MPI,
 	0
 };
 

+ 5 - 1
include/starpu.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2009-2014, 2016  Université de Bordeaux
  * Copyright (C) 2010-2015  CNRS
- * Copyright (C) 2014  INRIA
+ * Copyright (C) 2014, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -90,6 +90,7 @@ struct starpu_conf
 	int nopencl;
 	int nmic;
 	int nscc;
+    int nmpi_ms;
 
 	unsigned use_explicit_workers_bindid;
 	unsigned workers_bindid[STARPU_NMAXWORKERS];
@@ -106,6 +107,9 @@ struct starpu_conf
 	unsigned use_explicit_workers_scc_deviceid;
 	unsigned workers_scc_deviceid[STARPU_NMAXWORKERS];
 
+	unsigned use_explicit_workers_mpi_deviceid;
+	unsigned workers_mpi_deviceid[STARPU_NMAXWORKERS];
+
 	int bus_calibrate;
 	int calibrate;
 

+ 3 - 1
include/starpu_data.h

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
+ * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -114,7 +115,8 @@ enum starpu_node_kind
 	STARPU_DISK_RAM   = 0x04,
 	STARPU_MIC_RAM    = 0x05,
 	STARPU_SCC_RAM    = 0x06,
-	STARPU_SCC_SHM    = 0x07
+	STARPU_SCC_SHM    = 0x07,
+	STARPU_MPI_MS_RAM = 0x08
 
 };
 

+ 2 - 1
include/starpu_task.h

@@ -3,7 +3,7 @@
  * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014  CNRS
  * Copyright (C) 2011  Télécom-SudParis
- * Copyright (C) 2011, 2014  INRIA
+ * Copyright (C) 2011, 2014, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -42,6 +42,7 @@ extern "C"
 #define STARPU_OPENCL	((1ULL)<<6)
 #define STARPU_MIC	((1ULL)<<7)
 #define STARPU_SCC	((1ULL)<<8)
+#define STARPU_MPI	((1ULL)<<9)
 
 #define STARPU_CODELET_SIMGRID_EXECUTE	(1<<0)
 #define STARPU_CUDA_ASYNC	(1<<0)

+ 2 - 0
include/starpu_worker.h

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2009-2013, 2016  Université de Bordeaux
  * Copyright (C) 2010-2014  CNRS
+ * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -34,6 +35,7 @@ enum starpu_worker_archtype
 	STARPU_OPENCL_WORKER,
 	STARPU_MIC_WORKER,
 	STARPU_SCC_WORKER,
+	STARPU_MPI_WORKER,
 	STARPU_ANY_WORKER
 };
 

+ 16 - 0
src/Makefile.am

@@ -135,6 +135,9 @@ noinst_HEADERS = 						\
 	drivers/scc/driver_scc_common.h				\
 	drivers/scc/driver_scc_source.h				\
 	drivers/scc/driver_scc_sink.h				\
+	drivers/mpi/driver_mpi_common.h				\
+	drivers/mpi/driver_mpi_source.h				\
+	drivers/mpi/driver_mpi_sink.h				\
 	drivers/disk/driver_disk.h				\
 	debug/traces/starpu_fxt.h				\
 	profiling/bound.h					\
@@ -366,6 +369,19 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/mic/driver_mic_utils.
 endif
 
 #########################################
+#                                       # 	 
+#     MPI Master/Slave compilation      # 	 
+#                                       # 	 
+######################################### 	 
+
+if STARPU_USE_MPI_MASTER_SLAVE 	 
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/mpi/driver_mpi_common.c 	 
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/mpi/driver_mpi_source.c 	 
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/mpi/driver_mpi_sink.c 	 
+endif 	 
+
+
+#########################################
 
 showcheck:
 	-cat /dev/null

+ 2 - 0
src/common/fxt.h

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016  CNRS
+ * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -40,6 +41,7 @@
 #define _STARPU_FUT_OPENCL_KEY	0x103
 #define _STARPU_FUT_MIC_KEY	0x104
 #define _STARPU_FUT_SCC_KEY	0x105
+#define _STARPU_FUT_MPI_KEY	0x106
 
 #define _STARPU_FUT_WORKER_INIT_START	0x5100
 #define _STARPU_FUT_WORKER_INIT_END	0x5101

+ 13 - 4
src/core/perfmodel/perfmodel_history.c

@@ -3,6 +3,7 @@
  * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
  * Copyright (C) 2011  Télécom-SudParis
+ * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -481,6 +482,8 @@ static enum starpu_worker_archtype _get_enum_type(int type)
 			return STARPU_MIC_WORKER;
         	case 4:
 			return STARPU_SCC_WORKER;
+        	case 5:
+			return STARPU_MPI_WORKER;
 		default:
 			STARPU_ABORT();
 	}
@@ -628,7 +631,7 @@ static void dump_model_file(FILE *f, struct starpu_perfmodel *model)
 		{
 			fprintf(f, "####################\n");
 			fprintf(f, "# DEV_%d\n", dev);
-			fprintf(f, "# device type (CPU - 0, CUDA - 1, OPENCL - 2, MIC - 3, SCC - 4)\n");
+			fprintf(f, "# device type (CPU - 0, CUDA - 1, OPENCL - 2, MIC - 3, SCC - 4, MPI_MS - 5)\n");
 			fprintf(f, "%u\n", arch_combs[comb]->devices[dev].type);
 
 			fprintf(f, "####################\n");
@@ -822,11 +825,14 @@ void _starpu_initialize_registered_performance_models(void)
 	unsigned i;
 	for(i = 0; i < conf->topology.nhwmicdevices; i++)
 		nmic += conf->topology.nhwmiccores[i];
+	unsigned nmpi = 0;
+	for(i = 0; i < conf->topology.nhwmpidevices; i++)
+		nmpi += conf->topology.nhwmpicores[i];
 	unsigned nscc = conf->topology.nhwscc;
 
-	// We used to allocate 2**(ncores + ncuda + nopencl + nmic + nscc), this is too big
-	// We now allocate only 2*(ncores + ncuda + nopencl + nmic + nscc), and reallocate when necessary in starpu_perfmodel_arch_comb_add
-	nb_arch_combs = 2 * (ncores + ncuda + nopencl + nmic + nscc);
+	// We used to allocate 2**(ncores + ncuda + nopencl + nmic + nscc + nmpi), this is too big
+	// We now allocate only 2*(ncores + ncuda + nopencl + nmic + nscc + nmpi), and reallocate when necessary in starpu_perfmodel_arch_comb_add
+	nb_arch_combs = 2 * (ncores + ncuda + nopencl + nmic + nscc + nmpi);
 	arch_combs = (struct starpu_perfmodel_arch**) malloc(nb_arch_combs*sizeof(struct starpu_perfmodel_arch*));
 	current_arch_comb = 0;
 	STARPU_PTHREAD_RWLOCK_INIT(&arch_combs_mutex, NULL);
@@ -1107,6 +1113,9 @@ char* starpu_perfmodel_get_archtype_name(enum starpu_worker_archtype archtype)
 		case(STARPU_SCC_WORKER):
 			return "scc";
 			break;
+		case(STARPU_MPI_WORKER):
+			return "mpi_ms";
+			break;
 		default:
 			STARPU_ABORT();
 			break;

+ 231 - 44
src/core/topology.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016 CNRS
- * Copyright (C) 2011  INRIA
+ * Copyright (C) 2011, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -25,6 +25,7 @@
 #include <drivers/cuda/driver_cuda.h>
 #include <drivers/mic/driver_mic_source.h>
 #include <drivers/scc/driver_scc_source.h>
+#include <drivers/mpi/driver_mpi_source.h>
 #include <drivers/mp_common/source_common.h>
 #include <drivers/opencl/driver_opencl.h>
 #include <profiling/profiling.h>
@@ -76,6 +77,9 @@ static struct _starpu_worker_set cuda_worker_set[STARPU_MAXCUDADEVS];
 #ifdef STARPU_USE_MIC
 static struct _starpu_worker_set mic_worker_set[STARPU_MAXMICDEVS];
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+static struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
+#endif
 
 void *
 _starpu_get_worker_from_driver(struct starpu_driver *d)
@@ -127,7 +131,7 @@ _starpu_get_worker_from_driver(struct starpu_driver *d)
  * Discover the topology of the machine
  */
 
-#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_SCC)  || defined(STARPU_SIMGRID)
+#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_SCC)  || defined(STARPU_SIMGRID) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 static void
 _starpu_initialize_workers_deviceid (int *explicit_workers_gpuid,
 				  int *current, int *workers_gpuid,
@@ -385,6 +389,31 @@ static inline int _starpu_get_next_scc_deviceid(struct _starpu_machine_config *c
 }
 #endif
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+static inline int _starpu_get_next_mpi_deviceid(struct _starpu_machine_config *config)
+{
+	unsigned i = ((config->current_mpi_deviceid++) % config->topology.nmpidevices);
+
+	return (int)config->topology.workers_mpi_deviceid[i];
+}
+
+static void
+_starpu_init_mpi_topology (struct _starpu_machine_config *config, long mpi_idx)
+{
+	/* Discover the topology of the mpi node identifier by MPI_IDX. That
+	 * means, make this StarPU instance aware of the number of cores available
+	 * on this MPI device. Update the `nhwmpicores' topology field
+	 * accordingly. */
+
+	struct _starpu_machine_topology *topology = &config->topology;
+
+	int nbcores;
+	_starpu_src_common_sink_nbcores (mpi_ms_nodes[mpi_idx], &nbcores);
+	topology->nhwmpicores[mpi_idx] = nbcores;
+}
+
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
+
 #ifdef STARPU_USE_MIC
 static void
 _starpu_init_mic_topology (struct _starpu_machine_config *config, long mic_idx)
@@ -545,6 +574,9 @@ _starpu_init_topology (struct _starpu_machine_config *config)
 #ifdef STARPU_USE_SCC
 	config->topology.nhwscc = _starpu_scc_src_get_device_count();
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE 
+    config->topology.nhwmpi = _starpu_mpi_src_get_device_count();
+#endif
 
 	topology_is_initialized = 1;
 }
@@ -823,13 +855,72 @@ _starpu_init_mic_config (struct _starpu_machine_config *config,
 	}
 
 	topology->nworkers += topology->nmiccores[mic_idx];
-    }
+}  
 
-#ifdef STARPU_USE_MIC
 static COIENGINE mic_handles[STARPU_MAXMICDEVS];
 COIPROCESS _starpu_mic_process[STARPU_MAXMICDEVS];
 #endif
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+static void
+_starpu_init_mpi_config (struct _starpu_machine_config *config,
+			 struct starpu_conf *user_conf,
+			 unsigned mpi_idx)
+{
+	struct _starpu_machine_topology *topology = &config->topology;
+
+	topology->nhwmpicores[mpi_idx] = 0;
+
+	_starpu_init_mpi_topology (config, mpi_idx);
+
+	int nmpicores;
+	nmpicores = starpu_get_env_number("STARPU_NMPITHREADS");
+
+	if (nmpicores == -1)
+	{
+		/* Nothing was specified, so let's use the number of
+		 * detected mpi cores. ! */
+		nmpicores = topology->nhwmpicores[mpi_idx];
+	}
+	else
+	{
+		if ((unsigned) nmpicores > topology->nhwmpicores[mpi_idx])
+		{
+			/* The user requires more MPI cores than there is available */
+			fprintf(stderr,
+				"# Warning: %d MPI cores requested. Only %d available.\n",
+				nmpicores, topology->nhwmpicores[mpi_idx]);
+			nmpicores = topology->nhwmpicores[mpi_idx];
+		}
+	}
+
+	topology->nmpicores[mpi_idx] = nmpicores;
+	STARPU_ASSERT_MSG(topology->nmpicores[mpi_idx] + topology->nworkers <= STARPU_NMAXWORKERS,
+			  "topology->nmpicores[mpi_idx(%d)] (%d) + topology->nworkers (%d) <= STARPU_NMAXWORKERS (%d)",
+			  mpi_idx, topology->nmpicores[mpi_idx], topology->nworkers, STARPU_NMAXWORKERS);
+
+	mpi_worker_set[mpi_idx].workers = &config->workers[topology->nworkers];
+	unsigned mpicore_id;
+	for (mpicore_id = 0; mpicore_id < topology->nmpicores[mpi_idx]; mpicore_id++)
+	{
+		int worker_idx = topology->nworkers + mpicore_id;
+		config->workers[worker_idx].set = &mpi_worker_set[mpi_idx];
+		config->workers[worker_idx].arch = STARPU_MPI_WORKER;
+		config->workers[worker_idx].perf_arch.devices = (struct starpu_perfmodel_device *) malloc(sizeof(struct starpu_perfmodel_device));
+		config->workers[worker_idx].perf_arch.ndevices = 1;
+		config->workers[worker_idx].perf_arch.devices[0].type = STARPU_MPI_WORKER;
+		config->workers[worker_idx].perf_arch.devices[0].devid = mpi_idx;
+		config->workers[worker_idx].perf_arch.devices[0].ncores = 1;
+		config->workers[worker_idx].devid = mpi_idx;
+		config->workers[worker_idx].subworkerid = mpicore_id;
+		config->workers[worker_idx].worker_mask = STARPU_MPI;
+		config->worker_mask |= STARPU_MPI;
+	}
+
+	topology->nworkers += topology->nmpicores[mpi_idx];
+}  
+#endif
+
 static void
 _starpu_init_mp_config (struct _starpu_machine_config *config,
 			struct starpu_conf *user_conf)
@@ -843,49 +934,87 @@ _starpu_init_mp_config (struct _starpu_machine_config *config,
 
 	struct _starpu_machine_topology *topology = &config->topology;
 
-	// We currently only support MIC at this level.
 #ifdef STARPU_USE_MIC
+    {
+        /* Discover and initialize the number of MIC nodes through the mp
+         * infrastructure. */
+        unsigned nhwmicdevices = _starpu_mic_src_get_device_count();
+
+        int reqmicdevices = starpu_get_env_number("STARPU_NMIC");
+        if (reqmicdevices == -1 && user_conf)
+            reqmicdevices = user_conf->nmic;
+        if (reqmicdevices == -1)
+            reqmicdevices = nhwmicdevices;
+
+        if (reqmicdevices == -1)
+        {
+            /* Nothing was specified, so let's use the number of
+             * detected mic devices. ! */
+            reqmicdevices = nhwmicdevices;
+        }
+        else
+        {
+            if ((unsigned) reqmicdevices > nhwmicdevices)
+            {
+                /* The user requires more MIC devices than there is available */
+                fprintf(stderr,
+                    "# Warning: %d MIC devices requested. Only %d available.\n",
+                    reqmicdevices, nhwmicdevices);
+                reqmicdevices = nhwmicdevices;
+            }
+        }
 
-	/* Discover and initialize the number of MIC nodes through the mp
-	 * infrastructure. */
-	unsigned nhwmicdevices = _starpu_mic_src_get_device_count();
+        topology->nmicdevices = 0;
+        unsigned i;
+        for (i = 0; i < (unsigned) reqmicdevices; i++)
+            if (0 == _starpu_init_mic_node (config, i, &mic_handles[i], &_starpu_mic_process[i]))
+                topology->nmicdevices++;
 
-	int reqmicdevices = starpu_get_env_number("STARPU_NMIC");
-	if (reqmicdevices == -1 && user_conf)
-		reqmicdevices = user_conf->nmic;
-	if (reqmicdevices == -1)
-		reqmicdevices = nhwmicdevices;
 
-	if (reqmicdevices == -1)
-	{
-		/* Nothing was specified, so let's use the number of
-		 * detected mic devices. ! */
-		reqmicdevices = nhwmicdevices;
-	}
-	else
-	{
-		if ((unsigned) reqmicdevices > nhwmicdevices)
-		{
-			/* The user requires more MIC devices than there is available */
-			fprintf(stderr,
-				"# Warning: %d MIC devices requested. Only %d available.\n",
-				reqmicdevices, nhwmicdevices);
-			reqmicdevices = nhwmicdevices;
-		}
-	}
+        for (i = 0; i < topology->nmicdevices; i++)
+            _starpu_init_mic_config (config, user_conf, i);
+    }
+#endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    {
+        /* Discover and initialize the number of MIC nodes through the mp
+         * infrastructure. */
+        unsigned nhwmpidevices = _starpu_mpi_src_get_device_count();
+
+        int reqmpidevices = starpu_get_env_number("STARPU_NMPI_MS");
+        if (reqmpidevices == -1 && user_conf)
+            reqmpidevices = user_conf->nmpi_ms;
+        if (reqmpidevices == -1)
+            /* Nothing was specified, so let's use the number of
+             * detected mpi devices. ! */
+            reqmpidevices = nhwmpidevices;
+
+        if (reqmpidevices != -1)
+        {
+            if ((unsigned) reqmpidevices > nhwmpidevices)
+            {
+                /* The user requires more MPI devices than there is available */
+                fprintf(stderr,
+                    "# Warning: %d MPI Master-Slave devices requested. Only %d available.\n",
+                    reqmpidevices, nhwmpidevices);
+                reqmpidevices = nhwmpidevices;
+            }
+        }
 
-	topology->nmicdevices = 0;
-	unsigned i;
-	for (i = 0; i < (unsigned) reqmicdevices; i++)
-		if (0 == _starpu_init_mic_node (config, i, &mic_handles[i], &_starpu_mic_process[i]))
-			topology->nmicdevices++;
+        topology->nmpidevices = reqmpidevices;
 
+        unsigned i;
+        for (i = 0; i < topology->nmpidevices; i++)
+            mpi_ms_nodes[i] = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, i);
 
-	for (i = 0; i < topology->nmicdevices; i++)
-		_starpu_init_mic_config (config, user_conf, i);
+
+        for (i = 0; i < topology->nmpidevices; i++)
+            _starpu_init_mpi_config (config, user_conf, i);
+    }
 #endif
 }
 
+#ifdef STARPU_USE_MIC
 static void
 _starpu_deinit_mic_node (unsigned mic_idx)
 {
@@ -895,6 +1024,17 @@ _starpu_deinit_mic_node (unsigned mic_idx)
 
 	_starpu_mp_common_node_destroy(mic_nodes[mic_idx]);
 }
+#endif
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+static void _starpu_deinit_mpi_node(int devid)
+{
+    _starpu_mp_common_send_command(mpi_ms_nodes[devid], STARPU_EXIT, NULL, 0);                          
+
+    _starpu_mp_common_node_destroy(mpi_ms_nodes[devid]);
+}
+#endif
+
 
 static void
 _starpu_deinit_mp_config (struct _starpu_machine_config *config)
@@ -902,11 +1042,16 @@ _starpu_deinit_mp_config (struct _starpu_machine_config *config)
 	struct _starpu_machine_topology *topology = &config->topology;
 	unsigned i;
 
+#ifdef STARPU_USE_MIC
 	for (i = 0; i < topology->nmicdevices; i++)
 		_starpu_deinit_mic_node (i);
 	_starpu_mic_clear_kernels();
-}
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+	for (i = 0; i < topology->nmpidevices; i++)
+		_starpu_deinit_mpi_node (i);
+#endif
+}
 
 static int
 _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_config STARPU_ATTRIBUTE_UNUSED)
@@ -942,6 +1087,10 @@ _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_con
 	for (i = 0; i < (int) (sizeof(mic_worker_set)/sizeof(mic_worker_set[0])); i++)
 		mic_worker_set[i].workers = NULL;
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+	for (i = 0; i < (int) (sizeof(mpi_worker_set)/sizeof(mpi_worker_set[0])); i++)
+		mpi_worker_set[i].workers = NULL;
+#endif
 
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 	int ncuda = config->conf.ncuda;
@@ -1143,12 +1292,12 @@ _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_con
 	{
 		config->workers[topology->nworkers + sccdev].arch = STARPU_SCC_WORKER;
 		int devid = _starpu_get_next_scc_deviceid(config);
-		config->workers[topology->nworkers + sccdev].perf_arch.devices = (struct starpu_perfmodel_device)malloc(sizeof(struct starpu_perfmodel_device));
+		config->workers[topology->nworkers + sccdev].perf_arch.devices = (struct starpu_perfmodel_device *)malloc(sizeof(struct starpu_perfmodel_device));
 		config->workers[topology->nworkers + sccdev].perf_arch.ndevices = 1;
 
 		config->workers[topology->nworkers + sccdev].perf_arch.devices[0].type = STARPU_SCC_WORKER;
 		config->workers[topology->nworkers + sccdev].perf_arch.devices[0].devid = sccdev;
-		config->workers[topology->nworkers + sccdev].perf_arch.devices[0].ncore = 1;
+		config->workers[topology->nworkers + sccdev].perf_arch.devices[0].ncores = 1;
 		config->workers[topology->nworkers + sccdev].subworkerid = 0;
 		config->workers[topology->nworkers + sccdev].devid = devid;
 		config->workers[topology->nworkers + sccdev].worker_mask = STARPU_SCC;
@@ -1161,10 +1310,9 @@ _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_con
 	topology->nworkers += topology->nsccdevices;
 #endif /* STARPU_USE_SCC */
 
-
 	/* Unless not requested, we need to complete configuration with the
 	 * ones of the mp nodes. */
-#ifdef STARPU_USE_MIC
+#if defined(STARPU_USE_MIC) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 	if (! no_mp_config)
 	    _starpu_init_mp_config (config, &config->conf);
 #endif
@@ -1464,6 +1612,11 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 	unsigned mic_memory_nodes[STARPU_MAXMICDEVS];
 	unsigned mic_bindid[STARPU_MAXMICDEVS];
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+	unsigned mpi_init[STARPU_MAXMPIDEVS] = { };
+	unsigned mpi_memory_nodes[STARPU_MAXMPIDEVS];
+	unsigned mpi_bindid[STARPU_MAXMPIDEVS];
+#endif
 	unsigned bindid;
 
 	for (bindid = 0; bindid < config->nbindid; bindid++)
@@ -1480,7 +1633,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 		struct _starpu_worker *workerarg = &config->workers[worker];
 		unsigned devid = workerarg->devid;
 
-#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID)
+#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 		/* Perhaps the worker has some "favourite" bindings  */
 		int *preferred_binding = NULL;
 		int npreferred = 0;
@@ -1667,7 +1820,34 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 #endif
 			}
 				break;
+#endif /* STARPU_USE_SCC */
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+			case STARPU_MPI_WORKER:
+			{
+				if (mpi_init[devid])
+				{
+					memory_node = mpi_memory_nodes[devid];
+				}
+				else
+				{
+					mpi_init[devid] = 1;
+					mpi_bindid[devid] = _starpu_get_next_bindid(config, preferred_binding, npreferred);
+					memory_node = mpi_memory_nodes[devid] = _starpu_memory_node_register(STARPU_MPI_MS_RAM, devid);
+					_starpu_register_bus(STARPU_MAIN_RAM, memory_node);
+					_starpu_register_bus(memory_node, STARPU_MAIN_RAM);
+
+				}
+				workerarg->bindid = mpi_bindid[devid];
+				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
 #endif
+				break;
+			}
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
+
 
 			default:
 				STARPU_ABORT();
@@ -1752,6 +1932,7 @@ _starpu_build_topology (struct _starpu_machine_config *config, int no_mp_config)
 	config->opencl_nodeid = -1;
 	config->mic_nodeid = -1;
 	config->scc_nodeid = -1;
+    config->mpi_nodeid = -1;
 	for (i = 0; i < starpu_worker_get_count(); i++)
 	{
 		switch (starpu_worker_get_type(i))
@@ -1786,6 +1967,12 @@ _starpu_build_topology (struct _starpu_machine_config *config, int no_mp_config)
 				else if (config->scc_nodeid != (int) starpu_worker_get_memory_node(i))
 					config->scc_nodeid = -2;
 				break;
+			case STARPU_MPI_WORKER:
+				if (config->mpi_nodeid == -1)
+					config->mpi_nodeid = starpu_worker_get_memory_node(i);
+				else if (config->mpi_nodeid != (int) starpu_worker_get_memory_node(i))
+					config->mpi_nodeid = -2;
+				break;
 			case STARPU_ANY_WORKER:
 				STARPU_ASSERT(0);
 		}
@@ -1796,7 +1983,7 @@ _starpu_build_topology (struct _starpu_machine_config *config, int no_mp_config)
 
 void _starpu_destroy_topology(struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED)
 {
-#ifdef STARPU_USE_MIC
+#if defined(STARPU_USE_MIC) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 	_starpu_deinit_mp_config(config);
 #endif
 

+ 53 - 1
src/core/workers.c

@@ -36,6 +36,7 @@
 #include <top/starpu_top_core.h>
 #include <drivers/mp_common/sink_common.h>
 #include <drivers/scc/driver_scc_common.h>
+#include <drivers/mpi/driver_mpi_common.h>
 
 #include <drivers/cpu/driver_cpu.h>
 #include <drivers/cuda/driver_cuda.h>
@@ -632,7 +633,7 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 	{
 		struct _starpu_worker *workerarg = &pconfig->workers[worker];
 		unsigned devid = workerarg->devid;
-#if defined(STARPU_USE_MIC) || defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
+#if defined(STARPU_USE_MIC) || defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 		struct _starpu_worker_set *worker_set = workerarg->set;
 #endif
 
@@ -797,8 +798,45 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 				STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
 #endif
 				break;
+#endif /* STARPU_USE_SCC */
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+			case STARPU_MPI_WORKER:
+				/* We spawn only one thread
+				 * per MPI device, which will control all MPI
+				 * workers of this device. (by using a worker set). */
+				if (worker_set->workers != workerarg)
+					break;
+
+				worker_set->nworkers = pconfig->topology.nmpicores[devid];
+
+				worker_set->set_is_initialized = 0;
+
+				STARPU_PTHREAD_CREATE_ON(
+						workerarg->name,
+						&worker_set->worker_thread,
+						NULL,
+						_starpu_mpi_src_worker,
+						worker_set,
+						_starpu_simgrid_get_host_by_worker(workerarg));
+
+#ifdef STARPU_USE_FXT
+				STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
+				while (!workerarg->worker_is_running)
+					STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
 #endif
 
+				STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+				while (!worker_set->set_is_initialized)
+					STARPU_PTHREAD_COND_WAIT(&worker_set->ready_cond,
+								  &worker_set->mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+
+				worker_set->started = 1;
+
+				break;
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
+
 			default:
 				STARPU_ABORT();
 		}
@@ -858,6 +896,7 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 				break;
 #endif
 			case STARPU_MIC_WORKER:
+            case STARPU_MPI_WORKER:
 				/* Already waited above */
 				break;
 			case STARPU_SCC_WORKER:
@@ -900,6 +939,7 @@ int starpu_conf_init(struct starpu_conf *conf)
 	conf->nopencl = starpu_get_env_number("STARPU_NOPENCL");
 	conf->nmic = starpu_get_env_number("STARPU_NMIC");
 	conf->nscc = starpu_get_env_number("STARPU_NSCC");
+	conf->nmpi_ms = starpu_get_env_number("STARPU_NMPI_MS");
 	conf->calibrate = starpu_get_env_number("STARPU_CALIBRATE");
 	conf->bus_calibrate = starpu_get_env_number("STARPU_BUS_CALIBRATE");
 	conf->mic_sink_program_path = starpu_getenv("STARPU_MIC_PROGRAM_PATH");
@@ -915,6 +955,7 @@ int starpu_conf_init(struct starpu_conf *conf)
 	conf->use_explicit_workers_opencl_gpuid = 0; /* TODO */
 	conf->use_explicit_workers_mic_deviceid = 0; /* TODO */
 	conf->use_explicit_workers_scc_deviceid = 0; /* TODO */
+	conf->use_explicit_workers_mpi_deviceid = 0; /* TODO */
 
 	conf->single_combined_worker = starpu_get_env_number("STARPU_SINGLE_COMBINED_WORKER");
 	if (conf->single_combined_worker == -1)
@@ -1088,6 +1129,12 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 		setenv("STARPU_SINK", "STARPU_SCC", 1);
 #	endif
 
+#   ifdef STARPU_USE_MPI_MASTER_SLAVE
+	/* In MPI case we look at the rank to know if we are a sink */
+	if (_starpu_mpi_common_mp_init() && !_starpu_mpi_common_is_src_node())
+		setenv("STARPU_SINK", "STARPU_MPI", 1);
+#   endif
+
 	/* If StarPU was configured to use MP sinks, we have to control the
 	 * kind on node we are running on : host or sink ? */
 	if (starpu_getenv("STARPU_SINK"))
@@ -1223,6 +1270,11 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 		if (_starpu_scc_common_is_mp_initialized())
 			_starpu_scc_src_mp_deinit();
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+        if (_starpu_mpi_common_is_mp_initialized())
+            _starpu_mpi_src_mp_deinit();
+#endif
+
 		initialized = UNINITIALIZED;
 		/* Let somebody else try to do it */
 		STARPU_PTHREAD_COND_SIGNAL(&init_cond);

+ 20 - 1
src/core/workers.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016  CNRS
- * Copyright (C) 2011  INRIA
+ * Copyright (C) 2011, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -217,6 +217,11 @@ struct _starpu_machine_topology
 	 */
 	unsigned nhwscc;
 
+	/* Total number of MPI nodes, as detected. May be different
+	 * from the actual number of node workers.
+	 */
+	unsigned nhwmpi;
+
 	/* Actual number of CPU workers used by StarPU. */
 	unsigned ncpus;
 
@@ -229,6 +234,13 @@ struct _starpu_machine_topology
 	/* Actual number of SCC workers used by StarPU. */
 	unsigned nsccdevices;
 
+	/* Actual number of MPI workers used by StarPU. */
+	unsigned nmpidevices;
+    unsigned nhwmpidevices;
+
+	unsigned nhwmpicores[STARPU_MAXMPIDEVS]; // Each MPI node has its set of cores.
+	unsigned nmpicores[STARPU_MAXMPIDEVS];
+
 	/* Topology of MP nodes (mainly MIC and SCC) as well as necessary
 	 * objects to communicate with them. */
 	unsigned nhwmicdevices;
@@ -278,6 +290,8 @@ struct _starpu_machine_topology
 	 * are taken in ID order.
 	 */
 	unsigned workers_scc_deviceid[STARPU_NMAXWORKERS];
+
+	unsigned workers_mpi_deviceid[STARPU_NMAXWORKERS];
 };
 
 struct _starpu_machine_config
@@ -304,6 +318,9 @@ struct _starpu_machine_config
 	/* Which SCC do we use? */
 	int current_scc_deviceid;
 
+	/* Which MPI do we use? */
+	int current_mpi_deviceid;
+
 	/* Memory node for cpus, if only one */
 	int cpus_nodeid;
 	/* Memory node for CUDA, if only one */
@@ -314,6 +331,8 @@ struct _starpu_machine_config
 	int mic_nodeid;
 	/* Memory node for SCC, if only one */
 	int scc_nodeid;
+	/* Memory node for MPI, if only one */
+	int mpi_nodeid;
 
 	/* Basic workers : each of this worker is running its own driver and
 	 * can be combined with other basic workers. */

+ 55 - 7
src/drivers/mp_common/mp_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -219,15 +219,63 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 	break;
 #endif /* STARPU_USE_SCC */
 
-#ifdef STARPU_USE_MPI
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
 	case STARPU_MPI_SOURCE:
-		STARPU_ABORT();
-		break;
+    {
+    /*
+		node->nb_mp_sinks = 
+		node->devid = 
+
+        node->init = _starpu_mpi_source_init;
+        node->launch_workers = NULL;
+        node->deinit = _starpu_mpi_source_deinit;
+        node->report_error = 
+
+		node->mp_recv_is_ready = 
+		node->mp_send = 
+		node->mp_recv = 
+		node->dt_send = 
+		node->dt_recv = 
+
+		node->get_kernel_from_job = 
+		node->lookup = 
+		node->bind_thread = 
+		node->execute = 
+		node->allocate = 
+		node->free = 
+
+        */
+    }
+	break;
 
 	case STARPU_MPI_SINK:
-		STARPU_ABORT();
+    {
+    /*
+		node->nb_mp_sinks = 
+		node->devid = 
+
+        node->init = _starpu_mpi_sink_init;
+        node->launch_workers = _starpu_mpi_sink_launch_workers;
+        node->deinit = _starpu_mpi_sink_deinit;
+        node->report_error = 
+
+		node->mp_recv_is_ready = ;
+		node->mp_send = 
+		node->mp_recv = 
+		node->dt_send = 
+		node->dt_recv = 
+
+		node->get_kernel_from_job = 
+		node->lookup = 
+		node->bind_thread = 
+		node->execute = 
+		node->allocate = 
+		node->free = 
+
+        */
+    }
 		break;
-#endif /* STARPU_USE_MPI */
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
 
 	default:
 		STARPU_ASSERT(0);
@@ -244,7 +292,7 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 	STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
 
 	/* If the node is a sink then we must initialize some field */
-	if(node->kind == STARPU_MIC_SINK || node->kind == STARPU_SCC_SINK)
+	if(node->kind == STARPU_MIC_SINK || node->kind == STARPU_SCC_SINK || node->kind == STARPU_MPI_SINK)
 	{
 		int i;
 		node->is_running = 1;

+ 3 - 1
src/drivers/mp_common/mp_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -88,7 +88,9 @@ union _starpu_mp_connection
 #ifdef STARPU_USE_SCC
 	int scc_nodeid;
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
 	int mpi_nodeid;
+#endif
 };
 
 struct _starpu_mp_transfer_command

+ 85 - 0
src/drivers/mpi/driver_mpi_common.c

@@ -0,0 +1,85 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <mpi.h>
+#include <core/workers.h>
+#include "driver_mpi_common.h"
+
+#define DRIVER_MPI_MASTER_NODE_DEFAULT 0
+
+static int mpi_initialized;
+static int src_node_id;
+
+static void _starpu_mpi_set_src_node_id()
+{
+	int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
+
+	if (node_id != -1)
+	{
+        int nb_proc, id_proc;
+        MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
+        MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
+
+		if (node_id < nb_proc)
+		{
+			src_node_id = node_id;
+			return;
+		}
+		else if (id_proc == DRIVER_MPI_MASTER_NODE_DEFAULT)
+		{
+			/* Only one node prints the error message. */
+			fprintf(stderr, "The node you specify to be the master is "
+					"greater than the total number of nodes.\n"
+					"Taking node %d by default...\n", DRIVER_MPI_MASTER_NODE_DEFAULT);
+		}
+	}
+
+	/* Node by default. */
+	src_node_id = DRIVER_MPI_MASTER_NODE_DEFAULT;
+}
+
+int _starpu_mpi_common_mp_init()
+{
+	if (mpi_initialized || MPI_Init(_starpu_get_argc(), _starpu_get_argv()) != MPI_SUCCESS)
+        return 0;
+
+	mpi_initialized = 1;
+
+    /* Find which node is the master */
+    _starpu_mpi_set_src_node_id();
+
+    return 1;
+}
+
+void _starpu_mpi_src_mp_deinit()
+{
+    MPI_Finalize();    
+    mpi_initialized = 0;
+}
+
+int _starpu_mpi_common_is_src_node()
+{   
+    int id_proc;
+    MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
+    return id_proc == src_node_id;
+} 
+
+int _starpu_mpi_common_is_mp_initialized()
+{
+	return mpi_initialized;
+}
+

+ 34 - 0
src/drivers/mpi/driver_mpi_common.h

@@ -0,0 +1,34 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __DRIVER_MPI_COMMON_H__
+#define __DRIVER_MPI_COMMON_H__
+
+#include <drivers/mp_common/mp_common.h>
+#include <drivers/mpi/driver_mpi_source.h>
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+
+int _starpu_mpi_common_mp_init();
+void _starpu_mpi_src_mp_deinit();
+int _starpu_mpi_common_is_src_node();
+int _starpu_mpi_common_is_mp_initialized();
+
+
+#endif  /* STARPU_USE_MPI_MASTER_SLAVE */
+
+#endif	/* __DRIVER_MPI_COMMON_H__ */

+ 53 - 0
src/drivers/mpi/driver_mpi_sink.c

@@ -0,0 +1,53 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+
+#include <mpi.h>
+#include "driver_mpi_sink.h"
+
+void _starpu_mpi_sink_init(struct _starpu_mp_node *node)
+{
+    //TODO
+}
+
+void _starpu_mpi_sink_deinit(struct _starpu_mp_node *node)
+{
+    //TODO
+}
+
+void _starpu_mpi_sink_launch_workers(struct _starpu_mp_node *node)
+{
+    //TODO
+}
+
+//void _starpu_mpi_sink_send(const struct _starpu_mp_node *sink, void *msg,
+//			   int len)
+//{
+//	int dst = STARPU_MP_SRC_NODE;
+//	if (MPI_Send(msg, len, MPI_CHAR, dst, dst, MPI_COMM_WORLD))
+//		STARPU_MP_COMMON_REPORT_ERROR(sink, errno);
+//}
+//
+//void _starpu_mpi_sink_recv(const struct _starpu_mp_node *sink, void *msg,
+//			   int len)
+//{
+//	int src = STARPU_MP_SRC_NODE;
+//	if (MPI_Recv(msg, len, MPI_CHAR, src, sink->mp_connection.mpi_nodeid,
+//		     MPI_COMM_WORLD, MPI_STATUS_IGNORE))
+//		STARPU_MP_COMMON_REPORT_ERROR(sink, errno);
+//}
+

+ 35 - 0
src/drivers/mpi/driver_mpi_sink.h

@@ -0,0 +1,35 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __DRIVER_MPI_SINK_H__
+#define __DRIVER_MPI_SINK_H__
+
+#include <drivers/mp_common/sink_common.h>
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+
+///* Send *MSG which can be an answer or data, to MPI SOURCE. */
+//extern void _starpu_mpi_sink_send(const struct _starpu_mp_node *source,
+//				  void *msg, int len);
+//
+///* Receive *MSG which can be either a command or data, from MPI SOURCE. */
+//extern void _starpu_mpi_sink_recv(const struct _starpu_mp_node *source,
+//				  void *msg, int len);
+
+#endif  /* STARPU_USE_MPI_MASTER_SLAVE */
+
+#endif	/* __DRIVER_MPI_SINK_H__ */

+ 182 - 0
src/drivers/mpi/driver_mpi_source.c

@@ -0,0 +1,182 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+
+#include <mpi.h>
+#include <errno.h>
+
+#include <starpu.h>
+#include <drivers/mpi/driver_mpi_source.h>
+#include <drivers/mpi/driver_mpi_common.h>
+
+#include <drivers/driver_common/driver_common.h>
+#include <drivers/mp_common/source_common.h>
+
+
+struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMPIDEVS];
+
+//static void _starpu_mpi_src_init_context(int devid)
+//{
+//    mpi_mp_nodes[devid] = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, devid);
+//}
+
+static void _starpu_mpi_src_deinit_context(int devid)
+{
+    _starpu_mp_common_send_command(mpi_ms_nodes[devid], STARPU_EXIT, NULL, 0);
+
+    _starpu_mp_common_node_destroy(mpi_ms_nodes[devid]);
+}
+
+
+
+void _starpu_mpi_source_init(struct _starpu_mp_node *node)
+{
+    //TODO
+}
+
+void _starpu_mpi_source_deinit(struct _starpu_mp_node *node)
+{
+    //TODO
+}
+
+unsigned _starpu_mpi_src_get_device_count()
+{
+    int nb_mpi_devices;
+
+    if (!_starpu_mpi_common_is_mp_initialized())
+        return 0;
+    
+    MPI_Comm_size(MPI_COMM_WORLD, &nb_mpi_devices);
+
+    //Remove one for master
+    nb_mpi_devices = nb_mpi_devices - 1;
+
+    return nb_mpi_devices;
+
+}
+
+ void _starpu_mpi_exit_useless_node(int devid)
+{   
+    struct _starpu_mp_node *node = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, devid);
+
+    _starpu_mp_common_send_command(node, STARPU_EXIT, NULL, 0);
+
+    _starpu_mp_common_node_destroy(node);
+}  
+
+void *_starpu_mpi_src_worker(void *arg)
+{
+    struct _starpu_worker_set *worker_set = arg;
+    /* As all workers of a set share common data, we just use the first
+     *       * one for intializing the following stuffs. */
+    struct _starpu_worker *baseworker = &worker_set->workers[0];
+    struct _starpu_machine_config *config = baseworker->config;
+    unsigned baseworkerid = baseworker - config->workers;
+    unsigned devid = baseworker->devid;
+    unsigned i;
+
+    /* unsigned memnode = baseworker->memory_node; */
+
+    _starpu_driver_start(baseworker, _STARPU_FUT_MPI_KEY, 0);
+#ifdef STARPU_USE_FXT             
+    for (i = 1; i < worker_set->nworkers; i++)
+        _starpu_worker_start(&worker_set->workers[i], _STARPU_FUT_MPI_KEY, 0);
+#endif          
+
+    // Current task for a thread managing a worker set has no sense.
+    _starpu_set_current_task(NULL);
+
+    for (i = 0; i < config->topology.nmpicores[devid]; i++)
+    {
+        struct _starpu_worker *worker = &config->workers[baseworkerid+i];
+        snprintf(worker->name, sizeof(worker->name), "MPI_MS %d core %u", devid, i);
+        snprintf(worker->short_name, sizeof(worker->short_name), "MPI_MS %d.%u", devid, i);
+    }
+    {
+        char thread_name[16];
+        snprintf(thread_name, sizeof(thread_name), "MPI_MS %d", devid);
+        starpu_pthread_setname(thread_name);
+    }
+
+    for (i = 0; i < worker_set->nworkers; i++)
+    {
+        struct _starpu_worker *worker = &worker_set->workers[i];
+        _STARPU_TRACE_WORKER_INIT_END(worker->workerid);
+    }
+
+    /* tell the main thread that this one is ready */
+    STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+    baseworker->status = STATUS_UNKNOWN;
+    worker_set->set_is_initialized = 1;
+    STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
+    STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+
+    _starpu_src_common_worker(worker_set, baseworkerid, mpi_ms_nodes[devid]);
+
+    return NULL;
+
+    
+}
+
+
+//void _starpu_mpi_source_send(const struct _starpu_mp_node *node, void *msg,
+//			     int len)
+//{
+//	int dst = node->mp_connection.mpi_nodeid;
+//	if (MPI_Send(msg, len, MPI_CHAR, dst, dst, MPI_COMM_WORLD))
+//		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
+//}
+//
+//void _starpu_mpi_source_recv(const struct _starpu_mp_node *node, void *msg,
+//			     int len)
+//{
+//	int src = node->mp_connection.mpi_nodeid;
+//	if (MPI_Recv(msg, len, MPI_CHAR, src, STARPU_MP_SRC_NODE,
+//		     MPI_COMM_WORLD, MPI_STATUS_IGNORE))
+//		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
+//}
+//
+//int _starpu_mpi_copy_src_to_sink(void *src,
+//				 unsigned src_node STARPU_ATTRIBUTE_UNUSED,
+//				 void *dst, unsigned dst_node, size_t size)
+//{
+//	/* TODO */
+//	return 0;
+//}
+//
+//int _starpu_mpi_copy_sink_to_src(void *src, unsigned src_node, void *dst,
+//				 unsigned dst_node STARPU_ATTRIBUTE_UNUSED,
+//				 size_t size)
+//{
+//	/* TODO */
+//	return 0;
+//}
+//
+//int _starpu_mpi_copy_sink_to_sink(void *src, unsigned src_node, void *dst,
+//				  unsigned dst_node, size_t size)
+//{
+//	/* TODO */
+//	return 0;
+//}
+//
+//void (*_starpu_mpi_get_kernel_from_job(const struct _starpu_mp_node *node,
+//				       struct _starpu_job *j))(void)
+//{
+//	/* TODO */
+//	return NULL;
+//}
+

+ 64 - 0
src/drivers/mpi/driver_mpi_source.h

@@ -0,0 +1,64 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __DRIVER_MPI_SOURCE_H__
+#define __DRIVER_MPI_SOURCE_H__
+
+#include <drivers/mp_common/mp_common.h>
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+
+/* Array of structures containing all the informations useful to send
+ * and receive informations with devices */
+extern struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMICDEVS];
+
+unsigned _starpu_mpi_src_get_device_count();
+void *_starpu_mpi_src_worker(void *arg);
+void _starpu_mpi_exit_useless_node(int devid);
+
+///* Send *MSG which can be a command or data, to a MPI sink. */
+//extern void _starpu_mpi_source_send(const struct _starpu_mp_node *node,
+//				    void *msg, int len);
+//
+///* Receive *MSG which can be an answer or data, to a MPI sink. */
+//extern void _starpu_mpi_source_recv(const struct _starpu_mp_node *node,
+//				    void *msg, int len);
+//
+///* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
+// * node to the address pointed by DST in the DST_NODE memory node */
+//extern int _starpu_mpi_copy_src_to_sink(void *src,
+//					unsigned src_node STARPU_ATTRIBUTE_UNUSED,
+//					void *dst, unsigned dst_node,
+//					size_t size);
+//
+///* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
+// * node to the address pointed by DST in the DST_NODE memory node */
+//extern int _starpu_mpi_copy_sink_to_src(void *src, unsigned src_node, void *dst,
+//					unsigned dst_node STARPU_ATTRIBUTE_UNUSED,
+//					size_t size);
+//
+//extern int _starpu_mpi_copy_sink_to_sink(void *src, unsigned src_node,
+//					 void *dst, unsigned dst_node,
+//					 size_t size);
+//
+///* Get a pointer which points at the implementation to be called by MPI node. */
+//extern void (*_starpu_mpi_get_kernel_from_job(const struct _starpu_mp_node *,
+//					      struct _starpu_job *j))(void);
+
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
+
+#endif	/* __DRIVER_MPI_SOURCE_H__ */

+ 4 - 0
src/top/starpu_top.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2011 William Braik, Yann Courtois, Jean-Marie Couteyen, Anthony Roy
  * Copyright (C) 2011, 2012, 2013 CNRS
+ * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -111,6 +112,9 @@ static void starpu_top_get_device_type(int id, char* type)
 	case STARPU_SCC_WORKER:
 		strncpy(type, "SCC", 9);
 		break;
+	case STARPU_MPI_WORKER:
+		strncpy(type, "MPI", 9);
+		break;
 	}
 	type[9] = 0;
 }