Procházet zdrojové kódy

First implementation of the load balancer mechanism.

Marc Sergent před 9 roky
rodič
revize
dc906567c4

+ 1 - 0
include/starpu_scheduler.h

@@ -51,6 +51,7 @@ struct starpu_sched_policy
 };
 
 struct starpu_sched_policy **starpu_sched_get_predefined_policies();
+void starpu_sched_policy_set_post_exec_hook(void (*post_exec_hook)(struct starpu_task *task), const char *policy_name);
 
 void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sched_mutex, starpu_pthread_cond_t **sched_cond);
 

+ 2 - 1
mpi/Makefile.am

@@ -21,7 +21,8 @@ pkgconfig_DATA = libstarpumpi.pc starpumpi-1.0.pc starpumpi-1.1.pc starpumpi-1.2
 
 versincludedir = $(includedir)/starpu/$(STARPU_EFFECTIVE_VERSION)
 versinclude_HEADERS = 					\
-	include/starpu_mpi.h
+	include/starpu_mpi.h				\
+	include/starpu_mpi_lb.h
 
 showcheck:
 	RET=0 ; \

+ 7 - 0
mpi/include/starpu_mpi.h

@@ -74,6 +74,8 @@ void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);
 void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
 
+starpu_data_handle_t starpu_mpi_get_data_from_sent_cache(int dst_node);
+
 int starpu_mpi_comm_size(MPI_Comm comm, int *size);
 int starpu_mpi_comm_rank(MPI_Comm comm, int *rank);
 int starpu_mpi_world_rank(void);
@@ -95,6 +97,8 @@ int starpu_mpi_data_get_tag(starpu_data_handle_t handle);
 #define starpu_data_get_rank starpu_mpi_data_get_rank
 #define starpu_data_get_tag starpu_mpi_data_get_tag
 
+starpu_data_handle_t starpu_mpi_data_get_data_handle_from_tag(int tag);
+
 #define STARPU_MPI_NODE_SELECTION_CURRENT_POLICY -1
 #define STARPU_MPI_NODE_SELECTION_MOST_R_DATA    0
 
@@ -113,6 +117,9 @@ typedef void (*starpu_mpi_datatype_free_func_t)(MPI_Datatype *);
 int starpu_mpi_datatype_register(starpu_data_handle_t handle, starpu_mpi_datatype_allocate_func_t allocate_datatype_func, starpu_mpi_datatype_free_func_t free_datatype_func);
 int starpu_mpi_datatype_unregister(starpu_data_handle_t handle);
 
+int starpu_mpi_pre_submit_hook_register(void (*f)(struct starpu_task *));
+int starpu_mpi_pre_submit_hook_unregister();
+
 #ifdef __cplusplus
 }
 #endif

+ 41 - 0
mpi/include/starpu_mpi_lb.h

@@ -0,0 +1,41 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __LOAD_BALANCER_H__
+#define __LOAD_BALANCER_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct starpu_mpi_lb_conf
+{
+    void (*get_neighbors)(int **neighbor_ids, int *nneighbors);
+    void (*get_data_unit_to_migrate)(starpu_data_handle_t **handle_unit, int *nhandles, int dst_node);
+    const char *name;
+};
+
+/* Inits the load balancer's environment with the load policy provided by the
+ * user 
+ */
+void starpu_mpi_lb_init(struct starpu_mpi_lb_conf *);
+void starpu_mpi_lb_shutdown();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __LOAD_BALANCER_H__

+ 9 - 2
mpi/src/Makefile.am

@@ -44,7 +44,10 @@ noinst_HEADERS =					\
 	starpu_mpi_early_request.h			\
 	starpu_mpi_sync_data.h				\
 	starpu_mpi_comm.h				\
-	starpu_mpi_tag.h
+	starpu_mpi_tag.h				\
+	load_balancer/policy/data_movements_interface.h	\
+	load_balancer/policy/load_data_interface.h		\
+	load_balancer/policy/load_balancer_policy.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
@@ -61,7 +64,11 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_early_request.c			\
 	starpu_mpi_sync_data.c				\
 	starpu_mpi_comm.c				\
-	starpu_mpi_tag.c
+	starpu_mpi_tag.c				\
+	load_balancer/policy/data_movements_interface.c	\
+	load_balancer/policy/load_data_interface.c	\
+	load_balancer/policy/load_heat_propagation.c	\
+	load_balancer/load_balancer.c
 
 showcheck:
 	-cat /dev/null

+ 156 - 0
mpi/src/load_balancer/load_balancer.c

@@ -0,0 +1,156 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include <starpu_scheduler.h>
+
+#include <starpu_mpi_lb.h>
+#include "policy/load_balancer_policy.h"
+
+static struct load_balancer_policy *defined_policy = NULL;
+static void (*saved_post_exec_hook)(struct starpu_task *task) = NULL;
+
+static void post_exec_hook_wrapper(struct starpu_task *task)
+{
+    //fprintf(stderr,"I am called ! \n");
+    if (defined_policy && defined_policy->finished_task_entry_point)
+        defined_policy->finished_task_entry_point();
+    if (saved_post_exec_hook)
+        saved_post_exec_hook(task);
+}
+
+static struct load_balancer_policy *predefined_policies[] = 
+{
+    &load_heat_propagation_policy,
+    NULL
+};
+
+void starpu_mpi_lb_init(struct starpu_mpi_lb_conf *itf)
+{
+    const char *policy_name = getenv("STARPU_MPI_LB");
+    if (!policy_name && itf)
+        policy_name = itf->name;
+
+    if (!policy_name || (strcmp(policy_name, "help") == 0))
+    {
+        fprintf(stderr,"Warning : load balancing is disabled for this run.\n");
+        fprintf(stderr,"Use the STARPU_MPI_LB = <name> environment variable to use a load balancer.\n");
+        fprintf(stderr,"Available load balancers :\n");
+        struct load_balancer_policy **policy;
+        for(policy=predefined_policies ; *policy!=NULL ; policy++)
+        {
+            struct load_balancer_policy *p = *policy;
+            fprintf(stderr," - %s\n", p->policy_name);
+        }
+        return;
+    }
+    
+	struct load_balancer_policy **policy;
+	for(policy=predefined_policies ; *policy!=NULL ; policy++)
+	{
+		struct load_balancer_policy *p = *policy;
+		if (p->policy_name)
+		{
+			if (strcmp(policy_name, p->policy_name) == 0)
+			{
+				/* we found a policy with the requested name */
+				defined_policy = p;
+                break;
+			}
+		}
+	}
+
+    if (!defined_policy)
+    {
+        fprintf(stderr,"Error : no load balancer with the name %s. Load balancing will be disabled for this run.\n", policy_name);
+        return;
+    }
+
+    if (defined_policy->init(itf))
+    {
+        fprintf(stderr,"Error in load_balancer->init: invalid starpu_mpi_lb_conf. Load balancing will be disabled for this run.\n");
+        return;
+    }
+
+    /* starpu_register_hook(submitted_task, defined_policy->submitted_task_entry_point); */
+    if (defined_policy->submitted_task_entry_point)
+        starpu_mpi_pre_submit_hook_register(defined_policy->submitted_task_entry_point);
+
+    /* starpu_register_hook(finished_task, defined_policy->finished_task_entry_point); */
+    if (defined_policy->finished_task_entry_point)
+    {
+        STARPU_ASSERT(saved_post_exec_hook == NULL);
+        struct starpu_sched_policy **predefined_sched_policies = starpu_sched_get_predefined_policies();
+        struct starpu_sched_policy **sched_policy;
+        const char *sched_policy_name = getenv("STARPU_SCHED");
+
+        if (!sched_policy_name)
+            sched_policy_name = "eager";
+
+        for(sched_policy=predefined_sched_policies ; *sched_policy!=NULL ; sched_policy++)
+        {
+            struct starpu_sched_policy *sched_p = *sched_policy;
+            if (strcmp(sched_policy_name, sched_p->policy_name) == 0)
+            {
+                /* We found the scheduling policy with the requested name */
+                saved_post_exec_hook = sched_p->post_exec_hook;
+                break;
+            }
+        }
+        starpu_sched_policy_set_post_exec_hook(post_exec_hook_wrapper, sched_policy_name);
+	}
+}
+
+void starpu_mpi_lb_shutdown()
+{
+    if (!defined_policy)
+        return;
+
+    if (defined_policy && defined_policy->deinit())
+        return;
+
+    /* starpu_unregister_hook(submitted_task, defined_policy->submitted_task_entry_point); */
+    if (defined_policy->submitted_task_entry_point)
+        starpu_mpi_pre_submit_hook_unregister();
+
+    /* starpu_unregister_hook(finished_task, defined_policy->finished_task_entry_point); */
+    if (defined_policy->finished_task_entry_point && saved_post_exec_hook != NULL)
+    {
+        struct starpu_sched_policy **predefined_sched_policies = starpu_sched_get_predefined_policies();
+        struct starpu_sched_policy **sched_policy;
+        const char *sched_policy_name = getenv("STARPU_SCHED");
+
+        if (!sched_policy_name)
+            sched_policy_name = "eager";
+
+        for(sched_policy=predefined_sched_policies ; *sched_policy!=NULL ; sched_policy++)
+        {
+            struct starpu_sched_policy *sched_p = *sched_policy;
+            if (strcmp(sched_policy_name, sched_p->policy_name) == 0)
+            {
+                /* We found the scheduling policy with the requested name */
+                sched_p->post_exec_hook = saved_post_exec_hook;
+                saved_post_exec_hook = NULL;
+                break;
+            }
+        }
+	}
+    STARPU_ASSERT(saved_post_exec_hook == NULL);
+    defined_policy = NULL;
+}

+ 280 - 0
mpi/src/load_balancer/policy/data_movements_interface.c

@@ -0,0 +1,280 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <stdlib.h>
+
+#include "data_movements_interface.h"
+
+int **data_movements_get_ref_tags_table(starpu_data_handle_t handle)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    if (dm_interface->tags)
+        return &dm_interface->tags;
+    else
+        return NULL;
+}
+
+int **data_movements_get_ref_ranks_table(starpu_data_handle_t handle)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    if (dm_interface->ranks)
+        return &dm_interface->ranks;
+    else
+        return NULL;
+}
+
+int *data_movements_get_tags_table(starpu_data_handle_t handle)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return dm_interface->tags;
+}
+
+int *data_movements_get_ranks_table(starpu_data_handle_t handle)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return dm_interface->ranks;
+}
+
+int data_movements_get_size_tables(starpu_data_handle_t handle)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return dm_interface->size;
+}
+
+int data_movements_reallocate_tables(starpu_data_handle_t handle, int size)
+{
+	struct data_movements_interface *dm_interface =
+		(struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+
+    if (dm_interface->size)
+    {
+        STARPU_ASSERT(dm_interface->tags);
+        free(dm_interface->tags);
+        dm_interface->tags = NULL;
+
+        STARPU_ASSERT(dm_interface->ranks);
+        free(dm_interface->ranks);
+        dm_interface->ranks = NULL;
+    }
+    else
+    {
+        STARPU_ASSERT(!dm_interface->tags);
+        STARPU_ASSERT(!dm_interface->ranks);
+    }
+
+    dm_interface->size = size;
+
+    if (dm_interface->size)
+    {
+        dm_interface->tags = malloc(size*sizeof(int));
+        dm_interface->ranks = malloc(size*sizeof(int));
+    }
+
+	return 0 ;
+}
+
+static void data_movements_register_data_handle(starpu_data_handle_t handle, unsigned home_node, void *data_interface)
+{
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;
+
+	unsigned node;
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		struct data_movements_interface *local_interface = (struct data_movements_interface *)
+			starpu_data_get_interface_on_node(handle, node);
+
+		local_interface->size = dm_interface->size;
+		if (node == home_node)
+		{
+			local_interface->tags = dm_interface->tags;
+			local_interface->ranks = dm_interface->ranks;
+		}
+		else
+		{
+			local_interface->tags = NULL;
+			local_interface->ranks = NULL;
+		}
+	}
+}
+
+static starpu_ssize_t data_movements_allocate_data_on_node(void *data_interface, unsigned node)
+{
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;
+
+	int *addr_tags = NULL;
+	int *addr_ranks = NULL;
+	starpu_ssize_t requested_memory = dm_interface->size * sizeof(int);
+
+	addr_tags = (int*) starpu_malloc_on_node(node, requested_memory);
+	if (!addr_tags)
+		goto fail_tags;
+	addr_ranks = (int*) starpu_malloc_on_node(node, requested_memory);
+	if (!addr_ranks)
+		goto fail_ranks;
+
+	/* update the data properly in consequence */
+	dm_interface->tags = addr_tags;
+	dm_interface->ranks = addr_ranks;
+
+	return 2*requested_memory;
+
+fail_ranks:
+	starpu_free_on_node(node, (uintptr_t) addr_tags, requested_memory);
+fail_tags:
+	return -ENOMEM;
+}
+
+static void data_movements_free_data_on_node(void *data_interface, unsigned node)
+{
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *) data_interface;
+	starpu_ssize_t requested_memory = dm_interface->size * sizeof(int);
+
+	starpu_free_on_node(node, (uintptr_t) dm_interface->tags, requested_memory);
+	starpu_free_on_node(node, (uintptr_t) dm_interface->ranks, requested_memory);
+}
+
+static size_t data_movements_get_size(starpu_data_handle_t handle)
+{
+	size_t size;
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	size = (dm_interface->size * 2 * sizeof(int)) + sizeof(int);
+	return size;
+}
+
+static uint32_t data_movements_footprint(starpu_data_handle_t handle)
+{
+	return starpu_hash_crc32c_be(data_movements_get_size(handle), 0);
+}
+
+static int data_movements_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, starpu_ssize_t *count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	*count = data_movements_get_size(handle);
+	if (ptr != NULL)
+	{
+		char *data;
+		starpu_malloc_flags((void**) &data, *count, 0);
+        assert(data);
+		*ptr = data;
+		memcpy(data, &dm_interface->size, sizeof(int));
+        if (dm_interface->size)
+        {
+            memcpy(data+sizeof(int), dm_interface->tags, (dm_interface->size*sizeof(int)));
+            memcpy(data+sizeof(int)+(dm_interface->size*sizeof(int)), dm_interface->ranks, dm_interface->size*sizeof(int));
+        }
+	}
+
+	return 0;
+}
+
+static int data_movements_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
+{
+	char *data = ptr;
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct data_movements_interface *dm_interface = (struct data_movements_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+    int size = 0;
+	memcpy(&size, data, sizeof(int));
+	STARPU_ASSERT(count == (2 * size * sizeof(int)) + sizeof(int));
+    
+    data_movements_reallocate_tables(handle, size);
+
+    if (dm_interface->size)
+    {
+        memcpy(dm_interface->tags, data+sizeof(int), dm_interface->size*sizeof(int));
+        memcpy(dm_interface->ranks, data+sizeof(int)+(dm_interface->size*sizeof(int)), dm_interface->size*sizeof(int));
+    }
+
+    return 0;
+}
+
+static int copy_any_to_any(void *src_interface, unsigned src_node,
+			   void *dst_interface, unsigned dst_node,
+			   void *async_data)
+{
+	struct data_movements_interface *src_data_movements = src_interface;
+	struct data_movements_interface *dst_data_movements = dst_interface;
+	int ret = 0;
+
+	if (starpu_interface_copy((uintptr_t) src_data_movements->tags, 0, src_node,
+				    (uintptr_t) dst_data_movements->tags, 0, dst_node,
+				     src_data_movements->size*sizeof(int),
+				     async_data))
+		ret = -EAGAIN;
+	if (starpu_interface_copy((uintptr_t) src_data_movements->ranks, 0, src_node,
+				    (uintptr_t) dst_data_movements->ranks, 0, dst_node,
+				     src_data_movements->size*sizeof(int),
+				     async_data))
+		ret = -EAGAIN;
+	return ret;
+}
+
+static const struct starpu_data_copy_methods data_movements_copy_methods =
+{
+	.any_to_any = copy_any_to_any
+};
+
+static struct starpu_data_interface_ops interface_data_movements_ops =
+{
+	.register_data_handle = data_movements_register_data_handle,
+	.allocate_data_on_node = data_movements_allocate_data_on_node,
+	.free_data_on_node = data_movements_free_data_on_node,
+	.copy_methods = &data_movements_copy_methods,
+	.get_size = data_movements_get_size,
+	.footprint = data_movements_footprint,
+	.interfaceid = STARPU_UNKNOWN_INTERFACE_ID,
+	.interface_size = sizeof(struct data_movements_interface),
+	.handle_to_pointer = NULL,
+	.pack_data = data_movements_pack_data,
+	.unpack_data = data_movements_unpack_data,
+	.describe = NULL
+};
+
+void data_movements_data_register(starpu_data_handle_t *handleptr, unsigned home_node, int *tags, int *ranks, int size)
+{
+	struct data_movements_interface data_movements =
+	{
+		.tags = tags,
+		.ranks = ranks,
+		.size = size
+	};
+
+	if (interface_data_movements_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
+	{
+		interface_data_movements_ops.interfaceid = starpu_data_interface_get_next_id();
+	}
+
+	starpu_data_register(handleptr, home_node, &data_movements, &interface_data_movements_ops);
+}

+ 47 - 0
mpi/src/load_balancer/policy/data_movements_interface.h

@@ -0,0 +1,47 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#ifndef __DATA_MOVEMENTS_INTERFACE_H
+#define __DATA_MOVEMENTS_INTERFACE_H
+
+/* interface for data_movements */
+struct data_movements_interface
+{
+    /* Data tags table */
+	int *tags;
+    /* Ranks table (where to move the corresponding data) */
+	int *ranks;
+    /* Size of the tables */
+	int size;
+};
+
+void data_movements_data_register(starpu_data_handle_t *handle, unsigned home_node, int *ranks, int *tags, int size);
+
+int **data_movements_get_ref_tags_table(starpu_data_handle_t handle);
+int **data_movements_get_ref_ranks_table(starpu_data_handle_t handle);
+int data_movements_reallocate_tables(starpu_data_handle_t handle, int size);
+
+int *data_movements_get_tags_table(starpu_data_handle_t handle);
+int *data_movements_get_ranks_table(starpu_data_handle_t handle);
+int data_movements_get_size_tables(starpu_data_handle_t handle);
+
+#define DATA_MOVEMENTS_GET_SIZE_TABLES(interface)	(((struct data_movements_interface *)(interface))->size)
+#define DATA_MOVEMENTS_GET_TAGS_TABLE(interface)	(((struct data_movements_interface *)(interface))->tags)
+#define DATA_MOVEMENTS_GET_RANKS_TABLE(interface)	(((struct data_movements_interface *)(interface))->ranks)
+
+#endif /* __DATA_MOVEMENTS_INTERFACE_H */

+ 51 - 0
mpi/src/load_balancer/policy/load_balancer_policy.h

@@ -0,0 +1,51 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __LOAD_BALANCER_POLICY_H__
+#define __LOAD_BALANCER_POLICY_H__
+
+#include <starpu_mpi_lb.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* A load balancer consists in a collection of operations on a data
+ * representing the load of the application (in terms of computation, memory,
+ * whatever). StarPU allows several entry points for the user. The load
+ * balancer allows the user to give its load balancing methods to be used on
+ * these entry points of the runtime system. */
+struct load_balancer_policy
+{
+    int (*init)(struct starpu_mpi_lb_conf *);
+    int (*deinit)();
+    void (*submitted_task_entry_point)();
+    void (*finished_task_entry_point)();
+
+    /* Name of the load balancing policy. The selection of the load balancer is
+     * performed through the use of the STARPU_MPI_LB=name environment
+     * variable.
+     */
+	const char *policy_name;
+};
+
+extern struct load_balancer_policy load_heat_propagation_policy;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __LOAD_BALANCER_POLICY_H__

+ 264 - 0
mpi/src/load_balancer/policy/load_data_interface.c

@@ -0,0 +1,264 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <stdlib.h>
+
+#include "load_data_interface.h"
+
+int load_data_get_sleep_threshold(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->sleep_task_threshold;
+}
+
+int load_data_get_wakeup_threshold(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->wakeup_task_threshold;
+}
+
+int load_data_get_current_phase(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    return ld_interface->phase;
+}
+
+int load_data_get_nsubmitted_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    return ld_interface->nsubmitted_tasks;
+}
+
+int load_data_get_nfinished_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    return ld_interface->nfinished_tasks;
+}
+
+int load_data_inc_nsubmitted_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    (ld_interface->nsubmitted_tasks)++;
+
+	return 0;
+}
+
+int load_data_inc_nfinished_tasks(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    (ld_interface->nfinished_tasks)++;
+
+	return 0;
+}
+
+int load_data_next_phase(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    ld_interface->phase++;
+
+	return 0;
+}
+
+int load_data_update_elapsed_time(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    ld_interface->elapsed_time = starpu_timing_now() - ld_interface->start;
+
+	return 0;
+}
+
+double load_data_get_elapsed_time(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+	return ld_interface->elapsed_time;
+}
+
+int load_data_update_wakeup_cond(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    int previous_threshold = ld_interface->wakeup_task_threshold;
+    ld_interface->wakeup_task_threshold += (ld_interface->nsubmitted_tasks - previous_threshold) * ld_interface->wakeup_ratio;
+
+	return 0;
+}
+
+int load_data_wakeup_cond(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+
+    return ((ld_interface->wakeup_task_threshold > 0) && (ld_interface->nfinished_tasks == ld_interface->wakeup_task_threshold));
+}
+
+static void load_data_register_data_handle(starpu_data_handle_t handle, unsigned home_node, void *data_interface)
+{
+	struct load_data_interface *ld_interface = (struct load_data_interface *) data_interface;
+
+	unsigned node;
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		struct load_data_interface *local_interface = (struct load_data_interface *)
+			starpu_data_get_interface_on_node(handle, node);
+
+		local_interface->start = ld_interface->start;
+		local_interface->elapsed_time = ld_interface->elapsed_time;
+		local_interface->phase = ld_interface->phase;
+		local_interface->nsubmitted_tasks = ld_interface->nsubmitted_tasks;
+		local_interface->nfinished_tasks = ld_interface->nsubmitted_tasks;
+        local_interface->wakeup_task_threshold = ld_interface->wakeup_task_threshold;
+        local_interface->wakeup_ratio = ld_interface->wakeup_ratio;
+        local_interface->sleep_task_threshold = ld_interface->sleep_task_threshold;
+	}
+}
+
+static starpu_ssize_t load_data_allocate_data_on_node(void *data_interface, unsigned node)
+{
+    (void) data_interface;
+    (void) node;
+
+    return 0;
+}
+
+static void load_data_free_data_on_node(void *data_interface, unsigned node)
+{
+    (void) data_interface;
+    (void) node;
+}
+
+static size_t load_data_get_size(starpu_data_handle_t handle)
+{
+    (void) handle;
+	return (sizeof(struct load_data_interface));
+}
+
+static uint32_t load_data_footprint(starpu_data_handle_t handle)
+{
+	struct load_data_interface *ld_interface =
+		(struct load_data_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
+	return starpu_hash_crc32c_be(ld_interface->start, starpu_hash_crc32c_be(ld_interface->elapsed_time, starpu_hash_crc32c_be(ld_interface->nsubmitted_tasks, starpu_hash_crc32c_be(ld_interface->sleep_task_threshold, ld_interface->wakeup_task_threshold))));
+}
+
+static int load_data_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, starpu_ssize_t *count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct load_data_interface *ld_interface = (struct load_data_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	*count = load_data_get_size(handle);
+	if (ptr != NULL)
+	{
+		char *data;
+		starpu_malloc_flags((void**) &data, *count, 0);
+		*ptr = data;
+		memcpy(data, ld_interface, *count);
+	}
+
+	return 0;
+}
+
+static int load_data_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
+{
+	char *data = ptr;
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct load_data_interface *ld_interface = (struct load_data_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	STARPU_ASSERT(count == sizeof(struct load_data_interface));
+	memcpy(ld_interface, data, count);
+
+    return 0;
+}
+
+static int copy_any_to_any(void *src_interface, unsigned src_node,
+			   void *dst_interface, unsigned dst_node,
+			   void *async_data)
+{
+	(void) src_interface;
+	(void) dst_interface;
+    (void) src_node;
+    (void) dst_node;
+	(void) async_data;
+
+	return 0;
+}
+
+static const struct starpu_data_copy_methods load_data_copy_methods =
+{
+	.any_to_any = copy_any_to_any
+};
+
+static struct starpu_data_interface_ops interface_load_data_ops =
+{
+	.register_data_handle = load_data_register_data_handle,
+	.allocate_data_on_node = load_data_allocate_data_on_node,
+	.free_data_on_node = load_data_free_data_on_node,
+	.copy_methods = &load_data_copy_methods,
+	.get_size = load_data_get_size,
+	.footprint = load_data_footprint,
+	.interfaceid = STARPU_UNKNOWN_INTERFACE_ID,
+	.interface_size = sizeof(struct load_data_interface),
+	.handle_to_pointer = NULL,
+	.pack_data = load_data_pack_data,
+	.unpack_data = load_data_unpack_data,
+	.describe = NULL
+};
+
+void load_data_data_register(starpu_data_handle_t *handleptr, unsigned home_node, int sleep_task_threshold, double wakeup_ratio)
+{
+	struct load_data_interface load_data =
+	{
+		.start = starpu_timing_now(),
+        .elapsed_time = 0,
+        .phase = 0,
+        .nsubmitted_tasks = 0,
+        .nfinished_tasks = 0,
+		.sleep_task_threshold = sleep_task_threshold,
+        .wakeup_task_threshold = 0,
+		.wakeup_ratio = wakeup_ratio
+	};
+
+	if (interface_load_data_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
+	{
+		interface_load_data_ops.interfaceid = starpu_data_interface_get_next_id();
+	}
+
+	starpu_data_register(handleptr, home_node, &load_data, &interface_load_data_ops);
+}

+ 69 - 0
mpi/src/load_balancer/policy/load_data_interface.h

@@ -0,0 +1,69 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#ifndef __LOAD_DATA_INTERFACE_H
+#define __LOAD_DATA_INTERFACE_H
+
+/* interface for load_data */
+struct load_data_interface
+{
+    /* Starting time of the execution */
+    double start;
+    /* Elapsed time until the start time and the time when event "launch a load
+     * balancing phase" is triggered */
+	double elapsed_time;
+    /* Current submission phase, i.e how many balanced steps have already
+     * happened so far. */
+    int phase;
+    /* Number of currently submitted tasks */
+    int nsubmitted_tasks;
+    /* Number of currently finished tasks */
+    int nfinished_tasks;
+    /* Task threshold to sleep the submission thread */
+	int sleep_task_threshold;
+    /* Task threshold to wake-up the submission thread */
+	int wakeup_task_threshold;
+    /* Ratio of submitted tasks to wait for completion before waking up the
+     * submission thread */
+	double wakeup_ratio;
+};
+
+void load_data_data_register(starpu_data_handle_t *handle, unsigned home_node, int sleep_task_threshold, double wakeup_ratio);
+
+int load_data_get_sleep_threshold(starpu_data_handle_t handle);
+int load_data_get_wakeup_threshold(starpu_data_handle_t handle);
+int load_data_get_current_phase(starpu_data_handle_t handle);
+int load_data_get_nsubmitted_tasks(starpu_data_handle_t handle);
+int load_data_get_nfinished_tasks(starpu_data_handle_t handle);
+
+int load_data_inc_nsubmitted_tasks(starpu_data_handle_t handle);
+int load_data_inc_nfinished_tasks(starpu_data_handle_t handle);
+
+int load_data_next_phase(starpu_data_handle_t handle);
+
+int load_data_update_elapsed_time(starpu_data_handle_t handle);
+double load_data_get_elapsed_time(starpu_data_handle_t handle);
+
+int load_data_update_wakeup_cond(starpu_data_handle_t handle);
+int load_data_wakeup_cond(starpu_data_handle_t handle);
+
+#define LOAD_DATA_GET_NSUBMITTED_TASKS(interface)	(((struct load_data_interface *)(interface))->nsubmitted_tasks)
+#define LOAD_DATA_GET_SLEEP_THRESHOLD(interface)	(((struct load_data_interface *)(interface))->sleep_task_threshold)
+#define LOAD_DATA_GET_WAKEUP_THRESHOLD(interface)	(((struct load_data_interface *)(interface))->wakeup_task_threshold)
+
+#endif /* __LOAD_DATA_INTERFACE_H */

+ 641 - 0
mpi/src/load_balancer/policy/load_heat_propagation.c

@@ -0,0 +1,641 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <common/uthash.h>
+#include <math.h>
+
+#include "load_balancer_policy.h"
+#include "data_movements_interface.h"
+#include "load_data_interface.h"
+
+static int TAG_LOAD(int n) 
+{
+    return ((n+1) << 24);
+}
+
+static int TAG_MOV(int n)
+{
+    return ((n+1) << 20);
+}
+
+/* Hash table of local pieces of data that has been moved out of the local MPI
+ * node by the load balancer. All of these pieces of data must be migrated back
+ * to the local node at the end of the execution. */
+struct moved_data_entry
+{
+    UT_hash_handle hh;
+    starpu_data_handle_t handle;
+};
+
+static struct moved_data_entry *mdh = NULL;
+
+static starpu_pthread_mutex_t load_data_mutex;
+static starpu_pthread_cond_t load_data_cond;
+
+/* MPI infos */
+static int my_rank;
+static int world_size;
+
+/* Number of neighbours of the local MPI node and their IDs. These are given by
+ * the get_neighbors() method, and thus can be easily changed. */
+static int *neighbor_ids = NULL;
+static int nneighbors = 0;
+
+/* Local load data */
+static starpu_data_handle_t *load_data_handle = NULL;
+static starpu_data_handle_t *load_data_handle_cpy = NULL;
+/* Load data of neighbours */
+static starpu_data_handle_t *neighbor_load_data_handles = NULL;
+
+/* Table which contains a data_movements_handle for each MPI node of
+ * MPI_COMM_WORLD. Since all the MPI nodes must be advised of any data
+ * movement, this table will be used to perform communications of data
+ * movements handles following an all-to-all model. */
+static starpu_data_handle_t *data_movements_handles = NULL;
+
+/* Load balancer interface which contains the application-specific methods for
+ * the load balancer to use. */
+static struct starpu_mpi_lb_conf *user_itf = NULL;
+
+static double time_threshold = 20000;
+
+/******************************************************************************
+ *                              Balancing                                     *
+ *****************************************************************************/
+
+
+/* Decides which data has to move where, and fills the
+ * data_movements_handles[my_rank] data handle from that. 
+ * In data :
+ *  - local load_data_handle
+ *  - nneighbors
+ *  - neighbor_ids[nneighbors]
+ *  - neighbor_load_data_handles[nneighbors] 
+ * Out data :
+ *  - data_movements_handles[my_rank]
+ */
+
+static void balance(starpu_data_handle_t load_data_cpy)
+{
+    int less_loaded = -1;
+    int n;
+    double elapsed_time, ref_elapsed_time;
+    double my_elapsed_time = load_data_get_elapsed_time(load_data_cpy);
+
+    /* Search for the less loaded neighbor */
+    ref_elapsed_time = my_elapsed_time;
+    for (n = 0; n < nneighbors; n++)
+    {
+        elapsed_time = load_data_get_elapsed_time(neighbor_load_data_handles[n]);
+        if (ref_elapsed_time > elapsed_time)
+        {
+            //fprintf(stderr,"Node%d: ref local time %lf vs neighbour%d time %lf\n", my_rank, ref_elapsed_time, neighbor_ids[n], elapsed_time);
+            less_loaded = neighbor_ids[n];
+            ref_elapsed_time = elapsed_time;
+        }
+    }
+    
+    /* We found it */
+    if (less_loaded >= 0)
+    {
+        double diff_time = my_elapsed_time - ref_elapsed_time;
+        /* If the difference is higher than a time threshold, we move
+         * one data to the less loaded neighbour. */
+        /* TODO: How to decide the time threshold ? */
+        if ((time_threshold > 0) && (diff_time >= time_threshold))
+        {
+            //fprintf(stderr,"Less loaded found on node %d : %d\n", my_rank, less_loaded);
+            starpu_data_handle_t *handles = NULL;
+            int nhandles = 0;
+            user_itf->get_data_unit_to_migrate(&handles, &nhandles, less_loaded);
+
+            data_movements_reallocate_tables(data_movements_handles[my_rank], nhandles);
+
+            if (nhandles)
+            {
+                int *tags = data_movements_get_tags_table(data_movements_handles[my_rank]);
+                int *ranks = data_movements_get_ranks_table(data_movements_handles[my_rank]);
+            
+                for (n = 0; n < nhandles; n++)
+                {
+                    tags[n] = starpu_mpi_data_get_tag(handles[n]); 
+                    ranks[n] = less_loaded; 
+                }
+
+                free(handles);
+            }
+        }
+        else
+            data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+    }
+    else
+        data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+}
+
+static void exchange_load_data_infos(starpu_data_handle_t load_data_cpy)
+{
+    int i;
+
+    /* Allocate all requests and status for point-to-point communications */
+    starpu_mpi_req load_send_req[nneighbors];
+    starpu_mpi_req load_recv_req[nneighbors];
+
+    MPI_Status load_send_status[nneighbors];
+    MPI_Status load_recv_status[nneighbors];
+
+    int flag;
+
+    /* Send the local load data to neighbour nodes, and receive the remote load
+     * data from neighbour nodes */
+    for (i = 0; i < nneighbors; i++)
+    {
+        starpu_mpi_isend(load_data_cpy, &load_send_req[i], neighbor_ids[i], TAG_LOAD(my_rank), MPI_COMM_WORLD);
+        starpu_mpi_irecv(neighbor_load_data_handles[i], &load_recv_req[i], neighbor_ids[i], TAG_LOAD(neighbor_ids[i]), MPI_COMM_WORLD);
+    }
+    
+    /* Wait for completion of all send requests */
+    for (i = 0; i < nneighbors; i++)
+    {
+        flag = 0;
+        while (!flag)
+            starpu_mpi_test(&load_send_req[i], &flag, &load_send_status[i]);
+    }
+
+    /* Wait for completion of all receive requests */
+    for (i = 0; i < nneighbors; i++)
+    {   
+        flag = 0;
+        while (!flag)
+            starpu_mpi_test(&load_recv_req[i], &flag, &load_recv_status[i]);
+    }
+}
+
+static void exchange_data_movements_infos()
+{
+    int i;
+
+    /* Allocate all requests and status for point-to-point communications */
+    starpu_mpi_req data_movements_send_req[world_size];
+    starpu_mpi_req data_movements_recv_req[world_size];
+
+    MPI_Status data_movements_send_status[world_size];
+    MPI_Status data_movements_recv_status[world_size];
+
+    int flag;
+
+    /* Send the new ranks of local data to all other nodes, and receive the new
+     * ranks of all remote data from all other nodes */
+    for (i = 0; i < world_size; i++)
+    {
+        if (i != my_rank)
+        {
+            //fprintf(stderr,"Send data movement of %d to %d\n", my_rank, i);
+            starpu_mpi_isend(data_movements_handles[my_rank], &data_movements_send_req[i], i, TAG_MOV(my_rank), MPI_COMM_WORLD);
+            //fprintf(stderr,"Receive data movement from %d on %d\n", i, my_rank);
+            starpu_mpi_irecv(data_movements_handles[i], &data_movements_recv_req[i], i, TAG_MOV(i), MPI_COMM_WORLD);
+        }
+    }
+    
+    /* Wait for completion of all send requests */
+    for (i = 0; i < world_size; i++)
+    {
+        if (i != my_rank)
+        {
+            //fprintf(stderr,"Wait for sending data movement of %d to %d\n", my_rank, i);
+            flag = 0;
+            while (!flag)
+                starpu_mpi_test(&data_movements_send_req[i], &flag, &data_movements_send_status[i]);
+        }
+    }
+
+    /* Wait for completion of all receive requests */
+    for (i = 0; i < world_size; i++)
+    {
+        if (i != my_rank)
+        {
+            //fprintf(stderr,"Wait for recieving data movement from %d on %d\n", i, my_rank);
+            flag = 0;
+            while (!flag)
+                starpu_mpi_test(&data_movements_recv_req[i], &flag, &data_movements_recv_status[i]);
+        }
+    }
+}
+
+static void update_data_ranks()
+{
+    int i,j;
+
+    /* Update the new ranks for all concerned data */
+    for (i = 0; i < world_size; i++)
+    {
+        int ndata_to_update = data_movements_get_size_tables(data_movements_handles[i]);
+        if (ndata_to_update)
+        {
+            //fprintf(stderr,"Update %d data from table %d on node %d\n", ndata_to_update, i, my_rank);
+
+            for (j = 0; j < ndata_to_update; j++)
+            {
+                starpu_data_handle_t handle = starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+                STARPU_ASSERT(handle);
+                int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];
+
+                /* Save the fact that the data has been moved out of this node */
+                if (i == my_rank)
+                {
+                    struct moved_data_entry *md = (struct moved_data_entry *)malloc(sizeof(struct moved_data_entry));
+                    md->handle = handle;
+                    HASH_ADD_PTR(mdh, handle, md);
+                }
+                else
+                {
+                    if (dst_rank == my_rank)
+                    {
+                        /* The data has been moved out, and now is moved back, so
+                         * update the state of the moved_data hash table to reflect
+                         * this change */
+                        struct moved_data_entry *md = NULL;
+                        HASH_FIND_PTR(mdh, &handle, md);
+                        if (md)
+                        {
+                            HASH_DEL(mdh, md);
+                            free(md);
+                        }
+                    }
+                }
+
+                //if (i == my_rank)
+                //{
+                //    if (dst_rank != my_rank)
+                //        fprintf(stderr,"Move data %p (tag %d) from node %d to node %d\n", handle, (data_movements_get_tags_table(data_movements_handles[i]))[j], my_rank, dst_rank);
+                //    else
+                //        fprintf(stderr,"Bring back data %p (tag %d) from node %d on node %d\n", handle, (data_movements_get_tags_table(data_movements_handles[i]))[j], starpu_mpi_data_get_rank(handle), my_rank);
+                //}
+
+                //fprintf(stderr,"Call of starpu_mpi_get_data_on_node(%d,%d) on node %d\n", starpu_mpi_data_get_tag(handle), dst_rank, my_rank);
+
+                /* Migrate the data handle */
+                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, dst_rank, NULL, NULL);
+
+                //fprintf(stderr,"New rank (%d) of data %d upgraded on node %d\n", dst_rank, starpu_mpi_data_get_tag(handle), my_rank);
+                starpu_mpi_data_set_rank_comm(handle, dst_rank, MPI_COMM_WORLD);
+            }
+        }
+    }
+}
+
+static void clean_balance()
+{
+    int i;
+    starpu_mpi_cache_flush(MPI_COMM_WORLD, *load_data_handle_cpy);
+    for (i = 0; i < nneighbors; i++)
+        starpu_mpi_cache_flush(MPI_COMM_WORLD, neighbor_load_data_handles[i]);
+    for (i = 0; i < world_size; i++)
+        starpu_mpi_cache_flush(MPI_COMM_WORLD, data_movements_handles[i]);
+}
+
+/* Core function of the load balancer. Computes from the load_data_cpy handle a
+ * load balancing of the work to come (if needed), perform the necessary data
+ * communications and negociate with the other nodes the rebalancing. */
+static void heat_balance(starpu_data_handle_t load_data_cpy)
+{
+    /* Exchange load data handles with neighboring nodes */
+    exchange_load_data_infos(load_data_cpy);
+
+    /* Determine if this node should sent data to other nodes : 
+     * which ones, how much data */
+    balance(load_data_cpy); 
+
+    /* Exchange data movements with neighboring nodes */
+    exchange_data_movements_infos();
+
+    /* Perform data movements */
+    update_data_ranks();
+
+    /* Clean the data handles to properly launch the next balance phase */
+    clean_balance();
+}
+
+
+/******************************************************************************
+ *                      Heat Load Balancer Entry Points                       *
+ *****************************************************************************/
+
+
+static void submitted_task_heat(struct starpu_task *task)
+{
+    load_data_inc_nsubmitted_tasks(*load_data_handle);
+    //if (load_data_get_nsubmitted_tasks(*load_data_handle) > task->tag_id)
+    //{
+    //    fprintf(stderr,"Error : nsubmitted_tasks (%d) > tag_id (%lld) ! \n", load_data_get_nsubmitted_tasks(*load_data_handle), (long long int)task->tag_id);
+    //    STARPU_ASSERT(0);
+    //}
+
+    int phase = load_data_get_current_phase(*load_data_handle);
+    /* Numbering of tasks in StarPU-MPI should be given by the application with
+     * the STARPU_TAG_ONLY insert task option for now. */
+    /* TODO: Properly implement a solution for numbering tasks in StarPU-MPI */
+    if ((task->tag_id / load_data_get_sleep_threshold(*load_data_handle)) > phase)
+    {
+        STARPU_PTHREAD_MUTEX_LOCK(&load_data_mutex);
+        load_data_update_wakeup_cond(*load_data_handle);
+        //fprintf(stderr,"Node %d sleep on tag %lld\n", my_rank, (long long int)task->tag_id);
+        //if (load_data_get_nsubmitted_tasks(*load_data_handle) < load_data_get_wakeup_threshold(*load_data_handle))
+        //{
+        //    fprintf(stderr,"Error : nsubmitted_tasks (%d) lower than wakeup_threshold (%d) !\n", load_data_get_nsubmitted_tasks(*load_data_handle), load_data_get_wakeup_threshold(*load_data_handle));
+        //    STARPU_ASSERT(0);
+        //}
+
+        if (load_data_get_wakeup_threshold(*load_data_handle) > load_data_get_nfinished_tasks(*load_data_handle))
+            STARPU_PTHREAD_COND_WAIT(&load_data_cond, &load_data_mutex);
+
+        load_data_next_phase(*load_data_handle);
+
+        /* Register a copy of the load data at this moment, to allow to compute
+         * the heat balance while not locking the load data during the whole
+         * balance step, which could cause all the workers to wait on the lock
+         * to update the data. */
+
+        struct starpu_data_interface_ops *itf_load_data = starpu_data_get_interface_ops(*load_data_handle);
+		void* itf_src = starpu_data_get_interface_on_node(*load_data_handle, STARPU_MAIN_RAM);
+		void* itf_dst = starpu_data_get_interface_on_node(*load_data_handle_cpy, STARPU_MAIN_RAM);
+        memcpy(itf_dst, itf_src, itf_load_data->interface_size);
+
+        fprintf(stderr,"Balance phase %d on node %d\n", load_data_get_current_phase(*load_data_handle), my_rank);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+
+        heat_balance(*load_data_handle_cpy);
+    }
+}
+
+static void finished_task_heat()
+{
+    //fprintf(stderr,"Try to decrement nsubmitted_tasks...");
+    STARPU_PTHREAD_MUTEX_LOCK(&load_data_mutex);
+
+    load_data_inc_nfinished_tasks(*load_data_handle);
+    //fprintf(stderr,"Decrement nsubmitted_tasks, now %d\n", load_data_get_nsubmitted_tasks(*load_data_handle));
+    if (load_data_wakeup_cond(*load_data_handle))
+    {
+        //fprintf(stderr,"Wakeup ! nfinished_tasks = %d, wakeup_threshold = %d\n", load_data_get_nfinished_tasks(*load_data_handle), load_data_get_wakeup_threshold(*load_data_handle));
+        load_data_update_elapsed_time(*load_data_handle); 
+        STARPU_PTHREAD_COND_SIGNAL(&load_data_cond);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+    }
+    else
+        STARPU_PTHREAD_MUTEX_UNLOCK(&load_data_mutex);
+}
+
+
+/******************************************************************************
+ *                  Initialization / Deinitialization                         *
+ *****************************************************************************/
+
+static int init_heat(struct starpu_mpi_lb_conf *itf)
+{
+    int i;
+    int sleep_task_threshold;
+    double wakeup_ratio; 
+
+    starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
+    starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
+
+    /* Immediately return if the starpu_mpi_lb_conf is invalid. */
+    if (!(itf && itf->get_neighbors && itf->get_data_unit_to_migrate))
+        return 1;
+
+    user_itf = malloc(sizeof(struct starpu_mpi_lb_conf));
+    memcpy(user_itf, itf, sizeof(struct starpu_mpi_lb_conf));;
+
+    /* Get the neighbors of the local MPI node */
+    user_itf->get_neighbors(&neighbor_ids, &nneighbors);
+    if (nneighbors == 0)
+    {
+        free(user_itf);
+        user_itf = NULL;
+        return 2;
+    }
+
+    /* The sleep threshold is deducted from the numbering of tasks by the
+     * application. For example, with this threshold, the submission thread
+     * will stop when a task for which the numbering is 2000 or above will be
+     * submitted to StarPU-MPI. However, much less tasks can be really
+     * submitted to the local MPI node: the sleeping of the submission threads
+     * checks the numbering of the tasks, not how many tasks have been
+     * submitted to the local MPI node, which are two different things. */
+    char *sleep_env = getenv("LB_HEAT_SLEEP_THRESHOLD");
+    if (sleep_env)
+        sleep_task_threshold = atoi(sleep_env);
+    else
+        sleep_task_threshold = 2000; 
+
+    char *wakeup_env = getenv("LB_HEAT_WAKEUP_RATIO");
+    if (wakeup_env)
+        wakeup_ratio = atof(wakeup_env);
+    else
+        wakeup_ratio = 0.5; 
+
+    char *time_env = getenv("LB_HEAT_TIME_THRESHOLD");
+    if (time_env)
+        time_threshold = atoi(time_env);
+    else
+        time_threshold = 2000; 
+
+    STARPU_PTHREAD_MUTEX_INIT(&load_data_mutex, NULL);
+    STARPU_PTHREAD_COND_INIT(&load_data_cond, NULL);
+
+    /* Allocate, initialize and register all the data handles that will be
+     * needed for the load balancer, to not reallocate them at each balance
+     * step. */
+
+    /* Local load data */
+    load_data_handle = malloc(sizeof(starpu_data_handle_t));
+    memset(load_data_handle, 0, sizeof(starpu_data_handle_t));
+    load_data_data_register(load_data_handle, STARPU_MAIN_RAM, sleep_task_threshold, wakeup_ratio);
+
+    /* Copy of the local load data to enable parallel update of the load data
+     * with communications to neighbor nodes */
+    load_data_handle_cpy = malloc(sizeof(starpu_data_handle_t));
+    memset(load_data_handle_cpy, 0, sizeof(starpu_data_handle_t));
+    void *local_interface = starpu_data_get_interface_on_node(*load_data_handle, STARPU_MAIN_RAM);
+    struct starpu_data_interface_ops *itf_load_data = starpu_data_get_interface_ops(*load_data_handle);
+    starpu_data_register(load_data_handle_cpy, STARPU_MAIN_RAM, local_interface, itf_load_data);
+    starpu_mpi_data_register(*load_data_handle_cpy, TAG_LOAD(my_rank), my_rank);
+
+    /* Remote load data */
+
+    neighbor_load_data_handles = malloc(nneighbors*sizeof(starpu_data_handle_t));
+    memset(neighbor_load_data_handles, 0, nneighbors*sizeof(starpu_data_handle_t));
+    for (i = 0; i < nneighbors; i++)
+    {
+        load_data_data_register(&neighbor_load_data_handles[i], STARPU_MAIN_RAM, sleep_task_threshold, wakeup_ratio);
+        starpu_mpi_data_register(neighbor_load_data_handles[i], TAG_LOAD(neighbor_ids[i]), neighbor_ids[i]);
+    }
+
+    /* Data movements handles */
+    data_movements_handles = malloc(world_size*sizeof(starpu_data_handle_t));
+    for (i = 0; i < world_size; i++)
+    {
+        data_movements_data_register(&data_movements_handles[i], STARPU_MAIN_RAM, NULL, NULL, 0);
+        starpu_mpi_data_register(data_movements_handles[i], TAG_MOV(i), i);
+    }
+
+    /* Hash table of moved data that will be brought back on the node at
+     * termination time */
+    mdh = NULL; 
+
+    return 0;
+}
+
+/* Move back all the data that has been migrated out of this node at
+ * denitialization time of the load balancer, to ensure the consistency with
+ * the ranks of data originally registered by the application. */
+static void move_back_data()
+{
+    int i,j;
+
+    /* Update the new ranks for all concerned data */
+    for (i = 0; i < world_size; i++)
+    {
+        /* In this case, each data_movements_handles contains the handles to move back on the specific node */
+        int ndata_to_update = data_movements_get_size_tables(data_movements_handles[i]);
+        if (ndata_to_update)
+        {
+            //fprintf(stderr,"Move back %d data from table %d on node %d\n", ndata_to_update, i, my_rank);
+
+            for (j = 0; j < ndata_to_update; j++)
+            {
+                starpu_data_handle_t handle = starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+                STARPU_ASSERT(handle);
+
+                int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];
+                STARPU_ASSERT(i == dst_rank);
+
+                if (i == my_rank)
+                {
+                    /* The data is moved back, so update the state of the
+                     * moved_data hash table to reflect this change */
+                    struct moved_data_entry *md = NULL;
+                    HASH_FIND_PTR(mdh, &handle, md);
+                    if (md)
+                    {
+                        HASH_DEL(mdh, md);
+                        free(md);
+                    }
+                }
+
+                //fprintf(stderr,"Call of starpu_mpi_get_data_on_node(%d,%d) on node %d\n", starpu_mpi_data_get_tag(handle), dst_rank, my_rank);
+
+                /* Migrate the data handle */
+                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, dst_rank, NULL, NULL);
+
+                //fprintf(stderr,"New rank (%d) of data %d upgraded on node %d\n", dst_rank, starpu_mpi_data_get_tag(handle), my_rank);
+                starpu_mpi_data_set_rank_comm(handle, dst_rank, MPI_COMM_WORLD);
+            }
+        }
+    }
+}
+
+static int deinit_heat()
+{
+    int i;
+
+    if ((!user_itf) || (nneighbors == 0))
+        return 1;
+
+    unsigned int ndata_to_move_back = HASH_COUNT(mdh);
+
+    if (ndata_to_move_back)
+    {
+        //fprintf(stderr,"Move back %u data on node %d ..\n", ndata_to_move_back, my_rank);
+        data_movements_reallocate_tables(data_movements_handles[my_rank], ndata_to_move_back);
+
+        int *tags = data_movements_get_tags_table(data_movements_handles[my_rank]);
+        int *ranks = data_movements_get_ranks_table(data_movements_handles[my_rank]);
+
+        int n = 0;
+        struct moved_data_entry *md, *tmp;
+        HASH_ITER(hh, mdh, md, tmp)
+        {
+            tags[n] = starpu_mpi_data_get_tag(md->handle); 
+            ranks[n] = my_rank; 
+            n++;
+        }
+    }
+    else
+        data_movements_reallocate_tables(data_movements_handles[my_rank], 0);
+
+    exchange_data_movements_infos();
+    move_back_data();
+
+    /* This assert ensures that all nodes have properly gotten back all the
+     * data that has been moven out of the node. */
+    STARPU_ASSERT(HASH_COUNT(mdh) == 0);
+    free(mdh);
+    mdh = NULL;
+
+    starpu_data_unregister(*load_data_handle);
+    free(load_data_handle);
+    load_data_handle = NULL;
+
+    starpu_mpi_cache_flush(MPI_COMM_WORLD, *load_data_handle_cpy);
+    starpu_data_unregister(*load_data_handle_cpy);
+    free(load_data_handle_cpy);
+    load_data_handle_cpy = NULL;
+
+    for (i = 0; i < nneighbors; i++)
+    {
+        starpu_mpi_cache_flush(MPI_COMM_WORLD, neighbor_load_data_handles[i]);
+        starpu_data_unregister(neighbor_load_data_handles[i]);
+    }
+    free(neighbor_load_data_handles);
+    neighbor_load_data_handles = NULL;
+
+    nneighbors = 0;
+    free(neighbor_ids);
+    neighbor_ids = NULL;
+
+    for (i = 0; i < world_size; i++)
+    {
+        starpu_mpi_cache_flush(MPI_COMM_WORLD, data_movements_handles[i]);
+        data_movements_reallocate_tables(data_movements_handles[i], 0);
+        starpu_data_unregister(data_movements_handles[i]);
+    }
+    free(data_movements_handles);
+    data_movements_handles = NULL;
+
+    STARPU_PTHREAD_MUTEX_DESTROY(&load_data_mutex);
+    STARPU_PTHREAD_COND_DESTROY(&load_data_cond);
+    free(user_itf);
+    user_itf = NULL;
+
+    return 0;
+}
+
+
+/******************************************************************************
+ *                                  Policy                                    *
+ *****************************************************************************/
+
+
+struct load_balancer_policy load_heat_propagation_policy =
+{
+    .init = init_heat,
+    .deinit = deinit_heat,
+    .submitted_task_entry_point = submitted_task_heat,
+    .finished_task_entry_point = finished_task_heat,
+	.policy_name = "heat"
+};

+ 2 - 2
mpi/src/starpu_mpi.c

@@ -1174,7 +1174,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 
 	starpu_data_handle_t data_handle = NULL;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-	data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
+	data_handle = starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 	if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
@@ -1590,7 +1590,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
  * create MSG processes to run application's main */
 int _starpu_mpi_simgrid_init(int argc, char *argv[])
 {
-	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
+	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
 }
 #endif
 

+ 17 - 0
mpi/src/starpu_mpi_cache.c

@@ -38,6 +38,23 @@ int _starpu_cache_enabled=1;
 MPI_Comm _starpu_cache_comm;
 int _starpu_cache_comm_size;
 
+starpu_data_handle_t starpu_mpi_get_data_from_sent_cache(int dst_node)
+{
+    starpu_data_handle_t data = NULL;
+
+    STARPU_PTHREAD_MUTEX_LOCK(&_cache_received_mutex[dst_node]);
+    struct _starpu_data_entry *entry = _cache_received_data[dst_node];
+    while (entry && entry->hh.next)
+    {
+        entry = entry->hh.next;
+    }
+    if (entry)
+        data = entry->data;
+    STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_received_mutex[dst_node]);
+
+    return data;
+}
+
 int starpu_mpi_cache_is_enabled()
 {
 	return _starpu_cache_enabled==1;

+ 3 - 3
mpi/src/starpu_mpi_tag.c

@@ -55,7 +55,7 @@ void _starpu_mpi_tag_free(void)
 	registered_tag_handles = NULL;
 }
 
-starpu_data_handle_t _starpu_mpi_data_get_data_handle_from_tag(int tag)
+starpu_data_handle_t starpu_mpi_data_get_data_handle_from_tag(int tag)
 {
 	struct handle_tag_entry *ret;
 
@@ -79,8 +79,8 @@ void _starpu_mpi_data_register_tag(starpu_data_handle_t handle, int tag)
 	entry = (struct handle_tag_entry *) malloc(sizeof(*entry));
 	STARPU_ASSERT(entry != NULL);
 
-	STARPU_ASSERT_MSG(!(_starpu_mpi_data_get_data_handle_from_tag(tag)),
-			  "There is already a data handle %p registered with the tag %d\n", _starpu_mpi_data_get_data_handle_from_tag(tag), tag);
+	STARPU_ASSERT_MSG(!(starpu_mpi_data_get_data_handle_from_tag(tag)),
+			  "There is already a data handle %p registered with the tag %d\n", starpu_mpi_data_get_data_handle_from_tag(tag), tag);
 
 	_STARPU_MPI_DEBUG(42, "Adding handle %p with tag %d in hashtable\n", handle, tag);
 

+ 1 - 1
mpi/src/starpu_mpi_tag.h

@@ -29,7 +29,7 @@ void _starpu_mpi_tag_init(void);
 void _starpu_mpi_tag_free(void);
 void _starpu_mpi_data_register_tag(starpu_data_handle_t handle, int tag);
 int _starpu_mpi_data_release_tag(starpu_data_handle_t handle);
-starpu_data_handle_t _starpu_mpi_data_get_data_handle_from_tag(int tag);
+starpu_data_handle_t starpu_mpi_data_get_data_handle_from_tag(int tag);
 
 #ifdef __cplusplus
 }

+ 25 - 1
mpi/src/starpu_mpi_task_insert.c

@@ -36,6 +36,23 @@
 	else								\
 		starpu_mpi_isend_detached(data, dest, data_tag, comm, callback, arg);
 
+static void (*pre_submit_hook)(struct starpu_task *task) = NULL;
+
+int starpu_mpi_pre_submit_hook_register(void (*f)(struct starpu_task *))
+{
+    if (pre_submit_hook)
+        fprintf(stderr,"Warning: a pre_submit_hook has already been registered.\nPlease check if you really want to erase the previously registered hook.\n");
+
+    pre_submit_hook = f; 
+    return 0;
+}
+
+int starpu_mpi_pre_submit_hook_unregister()
+{
+    pre_submit_hook = NULL;
+    return 0;
+}
+
 static
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank)
 {
@@ -472,6 +489,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 		va_copy(varg_list_copy, varg_list);
 		_starpu_task_insert_create(codelet, task, varg_list_copy);
 		va_end(varg_list_copy);
+
 		return 0;
 	}
 }
@@ -527,7 +545,13 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 			starpu_task_destroy(task);
 		}
 	}
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+
+	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+
+    if (ret == 0 && pre_submit_hook)
+        pre_submit_hook(task);
+
+    return val;
 }
 
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)

+ 14 - 0
src/core/sched_policy.c

@@ -72,6 +72,20 @@ struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *s
 	return sched_ctx->sched_policy;
 }
 
+void starpu_sched_policy_set_post_exec_hook(void (*post_exec_hook)(struct starpu_task *task), const char *policy_name)
+{
+    int i;
+    struct _starpu_sched_ctx *sched_ctx;
+    for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+    {
+        sched_ctx = _starpu_get_sched_ctx_struct(i);
+        if (sched_ctx && sched_ctx->sched_policy)
+            sched_ctx->sched_policy->post_exec_hook = post_exec_hook;
+        else
+            break;
+    }
+}
+
 /*
  *	Methods to initialize the scheduling policy
  */