소스 검색

Vastly improve simgrid simulation time by using wake queues

Samuel Thibault 9 년 전
부모
커밋
90bf39960e

+ 3 - 0
ChangeLog

@@ -22,6 +22,9 @@ New features:
   * Enable anticipative writeback by default.
   * New scheduler with heterogeneous priorities
 
+Changes:
+  * Vastly improve simgrid simulation time.
+
 StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 

+ 77 - 0
include/starpu_thread.h

@@ -283,6 +283,83 @@ typedef void* starpu_pthread_cond_t;
 typedef void* starpu_pthread_barrier_t;
 #endif /* _MSC_VER */
 
+/*
+ * Simgrid-specific register/wait synchronization
+ *
+ * Producers create a "queue" object, and when they have produced something,
+ * they call either queue_signal or queue_broadcast in order to wake either one
+ * or all consumers waiting on the queue.
+ *
+ * starpu_pthread_queue_init(&global_queue1->queue);
+ * while (1) {
+ * 	element = compute();
+ * 	push(element, global_queue1);
+ * 	starpu_pthread_queue_signal(&global_queue1->queue);
+ * }
+ * starpu_pthread_queue_destroy(&global_queue1->queue);
+ *
+ * Consumers create a "wait" object, then queue_register on as many queues as
+ * they want. In their consumption loop, they wait_reset, then test for
+ * availability on all producers, and if none was available, call wait_wait to
+ * actually wait for producers. On termination, consumers have to
+ * queue_unregister before destroying the "wait" object:
+ *
+ * starpu_pthread_wait_t wait;
+ *
+ * starpu_pthread_wait_init(&wait);
+ * starpu_pthread_queue_register(&wait, &global_queue1->queue);
+ * starpu_pthread_queue_register(&wait, &global_queue2->queue);
+ *
+ * while (1) {
+ * 	int sleep = 1;
+ * 	starpu_pthread_wait_reset(&wait);
+ * 	if (global_queue1->navailable)
+ * 	{
+ * 		work(global_queue1);
+ * 		sleep = 0;
+ * 	}
+ * 	if (global_queue2->navailable)
+ * 	{
+ * 		work(global_queue2);
+ * 		sleep = 0;
+ * 	}
+ * 	if (sleep)
+ * 		starpu_pthread_wait_wait(&wait);
+ * }
+ * starpu_pthread_queue_unregister(&wait, &global_queue1->queue);
+ * starpu_pthread_queue_unregister(&wait, &global_queue2->queue);
+ * starpu_pthread_wait_destroy(&wait);
+ */
+
+#ifdef STARPU_SIMGRID
+typedef struct /* Consumer-side "wait" object: one flag plus the mutex/condition used to sleep on it */
+{
+	starpu_pthread_mutex_t mutex; /* held around every access to block */
+	starpu_pthread_cond_t cond; /* signalled by queue_signal/queue_broadcast after clearing block */
+	unsigned block; /* 1: wait_wait must sleep; 0: a producer signalled since the last wait_reset */
+} starpu_pthread_wait_t;
+
+typedef struct /* Producer-side "queue" object: growable array of the wait objects registered on it */
+{
+	starpu_pthread_mutex_t mutex; /* held while the fields below are read or modified */
+	starpu_pthread_wait_t **queue; /* registered waiters, in registration order */
+	unsigned allocqueue; /* number of slots allocated in queue */
+	unsigned nqueue; /* number of slots of queue currently in use */
+} starpu_pthread_queue_t;
+
+int starpu_pthread_queue_init(starpu_pthread_queue_t *q); /* make q an empty queue */
+int starpu_pthread_queue_signal(starpu_pthread_queue_t *q); /* unblock the first registered waiter, if any */
+int starpu_pthread_queue_broadcast(starpu_pthread_queue_t *q); /* unblock every registered waiter */
+int starpu_pthread_queue_destroy(starpu_pthread_queue_t *q); /* release q; no waiter may still be registered */
+
+int starpu_pthread_wait_init(starpu_pthread_wait_t *w); /* initialize w in the blocked state */
+int starpu_pthread_queue_register(starpu_pthread_wait_t *w, starpu_pthread_queue_t *q); /* append w to q's waiters */
+int starpu_pthread_queue_unregister(starpu_pthread_wait_t *w, starpu_pthread_queue_t *q); /* remove w from q's waiters */
+int starpu_pthread_wait_reset(starpu_pthread_wait_t *w); /* re-arm w: set the blocked flag again */
+int starpu_pthread_wait_wait(starpu_pthread_wait_t *w); /* sleep until the blocked flag is cleared */
+int starpu_pthread_wait_destroy(starpu_pthread_wait_t *w); /* release w's mutex and condition */
+#endif
+
 #ifdef __cplusplus
 }
 #endif

+ 129 - 1
src/common/thread.c

@@ -349,7 +349,135 @@ int starpu_pthread_barrier_wait(starpu_pthread_barrier_t *barrier)
 	_STARPU_TRACE_BARRIER_WAIT_END();
 	return 0;
 }
-#endif /* defined(STARPU_SIMGRID) */
+#endif /* defined(STARPU_SIMGRID_HAVE_XBT_BARRIER_INIT) */
+
+int starpu_pthread_queue_init(starpu_pthread_queue_t *q) /* Set up q as an empty wake queue; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_INIT(&q->mutex, NULL);
+	q->queue = NULL; /* no storage yet: starpu_pthread_queue_register() grows the array on demand */
+	q->allocqueue = 0;
+	q->nqueue = 0;
+	return 0;
+}
+
+int starpu_pthread_wait_init(starpu_pthread_wait_t *w) /* Set up a consumer-side wait object; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_INIT(&w->mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&w->cond, NULL);
+	w->block = 1; /* start blocked: starpu_pthread_wait_wait() sleeps until a signal/broadcast clears this */
+	return 0;
+}
+
+int starpu_pthread_queue_register(starpu_pthread_wait_t *w, starpu_pthread_queue_t *q) /* Append w to the waiters of q, under q->mutex; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&q->mutex);
+
+	if (q->nqueue == q->allocqueue)
+	{
+		/* Make room for the new waiter */
+		unsigned newalloc;
+		starpu_pthread_wait_t **newqueue;
+		newalloc = q->allocqueue * 2; /* double the capacity... */
+		if (!newalloc)
+			newalloc = 1; /* ...or start with one slot when the queue was never allocated */
+		newqueue = realloc(q->queue, newalloc * sizeof(*(q->queue)));
+		STARPU_ASSERT(newqueue); /* allocation failure is not reported to the caller */
+		q->queue = newqueue;
+		q->allocqueue = newalloc;
+	}
+	q->queue[q->nqueue++] = w; /* no duplicate check: registering the same w twice stores it twice */
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&q->mutex);
+	return 0;
+}
+
+/*
+ * Remove the first occurrence of waiter w from queue q, keeping the remaining
+ * waiters in their registration order. The search and the removal are done
+ * under q->mutex. Unregistering a waiter which is not registered on q is a
+ * caller bug and trips an assertion. Always returns 0.
+ */
+int starpu_pthread_queue_unregister(starpu_pthread_wait_t *w, starpu_pthread_queue_t *q)
+{
+	unsigned i;
+	STARPU_PTHREAD_MUTEX_LOCK(&q->mutex);
+	for (i = 0; i < q->nqueue; i++)
+	{
+		if (q->queue[i] == w)
+			break;
+	}
+	STARPU_ASSERT(i < q->nqueue);
+	if (i < q->nqueue)
+	{
+		/* Only shrink the queue when w was actually found, so that the
+		 * unsigned count can not wrap around (and make signal/broadcast
+		 * walk past the array) in case the assertion above is compiled
+		 * out. */
+		memmove(&q->queue[i], &q->queue[i+1], (q->nqueue - i - 1) * sizeof(*(q->queue)));
+		q->nqueue--;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&q->mutex);
+	return 0;
+}
+
+int starpu_pthread_wait_reset(starpu_pthread_wait_t *w) /* Re-arm w before polling the producers; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&w->mutex);
+	w->block = 1; /* a signal/broadcast arriving after this point clears it again, so the following wait_wait returns at once */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&w->mutex);
+	return 0;
+}
+
+int starpu_pthread_wait_wait(starpu_pthread_wait_t *w) /* Sleep until w->block has been cleared; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&w->mutex);
+	while (w->block == 1) /* flag is re-checked after every wakeup; not entered at all if it was already cleared */
+		STARPU_PTHREAD_COND_WAIT(&w->cond, &w->mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&w->mutex); /* block is left at 0: the caller re-arms with starpu_pthread_wait_reset() */
+	return 0;
+}
+
+int starpu_pthread_queue_signal(starpu_pthread_queue_t *q) /* Unblock one waiter registered on q, if there is any; always returns 0 */
+{
+	starpu_pthread_wait_t *w;
+	STARPU_PTHREAD_MUTEX_LOCK(&q->mutex);
+	if (q->nqueue)
+	{
+		/* TODO: better try to wake a sleeping one if possible */
+		w = q->queue[0]; /* always the first registered waiter, whether or not it is currently sleeping */
+		STARPU_PTHREAD_MUTEX_LOCK(&w->mutex); /* lock order: queue mutex first, then waiter mutex */
+		w->block = 0;
+		STARPU_PTHREAD_COND_SIGNAL(&w->cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&w->mutex);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&q->mutex);
+	return 0;
+}
+
+int starpu_pthread_queue_broadcast(starpu_pthread_queue_t *q) /* Unblock every waiter registered on q; always returns 0 */
+{
+	unsigned i;
+	starpu_pthread_wait_t *w;
+	STARPU_PTHREAD_MUTEX_LOCK(&q->mutex); /* held for the whole walk, so register/unregister can not change the array meanwhile */
+	for (i = 0; i < q->nqueue; i++)
+	{
+		w = q->queue[i];
+		STARPU_PTHREAD_MUTEX_LOCK(&w->mutex); /* lock order: queue mutex first, then waiter mutex */
+		w->block = 0;
+		STARPU_PTHREAD_COND_SIGNAL(&w->cond); /* one signal per wait object; NOTE(review): assumes a single thread sleeps on each wait object - confirm */
+		STARPU_PTHREAD_MUTEX_UNLOCK(&w->mutex);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&q->mutex);
+	return 0;
+}
+
+int starpu_pthread_wait_destroy(starpu_pthread_wait_t *w) /* Release the mutex and condition of w; always returns 0 */
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&w->mutex); /* NOTE(review): presumably waits for a signaller still holding w->mutex to leave - confirm */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&w->mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&w->mutex);
+	STARPU_PTHREAD_COND_DESTROY(&w->cond);
+	return 0; /* w is not removed from any queue here: callers unregister it first */
+}
+
+/*
+ * Release the resources of queue q. All waiters must have been unregistered
+ * beforehand; this is asserted while holding q->mutex, like every other
+ * access to the waiter count. The waiter array is freed and the pointer and
+ * capacity are cleared, so that the (possibly long-lived) queue object does
+ * not keep a dangling pointer. Always returns 0.
+ */
+int starpu_pthread_queue_destroy(starpu_pthread_queue_t *q)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&q->mutex);
+	STARPU_ASSERT(!q->nqueue);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&q->mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&q->mutex);
+	free(q->queue);
+	q->queue = NULL;
+	q->allocqueue = 0;
+	return 0;
+}
 
 #endif /* STARPU_SIMGRID */
 

+ 8 - 0
src/core/simgrid.c

@@ -34,6 +34,9 @@ extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *a
 #pragma weak _starpu_mpi_simgrid_init
 extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 
+starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES]; /* wake queues indexed by memory node id */
+starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS]; /* wake queues indexed by worker id */
+
 struct main_args
 {
 	int argc;
@@ -238,6 +241,7 @@ int main(int argc, char **argv)
 
 void _starpu_simgrid_init()
 {
+	unsigned i;
 	if (!starpu_main && !(smpi_main && smpi_simulated_main_))
 	{
 		_STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h included, to properly rename it into starpu_main\n");
@@ -247,6 +251,10 @@ void _starpu_simgrid_init()
 	{
 		MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
 	}
+	for (i = 0; i < STARPU_MAXNODES; i++)
+		starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
+	for (i = 0; i < STARPU_NMAXWORKERS; i++)
+		starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
 }
 
 /*

+ 3 - 0
src/core/simgrid.h

@@ -54,6 +54,9 @@ extern int starpu_mpi_world_rank(void);
 #pragma weak _starpu_mpi_simgrid_init
 int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 
+extern starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES]; /* defined in simgrid.c; indexed by memory node id */
+extern starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS]; /* defined in simgrid.c; indexed by worker id */
+
 #define _starpu_simgrid_cuda_malloc_cost() starpu_get_env_number_default("STARPU_SIMGRID_CUDA_MALLOC_COST", 1)
 #define _starpu_simgrid_queue_malloc_cost() starpu_get_env_number_default("STARPU_SIMGRID_QUEUE_MALLOC_COST", 1)
 

+ 21 - 0
src/core/topology.c

@@ -1441,6 +1441,11 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				}
 				workerarg->bindid = _starpu_get_next_bindid(config, NULL, 0);
 				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				if (memory_node != STARPU_MAIN_RAM)
+					starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
+#endif
 				break;
 			}
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
@@ -1502,6 +1507,10 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					}
 				}
 				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
+#endif
 				break;
 #endif
 
@@ -1539,6 +1548,10 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 #endif /* SIMGRID */
 				}
 				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
+#endif
 				break;
 #endif
 
@@ -1568,6 +1581,10 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				}
 				workerarg->bindid = mic_bindid[devid];
 				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
+#endif
 				break;
 #endif /* STARPU_USE_MIC */
 
@@ -1580,6 +1597,10 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 
 				memory_node = ram_memory_node;
 				_starpu_memory_node_add_nworkers(memory_node);
+#ifdef STARPU_SIMGRID
+				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
+				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
+#endif
 			}
 				break;
 #endif

+ 21 - 0
src/core/workers.c

@@ -522,8 +522,13 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->current_ordered_task = 0;
 	workerarg->current_ordered_task_order = 1;
 	workerarg->current_task = NULL;
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_init(&workerarg->wait);
+	starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_task_queue[workerarg->workerid]);
+#endif
 	workerarg->first_task = 0;
 	workerarg->ntasks = 0;
+	/* set initialized by topology.c */
 	workerarg->pipeline_length = 0;
 	workerarg->pipeline_stuck = 0;
 	workerarg->worker_is_running = 0;
@@ -562,6 +567,14 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
 
+static void _starpu_worker_deinit(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig) /* Release per-worker resources at shutdown; pconfig is not used by this body */
+{
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_unregister(&workerarg->wait, &_starpu_simgrid_task_queue[workerarg->workerid]); /* leave the worker's task queue before the wait object goes away */
+	starpu_pthread_wait_destroy(&workerarg->wait);
+#endif
+}
+
 #ifdef STARPU_USE_FXT
 void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync)
 {
@@ -1475,6 +1488,7 @@ void starpu_display_stats()
 
 void starpu_shutdown(void)
 {
+	unsigned worker;
 	STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
 	init_count--;
 	STARPU_ASSERT_MSG(init_count >= 0, "Number of calls to starpu_shutdown() can not be higher than the number of calls to starpu_init()\n");
@@ -1529,6 +1543,9 @@ void starpu_shutdown(void)
 	_starpu_delete_all_sched_ctxs();
 	_starpu_sched_component_workers_destroy();
 
+	for (worker = 0; worker < config.topology.nworkers; worker++)
+		_starpu_worker_deinit(&config.workers[worker], &config);
+
 	_starpu_disk_unregister();
 #ifdef STARPU_HAVE_HWLOC
 	starpu_tree_free(config.topology.tree);
@@ -1907,6 +1924,10 @@ int starpu_wake_worker(int workerid)
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
+#endif
 	return starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
 }
 

+ 5 - 0
src/core/workers.h

@@ -23,6 +23,7 @@
 #include <common/config.h>
 #include <common/timing.h>
 #include <common/fxt.h>
+#include <common/thread.h>
 #include <core/jobs.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/sched_policy.h>
@@ -81,6 +82,10 @@ LIST_TYPE(_starpu_worker,
 	unsigned current_ordered_task_order; /* this records the order of the next ordered task to be executed */
 	struct starpu_task *current_task; /* task currently executed by this worker (non-pipelined version) */
 	struct starpu_task *current_tasks[STARPU_MAX_PIPELINE]; /* tasks currently executed by this worker (pipelined version) */
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_t wait;
+#endif
+
 	unsigned char first_task; /* Index of first task in the pipeline */
 	unsigned char ntasks; /* number of tasks in the pipeline */
 	unsigned char pipeline_length; /* number of tasks to be put in the pipeline */

+ 12 - 0
src/datawizard/copy_driver.c

@@ -56,6 +56,10 @@ void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
+#endif
 }
 
 void starpu_wake_all_blocked_workers(void)
@@ -80,6 +84,14 @@ void starpu_wake_all_blocked_workers(void)
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
+
+#ifdef STARPU_SIMGRID
+	unsigned workerid, nodeid;
+	for (workerid = 0; workerid < starpu_worker_get_count(); workerid++)
+		starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
+	for (nodeid = 0; nodeid < starpu_memory_nodes_get_count(); nodeid++)
+		starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
+#endif
 }
 
 #ifdef STARPU_USE_FXT

+ 44 - 1
src/datawizard/data_request.c

@@ -21,6 +21,7 @@
 #include <datawizard/datawizard.h>
 #include <datawizard/memory_nodes.h>
 #include <core/disk.h>
+#include <core/simgrid.h>
 
 /* TODO: This should be tuned according to driver capabilities
  * Data interfaces should also have to declare how many asynchronous requests
@@ -194,8 +195,22 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 
 	unsigned local_node = _starpu_memory_node_get_local_key();
 
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_t wait;
+
+	starpu_pthread_wait_init(&wait);
+	/* We need to get woken both when requests finish on our node, and on
+	 * the target node of the request we are waiting for */
+	starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[local_node]);
+	starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[r->dst_replicate->memory_node]);
+#endif
+
 	do
 	{
+#ifdef STARPU_SIMGRID
+		starpu_pthread_wait_reset(&wait);
+#endif
+
 		STARPU_SYNCHRONIZE();
 		if (STARPU_RUNNING_ON_VALGRIND)
 			completed = 1;
@@ -209,16 +224,27 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 			_starpu_spin_unlock(&r->lock);
 		}
 
+#ifndef STARPU_SIMGRID
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 		/* XXX: shouldn't be needed, and doesn't work with chained requests anyway */
 		_starpu_wake_all_blocked_workers_on_node(r->handling_node);
 #endif
+#endif
 
 		_starpu_datawizard_progress(local_node, may_alloc);
 
+#ifdef STARPU_SIMGRID
+		starpu_pthread_wait_wait(&wait);
+#endif
 	}
 	while (1);
 
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[local_node]);
+	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[r->dst_replicate->memory_node]);
+	starpu_pthread_wait_destroy(&wait);
+#endif
+
 
 	retval = r->retval;
 	if (retval)
@@ -348,6 +374,11 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 
 	r->completed = 1;
 
+#ifdef STARPU_SIMGRID
+	/* Wake potential worker which was waiting for it */
+	_starpu_wake_all_blocked_workers_on_node(dst_replicate->memory_node);
+#endif
+
 	/* Remove a reference on the destination replicate for the request */
 	STARPU_ASSERT(dst_replicate->refcnt > 0);
 	dst_replicate->refcnt--;
@@ -574,7 +605,19 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 			_starpu_data_request_list_push_list_front(&new_data_requests[2], &idle_requests[src_node]);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
+#ifdef STARPU_SIMGRID
+		if (*pushed)
+		{
+			/* We couldn't process the request due to missing
+			 * space. Advance the clock a bit to let eviction have
+			 * the time to make some room for us. Ideally we should
+			 * rather have the caller block, and explicitly wait
+			 * for eviction to happen.
+			 */
+			MSG_process_sleep(0.000010);
+			_starpu_wake_all_blocked_workers_on_node(src_node);
+		}
+#elif !defined(STARPU_NON_BLOCKING_DRIVERS)
 		_starpu_wake_all_blocked_workers_on_node(src_node);
 #endif
 	}

+ 13 - 3
src/drivers/cpu/driver_cpu.c

@@ -221,18 +221,28 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 	unsigned memnode = cpu_worker->memory_node;
 	int workerid = cpu_worker->workerid;
 
+	int res;
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_reset(&cpu_worker->wait);
+#endif
+
 	_STARPU_TRACE_START_PROGRESS(memnode);
-	_starpu_datawizard_progress(memnode, 1);
+	res = __starpu_datawizard_progress(memnode, 1, 1);
 	if (memnode != STARPU_MAIN_RAM)
-		_starpu_datawizard_progress(STARPU_MAIN_RAM, 1);
+		res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
 	_STARPU_TRACE_END_PROGRESS(memnode);
 
 	struct _starpu_job *j;
 	struct starpu_task *task;
-	int res;
 
 	task = _starpu_get_worker_task(cpu_worker, workerid, memnode);
 
+#ifdef STARPU_SIMGRID
+	if (!res && !task)
+		starpu_pthread_wait_wait(&cpu_worker->wait);
+#endif
+
 	if (!task)
 		return 0;
 

+ 14 - 14
src/drivers/cuda/driver_cuda.c

@@ -641,16 +641,6 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 			_STARPU_DISP("Warning: STARPU_CUDA_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
 			worker->pipeline_length = STARPU_MAX_PIPELINE;
 		}
-#if defined(STARPU_SIMGRID) && defined(STARPU_NON_BLOCKING_DRIVERS)
-		if (worker->pipeline_length >= 1)
-		{
-			/* We need blocking drivers, otherwise idle drivers
-			 * would keep consuming real CPU time while just
-			 * polling for task termination */
-			_STARPU_DISP("Warning: reducing STARPU_CUDA_PIPELINE to 0 because simgrid is enabled and blocking drivers are not enabled\n");
-			worker->pipeline_length = 0;
-		}
-#endif
 #if !defined(STARPU_SIMGRID) && !defined(STARPU_NON_BLOCKING_DRIVERS)
 		if (worker->pipeline_length >= 1)
 		{
@@ -691,6 +681,10 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 
 	int idle;
 
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_reset(&worker0->wait);
+#endif
+
 	/* First poll for completed jobs */
 	idle = 0;
 	for (i = 0; i < (int) worker_set->nworkers; i++)
@@ -763,7 +757,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			idle++;
 	}
 
-#ifdef STARPU_NON_BLOCKING_DRIVERS
+#if defined(STARPU_NON_BLOCKING_DRIVERS) && !defined(STARPU_SIMGRID)
 	if (!idle)
 	{
 		/* Nothing ready yet, no better thing to do than waiting */
@@ -774,14 +768,20 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 #endif
 
 	/* Something done, make some progress */
-	__starpu_datawizard_progress(memnode, 1, 1);
-	__starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+	res = !idle;
+	res |= __starpu_datawizard_progress(memnode, 1, 1);
+	res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
 
 	/* And pull tasks */
-	res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
+	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
 
+#ifdef STARPU_SIMGRID
+	if (!res)
+		starpu_pthread_wait_wait(&worker0->wait);
+#else
 	if (!res)
 		return 0;
+#endif
 
 	for (i = 0; i < (int) worker_set->nworkers; i++)
 	{

+ 6 - 2
src/drivers/driver_common/driver_common.c

@@ -415,6 +415,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	else
 		task = _starpu_pop_task(worker);
 
+#if !defined(STARPU_SIMGRID)
 	if (task == NULL && !executing)
 	{
 		/* Didn't get a task to run and none are running, go to sleep */
@@ -455,6 +456,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 
 		return NULL;
 	}
+#endif
 
 	_starpu_worker_set_status_scheduling_done(workerid);
 
@@ -571,7 +573,8 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		}
 	}
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
+#if !defined(STARPU_NON_BLOCKING_DRIVERS)
+#if !defined(STARPU_SIMGRID)
 	/* Block the assumed-to-be-only worker */
 	struct _starpu_worker *worker = &workers[0];
 	unsigned workerid = workers[0].workerid;
@@ -617,8 +620,9 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 
 	_starpu_worker_set_status_wakeup(workerid);
 	worker->spinning_backoff = BACKOFF_MIN;
+#endif /* STARPU_SIMGRID */
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&workers[0].sched_mutex);
 #endif /* !STARPU_NON_BLOCKING_DRIVERS */
 
 	return count;

+ 13 - 4
src/drivers/mp_common/source_common.c

@@ -682,14 +682,18 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 	/*main loop*/
 	while (_starpu_machine_is_running())
 	{
-		int res;
+		int res = 0;
 		struct _starpu_job * j;
 
 		_starpu_may_pause();
 
+#ifdef STARPU_SIMGRID
+		starpu_pthread_wait_reset(&worker_set->workers[0].wait);
+#endif
+
 		_STARPU_TRACE_START_PROGRESS(memnode);
-		_starpu_datawizard_progress(memnode, 1);
-		_starpu_datawizard_progress(STARPU_MAIN_RAM, 1);
+		res |= __starpu_datawizard_progress(memnode, 1, 1);
+		res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
 		_STARPU_TRACE_END_PROGRESS(memnode);
 
 		/* Handle message which have been store */
@@ -700,7 +704,12 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 			_starpu_src_common_recv_async(mp_node);
 
 		/* get task for each worker*/
-		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
+		res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
+
+#ifdef STARPU_SIMGRID
+		if (!res)
+			starpu_pthread_wait_wait(&worker_set->workers[0].wait);
+#endif
 
 		/*if at least one worker have pop a task*/
 		if(res != 0)

+ 14 - 13
src/drivers/opencl/driver_opencl.c

@@ -635,16 +635,6 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 		_STARPU_DISP("Warning: STARPU_OPENCL_PIPELINE is %u, but STARPU_MAX_PIPELINE is only %u", worker->pipeline_length, STARPU_MAX_PIPELINE);
 		worker->pipeline_length = STARPU_MAX_PIPELINE;
 	}
-#if defined(STARPU_SIMGRID) && defined(STARPU_NON_BLOCKING_DRIVERS)
-	if (worker->pipeline_length >= 1)
-	{
-		/* We need blocking drivers, otherwise idle drivers
-		 * would keep consuming real CPU time while just
-		 * polling for task termination */
-		_STARPU_DISP("Warning: reducing STARPU_OPENCL_PIPELINE to 0 because simgrid is enabled and blocking drivers are not enabled\n");
-		worker->pipeline_length = 0;
-	}
-#endif
 #if !defined(STARPU_SIMGRID) && !defined(STARPU_NON_BLOCKING_DRIVERS)
 	if (worker->pipeline_length >= 1)
 	{
@@ -676,9 +666,14 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 
 	struct _starpu_job *j;
 	struct starpu_task *task;
+	int res;
 
 	int idle;
 
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_reset(&worker->wait);
+#endif
+
 	/* First poll for completed jobs */
 	idle = 0;
 	if (worker->ntasks)
@@ -741,7 +736,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	if (worker->ntasks < worker->pipeline_length)
 		idle++;
 
-#ifdef STARPU_NON_BLOCKING_DRIVERS
+#if defined(STARPU_NON_BLOCKING_DRIVERS) && !defined(STARPU_SIMGRID)
 	if (!idle)
 	{
 		/* Not ready yet, no better thing to do than waiting */
@@ -751,11 +746,17 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	}
 #endif
 
-	__starpu_datawizard_progress(memnode, 1, 1);
-	__starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+	res = !idle;
+	res |= __starpu_datawizard_progress(memnode, 1, 1);
+	res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
 
 	task = _starpu_get_worker_task(worker, workerid, memnode);
 
+#ifdef STARPU_SIMGRID
+	if (!res && !task)
+		starpu_pthread_wait_wait(&worker->wait);
+#endif
+
 	if (task == NULL)
 		return 0;
 

+ 1 - 1
src/sched_policies/eager_central_policy.c

@@ -121,7 +121,7 @@ static int push_task_eager_policy(struct starpu_task *task)
 	/* Let the task free */
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* Now that we have a list of potential workers, try to wake one */
 
 	workers->init_iterator_for_parallel_tasks(workers, &it, task);