Browse Source

Use asynchronous task input fetching for cuda and opencl too

Samuel Thibault 8 years ago
parent
commit
64115beec5

+ 3 - 0
ChangeLog

@@ -34,6 +34,9 @@ Small features:
 Changes:
   * Vastly improve simgrid simulation time.
 
+Small changes:
+  * Use asynchronous transfers for task data fetches which were not prefetched.
+
 StarPU 1.2.1 (svn revision xxx)
 ==============================================
 New features:

+ 52 - 21
src/drivers/cuda/driver_cuda.c

@@ -472,6 +472,7 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 	struct starpu_codelet *cl = task->cl;
 	STARPU_ASSERT(cl);
 
+	_starpu_set_local_worker_key(worker);
 	_starpu_set_current_task(task);
 
 	ret = _starpu_fetch_task_input(task, j, 0);
@@ -728,27 +729,65 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	struct _starpu_job *j;
 	int i, res;
 
-	int idle;
+	int idle_tasks, idle_transfers;
 
 #ifdef STARPU_SIMGRID
 	starpu_pthread_wait_reset(&worker0->wait);
 #endif
 
 	/* First poll for completed jobs */
-	idle = 0;
+	idle_tasks = 0;
+	idle_transfers = 0;
 	for (i = 0; i < (int) worker_set->nworkers; i++)
 	{
 		struct _starpu_worker *worker = &worker_set->workers[i];
 		int workerid = worker->workerid;
 
 		if (!worker->ntasks)
+			idle_tasks++;
+		if (!worker->task_transferring)
+			idle_transfers++;
+
+		if (!worker->ntasks && !worker->task_transferring)
 		{
-			idle++;
 			/* Even nothing to test */
 			continue;
 		}
 
+		/* First test for transfers pending for next task */
+		task = worker->task_transferring;
+		if (task && worker->nb_buffers_transferred == worker->nb_buffers_totransfer)
+		{
+			struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+
+			_starpu_release_fetch_task_input_async(j, workerid, worker->nb_buffers_totransfer);
+			/* Reset it */
+			worker->task_transferring = NULL;
+
+			if (worker->ntasks > 1 && !(task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC))
+			{
+				/* We have to execute a non-asynchronous task but we
+				 * still have tasks in the pipeline...  Record it to
+				 * prevent more tasks from coming, and do it later */
+				worker->pipeline_stuck = 1;
+			}
+			else
+			{
+				_STARPU_TRACE_END_PROGRESS(memnode);
+				execute_job_on_cuda(task, worker);
+				_STARPU_TRACE_START_PROGRESS(memnode);
+			}
+		}
+
+		/* Then test for termination of queued tasks */
+		if (!worker->ntasks)
+			/* No queued task */
+			continue;
+
 		task = worker->current_tasks[worker->first_task];
+		if (task == worker->task_transferring)
+			/* Next task is still pending transfer */
+			continue;
 
 		/* On-going asynchronous task, check for its termination first */
 #ifdef STARPU_SIMGRID
@@ -767,7 +806,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			_starpu_set_local_worker_key(worker);
 			finish_job_on_cuda(_starpu_get_job_associated_to_task(task), worker);
 			/* See next task if any */
-			if (worker->ntasks)
+			if (worker->ntasks && worker->current_tasks[worker->first_task] != worker->task_transferring)
 			{
 				task = worker->current_tasks[worker->first_task];
 				j = _starpu_get_job_associated_to_task(task);
@@ -803,20 +842,20 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 		}
 
 		if (!worker->pipeline_length || worker->ntasks < worker->pipeline_length)
-			idle++;
+			idle_tasks++;
 	}
 
 #if defined(STARPU_NON_BLOCKING_DRIVERS) && !defined(STARPU_SIMGRID)
-	if (!idle)
+	if (!idle_tasks)
 	{
-		/* Nothing ready yet, no better thing to do than waiting */
-		__starpu_datawizard_progress(1, 0);
+		/* No task ready yet, no better thing to do than waiting */
+		__starpu_datawizard_progress(1, !idle_transfers);
 		return 0;
 	}
 #endif
 
 	/* Something done, make some progress */
-	res = !idle;
+	res = !idle_tasks || !idle_transfers;
 	res |= __starpu_datawizard_progress(1, 1);
 
 	/* And pull tasks */
@@ -850,19 +889,11 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			continue;
 		}
 
-		if (worker->ntasks > 1 && !(task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC))
-		{
-			/* We have to execute a non-asynchronous task but we
-			 * still have tasks in the pipeline...  Record it to
-			 * prevent more tasks from coming, and do it later */
-			worker->pipeline_stuck = 1;
-			continue;
-		}
-
-		_starpu_set_local_worker_key(worker);
-
+		/* Fetch data asynchronously */
 		_STARPU_TRACE_END_PROGRESS(memnode);
-		execute_job_on_cuda(task, worker);
+		_starpu_set_local_worker_key(worker);
+		res = _starpu_fetch_task_input(task, j, 1);
+		STARPU_ASSERT(res == 0);
 		_STARPU_TRACE_START_PROGRESS(memnode);
 	}
 

+ 5 - 0
src/drivers/driver_common/driver_common.c

@@ -410,8 +410,13 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		/* This worker is executing something */
 		executing = 1;
 
+	/* don't pop a new task if the pipeline is already full or stuck */
 	if (worker->pipeline_length && (worker->ntasks == worker->pipeline_length || worker->pipeline_stuck))
 		task = NULL;
+	/* don't push a task if we are already transferring one */
+	else if (worker->task_transferring != NULL)
+		task = NULL;
+	/*else try to pop a task*/
 	else
 		task = _starpu_pop_task(worker);
 

+ 42 - 21
src/drivers/opencl/driver_opencl.c

@@ -682,15 +682,43 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	struct starpu_task *task;
 	int res;
 
-	int idle;
+	int idle_tasks, idle_transfers;
 
 #ifdef STARPU_SIMGRID
 	starpu_pthread_wait_reset(&worker->wait);
 #endif
 
-	/* First poll for completed jobs */
-	idle = 0;
-	if (worker->ntasks)
+	idle_tasks = 0;
+	idle_transfers = 0;
+
+	/* First test for transfers pending for next task */
+	task = worker->task_transferring;
+	if (!task)
+		idle_transfers++;
+	if (task && worker->nb_buffers_transferred == worker->nb_buffers_totransfer)
+	{
+		struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+
+		_starpu_release_fetch_task_input_async(j, workerid, worker->nb_buffers_totransfer);
+		/* Reset it */
+		worker->task_transferring = NULL;
+
+		if (worker->ntasks > 1 && !(task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC))
+		{
+			/* We have to execute a non-asynchronous task but we
+			 * still have tasks in the pipeline...  Record it to
+			 * prevent more tasks from coming, and do it later */
+			worker->pipeline_stuck = 1;
+			return 0;
+		}
+
+		_STARPU_TRACE_END_PROGRESS(memnode);
+		_starpu_opencl_execute_job(task, worker);
+		_STARPU_TRACE_START_PROGRESS(memnode);
+	}
+
+	/* Then poll for completed jobs */
+	if (worker->ntasks && worker->current_tasks[worker->first_task] != worker->task_transferring)
 	{
 #ifndef STARPU_SIMGRID
 		size_t size;
@@ -706,8 +734,8 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 #else /* !STARPU_SIMGRID */
 		cl_int status;
 		err = clGetEventInfo(task_events[worker->devid][worker->first_task], CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, &size);
-		STARPU_ASSERT(size == sizeof(cl_int));
 		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
+		STARPU_ASSERT(size == sizeof(cl_int));
 
 		if (status != CL_COMPLETE)
 #endif /* !STARPU_SIMGRID */
@@ -725,7 +753,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 			/* Asynchronous task completed! */
 			_starpu_opencl_stop_job(_starpu_get_job_associated_to_task(task), worker);
 			/* See next task if any */
-			if (worker->ntasks)
+			if (worker->ntasks && worker->current_tasks[worker->first_task] != worker->task_transferring)
 			{
 				task = worker->current_tasks[worker->first_task];
 				j = _starpu_get_job_associated_to_task(task);
@@ -750,18 +778,18 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		}
 	}
 	if (!worker->pipeline_length || worker->ntasks < worker->pipeline_length)
-		idle++;
+		idle_tasks++;
 
 #if defined(STARPU_NON_BLOCKING_DRIVERS) && !defined(STARPU_SIMGRID)
-	if (!idle)
+	if (!idle_tasks)
 	{
-		/* Not ready yet, no better thing to do than waiting */
-		__starpu_datawizard_progress(1, 0);
+		/* No task ready yet, no better thing to do than waiting */
+		__starpu_datawizard_progress(1, !idle_transfers);
 		return 0;
 	}
 #endif
 
-	res = !idle;
+	res = !idle_tasks || !idle_transfers;
 	res |= __starpu_datawizard_progress(1, 1);
 
 	task = _starpu_get_worker_task(worker, workerid, memnode);
@@ -787,17 +815,10 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	worker->current_tasks[(worker->first_task  + worker->ntasks)%STARPU_MAX_PIPELINE] = task;
 	worker->ntasks++;
 
-	if (worker->ntasks > 1 && !(task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC))
-	{
-		/* We have to execute a non-asynchronous task but we
-		 * still have tasks in the pipeline...  Record it to
-		 * prevent more tasks from coming, and do it later */
-		worker->pipeline_stuck = 1;
-		return 0;
-	}
-
+	/* Fetch data asynchronously */
 	_STARPU_TRACE_END_PROGRESS(memnode);
-	_starpu_opencl_execute_job(task, worker);
+	res = _starpu_fetch_task_input(task, j, 1);
+	STARPU_ASSERT(res == 0);
 	_STARPU_TRACE_START_PROGRESS(memnode);
 
 	return 0;