
add Beranger's ordo

Andra Hugo 10 years ago
commit 305100c749

+ 2 - 1
Makefile.am

@@ -74,7 +74,8 @@ versinclude_HEADERS = 				\
 	include/starpu_profiling.h		\
 	include/starpu_bound.h			\
 	include/starpu_scheduler.h		\
-	include/starpu_sched_component.h		\
+	include/schedulers/heteroprio.h		\
+	include/starpu_sched_component.h	\
 	include/starpu_sched_ctx.h		\
 	include/starpu_sched_ctx_hypervisor.h	\
 	include/starpu_top.h			\

+ 1 - 0
examples/Makefile.am

@@ -195,6 +195,7 @@ STARPU_EXAMPLES =				\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
 	scheduler/dummy_sched			\
+	scheduler/heteroprio_test		\
 	sched_ctx/sched_ctx			\
 	sched_ctx/prio				\
 	sched_ctx/dummy_sched_with_ctx		\

+ 199 - 0
examples/scheduler/heteroprio_test.c

@@ -0,0 +1,199 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <schedulers/heteroprio.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+void initSchedulerCallback(){
+	// CPU uses 3 buckets
+	starpu_heteroprio_set_nb_prios(0, FSTARPU_CPU_IDX, 3);
+	// It uses direct mapping idx => idx
+	unsigned idx;
+	for(idx = 0; idx < 3; ++idx){
+		starpu_heteroprio_set_mapping(0, FSTARPU_CPU_IDX, idx, idx);
+		starpu_heteroprio_set_faster_arch(0, FSTARPU_CPU_IDX, idx);
+	}
+#ifdef STARPU_USE_OPENCL
+	// OpenCL is enabled and uses 2 buckets
+	starpu_heteroprio_set_nb_prios(0, FSTARPU_OPENCL_IDX, 2);
+	// OpenCL will first look to priority 2
+	starpu_heteroprio_set_mapping(0, FSTARPU_OPENCL_IDX, 0, 2);
+	// For this bucket OpenCL is the fastest
+	starpu_heteroprio_set_faster_arch(0, FSTARPU_OPENCL_IDX, 2);
+	// And CPU is 4 times slower
+	starpu_heteroprio_set_arch_slow_factor(0, FSTARPU_CPU_IDX, 2, 4.0f);
+
+	starpu_heteroprio_set_mapping(0, FSTARPU_OPENCL_IDX, 1, 1);
+	// We let the CPU as the fastest and tell that OpenCL is 1.7 times slower
+	starpu_heteroprio_set_arch_slow_factor(0, FSTARPU_OPENCL_IDX, 1, 1.7f);
+#endif
+}
+
+
+void callback_a_cpu(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+
+void callback_b_cpu(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+
+void callback_c_cpu(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+
+#ifdef STARPU_USE_OPENCL
+void callback_a_opencl(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+
+void callback_b_opencl(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+
+void callback_c_opencl(void *buffers[], void *cl_arg){
+	usleep(100000);
+	printf("COMMUTE_LOG] callback %s\n", __FUNCTION__); fflush(stdout);
+}
+#endif
+
+int main(int argc, char** argv){
+	int ret;
+	struct starpu_conf conf;
+	ret = starpu_conf_init(&conf);
+	assert(ret == 0);
+
+	conf.sched_policy_name = "heteroprio";
+	conf.sched_policy_init = &initSchedulerCallback;
+
+	ret = starpu_init(&conf);
+	assert(ret == 0);
+
+	starpu_pause();
+
+	printf("Worker = %d\n",  starpu_worker_get_count());
+	printf("Worker CPU = %d\n", starpu_cpu_worker_get_count());
+#ifdef STARPU_USE_OPENCL
+	printf("Worker OpenCL = %d\n", starpu_cpu_worker_get_count());
+#endif
+
+	struct starpu_codelet codeleteA;
+	{
+		memset(&codeleteA, 0, sizeof(codeleteA));
+		codeleteA.nbuffers = 2;
+		codeleteA.modes[0] = STARPU_RW;
+		codeleteA.modes[1] = STARPU_RW;
+		codeleteA.name = "codeleteA";
+		codeleteA.where = STARPU_CPU;
+		codeleteA.cpu_funcs[0] = callback_a_cpu;
+#ifdef STARPU_USE_OPENCL
+		codeleteA.where |= STARPU_OPENCL;
+		codeleteA.opencl_funcs[0] = callback_a_opencl;
+#endif
+	}
+	struct starpu_codelet codeleteB;
+	{
+		memset(&codeleteB, 0, sizeof(codeleteB));
+		codeleteB.nbuffers = 2;
+		codeleteB.modes[0] = STARPU_RW;
+		codeleteB.modes[1] = STARPU_RW;
+		codeleteB.name = "codeleteB";
+		codeleteB.where = STARPU_CPU;
+		codeleteB.cpu_funcs[0] = callback_b_cpu;
+#ifdef STARPU_USE_OPENCL
+		codeleteB.where |= STARPU_OPENCL;
+		codeleteB.opencl_funcs[0] = callback_b_opencl;
+#endif
+	}
+	struct starpu_codelet codeleteC;
+	{
+		memset(&codeleteC, 0, sizeof(codeleteC));
+		codeleteC.nbuffers = 2;
+		codeleteC.modes[0] = STARPU_RW;
+		codeleteC.modes[1] = STARPU_RW;
+		codeleteC.name = "codeleteC";
+		codeleteC.where = STARPU_CPU;
+		codeleteC.cpu_funcs[0] = callback_c_cpu;
+#ifdef STARPU_USE_OPENCL
+		codeleteC.where |= STARPU_OPENCL;
+		codeleteC.opencl_funcs[0] = callback_c_opencl;
+#endif
+	}
+
+	const int nbHandles = 10;
+	printf("Nb handles = %d\n", nbHandles);
+
+	starpu_data_handle_t handles[nbHandles];
+	memset(handles, 0, sizeof(handles[0])*nbHandles);
+	int dataA[nbHandles];
+	int idx;
+	for(idx = 0; idx < nbHandles; ++idx){
+		dataA[idx] = idx;
+	}
+	int idxHandle;
+	for(idxHandle = 0; idxHandle < nbHandles; ++idxHandle){
+		starpu_variable_data_register(&handles[idxHandle], 0, (uintptr_t)&dataA[idxHandle], sizeof(dataA[idxHandle]));
+	}
+
+	const int nbTasks = 40;
+	printf("Submit %d tasks \n", nbTasks);
+
+	starpu_resume();
+	
+	int idxTask;
+	for(idxTask = 0; idxTask < nbTasks; ++idxTask){
+		starpu_insert_task(&codeleteA,
+				   STARPU_PRIORITY, 0,
+				   (STARPU_RW), handles[(idxTask*2)%nbHandles],
+				   (STARPU_RW), handles[(idxTask*3+1)%nbHandles],
+				   0);
+		starpu_insert_task(&codeleteB,
+				   STARPU_PRIORITY, 1,
+				   (STARPU_RW), handles[(idxTask*2 +1 )%nbHandles],
+				   (STARPU_RW), handles[(idxTask*2)%nbHandles],
+				   0);
+		starpu_insert_task(&codeleteC,
+				   STARPU_PRIORITY, 2,
+				   (STARPU_RW), handles[(idxTask)%nbHandles],
+				   (STARPU_RW), handles[(idxTask*idxTask)%nbHandles],
+				   0);
+	}
+
+	printf("Wait task\n");
+
+	starpu_task_wait_for_all();
+	starpu_pause();
+
+	printf("Release data\n");
+
+	for(idxHandle = 0 ; idxHandle < nbHandles ; ++idxHandle){
+		starpu_data_unregister(handles[idxHandle]);
+	}
+	
+	printf("Shutdown\n");
+
+
+	starpu_resume();
+	starpu_shutdown();
+
+	return 0;
+}
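
For reference, a minimal sketch (not part of the commit) of the lines one could add inside initSchedulerCallback when CUDA workers are compiled in; the calls and FSTARPU_CUDA_IDX come from include/schedulers/heteroprio.h below, and the bucket numbers are only illustrative:

	#ifdef STARPU_USE_CUDA
	/* CUDA uses 1 bucket and looks first at priority 2 */
	starpu_heteroprio_set_nb_prios(0, FSTARPU_CUDA_IDX, 1);
	starpu_heteroprio_set_mapping(0, FSTARPU_CUDA_IDX, 0, 2);
	/* CUDA is the fastest arch for bucket 2, and the CPU is 4 times slower there */
	starpu_heteroprio_set_faster_arch(0, FSTARPU_CUDA_IDX, 2);
	starpu_heteroprio_set_arch_slow_factor(0, FSTARPU_CPU_IDX, 2, 4.0f);
	#endif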

+ 83 - 0
include/schedulers/heteroprio.h

@@ -0,0 +1,83 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_SCHEDULER_HETEROPRIO_H__
+#define __STARPU_SCHEDULER_HETEROPRIO_H__
+
+#include <starpu.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#define HETEROPRIO_MAX_PRIO 100
+/* #define FSTARPU_NB_TYPES 3 */
+/* #define FSTARPU_CPU_IDX 0 */
+/* #define FSTARPU_CUDA_IDX 1 */
+/* #define FSTARPU_OPENCL_IDX 2 */
+
+#define HETEROPRIO_MAX_PREFETCH 2
+#if HETEROPRIO_MAX_PREFETCH <= 0
+#error HETEROPRIO_MAX_PREFETCH == 1 means no prefetch, so HETEROPRIO_MAX_PREFETCH must be >= 1
+#endif
+
+enum FStarPUTypes{
+// First will be zero
+#ifdef STARPU_USE_CPU
+	FSTARPU_CPU_IDX, // = 0
+#endif
+#ifdef STARPU_USE_CUDA
+	FSTARPU_CUDA_IDX,
+#endif
+#ifdef STARPU_USE_OPENCL
+	FSTARPU_OPENCL_IDX,
+#endif
+// This will be the number of archs
+	FSTARPU_NB_TYPES
+};
+
+static const unsigned FStarPUTypesToArch[FSTARPU_NB_TYPES+1] = {
+#ifdef STARPU_USE_CPU
+	STARPU_CPU,
+#endif
+#ifdef STARPU_USE_CUDA
+	STARPU_CUDA,
+#endif
+#ifdef STARPU_USE_OPENCL
+	STARPU_OPENCL,
+#endif
+	0
+};
+
+
+/** Tell how many priorities there are for a given arch */
+void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned max_prio);
+
+/** Set the mapping for a given arch prio=>bucket */
+void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned source_prio, unsigned dest_bucket_id);
+
+/** Tell which arch is the fastest for the tasks of a bucket (optional) */
+void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id);
+
+/** Tell how slow an arch is for the tasks of a bucket (optional) */
+void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id, float slow_factor);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __STARPU_SCHEDULER_HETEROPRIO_H__ */
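
Note that the FSTARPU_*_IDX values are compacted to the archs compiled in, so an index shifts with the build configuration; FStarPUTypesToArch converts such an index back to the corresponding STARPU_* worker mask. A quick illustration (assuming a CPU-only build):

	/* here FSTARPU_CPU_IDX == 0 and FSTARPU_NB_TYPES == 1;
	 * in a CPU+CUDA+OpenCL build, FSTARPU_OPENCL_IDX would be 2 instead */
	unsigned mask = FStarPUTypesToArch[FSTARPU_CPU_IDX]; /* == STARPU_CPU */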

+ 2 - 0
include/starpu.h

@@ -126,6 +126,8 @@ struct starpu_conf
 	int global_sched_ctx_min_priority;
 	int global_sched_ctx_max_priority;
 
+	void (*sched_policy_init)(void);
+
 };
 
 int starpu_conf_init(struct starpu_conf *conf);
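
The new sched_policy_init field is forwarded to the initial scheduling context at starpu_init() time (see the src/core/workers.c hunk below); a minimal sketch of its use, mirroring examples/scheduler/heteroprio_test.c above:

	struct starpu_conf conf;
	starpu_conf_init(&conf);
	conf.sched_policy_name = "heteroprio";
	conf.sched_policy_init = &initSchedulerCallback; /* called while the policy is initialized */
	starpu_init(&conf);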

+ 3 - 0
include/starpu_sched_ctx.h

@@ -31,6 +31,7 @@ extern "C"
 #define STARPU_SCHED_CTX_HIERARCHY_LEVEL         (5<<16)
 #define STARPU_SCHED_CTX_NESTED                  (6<<16)
 #define STARPU_SCHED_CTX_AWAKE_WORKERS           (7<<16)
+#define STARPU_SCHED_CTX_POLICY_INIT             (8<<16)
 
 unsigned starpu_sched_ctx_create(int *workerids_ctx, int nworkers_ctx, const char *sched_ctx_name, ...);
 
@@ -138,6 +139,8 @@ void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_
 
 int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id);
 
+void (*starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id))(void);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif /* STARPU_USE_SC_HYPERVISOR */
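
STARPU_SCHED_CTX_POLICY_INIT is consumed in the vararg loop of starpu_sched_ctx_create() (see the src/core/sched_ctx.c hunks below). A sketch of per-context use; the worker IDs and the callback name are illustrative, and STARPU_SCHED_CTX_POLICY_NAME is the usual flag for selecting a policy by name:

	void my_heteroprio_init(void); /* calls the starpu_heteroprio_set_* functions */

	int workerids[2] = {0, 1};
	unsigned ctx = starpu_sched_ctx_create(workerids, 2, "heteroprio_ctx",
					       STARPU_SCHED_CTX_POLICY_NAME, "heteroprio",
					       STARPU_SCHED_CTX_POLICY_INIT, &my_heteroprio_init,
					       0);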

+ 1 - 0
src/Makefile.am

@@ -192,6 +192,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/fifo_queues.c				\
 	sched_policies/parallel_heft.c				\
 	sched_policies/parallel_eager.c				\
+	sched_policies/heteroprio.c				\
 	drivers/driver_common/driver_common.c			\
 	drivers/disk/driver_disk.c				\
 	datawizard/memory_nodes.c				\

+ 17 - 4
src/core/sched_ctx.c

@@ -455,7 +455,9 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 						   int nworkers_ctx, unsigned is_initial_sched,
 						   const char *sched_ctx_name,
 						   int min_prio_set, int min_prio,
-						   int max_prio_set, int max_prio, unsigned awake_workers)
+						   int max_prio_set, int max_prio,
+						   unsigned awake_workers,
+						   void (*sched_policy_init)(void))
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 
@@ -499,7 +501,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	sched_ctx->main_master = -1;
 	sched_ctx->perf_arch.devices = NULL;
 	sched_ctx->perf_arch.ndevices = 0;
-
+	sched_ctx->init_sched = sched_policy_init;
 	int w;
 	for(w = 0; w < nworkers; w++)
 	{
@@ -693,7 +695,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	for(i = 0; i < nw; i++)
 		printf("%d ", workers[i]);
 	printf("\n");
-	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1);
+	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL);
 	sched_ctx->min_ncpus = min_ncpus;
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
@@ -723,6 +725,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	unsigned hierarchy_level = 0;
 	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 	unsigned awake_workers = 0;
+	void (*init_sched)(void) = NULL;
 
 	va_start(varg_list, sched_ctx_name);
 	while ((arg_type = va_arg(varg_list, int)) != 0)
@@ -759,6 +762,10 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 		{
 			awake_workers = 1;
 		}
+		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
+		{
+			init_sched = va_arg(varg_list, void(*)(void));
+		}
 		else
 		{
 			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
@@ -768,7 +775,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	va_end(varg_list);
 
 	struct _starpu_sched_ctx *sched_ctx = NULL;
-	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers);
+	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched);
 	sched_ctx->hierarchy_level = hierarchy_level;
 	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
 
@@ -2315,3 +2322,9 @@ int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id)
 
 	return -1;
 }
+
+void (*starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id))(void)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->init_sched;
+}
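
The declarator of the new getter reads as "function taking an unsigned and returning a pointer to a function returning void"; an equivalent spelling with a typedef, for readability (the typedef name is ours, not part of the patch):

	typedef void (*starpu_sched_policy_init_func)(void);

	starpu_sched_policy_init_func starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id);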

+ 4 - 1
src/core/sched_ctx.h

@@ -165,6 +165,9 @@ struct _starpu_sched_ctx
 	   the threads to sleep in order to replace them with other threads or leave
 	   them awake & use them in the parallel code*/
 	unsigned awake_workers;
+
+	/* function called when initializing the scheduler */
+	void (*init_sched)(void);
 };
 
 struct _starpu_machine_config;
@@ -175,7 +178,7 @@ void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config);
 /* allocate all structures belonging to a context */
 struct _starpu_sched_ctx*  _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name,
 						    int min_prio_set, int min_prio,
-						    int max_prio_set, int max_prio, unsigned awake_workers);
+						    int max_prio_set, int max_prio, unsigned awake_workers, void (*sched_policy_init)(void));
 
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();

+ 1 - 0
src/core/sched_policy.c

@@ -58,6 +58,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_peager_policy,
 	&_starpu_sched_peager_policy,
+	&_starpu_sched_heteroprio_policy,
 	NULL
 	NULL
 };
 };
 
 

+ 1 - 0
src/core/sched_policy.h

@@ -71,6 +71,7 @@ extern struct starpu_sched_policy _starpu_sched_dmda_sorted_decision_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_peager_policy;
+extern struct starpu_sched_policy _starpu_sched_heteroprio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_eager_prefetching_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_prio_policy;

+ 1 - 1
src/core/workers.c

@@ -1258,7 +1258,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	if (!is_a_sink)
 	{
 		struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(&config, config.conf->sched_policy_name);
-		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", (config.conf->global_sched_ctx_min_priority != -1), config.conf->global_sched_ctx_min_priority, (config.conf->global_sched_ctx_min_priority != -1), config.conf->global_sched_ctx_max_priority, 1, config.conf->sched_policy_init);
+		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", (config.conf->global_sched_ctx_min_priority != -1), config.conf->global_sched_ctx_min_priority, (config.conf->global_sched_ctx_min_priority != -1), config.conf->global_sched_ctx_max_priority, 1, config.conf->sched_policy_init);
 	}
 
 	_starpu_initialize_registered_performance_models();

+ 618 - 0
src/sched_policies/heteroprio.c

@@ -0,0 +1,618 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* Distributed queues using performance modeling to assign tasks */
+
+#include <starpu_config.h>
+#include <starpu_scheduler.h>
+#include <schedulers/heteroprio.h>
+
+#include <common/fxt.h>
+#include <core/task.h>
+
+#include <sched_policies/fifo_queues.h>
+#include <limits.h>
+
+#ifdef HAVE_AYUDAME_H
+#include <Ayudame.h>
+#endif
+
+#ifndef DBL_MIN
+#define DBL_MIN __DBL_MIN__
+#endif
+
+#ifndef DBL_MAX
+#define DBL_MAX __DBL_MAX__
+#endif
+
+/* A bucket corresponds to a pair of priorities.
+ * When a task is pushed with a priority X, it will be stored
+ * into the bucket X.
+ * All the tasks stored in the fifo should be computable by the archs
+ * in valide_archs.
+ * For example if valide_archs = (STARPU_CPU|STARPU_CUDA),
+ * then task->cl->where should be at least (STARPU_CPU|STARPU_CUDA).
+ */
+struct _heteroprio_bucket{
+	/* The tasks of the current bucket */
+	struct _starpu_fifo_taskq* tasks_queue;
+	/* The archs allowed for the current bucket */
+	unsigned valide_archs;
+	/* The slow factors for any archs */
+	float slow_factors_per_index[FSTARPU_NB_TYPES];
+	/* The base arch for the slow factor (the fastest arch for the current tasks in the bucket) */
+	unsigned factor_base_arch_index;
+};
+
+/* Init a bucket */
+static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket){
+	memset(bucket, 0, sizeof(*bucket));
+	bucket->tasks_queue =  _starpu_create_fifo();
+}
+
+/* Release a bucket */
+static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket){
+	STARPU_ASSERT(_starpu_fifo_empty(bucket->tasks_queue) != 0);
+	_starpu_destroy_fifo(bucket->tasks_queue);
+}
+
+
+/* A worker is mainly composed of a fifo for the tasks
+ * and some direct access to worker properties.
+ * The fifo is implemented with an array:
+ * to read a task, access tasks_queue[tasks_queue_index];
+ * to write a task, access tasks_queue[(tasks_queue_index+tasks_queue_size)%HETEROPRIO_MAX_PREFETCH]
+ */
+/* ANDRA_MODIF: can use starpu fifo + starpu sched_mutex*/
+struct _heteroprio_worker_wrapper{
+	unsigned arch_type;
+	unsigned arch_index;
+	struct _starpu_fifo_taskq *tasks_queue;
+};
+
+struct _starpu_heteroprio_data
+{
+	starpu_pthread_mutex_t policy_mutex;
+	struct starpu_bitmap *waiters;
+	/* The bucket to store the tasks */
+	struct _heteroprio_bucket buckets[HETEROPRIO_MAX_PRIO];
+	/* The number of buckets for each arch */
+	unsigned nb_prio_per_arch_index[FSTARPU_NB_TYPES];
+	/* The mapping to the corresponding buckets */
+	unsigned prio_mapping_per_arch_index[FSTARPU_NB_TYPES][HETEROPRIO_MAX_PRIO];
+	/* The number of available tasks for a given arch (not prefetched) */
+	unsigned nb_remaining_tasks_per_arch_index[FSTARPU_NB_TYPES];
+	/* The total number of tasks in the bucket (not prefetched) */
+	unsigned total_tasks_in_buckets;
+	/* The total number of prefetched tasks for a given arch */
+	unsigned nb_prefetched_tasks_per_arch_index[FSTARPU_NB_TYPES];
+	/* The information for all the workers */
+	struct _heteroprio_worker_wrapper workers_heteroprio[STARPU_NMAXWORKERS];
+	/* The number of workers for a given arch */
+	unsigned nb_workers_per_arch_index[FSTARPU_NB_TYPES];
+};
+
+/** Tell how many priorities there are for a given arch */
+void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned max_prio)
+{
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	STARPU_ASSERT(max_prio < HETEROPRIO_MAX_PRIO);
+
+	hp->nb_prio_per_arch_index[arch] = max_prio;
+}
+
+/** Set the mapping for a given arch prio=>bucket */
+void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned source_prio, unsigned dest_bucket_id)
+{
+	STARPU_ASSERT(dest_bucket_id < HETEROPRIO_MAX_PRIO);
+
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	hp->prio_mapping_per_arch_index[arch][source_prio] = dest_bucket_id;
+
+	hp->buckets[dest_bucket_id].valide_archs |= FStarPUTypesToArch[arch];
+}
+
+/** Tell which arch is the fastest for the tasks of a bucket (optional) */
+void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id)
+{
+	STARPU_ASSERT(bucket_id < HETEROPRIO_MAX_PRIO);
+
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	hp->buckets[bucket_id].factor_base_arch_index = arch;
+
+	hp->buckets[bucket_id].slow_factors_per_index[arch] = 0;
+}
+
+/** Tell how slow an arch is for the tasks of a bucket (optional) */
+void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id, float slow_factor)
+{
+	STARPU_ASSERT(bucket_id < HETEROPRIO_MAX_PRIO);
+
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	hp->buckets[bucket_id].slow_factors_per_index[arch] = slow_factor;
+}
+
+static void initialize_heteroprio_policy(unsigned sched_ctx_id)
+{
+#ifdef STARPU_HAVE_HWLOC
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_TREE);
+#else
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+#endif
+	/* Alloc the scheduler data  */
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)malloc(sizeof(struct _starpu_heteroprio_data));
+	memset(hp, 0, sizeof(*hp));
+
+	hp->waiters = starpu_bitmap_create();
+
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)hp);
+
+	STARPU_PTHREAD_MUTEX_INIT(&hp->policy_mutex, NULL);
+
+	unsigned idx_prio;
+	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+		_heteroprio_bucket_init(&hp->buckets[idx_prio]);
+
+	/* Call the user-provided configuration callback, if any */
+	void (*init_sched)(void) = starpu_sched_ctx_get_sched_policy_init(sched_ctx_id);
+
+	if(init_sched)
+		init_sched();
+
+	/* Ensure that the information has been correctly filled */
+	unsigned check_all_archs[HETEROPRIO_MAX_PRIO];
+	memset(check_all_archs, 0, sizeof(unsigned)*HETEROPRIO_MAX_PRIO);
+	unsigned arch_index;
+	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+	{
+		STARPU_ASSERT(hp->nb_prio_per_arch_index[arch_index] <= HETEROPRIO_MAX_PRIO);
+		
+		unsigned check_archs[HETEROPRIO_MAX_PRIO];
+		memset(check_archs, 0, sizeof(unsigned)*HETEROPRIO_MAX_PRIO);
+		
+		for(idx_prio = 0; idx_prio < hp->nb_prio_per_arch_index[arch_index]; ++idx_prio)
+		{
+			const unsigned mapped_prio = hp->prio_mapping_per_arch_index[arch_index][idx_prio];
+			STARPU_ASSERT(mapped_prio < HETEROPRIO_MAX_PRIO);
+			STARPU_ASSERT(hp->buckets[mapped_prio].slow_factors_per_index[arch_index] >= 0.0);
+			STARPU_ASSERT(hp->buckets[mapped_prio].valide_archs & FStarPUTypesToArch[arch_index]);
+			check_archs[mapped_prio]      = 1;
+			check_all_archs[mapped_prio] += 1;
+		}
+		for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+		{
+			/* Ensure the current arch uses a bucket or someone else can use it */
+			STARPU_ASSERT(check_archs[idx_prio] == 1 || hp->buckets[idx_prio].valide_archs == 0
+				      || (hp->buckets[idx_prio].valide_archs & ~FStarPUTypesToArch[arch_index]) != 0);
+		}
+	}
+	/* Ensure that if valide_archs = (STARPU_CPU|STARPU_CUDA), then check_all_archs[] == 2, for example */
+
+	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+	{
+		unsigned nb_arch_on_bucket = 0;
+		for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+		{
+			if(hp->buckets[idx_prio].valide_archs & FStarPUTypesToArch[arch_index])
+			{
+				nb_arch_on_bucket += 1;
+			}
+		}
+		STARPU_ASSERT(check_all_archs[idx_prio] == nb_arch_on_bucket);
+	}
+}
+
+static void deinitialize_heteroprio_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	/* Ensure there are no more tasks */
+	STARPU_ASSERT(hp->total_tasks_in_buckets == 0);
+	unsigned arch_index;
+	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index){
+		STARPU_ASSERT(hp->nb_remaining_tasks_per_arch_index[arch_index] == 0);
+		STARPU_ASSERT(hp->nb_prefetched_tasks_per_arch_index[arch_index] == 0);
+	}
+
+	unsigned idx_prio;
+	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio){
+		_heteroprio_bucket_release(&hp->buckets[idx_prio]);
+	}
+
+	starpu_bitmap_destroy(hp->waiters);
+
+	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_DESTROY(&hp->policy_mutex);
+	free(hp);
+}
+
+static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		memset(&hp->workers_heteroprio[workerid], 0, sizeof(hp->workers_heteroprio[workerid]));
+		/* if the worker has alreadry belonged to this context
+		   the queue and the synchronization variables have been already initialized */
+		if(hp->workers_heteroprio[workerid].tasks_queue == NULL)
+		{
+			hp->workers_heteroprio[workerid].tasks_queue = _starpu_create_fifo();
+			switch(starpu_worker_get_type(workerid))
+			{
+#ifdef STARPU_USE_CPU
+			case STARPU_CPU_WORKER:
+				hp->workers_heteroprio[workerid].arch_type = STARPU_CPU;
+				hp->workers_heteroprio[workerid].arch_index = FSTARPU_CPU_IDX;
+				break;
+#endif
+#ifdef STARPU_USE_CUDA
+			case STARPU_CUDA_WORKER:
+				hp->workers_heteroprio[workerid].arch_type = STARPU_CUDA;
+				hp->workers_heteroprio[workerid].arch_index = FSTARPU_CUDA_IDX;
+				break;
+#endif
+#ifdef STARPU_USE_OPENCL
+			case STARPU_OPENCL_WORKER:
+				hp->workers_heteroprio[workerid].arch_type = STARPU_OPENCL;
+				hp->workers_heteroprio[workerid].arch_index = FSTARPU_OPENCL_IDX;
+				break;
+#endif
+			default:
+				STARPU_ASSERT(0);
+			}
+		}
+		hp->nb_workers_per_arch_index[hp->workers_heteroprio[workerid].arch_index]++;
+
+	}
+}
+
+static void remove_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		if(hp->workers_heteroprio[workerid].tasks_queue != NULL)
+		{
+			_starpu_destroy_fifo(hp->workers_heteroprio[workerid].tasks_queue);
+			hp->workers_heteroprio[workerid].tasks_queue = NULL;
+		}
+	}
+}
+
+/* Push a new task (simply store it and update counters) */
+static int push_task_heteroprio_policy(struct starpu_task *task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	/* Only one worker at a time uses heteroprio */
+	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);
+
+	/* Retrieve the correct bucket */
+	STARPU_ASSERT(task->priority < HETEROPRIO_MAX_PRIO);
+	struct _heteroprio_bucket* bucket = &hp->buckets[task->priority];
+	/* Ensure that any worker that checks that list can compute the task */
+	STARPU_ASSERT(bucket->valide_archs
+	       && ((bucket->valide_archs ^ task->cl->where) & bucket->valide_archs) == 0);
+	/* save the task */
+	_starpu_fifo_push_back_task(bucket->tasks_queue,task);
+
+	/* Inc counters */
+	unsigned arch_index;
+	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+	{
+		/* We test the archs of the bucket and not task->cl->where, since only the bucket's archs will look at this queue */
+		if(bucket->valide_archs & FStarPUTypesToArch[arch_index])
+			hp->nb_remaining_tasks_per_arch_index[arch_index] += 1;
+	}
+
+	hp->total_tasks_in_buckets += 1;
+
+	starpu_push_task_end(task);
+
+	/* if there are no tasks, workers block; wake the ones waiting for a task */
+	unsigned worker = 0;
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+	struct starpu_sched_ctx_iterator it;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	char dowake[STARPU_NMAXWORKERS] = { 0 };
+#endif
+
+	workers->init_iterator(workers, &it);
+	while(workers->has_next_master(workers, &it))
+	{
+		worker = workers->get_next_master(workers, &it);
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		if (!starpu_bitmap_get(hp->waiters, worker))
+			/* This worker is not waiting for a task */
+			continue;
+#endif
+
+		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
+		{
+			/* It can execute this one, tell him! */
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+			starpu_bitmap_unset(hp->waiters, worker);
+			/* We really woke at least somebody, no need to wake somebody else */
+			break;
+#else
+			dowake[worker] = 1;
+#endif
+		}
+	}
+	/* Let the task free */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
+
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	/* Now that we have a list of potential workers, try to wake one */
+
+	workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		if (dowake[worker])
+			if (starpu_wake_worker(worker))
+				break; // wake up a single worker
+	}
+#endif
+
+	return 0;
+}
+
+static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
+{
+	const unsigned workerid = starpu_worker_get_id();
+	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _heteroprio_worker_wrapper* worker = &hp->workers_heteroprio[workerid];
+
+	/* If there is no task available for this worker's arch, nothing in its local queue and nothing prefetched for this arch, just return NULL */
+	if ((hp->total_tasks_in_buckets == 0 || hp->nb_remaining_tasks_per_arch_index[worker->arch_index] == 0)
+            && worker->tasks_queue->ntasks == 0 && hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] == 0){
+		return NULL;
+	}
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+	if (starpu_bitmap_get(hp->waiters, workerid)){
+		/* Nobody woke us, avoid bothering the mutex */
+		return NULL;
+	}
+#endif
+/* TODO beranger: check this out */
+	starpu_pthread_mutex_t *worker_sched_mutex;
+	starpu_pthread_cond_t *worker_sched_cond;
+	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
+
+	/* Note: releasing this mutex before taking the victim mutex, to avoid interlock */
+	STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);
+
+	/* keep track of the newly added tasks to perform real prefetch on the node */
+	unsigned nb_added_tasks = 0;
+
+	/* Check that some tasks are available for the current worker arch */
+	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 ){
+		/* Ideally we would like to fill the prefetch array */
+		unsigned nb_tasks_to_prefetch = (HETEROPRIO_MAX_PREFETCH-worker->tasks_queue->ntasks);
+		/* But there are maybe less tasks than that! */
+		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index]){
+			nb_tasks_to_prefetch = hp->nb_remaining_tasks_per_arch_index[worker->arch_index];
+		}
+		/* But if there are fewer tasks than workers, take at most one */
+		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id)){
+			if(worker->tasks_queue->ntasks == 0) 
+				nb_tasks_to_prefetch = 1;
+			else 
+				nb_tasks_to_prefetch = 0;
+		}
+
+		nb_added_tasks = nb_tasks_to_prefetch;
+
+		unsigned idx_prio, arch_index;
+		/* We iterate until we find all the tasks we need */
+		for(idx_prio = 0; nb_tasks_to_prefetch && idx_prio < hp->nb_prio_per_arch_index[worker->arch_index]; ++idx_prio)
+		{
+			/* Retrieve the bucket using the mapping */
+			struct _heteroprio_bucket* bucket = &hp->buckets[hp->prio_mapping_per_arch_index[worker->arch_index][idx_prio]];
+			/* Ensure we can compute task from this bucket */
+			STARPU_ASSERT(bucket->valide_archs & worker->arch_type);
+			/* Take nb_tasks_to_prefetch tasks if possible */
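+			/* A worker only drains this bucket when no faster arch was declared
+			 * for it, when the worker belongs to the faster arch itself, or when
+			 * the bucket holds at least slow_factor tasks per faster-arch worker,
+			 * i.e. when the faster arch alone could not absorb the backlog */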
+			while(!_starpu_fifo_empty(bucket->tasks_queue) && nb_tasks_to_prefetch && 
+			      (bucket->factor_base_arch_index == 0 || 
+			       worker->arch_index == bucket->factor_base_arch_index || 
+			       (((float)bucket->tasks_queue->ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
+			{
+				struct starpu_task* task = _starpu_fifo_pop_local_task(bucket->tasks_queue);
+				STARPU_ASSERT(starpu_worker_can_execute_task(workerid, task, 0));
+				/* Save the task */
+				_starpu_fifo_push_task(worker->tasks_queue, task);
+
+				/* Update general counter */
+				hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] += 1;
+				hp->total_tasks_in_buckets -= 1;
+				
+				for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index){
+					/* We test the archs of the bucket and not task->cl->where, since only the bucket's archs will look at this queue */
+					if(bucket->valide_archs & FStarPUTypesToArch[arch_index]){
+						hp->nb_remaining_tasks_per_arch_index[arch_index] -= 1;
+					}
+				}
+				/* Decrease the number of tasks to find */
+				nb_tasks_to_prefetch -= 1;
+				// TODO starpu_prefetch_task_input_on_node(task, workerid);
+			}
+		}
+		STARPU_ASSERT_MSG(nb_tasks_to_prefetch == 0, "%d tasks were not prefetched by worker %d\n", nb_tasks_to_prefetch, workerid);
+	}
+
+	struct starpu_task* task = NULL;
+
+	/* The worker has some tasks in its queue */
+	if(worker->tasks_queue->ntasks){
+		task = _starpu_fifo_pop_task(worker->tasks_queue, workerid);
+		hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] -= 1;
+	}
+	/* Otherwise look if we can steal some work */
+	else if(hp->nb_prefetched_tasks_per_arch_index[worker->arch_index]){
+		/* If HETEROPRIO_MAX_PREFETCH==1 it should not be possible to steal work */
+		STARPU_ASSERT(HETEROPRIO_MAX_PREFETCH != 1);
+		
+		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+		struct starpu_sched_ctx_iterator it;
+
+		workers->init_iterator(workers, &it);
+		unsigned victim = workerid;
+		unsigned current_worker;
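+		/* advance the iterator up to our own position, so that stealing starts from our neighbours */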
+		while(workers->has_next_master(workers, &it))
+		{
+			current_worker = workers->get_next_master(workers, &it);
+			if(current_worker == victim)
+				break;
+		}
+		
+		/* circular loop: keep scanning the other workers until a task is stolen */
+		while(!task)
+		{
+			while(workers->has_next_master(workers, &it))
+			{
+				victim = workers->get_next_master(workers, &it);
+				if(victim == workerid)
+					continue;
+		
+				/* If it is the same arch and there is a task to steal */
+				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
+				   && hp->workers_heteroprio[victim].tasks_queue->ntasks){
+					starpu_pthread_mutex_t *victim_sched_mutex;
+					starpu_pthread_cond_t *victim_sched_cond;
+					starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
+
+					/* ensure the worker is not currently prefetching its data */
+					STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);
+
+					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
+					   && hp->workers_heteroprio[victim].tasks_queue->ntasks){
+						/* steal the last added task */
+						task = starpu_task_list_pop_back(&hp->workers_heteroprio[victim].tasks_queue->taskq);
+						/* we steal a task update global counter */
+						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
+						
+						STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
+						break;
+					}
+					STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
+				}
+			}
+			/* a full pass without stealing anything: rewind the iterator and rescan */
+			workers->init_iterator(workers, &it);
+		}
+	}
+
+	if (!task){
+		/* Tell pushers that we are waiting for tasks */
+		starpu_bitmap_set(hp->waiters, workerid);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
+
+	if(task){
+		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
+		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS){
+			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx);
+			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			return NULL;
+		}
+	}
+
+	/* if we got a task (task), we may have some left in the queue (worker->tasks_queue->ntasks) that were freshly added (nb_added_tasks) */
+	if(task && worker->tasks_queue->ntasks && nb_added_tasks && starpu_get_prefetch_flag()){
+		const unsigned memory_node = starpu_worker_get_memory_node(workerid);
+
+		/* prefetch the new tasks that I own, but protect my node from work stealing during the prefetch */
+//		STARPU_PTHREAD_MUTEX_LOCK(&worker->ws_prefetch_mutex);
+		/* already protected - the lock of the worker is taken */
+
+		/* prefetch tasks but stop in case we know someone may steal a task from us */
+		/* while(nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0){ */
+		/* 	/\* prefetch from closest to end task *\/ */
+		/* 	starpu_prefetch_task_input_on_node(worker->tasks_queue[(worker->tasks_queue_index+worker->tasks_queue_size-nb_added_tasks)%HETEROPRIO_MAX_PREFETCH], memory_node); */
+		/* 	nb_added_tasks -= 1; */
+		/* } */
+
+/* TODO beranger: check this out - is this how you planned to prefetch tasks? */
+		struct starpu_task *task_to_prefetch = NULL;
+		for (task_to_prefetch  = starpu_task_list_begin(&worker->tasks_queue->taskq);
+		     (task_to_prefetch != starpu_task_list_end(&worker->tasks_queue->taskq) && 
+		      nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0);
+		     task_to_prefetch  = starpu_task_list_next(task_to_prefetch))
+		{
+			/* prefetch from closest to end task */
+			starpu_prefetch_task_input_on_node(task_to_prefetch, memory_node);
+			nb_added_tasks -= 1;
+		}
+
+
+//		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->ws_prefetch_mutex);
+	}
+
+	return task;
+}
+
+struct starpu_sched_policy _starpu_sched_heteroprio_policy =
+{
+	.init_sched = initialize_heteroprio_policy,
+	.deinit_sched = deinitialize_heteroprio_policy,
+	.add_workers = add_workers_heteroprio_policy,
+	.remove_workers = remove_workers_heteroprio_policy,
+	.push_task = push_task_heteroprio_policy,
+	.simulate_push_task = NULL,
+	.push_task_notify = NULL,
+	.pop_task = pop_task_heteroprio_policy,
+	.pre_exec_hook = NULL,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,
+	.policy_name = "heteroprio",
+	.policy_description = "heteroprio"
+};