Bladeren bron

Add starpu_wake_worker_locked and starpu_wakeup_worker_locked. Use starpu_wake{,up}_worker{,_locked} in all code instead of explicitly waking the scheduling condition

Samuel Thibault 9 jaren geleden
bovenliggende
commit
c277c9b080

+ 4 - 0
include/starpu_scheduler.h

@@ -56,6 +56,10 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
  * It returns 0 whenever the worker is not in a sleeping state */
 int starpu_wake_worker(int workerid);
 int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
+/* This is a version of starpu_wake_worker which assumes that the sched mutex is locked */
+int starpu_wake_worker_locked(int workerid);
+/* This is a version of starpu_wakeup_worker which assumes that the sched mutex is locked */
+int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask);

+ 1 - 1
src/core/jobs.c

@@ -698,7 +698,7 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 			starpu_task_list_push_back(&worker->local_tasks, task);
 	}
 
-	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	starpu_wake_worker_locked(worker->workerid);
 	starpu_push_task_end(task);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 

+ 18 - 6
src/core/workers.c

@@ -1909,10 +1909,8 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 	*sched_mutex = &config.workers[workerid].sched_mutex;
 }
 
-int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
+int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
 {
-	int success = 0;
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
 #endif
@@ -1920,22 +1918,36 @@ int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthre
 	{
 		config.workers[workerid].status = STATUS_WAKING_UP;
 		STARPU_PTHREAD_COND_SIGNAL(cond);
-		success = 1;
+		return 1;
 	}
+	return 0;
+}
+
+int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
+{
+	int success;
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
+	success = starpu_wakeup_worker_locked(workerid, cond, mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 	return success;
 }
 
-int starpu_wake_worker(int workerid)
+int starpu_wake_worker_locked(int workerid)
 {
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	return starpu_wakeup_worker_locked(workerid, sched_cond, sched_mutex);
+}
 
+int starpu_wake_worker(int workerid)
+{
+	starpu_pthread_mutex_t *sched_mutex;
+	starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	return starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
 }
 
-
 int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
 {
 	unsigned nworkers = starpu_worker_get_count();

+ 5 - 5
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014  Université de Bordeaux
+ * Copyright (C) 2010-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2015  CNRS
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011-2012  INRIA
@@ -432,8 +432,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		}
 
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+		starpu_wakeup_worker_locked(best_workerid, sched_cond, sched_mutex);
 #endif
 		starpu_push_task_end(task);
 		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
@@ -444,8 +444,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_task_list_push_back (&dt->queue_array[best_workerid]->taskq, task);
 		dt->queue_array[best_workerid]->ntasks++;
 		dt->queue_array[best_workerid]->nprocessed++;
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+		starpu_wakeup_worker_locked(best_workerid, sched_cond, sched_mutex);
 #endif
 		starpu_push_task_end(task);
 		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

+ 1 - 4
src/sched_policies/eager_central_priority_policy.c

@@ -265,10 +265,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 				starpu_bitmap_unset(data->waiters, worker);
 #else
-				starpu_pthread_mutex_t *sched_mutex;
-				starpu_pthread_cond_t *sched_cond;
-				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+				starpu_wake_worker_locked(worker);
 #endif
 			}
 		}

+ 2 - 7
src/sched_policies/locality_work_stealing_policy.c

@@ -221,19 +221,14 @@ static int lws_push_task(struct starpu_task *task)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* TODO: implement fine-grain signaling, similar to what eager does */
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_iterator it;
-	unsigned worker;
 
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-	}
+		starpu_wake_worker(workers->get_next(workers, &it));
 #endif
 
 

+ 3 - 10
src/sched_policies/parallel_eager.c

@@ -173,14 +173,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 		    starpu_worker_get_type(worker) != STARPU_MIC_WORKER &&
 		    starpu_worker_get_type(worker) != STARPU_CPU_WORKER)  
 			|| (master == worker))
-		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-			STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-			STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-		}
+			starpu_wake_worker(worker);
 	}
 #endif
 
@@ -274,8 +267,8 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 
 				_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+				starpu_wakeup_worker_locked(local_worker, sched_cond, sched_mutex);
 #endif
 				STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 

+ 3 - 2
src/sched_policies/work_stealing_policy.c

@@ -23,6 +23,7 @@
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 #include <core/debug.h>
+#include <starpu_scheduler.h>
 
 #ifdef HAVE_AYUDAME_H
 #include <Ayudame.h>
@@ -380,8 +381,8 @@ int ws_push_task(struct starpu_task *task)
 		starpu_pthread_mutex_t *sched_mutex;
 		starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+		starpu_wakeup_worker_locked(worker, sched_cond, sched_mutex);
 #endif
 		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}