New modular-heteroprio scheduler

Samuel Thibault, 5 years ago
parent commit 8cd00d737f

+ 1 - 0
ChangeLog

@@ -43,6 +43,7 @@ New features:
   * starpufft: Support 3D.
   * New modular-eager-prio scheduler.
   * Add 'ready' heuristic to modular schedulers.
+  * New modular-heteroprio scheduler.
 
 Changes:
   * Modification in the Native Fortran interface of the functions

+ 4 - 0
doc/doxygen/chapters/320_scheduling.doxy

@@ -238,6 +238,10 @@ modular-heft-prio is similar to \c modular-heft, but only decides the memory
 node, not the exact worker, just pushing tasks to one central queue per memory
 node.
 
+- Heteroprio Scheduler: \n
+Maps tasks to workers similarly to HEFT, but first assigns strongly-accelerated tasks
+to GPUs, then less-accelerated tasks to CPUs.
+
 To use one of these schedulers, one can set the environment variable \ref STARPU_SCHED.
 
 \section StaticScheduling Static Scheduling
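
For instance, to try the new scheduler on an existing application, it can also be selected by name at initialization time. A minimal sketch using the public starpu_conf API (the application body is elided and hypothetical):

#include <starpu.h>

int main(void)
{
	struct starpu_conf conf;
	starpu_conf_init(&conf);

	/* Select the scheduler by name; equivalent to running the
	 * application with STARPU_SCHED=modular-heteroprio set. */
	conf.sched_policy_name = "modular-heteroprio";

	if (starpu_init(&conf) != 0)
		return 1;

	/* ... declare data and submit tasks as usual ... */

	starpu_shutdown();
	return 0;
}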

+ 1 - 1
examples/cholesky/cholesky.sh

@@ -17,7 +17,7 @@
 
 ROOT=${0%.sh}
 #[ -n "$STARPU_SCHEDS" ] || STARPU_SCHEDS=`$(dirname $0)/../../tools/starpu_sched_display`
-[ -n "$STARPU_SCHEDS" ] || STARPU_SCHEDS="dmdas modular-heft modular-heft-prio dmdap dmdar dmda dmdasd prio lws"
+[ -n "$STARPU_SCHEDS" ] || STARPU_SCHEDS="dmdas modular-heft2 modular-heft modular-heft-prio modular-heteroprio dmdap dmdar dmda dmdasd prio lws"
 [ -n "$STARPU_HOSTNAME" ] || export STARPU_HOSTNAME=mirage
 unset MALLOC_PERTURB_

+ 10 - 0
include/starpu_sched_component.h

@@ -553,6 +553,16 @@ int starpu_sched_component_is_heft(struct starpu_sched_component *component);
 /** @} */
 
 /**
+   @name Resource-mapping Heteroprio Component API
+   @{
+*/
+
+struct starpu_sched_component * starpu_sched_component_heteroprio_create(struct starpu_sched_tree *tree, struct starpu_sched_component_mct_data * params) STARPU_ATTRIBUTE_MALLOC;
+int starpu_sched_component_is_heteroprio(struct starpu_sched_component *component);
+
+/** @} */
+
+/**
   @name Special-purpose Best_Implementation Component API
   @{
 */
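
As a hint of intended usage, a custom modular policy can hand the creation function above to the simple-scheduler helper, which builds the surrounding tree of queues. A sketch mirroring the modular_heteroprio.c policy added below (the function name is hypothetical and the flag set is a trimmed-down variant of the one used there):

#include <starpu_sched_component.h>

/* Hypothetical init function for a scheduling context: lets the
 * simple-scheduler helper wire a FIFO above and per-worker FIFOs
 * below a heteroprio decision component. */
static void init_my_heteroprio(unsigned sched_ctx_id)
{
	starpu_sched_component_initialize_simple_scheduler(
			(starpu_sched_component_create_t) starpu_sched_component_heteroprio_create,
			NULL, /* default MCT parameters */
			STARPU_SCHED_SIMPLE_DECIDE_WORKERS |
			STARPU_SCHED_SIMPLE_PERFMODEL |
			STARPU_SCHED_SIMPLE_FIFO_ABOVE |
			STARPU_SCHED_SIMPLE_FIFOS_BELOW,
			sched_ctx_id);
}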

+ 2 - 0
src/Makefile.am

@@ -287,6 +287,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/component_eager_calibration.c				\
 	sched_policies/component_mct.c				\
 	sched_policies/component_heft.c				\
+	sched_policies/component_heteroprio.c				\
 	sched_policies/component_best_implementation.c		\
 	sched_policies/component_perfmodel_select.c				\
 	sched_policies/component_composed.c				\
@@ -303,6 +304,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/modular_parallel_heft.c			\
 	sched_policies/modular_heft.c				\
 	sched_policies/modular_heft_prio.c			\
+	sched_policies/modular_heteroprio.c			\
 	sched_policies/modular_heft2.c				\
 	sched_policies/modular_ws.c				\
 	sched_policies/modular_ez.c

+ 1 - 0
src/core/sched_policy.c

@@ -70,6 +70,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_modular_heft_policy,
 	&_starpu_sched_modular_heft_prio_policy,
 	&_starpu_sched_modular_heft2_policy,
+	&_starpu_sched_modular_heteroprio_policy,
 	&_starpu_sched_modular_parallel_heft_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,

+ 1 - 0
src/core/sched_policy.h

@@ -107,6 +107,7 @@ extern struct starpu_sched_policy _starpu_sched_modular_ws_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_heft_prio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_heft2_policy;
+extern struct starpu_sched_policy _starpu_sched_modular_heteroprio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_graph_test_policy;
 extern struct starpu_sched_policy _starpu_sched_tree_heft_hierarchical_policy;

+ 478 - 0
src/sched_policies/component_heteroprio.c

@@ -0,0 +1,478 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013,2017,2018                           Inria
+ * Copyright (C) 2014-2018                                CNRS
+ * Copyright (C) 2013-2019                                Université de Bordeaux
+ * Copyright (C) 2013                                     Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* Heteroprio, which sorts tasks by acceleration factor into buckets, and makes
+ * GPUs take accelerated tasks first and CPUs take non-accelerated tasks first */
+
+#include <starpu_sched_component.h>
+#include "prio_deque.h"
+#include <starpu_perfmodel.h>
+#include "helper_mct.h"
+#include <float.h>
+#include <core/sched_policy.h>
+#include <core/task.h>
+
+/* Approximation ratio for acceleration factor bucketing
+ * We will put tasks with +-10% similar acceleration into the same bucket. */
+#define APPROX 0.10
+
+struct _starpu_heteroprio_data
+{
+	/* This is an array of priority queues.
+	 * The array is sorted by acceleration factor, most accelerated first */
+	struct _starpu_prio_deque **bucket;
+	float *accel;
+	unsigned naccel;
+
+	/* This contains tasks which are not supported on all archs. */
+	struct _starpu_prio_deque no_accel;
+
+	/* This protects all queues */
+	starpu_pthread_mutex_t mutex;
+
+	struct _starpu_mct_data *mct_data;
+};
+
+static int heteroprio_push(struct starpu_sched_component *component, struct _starpu_heteroprio_data *data, enum starpu_worker_archtype archtype, int front)
+{
+	struct starpu_task *task = NULL;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	int j, ret;
+	double acceleration = INFINITY;
+
+	struct _starpu_mct_data * d = data->mct_data;
+
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	if (front)
+		/* Pick up accelerated tasks first */
+		for (j = 0; j < (int) data->naccel; j++)
+		{
+			task = _starpu_prio_deque_pop_task(data->bucket[j]);
+			if (task)
+				break;
+		}
+	else
+		/* Pick up accelerated tasks last */
+		for (j = (int) data->naccel-1; j >= 0; j--)
+		{
+			task = _starpu_prio_deque_pop_task(data->bucket[j]);
+			if (task)
+				break;
+		}
+
+	if (task)
+	{
+		acceleration = data->accel[j];
+		//fprintf(stderr, "for %s thus %s, found task %p in bucket %d: %f\n", starpu_worker_get_type_as_string(archtype), front?"front":"back", task, j, acceleration);
+	}
+
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+
+	if (!task)
+		return 1;
+
+	/* TODO: we might want to preferentially pick a task whose data is already on some GPU */
+
+	struct starpu_sched_component * best_component;
+
+	/* Estimated task duration for each child */
+	double estimated_lengths[component->nchildren];
+	/* Estimated transfer duration for each child */
+	double estimated_transfer_length[component->nchildren];
+	/* Estimated transfer+task termination for each child */
+	double estimated_ends_with_task[component->nchildren];
+
+	/* Minimum transfer+task termination on all children */
+	double min_exp_end_with_task;
+	/* Maximum transfer+task termination on all children */
+	double max_exp_end_with_task;
+
+	unsigned suitable_components[component->nchildren];
+	unsigned nsuitable_components;
+
+	nsuitable_components = starpu_mct_compute_execution_times(component, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			suitable_components);
+
+	/* Entering critical section to make sure no two workers
+	   make scheduling decisions at the same time */
+	STARPU_COMPONENT_MUTEX_LOCK(&d->scheduling_mutex);
+
+	starpu_mct_compute_expected_times(component, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			estimated_ends_with_task,
+			&min_exp_end_with_task, &max_exp_end_with_task,
+			suitable_components, nsuitable_components);
+
+	/* And now find out which worker suits best for this task,
+	 * including data transfer */
+	int best_icomponent = starpu_mct_get_best_component(d, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			estimated_ends_with_task,
+			min_exp_end_with_task, max_exp_end_with_task,
+			suitable_components, nsuitable_components);
+
+	best_component = component->children[best_icomponent];
+
+	int idworker;
+	for(idworker = starpu_bitmap_first(best_component->workers);
+		idworker != -1;
+		idworker = starpu_bitmap_next(best_component->workers, idworker))
+	{
+		if (starpu_worker_get_type(idworker) == archtype)
+			break;
+	}
+
+	if (idworker == -1)
+	{
+		STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
+	}
+	else
+	{
+		/* Ok, we do have a worker there of that type, try to push it there. */
+		STARPU_ASSERT(!starpu_sched_component_is_worker(best_component));
+		starpu_sched_task_break(task);
+		ret = starpu_sched_component_push_task(component,best_component,task);
+
+		/* I can now exit the critical section: Pushing the task above ensures that its execution
+		   time will be taken into account for subsequent scheduling decisions */
+		STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
+		if (!ret)
+		{
+			//fprintf(stderr, "pushed %p to %d\n", task, best_icomponent);
+			/* Great! */
+			return 0;
+		}
+	}
+
+	/* No such kind of worker there, or it refused our task, abort */
+
+	//fprintf(stderr, "could not push %p to %d actually\n", task, best_icomponent);
+	/* Could not push to child actually, push that one back */
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	for (j = 0; j < (int) data->naccel; j++) {
+		if (acceleration == data->accel[j])
+		{
+			_starpu_prio_deque_push_front_task(data->bucket[j], task);
+			break;
+		}
+	}
+	STARPU_ASSERT(j != (int) data->naccel);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+
+	//fprintf(stderr, "finished pushing to %d\n", archtype);
+
+	return 1;
+}
+
+static int heteroprio_push_heft(struct starpu_sched_component *component, struct _starpu_heteroprio_data *data, struct starpu_task *task)
+{
+	struct _starpu_mct_data * d = data->mct_data;
+	int ret;
+
+	struct starpu_sched_component * best_component;
+
+	/* Estimated task duration for each child */
+	double estimated_lengths[component->nchildren];
+	/* Estimated transfer duration for each child */
+	double estimated_transfer_length[component->nchildren];
+	/* Estimated transfer+task termination for each child */
+	double estimated_ends_with_task[component->nchildren];
+
+	/* Minimum transfer+task termination on all children */
+	double min_exp_end_with_task;
+	/* Maximum transfer+task termination on all children */
+	double max_exp_end_with_task;
+
+	unsigned suitable_components[component->nchildren];
+	unsigned nsuitable_components;
+
+	nsuitable_components = starpu_mct_compute_execution_times(component, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			suitable_components);
+
+	/* If no suitable components were found, it means that the perfmodel of
+	 * the task had been purged since it has been pushed on the mct component.
+	 * We should send a push_fail message to its parent so that it will
+	 * be able to reschedule the task properly. */
+	if(nsuitable_components == 0)
+		return 1;
+
+	/* Entering critical section to make sure no two workers
+	   make scheduling decisions at the same time */
+	STARPU_COMPONENT_MUTEX_LOCK(&d->scheduling_mutex);
+
+	starpu_mct_compute_expected_times(component, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			estimated_ends_with_task,
+			&min_exp_end_with_task, &max_exp_end_with_task,
+			suitable_components, nsuitable_components);
+
+	/* And now find out which worker suits best for this task,
+	 * including data transfer */
+	int best_icomponent = starpu_mct_get_best_component(d, task,
+			estimated_lengths,
+			estimated_transfer_length,
+			estimated_ends_with_task,
+			min_exp_end_with_task, max_exp_end_with_task,
+			suitable_components, nsuitable_components);
+
+	/* If no best component is found, it means that the perfmodel of
+	 * the task had been purged since it has been pushed on the mct component.
+	 * We should send a push_fail message to its parent so that it will
+	 * be able to reschedule the task properly. */
+	if(best_icomponent == -1)
+	{
+		STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
+		return 1;
+	}
+
+	best_component = component->children[best_icomponent];
+
+	STARPU_ASSERT(!starpu_sched_component_is_worker(best_component));
+	ret = starpu_sched_component_push_task(component,best_component,task);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
+
+	return ret;
+}
+
+static int heteroprio_progress_one(struct starpu_sched_component *component)
+{
+	struct _starpu_heteroprio_data * data = component->data;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	struct starpu_task *task;
+
+	struct _starpu_prio_deque * no_accel = &data->no_accel;
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	task = _starpu_prio_deque_pop_task(no_accel);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+
+	if (task) {
+		if (heteroprio_push_heft(component, data, task))
+		{
+			/* Could not push to child actually, push that one back */
+			STARPU_COMPONENT_MUTEX_LOCK(mutex);
+			_starpu_prio_deque_push_front_task(no_accel, task);
+			STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+		}
+	}
+
+	/* Note: this hardcodes acceleration order */
+	if (!heteroprio_push(component, data, STARPU_CUDA_WORKER, 1))
+		return 0;
+	if (!heteroprio_push(component, data, STARPU_OPENCL_WORKER, 1))
+		return 0;
+	if (!heteroprio_push(component, data, STARPU_MIC_WORKER, 1))
+		return 0;
+	if (!heteroprio_push(component, data, STARPU_MPI_MS_WORKER, 0))
+		return 0;
+	if (!heteroprio_push(component, data, STARPU_CPU_WORKER, 0))
+		return 0;
+
+	return 1;
+}
+
+/* Try to push some tasks below */
+static void heteroprio_progress(struct starpu_sched_component *component)
+{
+	STARPU_ASSERT(component && starpu_sched_component_is_heteroprio(component));
+	while (!heteroprio_progress_one(component))
+		;
+}
+
+static int heteroprio_push_task(struct starpu_sched_component * component, struct starpu_task * task)
+{
+	STARPU_ASSERT(component && task && starpu_sched_component_is_heteroprio(component));
+	struct _starpu_heteroprio_data * data = component->data;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	unsigned nimpl;
+
+	double min_expected = INFINITY, max_expected = -INFINITY;
+	double acceleration;
+
+	/* Compute acceleration between best-performing arch and least-performing arch */
+	int workerid;
+	for(workerid = starpu_bitmap_first(component->workers_in_ctx);
+	    workerid != -1;
+	    workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
+	{
+		unsigned impl_mask;
+		if (!starpu_worker_can_execute_task_impl(workerid, task, &impl_mask))
+			break;
+
+		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, task->sched_ctx);
+		double min_arch = INFINITY;
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+		{
+			if (!(impl_mask & (1U << nimpl)))
+				continue;
+			double expected = starpu_task_expected_length(task, perf_arch, nimpl);
+			if (expected < min_arch)
+				min_arch = expected;
+		}
+		STARPU_ASSERT(min_arch != INFINITY);
+		if (min_arch < min_expected)
+			min_expected = min_arch;
+		if (min_arch > max_expected)
+			max_expected = min_arch;
+	}
+
+	if (workerid == -1) {
+		/* All archs can run it */
+		STARPU_ASSERT(min_expected != INFINITY);
+		STARPU_ASSERT(max_expected != -INFINITY);
+		acceleration = max_expected / min_expected;
+
+		//fprintf(stderr,"%s: acceleration %f\n", starpu_task_get_name(task), acceleration);
+
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
+		unsigned i, j;
+		/* Try to find a bucket with similar acceleration */
+		for (i = 0; i < data->naccel; i++) {
+			if (acceleration >= data->accel[i] * (1 - APPROX) &&
+			    acceleration <= data->accel[i] * (1 + APPROX))
+				break;
+		}
+
+		if (i == data->naccel) {
+			/* Didn't find it, add one */
+			data->naccel++;
+
+			float *newaccel = malloc(data->naccel * sizeof(*newaccel));
+			struct _starpu_prio_deque **newbuckets = malloc(data->naccel * sizeof(*newbuckets));
+			struct _starpu_prio_deque *newbucket = malloc(sizeof(*newbucket));
+			_starpu_prio_deque_init(newbucket);
+			int inserted = 0;
+
+			for (j = 0; j < data->naccel-1; j++) {
+				if (!inserted && acceleration > data->accel[j]) {
+					/* Insert the new bucket here */
+					i = j;
+					newbuckets[j] = newbucket;
+					newaccel[j] = acceleration;
+					inserted = 1;
+				}
+				newbuckets[j+inserted] = data->bucket[j];
+				newaccel[j+inserted] = data->accel[j];
+			}
+			if (!inserted) {
+				/* Insert it last */
+				newbuckets[data->naccel-1] = newbucket;
+				newaccel[data->naccel-1] = acceleration;
+			}
+			free(data->bucket);
+			free(data->accel);
+			data->bucket = newbuckets;
+			data->accel = newaccel;
+		}
+#if 0
+		fprintf(stderr,"buckets:");
+		for (j = 0; j < data->naccel; j++) {
+			fprintf(stderr, " %f", data->accel[j]);
+		}
+		fprintf(stderr,"\ninserting %p %f to %d\n", task, acceleration, i);
+#endif
+		_starpu_prio_deque_push_back_task(data->bucket[i],task);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+	} else {
+		/* Not all archs can run it, will resort to HEFT strategy */
+		acceleration = INFINITY;
+		//fprintf(stderr,"%s: some archs can't do it\n", starpu_task_get_name(task));
+		struct _starpu_prio_deque * no_accel = &data->no_accel;
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
+		_starpu_prio_deque_push_back_task(no_accel,task);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+	}
+
+	heteroprio_progress(component);
+
+	return 0;
+}
+
+static int heteroprio_can_push(struct starpu_sched_component *component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
+{
+	heteroprio_progress(component);
+	int ret = 0;
+	unsigned j;
+	for(j=0; j < component->nparents; j++)
+	{
+		if(component->parents[j] == NULL)
+			continue;
+		else
+		{
+			ret = component->parents[j]->can_push(component->parents[j], component);
+			if(ret)
+				break;
+		}
+	}
+	return ret;
+}
+
+static void heteroprio_component_deinit_data(struct starpu_sched_component * component)
+{
+	STARPU_ASSERT(starpu_sched_component_is_heteroprio(component));
+	struct _starpu_heteroprio_data * d = component->data;
+	struct _starpu_mct_data * mct_d = d->mct_data;
+	unsigned i;
+	for (i = 0; i < d->naccel; i++)
+	{
+		_starpu_prio_deque_destroy(d->bucket[i]);
+		free(d->bucket[i]);
+	}
+	free(d->bucket);
+	free(d->accel);
+	_starpu_prio_deque_destroy(&d->no_accel);
+	STARPU_PTHREAD_MUTEX_DESTROY(&d->mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&mct_d->scheduling_mutex);
+	free(mct_d);
+	free(d);
+}
+
+int starpu_sched_component_is_heteroprio(struct starpu_sched_component * component)
+{
+	return component->push_task == heteroprio_push_task;
+}
+
+struct starpu_sched_component * starpu_sched_component_heteroprio_create(struct starpu_sched_tree *tree, struct starpu_sched_component_mct_data * params)
+{
+	struct starpu_sched_component * component = starpu_sched_component_create(tree, "heteroprio");
+	struct _starpu_mct_data *mct_data = starpu_mct_init_parameters(params);
+	struct _starpu_heteroprio_data *data;
+	_STARPU_MALLOC(data, sizeof(*data));
+
+	data->bucket = NULL;
+	data->accel = NULL;
+	data->naccel = 0;
+	_starpu_prio_deque_init(&data->no_accel);
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	data->mct_data = mct_data;
+	STARPU_PTHREAD_MUTEX_INIT(&mct_data->scheduling_mutex,NULL);
+	component->data = data;
+
+	component->push_task = heteroprio_push_task;
+	component->can_push = heteroprio_can_push;
+	component->deinit_data = heteroprio_component_deinit_data;
+
+	return component;
+}
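
To make the +-APPROX bucketing above concrete, here is a standalone sketch of the find-or-insert test, using hypothetical acceleration values (buckets kept sorted most accelerated first, as in _starpu_heteroprio_data):

#include <stdio.h>

#define APPROX 0.10

/* Hypothetical bucket accelerations, sorted most accelerated first. */
static float accel[] = { 20.0f, 10.0f, 1.5f };
static const unsigned naccel = sizeof(accel) / sizeof(accel[0]);

/* Return the index of a bucket within +-10% of 'acceleration',
 * or naccel when a new bucket would have to be inserted. */
static unsigned find_bucket(double acceleration)
{
	unsigned i;
	for (i = 0; i < naccel; i++)
		if (acceleration >= accel[i] * (1 - APPROX) &&
		    acceleration <= accel[i] * (1 + APPROX))
			break;
	return i;
}

int main(void)
{
	/* A task taking 100ms on CPU and 9.5ms on GPU has acceleration
	 * factor 100/9.5 ~= 10.5, which lands in the 10.0 bucket. */
	printf("10.5 -> bucket %u\n", find_bucket(100.0 / 9.5));
	/* Acceleration 5.0 matches no bucket: a new one gets inserted. */
	printf("5.0  -> bucket %u (== naccel: insert new)\n", find_bucket(5.0));
	return 0;
}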

+ 2 - 1
src/sched_policies/modular_ez.c

@@ -201,7 +201,8 @@ void starpu_sched_component_initialize_simple_scheduler(starpu_sched_component_c
 	/* Take default ntasks_threshold */
 	unsigned ntasks_threshold;
 	if (starpu_sched_component_is_heft(decision_component) ||
-	    starpu_sched_component_is_mct(decision_component)) {
+	    starpu_sched_component_is_mct(decision_component) ||
+	    starpu_sched_component_is_heteroprio(decision_component)) {
 		/* These need more queueing to allow CPUs to take some share of the work */
 		ntasks_threshold = _STARPU_SCHED_NTASKS_THRESHOLD_HEFT;
 	} else {

+ 58 - 0
src/sched_policies/modular_heteroprio.c

@@ -0,0 +1,58 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2015,2017                           Inria
+ * Copyright (C) 2014,2015,2017                           CNRS
+ * Copyright (C) 2013-2015,2017,2018-2019                 Université de Bordeaux
+ * Copyright (C) 2013                                     Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_sched_component.h>
+#include <starpu_scheduler.h>
+#include <float.h>
+#include <limits.h>
+
+static void initialize_heteroprio_center_policy(unsigned sched_ctx_id)
+{
+	starpu_sched_component_initialize_simple_scheduler((starpu_sched_component_create_t) starpu_sched_component_heteroprio_create, NULL,
+			STARPU_SCHED_SIMPLE_DECIDE_WORKERS |
+			STARPU_SCHED_SIMPLE_PERFMODEL |
+			STARPU_SCHED_SIMPLE_FIFO_ABOVE |
+			STARPU_SCHED_SIMPLE_FIFO_ABOVE_PRIO |
+			STARPU_SCHED_SIMPLE_FIFOS_BELOW |
+			STARPU_SCHED_SIMPLE_FIFOS_BELOW_PRIO |
+			STARPU_SCHED_SIMPLE_FIFOS_BELOW_READY |
+			STARPU_SCHED_SIMPLE_IMPL, sched_ctx_id);
+}
+
+static void deinitialize_heteroprio_center_policy(unsigned sched_ctx_id)
+{
+	struct starpu_sched_tree *t = (struct starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	starpu_sched_tree_destroy(t);
+}
+
+struct starpu_sched_policy _starpu_sched_modular_heteroprio_policy =
+{
+	.init_sched = initialize_heteroprio_center_policy,
+	.deinit_sched = deinitialize_heteroprio_center_policy,
+	.add_workers = starpu_sched_tree_add_workers,
+	.remove_workers = starpu_sched_tree_remove_workers,
+	.push_task = starpu_sched_tree_push_task,
+	.pop_task = starpu_sched_tree_pop_task,
+	.pre_exec_hook = starpu_sched_component_worker_pre_exec_hook,
+	.post_exec_hook = starpu_sched_component_worker_post_exec_hook,
+	.pop_every_task = NULL,
+	.policy_name = "modular-heteroprio",
+	.policy_description = "heteroprio modular policy",
+	.worker_type = STARPU_WORKER_LIST,
+};

+ 1 - 1
tests/microbenchs/parallel_independent_heterogeneous_tasks.sh

@@ -17,6 +17,6 @@
 #
 source $(dirname $0)/microbench.sh
 
-XFAIL="modular-eager-prefetching modular-prio-prefetching modular-random-prefetching modular-random-prio-prefetching modular-prandom modular-prandom-prio modular-ws modular-heft modular-heft-prio modular-heft2 modular-gemm random peager heteroprio graph_test"
+XFAIL="modular-eager-prefetching modular-prio-prefetching modular-random-prefetching modular-random-prio-prefetching modular-prandom modular-prandom-prio modular-ws modular-heft modular-heft-prio modular-heft2 modular-heteroprio modular-gemm random peager heteroprio graph_test"
 
 test_scheds parallel_independent_heterogeneous_tasks

+ 1 - 1
tests/microbenchs/parallel_independent_homogeneous_tasks.sh

@@ -17,6 +17,6 @@
 #
 source $(dirname $0)/microbench.sh
 
-XFAIL="modular-eager-prefetching modular-prio-prefetching modular-random-prefetching modular-random-prio-prefetching modular-prandom modular-prandom-prio modular-ws modular-heft modular-heft-prio modular-heft2 modular-gemm random peager heteroprio graph_test"
+XFAIL="modular-eager-prefetching modular-prio-prefetching modular-random-prefetching modular-random-prio-prefetching modular-prandom modular-prandom-prio modular-ws modular-heft modular-heft-prio modular-heft2 modular-heteroprio modular-gemm random peager heteroprio graph_test"
 
 test_scheds parallel_independent_homogeneous_tasks