Bläddra i källkod

enable to keep a worker awake if a new task was pushed to it after it decided to sleep but before the sleeping is enforced
enable policies eager, prio, random, ws and lws

Olivier Aumage 8 år sedan
förälder
incheckning
16e2c15fbe

+ 22 - 21
src/core/sched_policy.c

@@ -52,32 +52,33 @@ int starpu_get_prefetch_flag(void)
 
 static struct starpu_sched_policy *predefined_policies[] =
 {
-	&_starpu_sched_modular_eager_policy,
-	&_starpu_sched_modular_eager_prefetching_policy,
-	&_starpu_sched_modular_prio_policy,
-	&_starpu_sched_modular_prio_prefetching_policy,
-	&_starpu_sched_modular_random_policy,
-	&_starpu_sched_modular_random_prio_policy,
-	&_starpu_sched_modular_random_prefetching_policy,
-	&_starpu_sched_modular_random_prio_prefetching_policy,
-	&_starpu_sched_modular_ws_policy,
-	&_starpu_sched_modular_heft_policy,
-	&_starpu_sched_modular_heft_prio_policy,
-	&_starpu_sched_modular_heft2_policy,
+#warning update policies with sleeping synchro
+	//&_starpu_sched_modular_eager_policy,
+	//&_starpu_sched_modular_eager_prefetching_policy,
+	//&_starpu_sched_modular_prio_policy,
+	//&_starpu_sched_modular_prio_prefetching_policy,
+	//&_starpu_sched_modular_random_policy,
+	//&_starpu_sched_modular_random_prio_policy,
+	//&_starpu_sched_modular_random_prefetching_policy,
+	//&_starpu_sched_modular_random_prio_prefetching_policy,
+	//&_starpu_sched_modular_ws_policy,
+	//&_starpu_sched_modular_heft_policy,
+	//&_starpu_sched_modular_heft_prio_policy,
+	//&_starpu_sched_modular_heft2_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,
 	&_starpu_sched_lws_policy,
 	&_starpu_sched_ws_policy,
-	&_starpu_sched_dm_policy,
-	&_starpu_sched_dmda_policy,
-	&_starpu_sched_dmda_ready_policy,
-	&_starpu_sched_dmda_sorted_policy,
-	&_starpu_sched_dmda_sorted_decision_policy,
-	&_starpu_sched_parallel_heft_policy,
-	&_starpu_sched_peager_policy,
-	&_starpu_sched_heteroprio_policy,
-	&_starpu_sched_graph_test_policy,
+	//&_starpu_sched_dm_policy,
+	//&_starpu_sched_dmda_policy,
+	//&_starpu_sched_dmda_ready_policy,
+	//&_starpu_sched_dmda_sorted_policy,
+	//&_starpu_sched_dmda_sorted_decision_policy,
+	//&_starpu_sched_parallel_heft_policy,
+	//&_starpu_sched_peager_policy,
+	//&_starpu_sched_heteroprio_policy,
+	//&_starpu_sched_graph_test_policy,
 	NULL
 };
 

+ 7 - 1
src/core/workers.c

@@ -554,6 +554,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->worker_is_running = 0;
 	workerarg->worker_is_initialized = 0;
 	workerarg->status = STATUS_INITIALIZING;
+	workerarg->state_keep_awake = 0;
 	/* name initialized by driver */
 	/* short_name initialized by driver */
 	workerarg->run_by_starpu = 1;
@@ -2110,7 +2111,12 @@ static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sche
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
 #endif
-	if (_starpu_config.workers[workerid].status == STATUS_SLEEPING)
+	if (_starpu_config.workers[workerid].status == STATUS_SCHEDULING)
+	{
+		_starpu_config.workers[workerid].state_keep_awake = 1;
+		return 1;
+	}
+	else if (_starpu_config.workers[workerid].status == STATUS_SLEEPING)
 	{
 		_starpu_config.workers[workerid].status = STATUS_WAKING_UP;
 		/* cond_broadcast is required over cond_signal since

+ 1 - 0
src/core/workers.h

@@ -123,6 +123,7 @@ LIST_TYPE(_starpu_worker,
 	unsigned worker_is_running;
 	unsigned worker_is_initialized;
 	enum _starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
+	unsigned state_keep_awake; /* !0 if a task has been pushed to the worker and the task has not yet been seen by the worker, the worker should no go to sleep before processing this task*/
 	char name[64];
 	char short_name[10];
 	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */

+ 17 - 2
src/drivers/driver_common/driver_common.c

@@ -344,6 +344,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 {
 	struct starpu_task *task;
 	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
+	unsigned keep_awake = 0;
 
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 	_starpu_worker_enter_sched_op(worker);
@@ -365,10 +366,15 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		task = _starpu_pop_task(worker);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		if (worker->state_keep_awake)
+		{
+			keep_awake = worker->state_keep_awake;
+			worker->state_keep_awake = 0;
+		}
 	}
 
 #if !defined(STARPU_SIMGRID)
-	if (task == NULL && !executing)
+	if (task == NULL && !executing && !keep_awake)
 	{
 		/* Didn't get a task to run and none are running, go to sleep */
 
@@ -442,6 +448,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 #endif
 	for (i = 0; i < nworkers; i++)
 	{
+		unsigned keep_awake = 0;
 		if ((workers[i].pipeline_length == 0 && workers[i].current_task)
 			|| (workers[i].pipeline_length != 0 && workers[i].ntasks))
 			/* At least this worker is executing something */
@@ -471,7 +478,12 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
 			tasks[i] = _starpu_pop_task(&workers[i]);
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&workers[i].sched_mutex);
-			if(tasks[i] != NULL)
+			if (workers[i].state_keep_awake)
+			{
+				keep_awake = workers[i].state_keep_awake;
+				workers[i].state_keep_awake = 0;
+			}
+			if(tasks[i] != NULL || keep_awake)
 			{
 				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
 				_starpu_worker_set_status_wakeup(workers[i].workerid);
@@ -482,6 +494,9 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 #endif
 
 				count ++;
+				if (tasks[i] == NULL)
+					/* no task, but keep_awake */
+					continue;
 				j = _starpu_get_job_associated_to_task(tasks[i]);
 				is_parallel_task = (j->task_size > 1);
 				if (workers[i].pipeline_length)

+ 17 - 12
src/sched_policies/work_stealing_policy.c

@@ -535,16 +535,16 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 
 	if (task)
 	{
-		_starpu_worker_relax_on();
-		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		/* there was a local task */
 		ws->per_worker[workerid].busy = 1;
+		_starpu_worker_relax_on();
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
-			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			starpu_sched_ctx_move_task_to_ctx_locked(task, child_sched_ctx, 1);
+			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
 			task = NULL;
 		}
 		_starpu_sched_ctx_unlock_write(sched_ctx_id);
@@ -587,30 +587,35 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 #ifndef STARPU_NON_BLOCKING_DRIVERS
         /* While stealing, perhaps somebody actually give us a task, don't miss
          * the opportunity to take it before going to sleep. */
-	if (!task)
 	{
-		task = ws_pick_task(ws, workerid, workerid);
-		if (task)
-			locality_popped_task(ws, task, workerid, sched_ctx_id);
+		struct _starpu_worker *worker = _starpu_get_worker_struct(starpu_worker_get_id());
+		if (!task && worker->keep_awake)
+		{
+			/* keep_awake notice taken into account here, clear flag */
+			worker->keep_awake = 0;
+			task = ws_pick_task(ws, workerid, workerid);
+			if (task)
+				locality_popped_task(ws, task, workerid, sched_ctx_id);
+		}
 	}
 #endif
 
-	_starpu_worker_relax_on();
 	if (task)
 	{
+		_starpu_worker_relax_on();
 		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
-			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			starpu_sched_ctx_move_task_to_ctx_locked(task, child_sched_ctx, 1);
+			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
 			_starpu_sched_ctx_unlock_write(sched_ctx_id);
 			_starpu_worker_relax_off();
 			return NULL;
 		}
 		_starpu_sched_ctx_unlock_write(sched_ctx_id);
+		_starpu_worker_relax_off();
 	}
-	_starpu_worker_relax_off();
 	ws->per_worker[workerid].busy = !!task;
 	return task;
 }