
Add CUDA kernel submission pipelining, to overlap costs and allow concurrent
kernel execution on Fermi cards.

Samuel Thibault, 11 years ago
commit f99ef13c88

+ 2 - 0
ChangeLog

@@ -47,6 +47,8 @@ New features:
     CUDA and OpenCL kernel execution.
   * Add CUDA concurrent kernel execution support through
     the STARPU_NWORKER_PER_CUDA environment variable.
+  * Add CUDA kernel submission pipelining, to overlap costs and allow concurrent
+    kernel execution on Fermi cards.
   * New locality work stealing scheduler (lws).
   * Add STARPU_VARIABLE_NBUFFERS to be set in cl.nbuffers, and nbuffers and
     modes field to the task structure, which permit to define codelets taking a

+ 10 - 0
doc/doxygen/chapters/40environment_variables.doxy

@@ -51,6 +51,16 @@ Specify the number of workers per CUDA device, and thus the number of kernels
 which will be concurrently running on the devices. The default value is 1.
 </dd>
 
+<dt>STARPU_CUDA_PIPELINE</dt>
+<dd>
+\anchor STARPU_CUDA_PIPELINE
+\addindex __env__STARPU_CUDA_PIPELINE
+Specify how many asynchronous tasks are submitted in advance on CUDA
+devices. This allows overlapping task management with the execution of
+previous tasks, and it also enables concurrent kernel execution on Fermi
+cards, which would otherwise suffer spurious synchronizations. The default is 2.
+</dd>
+
 <dt>STARPU_NOPENCL</dt>
 <dd>
 \anchor STARPU_NOPENCL
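
The STARPU_CUDA_PIPELINE value documented above is read per CUDA worker at
driver initialization (see the driver_cuda.c hunk below); the per-worker slot
array added in workers.h holds at most STARPU_MAX_PIPELINE (4) entries. A
minimal sketch of how an application could combine it with
STARPU_NWORKER_PER_CUDA from C rather than from the shell; the values are
illustrative, not recommendations:

    #include <stdlib.h>
    #include <starpu.h>

    int main(int argc, char **argv)
    {
        /* 4 workers (hence streams) per CUDA device, each queuing up to
         * 2 kernels in advance; must be set before starpu_initialize(). */
        setenv("STARPU_NWORKER_PER_CUDA", "4", 1);
        setenv("STARPU_CUDA_PIPELINE", "2", 1);

        int ret = starpu_initialize(NULL, &argc, &argv);
        if (ret == -ENODEV) return 77; /* no CUDA device available */

        /* ... submit codelets flagged STARPU_CUDA_ASYNC here ... */

        starpu_shutdown();
        return 0;
    }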

+ 3 - 0
src/core/workers.c

@@ -428,6 +428,9 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
+	workerarg->first_task = 0;
+	workerarg->ntasks = 0;
+	workerarg->pipeline_length = 0;
 	workerarg->set = NULL;
 
 	/* if some codelet's termination cannot be handled directly :

+ 8 - 1
src/core/workers.h

@@ -52,6 +52,8 @@
 
 #include <starpu_parameters.h>
 
+#define STARPU_MAX_PIPELINE 4
+
 /* This is initialized from in _starpu_worker_init */
 LIST_TYPE(_starpu_worker,
 	struct _starpu_machine_config *config;
@@ -73,7 +75,12 @@ LIST_TYPE(_starpu_worker,
 	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
-	struct starpu_task *current_task; /* task currently executed by this worker */
+	struct starpu_task *current_task; /* task currently executed by this worker (non-pipelined version) */
+	struct starpu_task *current_tasks[STARPU_MAX_PIPELINE]; /* tasks currently executed by this worker (pipelined version) */
+	unsigned char first_task; /* Index of first task in the pipeline */
+	unsigned char ntasks; /* number of tasks in the pipeline */
+	unsigned char pipeline_length; /* number of tasks to be put in the pipeline */
+	unsigned char pipeline_stuck; /* whether a task prevents us from pipelining */
 	struct _starpu_worker_set *set; /* in case this worker belongs to a set */
 	struct _starpu_job_list *terminated_jobs; /* list of pending jobs which were executed */
 	unsigned worker_is_running;
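
The five new fields turn each worker's single current_task into a small ring
buffer of in-flight tasks. A stand-alone sketch of the intended indexing (not
the actual StarPU code, which manipulates the fields inline in the drivers):

    #include <starpu.h>

    #define MAX_PIPELINE 4 /* mirrors STARPU_MAX_PIPELINE above */

    struct pipeline
    {
        struct starpu_task *tasks[MAX_PIPELINE];
        unsigned char first_task; /* index of the oldest queued task */
        unsigned char ntasks;     /* number of queued tasks */
    };

    /* Queue one more task, as driver_common.c does while the pipeline is not full. */
    static void pipeline_push(struct pipeline *p, struct starpu_task *task)
    {
        p->tasks[(p->first_task + p->ntasks) % MAX_PIPELINE] = task;
        p->ntasks++;
    }

    /* Retire the oldest task, as finish_job_on_cuda does once its event completes. */
    static struct starpu_task *pipeline_pop(struct pipeline *p)
    {
        struct starpu_task *task = p->tasks[p->first_task];
        p->tasks[p->first_task] = NULL;
        p->first_task = (p->first_task + 1) % MAX_PIPELINE;
        p->ntasks--;
        return task;
    }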

+ 133 - 78
src/drivers/cuda/driver_cuda.c

@@ -49,7 +49,7 @@ static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t in_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static cudaStream_t out_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
-static cudaEvent_t task_events[STARPU_NMAXWORKERS];
+static cudaEvent_t task_events[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_USE_CUDA */
 
 void
@@ -221,7 +221,7 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 {
 	cudaError_t cures;
 	int workerid;
-	unsigned i;
+	unsigned i, j;
 
 	/* TODO: cudaSetDeviceFlag(cudaDeviceMapHost) */
 
@@ -276,7 +276,8 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 	{
 		workerid = worker_set->workers[i].workerid;
 
-		cures = cudaEventCreateWithFlags(&task_events[workerid], cudaEventDisableTiming);
+		for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+			cures = cudaEventCreateWithFlags(&task_events[workerid][j], cudaEventDisableTiming);
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 
@@ -307,7 +308,7 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 static void deinit_context(struct _starpu_worker_set *worker_set)
 {
 	cudaError_t cures;
-	unsigned i;
+	unsigned i, j;
 	int workerid = worker_set->workers[0].workerid;
 	int devid = starpu_worker_get_devid(workerid);
 
@@ -316,7 +317,8 @@ static void deinit_context(struct _starpu_worker_set *worker_set)
 		workerid = worker_set->workers[i].workerid;
 		devid = starpu_worker_get_devid(workerid);
 
-		cudaEventDestroy(task_events[workerid]);
+		for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+			cudaEventDestroy(task_events[workerid][j]);
 		cudaStreamDestroy(streams[workerid]);
 	}
 
@@ -396,7 +398,11 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 		return -EAGAIN;
 	}
 
-	_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
+	if (worker->ntasks == 1)
+	{
+		/* We are alone in the pipeline, the kernel will start now, record it */
+		_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
+	}
 
 #if defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
 	/* We make sure we do manipulate the proper device */
@@ -427,7 +433,9 @@ static void finish_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *wor
 	int profiling = starpu_profiling_status_get();
 
 	_starpu_set_current_task(NULL);
-	worker->current_task = NULL;
+	worker->current_tasks[worker->first_task] = NULL;
+	worker->first_task = (worker->first_task + 1) % STARPU_MAX_PIPELINE;
+	worker->ntasks--;
 
 	_starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0, profiling);
 
@@ -438,21 +446,74 @@ static void finish_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *wor
 	_starpu_handle_job_termination(j);
 }
 
+/* Execute a job, up to completion for synchronous jobs */
+static void execute_job_on_cuda(struct starpu_task *task, struct _starpu_worker *worker)
+{
+	int workerid = worker->workerid;
+	int res;
+
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+
+	res = start_job_on_cuda(j, worker);
+
+	if (res)
+	{
+		switch (res)
+		{
+			case -EAGAIN:
+				_STARPU_DISP("ouch, CUDA could not actually run task %p, putting it back...\n", task);
+				_starpu_push_task_to_workers(task);
+				STARPU_ABORT();
+			default:
+				STARPU_ABORT();
+		}
+	}
+
+#ifndef STARPU_SIMGRID
+	if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
+	{
+		/* Record event to synchronize with task termination later */
+		cudaEventRecord(task_events[workerid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE], starpu_cuda_get_local_stream());
+#ifdef STARPU_USE_FXT
+		int k;
+		for (k = 0; k < (int) worker->set->nworkers; k++)
+			if (worker->set->workers[k].ntasks)
+				break;
+		if (k < (int) worker->set->nworkers)
+			/* Everybody busy */
+			_STARPU_TRACE_START_EXECUTING()
+#endif
+	}
+	else
+#else
+#ifdef STARPU_DEVEL
+#warning No CUDA asynchronous execution with simgrid yet.
+#endif
+#endif
+	/* Synchronous execution */
+	{
+#if defined(STARPU_DEBUG) && !defined(STARPU_SIMGRID)
+		STARPU_ASSERT_MSG(cudaStreamQuery(starpu_cuda_get_local_stream()) == cudaSuccess, "CUDA codelets have to wait for termination of their kernels on the starpu_cuda_get_local_stream() stream");
+#endif
+		finish_job_on_cuda(j, worker);
+	}
+}
+
 /* XXX Should this be merged with _starpu_init_cuda ? */
 int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 {
-	struct _starpu_worker *worker = &worker_set->workers[0];
-	unsigned devid = worker->devid;
+	struct _starpu_worker *worker0 = &worker_set->workers[0];
+	unsigned devid = worker0->devid;
 	unsigned i;
 
-	_starpu_worker_start(worker, _STARPU_FUT_CUDA_KEY);
+	_starpu_worker_start(worker0, _STARPU_FUT_CUDA_KEY);
 
 #ifdef STARPU_USE_FXT
-	unsigned memnode = worker->memory_node;
+	unsigned memnode = worker0->memory_node;
 	for (i = 1; i < worker_set->nworkers; i++)
 	{
-		struct _starpu_worker *_worker = &worker_set->workers[i];
-		_STARPU_TRACE_WORKER_INIT_START(_STARPU_FUT_CUDA_KEY, _worker->workerid, devid, memnode);
+		struct _starpu_worker *worker = &worker_set->workers[i];
+		_STARPU_TRACE_WORKER_INIT_START(_STARPU_FUT_CUDA_KEY, worker->workerid, devid, memnode);
 	}
 #endif
 
@@ -461,14 +522,14 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 #endif
 
 	_starpu_cuda_limit_gpu_mem_if_needed(devid);
-	_starpu_memory_manager_set_global_memory_size(worker->memory_node, _starpu_cuda_get_global_mem_size(devid));
+	_starpu_memory_manager_set_global_memory_size(worker0->memory_node, _starpu_cuda_get_global_mem_size(devid));
 
-	_starpu_malloc_init(worker->memory_node);
+	_starpu_malloc_init(worker0->memory_node);
 
 	/* one more time to avoid hacks from third party lib :) */
-	_starpu_bind_thread_on_cpu(worker->config, worker->bindid);
+	_starpu_bind_thread_on_cpu(worker0->config, worker0->bindid);
 
-	worker->status = STATUS_UNKNOWN;
+	worker0->status = STATUS_UNKNOWN;
 
 	float size = (float) global_mem[devid] / (1<<30);
 #ifdef STARPU_SIMGRID
@@ -479,29 +540,31 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	strncpy(devname, props[devid].name, 128);
 #endif
 
+	for (i = 0; i < worker_set->nworkers; i++)
+	{
+		struct _starpu_worker *worker = &worker_set->workers[i];
 #if defined(STARPU_HAVE_BUSID) && !defined(STARPU_SIMGRID)
 #if defined(STARPU_HAVE_DOMAINID) && !defined(STARPU_SIMGRID)
-	if (props[devid].pciDomainID)
-		snprintf(worker->name, sizeof(worker->name), "CUDA %u (%s %.1f GiB %04x:%02x:%02x.0)", devid, devname, size, props[devid].pciDomainID, props[devid].pciBusID, props[devid].pciDeviceID);
-	else
+		if (props[devid].pciDomainID)
+			snprintf(worker->name, sizeof(worker->name), "CUDA %u.%u (%s %.1f GiB %04x:%02x:%02x.0)", devid, i, devname, size, props[devid].pciDomainID, props[devid].pciBusID, props[devid].pciDeviceID);
+		else
 #endif
-		snprintf(worker->name, sizeof(worker->name), "CUDA %u (%s %.1f GiB %02x:%02x.0)", devid, devname, size, props[devid].pciBusID, props[devid].pciDeviceID);
+			snprintf(worker->name, sizeof(worker->name), "CUDA %u.%u (%s %.1f GiB %02x:%02x.0)", devid, i, devname, size, props[devid].pciBusID, props[devid].pciDeviceID);
 #else
-	snprintf(worker->name, sizeof(worker->name), "CUDA %u (%s %.1f GiB)", devid, devname, size);
+		snprintf(worker->name, sizeof(worker->name), "CUDA %u.%u (%s %.1f GiB)", devid, i, devname, size);
 #endif
-	snprintf(worker->short_name, sizeof(worker->short_name), "CUDA %u", devid);
-	_STARPU_DEBUG("cuda (%s) dev id %u thread is ready to run on CPU %d !\n", devname, devid, worker->bindid);
+		snprintf(worker->short_name, sizeof(worker->short_name), "CUDA %u.%u", devid, i);
+		_STARPU_DEBUG("cuda (%s) dev id %u worker %u thread is ready to run on CPU %d !\n", devname, devid, i, worker->bindid);
 
-	for (i = 0; i < worker_set->nworkers; i++)
-	{
+		worker->pipeline_length = starpu_get_env_number_default("STARPU_CUDA_PIPELINE", 2);
 		_STARPU_TRACE_WORKER_INIT_END(worker_set->workers[i].workerid);
 	}
 
 	/* tell the main thread that this one is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
-	worker->worker_is_initialized = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&worker->ready_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&worker0->mutex);
+	worker0->worker_is_initialized = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&worker0->ready_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker0->mutex);
 
 	/* tell the main thread that this one is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
@@ -530,16 +593,16 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 		struct _starpu_worker *worker = &worker_set->workers[i];
 		int workerid = worker->workerid;
 
-		task = worker->current_task;
-
-		if (!task)
+		if (!worker->ntasks)
 		{
 			idle++;
 			continue;
 		}
 
+		task = worker->current_tasks[worker->first_task];
+
 		/* On-going asynchronous task, check for its termination first */
-		cudaError_t cures = cudaEventQuery(task_events[workerid]);
+		cudaError_t cures = cudaEventQuery(task_events[workerid][worker->first_task]);
 
 		if (cures != cudaSuccess)
 		{
@@ -550,11 +613,36 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			/* Asynchronous task completed! */
 			_starpu_set_local_worker_key(worker);
 			finish_job_on_cuda(_starpu_get_job_associated_to_task(task), worker);
+			/* See next task if any */
+			if (worker->ntasks)
+			{
+				task = worker->current_tasks[worker->first_task];
+				j = _starpu_get_job_associated_to_task(task);
+				if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
+				{
+					/* An asynchronous task, it was already
+					 * queued, it's now running, record its start time.  */
+					_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, starpu_profiling_status_get());
+					/* Skip the idle handling part, we are still busy */
+					continue;
+				}
+				else
+				{
+					/* A synchronous task, we have finished
+					 * flushing the pipeline, we can now at
+					 * last execute it.  */
+
+					_STARPU_TRACE_END_PROGRESS(memnode);
+					execute_job_on_cuda(task, worker);
+					_STARPU_TRACE_START_PROGRESS(memnode);
+					worker->pipeline_stuck = 0;
+				}
+			}
 			idle++;
 #ifdef STARPU_USE_FXT
 			int k;
 			for (k = 0; k < (int) worker_set->nworkers; k++)
-				if (worker_set->workers[k].current_task)
+				if (worker_set->workers[k].ntasks)
 					break;
 			if (k == (int) worker_set->nworkers)
 				/* Everybody busy */
@@ -583,13 +671,11 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	for (i = 0; i < (int) worker_set->nworkers; i++)
 	{
 		struct _starpu_worker *worker = &worker_set->workers[i];
-		int workerid = worker->workerid;
 
 		task = tasks[i];
 		if (!task)
 			continue;
 
-		_starpu_set_local_worker_key(worker);
 
 		j = _starpu_get_job_associated_to_task(task);
 
@@ -601,50 +687,19 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			continue;
 		}
 
-		_STARPU_TRACE_END_PROGRESS(memnode);
-		res = start_job_on_cuda(j, worker);
-
-		if (res)
+		if (worker->ntasks > 1 && !(task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC))
 		{
-			switch (res)
-			{
-				case -EAGAIN:
-					_STARPU_DISP("ouch, CUDA could not actually run task %p, putting it back...\n", task);
-					_starpu_push_task_to_workers(task);
-					STARPU_ABORT();
-				default:
-					STARPU_ABORT();
-			}
+			/* We have to execute a non-asynchronous task but we
+			 * still have tasks in the pipeline...  Record it to
+			 * prevent more tasks from coming, and do it later */
+			worker->pipeline_stuck = 1;
+			continue;
 		}
 
-#ifndef STARPU_SIMGRID
-		if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
-		{
-			/* Record event to synchronize with task termination later */
-			cudaEventRecord(task_events[workerid], starpu_cuda_get_local_stream());
-#ifdef STARPU_USE_FXT
-			int k;
-			for (k = 0; k < (int) worker_set->nworkers; k++)
-				if (worker_set->workers[k].current_task)
-					break;
-			if (k < (int) worker_set->nworkers)
-				/* Everybody busy */
-				_STARPU_TRACE_START_EXECUTING()
-#endif
-		}
-		else
-#else
-#ifdef STARPU_DEVEL
-#warning No CUDA asynchronous execution with simgrid yet.
-#endif
-#endif
-		/* Synchronous execution */
-		{
-#if defined(STARPU_DEBUG) && !defined(STARPU_SIMGRID)
-			STARPU_ASSERT_MSG(cudaStreamQuery(starpu_cuda_get_local_stream()) == cudaSuccess, "CUDA codelets have to wait for termination of their kernels on the starpu_cuda_get_local_stream() stream");
-#endif
-			finish_job_on_cuda(j, worker);
-		}
+		_starpu_set_local_worker_key(worker);
+
+		_STARPU_TRACE_END_PROGRESS(memnode);
+		execute_job_on_cuda(task, worker);
 		_STARPU_TRACE_START_PROGRESS(memnode);
 	}
 

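The heart of the driver change is the per-slot event bookkeeping: once an
asynchronous kernel has been queued, execute_job_on_cuda records an event in
slot (first_task + ntasks - 1) of the per-worker task_events row, and the
progression loop polls the event in slot first_task, so completions are
detected in submission order. A condensed sketch of just that arithmetic,
assuming a stand-alone struct (the real code also handles profiling, tracing
and the synchronous flush case):

    #include <cuda_runtime.h>

    #define MAX_PIPELINE 4 /* mirrors STARPU_MAX_PIPELINE */

    struct cuda_pipeline
    {
        cudaEvent_t events[MAX_PIPELINE];
        unsigned char first_task, ntasks; /* as in struct _starpu_worker */
    };

    /* Called right after the newest kernel has been queued on the stream. */
    static void record_kernel_end(struct cuda_pipeline *p, cudaStream_t stream)
    {
        unsigned slot = (p->first_task + p->ntasks - 1) % MAX_PIPELINE;
        cudaEventRecord(p->events[slot], stream);
    }

    /* Polled by the driver loop; nonzero once the oldest queued kernel is done. */
    static int oldest_kernel_finished(struct cuda_pipeline *p)
    {
        return cudaEventQuery(p->events[p->first_task]) == cudaSuccess;
    }
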
+ 12 - 3
src/drivers/driver_common/driver_common.c

@@ -333,8 +333,11 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 	/*for each worker*/
 	for (i = 0; i < nworkers; i++)
 	{
-		/*if the worker is already executinf a task then */
-		if(workers[i].current_task)
+		/*if the worker is already executing a task then */
+		if((workers[i].pipeline_length == 0 && workers[i].current_task)
+			|| (workers[i].pipeline_length != 0 &&
+				(workers[i].ntasks == workers[i].pipeline_length
+				 || workers[i].pipeline_stuck)))
 		{
 			tasks[i] = NULL;
 		}
@@ -354,7 +357,13 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 				count ++;
 				j = _starpu_get_job_associated_to_task(tasks[i]);
 				is_parallel_task = (j->task_size > 1);
-				workers[i].current_task = j->task;
+				if (workers[i].pipeline_length)
+				{
+					workers[i].current_tasks[(workers[i].first_task + workers[i].ntasks)%STARPU_MAX_PIPELINE] = tasks[i];
+					workers[i].ntasks++;
+				}
+				else
+					workers[i].current_task = j->task;
 				/* Get the rank in case it is a parallel task */
 				if (is_parallel_task)
 				{
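
Spelled out, the new test says a worker cannot receive another task when it is
non-pipelined and already busy, or when its pipeline is full or stuck flushing
before a synchronous task. A hypothetical helper equivalent to the inline
condition above, assuming the struct _starpu_worker fields from the workers.h
hunk (StarPU itself keeps the expression inline):

    /* Hypothetical helper, not present in StarPU. */
    static int worker_cannot_take_task(const struct _starpu_worker *w)
    {
        if (w->pipeline_length == 0)
            return w->current_task != NULL; /* non-pipelined driver: one task at a time */
        return w->ntasks == w->pipeline_length  /* pipeline already full */
            || w->pipeline_stuck;               /* flushing before a synchronous task */
    }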

+ 34 - 8
tests/overlap/gpu_concurrency.c

@@ -24,27 +24,48 @@
 #include <common/thread.h>
 
 #define NITERS 1000000
-#define NTASKS 128
+#define NTASKS 64
+#define SYNC 16
 
 #ifdef STARPU_USE_CUDA
 extern void long_kernel_cuda(unsigned long niters);
-void codelet_long_kernel(STARPU_ATTRIBUTE_UNUSED void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+
+void codelet_long_kernel_async(STARPU_ATTRIBUTE_UNUSED void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	long_kernel_cuda(NITERS);
+}
+
+void codelet_long_kernel_sync(STARPU_ATTRIBUTE_UNUSED void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	long_kernel_cuda(NITERS);
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
 }
 
-static struct starpu_perfmodel model =
+static struct starpu_perfmodel model_async =
 {
 	.type = STARPU_HISTORY_BASED,
-	.symbol = "long_kernel",
+	.symbol = "long_kernel_async",
 };
 
-static struct starpu_codelet cl =
+static struct starpu_perfmodel model_sync =
+{
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "long_kernel_sync",
+};
+
+static struct starpu_codelet cl_async =
 {
-	.cuda_funcs = {codelet_long_kernel, NULL},
+	.cuda_funcs = {codelet_long_kernel_async, NULL},
 	.cuda_flags = {STARPU_CUDA_ASYNC},
 	.nbuffers = 0,
-	.model =  &model
+	.model =  &model_async,
+};
+
+static struct starpu_codelet cl =
+{
+	.cuda_funcs = {codelet_long_kernel_sync, NULL},
+	.nbuffers = 0,
+	.model =  &model_sync,
 };
 };
 #endif
 
 #ifndef STARPU_USE_CUDA
 #ifndef STARPU_USE_CUDA
 	return STARPU_TEST_SKIPPED;
 #else
 	int ret = starpu_initialize(NULL, &argc, &argv);
 	int ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 	for (iter = 0; iter < NTASKS; iter++)
 	for (iter = 0; iter < NTASKS; iter++)
 	{
 		struct starpu_task *task = starpu_task_create();
+
+
+		if (!(iter % SYNC))
+			task->cl = &cl;
+		else
+			task->cl = &cl_async;
 
 		ret = starpu_task_submit(task);
 		if (ret == -ENODEV) goto enodev;
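
Every SYNC-th (16th) task now uses the synchronous codelet, which forces the
driver to flush its pipeline (the pipeline_stuck path above), while the others
stay asynchronous. For reference, a minimal sketch of what an application-side
asynchronous CUDA codelet looks like, mirroring cl_async above; my_kernel_cuda
is a hypothetical nvcc-compiled wrapper that launches its kernel on
starpu_cuda_get_local_stream() and returns without waiting, which is the
contract STARPU_CUDA_ASYNC relies on:

    #include <starpu.h>

    extern void my_kernel_cuda(unsigned long niters); /* hypothetical wrapper */

    static void my_codelet_cuda(void *buffers[], void *arg)
    {
        (void) buffers; (void) arg;
        my_kernel_cuda(1000000); /* queues work on the local stream, no sync */
    }

    static struct starpu_codelet my_cl =
    {
        .cuda_funcs = {my_codelet_cuda, NULL},
        .cuda_flags = {STARPU_CUDA_ASYNC},
        .nbuffers = 0,
    };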

+ 41 - 25
tools/gdbinit

@@ -1,7 +1,7 @@
 
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2010-2013  Université de Bordeaux 1
+# Copyright (C) 2010-2014  Université de Bordeaux 1
 # Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -30,7 +30,7 @@ define starpu-print-job
     printf "\tsubmitted:\t\t\t<%d>\n", $job->submitted
     printf "\tterminated:\t\t\t<%d>\n", $job->terminated
     printf "\tjob_id:\t\t\t\t<%d>\n", $job->job_id
-    if $job->task
+    if $job->task && $job->task->name
         printf "\tname:\t\t\t\t<%s>\n", $job->task->name
     end
   end
@@ -71,7 +71,9 @@ define starpu-print-task
   end
 
   printf "StarPU Task (%p)\n", $task
-  printf "\tname:\t\t\t\t<%s>\n", $task->name
+  if $task->name
+    printf "\tname:\t\t\t\t<%s>\n", $task->name
+  end
   printf "\tcodelet:\t\t\t<%p>\n", $task->cl
   printf "\tcallback:\t\t\t<%p>\n", $task->callback_func
   printf "\tsynchronous:\t\t\t<%d>\n", $task->synchronous
@@ -90,6 +92,32 @@ define starpu-print-task
   end
 end
 
+define starpu-print-task-and-successor
+  set language c
+  set $t = (struct starpu_task *) ($arg0)
+  starpu-print-task $t
+  set $j = (struct _starpu_job *) $t->starpu_private
+  set $nsuccs = $j->job_successors.nsuccs
+  set $i = 0
+  while $i < $nsuccs
+    set $cg = $j->job_successors.succ[$i]
+    if ($cg->cg_type == 1)
+      # STARPU_CG_APPS
+      printf "waited for by application"
+    end
+    if ($cg->cg_type == 2)
+      # STARPU_CG_TAG
+      printf "will produce tag %x\n", $cg->succ.tag
+    end
+    if ($cg->cg_type == 4)
+      # STARPU_CG_TASK
+      printf "dep of task %p\n", $cg->succ.job
+      starpu-print-task $cg->succ.job->task
+    end
+    set $i = $i + 1
+  end
+end
+
 document starpu-print-task
 Prints a StarPU task
 end
@@ -150,30 +178,18 @@ define starpu-tasks
   printf "Tasks being run:\n"
   set $n = 0
   while $n < config.topology.nworkers
+    printf "worker %d %s:\n", $n, config.workers[$n].short_name
+    if config.workers[$n].pipeline_length > 0
+      set $m = 0
+      while $m < config.workers[$n].ntasks
+        set $t = config.workers[$n].current_tasks[(config.workers[$n].first_task + $m) % (sizeof(config.workers[$n].current_tasks)/sizeof(config.workers[$n].current_tasks[0]))]
+        starpu-print-task-and-successor $t
+        set $m = $m + 1
+      end
+    end
     set $task = config.workers[$n].current_task
     if ($task)
-      printf "worker %d:\n", $n
-      starpu-print-task $task
-      set $j = (struct _starpu_job *) $task->starpu_private
-      set $nsuccs = $j->job_successors.nsuccs
-      set $i = 0
-      while $i < $nsuccs
-        set $cg = $j->job_successors.succ[$i]
-	if ($cg->cg_type == 1)
-	  # STARPU_CG_APPS
-	  printf "waited for by application"
-	end
-	if ($cg->cg_type == 2)
-	  # STARPU_CG_TAG
-	  printf "will produce tag %x\n", $cg->succ.tag
-	end
-	if ($cg->cg_type == 4)
-	  # STARPU_CG_TASK
-	  printf "dep of task %p\n", $cg->succ.job
-	  starpu-print-task $cg->succ.job->task
-	end
-        set $i = $i + 1
-      end
+      starpu-print-task-and-successor $task
     end
     set $n = $n + 1
   end