Przeglądaj źródła

merge branch load-balancer

Nathalie Furmento 8 lat temu
rodzic
commit
1c3705fb6b

+ 2 - 0
include/starpu_sched_ctx.h

@@ -119,6 +119,8 @@ void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void *policy_data);
 
 void *starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id);
 
+struct starpu_sched_policy *starpu_sched_ctx_get_sched_policy(unsigned sched_ctx_id);
+
 void *starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void *param, unsigned sched_ctx_id);
 
 int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id);

+ 4 - 3
mpi/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2009-2013, 2015  Université de Bordeaux
-# Copyright (C) 2010, 2011, 2012, 2013  CNRS
+# Copyright (C) 2010, 2011, 2012, 2013, 2017  CNRS
 # Copyright (C) 2016  Inria
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -22,8 +22,9 @@ pkgconfig_DATA = libstarpumpi.pc starpumpi-1.0.pc starpumpi-1.1.pc starpumpi-1.2
 
 versincludedir = $(includedir)/starpu/$(STARPU_EFFECTIVE_VERSION)
 versinclude_HEADERS = 					\
-	include/fstarpu_mpi_mod.f90			\
-	include/starpu_mpi.h
+	include/starpu_mpi.h				\
+	include/starpu_mpi_lb.h				\
+	include/fstarpu_mpi_mod.f90
 
 showcheck:
 	RET=0 ; \

+ 9 - 3
mpi/examples/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2009-2013, 2015-2016  Université de Bordeaux
-# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
 # Copyright (C) 2016  Inria
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -108,13 +108,19 @@ AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(FXT_LDFLAGS) $(ST
 ###################
 if BUILD_EXAMPLES
 examplebin_PROGRAMS +=				\
-	stencil/stencil5
+	stencil/stencil5			\
+	stencil/stencil5_lb
 
 stencil_stencil5_LDADD =		\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la -lm
 
+stencil_stencil5_lb_LDADD =		\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la -lm
+
 starpu_mpi_EXAMPLES	+=	\
-	stencil/stencil5
+	stencil/stencil5	\
+	stencil/stencil5_lb
+
 endif
 
 ##################

+ 286 - 0
mpi/examples/stencil/stencil5_lb.c

@@ -0,0 +1,286 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2013, 2015-2016              Université Bordeaux
+ * Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <starpu_mpi_lb.h>
+#include <math.h>
+
/* Print to OFILE unless the STARPU_SSILENT environment variable is set. */
#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
/* Same as FPRINTF, prefixed with the MPI rank and current function name and
 * followed by a flush.  Fix: no trailing semicolon after while(0) — the
 * stray semicolon made `if (c) FPRINTF_MPI(...); else ...` a syntax error. */
#define FPRINTF_MPI(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) { \
						int _disp_rank; starpu_mpi_comm_rank(MPI_COMM_WORLD, &_disp_rank);       \
						fprintf(ofile, "[%d][starpu_mpi][%s] " fmt , _disp_rank, __starpu_func__ ,## __VA_ARGS__); \
						fflush(ofile); }} while(0)
+
+void stencil5_cpu(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	float *xy = (float *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	float *xm1y = (float *)STARPU_VARIABLE_GET_PTR(descr[1]);
+	float *xp1y = (float *)STARPU_VARIABLE_GET_PTR(descr[2]);
+	float *xym1 = (float *)STARPU_VARIABLE_GET_PTR(descr[3]);
+	float *xyp1 = (float *)STARPU_VARIABLE_GET_PTR(descr[4]);
+
+//	fprintf(stdout, "VALUES: %2.2f %2.2f %2.2f %2.2f %2.2f\n", *xy, *xm1y, *xp1y, *xym1, *xyp1);
+	*xy = (*xy + *xm1y + *xp1y + *xym1 + *xyp1) / 5;
+//	fprintf(stdout, "VALUES: %2.2f %2.2f %2.2f %2.2f %2.2f\n", *xy, *xm1y, *xp1y, *xym1, *xyp1);
+}
+
/* Codelet of the stencil kernel: one read-write centre buffer followed by
 * four read-only neighbour buffers. */
struct starpu_codelet stencil5_cl =
{
	.cpu_funcs = {stencil5_cpu},
	.nbuffers = 5,
	.modes = {STARPU_RW, STARPU_R, STARPU_R, STARPU_R, STARPU_R}
};
+
+#ifdef STARPU_QUICK_CHECK
+#  define NITER_DEF	10
+#  define X         	2
+#  define Y         	2
+#elif !defined(STARPU_LONG_CHECK)
+#  define NITER_DEF	10
+#  define X         	5
+#  define Y         	5
+#else
+#  define NITER_DEF	100
+#  define X         	20
+#  define Y         	20
+#endif
+
+int display = 0;
+int niter = NITER_DEF;
+
+/* Returns the MPI node number where data indexes index is */
+int my_distrib(int x, int y, int nb_nodes)
+{
+	/* Block distrib */
+	return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
+}
+
+static void parse_args(int argc, char **argv)
+{
+	int i;
+	for (i = 1; i < argc; i++)
+	{
+		if (strcmp(argv[i], "-iter") == 0)
+		{
+			char *argptr;
+			niter = strtol(argv[++i], &argptr, 10);
+		}
+		if (strcmp(argv[i], "-display") == 0)
+		{
+			display = 1;
+		}
+	}
+}
+
+void get_neighbors(int **neighbor_ids, int *nneighbors)
+{
+	int ret, rank, size;
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	if (size <= 2)
+	{
+		*nneighbors = 1;
+		*neighbor_ids = malloc(sizeof(int));
+		*neighbor_ids[0] = rank==size-1?0:rank+1;
+		fprintf(stderr, "rank %d has neighbor %d\n", rank, *neighbor_ids[0]);
+	}
+	else
+	{
+		*nneighbors = 2;
+		*neighbor_ids = malloc(2*sizeof(int));
+		(*neighbor_ids)[0] = rank==size-1?0:rank+1;
+		(*neighbor_ids)[1] = rank==0?size-1:rank-1;
+		fprintf(stderr, "rank %d has neighbor %d and %d\n", rank, (*neighbor_ids)[0], (*neighbor_ids)[1]);
+	}
+}
+
/* Per-cell bookkeeping: the StarPU handle of the cell and the MPI rank that
 * currently owns it (updated when the load balancer migrates the cell). */
struct data_node
{
	starpu_data_handle_t data_handle;
	int node;
};

struct data_node data_nodes[X][Y];
+
+void get_data_unit_to_migrate(starpu_data_handle_t **handle_unit, int *nhandles, int dst_node)
+{
+	int rank, x, y;
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	fprintf(stderr, "Looking to move data from %d to %d\n", rank, dst_node);
+	for(x = 0; x < X; x++)
+	{
+		for (y = 0; y < Y; y++)
+		{
+			if (data_nodes[x][y].node == rank)
+			{
+				*handle_unit = malloc(sizeof(starpu_data_handle_t));
+				*handle_unit[0] = data_nodes[x][y].data_handle;
+				*nhandles = 1;
+				data_nodes[x][y].node = dst_node;
+				return;
+			}
+		}
+	}
+	*nhandles = 0;
+}
+
/* Driver: iterate a 5-point stencil niter times over an X*Y matrix
 * distributed over (at most) two MPI ranks, with the "heat" load balancer
 * migrating cells between ranks during the run. */
int main(int argc, char **argv)
{
	int my_rank, size, x, y, loop;
	float mean=0;
	float matrix[X][Y];
	struct starpu_mpi_lb_conf itf;

	/* Hand our two load balancing callbacks to the balancer. */
	itf.get_neighbors = get_neighbors;
	itf.get_data_unit_to_migrate = get_data_unit_to_migrate;

	int ret = starpu_init(NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
	ret = starpu_mpi_init(&argc, &argv, 1);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);

	if (size > 2)
	{
		/* 77 is the automake "test skipped" exit code. */
		FPRINTF(stderr, "Only works with 2 nodes\n");
		starpu_mpi_shutdown();
		starpu_shutdown();
		return 77;
	}
	if (starpu_cpu_worker_get_count() == 0)
	{
		FPRINTF(stderr, "We need at least 1 CPU worker.\n");
		starpu_mpi_shutdown();
		starpu_shutdown();
		return 77;
	}

	/* Lower the heat policy's sleep threshold so balancing actually
	 * triggers on this small example, then start the balancer. */
	setenv("LB_HEAT_SLEEP_THRESHOLD", "5", 1);
	starpu_mpi_lb_init("heat", &itf);

	parse_args(argc, argv);

	/* Initial data values */
	starpu_srand48((long int)time(NULL));
	for(x = 0; x < X; x++)
	{
		for (y = 0; y < Y; y++)
		{
			matrix[x][y] = (float)starpu_drand48();
			mean += matrix[x][y];
		}
	}
	mean /= (X*Y);

	if (display)
	{
		FPRINTF_MPI(stdout, "mean=%2.2f\n", mean);
		for(x = 0; x < X; x++)
		{
			fprintf(stdout, "[%d] ", my_rank);
			for (y = 0; y < Y; y++)
			{
				fprintf(stdout, "%2.2f ", matrix[x][y]);
			}
			fprintf(stdout, "\n");
		}
	}

	/* Initial distribution */
	for(x = 0; x < X; x++)
	{
		for (y = 0; y < Y; y++)
		{
			data_nodes[x][y].node = my_distrib(x, y, size);
			if (data_nodes[x][y].node == my_rank)
			{
				/* Owned cell: register the local buffer. */
				//FPRINTF(stderr, "[%d] Owning data[%d][%d]\n", my_rank, x, y);
				starpu_variable_data_register(&data_nodes[x][y].data_handle, 0, (uintptr_t)&(matrix[x][y]), sizeof(float));
			}
			else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
				 || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
			{
				/* I don't own that index, but will need it for my computations */
				//FPRINTF(stderr, "[%d] Neighbour of data[%d][%d]\n", my_rank, x, y);
				starpu_variable_data_register(&data_nodes[x][y].data_handle, -1, (uintptr_t)NULL, sizeof(float));
			}
			else
			{
				/* I know it's useless to allocate anything for this */
				data_nodes[x][y].data_handle = NULL;
			}
			if (data_nodes[x][y].data_handle)
			{
				/* MPI tag = linearized cell index; initial
				 * owner = block distribution. */
				starpu_mpi_data_register(data_nodes[x][y].data_handle, (y*X)+x, data_nodes[x][y].node);
			}
		}
	}

	/* First computation with initial distribution */
	for(loop=0 ; loop<niter; loop++)
	{
		/* Border cells (x==0, x==X-1, y==0, y==Y-1) are never the
		 * centre of a task: they are only read as neighbours. */
		for (x = 1; x < X-1; x++)
		{
			for (y = 1; y < Y-1; y++)
			{
				starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl, STARPU_RW, data_nodes[x][y].data_handle,
						       STARPU_R, data_nodes[x-1][y].data_handle, STARPU_R, data_nodes[x+1][y].data_handle,
						       STARPU_R, data_nodes[x][y-1].data_handle, STARPU_R, data_nodes[x][y+1].data_handle,
						       STARPU_TAG_ONLY, ((starpu_tag_t)X)*x + y,
						       0);
			}
		}
	}
	FPRINTF(stderr, "Waiting ...\n");
	starpu_task_wait_for_all();

	// The load balancer needs to be shutdown before unregistering data as it needs access to them
	starpu_mpi_lb_shutdown();

	/* Unregister data */
	for(x = 0; x < X; x++)
	{
		for (y = 0; y < Y; y++)
		{
			if (data_nodes[x][y].data_handle)
			{
				starpu_data_unregister(data_nodes[x][y].data_handle);
			}
		}
	}

	starpu_mpi_shutdown();
	starpu_shutdown();

	if (display)
	{
		/* NOTE(review): `mean` is the pre-computation mean and cells
		 * owned by the other rank were not fetched back, so this only
		 * shows each rank's local view — confirm this is intended. */
		FPRINTF(stdout, "[%d] mean=%2.2f\n", my_rank, mean);
		for(x = 0; x < X; x++)
		{
			FPRINTF(stdout, "[%d] ", my_rank);
			for (y = 0; y < Y; y++)
			{
				FPRINTF(stdout, "%2.2f ", matrix[x][y]);
			}
			FPRINTF(stdout, "\n");
		}
	}

	return 0;
}

+ 4 - 1
mpi/include/starpu_mpi.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2012, 2014-2016  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
  * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -119,6 +119,9 @@ typedef void (*starpu_mpi_datatype_free_func_t)(MPI_Datatype *);
 int starpu_mpi_datatype_register(starpu_data_handle_t handle, starpu_mpi_datatype_allocate_func_t allocate_datatype_func, starpu_mpi_datatype_free_func_t free_datatype_func);
 int starpu_mpi_datatype_unregister(starpu_data_handle_t handle);
 
/* Register/unregister a hook invoked on tasks going through the StarPU-MPI
 * submission path.  Fix: "(void)" instead of "()" — in C an empty parameter
 * list declares a function with unspecified arguments. */
int starpu_mpi_pre_submit_hook_register(void (*f)(struct starpu_task *));
int starpu_mpi_pre_submit_hook_unregister(void);
+
 #ifdef __cplusplus
 }
 #endif

+ 41 - 0
mpi/include/starpu_mpi_lb.h

@@ -0,0 +1,41 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_LOAD_BALANCER_H__
+#define __STARPU_MPI_LOAD_BALANCER_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct starpu_mpi_lb_conf
+{
+	void (*get_neighbors)(int **neighbor_ids, int *nneighbors);
+	void (*get_data_unit_to_migrate)(starpu_data_handle_t **handle_unit, int *nhandles, int dst_node);
+};
+
+/* Inits the load balancer's environment with the load policy provided by the
+ * user
+ */
+void starpu_mpi_lb_init(const char *lb_policy_name, struct starpu_mpi_lb_conf *);
+void starpu_mpi_lb_shutdown();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_LOAD_BALANCER_H__

+ 11 - 3
mpi/src/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2009-2012  Université de Bordeaux
-# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -69,7 +69,10 @@ noinst_HEADERS =					\
 	starpu_mpi_sync_data.h				\
 	starpu_mpi_comm.h				\
 	starpu_mpi_tag.h				\
-	starpu_mpi_task_insert.h
+	starpu_mpi_task_insert.h			\
+	load_balancer/policy/data_movements_interface.h	\
+	load_balancer/policy/load_data_interface.h	\
+	load_balancer/policy/load_balancer_policy.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
@@ -88,7 +91,12 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_comm.c				\
 	starpu_mpi_tag.c				\
 	starpu_mpi_fortran.c				\
-	starpu_mpi_task_insert_fortran.c
+	starpu_mpi_task_insert_fortran.c		\
+	load_balancer/policy/data_movements_interface.c	\
+	load_balancer/policy/load_data_interface.c	\
+	load_balancer/policy/load_heat_propagation.c	\
+	load_balancer/load_balancer.c
+
 
 showcheck:
 	-cat /dev/null

+ 156 - 0
mpi/src/load_balancer/load_balancer.c

@@ -0,0 +1,156 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include <starpu_scheduler.h>
+#include <common/utils.h>
+
+#include <starpu_mpi_lb.h>
+#include "policy/load_balancer_policy.h"
+
/* Currently selected load balancing policy; NULL when balancing is disabled. */
static struct load_balancer_policy *defined_policy = NULL;
typedef void (*_post_exec_hook_func_t)(struct starpu_task *task, unsigned sched_ctx_id);
/* Original post_exec_hook of each scheduling context, saved at init so the
 * wrapper can chain to it and shutdown can restore it. */
static _post_exec_hook_func_t saved_post_exec_hook[STARPU_NMAX_SCHED_CTXS];
+
+static void post_exec_hook_wrapper(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	//fprintf(stderr,"I am called ! \n");
+	if (defined_policy && defined_policy->finished_task_entry_point)
+		defined_policy->finished_task_entry_point();
+	if (saved_post_exec_hook[sched_ctx_id])
+		saved_post_exec_hook[sched_ctx_id](task, sched_ctx_id);
+}
+
/* NULL-terminated table of the built-in load balancing policies. */
static struct load_balancer_policy *predefined_policies[] =
{
	&load_heat_propagation_policy,
	NULL
};
+
+void starpu_mpi_lb_init(const char *lb_policy_name, struct starpu_mpi_lb_conf *itf)
+{
+	int ret;
+
+	const char *policy_name = starpu_getenv("STARPU_MPI_LB");
+	if (!policy_name)
+		policy_name = lb_policy_name;
+
+	if (!policy_name || (strcmp(policy_name, "help") == 0))
+	{
+		_STARPU_MSG("Warning : load balancing is disabled for this run.\n");
+		_STARPU_MSG("Use the STARPU_MPI_LB = <name> environment variable to use a load balancer.\n");
+		_STARPU_MSG("Available load balancers :\n");
+		struct load_balancer_policy **policy;
+		for(policy=predefined_policies ; *policy!=NULL ; policy++)
+		{
+			struct load_balancer_policy *p = *policy;
+			fprintf(stderr," - %s\n", p->policy_name);
+		}
+		return;
+	}
+
+	if (policy_name)
+	{
+		struct load_balancer_policy **policy;
+		for(policy=predefined_policies ; *policy!=NULL ; policy++)
+		{
+			struct load_balancer_policy *p = *policy;
+			if (p->policy_name)
+			{
+				if (strcmp(policy_name, p->policy_name) == 0)
+				{
+					/* we found a policy with the requested name */
+					defined_policy = p;
+					break;
+				}
+			}
+		}
+	}
+
+	if (!defined_policy)
+	{
+		_STARPU_MSG("Error : no load balancer with the name %s. Load balancing will be disabled for this run.\n", policy_name);
+		return;
+	}
+
+	ret = defined_policy->init(itf);
+	if (ret != 0)
+	{
+		_STARPU_MSG("Error (%d) in %s->init: invalid starpu_mpi_lb_conf. Load balancing will be disabled for this run.\n", ret, defined_policy->policy_name);
+		return;
+	}
+
+	/* starpu_register_hook(submitted_task, defined_policy->submitted_task_entry_point); */
+	if (defined_policy->submitted_task_entry_point)
+		starpu_mpi_pre_submit_hook_register(defined_policy->submitted_task_entry_point);
+
+	/* starpu_register_hook(finished_task, defined_policy->finished_task_entry_point); */
+	if (defined_policy->finished_task_entry_point)
+	{
+		int i;
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		{
+			struct starpu_sched_policy *sched_policy = starpu_sched_ctx_get_sched_policy(i);
+			if (sched_policy)
+			{
+				_STARPU_DEBUG("Setting post_exec_hook for scheduling context %d %s (%d)\n", i, sched_policy->policy_name, STARPU_NMAX_SCHED_CTXS);
+				saved_post_exec_hook[i] = sched_policy->post_exec_hook;
+				sched_policy->post_exec_hook = post_exec_hook_wrapper;
+			}
+			else
+				saved_post_exec_hook[i] = NULL;
+		}
+	}
+
+	return;
+}
+
+void starpu_mpi_lb_shutdown()
+{
+	if (!defined_policy)
+		return;
+
+	int ret = defined_policy->deinit();
+	if (ret != 0)
+	{
+		_STARPU_MSG("Error (%d) in %s->deinit\n", ret, defined_policy->policy_name);
+		return;
+	}
+
+	/* starpu_unregister_hook(submitted_task, defined_policy->submitted_task_entry_point); */
+	if (defined_policy->submitted_task_entry_point)
+		starpu_mpi_pre_submit_hook_unregister();
+
+	/* starpu_unregister_hook(finished_task, defined_policy->finished_task_entry_point); */
+	if (defined_policy->finished_task_entry_point)
+	{
+		int i;
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		{
+			if (saved_post_exec_hook[i])
+			{
+				struct starpu_sched_policy *sched_policy = starpu_sched_ctx_get_sched_policy(i);
+				sched_policy->post_exec_hook = saved_post_exec_hook[i];
+				saved_post_exec_hook[i] = NULL;
+			}
+		}
+	}
+	defined_policy = NULL;
+}

+ 280 - 0
mpi/src/load_balancer/policy/data_movements_interface.c

@@ -0,0 +1,280 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <stdlib.h>
+
+#include "data_movements_interface.h"
+
/* Accessors: each reads the interface instance of `handle` on
 * STARPU_MAIN_RAM. */

/* Return the address of the tags table pointer, or NULL when no table is
 * currently allocated. */
int **data_movements_get_ref_tags_table(starpu_data_handle_t handle)
{
	struct data_movements_interface *dm_interface =
		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	if (dm_interface->tags)
		return &dm_interface->tags;
	else
		return NULL;
}

/* Return the address of the ranks table pointer, or NULL when no table is
 * currently allocated. */
int **data_movements_get_ref_ranks_table(starpu_data_handle_t handle)
{
	struct data_movements_interface *dm_interface =
		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	if (dm_interface->ranks)
		return &dm_interface->ranks;
	else
		return NULL;
}

/* Return the tags table (may be NULL when size is 0). */
int *data_movements_get_tags_table(starpu_data_handle_t handle)
{
	struct data_movements_interface *dm_interface =
		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	return dm_interface->tags;
}

/* Return the ranks table (may be NULL when size is 0). */
int *data_movements_get_ranks_table(starpu_data_handle_t handle)
{
	struct data_movements_interface *dm_interface =
		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	return dm_interface->ranks;
}

/* Return the number of entries in each table. */
int data_movements_get_size_tables(starpu_data_handle_t handle)
{
	struct data_movements_interface *dm_interface =
		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	return dm_interface->size;
}
+
+int data_movements_reallocate_tables(starpu_data_handle_t handle, int size)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	if (dm_interface->size)
+	{
+		STARPU_ASSERT(dm_interface->tags);
+		free(dm_interface->tags);
+		dm_interface->tags = NULL;
+
+		STARPU_ASSERT(dm_interface->ranks);
+		free(dm_interface->ranks);
+		dm_interface->ranks = NULL;
+	}
+	else
+	{
+		STARPU_ASSERT(!dm_interface->tags);
+		STARPU_ASSERT(!dm_interface->ranks);
+	}
+
+	dm_interface->size = size;
+
+	if (dm_interface->size)
+	{
+		dm_interface->tags = malloc(size*sizeof(int));
+		dm_interface->ranks = malloc(size*sizeof(int));
+	}
+
+	return 0 ;
+}
+
/* starpu_data_interface_ops.register_data_handle implementation: propagate
 * the user-provided description to every per-node interface copy.  Only the
 * home node receives the actual table pointers; the other nodes start with
 * NULL tables (unallocated). */
static void data_movements_register_data_handle(starpu_data_handle_t handle, unsigned home_node, void *data_interface)
{
	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;

	unsigned node;
	for (node = 0; node < STARPU_MAXNODES; node++)
	{
		struct data_movements_interface *local_interface = (struct data_movements_interface *)
			starpu_data_get_interface_on_node(handle, node);

		local_interface->size = dm_interface->size;
		if (node == home_node)
		{
			local_interface->tags = dm_interface->tags;
			local_interface->ranks = dm_interface->ranks;
		}
		else
		{
			local_interface->tags = NULL;
			local_interface->ranks = NULL;
		}
	}
}
+
/* starpu_data_interface_ops.allocate_data_on_node implementation: allocate
 * both tables on `node`.  Returns the total number of bytes allocated, or
 * -ENOMEM on failure (releasing the partial allocation via the goto chain). */
static starpu_ssize_t data_movements_allocate_data_on_node(void *data_interface, unsigned node)
{
	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;

	int *addr_tags = NULL;
	int *addr_ranks = NULL;
	starpu_ssize_t requested_memory = dm_interface->size * sizeof(int);

	addr_tags = (int*) starpu_malloc_on_node(node, requested_memory);
	if (!addr_tags)
		goto fail_tags;
	addr_ranks = (int*) starpu_malloc_on_node(node, requested_memory);
	if (!addr_ranks)
		goto fail_ranks;

	/* update the data properly in consequence */
	dm_interface->tags = addr_tags;
	dm_interface->ranks = addr_ranks;

	return 2*requested_memory;

fail_ranks:
	starpu_free_on_node(node, (uintptr_t) addr_tags, requested_memory);
fail_tags:
	return -ENOMEM;
}

/* starpu_data_interface_ops.free_data_on_node implementation: release both
 * tables on `node`. */
static void data_movements_free_data_on_node(void *data_interface, unsigned node)
{
	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;
	starpu_ssize_t requested_memory = dm_interface->size * sizeof(int);

	starpu_free_on_node(node, (uintptr_t) dm_interface->tags, requested_memory);
	starpu_free_on_node(node, (uintptr_t) dm_interface->ranks, requested_memory);
}
+
/* Packed size of the interface content: both tables plus the leading size
 * field (layout documented in data_movements_pack_data). */
static size_t data_movements_get_size(starpu_data_handle_t handle)
{
	size_t size;
	struct data_movements_interface *dm_interface = (struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

	size = (dm_interface->size * 2 * sizeof(int)) + sizeof(int);
	return size;
}

/* Footprint hash of this interface: CRC32C of the packed size. */
static uint32_t data_movements_footprint(starpu_data_handle_t handle)
{
	return starpu_hash_crc32c_be(data_movements_get_size(handle), 0);
}
+
+static int data_movements_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, starpu_ssize_t *count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	*count = data_movements_get_size(handle);
+	if (ptr != NULL)
+	{
+		char *data;
+		starpu_malloc_flags((void**) &data, *count, 0);
+		assert(data);
+		*ptr = data;
+		memcpy(data, &dm_interface->size, sizeof(int));
+		if (dm_interface->size)
+		{
+			memcpy(data+sizeof(int), dm_interface->tags, (dm_interface->size*sizeof(int)));
+			memcpy(data+sizeof(int)+(dm_interface->size*sizeof(int)), dm_interface->ranks, dm_interface->size*sizeof(int));
+		}
+	}
+
+	return 0;
+}
+
+static int data_movements_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
+{
+	char *data = ptr;
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	int size = 0;
+	memcpy(&size, data, sizeof(int));
+	STARPU_ASSERT(count == (2 * size * sizeof(int)) + sizeof(int));
+
+	data_movements_reallocate_tables(handle, size);
+
+	if (dm_interface->size)
+	{
+		memcpy(dm_interface->tags, data+sizeof(int), dm_interface->size*sizeof(int));
+		memcpy(dm_interface->ranks, data+sizeof(int)+(dm_interface->size*sizeof(int)), dm_interface->size*sizeof(int));
+	}
+
+    return 0;
+}
+
/* Node-to-node copy of both tables through the generic StarPU copy helper.
 * NOTE(review): a non-zero return from starpu_interface_copy is propagated
 * as -EAGAIN (StarPU's "transfer still in progress" convention) — confirm
 * against the starpu_interface_copy documentation. */
static int copy_any_to_any(void *src_interface, unsigned src_node,
			   void *dst_interface, unsigned dst_node,
			   void *async_data)
{
	struct data_movements_interface *src_data_movements = src_interface;
	struct data_movements_interface *dst_data_movements = dst_interface;
	int ret = 0;

	if (starpu_interface_copy((uintptr_t) src_data_movements->tags, 0, src_node,
				    (uintptr_t) dst_data_movements->tags, 0, dst_node,
				     src_data_movements->size*sizeof(int),
				     async_data))
		ret = -EAGAIN;
	if (starpu_interface_copy((uintptr_t) src_data_movements->ranks, 0, src_node,
				    (uintptr_t) dst_data_movements->ranks, 0, dst_node,
				     src_data_movements->size*sizeof(int),
				     async_data))
		ret = -EAGAIN;
	return ret;
}

static const struct starpu_data_copy_methods data_movements_copy_methods =
{
	.any_to_any = copy_any_to_any
};
+
/* Method table of the data_movements interface.  interfaceid starts as
 * STARPU_UNKNOWN_INTERFACE_ID and is assigned lazily on first registration
 * (see data_movements_data_register). */
static struct starpu_data_interface_ops interface_data_movements_ops =
{
	.register_data_handle = data_movements_register_data_handle,
	.allocate_data_on_node = data_movements_allocate_data_on_node,
	.free_data_on_node = data_movements_free_data_on_node,
	.copy_methods = &data_movements_copy_methods,
	.get_size = data_movements_get_size,
	.footprint = data_movements_footprint,
	.interfaceid = STARPU_UNKNOWN_INTERFACE_ID,
	.interface_size = sizeof(struct data_movements_interface),
	.handle_to_pointer = NULL,
	.pack_data = data_movements_pack_data,
	.unpack_data = data_movements_unpack_data,
	.describe = NULL
};
+
+void data_movements_data_register(starpu_data_handle_t *handleptr, unsigned home_node, int *tags, int *ranks, int size)
+{
+	struct data_movements_interface data_movements =
+	{
+		.tags = tags,
+		.ranks = ranks,
+		.size = size
+	};
+
+	if (interface_data_movements_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
+	{
+		interface_data_movements_ops.interfaceid = starpu_data_interface_get_next_id();
+	}
+
+	starpu_data_register(handleptr, home_node, &data_movements, &interface_data_movements_ops);
+}

+ 48 - 0
mpi/src/load_balancer/policy/data_movements_interface.h

@@ -0,0 +1,48 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#ifndef __DATA_MOVEMENTS_INTERFACE_H
+#define __DATA_MOVEMENTS_INTERFACE_H
+
+/* interface for data_movements */
+struct data_movements_interface
+{
+	/* Data tags table */
+	int *tags;
+	/* Ranks table (where to move the corresponding data) */
+	int *ranks;
+	/* Size of the tables */
+	int size;
+};
+
+void data_movements_data_register(starpu_data_handle_t *handle, unsigned home_node, int *ranks, int *tags, int size);
+
+int **data_movements_get_ref_tags_table(starpu_data_handle_t handle);
+int **data_movements_get_ref_ranks_table(starpu_data_handle_t handle);
+int data_movements_reallocate_tables(starpu_data_handle_t handle, int size);
+
+int *data_movements_get_tags_table(starpu_data_handle_t handle);
+int *data_movements_get_ranks_table(starpu_data_handle_t handle);
+int data_movements_get_size_tables(starpu_data_handle_t handle);
+
+#define DATA_MOVEMENTS_GET_SIZE_TABLES(interface)	(((struct data_movements_interface *)(interface))->size)
+#define DATA_MOVEMENTS_GET_TAGS_TABLE(interface)	(((struct data_movements_interface *)(interface))->tags)
+#define DATA_MOVEMENTS_GET_RANKS_TABLE(interface)	(((struct data_movements_interface *)(interface))->ranks)
+
+#endif /* __DATA_MOVEMENTS_INTERFACE_H */

+ 52 - 0
mpi/src/load_balancer/policy/load_balancer_policy.h

@@ -0,0 +1,52 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __LOAD_BALANCER_POLICY_H__
+#define __LOAD_BALANCER_POLICY_H__
+
+#include <starpu_mpi_lb.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/* A load balancer consists in a collection of operations on a data
 * representing the load of the application (in terms of computation, memory,
 * whatever). StarPU allows several entry points for the user. The load
 * balancer allows the user to give its load balancing methods to be used on
 * these entry points of the runtime system. */
struct load_balancer_policy
{
	/* Fix: "(void)" instead of "()" — an empty list declares function
	 * pointers with unspecified arguments, defeating type checking. */
	int (*init)(struct starpu_mpi_lb_conf *);
	int (*deinit)(void);
	void (*submitted_task_entry_point)(void);
	void (*finished_task_entry_point)(void);

	/* Name of the load balancing policy. The selection of the load balancer is
	 * performed through the use of the STARPU_MPI_LB=name environment
	 * variable.
	 */
	const char *policy_name;
};

extern struct load_balancer_policy load_heat_propagation_policy;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __LOAD_BALANCER_POLICY_H__

+ 268 - 0
mpi/src/load_balancer/policy/load_data_interface.c

@@ -0,0 +1,268 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <stdlib.h>
+
+#include "load_data_interface.h"
+
+int load_data_get_sleep_threshold(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->sleep_task_threshold;
+}
+
+int load_data_get_wakeup_threshold(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->wakeup_task_threshold;
+}
+
+int load_data_get_current_phase(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->phase;
+}
+
+int load_data_get_nsubmitted_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->nsubmitted_tasks;
+}
+
+int load_data_get_nfinished_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->nfinished_tasks;
+}
+
+int load_data_inc_nsubmitted_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	(ld_interface->nsubmitted_tasks)++;
+
+	return 0;
+}
+
+int load_data_inc_nfinished_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	(ld_interface->nfinished_tasks)++;
+
+	return 0;
+}
+
+int load_data_next_phase(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	ld_interface->phase++;
+
+	return 0;
+}
+
+int load_data_update_elapsed_time(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	ld_interface->elapsed_time = starpu_timing_now() - ld_interface->start;
+
+	return 0;
+}
+
+double load_data_get_elapsed_time(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->elapsed_time;
+}
+
+int load_data_update_wakeup_cond(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	int previous_threshold = ld_interface->wakeup_task_threshold;
+	ld_interface->wakeup_task_threshold += (ld_interface->nsubmitted_tasks - previous_threshold) * ld_interface->wakeup_ratio;
+
+	return 0;
+}
+
+int load_data_wakeup_cond(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ((ld_interface->wakeup_task_threshold > 0) && (ld_interface->nfinished_tasks == ld_interface->wakeup_task_threshold));
+}
+
+static void load_data_register_data_handle(starpu_data_handle_t handle, unsigned home_node, void *data_interface)
+{
+	struct load_data_interface *ld_interface = (struct load_data_interface *) data_interface;
+
+	unsigned node;
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		struct load_data_interface *local_interface = (struct load_data_interface *)
+			starpu_data_get_interface_on_node(handle, node);
+
+		local_interface->start = ld_interface->start;
+		local_interface->elapsed_time = ld_interface->elapsed_time;
+		local_interface->phase = ld_interface->phase;
+		local_interface->nsubmitted_tasks = ld_interface->nsubmitted_tasks;
+		local_interface->nfinished_tasks = ld_interface->nfinished_tasks;
+		local_interface->wakeup_task_threshold = ld_interface->wakeup_task_threshold;
+		local_interface->wakeup_ratio = ld_interface->wakeup_ratio;
+		local_interface->sleep_task_threshold = ld_interface->sleep_task_threshold;
+	}
+}
+
+static starpu_ssize_t load_data_allocate_data_on_node(void *data_interface, unsigned node)
+{
+	(void) data_interface;
+	(void) node;
+
+	return 0;
+}
+
+static void load_data_free_data_on_node(void *data_interface, unsigned node)
+{
+	(void) data_interface;
+	(void) node;
+}
+
+static size_t load_data_get_size(starpu_data_handle_t handle)
+{
+	(void) handle;
+	return (sizeof(struct load_data_interface));
+}
+
+static uint32_t load_data_footprint(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+	return starpu_hash_crc32c_be(ld_interface->start,
+				     starpu_hash_crc32c_be(ld_interface->elapsed_time,
+							   starpu_hash_crc32c_be(ld_interface->nsubmitted_tasks,
+										 starpu_hash_crc32c_be(ld_interface->sleep_task_threshold, ld_interface->wakeup_task_threshold))));
+}
+
+static int load_data_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, starpu_ssize_t *count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct load_data_interface *ld_interface = (struct load_data_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	*count = load_data_get_size(handle);
+	if (ptr != NULL)
+	{
+		char *data;
+		starpu_malloc_flags((void**) &data, *count, 0);
+		*ptr = data;
+		memcpy(data, ld_interface, *count);
+	}
+
+	return 0;
+}
+
+static int load_data_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
+{
+	char *data = ptr;
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct load_data_interface *ld_interface = (struct load_data_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	STARPU_ASSERT(count == sizeof(struct load_data_interface));
+	memcpy(ld_interface, data, count);
+
+	return 0;
+}
+
+static int copy_any_to_any(void *src_interface, unsigned src_node,
+			   void *dst_interface, unsigned dst_node,
+			   void *async_data)
+{
+	(void) src_interface;
+	(void) dst_interface;
+	(void) src_node;
+	(void) dst_node;
+	(void) async_data;
+
+	return 0;
+}
+
+static const struct starpu_data_copy_methods load_data_copy_methods =
+{
+	.any_to_any = copy_any_to_any
+};
+
+static struct starpu_data_interface_ops interface_load_data_ops =
+{
+	.register_data_handle = load_data_register_data_handle,
+	.allocate_data_on_node = load_data_allocate_data_on_node,
+	.free_data_on_node = load_data_free_data_on_node,
+	.copy_methods = &load_data_copy_methods,
+	.get_size = load_data_get_size,
+	.footprint = load_data_footprint,
+	.interfaceid = STARPU_UNKNOWN_INTERFACE_ID,
+	.interface_size = sizeof(struct load_data_interface),
+	.handle_to_pointer = NULL,
+	.pack_data = load_data_pack_data,
+	.unpack_data = load_data_unpack_data,
+	.describe = NULL
+};
+
+void load_data_data_register(starpu_data_handle_t *handleptr, unsigned home_node, int sleep_task_threshold, double wakeup_ratio)
+{
+	struct load_data_interface load_data =
+	{
+		.start = starpu_timing_now(),
+		.elapsed_time = 0,
+		.phase = 0,
+		.nsubmitted_tasks = 0,
+		.nfinished_tasks = 0,
+		.sleep_task_threshold = sleep_task_threshold,
+		.wakeup_task_threshold = 0,
+		.wakeup_ratio = wakeup_ratio
+	};
+
+	if (interface_load_data_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
+	{
+		interface_load_data_ops.interfaceid = starpu_data_interface_get_next_id();
+	}
+
+	starpu_data_register(handleptr, home_node, &load_data, &interface_load_data_ops);
+}

+ 70 - 0
mpi/src/load_balancer/policy/load_data_interface.h

@@ -0,0 +1,70 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#ifndef __LOAD_DATA_INTERFACE_H
+#define __LOAD_DATA_INTERFACE_H
+
+/* interface for load_data */
+struct load_data_interface
+{
+	/* Starting time of the execution */
+	double start;
+	/* Elapsed time between the start time and the time when the event "launch
+	 * a load balancing phase" is triggered */
+	double elapsed_time;
+	/* Current submission phase, i.e how many balanced steps have already
+	 * happened so far. */
+	int phase;
+	/* Number of currently submitted tasks */
+	int nsubmitted_tasks;
+	/* Number of currently finished tasks */
+	int nfinished_tasks;
+	/* Task threshold to sleep the submission thread */
+	int sleep_task_threshold;
+	/* Task threshold to wake-up the submission thread */
+	int wakeup_task_threshold;
+	/* Ratio of submitted tasks to wait for completion before waking up the
+	 * submission thread */
+	double wakeup_ratio;
+};
+
+void load_data_data_register(starpu_data_handle_t *handle, unsigned home_node, int sleep_task_threshold, double wakeup_ratio);
+
+int load_data_get_sleep_threshold(starpu_data_handle_t handle);
+int load_data_get_wakeup_threshold(starpu_data_handle_t handle);
+int load_data_get_current_phase(starpu_data_handle_t handle);
+int load_data_get_nsubmitted_tasks(starpu_data_handle_t handle);
+int load_data_get_nfinished_tasks(starpu_data_handle_t handle);
+
+int load_data_inc_nsubmitted_tasks(starpu_data_handle_t handle);
+int load_data_inc_nfinished_tasks(starpu_data_handle_t handle);
+
+int load_data_next_phase(starpu_data_handle_t handle);
+
+int load_data_update_elapsed_time(starpu_data_handle_t handle);
+double load_data_get_elapsed_time(starpu_data_handle_t handle);
+
+int load_data_update_wakeup_cond(starpu_data_handle_t handle);
+int load_data_wakeup_cond(starpu_data_handle_t handle);
+
+#define LOAD_DATA_GET_NSUBMITTED_TASKS(interface)	(((struct load_data_interface *)(interface))->nsubmitted_tasks)
+#define LOAD_DATA_GET_SLEEP_THRESHOLD(interface)	(((struct load_data_interface *)(interface))->sleep_task_threshold)
+#define LOAD_DATA_GET_WAKEUP_THRESHOLD(interface)	(((struct load_data_interface *)(interface))->wakeup_task_threshold)
+
+#endif /* __LOAD_DATA_INTERFACE_H */

+ 640 - 0
mpi/src/load_balancer/policy/load_heat_propagation.c

@@ -0,0 +1,640 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <starpu_mpi_tag.h>
+#include <common/uthash.h>
+#include <common/utils.h>
+#include <math.h>
+
+#include "load_balancer_policy.h"
+#include "data_movements_interface.h"
+#include "load_data_interface.h"
+
+static int TAG_LOAD(int n)
+{
+	return ((n+1) << 24);
+}
+
+static int TAG_MOV(int n)
+{
+	return ((n+1) << 20);
+}
+
+/* Hash table of local pieces of data that has been moved out of the local MPI
+ * node by the load balancer. All of these pieces of data must be migrated back
+ * to the local node at the end of the execution. */
+struct moved_data_entry
+{
+	UT_hash_handle hh;
+	starpu_data_handle_t handle;
+};
+
+static struct moved_data_entry *mdh = NULL;
+
+static starpu_pthread_mutex_t load_data_mutex;
+static starpu_pthread_cond_t load_data_cond;
+
+/* MPI infos */
+static int my_rank;
+static int world_size;
+
+/* Number of neighbours of the local MPI node and their IDs. These are given by
+ * the get_neighbors() method, and thus can be easily changed. */
+static int *neighbor_ids = NULL;
+static int nneighbors = 0;
+
+/* Local load data */
+static starpu_data_handle_t *load_data_handle = NULL;
+static starpu_data_handle_t *load_data_handle_cpy = NULL;
+/* Load data of neighbours */
+static starpu_data_handle_t *neighbor_load_data_handles = NULL;
+
+/* Table which contains a data_movements_handle for each MPI node of
+ * MPI_COMM_WORLD. Since all the MPI nodes must be advised of any data
+ * movement, this table will be used to perform communications of data
+ * movements handles following an all-to-all model. */
+static starpu_data_handle_t *data_movements_handles = NULL;
+
+/* Load balancer interface which contains the application-specific methods for
+ * the load balancer to use. */
+static struct starpu_mpi_lb_conf *user_itf = NULL;
+
+static double time_threshold = 20000;
+
+/******************************************************************************
+ *                              Balancing                                     *
+ *****************************************************************************/
+
+
+/* Decides which data has to move where, and fills the
+ * data_movements_handles[my_rank] data handle from that.
+ * In data :
+ *  - local load_data_handle
+ *  - nneighbors
+ *  - neighbor_ids[nneighbors]
+ *  - neighbor_load_data_handles[nneighbors]
+ * Out data :
+ *  - data_movements_handles[my_rank]
+ */
+
+static void balance(starpu_data_handle_t load_data_cpy)
+{
+	int less_loaded = -1;
+	int n;
+	double elapsed_time, ref_elapsed_time;
+	double my_elapsed_time = load_data_get_elapsed_time(load_data_cpy);
+
+	/* Search for the less loaded neighbor */
+	ref_elapsed_time = my_elapsed_time;
+	for (n = 0; n < nneighbors; n++)
+	{
+		elapsed_time = load_data_get_elapsed_time(neighbor_load_data_handles[n]);
+		if (ref_elapsed_time > elapsed_time)
+		{
+			//fprintf(stderr,"Node%d: ref local time %lf vs neighbour%d time %lf\n", my_rank, ref_elapsed_time, neighbor_ids[n], elapsed_time);
+			less_loaded = neighbor_ids[n];
+			ref_elapsed_time = elapsed_time;
+		}
+	}
+
+	/* We found it */
+	if (less_loaded >= 0)
+	{
+		_STARPU_DEBUG("Less loaded found on node %d : %d\n", my_rank, less_loaded);
+		double diff_time = my_elapsed_time - ref_elapsed_time;
+		/* If the difference is higher than a time threshold, we move
+		 * one data to the less loaded neighbour. */
+		/* TODO: How to decide the time threshold ? */
+		if ((time_threshold > 0) && (diff_time >= time_threshold))
+		{
+			starpu_data_handle_t *handles = NULL;
+			int nhandles = 0;
+			user_itf->get_data_unit_to_migrate(&handles, &nhandles, less_loaded);
+
+			data_movements_reallocate_tables(data_movements_handles[my_rank], nhandles);
+
+			if (nhandles)
+			{
+				int *tags = data_movements_get_tags_table(data_movements_handles[my_rank]);
+				int *ranks = data_movements_get_ranks_table(data_movements_handles[my_rank]);
+
+				for (n = 0; n < nhandles; n++)
+				{
+					tags[n] = starpu_mpi_data_get_tag(handles[n]);
+					ranks[n] = less_loaded;
+				}
+
+				free(handles);
+			}
+		}
+		else
+			data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+	}
+	else
+		data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+}
+
+static void exchange_load_data_infos(starpu_data_handle_t load_data_cpy)
+{
+	int i;
+
+	/* Allocate all requests and status for point-to-point communications */
+	starpu_mpi_req load_send_req[nneighbors];
+	starpu_mpi_req load_recv_req[nneighbors];
+
+	MPI_Status load_send_status[nneighbors];
+	MPI_Status load_recv_status[nneighbors];
+
+	int flag;
+
+	/* Send the local load data to neighbour nodes, and receive the remote load
+	 * data from neighbour nodes */
+	for (i = 0; i < nneighbors; i++)
+	{
+		//_STARPU_DEBUG("[node %d] sending and receiving with %i-th neighbor %i\n", my_rank, i, neighbor_ids[i]);
+		starpu_mpi_isend(load_data_cpy, &load_send_req[i], neighbor_ids[i], TAG_LOAD(my_rank), MPI_COMM_WORLD);
+		starpu_mpi_irecv(neighbor_load_data_handles[i], &load_recv_req[i], neighbor_ids[i], TAG_LOAD(neighbor_ids[i]), MPI_COMM_WORLD);
+	}
+
+	/* Wait for completion of all send requests */
+	for (i = 0; i < nneighbors; i++)
+	{
+		flag = 0;
+		while (!flag)
+			starpu_mpi_test(&load_send_req[i], &flag, &load_send_status[i]);
+	}
+
+	/* Wait for completion of all receive requests */
+	for (i = 0; i < nneighbors; i++)
+	{
+		flag = 0;
+		while (!flag)
+			starpu_mpi_test(&load_recv_req[i], &flag, &load_recv_status[i]);
+	}
+}
+
+static void exchange_data_movements_infos()
+{
+	int i;
+
+	/* Allocate all requests and status for point-to-point communications */
+	starpu_mpi_req data_movements_send_req[world_size];
+	starpu_mpi_req data_movements_recv_req[world_size];
+
+	MPI_Status data_movements_send_status[world_size];
+	MPI_Status data_movements_recv_status[world_size];
+
+	int flag;
+
+	/* Send the new ranks of local data to all other nodes, and receive the new
+	 * ranks of all remote data from all other nodes */
+	for (i = 0; i < world_size; i++)
+	{
+		if (i != my_rank)
+		{
+			//_STARPU_DEBUG("[node %d] Send and receive data movement with %d\n", my_rank, i);
+			starpu_mpi_isend(data_movements_handles[my_rank], &data_movements_send_req[i], i, TAG_MOV(my_rank), MPI_COMM_WORLD);
+			starpu_mpi_irecv(data_movements_handles[i], &data_movements_recv_req[i], i, TAG_MOV(i), MPI_COMM_WORLD);
+		}
+	}
+
+	/* Wait for completion of all send requests */
+	for (i = 0; i < world_size; i++)
+	{
+		if (i != my_rank)
+		{
+			//fprintf(stderr,"Wait for sending data movement of %d to %d\n", my_rank, i);
+			flag = 0;
+			while (!flag)
+				starpu_mpi_test(&data_movements_send_req[i], &flag, &data_movements_send_status[i]);
+		}
+	}
+
+	/* Wait for completion of all receive requests */
+	for (i = 0; i < world_size; i++)
+	{
+		if (i != my_rank)
+		{
+			//fprintf(stderr,"Wait for receiving data movement from %d on %d\n", i, my_rank);
+			flag = 0;
+			while (!flag)
+				starpu_mpi_test(&data_movements_recv_req[i], &flag, &data_movements_recv_status[i]);
+		}
+	}
+}
+
+static void update_data_ranks()
+{
+	int i,j;
+
+	/* Update the new ranks for all concerned data */
+	for (i = 0; i < world_size; i++)
+	{
+		int ndata_to_update = data_movements_get_size_tables(data_movements_handles[i]);
+		if (ndata_to_update)
+		{
+			//fprintf(stderr,"Update %d data from table %d on node %d\n", ndata_to_update, i, my_rank);
+
+			for (j = 0; j < ndata_to_update; j++)
+			{
+				starpu_data_handle_t handle = _starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+				STARPU_ASSERT(handle);
+				int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];
+
+				/* Save the fact that the data has been moved out of this node */
+				if (i == my_rank)
+				{
+					struct moved_data_entry *md = (struct moved_data_entry *)malloc(sizeof(struct moved_data_entry));
+					md->handle = handle;
+					HASH_ADD_PTR(mdh, handle, md);
+				}
+				else if (dst_rank == my_rank)
+				{
+					/* The data has been moved out, and now is moved back, so
+					 * update the state of the moved_data hash table to reflect
+					 * this change */
+					struct moved_data_entry *md = NULL;
+					HASH_FIND_PTR(mdh, &handle, md);
+					if (md)
+					{
+						HASH_DEL(mdh, md);
+						free(md);
+					}
+				}
+
+				//if (i == my_rank)
+				//{
+				//    if (dst_rank != my_rank)
+				//        fprintf(stderr,"Move data %p (tag %d) from node %d to node %d\n", handle, (data_movements_get_tags_table(data_movements_handles[i]))[j], my_rank, dst_rank);
+				//    else
+				//        fprintf(stderr,"Bring back data %p (tag %d) from node %d on node %d\n", handle, (data_movements_get_tags_table(data_movements_handles[i]))[j], starpu_mpi_data_get_rank(handle), my_rank);
+				//}
+
+				_STARPU_DEBUG("Call of starpu_mpi_get_data_on_node(%d,%d) on node %d\n", starpu_mpi_data_get_tag(handle), dst_rank, my_rank);
+
+				/* Migrate the data handle */
+				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, dst_rank, NULL, NULL);
+
+				_STARPU_DEBUG("New rank (%d) of data %d upgraded on node %d\n", dst_rank, starpu_mpi_data_get_tag(handle), my_rank);
+				starpu_mpi_data_set_rank_comm(handle, dst_rank, MPI_COMM_WORLD);
+			}
+		}
+	}
+}
+
+static void clean_balance()
+{
+	int i;
+	starpu_mpi_cache_flush(MPI_COMM_WORLD, *load_data_handle_cpy);
+	for (i = 0; i < nneighbors; i++)
+		starpu_mpi_cache_flush(MPI_COMM_WORLD, neighbor_load_data_handles[i]);
+	for (i = 0; i < world_size; i++)
+		starpu_mpi_cache_flush(MPI_COMM_WORLD, data_movements_handles[i]);
+}
+
+/* Core function of the load balancer. Computes from the load_data_cpy handle a
+ * load balancing of the work to come (if needed), performs the necessary data
+ * communications and negotiates the rebalancing with the other nodes. */
+static void heat_balance(starpu_data_handle_t load_data_cpy)
+{
+	/* Exchange load data handles with neighboring nodes */
+	exchange_load_data_infos(load_data_cpy);
+
+	/* Determine if this node should send data to other nodes:
+	 * which ones, how much data */
+	balance(load_data_cpy);
+
+	/* Exchange data movements with neighboring nodes */
+	exchange_data_movements_infos();
+
+	/* Perform data movements */
+	update_data_ranks();
+
+	/* Clean the data handles to properly launch the next balance phase */
+	clean_balance();
+}
+
+/******************************************************************************
+ *                      Heat Load Balancer Entry Points                       *
+ *****************************************************************************/
+
+static void submitted_task_heat(struct starpu_task *task)
+{
+	load_data_inc_nsubmitted_tasks(*load_data_handle);
+	//if (load_data_get_nsubmitted_tasks(*load_data_handle) > task->tag_id)
+	//{
+	//    fprintf(stderr,"Error : nsubmitted_tasks (%d) > tag_id (%lld) ! \n", load_data_get_nsubmitted_tasks(*load_data_handle), (long long int)task->tag_id);
+	//    STARPU_ASSERT(0);
+	//}
+
+	int phase = load_data_get_current_phase(*load_data_handle);
+	/* Numbering of tasks in StarPU-MPI should be given by the application with
+	 * the STARPU_TAG_ONLY insert task option for now. */
+	/* TODO: Properly implement a solution for numbering tasks in StarPU-MPI */
+	if ((task->tag_id / load_data_get_sleep_threshold(*load_data_handle)) > phase)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&load_data_mutex);
+		load_data_update_wakeup_cond(*load_data_handle);
+		//fprintf(stderr,"Node %d sleep on tag %lld\n", my_rank, (long long int)task->tag_id);
+		//if (load_data_get_nsubmitted_tasks(*load_data_handle) < load_data_get_wakeup_threshold(*load_data_handle))
+		//{
+		//    fprintf(stderr,"Error : nsubmitted_tasks (%d) lower than wakeup_threshold (%d) !\n", load_data_get_nsubmitted_tasks(*load_data_handle), load_data_get_wakeup_threshold(*load_data_handle));
+		//    STARPU_ASSERT(0);
+		//}
+
+		if (load_data_get_wakeup_threshold(*load_data_handle) > load_data_get_nfinished_tasks(*load_data_handle))
+			STARPU_PTHREAD_COND_WAIT(&load_data_cond, &load_data_mutex);
+
+		load_data_next_phase(*load_data_handle);
+
+		/* Register a copy of the load data at this moment, to allow to compute
+		 * the heat balance while not locking the load data during the whole
+		 * balance step, which could cause all the workers to wait on the lock
+		 * to update the data. */
+		struct starpu_data_interface_ops *itf_load_data = starpu_data_get_interface_ops(*load_data_handle);
+		void* itf_src = starpu_data_get_interface_on_node(*load_data_handle, STARPU_MAIN_RAM);
+		void* itf_dst = starpu_data_get_interface_on_node(*load_data_handle_cpy, STARPU_MAIN_RAM);
+		memcpy(itf_dst, itf_src, itf_load_data->interface_size);
+
+		_STARPU_DEBUG("[node %d] Balance phase %d\n", my_rank, load_data_get_current_phase(*load_data_handle));
+		STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+
+		heat_balance(*load_data_handle_cpy);
+	}
+}
+
+static void finished_task_heat()
+{
+	//fprintf(stderr,"Try to decrement nsubmitted_tasks...");
+	STARPU_PTHREAD_MUTEX_LOCK(&load_data_mutex);
+
+	load_data_inc_nfinished_tasks(*load_data_handle);
+	//fprintf(stderr,"Decrement nsubmitted_tasks, now %d\n", load_data_get_nsubmitted_tasks(*load_data_handle));
+	if (load_data_wakeup_cond(*load_data_handle))
+	{
+		//fprintf(stderr,"Wakeup ! nfinished_tasks = %d, wakeup_threshold = %d\n", load_data_get_nfinished_tasks(*load_data_handle), load_data_get_wakeup_threshold(*load_data_handle));
+		load_data_update_elapsed_time(*load_data_handle);
+		STARPU_PTHREAD_COND_SIGNAL(&load_data_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+	}
+	else
+		STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+}
+
+/******************************************************************************
+ *                  Initialization / Deinitialization                         *
+ *****************************************************************************/
+
+static int init_heat(struct starpu_mpi_lb_conf *itf)
+{
+	int i;
+	int sleep_task_threshold;
+	double wakeup_ratio;
+
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
+
+	/* Immediately return if the starpu_mpi_lb_conf is invalid. */
+	if (!(itf && itf->get_neighbors && itf->get_data_unit_to_migrate))
+	{
+		_STARPU_MSG("Error: struct starpu_mpi_lb_conf %p invalid\n", itf);
+		return 1;
+	}
+
+	user_itf = malloc(sizeof(struct starpu_mpi_lb_conf));
+	memcpy(user_itf, itf, sizeof(struct starpu_mpi_lb_conf));
+
+	/* Get the neighbors of the local MPI node */
+	user_itf->get_neighbors(&neighbor_ids, &nneighbors);
+	if (nneighbors == 0)
+	{
+		_STARPU_MSG("Error: Function get_neighbors returning 0 neighbor\n");
+		free(user_itf);
+		user_itf = NULL;
+		return 2;
+	}
+
+	/* The sleep threshold is deduced from the numbering of tasks by the
+	 * application. For example, with this threshold, the submission thread
+	 * will stop when a task for which the numbering is 2000 or above will be
+	 * submitted to StarPU-MPI. However, many fewer tasks may actually be
+	 * submitted to the local MPI node: the sleeping of the submission threads
+	 * checks the numbering of the tasks, not how many tasks have been
+	 * submitted to the local MPI node, which are two different things. */
+	char *sleep_env = starpu_getenv("LB_HEAT_SLEEP_THRESHOLD");
+	if (sleep_env)
+		sleep_task_threshold = atoi(sleep_env);
+	else
+		sleep_task_threshold = 2000;
+
+	char *wakeup_env = starpu_getenv("LB_HEAT_WAKEUP_RATIO");
+	if (wakeup_env)
+		wakeup_ratio = atof(wakeup_env);
+	else
+		wakeup_ratio = 0.5;
+
+	char *time_env = starpu_getenv("LB_HEAT_TIME_THRESHOLD");
+	if (time_env)
+		time_threshold = atoi(time_env);
+	else
+		time_threshold = 2000;
+
+	STARPU_PTHREAD_MUTEX_INIT(&load_data_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&load_data_cond, NULL);
+
+	/* Allocate, initialize and register all the data handles that will be
+	 * needed for the load balancer, to not reallocate them at each balance
+	 * step. */
+
+	/* Local load data */
+	load_data_handle = malloc(sizeof(starpu_data_handle_t));
+	memset(load_data_handle, 0, sizeof(starpu_data_handle_t));
+	load_data_data_register(load_data_handle, STARPU_MAIN_RAM, sleep_task_threshold, wakeup_ratio);
+
+	/* Copy of the local load data to enable parallel update of the load data
+	 * with communications to neighbor nodes */
+	load_data_handle_cpy = malloc(sizeof(starpu_data_handle_t));
+	memset(load_data_handle_cpy, 0, sizeof(starpu_data_handle_t));
+	void *local_interface = starpu_data_get_interface_on_node(*load_data_handle, STARPU_MAIN_RAM);
+	struct starpu_data_interface_ops *itf_load_data = starpu_data_get_interface_ops(*load_data_handle);
+	starpu_data_register(load_data_handle_cpy, STARPU_MAIN_RAM, local_interface, itf_load_data);
+	starpu_mpi_data_register(*load_data_handle_cpy, TAG_LOAD(my_rank), my_rank);
+
+	/* Remote load data */
+	neighbor_load_data_handles = malloc(nneighbors*sizeof(starpu_data_handle_t));
+	memset(neighbor_load_data_handles, 0, nneighbors*sizeof(starpu_data_handle_t));
+	for (i = 0; i < nneighbors; i++)
+	{
+		load_data_data_register(&neighbor_load_data_handles[i], STARPU_MAIN_RAM, sleep_task_threshold, wakeup_ratio);
+		starpu_mpi_data_register(neighbor_load_data_handles[i], TAG_LOAD(neighbor_ids[i]), neighbor_ids[i]);
+	}
+
+	/* Data movements handles */
+	data_movements_handles = malloc(world_size*sizeof(starpu_data_handle_t));
+	for (i = 0; i < world_size; i++)
+	{
+		data_movements_data_register(&data_movements_handles[i], STARPU_MAIN_RAM, NULL, NULL, 0);
+		starpu_mpi_data_register(data_movements_handles[i], TAG_MOV(i), i);
+	}
+
+	/* Hash table of moved data that will be brought back on the node at
+	 * termination time */
+	mdh = NULL;
+
+	return 0;
+}
+
+/* Move back all the data that has been migrated out of this node at
+ * deinitialization time of the load balancer, to ensure the consistency with
+ * the ranks of data originally registered by the application. */
+static void move_back_data()
+{
+	int i,j;
+
+	/* Update the new ranks for all concerned data */
+	for (i = 0; i < world_size; i++)
+	{
+		/* In this case, each data_movements_handles contains the handles to move back on the specific node */
+		int ndata_to_update = data_movements_get_size_tables(data_movements_handles[i]);
+		if (ndata_to_update)
+		{
+			_STARPU_DEBUG("Move back %d data from table %d on node %d\n", ndata_to_update, i, my_rank);
+
+			for (j = 0; j < ndata_to_update; j++)
+			{
+				starpu_data_handle_t handle = _starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+				STARPU_ASSERT(handle);
+
+				int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];
+				STARPU_ASSERT(i == dst_rank);
+
+				if (i == my_rank)
+				{
+					/* The data is moved back, so update the state of the
+					 * moved_data hash table to reflect this change */
+					struct moved_data_entry *md = NULL;
+					HASH_FIND_PTR(mdh, &handle, md);
+					if (md)
+					{
+						HASH_DEL(mdh, md);
+						free(md);
+					}
+				}
+
+				//fprintf(stderr,"Call of starpu_mpi_get_data_on_node(%d,%d) on node %d\n", starpu_mpi_data_get_tag(handle), dst_rank, my_rank);
+
+				/* Migrate the data handle */
+				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, dst_rank, NULL, NULL);
+
+				//fprintf(stderr,"New rank (%d) of data %d upgraded on node %d\n", dst_rank, starpu_mpi_data_get_tag(handle), my_rank);
+				starpu_mpi_data_set_rank_comm(handle, dst_rank, MPI_COMM_WORLD);
+			}
+		}
+	}
+}
+
+static int deinit_heat()
+{
+	int i;
+
+	if ((!user_itf) || (nneighbors == 0))
+		return 1;
+
+	_STARPU_DEBUG("Shutting down heat lb policy\n");
+
+	unsigned int ndata_to_move_back = HASH_COUNT(mdh);
+
+	if (ndata_to_move_back)
+	{
+		_STARPU_DEBUG("Move back %u data on node %d ..\n", ndata_to_move_back, my_rank);
+		data_movements_reallocate_tables(data_movements_handles[my_rank], ndata_to_move_back);
+
+		int *tags = data_movements_get_tags_table(data_movements_handles[my_rank]);
+		int *ranks = data_movements_get_ranks_table(data_movements_handles[my_rank]);
+
+		int n = 0;
+		struct moved_data_entry *md, *tmp;
+		HASH_ITER(hh, mdh, md, tmp)
+		{
+			tags[n] = starpu_mpi_data_get_tag(md->handle);
+			ranks[n] = my_rank;
+			n++;
+		}
+	}
+	else
+		data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+
+	exchange_data_movements_infos();
+	move_back_data();
+
+	/* This assert ensures that all nodes have properly gotten back all the
+	 * data that has been moved out of the node. */
+	STARPU_ASSERT(HASH_COUNT(mdh) == 0);
+	free(mdh);
+	mdh = NULL;
+
+	starpu_data_unregister(*load_data_handle);
+	free(load_data_handle);
+	load_data_handle = NULL;
+
+	starpu_mpi_cache_flush(MPI_COMM_WORLD, *load_data_handle_cpy);
+	starpu_data_unregister(*load_data_handle_cpy);
+	free(load_data_handle_cpy);
+	load_data_handle_cpy = NULL;
+
+	for (i = 0; i < nneighbors; i++)
+	{
+		starpu_mpi_cache_flush(MPI_COMM_WORLD, neighbor_load_data_handles[i]);
+		starpu_data_unregister(neighbor_load_data_handles[i]);
+	}
+	free(neighbor_load_data_handles);
+	neighbor_load_data_handles = NULL;
+
+	nneighbors = 0;
+	free(neighbor_ids);
+	neighbor_ids = NULL;
+
+	for (i = 0; i < world_size; i++)
+	{
+		starpu_mpi_cache_flush(MPI_COMM_WORLD, data_movements_handles[i]);
+		data_movements_reallocate_tables(data_movements_handles[i], 0);
+		starpu_data_unregister(data_movements_handles[i]);
+	}
+	free(data_movements_handles);
+	data_movements_handles = NULL;
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&load_data_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&load_data_cond);
+	free(user_itf);
+	user_itf = NULL;
+
+	return 0;
+}
+
+/******************************************************************************
+ *                                  Policy                                    *
+ *****************************************************************************/
+
+struct load_balancer_policy load_heat_propagation_policy =
+{
+	.init = init_heat,
+	.deinit = deinit_heat,
+	.submitted_task_entry_point = submitted_task_heat,
+	.finished_task_entry_point = finished_task_heat,
+	.policy_name = "heat"
+};

+ 24 - 1
mpi/src/starpu_mpi_task_insert.c

@@ -36,6 +36,22 @@
 	else								\
 		starpu_mpi_isend_detached(data, dest, data_tag, comm, callback, arg);
 
+/* Hook invoked on each task built through starpu_mpi_task_insert (called
+ * after _starpu_mpi_task_postbuild_v — see below).  NULL when no hook is
+ * registered. */
+static void (*pre_submit_hook)(struct starpu_task *task) = NULL;
+
+/* Register F as the pre-submit hook.  Only one hook can be active at a
+ * time: registering while one is already set overwrites it, with a
+ * warning.  Always returns 0. */
+int starpu_mpi_pre_submit_hook_register(void (*f)(struct starpu_task *))
+{
+	if (pre_submit_hook)
+		_STARPU_MSG("Warning: a pre_submit_hook has already been registered. Please check if you really want to erase the previously registered hook.\n");
+	pre_submit_hook = f;
+	return 0;
+}
+
+/* Remove any registered pre-submit hook.  Always returns 0.
+ * Note: "(void)" (not "()") is the correct C prototype for a function
+ * taking no parameters; "()" declares unspecified parameters. */
+int starpu_mpi_pre_submit_hook_unregister(void)
+{
+	pre_submit_hook = NULL;
+	return 0;
+}
+
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank)
 {
 	if (mode & STARPU_W)
@@ -472,6 +488,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 		va_copy(varg_list_copy, varg_list);
 		_starpu_task_insert_create(codelet, task, varg_list_copy);
 		va_end(varg_list_copy);
+
 		return 0;
 	}
 }
@@ -526,7 +543,13 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 			starpu_task_destroy(task);
 		}
 	}
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+
+	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+
+	if (ret == 0 && pre_submit_hook)
+		pre_submit_hook(task);
+
+	return val;
 }
 
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)

+ 7 - 3
mpi/tests/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2009-2012, 2015-2016  Université de Bordeaux
-# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -138,7 +138,8 @@ starpu_mpi_TESTS =				\
 	policy_selection			\
 	policy_selection2			\
 	early_request				\
-	starpu_redefine
+	starpu_redefine				\
+	load_balancer
 
 noinst_PROGRAMS =				\
 	datatypes				\
@@ -191,7 +192,8 @@ noinst_PROGRAMS =				\
 	policy_selection			\
 	policy_selection2			\
 	early_request				\
-	starpu_redefine
+	starpu_redefine				\
+	load_balancer
 
 
 XFAIL_TESTS=					\
@@ -301,6 +303,8 @@ early_request_LDADD =					\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 starpu_redefine_LDADD =					\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+load_balancer_LDADD =					\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 
 ring_SOURCES = ring.c
 ring_sync_SOURCES = ring_sync.c

+ 73 - 0
mpi/tests/load_balancer.c

@@ -0,0 +1,73 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <starpu_mpi_lb.h>
+#include "helper.h"
+
+#if !defined(STARPU_HAVE_UNSETENV)
+
+#warning unsetenv is not defined. Skipping test
+/* The test relies on unsetenv() to clear STARPU_MPI_LB before exercising
+ * the default (no-policy) initialization path; without unsetenv the test
+ * cannot run deterministically, so it is skipped. */
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+
+/* Neighborhood callback for the load balancer: each node declares exactly
+ * one neighbor, the next rank (wrapping to rank 0 after the last one), so
+ * the nodes form a ring.  The returned array is allocated here and owned
+ * by the caller. */
+void get_neighbors(int **neighbor_ids, int *nneighbors)
+{
+	int rank, size;
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	*nneighbors = 1;
+	*neighbor_ids = malloc(sizeof(**neighbor_ids));
+	STARPU_ASSERT(*neighbor_ids);
+	/* (*neighbor_ids)[0], not *neighbor_ids[0]: make the intended
+	 * dereference order explicit. */
+	(*neighbor_ids)[0] = (rank == size - 1) ? 0 : rank + 1;
+}
+
+/* Data-selection callback for the load balancer: this test never offers
+ * any data to migrate, whatever the destination node, so the balancer's
+ * migration machinery is exercised with an empty set. */
+void get_data_unit_to_migrate(starpu_data_handle_t **handle_unit, int *nhandles, int dst_node)
+{
+	*nhandles = 0;
+}
+
+/* Smoke-test the load balancer interface: initialize/shutdown once with
+ * no policy selected (STARPU_MPI_LB unset), then once with the "heat"
+ * policy and our ring-neighbor callbacks. */
+int main(int argc, char **argv)
+{
+	int ret;
+	/* Designated initializer: any member of the interface structure
+	 * beyond the two callbacks we provide is zeroed instead of being
+	 * passed to the library uninitialized. */
+	struct starpu_mpi_lb_conf itf =
+	{
+		.get_neighbors = get_neighbors,
+		.get_data_unit_to_migrate = get_data_unit_to_migrate,
+	};
+
+	MPI_Init(&argc, &argv);
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	/* With STARPU_MPI_LB unset, initialization must fall back to no
+	 * policy, and shutdown must still be safe. */
+	unsetenv("STARPU_MPI_LB");
+	starpu_mpi_lb_init(NULL, NULL);
+	starpu_mpi_lb_shutdown();
+
+	/* Explicitly select the "heat" policy with our callbacks. */
+	starpu_mpi_lb_init("heat", &itf);
+	starpu_mpi_lb_shutdown();
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+	MPI_Finalize();
+
+	return 0;
+}
+
+#endif

+ 6 - 0
src/core/sched_ctx.c

@@ -1720,6 +1720,12 @@ void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
 	return sched_ctx->policy_data;
 }
 
+/* Public accessor: return the scheduling policy attached to the
+ * scheduling context identified by sched_ctx_id. */
+struct starpu_sched_policy *starpu_sched_ctx_get_sched_policy(unsigned sched_ctx_id)
+{
+	return _starpu_get_sched_ctx_struct(sched_ctx_id)->sched_policy;
+}
+
 struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, enum starpu_worker_collection_type  worker_collection_type)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);