Просмотр исходного кода

Modify the sched_mutex locking scheme so that the sched_mutexes can be unlocked once the workers have been notified of an upcoming sched_ctx change.
The `make check` sequence is back to green.

Olivier Aumage, 8 лет назад
Родитель
Коммит
96bf2e5d12
4 изменённых файла: 94 добавлено и 54 удалено
  1. 61 47
      src/core/sched_ctx.c
  2. 1 0
      src/core/workers.c
  3. 22 0
      src/core/workers.h
  4. 10 7
      src/drivers/driver_common/driver_common.c

+ 61 - 47
src/core/sched_ctx.c

@@ -38,33 +38,37 @@ static int occupied_sms = 0;
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 
 static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
-static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all);
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, const int workers_locked);
+static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all, int notified_workers);
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, int notified_workers);
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
-static void set_priority_on_locked_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
-static void set_priority_hierarchically_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
+static void set_priority_on_notified_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
+static void set_priority_hierarchically_on_notified_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
 static void fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx);
-static void add_locked_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id);
+static void add_notified_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id);
 
-static void lock_workers_for_changing_ctx(const unsigned nworkers, const int * const workerids)
+static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, const int * const workerids)
 {
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		_starpu_worker_set_changing_ctx_notice(worker);
 		_starpu_worker_wait_for_transient_sched_op_completion(worker);
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 }
 
-static void unlock_workers_for_changing_ctx(const unsigned nworkers, const int * const workerids)
+static void notify_workers_about_changing_ctx_done(const unsigned nworkers, const int * const workerids)
 {
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		_starpu_worker_clear_changing_ctx_notice(worker);
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 }
@@ -125,7 +129,7 @@ static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sc
 	return;
 }
 
-static void _starpu_update_locked_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
+static void _starpu_update_notified_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
 {
 	int i;
 	struct _starpu_worker *worker = NULL;
@@ -163,7 +167,7 @@ static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int
 	return;
 }
 
-static void _starpu_update_locked_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
+static void _starpu_update_notified_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
 {
 	int i;
 	struct _starpu_worker *worker = NULL;
@@ -197,7 +201,7 @@ void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
 	worker->shares_tasks_lists[sched_ctx_id] = 1;
 }
 
-static void _do_add_locked_workers(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers)
+static void _do_add_notified_workers(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers)
 {
 	int ndevices = 0;
 	struct starpu_perfmodel_device devices[nworkers];
@@ -328,12 +332,14 @@ static void _starpu_add_workers_to_new_sched_ctx(struct _starpu_sched_ctx *sched
 			(void)_workerid;
 		}
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 		worker->tmp_sched_ctx = (int)sched_ctx->id;
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 
-	lock_workers_for_changing_ctx(nworkers, workerids);
-	_do_add_locked_workers(sched_ctx, workerids, nworkers);
-	unlock_workers_for_changing_ctx(nworkers, workerids);
+	notify_workers_about_changing_ctx_pending(nworkers, workerids);
+	_do_add_notified_workers(sched_ctx, workerids, nworkers);
+	notify_workers_about_changing_ctx_done(nworkers, workerids);
 
 	if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
 	{
@@ -1088,7 +1094,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
 	int backup_workerids[nworkers_ctx];
 	memcpy(backup_workerids, workerids, nworkers_ctx*sizeof(backup_workerids[0]));
-	lock_workers_for_changing_ctx(nworkers_ctx, backup_workerids);
+	notify_workers_about_changing_ctx_pending(nworkers_ctx, backup_workerids);
 
 	/*if both of them have all the ressources is pointless*/
 	/*trying to transfer ressources from one ctx to the other*/
@@ -1098,7 +1104,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
 	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
 	{
-		add_locked_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
+		add_notified_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
 		starpu_sched_ctx_set_priority_on_level(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
 	}
 
@@ -1111,11 +1117,11 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 		/* announce upcoming context changes, then wait for transient unlocked operations to
 		 * complete before altering the sched_ctx under sched_mutex protection */
-		_starpu_update_locked_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
+		_starpu_update_notified_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 		_starpu_delete_sched_ctx(sched_ctx);
 	}
-	unlock_workers_for_changing_ctx(nworkers_ctx, backup_workerids);
+	notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
 
 	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list, don't forget to free it when
@@ -1233,7 +1239,7 @@ void _starpu_push_task_to_waiting_list(struct _starpu_sched_ctx *sched_ctx, stru
 	return;
 }
 
-static void set_priority_on_level_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
+static void set_priority_on_level_on_notified_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
 {
 	(void) workers_to_add;
 	(void) nworkers_to_add;
@@ -1271,19 +1277,19 @@ void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworke
 
 }
 
-static void set_priority_hierarchically_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
+static void set_priority_hierarchically_on_notified_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
 {
 	if(starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
 	{
 		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
-		set_priority_on_locked_workers(workers_to_add, nworkers_to_add, father, priority);
-		set_priority_on_level_on_locked_workers(workers_to_add, nworkers_to_add, father, priority);
-		set_priority_hierarchically_on_locked_workers(workers_to_add, nworkers_to_add, father, priority);
+		set_priority_on_notified_workers(workers_to_add, nworkers_to_add, father, priority);
+		set_priority_on_level_on_notified_workers(workers_to_add, nworkers_to_add, father, priority);
+		set_priority_hierarchically_on_notified_workers(workers_to_add, nworkers_to_add, father, priority);
 	}
 	return;
 }
 
-static void add_locked_workers(int *workerids, int nworkers, unsigned sched_ctx_id)
+static void add_notified_workers(int *workerids, int nworkers, unsigned sched_ctx_id)
 {
 	if (!nworkers)
 		return;
@@ -1311,7 +1317,7 @@ static void add_locked_workers(int *workerids, int nworkers, unsigned sched_ctx_
 			}
 		}
 	}
-	_do_add_locked_workers(sched_ctx, workerids, nworkers);
+	_do_add_notified_workers(sched_ctx, workerids, nworkers);
 	if(n_added_workers > 0)
 	{
 		if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
@@ -1320,10 +1326,10 @@ static void add_locked_workers(int *workerids, int nworkers, unsigned sched_ctx_
 			sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, n_added_workers);
 			_STARPU_SCHED_END;
 		}
-		_starpu_update_locked_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
+		_starpu_update_notified_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
 	}
-	set_priority_on_locked_workers(workerids, nworkers, sched_ctx_id, 1);
-	set_priority_hierarchically_on_locked_workers(workerids, nworkers, sched_ctx_id, 0);
+	set_priority_on_notified_workers(workerids, nworkers, sched_ctx_id, 1);
+	set_priority_hierarchically_on_notified_workers(workerids, nworkers, sched_ctx_id, 0);
 	fetch_tasks_from_empty_ctx_list(sched_ctx);
 }
 
@@ -1333,9 +1339,9 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	_starpu_check_workers(workers_to_add, nworkers_to_add);
 
 	_starpu_sched_ctx_lock_write(sched_ctx_id);
-	lock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
-	add_locked_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
-	unlock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
+	notify_workers_about_changing_ctx_pending(nworkers_to_add, workers_to_add);
+	add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
+	notify_workers_about_changing_ctx_done(nworkers_to_add, workers_to_add);
 	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 }
 
@@ -1352,15 +1358,15 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 		int removed_workers[sched_ctx->workers->nworkers];
 		int n_removed_workers = 0;
 
-		lock_workers_for_changing_ctx(nworkers_to_remove, workers_to_remove);
+		notify_workers_about_changing_ctx_pending(nworkers_to_remove, workers_to_remove);
 		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
 
 		if(n_removed_workers > 0)
 		{
-			_starpu_update_locked_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
-			set_priority_on_locked_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
+			_starpu_update_notified_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
+			set_priority_on_notified_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
 		}
-		unlock_workers_for_changing_ctx(nworkers_to_remove, workers_to_remove);
+		notify_workers_about_changing_ctx_done(nworkers_to_remove, workers_to_remove);
 	}
 	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 
@@ -2049,7 +2055,7 @@ int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
 	return sched_ctx->max_priority_is_set;
 }
 
-static void set_priority_on_locked_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
+static void set_priority_on_notified_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
 {
 	if(nworkers != -1)
 	{
@@ -2336,7 +2342,7 @@ static unsigned _worker_blocked_in_other_ctx(unsigned sched_ctx_id, int workerid
 
 }
 
-static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all)
+static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all, int notified_workers)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
@@ -2365,7 +2371,10 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 		{
 			if (current_worker_id == -1 || workerid != current_worker_id)
 			{
+				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 				sched_ctx->parallel_sect[workerid] = 1;
+				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			}
 		}
 		workers_count++;
@@ -2382,8 +2391,15 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 				&& !blocked[workers_count])
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-			while (!worker->state_blocked)
-				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+			if (!notified_workers)
+			{
+				/* only wait if we are not in the middle of a notified changing ctx op,
+				 * otherwise, the workers will */
+				while (!worker->state_blocked)
+					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			}
+			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		workers_count++;
 	}
@@ -2392,7 +2408,7 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 		sched_ctx->main_master = -1;
 }
 
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, const int workers_locked)
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, int notified_workers)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
@@ -2416,11 +2432,10 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 			 && sched_ctx->parallel_sect[workerid] && (workerid != master || all))
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-			if (!workers_locked)
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 			if((current_worker_id == -1 || workerid != current_worker_id) && worker->state_blocked)
 			{
-				if (worker->state_wait_ack__blocked)
+				if (!notified_workers && worker->state_wait_ack__blocked)
 				{
 					worker->state_wait_ack__blocked = 0;
 					worker->state_wait_handshake__blocked = 1;
@@ -2435,8 +2450,7 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 			}
 			else
 				sched_ctx->parallel_sect[workerid] = 0;
-			if (!workers_locked)
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 	}
 
@@ -2448,7 +2462,7 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 {
-	_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 1);
+	_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 1, 0);
 
 	/* execute parallel code */
 	void* ret = func(param);
@@ -2470,7 +2484,7 @@ static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 0);
+		_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 0, 1);
 	}
 }
 
@@ -2486,7 +2500,7 @@ static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0, 0);
+		_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0, 1);
 	}
 }
 

+ 1 - 0
src/core/workers.c

@@ -581,6 +581,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 
 	workerarg->state_sched_op_pending = 0;
 	workerarg->state_changing_ctx_waiting = 0;
+	workerarg->state_changing_ctx_notice = 0;
 	workerarg->state_blocked = 0;
 	workerarg->state_wait_ack__blocked = 0;
 	workerarg->state_wait_handshake__blocked = 0;

+ 22 - 0
src/core/workers.h

@@ -84,6 +84,7 @@ LIST_TYPE(_starpu_worker,
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
 	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
+	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new transient scheduling operations */
 	int state_blocked:1;
 	int state_wait_ack__blocked:1;
 	int state_wait_handshake__blocked:1;
@@ -616,6 +617,8 @@ struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid
  * should not be modified */
 static inline void _starpu_worker_enter_transient_sched_op(struct _starpu_worker * const worker)
 {
+	while (worker->state_changing_ctx_notice)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 	worker->state_sched_op_pending = 1;
 }
 
@@ -633,6 +636,24 @@ static inline void  _starpu_worker_leave_transient_sched_op(struct _starpu_worke
 }
 
 /* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_set_changing_ctx_notice(struct _starpu_worker * const worker)
+{
+	/* another ctx change might be under way, wait for the way to be cleared */
+	while (worker->state_changing_ctx_notice)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+	worker->state_changing_ctx_notice = 1;
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_clear_changing_ctx_notice(struct _starpu_worker * const worker)
+{
+	worker->state_changing_ctx_notice = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+}
+
+/* Must be called with worker's sched_mutex held.
  * Passively wait until state_sched_op_pending is cleared.
  */
 static inline void _starpu_worker_wait_for_transient_sched_op_completion(struct _starpu_worker * const worker)
@@ -640,6 +661,7 @@ static inline void _starpu_worker_wait_for_transient_sched_op_completion(struct
 	if (worker->state_sched_op_pending)
 	{
 		worker->state_changing_ctx_waiting = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 		do
 		{
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);

+ 10 - 7
src/drivers/driver_common/driver_common.c

@@ -340,6 +340,7 @@ static void _starpu_exponential_backoff(struct _starpu_worker *worker)
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int workerid, unsigned memnode STARPU_ATTRIBUTE_UNUSED)
 {
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	_starpu_worker_enter_transient_sched_op(worker);
 	struct starpu_task *task;
 	unsigned needed = 1;
 	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
@@ -374,10 +375,10 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 					{
 						STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 					}
-					while (worker->state_wait_ack__blocked);
+					while (worker->state_wait_ack__blocked && !worker->state_changing_ctx_waiting);
 					worker->state_blocked = 0;
 					sched_ctx->parallel_sect[workerid] = 0;
-					if (worker->state_wait_handshake__blocked)
+					if (worker->state_wait_handshake__blocked && !worker->state_changing_ctx_waiting)
 					{
 						worker->state_wait_handshake__blocked = 0;
 						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
@@ -401,17 +402,18 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 				{
 					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 				}
-				while (worker->state_wait_ack__blocked);
+				while (worker->state_wait_ack__blocked && !worker->state_changing_ctx_waiting);
 				worker->state_blocked = 0;
 				sched_ctx->parallel_sect[workerid] = 0;
-				if (worker->state_wait_handshake__blocked)
+				if (worker->state_wait_handshake__blocked && !worker->state_changing_ctx_waiting)
 				{
 					worker->state_wait_handshake__blocked = 0;
 					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 				}
 			}
 		}
-
+		if (worker->state_changing_ctx_waiting)
+			break;
 		needed = !needed;
 	}
 
@@ -429,9 +431,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	/*else try to pop a task*/
 	else
 	{
-		_starpu_worker_enter_transient_sched_op(worker);
 		task = _starpu_pop_task(worker);
-		_starpu_worker_leave_transient_sched_op(worker);
 	}
 
 #if !defined(STARPU_SIMGRID)
@@ -449,6 +449,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		if (_starpu_worker_can_block(memnode, worker)
 			&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
+			_starpu_worker_leave_transient_sched_op(worker);
 			do
 			{
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
@@ -458,6 +459,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		}
 		else
 		{
+			_starpu_worker_leave_transient_sched_op(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
 				_starpu_exponential_backoff(worker);
@@ -478,6 +480,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	}
 	worker->spinning_backoff = BACKOFF_MIN;
 
+	_starpu_worker_leave_transient_sched_op(worker);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);