
Add support for parallel tasks:
- introduce the "type" field in the codelet structure; it may be either
STARPU_SEQ for a sequential task, STARPU_SPMD for a parallel task in SPMD
mode (each worker executes the codelet implementation) or STARPU_FORKJOIN for
Fork-join mode (only one worker executes the codelet implementation and is
responsible for launching extra threads). A usage sketch is given after this
list.
- the max_parallelism field indicates the maximum number of workers that a
parallel task may use.
- the scheduling policy may create "combined workers": a combined worker is a
combination of "basic workers" (serial workers). Each combined worker is
assigned a workerid so that it is also possible to explicitly bind a task
to a parallel worker by means of this workerid.
- starpu_combined_worker_get_id is a variant of starpu_worker_get_id which
returns the id of the combined worker when the worker is currently running in
parallel mode; starpu_worker_get_id always returns the id of the basic worker
executing the code.
- starpu_combined_worker_get_size returns the number of basic workers in the
current combined worker (or 1 if this is a basic worker).
- starpu_combined_worker_get_rank returns the rank of the basic worker when
executing a parallel task in SPMD mode.
- the Parallel-HEFT (pheft) policy adds support for parallel tasks to the DMDA
strategy.
- the _starpu_sched_find_worker_combinations function determines all CPU
combinations and may be called during the initialization phase of the
scheduling policy. It is currently extremely naive and should be extended to
use hwloc and map workers with respect to the actual topology.
- the tests/parallel_tasks/ directory now contains tests for parallel tasks.
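
As an illustration (not part of this commit), here is a minimal sketch of how
a parallel codelet could be written against the new interface. The kernel
name, the vector array and the factor argument are made up for the example;
the type/max_parallelism fields and the starpu_combined_worker_get_size/
get_rank calls are the ones introduced here:

#include <starpu.h>
#include <limits.h>

#define VECTORSIZE	1024
static float vector[VECTORSIZE];

/* SPMD kernel: every basic worker of the combined worker calls this function
 * and scales its own slice of the vector, selected from its rank. */
static void scal_cpu_func(__attribute__ ((unused)) void *descr[], void *arg)
{
	float factor = *(float *) arg;

	int size = starpu_combined_worker_get_size(); /* 1 on a basic worker */
	int rank = starpu_combined_worker_get_rank(); /* 0 on a basic worker */

	int chunk = (VECTORSIZE + size - 1) / size;
	int begin = rank * chunk;
	int end = (begin + chunk > VECTORSIZE) ? VECTORSIZE : (begin + chunk);

	int i;
	for (i = begin; i < end; i++)
		vector[i] *= factor;
}

static starpu_codelet scal_cl = {
	.where = STARPU_CPU,
	.type = STARPU_SPMD,		/* with STARPU_FORKJOIN, only one worker calls the kernel */
	.max_parallelism = INT_MAX,	/* accept combined workers of any size */
	.cpu_func = scal_cpu_func,
	.nbuffers = 0
};

Tasks using such a codelet are created and submitted as usual; the pheft
policy then decides whether to dispatch them to a basic worker or to one of
the combined workers.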

Cédric Augonnet, 14 years ago
Parent
Commit 61cdbb41bd

+ 5 - 0
configure.ac

@@ -156,6 +156,11 @@ if test x$enable_cpu = xyes; then
 	AC_DEFINE(STARPU_USE_CPU, [1], [CPU driver is activated])
 fi
 
+# How many parallel worker can we support ?
+nmaxcombinedworkers=`expr 2 \* $nmaxcpus`
+AC_DEFINE_UNQUOTED(STARPU_NMAX_COMBINEDWORKERS,
+	[$nmaxcombinedworkers], [Maximum number of worker combinations])
+
 ###############################################################################
 #                                                                             #
 #                                 CUDA settings                               #

+ 5 - 0
include/starpu.h

@@ -82,6 +82,7 @@ void starpu_shutdown(void);
 /* This function returns the number of workers (ie. processing units executing
  * StarPU tasks). The returned value should be at most STARPU_NMAXWORKERS. */
 unsigned starpu_worker_get_count(void);
+unsigned starpu_combined_worker_get_count(void);
 
 unsigned starpu_cpu_worker_get_count(void);
 unsigned starpu_cuda_worker_get_count(void);
@@ -93,6 +94,10 @@ unsigned starpu_opencl_worker_get_count(void);
  * or if it is some SPU worker where a single thread controls different SPUs. */
 int starpu_worker_get_id(void);
 
+int starpu_combined_worker_get_id(void);
+int starpu_combined_worker_get_size(void);
+int starpu_combined_worker_get_rank(void);
+
 enum starpu_archtype {
 	STARPU_CPU_WORKER, /* CPU core */
 	STARPU_CUDA_WORKER, /* NVIDIA CUDA device */

+ 1 - 0
include/starpu_config.h.in

@@ -42,6 +42,7 @@
 #undef STARPU_HAVE_CURAND
 
 #undef STARPU_NMAXBUFS
+#undef STARPU_NMAXCPUS
 #undef STARPU_MAXCUDADEVS
 #undef STARPU_MAXOPENCLDEVS
 #undef STARPU_NMAXWORKERS

+ 1 - 1
include/starpu_perfmodel.h

@@ -39,7 +39,7 @@ struct starpu_buffer_descr_t;
 
 enum starpu_perf_archtype {
 	STARPU_CPU_DEFAULT = 0,
-	STARPU_CUDA_DEFAULT = 1,
+	STARPU_CUDA_DEFAULT = STARPU_NMAXCPUS,
 	STARPU_OPENCL_DEFAULT = STARPU_CUDA_DEFAULT + STARPU_MAXCUDADEVS,
 	/* STARPU_OPENCL_DEFAULT + devid */
 	STARPU_GORDON_DEFAULT = STARPU_OPENCL_DEFAULT + STARPU_MAXOPENCLDEVS

+ 8 - 0
include/starpu_scheduler.h

@@ -30,6 +30,8 @@ struct starpu_task;
 struct starpu_machine_topology_s {
 	unsigned nworkers;
 
+	unsigned ncombinedworkers;
+
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_topology_t hwtopology;
 #else
@@ -115,4 +117,10 @@ int starpu_sched_get_max_priority(void);
 void starpu_sched_set_min_priority(int min_prio);
 void starpu_sched_set_max_priority(int max_prio);
 
+int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[]);
+
+void _starpu_sched_find_worker_combinations(struct starpu_machine_topology_s *topology);
+
+int starpu_combined_worker_get_description(int workerid, int *worker_size, int **combined_workerid);
+
 #endif // __STARPU_SCHEDULER_H__

+ 7 - 0
include/starpu_task.h

@@ -33,6 +33,11 @@
 #define STARPU_GORDON	((1ULL)<<5)
 #define STARPU_OPENCL	((1ULL)<<6)
 
+/* Codelet types */
+#define STARPU_SEQ		0
+#define STARPU_SPMD		1
+#define STARPU_FORKJOIN		2
+
 /* task status */
 #define STARPU_TASK_INVALID	0
 #define STARPU_TASK_BLOCKED	1
@@ -57,6 +62,8 @@ typedef uint64_t starpu_tag_t;
 typedef struct starpu_codelet_t {
 	/* where can it be performed ? */
 	uint32_t where;
+	unsigned type;
+	int max_parallelism;
 
 	/* the different implementations of the codelet */
 	void (*cuda_func)(void **, void *);

+ 4 - 0
src/Makefile.am

@@ -112,6 +112,7 @@ libstarpu_la_SOURCES = 						\
 	core/jobs.c						\
 	core/task.c						\
 	core/workers.c						\
+	core/combined_workers.c					\
 	core/topology.c						\
 	core/debug.c						\
 	core/errorcheck.c					\
@@ -129,6 +130,7 @@ libstarpu_la_SOURCES = 						\
 	core/perfmodel/regression.c				\
 	core/sched_policy.c					\
 	core/priorities.c					\
+	core/parallel_task.c					\
 	sched_policies/eager_central_policy.c			\
 	sched_policies/eager_central_priority_policy.c		\
 	sched_policies/work_stealing_policy.c			\
@@ -137,6 +139,8 @@ libstarpu_la_SOURCES = 						\
 	sched_policies/stack_queues.c				\
 	sched_policies/deque_queues.c				\
 	sched_policies/fifo_queues.c				\
+	sched_policies/detect_combined_workers.c		\
+	sched_policies/parallel_heft.c				\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
 	datawizard/write_back.c					\

+ 125 - 0
src/core/combined_workers.c

@@ -0,0 +1,125 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/config.h>
+#include <core/workers.h>
+
+#include <sched.h>
+
+#ifdef __MINGW32__
+#include <windows.h>
+#endif
+
+/* Create a new worker id for a combination of workers. This method should
+ * typically be called at the initialization of the scheduling policy. This
+ * worker should be the combination of the list of id's contained in the
+ * workerid_array array which has nworkers entries. This function returns
+ * the identifier of the combined worker in case of success, a negative value
+ * is returned otherwise. */
+int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
+{
+	int new_workerid;
+
+	/* Return the number of actual workers. */
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	int basic_worker_count = (int)config->topology.nworkers;
+	int combined_worker_id = (int)config->topology.ncombinedworkers;
+
+	/* Test that all workers are not combined workers already. */
+	int i;
+	for (i = 0; i < nworkers; i++)
+	{
+		int id = workerid_array[i];
+
+		/* We only combine CPUs */
+		STARPU_ASSERT(config->workers[id].perf_arch == STARPU_CPU_DEFAULT);
+		STARPU_ASSERT(config->workers[id].worker_mask == STARPU_CPU);
+
+		/* We only combine valid "basic" workers */
+		if ((id < 0) || (id >= basic_worker_count))
+			return -EINVAL;
+	}
+
+	/* Get an id for that combined worker. Note that this is not thread
+	 * safe because thhis method should only be called when the scheduler
+	 * is being initialized. */
+	new_workerid = basic_worker_count + combined_worker_id;
+	config->topology.ncombinedworkers++;
+
+#if 0
+	fprintf(stderr, "COMBINED WORKERS ");
+	for (i = 0; i < nworkers; i++)
+	{
+		fprintf(stderr, "%d ", workerid_array[i]);
+	}
+	fprintf(stderr, "into worker %d\n", new_workerid);
+#endif
+
+	struct starpu_combined_worker_s *combined_worker =
+		&config->combined_workers[combined_worker_id];
+
+	combined_worker->worker_size = nworkers;
+	combined_worker->perf_arch = STARPU_CPU_DEFAULT + nworkers - 1;
+	combined_worker->worker_mask = STARPU_CPU;
+
+	/* We assume that the memory node should either be that of the first
+	 * entry, and it is very likely that every worker in the combination
+	 * should be on the same memory node.*/
+	int first_id = workerid_array[0];
+	combined_worker->memory_node = config->workers[first_id].memory_node;
+
+	/* Save the list of combined workers */
+	memcpy(&combined_worker->combined_workerid, workerid_array, nworkers*sizeof(int));
+
+	CPU_ZERO(&combined_worker->cpu_set);
+#ifdef STARPU_HAVE_HWLOC
+	combined_worker->hwloc_cpu_set = hwloc_cpuset_alloc();
+#endif
+
+	for (i = 0; i < nworkers; i++)
+	{
+		int id = workerid_array[i];
+		CPU_OR(&combined_worker->cpu_set,
+			&combined_worker->cpu_set,
+			&config->workers[id].initial_cpu_set);
+
+#ifdef STARPU_HAVE_HWLOC
+		hwloc_cpuset_or(combined_worker->hwloc_cpu_set,
+				combined_worker->hwloc_cpu_set,
+				config->workers[id].initial_hwloc_cpu_set);
+#endif
+	}
+
+	return new_workerid;
+}
+
+int starpu_combined_worker_get_description(int workerid, int *worker_size, int **combined_workerid)
+{
+	/* Check that this is the id of a combined worker */
+	struct starpu_combined_worker_s *worker;
+	worker = _starpu_get_combined_worker_struct(workerid);
+	STARPU_ASSERT(worker);
+
+	if (worker_size)
+		*worker_size = worker->worker_size;
+
+	if (combined_workerid)
+		*combined_workerid = worker->combined_workerid;
+
+	return 0;
+}
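
As a usage note (not part of this diff), a hypothetical scheduling policy
could register a combined worker from its initialization hook roughly as
follows. The init_my_policy name and the choice of pairing the first two CPU
workers are assumptions made for the example; starpu_worker_get_type is an
existing public call and starpu_combined_worker_assign_workerid is the new
one defined above:

#include <assert.h>
#include <starpu.h>
#include <starpu_scheduler.h>

static void init_my_policy(struct starpu_machine_topology_s *topology,
		__attribute__ ((unused)) struct starpu_sched_policy_s *policy)
{
	int cpu_workers[STARPU_NMAXWORKERS];
	int ncpus = 0;

	/* Collect the ids of the basic CPU workers */
	unsigned i;
	for (i = 0; i < topology->nworkers; i++)
	{
		if (starpu_worker_get_type(i) == STARPU_CPU_WORKER)
			cpu_workers[ncpus++] = i;
	}

	/* Combine the first two CPU workers (if any) into a parallel worker */
	if (ncpus >= 2)
	{
		int combined_id = starpu_combined_worker_assign_workerid(2, cpu_workers);
		assert(combined_id >= 0);
		/* combined_id can later be stored in task->workerid to bind a
		 * parallel task to this combination explicitly. */
	}
}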

+ 23 - 0
src/core/combined_workers.h

@@ -0,0 +1,23 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __COMBINED_WORKERS_H__
+#define __COMBINED_WORKERS_H__
+
+#include <starpu.h>
+#include <common/config.h>
+
+#endif // __COMBINED_WORKERS_H__

+ 3 - 0
src/core/jobs.c

@@ -85,6 +85,9 @@ starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task
 
 	job->bound_task = NULL;
 
+	/* By default we have sequential tasks */
+	job->task_size = 1;
+
 	if (task->use_tag)
 		_starpu_tag_declare(task->tag_id, job);
 

+ 15 - 0
src/core/jobs.h

@@ -106,6 +106,21 @@ LIST_TYPE(starpu_job,
         const char *model_name;
 #endif
 	struct bound_task *bound_task;
+
+	/* Number of workers executing that task (>1 if the task is parallel)
+	 * */
+	int task_size;
+
+	/* In case we have assigned this job to a combined workerid */
+	int combined_workerid;
+
+	/* How many workers are currently running an alias of that job (for
+	 * parallel tasks only). */
+	int active_task_alias_count;
+
+	/* Parallel workers may have to synchronize before/after the execution of a parallel task. */
+	pthread_barrier_t before_work_barrier;
+	pthread_barrier_t after_work_barrier;
 );
 
 /* Create an internal starpu_job_t structure to encapsulate the task. */

+ 33 - 0
src/core/parallel_task.c

@@ -0,0 +1,33 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/config.h>
+#include <core/jobs.h>
+#include <core/task.h>
+#include <common/utils.h>
+
+struct starpu_task *_starpu_create_task_alias(struct starpu_task *task)
+{
+	struct starpu_task *task_dup = malloc(sizeof(struct starpu_task));
+	STARPU_ASSERT(task_dup);
+
+	/* XXX perhaps this is a bit too much overhead and we should only copy
+	 * part of the structure ? */
+	memcpy(task_dup, task, sizeof(struct starpu_task));
+
+	return task_dup;
+}

+ 0 - 0
src/core/parallel_task.h


+ 10 - 1
src/core/perfmodel/perfmodel.c

@@ -50,7 +50,16 @@ enum starpu_perf_archtype starpu_worker_get_perf_archtype(int workerid)
 {
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 
-	return config->workers[workerid].perf_arch;
+	/* This workerid may either be a basic worker or a combined worker */
+	unsigned nworkers = config->topology.nworkers;
+
+	if (workerid < (int)config->topology.nworkers)
+		return config->workers[workerid].perf_arch;
+
+	/* We have a combined worker */
+	unsigned ncombinedworkers = config->topology.ncombinedworkers;
+	STARPU_ASSERT(workerid < (int)(ncombinedworkers + nworkers));
+	return config->combined_workers[workerid - nworkers].perf_arch;
 }
 
 /*

+ 12 - 2
src/core/perfmodel/perfmodel_history.c

@@ -471,9 +471,19 @@ int starpu_load_history_debug(const char *symbol, struct starpu_perfmodel_t *mod
 
 void starpu_perfmodel_get_arch_name(enum starpu_perf_archtype arch, char *archname, size_t maxlen)
 {
-	if (arch == STARPU_CPU_DEFAULT)
+	if (arch < STARPU_CUDA_DEFAULT)
 	{
-		snprintf(archname, maxlen, "cpu");
+		if (arch == STARPU_CPU_DEFAULT)
+		{
+#warning We could just use cpu_1 as well ...
+			snprintf(archname, maxlen, "cpu");
+		}
+		else
+		{
+			/* For combined CPU workers */
+			int cpu_count = arch - STARPU_CPU_DEFAULT + 1;
+			snprintf(archname, maxlen, "cpu_%d", cpu_count);
+		}
 	}
 	else if ((STARPU_CUDA_DEFAULT <= arch)
 		&& (arch < STARPU_CUDA_DEFAULT + STARPU_MAXCUDADEVS))

+ 65 - 13
src/core/sched_policy.c

@@ -44,8 +44,9 @@ extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
+extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy;
 
-#define NPREDEFINED_POLICIES	9
+#define NPREDEFINED_POLICIES	10
 
 static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
 	&_starpu_sched_ws_policy,
@@ -56,7 +57,8 @@ static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] =
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_random_policy,
-	&_starpu_sched_eager_policy
+	&_starpu_sched_eager_policy,
+	&_starpu_sched_parallel_heft_policy
 };
 
 struct starpu_sched_policy_s *_starpu_get_sched_policy(void)
@@ -200,6 +202,66 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
 		policy.deinit_sched(&config->topology, &policy);
 }
 
+/* Enqueue a task into the list of tasks explicitely attached to a worker. In
+ * case workerid identifies a combined worker, a task will be enqueued into
+ * each worker of the combination. */
+static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
+{
+	int nbasic_workers = (int)starpu_worker_get_count();
+
+	/* Is this a basic worker or a combined worker ? */
+	int is_basic_worker = (workerid < nbasic_workers);
+
+	unsigned memory_node; 
+	struct starpu_worker_s *worker;
+	struct starpu_combined_worker_s *combined_worker;
+
+	if (is_basic_worker)
+	{
+		worker = _starpu_get_worker_struct(workerid);
+		memory_node = worker->memory_node;
+	}
+	else
+	{
+		combined_worker = _starpu_get_combined_worker_struct(workerid);
+		memory_node = combined_worker->memory_node;
+	}
+
+	if (use_prefetch)
+		_starpu_prefetch_task_input_on_node(task, memory_node);
+
+	if (is_basic_worker)
+	{
+		return _starpu_push_local_task(worker, task);
+	}
+	else {
+		/* This is a combined worker so we create task aliases */
+		int worker_size = combined_worker->worker_size;
+		int *combined_workerid = combined_worker->combined_workerid;
+
+		int ret = 0;
+		int i;
+
+		starpu_job_t j = _starpu_get_job_associated_to_task(task);
+		j->task_size = worker_size;
+		j->combined_workerid = workerid;
+		j->active_task_alias_count = 0;
+
+		pthread_barrier_init(&j->before_work_barrier, NULL, worker_size);
+		pthread_barrier_init(&j->after_work_barrier, NULL, worker_size);
+
+		for (i = 0; i < worker_size; i++)
+		{
+			struct starpu_task *alias = _starpu_create_task_alias(task);
+
+			worker = _starpu_get_worker_struct(combined_workerid[i]);
+			ret |= _starpu_push_local_task(worker, alias);
+		}
+
+		return ret;
+	}
+}
+
 /* the generic interface that call the proper underlying implementation */
 int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 {
@@ -222,20 +284,10 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
         int ret;
 	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
 	{
-		unsigned workerid = task->workerid;
-		struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-		
-		if (use_prefetch)
-		{
-			uint32_t memory_node = starpu_worker_get_memory_node(workerid); 
-			_starpu_prefetch_task_input_on_node(task, memory_node);
-		}
-
-		ret = _starpu_push_local_task(worker, task);
+		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 	}
 	else {
 		STARPU_ASSERT(policy.push_task);
-
 		ret = policy.push_task(task);
 	}
 

+ 1 - 1
src/core/task.c

@@ -221,7 +221,7 @@ int starpu_task_submit(struct starpu_task *task)
 		/* In case we require that a task should be explicitely
 		 * executed on a specific worker, we make sure that the worker
 		 * is able to execute this task.  */
-		if (task->execute_on_a_specific_worker && !_starpu_worker_may_execute_task(task->workerid, task)) {
+		if (task->execute_on_a_specific_worker && !_starpu_combined_worker_may_execute_task(task->workerid, task)) {
                         _STARPU_LOG_OUT_TAG("ENODEV");
 			return -ENODEV;
                 }

+ 2 - 0
src/core/task.h

@@ -39,4 +39,6 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted);
  * to a task). */
 starpu_job_t _starpu_get_job_associated_to_task(struct starpu_task *task);
 
+struct starpu_task *_starpu_create_task_alias(struct starpu_task *task);
+
 #endif // __CORE_TASK_H__

+ 15 - 0
src/core/topology.c

@@ -266,6 +266,7 @@ static int _starpu_init_machine_config(struct starpu_machine_config_s *config,
 	struct starpu_machine_topology_s *topology = &config->topology;
 
 	topology->nworkers = 0;
+	topology->ncombinedworkers = 0;
 
 	_starpu_init_topology(config);
 
@@ -699,6 +700,20 @@ static void _starpu_init_workers_binding(struct starpu_machine_config_s *config)
 		}
 
 		workerarg->memory_node = memory_node;
+
+		/* Save the initial cpuset */
+		CPU_ZERO(&workerarg->initial_cpu_set);
+		CPU_SET(workerarg->bindid, &workerarg->initial_cpu_set);
+		CPU_ZERO(&workerarg->current_cpu_set);
+		CPU_SET(workerarg->bindid, &workerarg->current_cpu_set);
+
+#ifdef STARPU_HAVE_HWLOC
+		/* Clear the cpu set and set the cpu */
+		workerarg->initial_hwloc_cpu_set = hwloc_cpuset_alloc();
+		hwloc_cpuset_cpu(workerarg->initial_hwloc_cpu_set, workerarg->bindid);
+		workerarg->current_hwloc_cpu_set = hwloc_cpuset_alloc();
+		hwloc_cpuset_cpu(workerarg->current_hwloc_cpu_set, workerarg->bindid);
+#endif
 	}
 }
 

+ 101 - 5
src/core/workers.c

@@ -74,6 +74,39 @@ int _starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
 	return !!(task->cl->where & config.workers[workerid].worker_mask);
 }
 
+
+
+int _starpu_combined_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
+{
+	/* TODO: check that the task operand sizes will fit on that device */
+	/* TODO: call application-provided function for various cases like
+	 * double support, shared memory size limit, etc. */
+
+	struct starpu_codelet_t *cl = task->cl;
+	unsigned nworkers = config.topology.nworkers;
+
+	/* Is this a parallel worker ? */
+	if (workerid < nworkers)
+	{
+		return !!(task->cl->where & config.workers[workerid].worker_mask);
+	}
+	else {
+		if ((cl->type == STARPU_SPMD) || (cl->type == STARPU_FORKJOIN))
+		{
+			/* TODO we should add other types of constraints */
+
+			/* Is the worker larger than requested ? */
+			int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
+			return !!(worker_size <= task->cl->max_parallelism);
+		}
+		else
+		{
+			/* We have a sequential task but a parallel worker */
+			return 0;
+		}
+	}
+}
+
 /*
  * Runtime initialization methods
  */
@@ -93,7 +126,7 @@ static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
 	_starpu_memory_node_register_condition(cond, mutex, memory_node);
 }
 
-static void _starpu_init_workers(struct starpu_machine_config_s *config)
+static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 {
 	config->running = 1;
 
@@ -113,6 +146,9 @@ static void _starpu_init_workers(struct starpu_machine_config_s *config)
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
 
 		workerarg->workerid = (int)worker;
+		workerarg->worker_size = 1;
+		workerarg->combined_workerid = workerarg->workerid;
+		workerarg->current_rank = 0;
 
 		/* if some codelet's termination cannot be handled directly :
 		 * for instance in the Gordon driver, Gordon tasks' callbacks
@@ -281,14 +317,13 @@ int starpu_init(struct starpu_conf *user_conf)
 	 * threads */
 	_starpu_initialize_current_task_key();	
 
-	/* initialize the scheduler */
-
-	/* initialize the queue containing the jobs */
+	/* initialize the scheduling policy */
 	_starpu_init_sched_policy(&config);
 
 	_starpu_initialize_registered_performance_models();
 
-	_starpu_init_workers(&config);
+	/* Launch "basic" workers (ie. non-combined workers) */
+	_starpu_launch_drivers(&config);
 
 	PTHREAD_MUTEX_LOCK(&init_mutex);
 	initialized = INITIALIZED;
@@ -431,6 +466,11 @@ unsigned starpu_worker_get_count(void)
 	return config.topology.nworkers;
 }
 
+unsigned starpu_combined_worker_get_count(void)
+{
+	return config.topology.ncombinedworkers;
+}
+
 unsigned starpu_cpu_worker_get_count(void)
 {
 	return config.topology.ncpus;
@@ -472,6 +512,54 @@ int starpu_worker_get_id(void)
 	}
 }
 
+int starpu_combined_worker_get_id(void)
+{
+	struct starpu_worker_s *worker;
+
+	worker = _starpu_get_local_worker_key();
+	if (worker)
+	{
+		return worker->combined_workerid;
+	}
+	else {
+		/* there is no worker associated to that thread, perhaps it is
+		 * a thread from the application or this is some SPU worker */
+		return -1;
+	}
+}
+
+int starpu_combined_worker_get_size(void)
+{
+	struct starpu_worker_s *worker;
+
+	worker = _starpu_get_local_worker_key();
+	if (worker)
+	{
+		return worker->worker_size;
+	}
+	else {
+		/* there is no worker associated to that thread, perhaps it is
+		 * a thread from the application or this is some SPU worker */
+		return -1;
+	}
+}
+
+int starpu_combined_worker_get_rank(void)
+{
+	struct starpu_worker_s *worker;
+
+	worker = _starpu_get_local_worker_key();
+	if (worker)
+	{
+		return worker->current_rank;
+	}
+	else {
+		/* there is no worker associated to that thread, perhaps it is
+		 * a thread from the application or this is some SPU worker */
+		return -1;
+	}
+}
+
 int starpu_worker_get_devid(int id)
 {
 	return config.workers[id].devid;
@@ -482,6 +570,14 @@ struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
 	return &config.workers[id];
 }
 
+struct starpu_combined_worker_s *_starpu_get_combined_worker_struct(unsigned id)
+{
+	unsigned basic_worker_count = starpu_worker_get_count();
+
+	STARPU_ASSERT(id >= basic_worker_count);
+	return &config.combined_workers[id - basic_worker_count];
+}
+
 enum starpu_archtype starpu_worker_get_type(int id)
 {
 	return config.workers[id].arch;

+ 32 - 0
src/core/workers.h

@@ -65,6 +65,9 @@ struct starpu_worker_s {
 	int devid; /* which cpu/gpu/etc is controlled by the workker ? */
 	int bindid; /* which cpu is the driver bound to ? */
 	int workerid; /* uniquely identify the worker among all processing units types */
+	int combined_workerid; /* combined worker currently using this worker */
+	int current_rank; /* current rank in case the worker is used in a parallel fashion */
+	int worker_size; /* size of the worker in case we use a combined worker */
         pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is associated that worker to ? */
 	pthread_cond_t *sched_cond; /* condition variable used when the worker waits for tasks. */
@@ -77,6 +80,26 @@ struct starpu_worker_s {
 	unsigned worker_is_initialized;
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
 	char name[32];
+
+	cpu_set_t initial_cpu_set;
+	cpu_set_t current_cpu_set;
+#ifdef STARPU_HAVE_HWLOC
+	hwloc_cpuset_t initial_hwloc_cpu_set;
+	hwloc_cpuset_t current_hwloc_cpu_set;
+#endif
+};
+
+struct starpu_combined_worker_s {
+	enum starpu_perf_archtype perf_arch; /* in case there are different models of the same arch */
+	uint32_t worker_mask; /* what is the type of workers ? */
+	int worker_size;
+	unsigned memory_node; /* which memory node is associated that worker to ? */
+	int combined_workerid[STARPU_NMAXWORKERS];
+
+	cpu_set_t cpu_set;
+#ifdef STARPU_HAVE_HWLOC
+	hwloc_cpuset_t hwloc_cpu_set;
+#endif
 };
 
 /* in case a single CPU worker may control multiple 
@@ -109,8 +132,14 @@ struct starpu_machine_config_s {
 	/* Which GPU(s) do we use for OpenCL ? */
 	int current_opencl_gpuid;
 	
+	/* Basic workers : each of this worker is running its own driver and
+	 * can be combined with other basic workers. */
 	struct starpu_worker_s workers[STARPU_NMAXWORKERS];
 
+	/* Combined workers: these worker are a combination of basic workers
+	 * that can run parallel tasks together. */
+	struct starpu_combined_worker_s combined_workers[STARPU_NMAX_COMBINEDWORKERS];
+
 	/* This bitmask indicates which kinds of worker are available. For
 	 * instance it is possible to test if there is a CUDA worker with
 	 * the result of (worker_mask & STARPU_CUDA). */
@@ -141,6 +170,7 @@ uint32_t _starpu_may_submit_opencl_task(void);
 
 /* Check if the worker specified by workerid can execute the codelet. */
 int _starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task);
+int _starpu_combined_worker_may_execute_task(unsigned workerid, struct starpu_task *task);
 
 /* Check whether there is anything that the worker should do instead of
  * sleeping (waiting on something to happen). */
@@ -164,6 +194,8 @@ struct starpu_worker_s *_starpu_get_local_worker_key(void);
  * specified worker. */
 struct starpu_worker_s *_starpu_get_worker_struct(unsigned id);
 
+struct starpu_combined_worker_s *_starpu_get_combined_worker_struct(unsigned id);
+
 /* Returns the structure that describes the overall machine configuration (eg.
  * all workers and topology). */
 struct starpu_machine_config_s *_starpu_get_machine_config(void);

+ 12 - 2
src/datawizard/memory_nodes.c

@@ -151,7 +151,17 @@ void _starpu_memory_node_register_condition(pthread_cond_t *cond, pthread_mutex_
 
 unsigned starpu_worker_get_memory_node(unsigned workerid)
 {
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	/* This workerid may either be a basic worker or a combined worker */
+	unsigned nworkers = config->topology.nworkers;
+
+	if (workerid < config->topology.nworkers)
+		return config->workers[workerid].memory_node;
+
+	/* We have a combined worker */
+	unsigned ncombinedworkers = config->topology.ncombinedworkers;
+	STARPU_ASSERT(workerid < ncombinedworkers + nworkers);
+	return config->combined_workers[workerid - nworkers].memory_node;
 
-	return worker->memory_node;
 }

+ 86 - 31
src/drivers/cpu/driver_cpu.c

@@ -24,7 +24,7 @@
 #include "driver_cpu.h"
 #include <core/sched_policy.h>
 
-static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args)
+static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args, int is_parallel_task, int rank, enum starpu_perf_archtype perf_arch)
 {
 	int ret;
 	struct timespec codelet_start, codelet_end;
@@ -40,43 +40,68 @@ static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args)
 	if (cl->model && cl->model->benchmarking)
 		calibrate_model = 1;
 
-	ret = _starpu_fetch_task_input(task, 0);
-
-	if (ret != 0) {
-		/* there was not enough memory so the codelet cannot be executed right now ... */
-		/* push the codelet back and try another one ... */
-		return -EAGAIN;
+	if (rank == 0)
+	{
+		ret = _starpu_fetch_task_input(task, 0);
+		if (ret != 0)
+		{
+			/* there was not enough memory so the codelet cannot be executed right now ... */
+			/* push the codelet back and try another one ... */
+			return -EAGAIN;
+		}
 	}
 
+	if (is_parallel_task)
+		pthread_barrier_wait(&j->before_work_barrier);
+
 	STARPU_TRACE_START_CODELET_BODY(j);
 
 	struct starpu_task_profiling_info *profiling_info;
-	profiling_info = task->profiling_info;
 
-	if (profiling_info || calibrate_model)
+	if (rank == 0)
 	{
-		starpu_clock_gettime(&codelet_start);
-		_starpu_worker_register_executing_start_date(workerid, &codelet_start);
+		profiling_info = task->profiling_info;
+	
+		if (profiling_info || calibrate_model)
+		{
+			starpu_clock_gettime(&codelet_start);
+			_starpu_worker_register_executing_start_date(workerid, &codelet_start);
+		}
+	
+		cpu_args->status = STATUS_EXECUTING;
+		task->status = STARPU_TASK_RUNNING;	
+	}
+	
+	/* In case this is a Fork-join parallel task, the worker does not
+	 * execute the kernel at all. */
+	if ((rank == 0) || (cl->type != STARPU_FORKJOIN))
+	{
+		cl_func func = cl->cpu_func;
+		func(task->interface, task->cl_arg);
 	}
-
-	cpu_args->status = STATUS_EXECUTING;
-	task->status = STARPU_TASK_RUNNING;	
-
-	cl_func func = cl->cpu_func;
-	func(task->interface, task->cl_arg);
-
-	cl->per_worker_stats[workerid]++;
 	
-	if (profiling_info || calibrate_model)
-		starpu_clock_gettime(&codelet_end);
+	if (rank == 0)
+	{
+		cl->per_worker_stats[workerid]++;
+		
+		if (profiling_info || calibrate_model)
+			starpu_clock_gettime(&codelet_end);
+	
+		STARPU_TRACE_END_CODELET_BODY(j);
+		cpu_args->status = STATUS_UNKNOWN;
+	}
 
-	STARPU_TRACE_END_CODELET_BODY(j);
-	cpu_args->status = STATUS_UNKNOWN;
+	if (is_parallel_task)
+		pthread_barrier_wait(&j->after_work_barrier);
 
-	_starpu_push_task_output(task, 0);
+	if (rank == 0)
+	{
+		_starpu_push_task_output(task, 0);
 
-	_starpu_driver_update_job_feedback(j, cpu_args, profiling_info, calibrate_model,
-			&codelet_start, &codelet_end);
+		_starpu_driver_update_job_feedback(j, cpu_args, profiling_info,
+				calibrate_model, perf_arch,
+				&codelet_start, &codelet_end);
+	}
 
 	return 0;
 }
@@ -124,8 +149,6 @@ void *_starpu_cpu_worker(void *arg)
 		_starpu_datawizard_progress(memnode, 1);
 		STARPU_TRACE_END_PROGRESS(memnode);
 
-		_starpu_execute_registered_progression_hooks();
-
 		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
@@ -158,9 +181,40 @@ void *_starpu_cpu_worker(void *arg)
 			continue;
 		}
 
-		_starpu_set_current_task(task);
+		int rank = 0;
+		int is_parallel_task = (j->task_size > 1);
+
+		enum starpu_perf_archtype perf_arch; 
+	
+		/* Get the rank in case it is a parallel task */
+		if (is_parallel_task)
+		{
+			/* We can release the fake task */
+			STARPU_ASSERT(task != j->task);
+			free(task);
+
+			PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+			rank = j->active_task_alias_count++;
+			PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+			struct starpu_combined_worker_s *combined_worker;
+			combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
+
+			cpu_arg->combined_workerid = j->combined_workerid;
+			cpu_arg->worker_size = combined_worker->worker_size;
+			cpu_arg->current_rank = rank;
+			perf_arch = combined_worker->perf_arch;
+		}
+		else {
+			cpu_arg->combined_workerid = cpu_arg->workerid;
+			cpu_arg->worker_size = 1;
+			cpu_arg->current_rank = 0;
+			perf_arch = cpu_arg->perf_arch;
+		}
+
+		_starpu_set_current_task(j->task);
 
-                res = execute_job_on_cpu(j, cpu_arg);
+                res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
 
 		_starpu_set_current_task(NULL);
 
@@ -174,7 +228,8 @@ void *_starpu_cpu_worker(void *arg)
 			}
 		}
 
-		_starpu_handle_job_termination(j, 0);
+		if (rank == 0)
+			_starpu_handle_job_termination(j, 0);
         }
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 1 - 3
src/drivers/cuda/driver_cuda.c

@@ -201,7 +201,7 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 
 	_starpu_push_task_output(task, mask);
 
-	_starpu_driver_update_job_feedback(j, args, profiling_info, calibrate_model,
+	_starpu_driver_update_job_feedback(j, args, profiling_info, calibrate_model, args->perf_arch,
 			&codelet_start, &codelet_end);
 
 	return 0;
@@ -260,8 +260,6 @@ void *_starpu_cuda_worker(void *arg)
 		_starpu_datawizard_progress(memnode, 1);
 		STARPU_TRACE_END_PROGRESS(memnode);
 
-		_starpu_execute_registered_progression_hooks();
-	
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */

+ 2 - 2
src/drivers/driver_common/driver_common.c

@@ -24,7 +24,7 @@
 
 void _starpu_driver_update_job_feedback(starpu_job_t j, struct starpu_worker_s *worker_args,
 					struct starpu_task_profiling_info *profiling_info,
-					unsigned calibrate_model,
+					unsigned calibrate_model, enum starpu_perf_archtype perf_arch,
 					struct timespec *codelet_start, struct timespec *codelet_end)
 {
 	struct timespec measured_ts;
@@ -47,7 +47,7 @@ void _starpu_driver_update_job_feedback(starpu_job_t j, struct starpu_worker_s *
 		}
 
 		if (calibrate_model)
-			_starpu_update_perfmodel_history(j, worker_args->perf_arch, worker_args->devid, measured);
+			_starpu_update_perfmodel_history(j, perf_arch, worker_args->devid, measured);
 	}
 }
 

+ 1 - 1
src/drivers/driver_common/driver_common.h

@@ -26,7 +26,7 @@
 
 void _starpu_driver_update_job_feedback(starpu_job_t j, struct starpu_worker_s *worker_args,
 		struct starpu_task_profiling_info *profiling_info,
-		unsigned calibrate_model,
+		unsigned calibrate_model, enum starpu_perf_archtype perf_arch,
 		struct timespec *codelet_start, struct timespec *codelet_end);
 
 void _starpu_block_worker(int workerid, pthread_cond_t *cond, pthread_mutex_t *mutex);

+ 1 - 3
src/drivers/opencl/driver_opencl.c

@@ -382,8 +382,6 @@ void *_starpu_opencl_worker(void *arg)
 		_starpu_datawizard_progress(memnode, 1);
 		STARPU_TRACE_END_PROGRESS(memnode);
 
-		_starpu_execute_registered_progression_hooks();
-
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
@@ -526,7 +524,7 @@ static int _starpu_opencl_execute_job(starpu_job_t j, struct starpu_worker_s *ar
 
 	_starpu_push_task_output(task, mask);
 
-	_starpu_driver_update_job_feedback(j, args, profiling_info, calibrate_model,
+	_starpu_driver_update_job_feedback(j, args, profiling_info, calibrate_model, args->perf_arch,
 							&codelet_start, &codelet_end);
 
 	return EXIT_SUCCESS;

+ 61 - 0
src/sched_policies/detect_combined_workers.c

@@ -0,0 +1,61 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/config.h>
+#include <common/utils.h>
+#include <core/workers.h>
+
+void _starpu_sched_find_worker_combinations(struct starpu_machine_topology_s *topology)
+{
+//#ifdef STARPU_HAVE_HWLOC
+//#error TODO !
+//#else
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	/* We put the id of all CPU workers in this array */
+	int cpu_workers[STARPU_NMAXWORKERS];
+	unsigned ncpus = 0;
+
+	unsigned i;
+	for (i = 0; i < topology->nworkers; i++)
+	{
+		if (config->workers[i].perf_arch == STARPU_CPU_DEFAULT)
+			cpu_workers[ncpus++] = i;
+	}
+	
+	unsigned size;
+	for (size = 2; size <= ncpus; size *= 2)
+	{
+		unsigned first_cpu;
+		for (first_cpu = 0; first_cpu < ncpus; first_cpu += size)
+		{
+			if (first_cpu + size <= ncpus)
+			{
+				int workerids[size];
+
+				for (i = 0; i < size; i++)
+					workerids[i] = cpu_workers[first_cpu + i];
+
+				/* We register this combination */
+				int ret;
+				ret = starpu_combined_worker_assign_workerid(size, workerids); 
+				STARPU_ASSERT(ret >= 0);
+			}
+		}
+	}
+//#endif
+}
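
To illustrate the naive strategy above (an example, not part of the diff):
with 8 CPU workers numbered 0..7 and no accelerators, the two nested loops
register the combined workers {0,1}, {2,3}, {4,5}, {6,7}, {0,1,2,3},
{4,5,6,7} and {0,...,7}, i.e. every aligned power-of-two group, regardless of
caches or NUMA placement; this is what the hwloc-based extension mentioned in
the commit message is meant to improve.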

+ 374 - 0
src/sched_policies/parallel_heft.c

@@ -0,0 +1,374 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* Distributed queues using performance modeling to assign tasks */
+
+#include <float.h>
+#include <limits.h>
+#include <core/workers.h>
+#include <sched_policies/fifo_queues.h>
+#include <core/perfmodel/perfmodel.h>
+
+static pthread_mutex_t big_lock;
+
+static unsigned nworkers, ncombinedworkers;
+static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
+static unsigned napplicable_perf_archtypes = 0;
+
+static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
+
+static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
+static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+
+static double alpha = 1.0;
+static double beta = 1.0;
+
+static struct starpu_task *parallel_heft_pop_task(void)
+{
+	struct starpu_task *task;
+
+	int workerid = starpu_worker_get_id();
+	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
+
+	task = _starpu_fifo_pop_task(fifo, -1);
+	if (task) {
+		double model = task->predicted;
+	
+		fifo->exp_len -= model;
+		fifo->exp_start = _starpu_timing_now() + model;
+		fifo->exp_end = fifo->exp_start + fifo->exp_len;
+	}
+
+	return task;
+}
+
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+{
+	/* make sure someone coule execute that task ! */
+	STARPU_ASSERT(best_workerid != -1);
+
+	/* Is this a basic worker or a combined worker ? */
+	int nbasic_workers = (int)starpu_worker_get_count();
+	int is_basic_worker = (best_workerid < nbasic_workers);
+
+	unsigned memory_node; 
+	memory_node = starpu_worker_get_memory_node(best_workerid);
+
+	if (_starpu_get_prefetch_flag())
+		_starpu_prefetch_task_input_on_node(task, memory_node);
+
+	if (is_basic_worker)
+	{
+		PTHREAD_MUTEX_LOCK(&big_lock);
+
+		struct starpu_fifo_taskq_s *fifo;
+		fifo = queue_array[best_workerid];
+	
+		fifo->exp_end += predicted;
+		fifo->exp_len += predicted;
+	
+		task->predicted = predicted;
+	
+		int ret;
+
+		if (prio)
+		{
+			ret = _starpu_fifo_push_prio_task(queue_array[best_workerid],
+				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+		}
+		else {
+			ret = _starpu_fifo_push_task(queue_array[best_workerid],
+				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+		}
+
+		PTHREAD_MUTEX_UNLOCK(&big_lock);
+
+		return ret;
+	}
+	else {
+		/* This is a combined worker so we create task aliases */
+		struct starpu_combined_worker_s *combined_worker;
+		combined_worker = _starpu_get_combined_worker_struct(best_workerid);
+		int worker_size = combined_worker->worker_size;
+		int *combined_workerid = combined_worker->combined_workerid;
+
+		int ret = 0;
+		int i;
+		
+		task->predicted = predicted;
+
+		starpu_job_t j = _starpu_get_job_associated_to_task(task);
+		j->task_size = worker_size;
+		j->combined_workerid = best_workerid;
+		j->active_task_alias_count = 0;
+
+		pthread_barrier_init(&j->before_work_barrier, NULL, worker_size);
+		pthread_barrier_init(&j->after_work_barrier, NULL, worker_size);
+
+		PTHREAD_MUTEX_LOCK(&big_lock);
+
+		for (i = 0; i < worker_size; i++)
+		{
+			struct starpu_task *alias = _starpu_create_task_alias(task);
+			int local_worker = combined_workerid[i];
+
+			struct starpu_fifo_taskq_s *fifo;
+			fifo = queue_array[local_worker];
+		
+			fifo->exp_end += predicted;
+			fifo->exp_len += predicted;
+		
+			alias->predicted = predicted;
+		
+			if (prio)
+			{
+				ret |= _starpu_fifo_push_prio_task(queue_array[local_worker],
+					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
+			}
+			else {
+				ret |= _starpu_fifo_push_task(queue_array[local_worker],
+					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
+			}
+		}
+
+		PTHREAD_MUTEX_UNLOCK(&big_lock);
+
+		return ret;
+	}
+}
+
+static double compute_expected_end(int workerid, double length)
+{
+	if (workerid < (int)nworkers)
+	{
+		/* This is a basic worker */
+		struct starpu_fifo_taskq_s *fifo;
+		fifo = queue_array[workerid];
+		return (fifo->exp_start + fifo->exp_len + length);
+	}
+	else {
+		/* This is a combined worker, the expected end is the end for the latest worker */
+		int worker_size;
+		int *combined_workerid;
+		starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
+
+		double exp_end = DBL_MIN;
+
+		int i;
+		for (i = 0; i < worker_size; i++)
+		{
+			struct starpu_fifo_taskq_s *fifo;
+			fifo = queue_array[combined_workerid[i]];
+			double local_exp_end = (fifo->exp_start + fifo->exp_len + length);
+			exp_end = STARPU_MAX(exp_end, local_exp_end);
+		}
+
+		return exp_end;
+	}
+}
+
+static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
+{
+	/* find the queue */
+	struct starpu_fifo_taskq_s *fifo;
+	unsigned worker;
+	int best = -1;
+	
+	/* this flag is set if the corresponding worker is selected because
+	   there is no performance prediction available yet */
+	int forced_best = -1;
+
+	double local_task_length[nworkers+ncombinedworkers];
+	double local_data_penalty[nworkers+ncombinedworkers];
+	double exp_end[nworkers+ncombinedworkers];
+	double fitness[nworkers+ncombinedworkers];
+
+	int skip_worker[nworkers+ncombinedworkers];
+
+	double best_exp_end = DBL_MAX;
+	double model_best = 0.0;
+	double penality_best = 0.0;
+
+	for (worker = 0; worker < nworkers; worker++)
+	{
+		fifo = queue_array[worker];
+		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
+		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
+	}
+
+	for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
+	{
+		if (!_starpu_combined_worker_may_execute_task(worker, task))
+		{
+			/* no one on that queue may execute this task */
+			skip_worker[worker] = 1;
+			continue;
+		}
+		else {
+			skip_worker[worker] = 0;
+		}
+
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+		local_task_length[worker] = _starpu_task_expected_length(task, perf_arch);
+
+		unsigned memory_node = starpu_worker_get_memory_node(worker);
+		local_data_penalty[worker] = _starpu_data_expected_penalty(memory_node, task);
+
+		if (local_task_length[worker] == -1.0)
+		{
+			forced_best = worker;
+			break;
+		}
+
+		exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
+
+		if (exp_end[worker] < best_exp_end)
+		{
+			/* a better solution was found */
+			best_exp_end = exp_end[worker];
+		}
+	}
+
+	double best_fitness = -1;
+	
+	if (forced_best == -1)
+	{
+		for (worker = 0; worker < nworkers+ncombinedworkers; worker++)
+		{
+
+			if (skip_worker[worker])
+			{
+				/* no one on that queue may execute this task */
+				continue;
+			}
+	
+			fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
+					+ beta*(local_data_penalty[worker]);
+
+			if (best == -1 || fitness[worker] < best_fitness)
+			{
+				/* we found a better solution */
+				best_fitness = fitness[worker];
+				best = worker;
+			}
+		}
+	}
+
+	STARPU_ASSERT(forced_best != -1 || best != -1);
+	
+	if (forced_best != -1)
+	{
+		/* there is no prediction available for that task
+		 * with that arch we want to speed-up calibration time
+		 * so we force this measurement */
+		best = worker;
+		model_best = 0.0;
+		penality_best = 0.0;
+	}
+	else 
+	{
+		model_best = local_task_length[best];
+		penality_best = local_data_penalty[best];
+	}
+
+	/* we should now have the best worker in variable "best" */
+	return push_task_on_best_worker(task, best, model_best, prio);
+}
+
+static int parallel_heft_push_prio_task(struct starpu_task *task)
+{
+	return _parallel_heft_push_task(task, 1);
+}
+
+static int parallel_heft_push_task(struct starpu_task *task)
+{
+	if (task->priority == STARPU_MAX_PRIO)
+		return _parallel_heft_push_task(task, 1);
+
+	return _parallel_heft_push_task(task, 0);
+}
+
+static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
+	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+{
+	nworkers = topology->nworkers;
+
+	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
+	if (strval_alpha)
+		beta = atof(strval_alpha);
+
+	const char *strval_beta = getenv("STARPU_SCHED_BETA");
+	if (strval_beta)
+		beta = atof(strval_beta);
+
+	_starpu_sched_find_worker_combinations(topology);
+
+	ncombinedworkers = topology->ncombinedworkers;
+
+	unsigned workerid;
+	for (workerid = 0; workerid < nworkers; workerid++)
+	{
+		queue_array[workerid] = _starpu_create_fifo();
+	
+		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+	
+		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+	}
+
+	PTHREAD_MUTEX_INIT(&big_lock, NULL);
+
+	/* We pre-compute an array of all the perfmodel archs that are applicable */
+	unsigned total_worker_count = nworkers + ncombinedworkers;
+
+	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
+	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
+
+	for (workerid = 0; workerid < total_worker_count; workerid++)
+	{
+		enum starpu_perf_archtype perf_archtype = starpu_worker_get_perf_archtype(workerid);
+		used_perf_archtypes[perf_archtype] = 1;
+	}
+
+	napplicable_perf_archtypes = 0;
+
+	int arch;
+	for (arch = 0; arch < STARPU_NARCH_VARIATIONS; arch++)
+	{
+		if (used_perf_archtypes[arch])
+			applicable_perf_archtypes[napplicable_perf_archtypes++] = arch;
+	}
+}
+
+static void deinitialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
+	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+{
+	unsigned workerid;
+	for (workerid = 0; workerid < topology->nworkers; workerid++)
+		_starpu_destroy_fifo(queue_array[workerid]);
+}
+
+struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy = {
+	.init_sched = initialize_parallel_heft_policy,
+	.deinit_sched = deinitialize_parallel_heft_policy,
+	.push_task = parallel_heft_push_task, 
+	.push_prio_task = parallel_heft_push_prio_task, 
+	.pop_task = parallel_heft_pop_task,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,
+	.policy_name = "pheft",
+	.policy_description = "parallel HEFT"
+};

+ 19 - 1
tests/Makefile.am

@@ -144,7 +144,10 @@ check_PROGRAMS += 				\
 	microbenchs/prefetch_data_on_node 	\
 	microbenchs/redundant_buffer		\
 	microbenchs/local_pingpong		\
-	overlap/overlap
+	overlap/overlap				\
+	parallel_tasks/explicit_combined_worker	\
+	parallel_tasks/parallel_kernels		\
+	parallel_tasks/parallel_kernels_spmd
 
 testbin_PROGRAMS +=				\
 	core/restart
@@ -505,3 +508,18 @@ testbin_PROGRAMS +=				\
 	overlap/overlap
 overlap_overlap_SOURCES =			\
 	overlap/overlap.c
+
+testbin_PROGRAMS +=					\
+	parallel_tasks/explicit_combined_worker
+parallel_tasks_explicit_combined_worker_SOURCES =	\
+	parallel_tasks/explicit_combined_worker.c
+
+testbin_PROGRAMS +=				\
+	parallel_tasks/parallel_kernels
+parallel_tasks_parallel_kernels_SOURCES =	\
+	parallel_tasks/parallel_kernels.c
+
+testbin_PROGRAMS +=				\
+	parallel_tasks/parallel_kernels_spmd
+parallel_tasks_parallel_kernels_spmd_SOURCES =	\
+	parallel_tasks/parallel_kernels_spmd.c

+ 110 - 0
tests/parallel_tasks/explicit_combined_worker.c

@@ -0,0 +1,110 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#define N	1000
+#define VECTORSIZE	1024
+
+static pthread_mutex_t mutex;
+static pthread_cond_t cond;
+
+static unsigned finished = 0;
+
+static unsigned cnt;
+
+starpu_data_handle v_handle;
+static unsigned *v;
+
+static void codelet_null(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	int worker_size = starpu_combined_worker_get_size();
+	assert(worker_size > 0);
+	usleep(1000/worker_size);
+#if 0
+	int id = starpu_worker_get_id();
+	int combined_id = starpu_combined_worker_get_id();
+	fprintf(stderr, "worker id %d - combined id %d - worker size %d\n", id, combined_id, worker_size);
+#endif
+}
+
+static starpu_codelet cl = {
+	.where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,
+	.type = STARPU_FORKJOIN,
+	.max_parallelism = INT_MAX,
+	.cpu_func = codelet_null,
+	.cuda_func = codelet_null,
+        .opencl_func = codelet_null,
+	.nbuffers = 1
+};
+
+
+int main(int argc, char **argv)
+{
+        struct starpu_conf conf = {
+                .sched_policy_name = "pheft",
+                .ncpus = -1,
+                .ncuda = -1,
+                .nopencl = -1,
+                .nspus = -1,
+                .use_explicit_workers_bindid = 0,
+                .use_explicit_workers_cuda_gpuid = 0,
+                .use_explicit_workers_opencl_gpuid = 0,
+                .calibrate = -1
+        };
+
+	starpu_init(NULL);
+
+	starpu_data_malloc_pinned_if_possible((void **)&v, VECTORSIZE*sizeof(unsigned));
+	starpu_vector_data_register(&v_handle, 0, (uintptr_t)v, VECTORSIZE, sizeof(unsigned));
+
+	unsigned nworker = starpu_worker_get_count() + starpu_combined_worker_get_count();
+
+	cnt = nworker*N;
+
+	unsigned iter, worker;
+	for (iter = 0; iter < N; iter++)
+	{
+		for (worker = 0; worker < nworker; worker++)
+		{
+			/* execute a task on that worker */
+			struct starpu_task *task = starpu_task_create();
+			task->cl = &cl;
+
+			task->buffers[0].handle = v_handle;
+			task->buffers[0].mode = STARPU_R;
+
+			task->execute_on_a_specific_worker = 1;
+			task->workerid = worker;
+
+			int ret = starpu_task_submit(task);
+			if (ret == -ENODEV)
+				goto enodev;
+		}
+	}
+
+	starpu_task_wait_for_all();
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	return 0;
+}

+ 113 - 0
tests/parallel_tasks/parallel_kernels.c

@@ -0,0 +1,113 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#define N	1000
+#define VECTORSIZE	1024
+
+static pthread_mutex_t mutex;
+static pthread_cond_t cond;
+
+static unsigned finished = 0;
+
+static unsigned cnt;
+
+starpu_data_handle v_handle;
+static unsigned *v;
+
+static void codelet_null(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	int worker_size = starpu_combined_worker_get_size();
+	assert(worker_size > 0);
+	usleep(1000/worker_size);
+#if 0
+	int id = starpu_worker_get_id();
+	int combined_id = starpu_combined_worker_get_id();
+	fprintf(stderr, "worker id %d - combined id %d - worker size %d\n", id, combined_id, worker_size);
+#endif
+}
+
+struct starpu_perfmodel_t model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "parallel_kernel_test"
+};
+
+static starpu_codelet cl = {
+	.where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,
+	.type = STARPU_FORKJOIN,
+	.max_parallelism = INT_MAX,
+	.cpu_func = codelet_null,
+	.cuda_func = codelet_null,
+        .opencl_func = codelet_null,
+	.model = &model,
+	.nbuffers = 1
+};
+
+
+int main(int argc, char **argv)
+{
+        struct starpu_conf conf = {
+                .sched_policy_name = "pheft",
+                .ncpus = -1,
+                .ncuda = -1,
+                .nopencl = -1,
+                .nspus = -1,
+                .use_explicit_workers_bindid = 0,
+                .use_explicit_workers_cuda_gpuid = 0,
+                .use_explicit_workers_opencl_gpuid = 0,
+                .calibrate = 1
+        };
+
+	starpu_init(&conf);
+
+	starpu_data_malloc_pinned_if_possible((void **)&v, VECTORSIZE*sizeof(unsigned));
+	starpu_vector_data_register(&v_handle, 0, (uintptr_t)v, VECTORSIZE, sizeof(unsigned));
+
+	unsigned nworker = starpu_worker_get_count() + starpu_combined_worker_get_count();
+
+	cnt = nworker*N;
+
+	unsigned iter, worker;
+	for (iter = 0; iter < N; iter++)
+	{
+		for (worker = 0; worker < nworker; worker++)
+		{
+			/* execute a task on that worker */
+			struct starpu_task *task = starpu_task_create();
+			task->cl = &cl;
+
+			task->buffers[0].handle = v_handle;
+			task->buffers[0].mode = STARPU_R;
+
+			int ret = starpu_task_submit(task);
+			if (ret == -ENODEV)
+				goto enodev;
+		}
+	}
+
+	starpu_task_wait_for_all();
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	return 0;
+}

+ 1 - 0
tools/perfmodel_display.c

@@ -232,6 +232,7 @@ static void display_all_perf_models(struct starpu_perfmodel_t *model)
 		}
 	}
 	else {
+#warning TODO add the cpu:k interface as in the branch
 		if (strcmp(arch, "cpu") == 0) {
 			display_perf_model(model, STARPU_CPU_DEFAULT);
 			return;