浏览代码

Support pipeline mode in simgrid too. The results match the reality!

Samuel Thibault 10 年之前
父节点
当前提交
113cdbb1a3

+ 1 - 0
include/starpu_scheduler.h

@@ -53,6 +53,7 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 
 /* This function must be called to wake up a worker that is sleeping on the cond. 
  * It returns 0 whenever the worker is not in a sleeping state */
+int starpu_wake_worker(int workerid);
 int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);

+ 92 - 5
src/core/simgrid.c

@@ -269,10 +269,65 @@ void _starpu_simgrid_init()
  * Tasks
  */
 
+struct task {
+	msg_task_t task;
+	int workerid;
+
+	/* communication termination signalization */
+	unsigned *finished;
+	starpu_pthread_mutex_t *mutex;
+	starpu_pthread_cond_t *cond;
+
+	/* Task which waits for this task */
+	struct task *next;
+};
+
+static struct task *last_task[STARPU_NMAXWORKERS];
+
+/* Actually execute the task.  */
+static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+{
+	struct task *task = MSG_process_get_data(MSG_process_self());
+	_STARPU_DEBUG("task %p started\n", transfer);
+	MSG_task_execute(task->task);
+	MSG_task_destroy(task->task);
+	_STARPU_DEBUG("task %p finished\n", transfer);
+	STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
+	*task->finished = 1;
+	STARPU_PTHREAD_COND_BROADCAST(task->cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
+
+	/* The worker which started this task may be sleeping out of tasks, wake it  */
+	starpu_wake_worker(task->workerid);
+
+	if (last_task[task->workerid] == task)
+		last_task[task->workerid] = NULL;
+	if (task->next)
+		MSG_process_create("task", task_execute, task->next, MSG_host_self());
+	free(task);
+	return 0;
+}
+
+/* Wait for completion of all asynchronous tasks for this worker */
+void _starpu_simgrid_wait_tasks(int workerid)
+{
+	struct task *task = last_task[workerid];
+	if (!task)
+		return;
+
+	unsigned *finished = task->finished;
+	starpu_pthread_mutex_t *mutex = task->mutex;
+	starpu_pthread_cond_t *cond = task->cond;
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
+	while (!*finished)
+		STARPU_PTHREAD_COND_WAIT(cond, mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+}
+
 /* Task execution submitted by StarPU */
-void _starpu_simgrid_execute_job(struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length)
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond)
 {
-	struct starpu_task *task = j->task;
+	struct starpu_task *starpu_task = j->task;
 	msg_task_t simgrid_task;
 
 	if (j->internal)
@@ -282,7 +337,7 @@ void _starpu_simgrid_execute_job(struct _starpu_job *j, struct starpu_perfmodel_
 
 	if (isnan(length))
 	{
-		length = starpu_task_expected_length(task, perf_arch, j->nimpl);
+		length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
 		STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
 				"Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
 			_starpu_job_get_model_name(j));
@@ -291,8 +346,40 @@ void _starpu_simgrid_execute_job(struct _starpu_job *j, struct starpu_perfmodel_
 	simgrid_task = MSG_task_create(_starpu_job_get_model_name(j),
 			length/1000000.0*MSG_get_host_speed(MSG_host_self()),
 			0, NULL);
-	MSG_task_execute(simgrid_task);
-	MSG_task_destroy(simgrid_task);
+
+	if (finished == NULL)
+	{
+		/* Synchronous execution */
+		/* First wait for previous tasks */
+		_starpu_simgrid_wait_tasks(workerid);
+		MSG_task_execute(simgrid_task);
+		MSG_task_destroy(simgrid_task);
+	}
+	else
+	{
+		/* Asynchronous execution */
+		struct task *task = malloc(sizeof(*task));
+		task->task = simgrid_task;
+		task->workerid = workerid;
+		task->finished = finished;
+		*finished = 0;
+		task->mutex = mutex;
+		task->cond = cond;
+		task->next = NULL;
+		/* Sleep 10µs for the GPU task queueing */
+		MSG_process_sleep(0.000010);
+		if (last_task[workerid])
+		{
+			/* Make this task depend on the previous */
+			last_task[workerid]->next = task;
+			last_task[workerid] = task;
+		}
+		else
+		{
+			last_task[workerid] = task;
+			MSG_process_create("task", task_execute, task, MSG_host_self());
+		}
+	}
 }
 
 /*

+ 2 - 1
src/core/simgrid.h

@@ -31,7 +31,8 @@ struct _starpu_pthread_args
 #define MAX_TSD 16
 
 void _starpu_simgrid_init(void);
-void _starpu_simgrid_execute_job(struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length);
+void _starpu_simgrid_wait_tasks(int workerid);
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond);
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req);
 /* Return the number of hosts prefixed by PREFIX */
 int _starpu_simgrid_get_nbhosts(const char *prefix);

+ 8 - 0
src/core/topology.c

@@ -825,6 +825,14 @@ _starpu_init_machine_config (struct _starpu_machine_config *config, int no_mp_co
 
 	STARPU_ASSERT_MSG(nworker_per_cuda > 0, "STARPU_NWORKER_PER_CUDA has to be > 0");
 
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	if (nworker_per_cuda > 1)
+	{
+		_STARPU_DISP("Warning: reducing STARPU_NWORKER_PER_CUDA to 1 because blocking drivers are enabled\n");
+		nworker_per_cuda = 1;
+	}
+#endif
+
 	if (ncuda != 0)
 	{
 		/* The user did not disable CUDA. We need to initialize CUDA

+ 15 - 0
src/core/workers.c

@@ -669,6 +669,13 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 					break;
 
 				cuda_worker_set[devid].nworkers = starpu_get_env_number_default("STARPU_NWORKER_PER_CUDA", 1);
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+				if (cuda_worker_set[devid].nworkers > 1)
+				{
+					_STARPU_DISP("Warning: reducing STARPU_NWORKER_PER_CUDA to 1 because blocking drivers are enabled\n");
+					cuda_worker_set[devid].nworkers = 1;
+				}
+#endif
 				cuda_worker_set[devid].workers = workerarg;
 				cuda_worker_set[devid].set_is_initialized = 0;
 
@@ -1807,6 +1814,14 @@ int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthre
 	return success;
 }
 
+int starpu_wake_worker(int workerid)
+{
+	starpu_pthread_mutex_t *sched_mutex;
+	starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	return starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
+}
+
 
 int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
 {

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -104,7 +104,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 		{
 			_STARPU_TRACE_START_EXECUTING();
 #ifdef STARPU_SIMGRID
-			_starpu_simgrid_execute_job(j, perf_arch, NAN);
+			_starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL, NULL, NULL);
 #else
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
@@ -135,7 +135,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 	return 0;
 }
 
-static size_t _starpu_cpu_get_global_mem_size(int nodeid STARPU_ATTRIBUTE_UNUSED, struct _starpu_machine_config *config)
+static size_t _starpu_cpu_get_global_mem_size(int nodeid STARPU_ATTRIBUTE_UNUSED, struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED)
 {
 	size_t global_mem;
 	starpu_ssize_t limit;

+ 90 - 24
src/drivers/cuda/driver_cuda.c

@@ -49,8 +49,15 @@ static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t in_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static cudaStream_t out_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
+#ifndef STARPU_SIMGRID
 static cudaEvent_t task_events[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
+#endif
 #endif /* STARPU_USE_CUDA */
+#ifdef STARPU_SIMGRID
+static unsigned task_finished[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
+static starpu_pthread_mutex_t task_mutex[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
+static starpu_pthread_cond_t task_cond[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
+#endif /* STARPU_SIMGRID */
 
 void
 _starpu_cuda_discover_devices (struct _starpu_machine_config *config)
@@ -216,13 +223,25 @@ done:
 #endif
 }
 
-#ifndef STARPU_SIMGRID
-static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
+static void init_context(struct _starpu_worker_set *worker_set, unsigned devid STARPU_ATTRIBUTE_UNUSED)
 {
-	cudaError_t cures;
 	int workerid;
 	unsigned i, j;
 
+#ifdef STARPU_SIMGRID
+	for (i = 0; i < worker_set->nworkers; i++)
+	{
+		workerid = worker_set->workers[i].workerid;
+		for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+		{
+			task_finished[workerid][j] = 0;
+			STARPU_PTHREAD_MUTEX_INIT(&task_mutex[workerid][j], NULL);
+			STARPU_PTHREAD_COND_INIT(&task_cond[workerid][j], NULL);
+		}
+	}
+#else /* !SIMGRID */
+	cudaError_t cures;
+
 	/* TODO: cudaSetDeviceFlag(cudaDeviceMapHost) */
 
 	starpu_cuda_set_device(devid);
@@ -303,13 +322,27 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 	}
+#endif /* !SIMGRID */
 }
 
 static void deinit_context(struct _starpu_worker_set *worker_set)
 {
-	cudaError_t cures;
 	unsigned i, j;
-	int workerid = worker_set->workers[0].workerid;
+	int workerid;
+
+#ifdef STARPU_SIMGRID
+	for (i = 0; i < worker_set->nworkers; i++)
+	{
+		workerid = worker_set->workers[i].workerid;
+		for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+		{
+			STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[workerid][j]);
+			STARPU_PTHREAD_COND_DESTROY(&task_cond[workerid][j]);
+		}
+	}
+#else /* !STARPU_SIMGRID */
+	cudaError_t cures;
+	workerid = worker_set->workers[0].workerid;
 	int devid = starpu_worker_get_devid(workerid);
 
 	for (i = 0; i < worker_set->nworkers; i++)
@@ -345,8 +378,8 @@ static void deinit_context(struct _starpu_worker_set *worker_set)
 #endif /* STARPU_OPENMP */
 	)
 		STARPU_CUDA_REPORT_ERROR(cures);
-}
 #endif /* !SIMGRID */
+}
 
 static size_t _starpu_cuda_get_global_mem_size(unsigned devid)
 {
@@ -384,7 +417,7 @@ void _starpu_init_cuda(void)
 	STARPU_ASSERT(ncudagpus <= STARPU_MAXCUDADEVS);
 }
 
-static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worker)
+static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worker, unsigned char pipeline_idx STARPU_ATTRIBUTE_UNUSED)
 {
 	int ret;
 
@@ -426,7 +459,12 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 	{
 		_STARPU_TRACE_START_EXECUTING();
 #ifdef STARPU_SIMGRID
-		_starpu_simgrid_execute_job(j, &worker->perf_arch, NAN);
+		int async = task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC;
+		unsigned workerid = worker->workerid;
+		_starpu_simgrid_submit_job(workerid, j, &worker->perf_arch, NAN,
+				async ? &task_finished[workerid][pipeline_idx] : NULL,
+				async ? &task_mutex[workerid][pipeline_idx] : NULL,
+				async ? &task_cond[workerid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
@@ -443,7 +481,10 @@ static void finish_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *wor
 	int profiling = starpu_profiling_status_get();
 
 	_starpu_set_current_task(NULL);
-	worker->current_tasks[worker->first_task] = NULL;
+	if (worker->pipeline_length)
+		worker->current_tasks[worker->first_task] = NULL;
+	else
+		worker->current_task = NULL;
 	worker->first_task = (worker->first_task + 1) % STARPU_MAX_PIPELINE;
 	worker->ntasks--;
 
@@ -464,7 +505,9 @@ static void execute_job_on_cuda(struct starpu_task *task, struct _starpu_worker
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
-	res = start_job_on_cuda(j, worker);
+	unsigned char pipeline_idx = (worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE;
+
+	res = start_job_on_cuda(j, worker, pipeline_idx);
 
 	if (res)
 	{
@@ -479,19 +522,24 @@ static void execute_job_on_cuda(struct starpu_task *task, struct _starpu_worker
 		}
 	}
 
-#ifndef STARPU_SIMGRID
 	if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
 	{
 		if (worker->pipeline_length == 0)
 		{
+#ifdef STARPU_SIMGRID
+			_starpu_simgrid_wait_tasks(workerid);
+#else
 			/* Forced synchronous execution */
 			cudaStreamSynchronize(starpu_cuda_get_local_stream());
+#endif
 			finish_job_on_cuda(j, worker);
 		}
 		else
 		{
+#ifndef STARPU_SIMGRID
 			/* Record event to synchronize with task termination later */
-			cudaEventRecord(task_events[workerid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE], starpu_cuda_get_local_stream());
+			cudaEventRecord(task_events[workerid][pipeline_idx], starpu_cuda_get_local_stream());
+#endif
 #ifdef STARPU_USE_FXT
 			int k;
 			for (k = 0; k < (int) worker->set->nworkers; k++)
@@ -504,11 +552,6 @@ static void execute_job_on_cuda(struct starpu_task *task, struct _starpu_worker
 		}
 	}
 	else
-#else
-#ifdef STARPU_DEVEL
-#warning No CUDA asynchronous execution with simgrid yet.
-#endif
-#endif
 	/* Synchronous execution */
 	{
 #if defined(STARPU_DEBUG) && !defined(STARPU_SIMGRID)
@@ -536,12 +579,14 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	}
 #endif
 
-#ifndef STARPU_SIMGRID
 	init_context(worker_set, devid);
 
+#ifdef STARPU_SIMGRID
+	STARPU_ASSERT_MSG (worker_set->nworkers = 1, "Simgrid mode does not support concurrent kernel execution yet\n");
+#else /* !STARPU_SIMGRID */
 	if (worker_set->nworkers > 1 && props[devid].concurrentKernels == 0)
 		_STARPU_DISP("Warning: STARPU_NWORKER_PER_CUDA is %u, but the device does not support concurrent kernel execution!\n", worker_set->nworkers);
-#endif
+#endif /* !STARPU_SIMGRID */
 
 	_starpu_cuda_limit_gpu_mem_if_needed(devid);
 	_starpu_memory_manager_set_global_memory_size(worker0->memory_node, _starpu_cuda_get_global_mem_size(devid));
@@ -582,6 +627,25 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 			_STARPU_DISP("Warning: STARPU_CUDA_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
 			worker->pipeline_length = STARPU_MAX_PIPELINE;
 		}
+#if defined(STARPU_SIMGRID) && defined(STARPU_NON_BLOCKING_DRIVERS)
+		if (worker->pipeline_length >= 1)
+		{
+			/* We need blocking drivers, otherwise idle drivers
+			 * would keep consuming real CPU time while just
+			 * polling for task termination */
+			_STARPU_DISP("Warning: reducing STARPU_CUDA_PIPELINE to 0 because simgrid is enabled and blocking drivers are not enabled\n");
+			worker->pipeline_length = 0;
+		}
+#endif
+#if !defined(STARPU_SIMGRID) && !defined(STARPU_NON_BLOCKING_DRIVERS)
+		if (worker->pipeline_length >= 1)
+		{
+			/* We need non-blocking drivers, to poll for CUDA task
+			 * termination */
+			_STARPU_DISP("Warning: reducing STARPU_CUDA_PIPELINE to 0 because blocking drivers are not enabled (and simgrid is not enabled)\n");
+			worker->pipeline_length = 0;
+		}
+#endif
 		_STARPU_TRACE_WORKER_INIT_END(worker_set->workers[i].workerid);
 	}
 
@@ -609,7 +673,6 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	struct _starpu_job *j;
 	int i, res;
 
-#ifndef STARPU_SIMGRID
 	int idle;
 
 	/* First poll for completed jobs */
@@ -629,6 +692,9 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 		task = worker->current_tasks[worker->first_task];
 
 		/* On-going asynchronous task, check for its termination first */
+#ifdef STARPU_SIMGRID
+		if (task_finished[workerid][worker->first_task])
+#else /* !STARPU_SIMGRID */
 		cudaError_t cures = cudaEventQuery(task_events[workerid][worker->first_task]);
 
 		if (cures != cudaSuccess)
@@ -636,6 +702,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			STARPU_ASSERT(cures == cudaErrorNotReady);
 		}
 		else
+#endif /* !STARPU_SIMGRID */
 		{
 			/* Asynchronous task completed! */
 			_starpu_set_local_worker_key(worker);
@@ -680,19 +747,20 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			idle++;
 	}
 
+#ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!idle)
 	{
 		/* Nothing ready yet, no better thing to do than waiting */
 		__starpu_datawizard_progress(memnode, 1, 0);
 		return 0;
 	}
-#endif /* STARPU_SIMGRID */
+#endif
 
 	/* Something done, make some progress */
 	__starpu_datawizard_progress(memnode, 1, 1);
 
 	/* And pull tasks */
-	res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
+	res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
 
 	if (!res)
 		return 0;
@@ -751,9 +819,7 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *arg)
 
 	_starpu_malloc_shutdown(memnode);
 
-#ifndef STARPU_SIMGRID
 	deinit_context(arg);
-#endif
 
 	_STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CUDA_KEY);
 

+ 68 - 4
src/drivers/driver_common/driver_common.c

@@ -388,13 +388,18 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 }
 
 
-int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworkers)
+int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworkers, unsigned memnode STARPU_ATTRIBUTE_UNUSED)
 {
 	int i, count = 0;
 	struct _starpu_job * j;
 	int is_parallel_task;
 	struct _starpu_combined_worker *combined_worker;
 	/*for each worker*/
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	/* This assumes only 1 worker */
+	STARPU_ASSERT_MSG(nworkers == 1, "Multiple workers is not yet possible in block drivers mode\n");
+	STARPU_PTHREAD_MUTEX_LOCK(&workers[0].sched_mutex);
+#endif
 	for (i = 0; i < nworkers; i++)
 	{
 		/*if the worker is already executing a task then */
@@ -408,7 +413,9 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		/*else try to pop a task*/
 		else
 		{
+#ifdef STARPU_NON_BLOCKING_DRIVERS
 			STARPU_PTHREAD_MUTEX_LOCK(&workers[i].sched_mutex);
+#endif
 			_starpu_worker_set_status_scheduling(workers[i].workerid);
 			_starpu_set_local_worker_key(&workers[i]);
 			tasks[i] = _starpu_pop_task(&workers[i]);
@@ -416,18 +423,18 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 			{
 				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
 				_starpu_worker_set_status_wakeup(workers[i].workerid);
+#ifdef STARPU_NON_BLOCKING_DRIVERS
 				STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
+#endif
 
 				count ++;
 				j = _starpu_get_job_associated_to_task(tasks[i]);
 				is_parallel_task = (j->task_size > 1);
 				if (workers[i].pipeline_length)
-				{
 					workers[i].current_tasks[(workers[i].first_task + workers[i].ntasks)%STARPU_MAX_PIPELINE] = tasks[i];
-					workers[i].ntasks++;
-				}
 				else
 					workers[i].current_task = j->task;
+				workers[i].ntasks++;
 				/* Get the rank in case it is a parallel task */
 				if (is_parallel_task)
 				{
@@ -446,14 +453,71 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 					workers[i].worker_size = 1;
 					workers[i].current_rank = 0;
 				}
+#ifdef HAVE_AYUDAME_H
+				if (AYU_event)
+				{
+					intptr_t id = workers[i].workerid;
+					AYU_event(AYU_PRERUNTASK, _starpu_get_job_associated_to_task(tasks[i])->job_id, &id);
+				}
+#endif
 			}
 			else
 			{
 				_starpu_worker_set_status_sleeping(workers[i].workerid);
+#ifdef STARPU_NON_BLOCKING_DRIVERS
 				STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
+#endif
 			}
 		}
 	}
+
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	/* Block the assumed-to-be-only worker */
+	struct _starpu_worker *worker = &workers[0];
+	unsigned workerid = workers[0].workerid;
+
+	if (!count) {
+		/* Note: we need to keep the sched condition mutex all along the path
+		 * from popping a task from the scheduler to blocking. Otherwise the
+		 * driver may go block just after the scheduler got a new task to be
+		 * executed, and thus hanging. */
+		_starpu_worker_set_status_sleeping(workerid);
+
+		if (_starpu_worker_can_block(memnode, worker)
+#ifndef STARPU_SIMGRID
+				&& !_starpu_sched_ctx_last_worker_awake(worker)
+#endif
+				)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+		}
+		else
+		{
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+			if (_starpu_machine_is_running())
+			{
+				_starpu_exponential_backoff(worker);
+#ifdef STARPU_SIMGRID
+				static int warned;
+				if (!warned)
+				{
+					warned = 1;
+					_STARPU_DISP("Has to make simgrid spin for CPU idle time.  You can try to pass --enable-blocking-drivers to ./configure to avoid this\n");
+				}
+				MSG_process_sleep(0.000010);
+#endif
+			}
+		}
+		return 0;
+	}
+
+	_starpu_worker_set_status_wakeup(workerid);
+	worker->spinning_backoff = BACKOFF_MIN;
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+#endif /* !STARPU_NON_BLOCKING_DRIVERS */
+
 	return count;
 }
 

+ 1 - 1
src/drivers/driver_common/driver_common.h

@@ -32,5 +32,5 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling);
 
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode);
-int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworker);
+int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworker, unsigned memnode);
 #endif // __DRIVER_COMMON_H__

+ 1 - 1
src/drivers/mp_common/source_common.c

@@ -697,7 +697,7 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 			_starpu_src_common_recv_async(mp_node);
 
 		/* get task for each worker*/
-		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
+		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
 
 		/*if at least one worker have pop a task*/
 		if(res != 0)

+ 107 - 51
src/drivers/opencl/driver_opencl.c

@@ -49,9 +49,16 @@ static cl_command_queue queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue in_transfer_queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue out_transfer_queues[STARPU_MAXOPENCLDEVS];
 static cl_command_queue peer_transfer_queues[STARPU_MAXOPENCLDEVS];
+#ifndef STARPU_SIMGRID
 static cl_command_queue alloc_queues[STARPU_MAXOPENCLDEVS];
 static cl_event task_events[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
+#endif /* !STARPU_SIMGRID */
 #endif
+#ifdef STARPU_SIMGRID
+static unsigned task_finished[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
+static starpu_pthread_mutex_t task_mutex[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
+static starpu_pthread_cond_t task_cond[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
+#endif /* STARPU_SIMGRID */
 
 void
 _starpu_opencl_discover_devices(struct _starpu_machine_config *config)
@@ -137,9 +144,17 @@ void starpu_opencl_get_current_context(cl_context *context)
         *context = contexts[worker->devid];
 }
 
-#ifndef STARPU_SIMGRID
-cl_int _starpu_opencl_init_context(int devid)
+int _starpu_opencl_init_context(int devid)
 {
+#ifdef STARPU_SIMGRID
+	int j;
+	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+	{
+		task_finished[devid][j] = 0;
+		STARPU_PTHREAD_MUTEX_INIT(&task_mutex[devid][j], NULL);
+		STARPU_PTHREAD_COND_INIT(&task_cond[devid][j], NULL);
+	}
+#else /* !STARPU_SIMGRID */
 	cl_int err;
 	cl_uint uint;
 
@@ -178,12 +193,21 @@ cl_int _starpu_opencl_init_context(int devid)
         if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&big_lock);
-
-	return CL_SUCCESS;
+#endif /* !STARPU_SIMGRID */
+	return 0;
 }
 
-cl_int _starpu_opencl_deinit_context(int devid)
+int _starpu_opencl_deinit_context(int devid)
 {
+#ifdef STARPU_SIMGRID
+	int j;
+	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
+	{
+		task_finished[devid][j] = 0;
+		STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[devid][j]);
+		STARPU_PTHREAD_COND_DESTROY(&task_cond[devid][j]);
+	}
+#else /* !STARPU_SIMGRID */
         cl_int err;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&big_lock);
@@ -209,12 +233,12 @@ cl_int _starpu_opencl_deinit_context(int devid)
         contexts[devid] = NULL;
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&big_lock);
+#endif
 
-        return CL_SUCCESS;
+        return 0;
 }
-#endif
 
-cl_int starpu_opencl_allocate_memory(int devid, cl_mem *mem STARPU_ATTRIBUTE_UNUSED, size_t size STARPU_ATTRIBUTE_UNUSED, cl_mem_flags flags STARPU_ATTRIBUTE_UNUSED)
+cl_int starpu_opencl_allocate_memory(int devid STARPU_ATTRIBUTE_UNUSED, cl_mem *mem STARPU_ATTRIBUTE_UNUSED, size_t size STARPU_ATTRIBUTE_UNUSED, cl_mem_flags flags STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
 	STARPU_ABORT();
@@ -561,7 +585,7 @@ void _starpu_opencl_init(void)
 #ifndef STARPU_SIMGRID
 static unsigned _starpu_opencl_get_device_name(int dev, char *name, int lname);
 #endif
-static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker *worker);
+static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker *worker, unsigned char pipeline_idx);
 static void _starpu_opencl_stop_job(struct _starpu_job *j, struct _starpu_worker *worker);
 static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_worker *worker);
 
@@ -571,9 +595,7 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 
 	_starpu_worker_start(worker, _STARPU_FUT_OPENCL_KEY);
 
-#ifndef STARPU_SIMGRID
 	_starpu_opencl_init_context(devid);
-#endif
 
 	/* one more time to avoid hacks from third party lib :) */
 	_starpu_bind_thread_on_cpu(worker->config, worker->bindid);
@@ -601,6 +623,25 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 		_STARPU_DISP("Warning: STARPU_OPENCL_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
 		worker->pipeline_length = STARPU_MAX_PIPELINE;
 	}
+#if defined(STARPU_SIMGRID) && defined(STARPU_NON_BLOCKING_DRIVERS)
+	if (worker->pipeline_length >= 1)
+	{
+		/* We need blocking drivers, otherwise idle drivers
+		 * would keep consuming real CPU time while just
+		 * polling for task termination */
+		_STARPU_DISP("Warning: reducing STARPU_OPENCL_PIPELINE to 0 because simgrid is enabled and blocking drivers are not enabled\n");
+		worker->pipeline_length = 0;
+	}
+#endif
+#if !defined(STARPU_SIMGRID) && !defined(STARPU_NON_BLOCKING_DRIVERS)
+	if (worker->pipeline_length >= 1)
+	{
+		/* We need non-blocking drivers, to poll for OPENCL task
+		 * termination */
+		_STARPU_DISP("Warning: reducing STARPU_OPENCL_PIPELINE to 0 because blocking drivers are not enabled (and simgrid is not enabled)\n");
+		worker->pipeline_length = 0;
+	}
+#endif
 
 	_STARPU_DEBUG("OpenCL (%s) dev id %d thread is ready to run on CPU %d !\n", devname, devid, worker->bindid);
 
@@ -624,7 +665,6 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	struct _starpu_job *j;
 	struct starpu_task *task;
 
-#ifndef STARPU_SIMGRID
 	if (worker->ntasks)
 	{
 		cl_int status;
@@ -635,47 +675,54 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 
 		task = worker->current_tasks[worker->first_task];
 
+#ifdef STARPU_SIMGRID
+		if (task_finished[worker->devid][worker->first_task])
+#else /* !STARPU_SIMGRID */
 		err = clGetEventInfo(task_events[worker->devid][worker->first_task], CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, &size);
 		STARPU_ASSERT(size == sizeof(cl_int));
 		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 
 		if (status != CL_COMPLETE)
+#endif /* !STARPU_SIMGRID */
 		{
 			_STARPU_TRACE_START_EXECUTING();
 			/* Not ready yet, no better thing to do than waiting */
 			__starpu_datawizard_progress(memnode, 1, 0);
 			return 0;
 		}
-
-		task_events[worker->devid][worker->first_task] = 0;
-
-		/* Asynchronous task completed! */
-		_starpu_opencl_stop_job(_starpu_get_job_associated_to_task(task), worker);
-		/* See next task if any */
-		if (worker->ntasks)
+		else
 		{
-			task = worker->current_tasks[worker->first_task];
-			j = _starpu_get_job_associated_to_task(task);
-			if (task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC)
-			{
-				/* An asynchronous task, it was already queued,
-				 * it's now running, record its start time.  */
-				_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, starpu_profiling_status_get());
-			}
-			else
+#ifndef STARPU_SIMGRID
+			task_events[worker->devid][worker->first_task] = 0;
+#endif
+
+			/* Asynchronous task completed! */
+			_starpu_opencl_stop_job(_starpu_get_job_associated_to_task(task), worker);
+			/* See next task if any */
+			if (worker->ntasks)
 			{
-				/* A synchronous task, we have finished flushing the pipeline, we can now at last execute it.  */
-				_STARPU_TRACE_END_PROGRESS(memnode);
-				_STARPU_TRACE_EVENT("sync_task");
-				_starpu_opencl_execute_job(task, worker);
-				_STARPU_TRACE_EVENT("end_sync_task");
-				_STARPU_TRACE_START_PROGRESS(memnode);
-				worker->pipeline_stuck = 0;
+				task = worker->current_tasks[worker->first_task];
+				j = _starpu_get_job_associated_to_task(task);
+				if (task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC)
+				{
+					/* An asynchronous task, it was already queued,
+					 * it's now running, record its start time.  */
+					_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, starpu_profiling_status_get());
+				}
+				else
+				{
+					/* A synchronous task, we have finished flushing the pipeline, we can now at last execute it.  */
+					_STARPU_TRACE_END_PROGRESS(memnode);
+					_STARPU_TRACE_EVENT("sync_task");
+					_starpu_opencl_execute_job(task, worker);
+					_STARPU_TRACE_EVENT("end_sync_task");
+					_STARPU_TRACE_START_PROGRESS(memnode);
+					worker->pipeline_stuck = 0;
+				}
 			}
+			_STARPU_TRACE_END_EXECUTING();
 		}
-		_STARPU_TRACE_END_EXECUTING();
 	}
-#endif /* STARPU_SIMGRID */
 
 	__starpu_datawizard_progress(memnode, 1, 1);
 
@@ -728,10 +775,8 @@ int _starpu_opencl_driver_deinit(struct _starpu_worker *worker)
 
 	_starpu_malloc_shutdown(memnode);
 
-#ifndef STARPU_SIMGRID
 	unsigned devid   = worker->devid;
         _starpu_opencl_deinit_context(devid);
-#endif
 
 	_STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_OPENCL_KEY);
 
@@ -802,7 +847,7 @@ cl_device_type _starpu_opencl_get_device_type(int devid)
 }
 #endif /* STARPU_USE_OPENCL */
 
-static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker *worker)
+static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker *worker, unsigned char pipeline_idx STARPU_ATTRIBUTE_UNUSED)
 {
 	int ret;
 
@@ -853,7 +898,11 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 		STARPU_ASSERT_MSG(profiling_info->used_cycles, "Application kernel must call starpu_opencl_collect_stats to collect simulated time");
 		length = ((double) profiling_info->used_cycles)/MSG_get_host_speed(MSG_host_self());
 	  #endif
-		_starpu_simgrid_execute_job(j, &worker->perf_arch, length);
+		int async = task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC;
+		_starpu_simgrid_submit_job(worker->workerid, j, &worker->perf_arch, length,
+				async ? &task_finished[worker->devid][pipeline_idx] : NULL,
+				async ? &task_mutex[worker->devid][pipeline_idx] : NULL,
+				async ? &task_cond[worker->devid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
@@ -868,7 +917,10 @@ static void _starpu_opencl_stop_job(struct _starpu_job *j, struct _starpu_worker
 	int profiling = starpu_profiling_status_get();
 
 	_starpu_set_current_task(NULL);
-	worker->current_tasks[worker->first_task] = NULL;
+	if (worker->pipeline_length)
+		worker->current_tasks[worker->first_task] = NULL;
+	else
+		worker->current_task = NULL;
 	worker->first_task = (worker->first_task + 1) % STARPU_MAX_PIPELINE;
 	worker->ntasks--;
 
@@ -888,7 +940,9 @@ static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
-	res = _starpu_opencl_start_job(j, worker);
+	unsigned char pipeline_idx = (worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE;
+
+	res = _starpu_opencl_start_job(j, worker, pipeline_idx);
 
 	if (res)
 	{
@@ -903,22 +957,28 @@ static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_
 		}
 	}
 
-#ifndef STARPU_SIMGRID
 	if (task->cl->opencl_flags[j->nimpl] & STARPU_OPENCL_ASYNC)
 	{
 		/* Record event to synchronize with task termination later */
-		int err;
+#ifndef STARPU_SIMGRID
 		cl_command_queue queue;
 		starpu_opencl_get_queue(worker->devid, &queue);
+#endif
 
 		if (worker->pipeline_length == 0)
 		{
+#ifdef STARPU_SIMGRID
+			_starpu_simgrid_wait_tasks(worker->workerid);
+#else
 			starpu_opencl_get_queue(worker->devid, &queue);
 			clFinish(queue);
+#endif
 			_starpu_opencl_stop_job(j, worker);
 		}
 		else
 		{
+#ifndef STARPU_SIMGRID
+			int err;
 			/* the function clEnqueueMarker is deprecated from
 			 * OpenCL version 1.2. We would like to use the new
 			 * function clEnqueueMarkerWithWaitList. We could do
@@ -928,17 +988,13 @@ static void _starpu_opencl_execute_job(struct starpu_task *task, struct _starpu_
 			 * 2 macros detect the function availability in the
 			 * ICD and not in the device implementation.
 			 */
-			err = clEnqueueMarker(queue, &task_events[worker->devid][(worker->first_task + worker->ntasks - 1)%STARPU_MAX_PIPELINE]);
+			err = clEnqueueMarker(queue, &task_events[worker->devid][pipeline_idx]);
 			if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
+#endif
 			_STARPU_TRACE_START_EXECUTING();
 		}
 	}
 	else
-#else
-#ifdef STARPU_DEVEL
-#warning No OpenCL asynchronous execution with simgrid yet.
-#endif
-#endif
 	/* Synchronous execution */
 	{
 		_starpu_opencl_stop_job(j, worker);

+ 2 - 13
src/sched_policies/eager_central_policy.c

@@ -129,14 +129,8 @@ static int push_task_eager_policy(struct starpu_task *task)
 	{
 		worker = workers->get_next(workers, &it);
 		if (dowake[worker])
-		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-
-			if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
+			if (starpu_wake_worker(worker))
 				break; // wake up a single worker
-		}
 	}
 #endif
 
@@ -207,12 +201,7 @@ static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 		workerid = workerids[i];
 		int curr_workerid = starpu_worker_get_id();
 		if(workerid != curr_workerid)
-		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-			starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
-		}
+			starpu_wake_worker(workerid);
 
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
 	}

+ 1 - 7
src/sched_policies/eager_central_priority_policy.c

@@ -177,14 +177,8 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	{
 		worker = workers->get_next(workers, &it);
 		if (dowake[worker])
-		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-
-			if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
+			if (starpu_wake_worker(worker))
 				break; // wake up a single worker
-		}
 	}
 #endif