浏览代码

Add OpenCL kernel submission pipelining, to overlap costs

Samuel Thibault 11 年之前
父节点
当前提交
1f1954c43b

+ 2 - 2
ChangeLog

@@ -47,8 +47,8 @@ New features:
     CUDA and OpenCL kernel execution.
   * Add CUDA concurrent kernel execution support through
     the STARPU_NWORKER_PER_CUDA environment variable.
-  * Add CUDA kernel submission pipelining, to overlap costs and allow concurrent
-    kernel execution on Fermi cards.
+  * Add CUDA and OpenCL kernel submission pipelining, to overlap costs and allow
+    concurrent kernel execution on Fermi cards.
   * New locality work stealing scheduler (lws).
   * Add STARPU_VARIABLE_NBUFFERS to be set in cl.nbuffers, and nbuffers and
     modes field to the task structure, which permit to define codelets taking a

+ 10 - 0
doc/doxygen/chapters/40environment_variables.doxy

@@ -68,6 +68,16 @@ otherwise bring spurious synchronizations. The default is 2.
 OpenCL equivalent of the environment variable \ref STARPU_NCUDA.
 </dd>
 
+<dt>STARPU_OPENCL_PIPELINE</dt>
+<dd>
+\anchor STARPU_OPENCL_PIPELINE
+\addindex __env__STARPU_OPENCL_PIPELINE
+Specify how many asynchronous tasks are submitted in advance on OpenCL
+devices. For instance, this makes it possible to overlap task management with
+the execution of previous tasks. It also allows concurrent execution on Fermi
+cards, which would otherwise cause spurious synchronizations. The default is 2.
+</dd>
+
 <dt>STARPU_NMICDEVS</dt>
 <dd>
 \anchor STARPU_NMICDEVS

+ 5 - 1
src/drivers/driver_common/driver_common.c

@@ -217,6 +217,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 	struct starpu_task *task;
 	unsigned needed = 1;
+
 	_starpu_worker_set_status_scheduling(workerid);
 	while(needed)
 	{
@@ -267,7 +268,10 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		needed = !needed;
 	}
 
-	task = _starpu_pop_task(worker);
+	if (worker->pipeline_length && (worker->ntasks == worker->pipeline_length || worker->pipeline_stuck))
+		task = NULL;
+	else
+		task = _starpu_pop_task(worker);
 
 	if (task == NULL)
 	{

+ 55 - 13
src/drivers/opencl/driver_opencl.c

@@ -50,7 +50,7 @@ static cl_command_queue in_transfer_queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue out_transfer_queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue peer_transfer_queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue alloc_queues[STARPU_MAXOPENCLDEVS];
-static cl_event task_events[STARPU_MAXOPENCLDEVS];
+static cl_event task_events[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
 #endif
 
 void
@@ -597,6 +597,8 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 	snprintf(worker->name, sizeof(worker->name), "OpenCL %u (%s %.1f GiB)", devid, devname, size);
 	snprintf(worker->short_name, sizeof(worker->short_name), "OpenCL %u", devid);
 
+	worker->pipeline_length = starpu_get_env_number_default("STARPU_OPENCL_PIPELINE", 2);
+
 	_STARPU_DEBUG("OpenCL (%s) dev id %d thread is ready to run on CPU %d !\n", devname, devid, worker->bindid);
 
 	_STARPU_TRACE_WORKER_INIT_END(worker->workerid);
@@ -619,16 +621,17 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	struct starpu_task *task;
 
 #ifndef STARPU_SIMGRID
-	task = starpu_task_get_current();
-
-	if (task)
+	if (worker->ntasks)
 	{
 		cl_int status;
 		size_t size;
 		int err;
+
 		/* On-going asynchronous task, check for its termination first */
 
-		err = clGetEventInfo(task_events[worker->devid], CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, &size);
+		task = worker->current_tasks[worker->first_task];
+
+		err = clGetEventInfo(task_events[worker->devid][worker->first_task], CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, &size);
 		STARPU_ASSERT(size == sizeof(cl_int));
 		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 
@@ -640,16 +643,38 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 			return 0;
 		}
 
+		task_events[worker->devid][worker->first_task] = 0;
+
 		/* Asynchronous task completed! */
-		_STARPU_TRACE_END_EXECUTING();
 		_starpu_opencl_stop_job(_starpu_get_job_associated_to_task(task), worker);
+		/* See next task if any */
+		if (worker->ntasks)
+		{
+			task = worker->current_tasks[worker->first_task];
+			j = _starpu_get_job_associated_to_task(task);
+			if (task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC)
+			{
+				/* An asynchronous task, it was already queued,
+				 * it's now running, record its start time.  */
+				_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, starpu_profiling_status_get());
+			}
+			else
+			{
+				/* A synchronous task, we have finished flushing the pipeline, we can now at last execute it.  */
+				_STARPU_TRACE_END_PROGRESS(memnode);
+				_STARPU_TRACE_EVENT("sync_task");
+				_starpu_opencl_execute_job(task, worker);
+				_STARPU_TRACE_EVENT("end_sync_task");
+				_STARPU_TRACE_START_PROGRESS(memnode);
+				worker->pipeline_stuck = 0;
+			}
+		}
+		_STARPU_TRACE_END_EXECUTING();
 	}
 #endif /* STARPU_SIMGRID */
 
 	__starpu_datawizard_progress(memnode, 1, 1);
 
-	_STARPU_TRACE_END_PROGRESS(memnode);
-
 	task = _starpu_get_worker_task(worker, workerid, memnode);
 
 	if (task == NULL)
@@ -665,8 +690,20 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		return 0;
 	}
 
-	_starpu_opencl_execute_job(task, worker);
+	worker->current_tasks[(worker->first_task  + worker->ntasks)%STARPU_MAX_PIPELINE] = task;
+	worker->ntasks++;
+
+	if (worker->ntasks > 1 && !(task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC))
+	{
+		/* We have to execute a non-asynchronous task but we
+		 * still have tasks in the pipeline...  Record it to
+		 * prevent more tasks from coming, and do it later */
+		worker->pipeline_stuck = 1;
+		return 0;
+	}
 
+	_STARPU_TRACE_END_PROGRESS(memnode);
+	_starpu_opencl_execute_job(task, worker);
 	_STARPU_TRACE_START_PROGRESS(memnode);
 
 	return 0;
@@ -772,7 +809,6 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 	STARPU_ASSERT(cl);
 
 	_starpu_set_current_task(j->task);
-	worker->current_task = j->task;
 
 	ret = _starpu_fetch_task_input(j);
 	if (ret != 0)
@@ -783,7 +819,11 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 		return -EAGAIN;
 	}
 
-	_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
+	if (worker->ntasks == 1)
+	{
+		/* We are alone in the pipeline, the kernel will start now, record it */
+		_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
+	}
 
 	starpu_opencl_func_t func = _starpu_task_get_opencl_nth_implementation(cl, j->nimpl);
 	STARPU_ASSERT_MSG(func, "when STARPU_OPENCL is defined in 'where', opencl_func or opencl_funcs has to be defined");
@@ -821,7 +861,9 @@ static void _starpu_opencl_stop_job(struct _starpu_job *j, struct _starpu_worker
 	int profiling = starpu_profiling_status_get();
 
 	_starpu_set_current_task(NULL);
-	worker->current_task = NULL;
+	worker->current_tasks[worker->first_task] = NULL;
+	worker->first_task = (worker->first_task + 1) % STARPU_MAX_PIPELINE;
+	worker->ntasks--;
 
 	_starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0, profiling);
 
@@ -870,7 +912,7 @@ static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_
 		 * 2 macros detect the function availability in the
 		 * ICD and not in the device implementation.
 		 */
-		err = clEnqueueMarker(queue, &task_events[worker->devid]);
+		err = clEnqueueMarker(queue, &task_events[worker->devid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE]);
 		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 		_STARPU_TRACE_START_EXECUTING();
 	}