Browse Source

- merge trunk

Olivier Aumage 11 years ago
parent
commit
bd21a2eb8c

+ 14 - 1
ChangeLog

@@ -45,6 +45,10 @@ New features:
     nodes.
   * Add STARPU_CUDA_ASYNC and STARPU_OPENCL_ASYNC flags to allow asynchronous
     CUDA and OpenCL kernel execution.
+  * Add STARPU_CUDA_PIPELINE and STARPU_OPENCL_PIPELINE to specify how
+    many asynchronous tasks are submitted in advance on CUDA and
+    OpenCL devices. Setting the value to 0 forces a synchronous
+    execution of all tasks.
   * Add CUDA concurrent kernel execution support through
     the STARPU_NWORKER_PER_CUDA environment variable.
   * Add CUDA and OpenCL kernel submission pipelining, to overlap costs and allow
@@ -79,6 +83,8 @@ Small features:
   * On Linux x86, spinlocks now block after a hundred tries. This avoids
     typical 10ms pauses when the application thread tries to submit tasks.
   * New function char *starpu_worker_get_type_as_string(enum starpu_worker_archtype type)
+  * Improve static scheduling by adding support for specifying the task
+    execution order.
 
 Changes:
   * Data interfaces (variable, vector, matrix and block) now define
@@ -95,7 +101,14 @@ Small changes:
   * Rename function starpu_trace_user_event() as
     starpu_fxt_trace_user_event()
 
-StarPU 1.1.3 (svn revision xxx)
+StarPU 1.1.4 (svn revision xxx)
+==============================================
+The scheduling context release
+
+New features:
+  * Fix and actually enable the cache allocation.
+
+StarPU 1.1.3 (svn revision 13450)
 ==============================================
 The scheduling context release
 

+ 11 - 0
doc/doxygen/chapters/08scheduling.doxy

@@ -146,6 +146,17 @@ task->execute_on_a_specific_worker = 1;
 task->worker = starpu_worker_get_by_type(STARPU_CUDA_WORKER, 0);
 \endcode
 
+One can also specify the order in which tasks must be executed by setting the
+starpu_task::workerder field. If this field is set to a non-zero value, it
+provides the per-worker consecutive order in which tasks will be executed,
+starting from 1. For a given of such task, the worker will thus not execute
+it before all the tasks with smaller order value have been executed, notably
+in case those tasks are not available yet due to some dependencies. This
+eventually gives total control of task scheduling, and StarPU will only serve as
+a "self-timed" task runtime. Of course, the provided order has to be runnable,
+i.e. a task should should not depend on another task bound to the same worker
+with a bigger order.
+
 Note however that using scheduling contexts while statically scheduling tasks on workers
 could be tricky. Be careful to schedule the tasks exactly on the workers of the corresponding
 contexts, otherwise the workers' corresponding scheduling structures may not be allocated or

+ 8 - 0
doc/doxygen/chapters/21simgrid.doxy

@@ -121,6 +121,14 @@ case. Since during simgrid execution, the functions of the codelet are actually
 not called, one can use dummy functions such as the following to still permit
 CUDA or OpenCL execution:
 
+\section Debugging applications
+
+By default, simgrid uses its own implementation of threads, which prevents gdb
+from being able to inspect stacks of all threads.  To be able to fully debug an
+application running with simgrid, pass the <c>--cfg=contexts/factory:thread</c>
+option to the application, to make simgrid use system threads, which gdb will be
+able to manipulate as usual.
+
 \snippet simgrid.c To be included. You should update doxygen if you see this text.
 
 

+ 4 - 2
doc/doxygen/chapters/40environment_variables.doxy

@@ -58,7 +58,8 @@ which will be concurrently running on the devices. The default value is 1.
 Specify how many asynchronous tasks are submitted in advance on CUDA
 devices. This for instance permits to overlap task management with the execution
 of previous tasks, but it also allows concurrent execution on Fermi cards, which
-otherwise bring spurious synchronizations. The default is 2.
+otherwise bring spurious synchronizations. The default is 2. Setting the value to 0 forces a synchronous
+execution of all tasks.
 </dd>
 
 <dt>STARPU_NOPENCL</dt>
@@ -75,7 +76,8 @@ OpenCL equivalent of the environment variable \ref STARPU_NCUDA.
 Specify how many asynchronous tasks are submitted in advance on OpenCL
 devices. This for instance permits to overlap task management with the execution
 of previous tasks, but it also allows concurrent execution on Fermi cards, which
-otherwise bring spurious synchronizations. The default is 2.
+otherwise bring spurious synchronizations. The default is 2. Setting the value to 0 forces a synchronous
+execution of all tasks.
 </dd>
 
 <dt>STARPU_NMICDEVS</dt>

+ 8 - 0
doc/doxygen/chapters/api/codelet_and_tasks.doxy

@@ -549,6 +549,14 @@ process this task (as returned by starpu_worker_get_id()). This field
 is ignored if the field starpu_task::execute_on_a_specific_worker is
 set to 0.
 
+\var starpu_task::workerorder
+Optional field. If the field starpu_task::execute_on_a_specific_worker is
+set, this field indicates the per-worker consecutive order in which tasks
+should be executed on the worker. Tasks will be executed in consecutive
+starpu_task::workerorder values, thus ignoring the availability order or task
+priority. See \ref StaticScheduling for more details. This field is ignored if
+the field starpu_task::execute_on_a_specific_worker is set to 0.
+
 \var starpu_task::bundle
 Optional field. The bundle that includes this task. If no bundle is
 used, this should be NULL.

+ 3 - 2
doc/doxygen/chapters/api/data_interfaces.doxy

@@ -1000,11 +1000,12 @@ DefiningANewDataInterface.
 \ingroup API_Data_Interfaces
 Allocate \p size bytes on node \p dst_node. This returns 0 if
 allocation failed, the allocation method should then return <c>-ENOMEM</c> as
-allocated size.
+allocated size. Deallocation must be done with starpu_free_on_node.
 
 \fn void starpu_free_on_node(unsigned dst_node, uintptr_t addr, size_t size)
 \ingroup API_Data_Interfaces
-Free \p addr of \p size bytes on node \p dst_node.
+Free \p addr of \p size bytes on node \p dst_node which was previously allocated
+with starpu_malloc_on_node.
 
 \fn int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data)
 \ingroup API_Data_Interfaces

+ 7 - 1
doc/doxygen/chapters/api/insert_task.doxy

@@ -23,7 +23,7 @@ The arguments following the codelet can be of the following types:
 ::STARPU_REDUX an access mode followed by a data handle;
 <li> ::STARPU_DATA_ARRAY followed by an array of data handles and its
 number of elements;
-<li> ::STARPU_EXECUTE_ON_WORKER followed by an integer value
+<li> ::STARPU_EXECUTE_ON_WORKER, ::STARPU_WORKER_ORDER followed by an integer value
 specifying the worker on which to execute the task (as specified by
 starpu_task::execute_on_a_specific_worker)
 <li> the specific values ::STARPU_VALUE, ::STARPU_CALLBACK,
@@ -80,6 +80,12 @@ this macro is used when calling starpu_task_insert(), and must be
 followed by an integer value specifying the worker on which to execute
 the task (as specified by starpu_task::execute_on_a_specific_worker)
 
+\def STARPU_WORKER_ORDER
+\ingroup API_Insert_Task
+this macro is used when calling starpu_task_insert(), and must be
+followed by an integer value specifying the worker order in which to execute
+the tasks (as specified by starpu_task::workerorder)
+
 \def STARPU_TAG
 \ingroup API_Insert_Task
 this macro is used when calling starpu_task_insert(), and must be followed by a tag.

+ 1 - 1
doc/doxygen/refman.tex

@@ -10,7 +10,7 @@
 ~\\
 \vspace*{15cm}
 \begin{flushright}
-Generated by Doxygen $doxygenversion on $datetime
+Generated by Doxygen.
 \end{flushright}
 \end{titlepage}
 

+ 6 - 1
examples/worker_collections/worker_list_example.c

@@ -21,7 +21,12 @@
 
 int main()
 {
-	starpu_init(NULL);
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
 	int procs[STARPU_NMAXWORKERS];
 	unsigned ncpus =  starpu_cpu_worker_get_count();

+ 1 - 1
examples/worker_collections/worker_tree_example.c

@@ -63,7 +63,7 @@ int main()
 
 	double timing = (end_time - start_time) / 1000;
 
-	int i;
+	unsigned i;
 	for(i = 0; i < ncpus; i++)
 	{
 		int added = co->add(co, procs[i]);

+ 2 - 0
include/starpu_task.h

@@ -170,6 +170,7 @@ struct starpu_task
 	unsigned regenerate:1;
 
 	unsigned workerid;
+	unsigned workerorder;
 
 	unsigned scheduled:1;
 
@@ -211,6 +212,7 @@ struct starpu_task
 	.use_tag = 0,					\
 	.synchronous = 0,				\
 	.execute_on_a_specific_worker = 0,		\
+	.workerorder = 0,				\
 	.bundle = NULL,					\
 	.detach = 1,					\
 	.destroy = 0,					\

+ 1 - 0
include/starpu_task_util.h

@@ -50,6 +50,7 @@ void starpu_create_sync_task(starpu_tag_t sync_tag, unsigned ndeps, starpu_tag_t
 #define STARPU_PROLOGUE_CALLBACK_POP_ARG (16<<18)
 #define STARPU_EXECUTE_ON_WORKER (17<<18)
 #define STARPU_TAG_ONLY          (18<<18)
+#define STARPU_WORKER_ORDER      (19<<18)
 
 struct starpu_task *starpu_task_build(struct starpu_codelet *cl, ...);
 int starpu_task_insert(struct starpu_codelet *cl, ...);

+ 16 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -234,6 +234,12 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 			// calling function _starpu_task_insert_create()
 			(void)va_arg(varg_list_copy, int);
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			// the flag is decoded and set later when
+			// calling function _starpu_task_insert_create()
+			(void)va_arg(varg_list_copy, unsigned);
+		}
 		else if (arg_type_nocommute==STARPU_R || arg_type_nocommute==STARPU_W || arg_type_nocommute==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
@@ -467,6 +473,12 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 			// calling function _starpu_task_insert_create()
 			va_arg(varg_list_copy, int);
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			// the flag is decoded and set later when
+			// calling function _starpu_task_insert_create()
+			va_arg(varg_list_copy, unsigned);
+		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
 			(void)va_arg(varg_list_copy, int);
@@ -609,6 +621,10 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, struct starpu_codelet *codelet,
 		{
 			va_arg(varg_list_copy, int);
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			va_arg(varg_list_copy, unsigned);
+		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
 			(void)va_arg(varg_list_copy, int);

+ 2 - 2
mpi/tests/block_interface.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -51,7 +51,7 @@ int main(int argc, char **argv)
 	 * register it directly. Node 0 and 1 will then exchange the content of
 	 * their blocks. */
 
-	float *block;
+	float *block = NULL;
 	starpu_data_handle_t block_handle;
 
 	if (rank == 0)

+ 1 - 1
mpi/tests/insert_task_recv_cache.c

@@ -130,7 +130,7 @@ int main(int argc, char **argv)
 	if (rank == 1)
 	{
 		result = (comm_amount_with_cache[0] == comm_amount_without_cache[0] * 2);
-		FPRINTF_MPI("Communication cache mechanism is %sworking (with cache: %d) (without cache: %d)\n", result?"":"NOT ", comm_amount_with_cache[0], comm_amount_without_cache[0]);
+		FPRINTF_MPI("Communication cache mechanism is %sworking (with cache: %ld) (without cache: %ld)\n", result?"":"NOT ", comm_amount_with_cache[0], comm_amount_without_cache[0]);
 	}
 	else
 		result = 1;

+ 6 - 5
src/common/starpu_spinlock.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2013  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -24,15 +24,17 @@
 int _starpu_spin_init(struct _starpu_spinlock *lock)
 {
 #if defined(STARPU_SPINLOCK_CHECK)
+	starpu_pthread_mutexattr_t errcheck_attr;
 //	memcpy(&lock->errcheck_lock, PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, sizeof(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP));
 	int ret;
-	ret = starpu_pthread_mutexattr_init(&lock->errcheck_attr);
+	ret = starpu_pthread_mutexattr_init(&errcheck_attr);
 	STARPU_CHECK_RETURN_VALUE(ret, "pthread_mutexattr_init");
 
-	ret = starpu_pthread_mutexattr_settype(&lock->errcheck_attr, PTHREAD_MUTEX_ERRORCHECK);
+	ret = starpu_pthread_mutexattr_settype(&errcheck_attr, PTHREAD_MUTEX_ERRORCHECK);
 	STARPU_ASSERT(!ret);
 
-	ret = starpu_pthread_mutex_init(&lock->errcheck_lock, &lock->errcheck_attr);
+	ret = starpu_pthread_mutex_init(&lock->errcheck_lock, &errcheck_attr);
+	starpu_pthread_mutexattr_destroy(&errcheck_attr);
 	return ret;
 #else
 	int ret = starpu_pthread_spin_init(&lock->lock, 0);
@@ -44,7 +46,6 @@ int _starpu_spin_init(struct _starpu_spinlock *lock)
 int _starpu_spin_destroy(struct _starpu_spinlock *lock STARPU_ATTRIBUTE_UNUSED)
 {
 #if defined(STARPU_SPINLOCK_CHECK)
-	starpu_pthread_mutexattr_destroy(&lock->errcheck_attr);
 	return starpu_pthread_mutex_destroy(&lock->errcheck_lock);
 #else
 	return starpu_pthread_spin_destroy(&lock->lock);

+ 1 - 2
src/common/starpu_spinlock.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -25,7 +25,6 @@
 struct _starpu_spinlock
 {
 #if defined(STARPU_SPINLOCK_CHECK)
-	starpu_pthread_mutexattr_t errcheck_attr;
 	starpu_pthread_mutex_t errcheck_lock;
 	const char *last_taker;
 #else

+ 54 - 3
src/core/jobs.c

@@ -600,11 +600,29 @@ unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
 }
 #endif
 
+/* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
+ * ring buffer, indexed by order, and pulled from its head. */
+/* TODO: replace with perhaps a heap */
+
 /* This function must be called with worker->sched_mutex taken */
 struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 {
 	struct starpu_task *task = NULL;
 
+	if (worker->local_ordered_tasks_size)
+	{
+		task = worker->local_ordered_tasks[worker->current_ordered_task];
+		if (task)
+		{
+			worker->local_ordered_tasks[worker->current_ordered_task] = NULL;
+			STARPU_ASSERT(task->workerorder == worker->current_ordered_task_order);
+			/* Next ordered task is there, return it */
+			worker->current_ordered_task = (worker->current_ordered_task + 1) % worker->local_ordered_tasks_size;
+			worker->current_ordered_task_order++;
+			return task;
+		}
+	}
+
 	if (!starpu_task_list_empty(&worker->local_tasks))
 		task = starpu_task_list_pop_front(&worker->local_tasks);
 
@@ -620,10 +638,43 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
-	if (prio)
-		starpu_task_list_push_front(&worker->local_tasks, task);
+	if (task->execute_on_a_specific_worker && task->workerorder)
+	{
+		STARPU_ASSERT_MSG(task->workerorder >= worker->current_ordered_task_order, "worker order values must not have duplicates");
+		/* Put it in the ordered task ring */
+		unsigned needed = task->workerorder - worker->current_ordered_task_order + 1;
+		if (worker->local_ordered_tasks_size < needed)
+		{
+			/* Increase the size */
+			unsigned alloc = worker->local_ordered_tasks_size;
+			struct starpu_task **new;
+			unsigned copied;
+
+			if (!alloc)
+				alloc = 1;
+			while (alloc < needed)
+				alloc *= 2;
+			new = malloc(alloc * sizeof(*new));
+
+			/* Put existing tasks at the beginning of the new ring */
+			copied = worker->local_ordered_tasks_size - worker->current_ordered_task;
+			memcpy(new, &worker->local_ordered_tasks[worker->current_ordered_task], copied * sizeof(*new));
+			memcpy(new + copied, worker->local_ordered_tasks, (worker->local_ordered_tasks_size - copied) * sizeof(*new));
+			memset(new + worker->local_ordered_tasks_size, 0, (alloc - worker->local_ordered_tasks_size) * sizeof(*new));
+			free(worker->local_ordered_tasks);
+			worker->local_ordered_tasks = new;
+			worker->local_ordered_tasks_size = alloc;
+			worker->current_ordered_task = 0;
+		}
+		worker->local_ordered_tasks[(worker->current_ordered_task + task->workerorder - worker->current_ordered_task_order) % worker->local_ordered_tasks_size] = task;
+	}
 	else
-		starpu_task_list_push_back(&worker->local_tasks, task);
+	{
+		if (prio)
+			starpu_task_list_push_front(&worker->local_tasks, task);
+		else
+			starpu_task_list_push_back(&worker->local_tasks, task);
+	}
 
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	starpu_push_task_end(task);

+ 10 - 6
src/core/perfmodel/perfmodel_history.c

@@ -814,10 +814,14 @@ static void get_model_path(struct starpu_perfmodel *model, char *path, size_t ma
 	_starpu_get_perf_model_dir_codelets(path, maxlen);
 	strncat(path, model->symbol, maxlen);
 
-	char hostname[65];
-	_starpu_gethostname(hostname, sizeof(hostname));
-	strncat(path, ".", maxlen);
-	strncat(path, hostname, maxlen);
+	const char *dot = strrchr(model->symbol, '.');
+	if (dot == NULL)
+	{
+		char hostname[65];
+		_starpu_gethostname(hostname, sizeof(hostname));
+		strncat(path, ".", maxlen);
+		strncat(path, hostname, maxlen);
+	}
 }
 
 static void save_history_based_model(struct starpu_perfmodel *model)
@@ -1226,7 +1230,7 @@ double _starpu_non_linear_regression_based_job_expected_perf(struct starpu_perfm
 			char archname[32];
 
 			starpu_perfmodel_get_arch_name(arch, archname, sizeof(archname), nimpl);
-			_STARPU_DISP("Warning: model %s is not calibrated enough for %s, forcing calibration for this run. Use the STARPU_CALIBRATE environment variable to control this.\n", model->symbol, archname);
+			_STARPU_DISP("Warning: model %s is not calibrated enough for %s (only %u measurements), forcing calibration for this run. Use the STARPU_CALIBRATE environment variable to control this.\n", model->symbol, archname, entry && entry->history_entry ? entry->history_entry->nsample : 0);
 			_starpu_set_calibrate_flag(1);
 			model->benchmarking = 1;
 		}
@@ -1268,7 +1272,7 @@ double _starpu_history_based_job_expected_perf(struct starpu_perfmodel *model, s
 		char archname[32];
 
 		starpu_perfmodel_get_arch_name(arch, archname, sizeof(archname), nimpl);
-		_STARPU_DISP("Warning: model %s is not calibrated enough for %s, forcing calibration for this run. Use the STARPU_CALIBRATE environment variable to control this.\n", model->symbol, archname);
+		_STARPU_DISP("Warning: model %s is not calibrated enough for %s (only %u measurements), forcing calibration for this run. Use the STARPU_CALIBRATE environment variable to control this.\n", model->symbol, archname, entry ? entry->nsample : 0);
 		_starpu_set_calibrate_flag(1);
 		model->benchmarking = 1;
 	}

+ 8 - 0
src/core/workers.c

@@ -427,6 +427,10 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 	starpu_task_list_init(&workerarg->local_tasks);
+	workerarg->local_ordered_tasks = NULL;
+	workerarg->local_ordered_tasks_size = 0;
+	workerarg->current_ordered_task = 0;
+	workerarg->current_ordered_task_order = 1;
 	workerarg->current_task = NULL;
 	workerarg->first_task = 0;
 	workerarg->ntasks = 0;
@@ -1156,6 +1160,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 {
 	int status = 0;
 	unsigned workerid;
+	unsigned n;
 
 	for (workerid = 0; workerid < pconfig->topology.nworkers; workerid++)
 	{
@@ -1208,8 +1213,11 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 
 out:
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
+		for (n = 0; n < worker->local_ordered_tasks_size; n++)
+			STARPU_ASSERT(worker->local_ordered_tasks[n] == NULL);
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
 		_starpu_job_list_delete(worker->terminated_jobs);
+		free(worker->local_ordered_tasks);
 	}
 }
 

+ 4 - 0
src/core/workers.h

@@ -75,6 +75,10 @@ LIST_TYPE(_starpu_worker,
 	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
+	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
+	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
+	unsigned current_ordered_task; /* this records the index (within local_ordered_tasks) of the next ordered task to be executed */
+	unsigned current_ordered_task_order; /* this records the order of the next ordered task to be executed */
 	struct starpu_task *current_task; /* task currently executed by this worker (non-pipelined version) */
 	struct starpu_task *current_tasks[STARPU_MAX_PIPELINE]; /* tasks currently executed by this worker (pipelined version) */
 	unsigned char first_task; /* Index of first task in the pipeline */

+ 11 - 6
src/datawizard/data_request.c

@@ -155,17 +155,22 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 {
 	int retval;
 	int do_delete = 0;
+	int completed;
 
 	unsigned local_node = _starpu_memory_node_get_local_key();
 
 	do
 	{
-		_starpu_spin_lock(&r->lock);
-
-		if (r->completed)
-			break;
-
-		_starpu_spin_unlock(&r->lock);
+		STARPU_HG_DISABLE_CHECKING(r->completed);
+		completed = r->completed;
+		STARPU_HG_ENABLE_CHECKING(r->completed);
+		if (completed)
+		{
+			_starpu_spin_lock(&r->lock);
+			if (r->completed)
+				break;
+			_starpu_spin_unlock(&r->lock);
+		}
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 		_starpu_wake_all_blocked_workers_on_node(r->handling_node);

+ 1 - 5
src/datawizard/memalloc.c

@@ -434,10 +434,6 @@ static void reuse_mem_chunk(unsigned node, struct _starpu_data_replicate *new_re
 	else
 		data_interface = mc->chunk_interface;
 
-	new_replicate->allocated = 1;
-	new_replicate->automatically_allocated = 1;
-	new_replicate->initialized = 0;
-
 	STARPU_ASSERT(new_replicate->data_interface);
 	STARPU_ASSERT(data_interface);
 	memcpy(new_replicate->data_interface, data_interface, mc->size_interface);
@@ -522,7 +518,7 @@ static struct _starpu_mem_chunk *_starpu_memchunk_cache_lookup_locked(unsigned n
 	     mc = _starpu_mem_chunk_list_next(mc))
 	{
 		/* Is that a false hit ? (this is _very_ unlikely) */
-		if (_starpu_data_interface_compare(handle->per_node[node].data_interface, handle->ops, mc->chunk_interface, mc->ops))
+		if (_starpu_data_interface_compare(handle->per_node[node].data_interface, handle->ops, mc->chunk_interface, mc->ops) != 1)
 			continue;
 
 		/* Cache hit */

+ 54 - 49
src/debug/traces/starpu_fxt.c

@@ -29,12 +29,19 @@
 #include <inttypes.h>
 #include <starpu_hash.h>
 
-static char *cpus_worker_colors[STARPU_NMAXWORKERS] = {"/greens9/7", "/greens9/6", "/greens9/5", "/greens9/4",  "/greens9/9", "/greens9/3",  "/greens9/2",  "/greens9/1"  };
-static char *cuda_worker_colors[STARPU_NMAXWORKERS] = {"/ylorrd9/9", "/ylorrd9/6", "/ylorrd9/3", "/ylorrd9/1", "/ylorrd9/8", "/ylorrd9/7", "/ylorrd9/4", "/ylorrd9/2",  "/ylorrd9/1"};
-static char *opencl_worker_colors[STARPU_NMAXWORKERS] = {"/blues9/9", "/blues9/6", "/blues9/3", "/blues9/1", "/blues9/8", "/blues9/7", "/blues9/4", "/blues9/2",  "/blues9/1"};
-static char *mic_worker_colors[STARPU_NMAXWORKERS] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
-static char *scc_worker_colors[STARPU_NMAXWORKERS] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
-static char *other_worker_colors[STARPU_NMAXWORKERS] = {"/greys9/9", "/greys9/8", "/greys9/7", "/greys9/6"};
+#define CPUS_WORKER_COLORS_NB	8
+#define CUDA_WORKER_COLORS_NB	9
+#define OPENCL_WORKER_COLORS_NB 8
+#define MIC_WORKER_COLORS_NB	9
+#define SCC_WORKER_COLORS_NB	9
+#define OTHER_WORKER_COLORS_NB	4
+
+static char *cpus_worker_colors[CPUS_WORKER_COLORS_NB] = {"/greens9/7", "/greens9/6", "/greens9/5", "/greens9/4",  "/greens9/9", "/greens9/3",  "/greens9/2",  "/greens9/1"  };
+static char *cuda_worker_colors[CUDA_WORKER_COLORS_NB] = {"/ylorrd9/9", "/ylorrd9/6", "/ylorrd9/3", "/ylorrd9/1", "/ylorrd9/8", "/ylorrd9/7", "/ylorrd9/4", "/ylorrd9/2",  "/ylorrd9/1"};
+static char *opencl_worker_colors[OPENCL_WORKER_COLORS_NB] = {"/blues9/9", "/blues9/6", "/blues9/3", "/blues9/1", "/blues9/8", "/blues9/7", "/blues9/4", "/blues9/2",  "/blues9/1"};
+static char *mic_worker_colors[MIC_WORKER_COLORS_NB] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
+static char *scc_worker_colors[SCC_WORKER_COLORS_NB] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
+static char *other_worker_colors[OTHER_WORKER_COLORS_NB] = {"/greys9/9", "/greys9/8", "/greys9/7", "/greys9/6"};
 static char *worker_colors[STARPU_NMAXWORKERS];
 
 static unsigned opencl_index = 0;
@@ -49,6 +56,7 @@ static void set_next_other_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = other_worker_colors[other_index++];
+	if (other_index == OTHER_WORKER_COLORS_NB) other_index = 0;
 }
 
 static void set_next_cpu_worker_color(int workerid)
@@ -56,6 +64,7 @@ static void set_next_cpu_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = cpus_worker_colors[cpus_index++];
+	if (cpus_index == CPUS_WORKER_COLORS_NB) cpus_index = 0;
 }
 
 static void set_next_cuda_worker_color(int workerid)
@@ -63,6 +72,7 @@ static void set_next_cuda_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = cuda_worker_colors[cuda_index++];
+	if (cuda_index == CUDA_WORKER_COLORS_NB) cuda_index = 0;
 }
 
 static void set_next_opencl_worker_color(int workerid)
@@ -70,16 +80,23 @@ static void set_next_opencl_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = opencl_worker_colors[opencl_index++];
+	if (opencl_index == OPENCL_WORKER_COLORS_NB) opencl_index = 0;
 }
 
 static void set_next_mic_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = mic_worker_colors[mic_index++];
+	if (mic_index == MIC_WORKER_COLORS_NB) mic_index = 0;
 }
 
 static void set_next_scc_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = scc_worker_colors[scc_index++];
+	if (scc_index == SCC_WORKER_COLORS_NB) scc_index = 0;
 }
 
 static const char *get_worker_color(int workerid)
@@ -808,7 +825,7 @@ static void handle_end_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 	}
 }
 
-static void handle_start_thread_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_start_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	char *prefix = options->file_prefix;
 
@@ -816,7 +833,7 @@ static void handle_start_thread_executing(struct fxt_ev_64 *ev, struct starpu_fx
 		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "E");
 }
 
-static void handle_end_thread_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_end_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	char *prefix = options->file_prefix;
 
@@ -882,7 +899,7 @@ static void handle_end_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "B");
 }
 
-static void handle_hyp_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_hypervisor_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -893,7 +910,7 @@ static void handle_hyp_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "H");
 }
 
-static void handle_hyp_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_hypervisor_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -917,7 +934,7 @@ static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options
 
 static double last_sleep_start[STARPU_NMAXWORKERS];
 
-static void handle_start_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_scheduling_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -927,7 +944,7 @@ static void handle_start_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 }
 
-static void handle_end_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_scheduling_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -937,7 +954,7 @@ static void handle_end_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_option
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
 }
 
-static void handle_push_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_scheduling_push(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -947,7 +964,7 @@ static void handle_push_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 		thread_push_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 }
 
-static void handle_pop_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_scheduling_pop(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -957,7 +974,7 @@ static void handle_pop_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_option
 		thread_pop_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0]);
 }
 
-static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_sleep_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -970,7 +987,7 @@ static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sl");
 }
 
-static void handle_end_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+static void handle_worker_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
 	worker = find_worker_id(ev->param[0]);
@@ -1209,7 +1226,7 @@ void handle_update_task_cnt(struct fxt_ev_64 *ev, struct starpu_fxt_options *opt
 	fprintf(activity_file, "cnt_submitted\t%.9f\t%lu\n", current_timestamp, nsubmitted);
 }
 
-static void handle_codelet_tag(struct fxt_ev_64 *ev)
+static void handle_tag(struct fxt_ev_64 *ev)
 {
 	uint64_t tag;
 	unsigned long job;
@@ -1220,7 +1237,7 @@ static void handle_codelet_tag(struct fxt_ev_64 *ev)
 	_starpu_fxt_dag_add_tag(tag, job);
 }
 
-static void handle_codelet_tag_deps(struct fxt_ev_64 *ev)
+static void handle_tag_deps(struct fxt_ev_64 *ev)
 {
 	uint64_t child;
 	uint64_t father;
@@ -1684,10 +1701,10 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				break;
 
 			case _STARPU_FUT_START_EXECUTING:
-				handle_start_thread_executing(&ev, options);
+				handle_start_executing(&ev, options);
 				break;
 			case _STARPU_FUT_END_EXECUTING:
-				handle_end_thread_executing(&ev, options);
+				handle_end_executing(&ev, options);
 				break;
 
 			case _STARPU_FUT_START_CALLBACK:
@@ -1713,19 +1730,15 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 			case _STARPU_FUT_START_FETCH_INPUT:
 				handle_worker_status(&ev, options, "Fi");
 				break;
-
 			case _STARPU_FUT_START_PUSH_OUTPUT:
 				handle_worker_status(&ev, options, "Po");
 				break;
-
 			case _STARPU_FUT_START_PROGRESS:
 				handle_worker_status(&ev, options, "P");
 				break;
-
 			case _STARPU_FUT_START_UNPARTITION:
 				handle_worker_status(&ev, options, "U");
 				break;
-
 			case _STARPU_FUT_END_FETCH_INPUT:
 			case _STARPU_FUT_END_PROGRESS:
 			case _STARPU_FUT_END_PUSH_OUTPUT:
@@ -1734,35 +1747,35 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				break;
 
 			case _STARPU_FUT_WORKER_SCHEDULING_START:
-				handle_start_scheduling(&ev, options);
+				handle_worker_scheduling_start(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORKER_SCHEDULING_END:
-				handle_end_scheduling(&ev, options);
+				handle_worker_scheduling_end(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORKER_SCHEDULING_PUSH:
-				handle_push_scheduling(&ev, options);
+				handle_worker_scheduling_push(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORKER_SCHEDULING_POP:
-				handle_pop_scheduling(&ev, options);
+				handle_worker_scheduling_pop(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORKER_SLEEP_START:
-				handle_start_sleep(&ev, options);
+				handle_worker_sleep_start(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORKER_SLEEP_END:
-				handle_end_sleep(&ev, options);
+				handle_worker_sleep_end(&ev, options);
 				break;
 
 			case _STARPU_FUT_TAG:
-				handle_codelet_tag(&ev);
+				handle_tag(&ev);
 				break;
 
 			case _STARPU_FUT_TAG_DEPS:
-				handle_codelet_tag_deps(&ev);
+				handle_tag_deps(&ev);
 				break;
 
 			case _STARPU_FUT_TASK_DEPS:
@@ -1779,27 +1792,27 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 
 			case _STARPU_FUT_DATA_COPY:
 				if (!options->no_bus)
-				handle_data_copy();
+				     handle_data_copy();
 				break;
 
 			case _STARPU_FUT_START_DRIVER_COPY:
 				if (!options->no_bus)
-				handle_start_driver_copy(&ev, options);
+					handle_start_driver_copy(&ev, options);
 				break;
 
 			case _STARPU_FUT_END_DRIVER_COPY:
 				if (!options->no_bus)
-				handle_end_driver_copy(&ev, options);
+					handle_end_driver_copy(&ev, options);
 				break;
 
 			case _STARPU_FUT_START_DRIVER_COPY_ASYNC:
 				if (!options->no_bus)
-				handle_start_driver_copy_async(&ev, options);
+					handle_start_driver_copy_async(&ev, options);
 				break;
 
 			case _STARPU_FUT_END_DRIVER_COPY_ASYNC:
 				if (!options->no_bus)
-				handle_end_driver_copy_async(&ev, options);
+					handle_end_driver_copy_async(&ev, options);
 				break;
 
 			case _STARPU_FUT_WORK_STEALING:
@@ -1816,27 +1829,23 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 
 			case _STARPU_FUT_START_ALLOC:
 				if (!options->no_bus)
-				handle_memnode_event(&ev, options, "A");
+					handle_memnode_event(&ev, options, "A");
 				break;
-
 			case _STARPU_FUT_START_ALLOC_REUSE:
 				if (!options->no_bus)
-				handle_memnode_event(&ev, options, "Ar");
+					handle_memnode_event(&ev, options, "Ar");
 				break;
-
 			case _STARPU_FUT_END_ALLOC:
 			case _STARPU_FUT_END_ALLOC_REUSE:
 				if (!options->no_bus)
 				handle_memnode_event(&ev, options, "No");
 				break;
-
 			case _STARPU_FUT_START_FREE:
 				if (!options->no_bus)
 				{
 					handle_memnode_event(&ev, options, "F");
 				}
 				break;
-
 			case _STARPU_FUT_END_FREE:
 				if (!options->no_bus)
 				{
@@ -1847,14 +1856,12 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 						handle_memnode_event(&ev, options, "No");
 				}
 				break;
-
 			case _STARPU_FUT_START_WRITEBACK:
 				if (!options->no_bus)
 				{
 					handle_memnode_event(&ev, options, "W");
 				}
 				break;
-
 			case _STARPU_FUT_END_WRITEBACK:
 				if (!options->no_bus)
 				{
@@ -1865,7 +1872,6 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 						handle_memnode_event(&ev, options, "No");
 				}
 				break;
-
 			case _STARPU_FUT_START_MEMRECLAIM:
 				if (!options->no_bus)
 				{
@@ -1874,7 +1880,6 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 					handle_memnode_event(&ev, options, "R");
 				}
 				break;
-
 			case _STARPU_FUT_END_MEMRECLAIM:
 				if (!options->no_bus)
 				{
@@ -2044,11 +2049,11 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				break;
 
 			case _STARPU_FUT_HYPERVISOR_BEGIN:
-				handle_hyp_begin(&ev, options);
+				handle_hypervisor_begin(&ev, options);
 				break;
 
 			case _STARPU_FUT_HYPERVISOR_END:
-				handle_hyp_end(&ev, options);
+				handle_hypervisor_end(&ev, options);
 				break;
 
 			default:

+ 23 - 9
src/drivers/cuda/driver_cuda.c

@@ -472,17 +472,26 @@ static void execute_job_on_cuda(struct starpu_task *task, struct _starpu_worker
 #ifndef STARPU_SIMGRID
 	if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
 	{
-		/* Record event to synchronize with task termination later */
-		cudaEventRecord(task_events[workerid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE], starpu_cuda_get_local_stream());
+		if (worker->pipeline_length == 0)
+		{
+			/* Forced synchronous execution */
+			cudaStreamSynchronize(starpu_cuda_get_local_stream());
+			finish_job_on_cuda(j, worker);
+		}
+		else
+		{
+			/* Record event to synchronize with task termination later */
+			cudaEventRecord(task_events[workerid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE], starpu_cuda_get_local_stream());
 #ifdef STARPU_USE_FXT
-		int k;
-		for (k = 0; k < (int) worker->set->nworkers; k++)
-			if (worker->set->workers[k].ntasks == worker->set->workers[k].pipeline_length)
-				break;
-		if (k == (int) worker->set->nworkers)
-			/* Everybody busy */
-			_STARPU_TRACE_START_EXECUTING()
+			int k;
+			for (k = 0; k < (int) worker->set->nworkers; k++)
+				if (worker->set->workers[k].ntasks == worker->set->workers[k].pipeline_length)
+					break;
+			if (k == (int) worker->set->nworkers)
+				/* Everybody busy */
+				_STARPU_TRACE_START_EXECUTING();
 #endif
+		}
 	}
 	else
 #else
@@ -558,6 +567,11 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 		_STARPU_DEBUG("cuda (%s) dev id %u worker %u thread is ready to run on CPU %d !\n", devname, devid, i, worker->bindid);
 
 		worker->pipeline_length = starpu_get_env_number_default("STARPU_CUDA_PIPELINE", 2);
+		if (worker->pipeline_length > STARPU_MAX_PIPELINE)
+		{
+			_STARPU_DISP("Warning: STARPU_CUDA_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
+			worker->pipeline_length = STARPU_MAX_PIPELINE;
+		}
 		_STARPU_TRACE_WORKER_INIT_END(worker_set->workers[i].workerid);
 	}
 

+ 27 - 12
src/drivers/opencl/driver_opencl.c

@@ -597,6 +597,11 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 	snprintf(worker->short_name, sizeof(worker->short_name), "OpenCL %u", devid);
 
 	worker->pipeline_length = starpu_get_env_number_default("STARPU_OPENCL_PIPELINE", 2);
+	if (worker->pipeline_length > STARPU_MAX_PIPELINE)
+	{
+		_STARPU_DISP("Warning: STARPU_OPENCL_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
+		worker->pipeline_length = STARPU_MAX_PIPELINE;
+	}
 
 	_STARPU_DEBUG("OpenCL (%s) dev id %d thread is ready to run on CPU %d !\n", devname, devid, worker->bindid);
 
@@ -903,18 +908,28 @@ static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_
 		int err;
 		cl_command_queue queue;
 		starpu_opencl_get_queue(worker->devid, &queue);
-		/* the function clEnqueueMarker is deprecated from
-		 * OpenCL version 1.2. We would like to use the new
-		 * function clEnqueueMarkerWithWaitList. We could do
-		 * it by checking its availability through our own
-		 * configure macro HAVE_CLENQUEUEMARKERWITHWAITLIST
-		 * and the OpenCL macro CL_VERSION_1_2. However these
-		 * 2 macros detect the function availability in the
-		 * ICD and not in the device implementation.
-		 */
-		err = clEnqueueMarker(queue, &task_events[worker->devid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE]);
-		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
-		_STARPU_TRACE_START_EXECUTING();
+
+		if (worker->pipeline_length == 0)
+		{
+			starpu_opencl_get_queue(worker->devid, &queue);
+			clFinish(queue);
+			_starpu_opencl_stop_job(j, worker);
+		}
+		else
+		{
+			/* the function clEnqueueMarker is deprecated from
+			 * OpenCL version 1.2. We would like to use the new
+			 * function clEnqueueMarkerWithWaitList. We could do
+			 * it by checking its availability through our own
+			 * configure macro HAVE_CLENQUEUEMARKERWITHWAITLIST
+			 * and the OpenCL macro CL_VERSION_1_2. However these
+			 * 2 macros detect the function availability in the
+			 * ICD and not in the device implementation.
+			 */
+			err = clEnqueueMarker(queue, &task_events[worker->devid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE]);
+			if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
+			_STARPU_TRACE_START_EXECUTING();
+		}
 	}
 	else
 #else

+ 17 - 0
src/util/starpu_task_insert_utils.c

@@ -118,6 +118,10 @@ void _starpu_task_insert_get_args_size(va_list varg_list, unsigned *nbuffers, si
 		{
 			va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			va_arg(varg_list, unsigned);
+		}
 		else if (arg_type==STARPU_SCHED_CTX)
 		{
 			(void)va_arg(varg_list, unsigned);
@@ -231,6 +235,10 @@ int _starpu_codelet_pack_args(void **arg_buffer, size_t arg_buffer_size, va_list
 		{
 			va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			va_arg(varg_list, unsigned);
+		}
 		else if (arg_type==STARPU_SCHED_CTX)
 		{
 			(void)va_arg(varg_list, unsigned);
@@ -405,6 +413,15 @@ void _starpu_task_insert_create(void *arg_buffer, size_t arg_buffer_size, struct
 				(*task)->execute_on_a_specific_worker = 1;
 			}
 		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			unsigned order = va_arg(varg_list, unsigned);
+			if (order != 0)
+			{
+				STARPU_ASSERT_MSG((*task)->execute_on_a_specific_worker, "worker order only makes sense if a workerid is provided");
+				(*task)->workerorder = order;
+			}
+		}
 		else if (arg_type==STARPU_SCHED_CTX)
 		{
 			unsigned sched_ctx = va_arg(varg_list, unsigned);

+ 1 - 0
tests/Makefile.am

@@ -113,6 +113,7 @@ noinst_PROGRAMS =				\
 	main/deploop                            \
 	main/restart				\
 	main/execute_on_a_specific_worker	\
+	main/execute_schedule			\
 	main/insert_task			\
 	main/insert_task_nullcodelet			\
 	main/insert_task_array			\

+ 2 - 2
tests/datawizard/interfaces/multiformat/multiformat_interface.c

@@ -83,7 +83,7 @@ void test_multiformat_mic_func(void *buffers[], void *args)
 	printf("MIC\n");
 
 	struct struct_of_arrays *soa;
-	unsigned int n, i;
+	int n, i;
 	int factor;
 
 	soa = (struct struct_of_arrays *) STARPU_MULTIFORMAT_GET_MIC_PTR(buffers[0]);
@@ -92,7 +92,7 @@ void test_multiformat_mic_func(void *buffers[], void *args)
 
 	for (i = 0; i < n; i++)
 	{
-			FPRINTF(stderr, "(%d %d) [%d]", soa->x[i], soa->y[i], factor);
+		FPRINTF(stderr, "(%d %d) [%d]", soa->x[i], soa->y[i], factor);
 		if (soa->x[i] != i * factor || soa->y[i] != i * factor)
 		{
 			multiformat_config.copy_failed = 1;

+ 109 - 0
tests/main/execute_schedule.c

@@ -0,0 +1,109 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Université de Bordeaux 1
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+#include "../helper.h"
+#include <common/thread.h>
+
+#ifdef STARPU_QUICK_CHECK
+  #define K 2
+#else
+  #define K 16
+#endif
+
+#define N 64
+
+static unsigned current = 1;
+
+void codelet(STARPU_ATTRIBUTE_UNUSED void *descr[], void *_args)
+{
+	uintptr_t me = (uintptr_t) _args;
+	STARPU_ASSERT(current == me);
+	current++;
+}
+
+static struct starpu_codelet cl =
+{
+	.cpu_funcs = {codelet, NULL},
+	.cuda_funcs = {codelet, NULL},
+	.opencl_funcs = {codelet, NULL},
+	.nbuffers = 0,
+};
+
+int main(int argc, char **argv)
+{
+	int ret;
+	struct starpu_task *dep_task[N];
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	unsigned n, i, k;
+
+	for (k = 0; k < K; k++)
+	{
+		for (n = 0; n < N; n++)
+		{
+			struct starpu_task *task;
+
+			dep_task[n] = starpu_task_create();
+
+			dep_task[n]->cl = NULL;
+
+			task = starpu_task_create();
+
+			task->cl = &cl;
+
+			task->execute_on_a_specific_worker = 1;
+			task->workerid = 0;
+			task->workerorder = k*N + n+1;
+			task->cl_arg = (void*) (uintptr_t) (k*N + n+1);
+
+			starpu_task_declare_deps_array(task, 1, &dep_task[n]);
+
+			ret = starpu_task_submit(task);
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+
+		for (n = 0; n < N; n++)
+		{
+			i = (int)starpu_drand48()%(N-n);
+			ret = starpu_task_submit(dep_task[i]);
+			memmove(&dep_task[i], &dep_task[i+1], (N-i-1)*sizeof(dep_task[i]));
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+	}
+
+	starpu_task_wait_for_all();
+
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enodev:
+	starpu_shutdown();
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	return STARPU_TEST_SKIPPED;
+}

+ 11 - 0
tests/sched_policies/simple_cpu_gpu_sched.c

@@ -205,7 +205,18 @@ run(struct starpu_sched_policy *policy)
 	if (cpu_task_worker != STARPU_CPU_WORKER ||
 			(gpu_task_worker != STARPU_CUDA_WORKER &&
 			 gpu_task_worker != STARPU_OPENCL_WORKER))
+	{
+		if (cpu_task_worker != STARPU_CPU_WORKER)
+		{
+			FPRINTF(stderr, "The CPU task did not run on a CPU worker\n");
+		}
+		if (gpu_task_worker != STARPU_CUDA_WORKER && gpu_task_worker != STARPU_OPENCL_WORKER)
+		{
+			FPRINTF(stderr, "The GPU task did not run on a Cuda or OpenCL worker\n");
+		}
+
 		ret = 1;
+	}
 	else
 		ret = 0;