Bladeren bron

ongoing work on synchronization to remove multi-lock ordering issues
still a lot of work ahead

Olivier Aumage 8 jaren geleden
bovenliggende
commit
fd2034810e

+ 3 - 0
include/starpu_thread_util.h

@@ -89,6 +89,7 @@
 } while (0)
 
 #define STARPU_PTHREAD_MUTEX_LOCK_SCHED(mutex) do {			      \
+	if (0) fprintf(stderr, "%p: %s (%s:%d) LOCK_SCHED(%p)\n", (void*)pthread_self(), __func__, __FILE__, __LINE__, mutex); \
 	int p_ret = starpu_pthread_mutex_lock_sched(mutex);		      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
 		fprintf(stderr,                                                \
@@ -115,6 +116,7 @@ int _starpu_pthread_mutex_trylock(starpu_pthread_mutex_t *mutex, char *file, int
 }
 
 #define STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(mutex) \
+	(0 ? (void)fprintf(stderr, "%p: %s (%s:%d) TRYLOCK_SCHED(%p)\n", (void*)pthread_self(), __func__, __FILE__, __LINE__, (void*)(mutex)) : (void)0), \
 	_starpu_pthread_mutex_trylock_sched(mutex, __FILE__, __LINE__)
 static STARPU_INLINE
 int _starpu_pthread_mutex_trylock_sched(starpu_pthread_mutex_t *mutex, char *file, int line)
@@ -141,6 +143,7 @@ int _starpu_pthread_mutex_trylock_sched(starpu_pthread_mutex_t *mutex, char *fil
 } while (0)
 
 #define STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(mutex) do {                          \
+	if (0) fprintf(stderr, "%p: %s (%s:%d) UNLOCK_SCHED(%p)\n", (void*)pthread_self(), __func__, __FILE__, __LINE__, mutex); \
 	int p_ret = starpu_pthread_mutex_unlock_sched(mutex);                  \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
 		fprintf(stderr,                                                \

+ 0 - 30
src/common/thread.c

@@ -713,46 +713,16 @@ int starpu_pthread_barrier_wait(starpu_pthread_barrier_t *barrier)
  * macros of course) which record when the mutex is held or not */
 int starpu_pthread_mutex_lock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
-	{
-		STARPU_ASSERT(worker->sched_mutex_depth < UINT_MAX);
-		worker->sched_mutex_depth++;
-		if (worker->sched_mutex_depth > 1)
-			return 0;
-	}
-
 	return starpu_pthread_mutex_lock(mutex);
 }
 
 int starpu_pthread_mutex_unlock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
-	{
-		STARPU_ASSERT(worker->sched_mutex_depth > 0);
-		worker->sched_mutex_depth--;
-		if (worker->sched_mutex_depth > 0)
-			return 0;
-	}
-
 	return starpu_pthread_mutex_unlock(mutex);
 }
 
 int starpu_pthread_mutex_trylock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
-	{
-		STARPU_ASSERT(worker->sched_mutex_depth < UINT_MAX);
-		worker->sched_mutex_depth++;
-		if (worker->sched_mutex_depth > 1)
-			return 0;
-	}
-
 	return starpu_pthread_mutex_trylock(mutex);
 }
 

+ 24 - 37
src/core/sched_ctx.c

@@ -2157,16 +2157,22 @@ unsigned starpu_sched_ctx_master_get_context(int masterid)
 struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(struct _starpu_worker *worker, struct _starpu_job *j)
 {
 	struct _starpu_sched_ctx_list_iterator list_it;
+	struct _starpu_sched_ctx *ret = NULL;
 
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
 	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
 	{
 		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
 		if (j->task->sched_ctx == sched_ctx->id)
-			return sched_ctx;
+		{
+			ret = sched_ctx;
+			break;
+		}
 	}
-	return NULL;
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+	return ret;
 }
 
 void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_flops)
@@ -2242,12 +2248,14 @@ void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task,
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
 
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
 		while(workers->has_next(workers, &it))
 		{
 			int worker = workers->get_next(workers, &it);
 			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
 		}
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 }
 
@@ -2255,32 +2263,22 @@ void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task,
 {
 	if (_starpu_get_nsched_ctxs() > 1)
 	{
-		int curr_workerid = starpu_worker_get_id();
-		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
-		if(curr_workerid != -1)
-		{
-			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
-		}
-
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
 		while(workers->has_next(workers, &it))
 		{
-			int worker = workers->get_next(workers, &it);
-
-			worker_str = _starpu_get_worker_struct(worker);
-			if (worker_str->nsched_ctxs > 1)
+			int workerid = workers->get_next(workers, &it);
+			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+			if (worker->nsched_ctxs > 1)
 			{
-				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
-				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, worker);
-				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
+				_starpu_worker_lock_for_observation(workerid);
+				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
+				_starpu_worker_unlock_for_observation(workerid);
 			}
 		}
-
-		if(curr_workerid != -1)
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 }
 
@@ -2288,33 +2286,22 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 {
 	if (_starpu_get_nsched_ctxs() > 1)
 	{
-		int curr_workerid = starpu_worker_get_id();
-		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
-		if(curr_workerid != -1)
-		{
-			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
-		}
-
 		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
 		while(workers->has_next(workers, &it))
 		{
-			int worker = workers->get_next(workers, &it);
-			worker_str = _starpu_get_worker_struct(worker);
-			if (worker_str->nsched_ctxs > 1)
+			int workerid = workers->get_next(workers, &it);
+			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+			if (worker->nsched_ctxs > 1)
 			{
-				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
-				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, worker);
-				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
+				_starpu_worker_lock_for_observation(workerid);
+				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
+				_starpu_worker_unlock_for_observation(workerid);
 			}
 		}
 		_starpu_sched_ctx_unlock_write(sched_ctx_id);
-
-		if(curr_workerid != -1)
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
 	}
 }
 

+ 4 - 0
src/core/sched_ctx.h

@@ -248,7 +248,9 @@ static inline int _starpu_sched_ctx_check_write_locked(unsigned sched_ctx_id)
 static inline void _starpu_sched_ctx_lock_write(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_HG_DISABLE_CHECKING(sched_ctx->lock_write_owner);
 	STARPU_ASSERT(sched_ctx->lock_write_owner != starpu_pthread_self());
+	STARPU_HG_ENABLE_CHECKING(sched_ctx->lock_write_owner);
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&sched_ctx->rwlock);
 	sched_ctx->lock_write_owner = starpu_pthread_self();
 }
@@ -264,7 +266,9 @@ static inline void _starpu_sched_ctx_unlock_write(unsigned sched_ctx_id)
 static inline void _starpu_sched_ctx_lock_read(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_HG_DISABLE_CHECKING(sched_ctx->lock_write_owner);
 	STARPU_ASSERT(sched_ctx->lock_write_owner != starpu_pthread_self());
+	STARPU_HG_ENABLE_CHECKING(sched_ctx->lock_write_owner);
 	STARPU_PTHREAD_RWLOCK_RDLOCK(&sched_ctx->rwlock);
 }
 

+ 19 - 12
src/core/workers.c

@@ -313,8 +313,8 @@ static inline int _starpu_can_use_nth_implementation(enum starpu_worker_archtype
 /* must be called with sched_mutex locked to protect state_blocked_in_parallel */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	if(worker->state_blocked_in_parallel) return 0;
+	if(starpu_worker_is_blocked_in_parallel(workerid))
+		return 0;
 
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & _starpu_config.workers[workerid].worker_mask) &&
@@ -325,8 +325,8 @@ int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task,
 /* must be called with sched_mutex locked to protect state_blocked_in_parallel */
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	if(worker->state_blocked_in_parallel) return 0;
+	if(starpu_worker_is_blocked_in_parallel(workerid))
+		return 0;
 
 	unsigned mask;
 	int i;
@@ -367,8 +367,8 @@ int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *t
 /* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	if(worker->state_blocked_in_parallel) return 0;
+	if(starpu_worker_is_blocked_in_parallel(workerid))
+		return 0;
 	int i;
 	enum starpu_worker_archtype arch;
 	struct starpu_codelet *cl;
@@ -578,6 +578,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->sched_mutex_depth = 0;
 	workerarg->is_slave_somewhere = 0;
 
+	workerarg->state_safe_for_observation = 1;
 	workerarg->state_sched_op_pending = 0;
 	workerarg->state_changing_ctx_waiting = 0;
 	workerarg->state_changing_ctx_notice = 0;
@@ -1691,12 +1692,18 @@ unsigned starpu_worker_get_count(void)
 
 unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 {
-	return (unsigned)_starpu_config.workers[workerid].state_blocked_in_parallel;
+	_starpu_worker_lock_for_observation(workerid);
+	unsigned ret = _starpu_config.workers[workerid].state_blocked_in_parallel;
+	_starpu_worker_unlock_for_observation(workerid);
+	return ret;
 }
 
 unsigned starpu_worker_is_slave_somewhere(int workerid)
 {
-	return _starpu_config.workers[workerid].is_slave_somewhere;
+	_starpu_worker_lock_for_observation(workerid);
+	unsigned ret = _starpu_config.workers[workerid].is_slave_somewhere;
+	_starpu_worker_unlock_for_observation(workerid);
+	return ret;
 }
 
 int starpu_worker_get_count_by_type(enum starpu_worker_archtype type)
@@ -2067,12 +2074,12 @@ static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sche
 	return 0;
 }
 
-static int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex)
+static int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *sched_mutex)
 {
 	int success;
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(mutex);
-	success = starpu_wakeup_worker_locked(workerid, sched_cond, mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(mutex);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	success = starpu_wakeup_worker_locked(workerid, sched_cond, sched_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 	return success;
 }
 

+ 53 - 0
src/core/workers.h

@@ -87,6 +87,7 @@ LIST_TYPE(_starpu_worker,
 	 * since the condition is shared for multiple purpose */
 	starpu_pthread_cond_t sched_cond;
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
+	int state_safe_for_observation:1; /* mark scheduling sections where other workers can safely access the worker state */
 	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
 	int state_changing_ctx_waiting:1; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
 	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
@@ -771,6 +772,7 @@ static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const w
 
 	/* no block request and no ctx change ahead,
 	 * enter sched_op */
+	worker->state_safe_for_observation = 0;
 	worker->state_sched_op_pending = 1;
 }
 
@@ -780,6 +782,7 @@ static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const w
  */
 static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
 {
+	worker->state_safe_for_observation = 1;
 	worker->state_sched_op_pending = 0;
 	if (worker->state_changing_ctx_waiting)
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
@@ -827,4 +830,54 @@ static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker *
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 }
 
+/* lock a worker for observing its contents
+ *
+ * notes:
+ * - if the observed worker is not in state_safe_for_observation, the function blocks until that state is reached */
+static inline void _starpu_worker_lock_for_observation(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	if (workerid != starpu_worker_get_id())
+	{
+		while (!worker->state_safe_for_observation)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+	}
+}
+
+static inline void _starpu_worker_unlock_for_observation(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
+/* Temporarily allow other workers to access the current worker's state while it
+ * is still inside a scheduling operation, but before the scheduling decision has been made or after it is already done */
+static inline void _starpu_worker_enter_section_safe_for_observation(void)
+{
+	int workerid = starpu_worker_get_id();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	STARPU_ASSERT(!worker->state_safe_for_observation);
+	worker->state_safe_for_observation = 1;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
+static inline void _starpu_worker_leave_section_safe_for_observation(void)
+{
+	int workerid = starpu_worker_get_id();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	STARPU_ASSERT(worker->state_safe_for_observation);
+	worker->state_safe_for_observation = 0;
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
 #endif // __WORKERS_H__

+ 10 - 0
src/drivers/driver_common/driver_common.c

@@ -359,7 +359,9 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	/*else try to pop a task*/
 	else
 	{
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		task = _starpu_pop_task(worker);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 	}
 
 #if !defined(STARPU_SIMGRID)
@@ -373,6 +375,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		 * executed, and thus hanging. */
 
 		_starpu_worker_set_status_sleeping(workerid);
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
 		if (_starpu_worker_can_block(memnode, worker)
 			&& !_starpu_sched_ctx_last_worker_awake(worker))
@@ -406,6 +409,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	{
 		_starpu_worker_set_status_sleeping(workerid);
 	}
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	worker->spinning_backoff = BACKOFF_MIN;
 
 	_starpu_worker_leave_sched_op(worker);
@@ -461,11 +465,14 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 			_starpu_worker_enter_sched_op(&workers[i]);
 #endif
 			_starpu_worker_set_status_scheduling(workers[i].workerid);
+			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&workers[i].sched_mutex);
 			tasks[i] = _starpu_pop_task(&workers[i]);
+			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
 			if(tasks[i] != NULL)
 			{
 				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
 				_starpu_worker_set_status_wakeup(workers[i].workerid);
+				STARPU_PTHREAD_COND_BROADCAST(&workers[i].sched_cond);
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 				_starpu_worker_leave_sched_op(&workers[i]);
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
@@ -505,6 +512,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 			else
 			{
 				_starpu_worker_set_status_sleeping(workers[i].workerid);
+				STARPU_PTHREAD_COND_BROADCAST(&workers[i].sched_cond);
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 				_starpu_worker_leave_sched_op(&workers[i]);
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
@@ -528,6 +536,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		 * driver may go block just after the scheduler got a new task to be
 		 * executed, and thus hanging. */
 		_starpu_worker_set_status_sleeping(workerid);
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
 		if (_starpu_worker_can_block(memnode, worker)
 				&& !_starpu_sched_ctx_last_worker_awake(worker))
@@ -551,6 +560,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 	}
 
 	_starpu_worker_set_status_wakeup(workerid);
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	worker->spinning_backoff = BACKOFF_MIN;
 #endif /* !STARPU_SIMGRID */
 

+ 57 - 10
src/sched_policies/eager_central_policy.c

@@ -32,9 +32,41 @@ struct _starpu_eager_center_policy_data
 {
 	struct _starpu_fifo_taskq *fifo;
 	starpu_pthread_mutex_t policy_mutex;
+	starpu_pthread_cond_t policy_cond;
+	int state_busy:1;
 	struct starpu_bitmap *waiters;
+#warning debug code
+	/* debug only */
+	void *busy_thread;
+	int busy_workerid;
+	int busy_leave_signaled;
 };
 
+static void enter_busy_state(struct _starpu_eager_center_policy_data *data)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	while (data->state_busy)
+	{
+		STARPU_PTHREAD_COND_WAIT(&data->policy_cond, &data->policy_mutex);
+	}
+	data->state_busy = 1;
+	data->busy_thread = (void *)pthread_self();
+	data->busy_workerid = starpu_worker_get_id();
+	data->busy_leave_signaled = 0;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+}
+
+static void leave_busy_state(struct _starpu_eager_center_policy_data *data)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	data->state_busy = 0;
+	data->busy_thread = NULL;
+	data->busy_workerid = -1;
+	STARPU_PTHREAD_COND_SIGNAL(&data->policy_cond);
+	data->busy_leave_signaled = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+}
+
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_eager_center_policy_data *data;
@@ -53,6 +85,12 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&data->policy_cond, NULL);
+	data->state_busy = 0;
+
+	data->busy_thread = NULL;
+	data->busy_workerid = -1;
+	data->busy_leave_signaled = 0;
 }
 
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
@@ -67,6 +105,7 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 	starpu_bitmap_destroy(data->waiters);
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&data->policy_cond);
 	free(data);
 }
 
@@ -75,7 +114,7 @@ static int push_task_eager_policy(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	enter_busy_state(data);
 	starpu_task_list_push_back(&data->fifo->taskq,task);
 	data->fifo->ntasks++;
 	data->fifo->nprocessed++;
@@ -115,7 +154,7 @@ static int push_task_eager_policy(struct starpu_task *task)
 		}
 	}
 	/* Let the task free */
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+	leave_busy_state(data);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* Now that we have a list of potential workers, try to wake one */
@@ -139,9 +178,9 @@ static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	enter_busy_state(data);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+	leave_busy_state(data);
 
 	starpu_sched_ctx_list_task_counters_reset_all(task, sched_ctx_id);
 
@@ -150,32 +189,40 @@ static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 
 static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 {
+	_starpu_worker_enter_section_safe_for_observation();
 	struct starpu_task *chosen_task = NULL;
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	/* block until some event happens */
+	enter_busy_state(data);
+	_starpu_worker_leave_section_safe_for_observation();
+
 	/* Here helgrind would shout that this is unprotected, this is just an
 	 * integer access, and we hold the sched mutex, so we can not miss any
 	 * wake up. */
 	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
+	{
+		leave_busy_state(data);
 		return NULL;
+	}
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 		/* Nobody woke us, avoid bothering the mutex */
+	{
+		leave_busy_state(data);
 		return NULL;
+	}
 #endif
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-
 	chosen_task = _starpu_fifo_pop_task(data->fifo, workerid);
 	if (!chosen_task)
 		/* Tell pushers that we are waiting for tasks for us */
 		starpu_bitmap_set(data->waiters, workerid);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-
+	_starpu_worker_enter_section_safe_for_observation();
+	leave_busy_state(data);
 	if(chosen_task)
 	{
 		starpu_sched_ctx_list_task_counters_decrement_all(chosen_task, sched_ctx_id);
@@ -185,10 +232,10 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 		{
 			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
-			return NULL;
+			chosen_task = NULL;
 		}
 	}
-
+	_starpu_worker_leave_section_safe_for_observation();
 
 	return chosen_task;
 }

+ 43 - 21
src/sched_policies/eager_central_priority_policy.c

@@ -48,9 +48,30 @@ struct _starpu_eager_central_prio_data
 {
 	struct _starpu_priority_taskq *taskq;
 	starpu_pthread_mutex_t policy_mutex;
+	starpu_pthread_cond_t policy_cond;
+	int state_busy:1;
 	struct starpu_bitmap *waiters;
 };
 
+static void enter_busy_state(struct _starpu_eager_central_prio_data *data)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	while (data->state_busy)
+	{
+		STARPU_PTHREAD_COND_WAIT(&data->policy_cond, &data->policy_mutex);
+	}
+	data->state_busy = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+}
+
+static void leave_busy_state(struct _starpu_eager_central_prio_data *data)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	data->state_busy = 0;
+	STARPU_PTHREAD_COND_SIGNAL(&data->policy_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+}
+
 /*
  * Centralized queue with priorities
  */
@@ -105,6 +126,8 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&data->policy_cond, NULL);
+	data->state_busy = 0;
 }
 
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
@@ -117,6 +140,7 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	starpu_bitmap_destroy(data->waiters);
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&data->policy_cond);
 	free(data);
 }
 
@@ -126,7 +150,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	enter_busy_state(data);
 	unsigned priolevel = task->priority - starpu_sched_ctx_get_min_priority(sched_ctx_id);
 	STARPU_ASSERT_MSG(task->priority >= starpu_sched_ctx_get_min_priority(sched_ctx_id) &&
 			  task->priority <= starpu_sched_ctx_get_max_priority(sched_ctx_id), "task priority %d is not between minimum %d and maximum %d\n", task->priority, starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
@@ -169,7 +193,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 		}
 	}
 	/* Let the task free */
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+	leave_busy_state(data);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* Now that we have a list of potential workers, try to wake one */
@@ -191,6 +215,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 
 static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 {
+	_starpu_worker_enter_section_safe_for_observation();
 	struct starpu_task *chosen_task = NULL, *task, *nexttask;
 	unsigned workerid = starpu_worker_get_id_check();
 	int skipped = 0;
@@ -200,28 +225,26 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
 	/* block until some event happens */
+	enter_busy_state(data);
+	_starpu_worker_leave_section_safe_for_observation();
+
 	/* Here helgrind would shout that this is unprotected, this is just an
 	 * integer access, and we hold the sched mutex, so we can not miss any
 	 * wake up. */
 	if (!STARPU_RUNNING_ON_VALGRIND && taskq->total_ntasks == 0)
+	{
+		leave_busy_state(data);
 		return NULL;
+	}
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 		/* Nobody woke us, avoid bothering the mutex */
+	{
+		leave_busy_state(data);
 		return NULL;
+	}
 #endif
-
-	/* release this mutex before trying to wake up other workers */
-	starpu_pthread_mutex_t *curr_sched_mutex;
-	starpu_pthread_cond_t *curr_sched_cond;
-	starpu_worker_get_sched_condition(workerid, &curr_sched_mutex, &curr_sched_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(curr_sched_mutex);
-
-	/* all workers will block on this mutex anyway so
-	   there's no need for their own mutex to be locked */
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-
 	unsigned priolevel = taskq->max_prio - taskq->min_prio;
 	do
 	{
@@ -250,7 +273,6 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	}
 	while (!chosen_task && priolevel-- > 0);
 
-
 	if (!chosen_task && skipped)
 	{
 		/* Notify another worker to do that task */
@@ -267,7 +289,9 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 				starpu_bitmap_unset(data->waiters, worker);
 #else
+				_starpu_worker_lock_for_observation(worker);
 				starpu_wake_worker_locked(worker);
+				_starpu_worker_unlock_for_observation(worker);
 #endif
 			}
 		}
@@ -278,10 +302,8 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		/* Tell pushers that we are waiting for tasks for us */
 		starpu_bitmap_set(data->waiters, workerid);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-
-	/* leave the mutex how it was found before this */
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(curr_sched_mutex);
+	_starpu_worker_enter_section_safe_for_observation();
+	leave_busy_state(data);
 
 	if(chosen_task)
 	{
@@ -290,12 +312,12 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
                 unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1, 1);
+			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 0, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
-			return NULL;
+			chosen_task = NULL;
 		}
 	}
-
+	_starpu_worker_leave_section_safe_for_observation();
 
 	return chosen_task;
 }