Browse Source

remove synchro changes from trunk/, since development has moved to branch synchro/

Olivier Aumage 8 years ago
parent
commit
10ec2f8eec

+ 3 - 0
include/starpu_scheduler.h

@@ -62,8 +62,11 @@ unsigned long starpu_task_get_job_id(struct starpu_task *task);
 /* This function must be called to wake up a worker that is sleeping on the cond. 
  * It returns 0 whenever the worker is not in a sleeping state */
 int starpu_wake_worker(int workerid);
+int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 /* This is a version of starpu_wake_worker which assumes that the sched mutex is locked */
 int starpu_wake_worker_locked(int workerid);
+/* This is a version of starpu_wakeup_worker which assumes that the sched mutex is locked */
+int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask);

+ 15 - 28
src/common/thread.c

@@ -714,47 +714,34 @@ int starpu_pthread_barrier_wait(starpu_pthread_barrier_t *barrier)
  * macros of course) which record when the mutex is held or not */
 int starpu_pthread_mutex_lock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
-	{
-		STARPU_ASSERT(worker->sched_mutex_depth < UINT_MAX);
-		worker->sched_mutex_depth++;
-		if (worker->sched_mutex_depth > 1)
-			return 0;
-	}
-
-	return starpu_pthread_mutex_lock(mutex);
+	int p_ret = starpu_pthread_mutex_lock(mutex);
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1 && _starpu_worker_mutex_is_sched_mutex(workerid, mutex))
+		_starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
+	return p_ret;
 }
 
 int starpu_pthread_mutex_unlock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
-	{
-		STARPU_ASSERT(worker->sched_mutex_depth > 0);
-		worker->sched_mutex_depth--;
-		if (worker->sched_mutex_depth > 0)
-			return 0;
-	}
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1 && _starpu_worker_mutex_is_sched_mutex(workerid, mutex))
+		_starpu_worker_set_flag_sched_mutex_locked(workerid, 0);
 
 	return starpu_pthread_mutex_unlock(mutex);
 }
 
 int starpu_pthread_mutex_trylock_sched(starpu_pthread_mutex_t *mutex)
 {
-	const int workerid = starpu_worker_get_id();
-	struct _starpu_worker * const worker = (workerid != -1)?_starpu_get_worker_struct(workerid):NULL;
-	if(worker && mutex == &worker->sched_mutex)
+	int ret = starpu_pthread_mutex_trylock(mutex);
+
+	if (!ret)
 	{
-		STARPU_ASSERT(worker->sched_mutex_depth < UINT_MAX);
-		worker->sched_mutex_depth++;
-		if (worker->sched_mutex_depth > 1)
-			return 0;
+		int workerid = starpu_worker_get_id();
+		if(workerid != -1 && _starpu_worker_mutex_is_sched_mutex(workerid, mutex))
+			_starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
 	}
 
-	return starpu_pthread_mutex_trylock(mutex);
+	return ret;
 }
 
 #ifdef STARPU_DEBUG

File diff suppressed because it is too large
+ 336 - 269
src/core/sched_ctx.c


+ 45 - 42
src/core/sched_ctx.h

@@ -73,12 +73,27 @@ struct _starpu_sched_ctx
 	long iterations[2];
 	int iteration_level;
 
+	/* cond to block push when there are no workers in the ctx */
+	starpu_pthread_cond_t no_workers_cond;
+
+	/* mutex to block push when there are no workers in the ctx */
+	starpu_pthread_mutex_t no_workers_mutex;
+
 	/*ready tasks that couldn't be pushed because the ctx has no workers*/
 	struct starpu_task_list empty_ctx_tasks;
 
+	/* mutex protecting empty_ctx_tasks list */
+	starpu_pthread_mutex_t empty_ctx_mutex;
+
 	/*ready tasks that couldn't be pushed because the window of tasks was already full*/
 	struct starpu_task_list waiting_tasks;
 
+	/* mutex protecting waiting_tasks list */
+	starpu_pthread_mutex_t waiting_tasks_mutex;
+
+	/* mutex protecting writes to every worker's sched_ctx_list structure for this sched_ctx */
+	starpu_pthread_mutex_t sched_ctx_list_mutex;
+
 	/* min CPUs to execute*/
 	int min_ncpus;
 
@@ -127,6 +142,27 @@ struct _starpu_sched_ctx
 	   if not master is -1 */
 	int main_master;
 
+	/* condition variables used when parallel sections are executed in contexts */
+	starpu_pthread_cond_t parallel_sect_cond[STARPU_NMAXWORKERS];
+	starpu_pthread_mutex_t parallel_sect_mutex[STARPU_NMAXWORKERS];
+	starpu_pthread_cond_t parallel_sect_cond_busy[STARPU_NMAXWORKERS];
+	int busy[STARPU_NMAXWORKERS];
+
+	/* boolean indicating that workers should block in order to allow
+	   parallel sections to be executed on their allocated resources */
+	unsigned parallel_sect[STARPU_NMAXWORKERS];
+
+	/* semaphore that blocks the application thread until the starpu threads are
+	   all blocked and ready to execute the parallel code */
+	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
+
+	/* semaphore that blocks the application thread until the starpu threads are
+	   all woken up and ready to continue the application */
+	sem_t wake_up_sem[STARPU_NMAXWORKERS];
+
+	/* bool indicating if the worker is sleeping in this ctx */
+	unsigned sleeping[STARPU_NMAXWORKERS];
+
 	/* ctx nesting the current ctx */
 	unsigned nesting_sched_ctx;
 
@@ -154,9 +190,6 @@ struct _starpu_sched_ctx
 	int sms_end_idx;
 
 	int stream_worker;
-
-	starpu_pthread_rwlock_t rwlock;
-	starpu_pthread_t lock_write_owner;
 };
 
 struct _starpu_machine_config;
@@ -208,10 +241,19 @@ void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker
 /* Check if the worker belongs to another sched_ctx */
 unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id);
 
+/* mutex synchronising several simultaneous modifications of a context */
+starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id);
+
 /* indicates whether this worker should go to sleep or not
    (if it is the last one awake in a context it had better stay awake) */
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 
+/* let the application know that the worker blocked to execute parallel code */
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid);
+
+/* let the application know that the worker woke up */
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid);
+
 /* If starpu_sched_ctx_set_context() has been called, returns the context
  * id set by its last call, or the id of the initial context */
 unsigned _starpu_sched_ctx_get_current_context();
@@ -236,43 +278,4 @@ struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(st
 #define _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(w,j) \
 	(_starpu_get_nsched_ctxs() <= 1 ? _starpu_get_sched_ctx_struct(0) : __starpu_sched_ctx_get_sched_ctx_for_worker_and_job((w),(j)))
 
-static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id);
-
-static inline int _starpu_sched_ctx_check_write_locked(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return starpu_pthread_equal(sched_ctx->lock_write_owner, starpu_pthread_self());
-}
-#define STARPU_SCHED_CTX_CHECK_LOCK(sched_ctx_id) STARPU_ASSERT(_starpu_sched_ctx_check_write_locked((sched_ctx_id)))
-
-static inline void _starpu_sched_ctx_lock_write(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	STARPU_ASSERT(!starpu_pthread_equal(sched_ctx->lock_write_owner, starpu_pthread_self()));
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&sched_ctx->rwlock);
-	sched_ctx->lock_write_owner = starpu_pthread_self();
-}
-
-static inline void _starpu_sched_ctx_unlock_write(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	STARPU_ASSERT(starpu_pthread_equal(sched_ctx->lock_write_owner, starpu_pthread_self()));
-	memset(&sched_ctx->lock_write_owner, 0, sizeof(sched_ctx->lock_write_owner));
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&sched_ctx->rwlock);
-}
-
-static inline void _starpu_sched_ctx_lock_read(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	STARPU_ASSERT(!starpu_pthread_equal(sched_ctx->lock_write_owner, starpu_pthread_self()));
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&sched_ctx->rwlock);
-}
-
-static inline void _starpu_sched_ctx_unlock_read(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	STARPU_ASSERT(!starpu_pthread_equal(sched_ctx->lock_write_owner, starpu_pthread_self()));
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&sched_ctx->rwlock);
-}
-
 #endif // __SCHED_CONTEXT_H__

+ 7 - 4
src/core/sched_policy.c

@@ -433,9 +433,9 @@ int _starpu_repush_task(struct _starpu_job *j)
 
 		if(nworkers == 0)
 		{
-			_starpu_sched_ctx_lock_write(sched_ctx->id);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
-			_starpu_sched_ctx_unlock_write(sched_ctx->id);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 #ifdef STARPU_USE_SC_HYPERVISOR
 			if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
 			   && sched_ctx->perf_counters->notify_empty_ctx)
@@ -499,9 +499,9 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 		if (nworkers == 0)
 		{
-			_starpu_sched_ctx_lock_write(sched_ctx->id);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
-			_starpu_sched_ctx_unlock_write(sched_ctx->id);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 #ifdef STARPU_USE_SC_HYPERVISOR
 			if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
 			   && sched_ctx->perf_counters->notify_empty_ctx)
@@ -591,6 +591,8 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 		{
 			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
 			/* check out if there are any workers in the context */
+			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
+			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
 			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
 			if (nworkers == 0)
 				ret = -1;
@@ -601,6 +603,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				ret = sched_ctx->sched_policy->push_task(task);
 				_STARPU_SCHED_END;
 			}
+			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
 		}
 
 		if(ret == -1)

+ 42 - 28
src/core/workers.c

@@ -310,12 +310,12 @@ static inline int _starpu_can_use_nth_implementation(enum starpu_worker_archtype
 	return 0;
 }
 
-/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+
 	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	if(sched_ctx->parallel_sect[workerid] ) return 0;
 
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & _starpu_config.workers[workerid].worker_mask) &&
@@ -323,12 +323,12 @@ int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task,
 		(!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl));
 }
 
-/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+
 	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	if(sched_ctx->parallel_sect[workerid]) return 0;
 
 	unsigned mask;
 	int i;
@@ -366,15 +366,13 @@ int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *t
 	return mask != 0;
 }
 
-/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 	int i;
 	enum starpu_worker_archtype arch;
 	struct starpu_codelet *cl;
+	if(sched_ctx->parallel_sect[workerid]) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	cl = task->cl;
 	if (!(cl->where & _starpu_config.workers[workerid].worker_mask)) return 0;
@@ -578,19 +576,10 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[0] = 0;
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
-	workerarg->sched_mutex_depth = 0;
+	workerarg->sched_mutex_locked = 0;
+	workerarg->blocked = 0;
 	workerarg->is_slave_somewhere = 0;
 
-	workerarg->state_sched_op_pending = 0;
-	workerarg->state_changing_ctx_waiting = 0;
-	workerarg->state_changing_ctx_notice = 0;
-	workerarg->state_blocked = 0;
-	workerarg->state_block_req = 0;
-	workerarg->state_block_ack = 0;
-	workerarg->state_unblock_req = 0;
-	workerarg->state_unblock_ack = 0;
-	workerarg->block_ref_count = 0;
-
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
 
@@ -1694,7 +1683,7 @@ unsigned starpu_worker_get_count(void)
 
 unsigned starpu_worker_is_blocked(int workerid)
 {
-	return (unsigned)_starpu_config.workers[workerid].state_blocked;
+	return _starpu_config.workers[workerid].blocked;
 }
 
 unsigned starpu_worker_is_slave_somewhere(int workerid)
@@ -2054,7 +2043,7 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 	*sched_mutex = &_starpu_config.workers[workerid].sched_mutex;
 }
 
-static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex STARPU_ATTRIBUTE_UNUSED)
+int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
@@ -2062,19 +2051,17 @@ static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sche
 	if (_starpu_config.workers[workerid].status == STATUS_SLEEPING)
 	{
 		_starpu_config.workers[workerid].status = STATUS_WAKING_UP;
-		/* cond_broadcast is required over cond_signal since
-		 * the condition is share for multiple purpose */
-		STARPU_PTHREAD_COND_BROADCAST(sched_cond);
+		STARPU_PTHREAD_COND_SIGNAL(cond);
 		return 1;
 	}
 	return 0;
 }
 
-static int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex)
+int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
 {
 	int success;
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(mutex);
-	success = starpu_wakeup_worker_locked(workerid, sched_cond, mutex);
+	success = starpu_wakeup_worker_locked(workerid, cond, mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(mutex);
 	return success;
 }
@@ -2168,6 +2155,33 @@ void starpu_get_version(int *major, int *minor, int *release)
 	*release = STARPU_RELEASE_VERSION;
 }
 
+void _starpu_unlock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		if(w->sched_mutex_locked)
+		{
+			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&w->sched_mutex);
+			_starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
+		}
+	}
+	return;
+}
+
+void _starpu_relock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		if(w->sched_mutex_locked)
+			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&w->sched_mutex);
+	}
+	return;
+}
+
 unsigned starpu_worker_get_sched_ctx_list(int workerid, unsigned **sched_ctxs)
 {
 	unsigned s = 0;

+ 19 - 224
src/core/workers.h

@@ -82,26 +82,8 @@ LIST_TYPE(_starpu_worker,
 	starpu_pthread_cond_t started_cond; /* indicate when the worker is ready */
 	starpu_pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is the worker associated with ? */
-	/* condition variable used for passive waiting operations on worker
-	 * STARPU_PTHREAD_COND_BROADCAST must be used instead of STARPU_PTHREAD_COND_SIGNAL,
-	 * since the condition is shared for multiple purpose */
-	starpu_pthread_cond_t sched_cond;
+	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
-	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
-	int state_changing_ctx_waiting:1; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
-	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
-	int state_blocked:1; /* worker is currently blocked */
-	int state_block_req:1; /* a request for state transition from unblocked to blocked is pending */
-	int state_block_ack:1; /* a block request has been honored */
-	int state_unblock_req:1; /* a request for state transition from blocked to unblocked is pending */
-	int state_unblock_ack:1; /* an unblock request has been honored */
-	 /* cumulative blocking depth
-	  * - =0  worker unblocked
-	  * - >0  worker blocked
-	  * - transition from 0 to 1 triggers a block_req
-	  * - transition from 1 to 0 triggers a unblock_req
-	  */
-	unsigned block_ref_count;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
@@ -155,8 +137,11 @@ LIST_TYPE(_starpu_worker,
 	/* indicate which priority of ctx is currently active: the values are 0 or 1*/
 	unsigned pop_ctx_priority;
 
-	/* sched mutex local worker locking depth */
-	unsigned sched_mutex_depth;
+	/* flag to know if sched_mutex is locked or not */
+	unsigned sched_mutex_locked;
+
+	/* bool to indicate if the worker is blocked in a ctx */
+	unsigned blocked;
 
 	/* bool to indicate if the worker is slave in a ctx */
 	unsigned is_slave_somewhere;
@@ -523,7 +508,7 @@ static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
 	return &_starpu_config.workers[id];
 }
 
-/* Returns the starpu_sched_ctx structure that describes the state of the 
+/* Returns the starpu_sched_ctx structure that descriebes the state of the 
  * specified ctx */
 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 {
@@ -573,6 +558,18 @@ int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *worker
    the list might not be updated */
 int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
 
+/* if the current worker holds the lock, release it */
+void _starpu_unlock_mutex_if_prev_locked();
+
+/* if we previously released the lock, relock it */
+void _starpu_relock_mutex_if_prev_locked();
+
+static inline void _starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag)
+{
+	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+	w->sched_mutex_locked = flag;
+}
+
 static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
 {
 	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
@@ -625,206 +622,4 @@ void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *
 
 struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
 
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_request_blocking(struct _starpu_worker * const worker)
-{
-	/* flush pending requests to start on a fresh transaction epoch */
-	while (worker->state_unblock_req)
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-	/* announce blocking intent */
-	STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
-	worker->block_ref_count++;
-
-	if (worker->block_ref_count == 1)
-	{
-		/* only the transition from 0 to 1 triggers the block_req */
-
-		STARPU_ASSERT(!worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_req);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_req);
-		STARPU_ASSERT(!worker->state_unblock_ack);
-
-		/* trigger the block_req */
-		worker->state_block_req = 1;
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-
-		/* wait for block_req to be processed */
-		while (!worker->state_block_ack)
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-		STARPU_ASSERT(worker->block_ref_count >= 1);
-		STARPU_ASSERT(worker->state_block_req);
-		STARPU_ASSERT(worker->state_blocked);
-
-		/* reset block_req state flags */
-		worker->state_block_req = 0;
-		worker->state_block_ack = 0;
-
-		/* broadcast block_req state flags reset */
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-	}
-}
-
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_request_unblocking(struct _starpu_worker * const worker)
-{
-	/* flush pending requests to start on a fresh transaction epoch */
-	while (worker->state_block_req)
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-	/* unblocking may be requested unconditionnally
-	 * thus, check is unblocking is really needed */
-	if (worker->state_blocked)
-	{
-		if (worker->block_ref_count == 1)
-		{
-			/* only the transition from 1 to 0 triggers the unblock_req */
-
-			STARPU_ASSERT(!worker->state_block_req);
-			STARPU_ASSERT(!worker->state_block_ack);
-			STARPU_ASSERT(!worker->state_unblock_req);
-			STARPU_ASSERT(!worker->state_unblock_ack);
-
-			/* trigger the unblock_req */
-			worker->state_unblock_req = 1;
-			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-
-			/* wait for the unblock_req to be processed */
-			while (!worker->state_unblock_ack)
-				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-			STARPU_ASSERT(worker->state_unblock_req);
-			STARPU_ASSERT(!worker->state_blocked);
-
-			/* reset unblock_req state flags */
-			worker->state_unblock_req = 0;
-			worker->state_unblock_ack = 0;
-
-			/* broadcast unblock_req state flags reset */
-			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-		}
-
-		/* announce unblocking complete */
-		STARPU_ASSERT(worker->block_ref_count > 0);
-		worker->block_ref_count--;
-	}
-}
-
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_process_block_requests(struct _starpu_worker * const worker)
-{
-	while (worker->state_block_req)
-	{
-		STARPU_ASSERT(!worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_req);
-		STARPU_ASSERT(!worker->state_unblock_ack);
-		STARPU_ASSERT(worker->block_ref_count > 0);
-		
-		/* enter effective blocked state */
-		worker->state_blocked = 1;
-
-		/* notify block_req processing */
-		worker->state_block_ack = 1;
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-
-		/* block */
-		while (!worker->state_unblock_req)
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-		STARPU_ASSERT(worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_req);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_ack);
-		STARPU_ASSERT(worker->block_ref_count > 0);
-
-		/* leave effective blocked state */
-		worker->state_blocked = 0;
-
-		/* notify unblock_req processing */
-		worker->state_unblock_ack = 1;
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-	}
-}
-
-/* Must be called with worker's sched_mutex held.
- * Mark the beginning of a scheduling operation during which the sched_mutex
- * lock may be temporarily released, but the scheduling context of the worker
- * should not be modified */
-static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
-{
-	/* process pending block requests before entering a sched_op region */
-	_starpu_worker_process_block_requests(worker);
-	while (worker->state_changing_ctx_notice)
-	{
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-		/* new block requests may have been triggered during the wait,
-		 * need to check again */
-		_starpu_worker_process_block_requests(worker);
-	}
-
-	/* no block request and no ctx change ahead,
-	 * enter sched_op */
-	worker->state_sched_op_pending = 1;
-}
-
-/* Must be called with worker's sched_mutex held.
- * Mark the end of a scheduling operation, and notify potential waiters that
- * scheduling context changes can safely be performed again.
- */
-static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
-{
-	worker->state_sched_op_pending = 0;
-	if (worker->state_changing_ctx_waiting)
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-}
-
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
-{
-	/* flush pending requests to start on a fresh transaction epoch */
-	while (worker->state_changing_ctx_notice)
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-	/* announce changing_ctx intent
-	 *
-	 * - an already started sched_op is allowed to complete
-	 * - no new sched_op may be started
-	 */
-	worker->state_changing_ctx_notice = 1;
-
-	/* allow for an already started sched_op to complete */
-	if (worker->state_sched_op_pending)
-	{
-		/* request sched_op to broadcast when way is cleared */
-		worker->state_changing_ctx_waiting = 1;
-
-		/* wait for sched_op completion */
-		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-		do
-		{
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-		}
-		while (worker->state_sched_op_pending);
-
-		/* reset flag so other sched_ops wont have to broadcast state */
-		worker->state_changing_ctx_waiting = 0;
-	}
-}
-
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
-{
-	worker->state_changing_ctx_notice = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-}
-
 #endif // __WORKERS_H__

+ 66 - 27
src/drivers/driver_common/driver_common.c

@@ -342,10 +342,72 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 {
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 	struct starpu_task *task;
+	unsigned needed = 1;
 	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
 
 	_starpu_worker_set_status_scheduling(workerid);
-	_starpu_worker_enter_sched_op(worker);
+	while(needed)
+	{
+		struct _starpu_sched_ctx *sched_ctx = NULL;
+		struct _starpu_sched_ctx_elt *e = NULL;
+		struct _starpu_sched_ctx_list_iterator list_it;
+
+		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
+		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
+		{
+			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
+			sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
+			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(!sched_ctx->sched_policy)
+					worker->is_slave_somewhere = sched_ctx->main_master != workerid;
+
+				if(sched_ctx->parallel_sect[workerid])
+				{
+					/* don't let the worker sleep with the sched_mutex taken */
+					/* we need it up to this point because the worker's list of ctxs
+					   can be changed by another thread */
+					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+					needed = 0;
+					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					sched_ctx->busy[workerid] = 1;
+					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					sched_ctx->busy[workerid] = 0;
+					STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond_busy[workerid]);
+					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					sched_ctx->parallel_sect[workerid] = 0;
+					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+				}
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			}
+			if(!needed)
+				break;
+		}
+		/* don't worry if the value is not correct (no lock) it will do it next time */
+		if(worker->tmp_sched_ctx != -1)
+		{
+			sched_ctx = _starpu_get_sched_ctx_struct(worker->tmp_sched_ctx);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			if(sched_ctx->parallel_sect[workerid])
+			{
+//				needed = 0;
+				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+				_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+				sched_ctx->busy[workerid] = 1;
+				STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+				sched_ctx->busy[workerid] = 0;
+				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond_busy[workerid]);
+				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+				sched_ctx->parallel_sect[workerid] = 0;
+				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+			}
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+		}
+
+		needed = !needed;
+	}
+
 	if ((worker->pipeline_length == 0 && worker->current_task)
 		|| (worker->pipeline_length != 0 && worker->ntasks))
 		/* This worker is executing something */
@@ -359,9 +421,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		task = NULL;
 	/*else try to pop a task*/
 	else
-	{
 		task = _starpu_pop_task(worker);
-	}
 
 #if !defined(STARPU_SIMGRID)
 	if (task == NULL && !executing)
@@ -378,17 +438,11 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		if (_starpu_worker_can_block(memnode, worker)
 			&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
-			_starpu_worker_leave_sched_op(worker);
-			do
-			{
-				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-			}
-			while (worker->status == STATUS_SLEEPING);
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		else
 		{
-			_starpu_worker_leave_sched_op(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
 				_starpu_exponential_backoff(worker);
@@ -409,7 +463,6 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	}
 	worker->spinning_backoff = BACKOFF_MIN;
 
-	_starpu_worker_leave_sched_op(worker);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 
 
@@ -430,9 +483,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	/* This assumes only 1 worker */
 	STARPU_ASSERT_MSG(nworkers == 1, "Multiple workers is not yet possible in blocking drivers mode\n");
-	_starpu_set_local_worker_key(&workers[0]);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&workers[0].sched_mutex);
-	_starpu_worker_enter_sched_op(&workers[0]);
 #endif
 	for (i = 0; i < nworkers; i++)
 	{
@@ -457,20 +508,16 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		else
 		{
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-			_starpu_set_local_worker_key(&workers[i]);
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&workers[i].sched_mutex);
 #endif
 			_starpu_worker_set_status_scheduling(workers[i].workerid);
-#ifdef STARPU_NON_BLOCKING_DRIVERS
-			_starpu_worker_enter_sched_op(&workers[i]);
-#endif
+			_starpu_set_local_worker_key(&workers[i]);
 			tasks[i] = _starpu_pop_task(&workers[i]);
 			if(tasks[i] != NULL)
 			{
 				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
 				_starpu_worker_set_status_wakeup(workers[i].workerid);
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-				_starpu_worker_leave_sched_op(&workers[i]);
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
 #endif
 
@@ -509,7 +556,6 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 			{
 				_starpu_worker_set_status_sleeping(workers[i].workerid);
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-				_starpu_worker_leave_sched_op(&workers[i]);
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[i].sched_mutex);
 #endif
 			}
@@ -535,17 +581,11 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		if (_starpu_worker_can_block(memnode, worker)
 				&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
-			_starpu_worker_leave_sched_op(worker);
-			do
-			{
-				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-			}
-			while (worker->status == STATUS_SLEEPING);
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		else
 		{
-			_starpu_worker_leave_sched_op(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
 				_starpu_exponential_backoff(worker);
@@ -557,7 +597,6 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 	worker->spinning_backoff = BACKOFF_MIN;
 #endif /* !STARPU_SIMGRID */
 
-	_starpu_worker_leave_sched_op(&workers[0]);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&workers[0].sched_mutex);
 #endif /* !STARPU_NON_BLOCKING_DRIVERS */
 

+ 8 - 2
src/sched_policies/component_worker.c

@@ -503,8 +503,11 @@ static void simple_worker_can_pull(struct starpu_sched_component * worker_compon
 	}
 	if(_starpu_sched_component_worker_is_sleeping_status(worker_component))
 	{
+		starpu_pthread_mutex_t *sched_mutex;
+		starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(w->workerid, &sched_mutex, &sched_cond);
 		_starpu_sched_component_unlock_worker(worker_component->tree->sched_ctx_id, w->workerid);
-		starpu_wake_worker(w->workerid);
+		starpu_wakeup_worker(w->workerid, sched_cond, sched_mutex);
 	}
 	else
 		_starpu_sched_component_unlock_worker(worker_component->tree->sched_ctx_id, w->workerid);
@@ -723,7 +726,10 @@ static void combined_worker_can_pull(struct starpu_sched_component * component)
 		_starpu_sched_component_lock_worker(component->tree->sched_ctx_id, worker);
 		if(_starpu_sched_component_worker_is_sleeping_status(component))
 		{
-			starpu_wake_worker(worker);
+			starpu_pthread_mutex_t *sched_mutex;
+			starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			starpu_wakeup_worker(worker, sched_cond, sched_mutex);
 		}
 		if(_starpu_sched_component_worker_is_reset_status(component))
 			_starpu_sched_component_worker_set_changed_status(component);

+ 2 - 2
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -452,7 +452,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
-		starpu_wake_worker_locked(best_workerid);
+		starpu_wakeup_worker_locked(best_workerid, sched_cond, sched_mutex);
 #endif
 		starpu_push_task_end(task);
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
@@ -464,7 +464,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		dt->queue_array[best_workerid]->ntasks++;
 		dt->queue_array[best_workerid]->nprocessed++;
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
-		starpu_wake_worker_locked(best_workerid);
+		starpu_wakeup_worker_locked(best_workerid, sched_cond, sched_mutex);
 #endif
 		starpu_push_task_end(task);
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);

+ 1 - 1
src/sched_policies/eager_central_policy.c

@@ -201,7 +201,7 @@ static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 		int workerid = workerids[i];
 		int curr_workerid = _starpu_worker_get_id();
 		if(workerid != curr_workerid)
-			starpu_wake_worker_locked(workerid);
+			starpu_wake_worker(workerid);
 
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
 	}

+ 1 - 1
src/sched_policies/eager_central_priority_policy.c

@@ -308,7 +308,7 @@ static void eager_center_priority_add_workers(unsigned sched_ctx_id, int *worker
 		int workerid = workerids[i];
 		int curr_workerid = _starpu_worker_get_id();
 		if(workerid != curr_workerid)
-			starpu_wake_worker_locked(workerid);
+			starpu_wake_worker(workerid);
 
                 starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
         }

+ 1 - 1
src/sched_policies/parallel_eager.c

@@ -265,7 +265,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 				_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
-				starpu_wake_worker_locked(local_worker);
+				starpu_wakeup_worker_locked(local_worker, sched_cond, sched_mutex);
 #endif
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);