Parcourir la source

hypervisor: steal work from other ctxs when the worker is idle
starpu: overlapping ctxs: impose priorities between ctxs when popping (needed for the stealing, the base ctxs have priority before the ones the workers are stealing from)
starpu: release workers sched mutex before taking the chaning ctx mutex (à little bit of a hack) TODO: make it better and remove this hack

Andra Hugo il y a 11 ans
Parent
commit
e28f7852a3

+ 2 - 0
include/starpu_sched_ctx.h

@@ -106,6 +106,8 @@ int starpu_get_nready_tasks_of_sched_ctx(unsigned sched_ctx_id);
 
 double starpu_get_nready_flops_of_sched_ctx(unsigned sched_ctx_id);
 
+void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif //STARPU_USE_SC_HYPERVISOR

+ 8 - 2
include/starpu_thread_util.h

@@ -69,8 +69,11 @@
 	}                                                                      \
 } while(0)
 
-#define STARPU_PTHREAD_MUTEX_LOCK(mutex) do {                                 \
-	int p_ret = starpu_pthread_mutex_lock(mutex);                          \
+#define STARPU_PTHREAD_MUTEX_LOCK(mutex) do {				      \
+	int p_ret = starpu_pthread_mutex_lock(mutex);			      \
+	int workerid = starpu_worker_get_id();                                \
+	if(workerid != -1 && starpu_worker_mutex_is_sched_mutex(workerid, mutex)) \
+		starpu_worker_set_flag_sched_mutex_locked(workerid, 1);	      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
 		fprintf(stderr,                                                \
 			"%s:%d starpu_pthread_mutex_lock: %s\n",               \
@@ -96,6 +99,9 @@ int _STARPU_PTHREAD_MUTEX_TRYLOCK(starpu_pthread_mutex_t *mutex, char *file, int
 
 #define STARPU_PTHREAD_MUTEX_UNLOCK(mutex) do {                               \
 	int p_ret = starpu_pthread_mutex_unlock(mutex);                        \
+	int workerid = starpu_worker_get_id();                                \
+	if(workerid != -1 && starpu_worker_mutex_is_sched_mutex(workerid, mutex)) \
+		starpu_worker_set_flag_sched_mutex_locked(workerid, 0);	      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
 		fprintf(stderr,                                                \
 			"%s:%d starpu_pthread_mutex_unlock: %s\n",             \

+ 4 - 0
include/starpu_worker.h

@@ -96,6 +96,10 @@ int starpu_worker_get_mp_nodeid(int id);
 
 int starpu_worker_get_nsched_ctxs(int workerid);
 
+void starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag);
+
+unsigned starpu_worker_mutex_is_sched_mutex(int workerid, pthread_mutex_t *mutex);
+
 #ifdef __cplusplus
 }
 #endif

+ 47 - 1
sc_hypervisor/src/sc_hypervisor.c

@@ -586,6 +586,7 @@ void sc_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned nworke
 			printf(" %d", workers_to_add[j]);
 		printf("\n");
 		starpu_sched_ctx_add_workers(workers_to_add, nworkers_to_add, sched_ctx);
+		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx, 1);
 		struct sc_hypervisor_policy_config *new_config = sc_hypervisor_get_config(sched_ctx);
 		unsigned i;
 		for(i = 0; i < nworkers_to_add; i++)
@@ -945,6 +946,25 @@ static void notify_pushed_task(unsigned sched_ctx, int worker)
 		hypervisor.policy.handle_pushed_task(sched_ctx, worker);
 }
 
+unsigned choose_ctx_to_steal(int worker)
+{
+	int j;
+	int ns = hypervisor.nsched_ctxs;
+	int max_ready_tasks = 0;
+	unsigned chosen_ctx = STARPU_NMAX_SCHED_CTXS;
+	for(j = 0; j < ns; j++)
+	{
+		unsigned other_ctx = hypervisor.sched_ctxs[j];
+		int nready = starpu_get_nready_tasks_of_sched_ctx(other_ctx);
+		if(!starpu_sched_ctx_contains_worker(worker, other_ctx) && max_ready_tasks < nready)
+		{
+			max_ready_tasks = nready;
+			chosen_ctx = other_ctx;
+		}
+	}
+	return chosen_ctx;
+}
+
 /* notifies the hypervisor that the worker spent another cycle in idle time */
 static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 {
@@ -969,7 +989,33 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 		{
 			if(sc_hypervisor_check_idle(sched_ctx, worker))
 			{
-				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
+				int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
+				if(ret != EBUSY)
+				{
+					int j;
+					int ns = hypervisor.nsched_ctxs;
+					unsigned idle_everywhere = 1;
+					for(j = 0; j < ns; j++)
+					{
+						if(starpu_sched_ctx_contains_worker(worker, hypervisor.sched_ctxs[j]))
+						{
+							if(!sc_hypervisor_check_idle(hypervisor.sched_ctxs[j], worker))
+								idle_everywhere = 0;
+						}
+					}
+					if(idle_everywhere)
+					{
+						unsigned other_ctx = choose_ctx_to_steal(worker);
+						if(other_ctx != STARPU_NMAX_SCHED_CTXS)
+						{
+							sc_hypervisor_add_workers_to_sched_ctx(&worker, 1, other_ctx);
+							starpu_sched_ctx_set_priority(&worker, 1, other_ctx, 0);
+							_sc_hypervisor_allow_compute_idle(other_ctx, worker, 0);
+						}
+					}
+					starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+				}
+//				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
 			}
 		}
 	}

+ 48 - 28
src/core/sched_ctx.c

@@ -74,16 +74,9 @@ static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sc
 	{
 		worker = _starpu_get_worker_struct(workerids[i]);
 
-		/* if the current thread requires resize it's no need
-		   to lock it in order to change its  sched_ctx info */
-		if(curr_worker && curr_worker == worker)
-			_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
-		else
-		{
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-			_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-		}
+		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	}
 
 	return;
@@ -100,25 +93,15 @@ static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int
 		worker = _starpu_get_worker_struct(workerids[i]);
 		if(now)
 		{
-			if(curr_worker && curr_worker == worker)
-				_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
-			else
-			{
-					STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-					_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-			}
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 		else
 		{
-			if(curr_worker && curr_worker == worker)
-				worker->removed_from_ctx[sched_ctx_id] = 1;
-			else
-			{
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-				worker->removed_from_ctx[sched_ctx_id] = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-			}
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			worker->removed_from_ctx[sched_ctx_id] = 1;
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 	}
 	return;
@@ -480,8 +463,9 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
 	sched_ctx->max_ngpus = max_ngpus;
-
+	_starpu_unlock_mutex_if_prev_locked();
 	_starpu_update_workers_without_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id, 0);
+	_starpu_relock_mutex_if_prev_locked();
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -533,7 +517,9 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio);
 
+	_starpu_unlock_mutex_if_prev_locked();
 	_starpu_update_workers_with_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+	_starpu_relock_mutex_if_prev_locked();
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -594,6 +580,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
 	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
 
+	_starpu_unlock_mutex_if_prev_locked();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
 
@@ -625,6 +612,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	   you don't use it anymore */
 	free(workerids);
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+	_starpu_relock_mutex_if_prev_locked();
 	return;
 }
 
@@ -644,7 +632,6 @@ void _starpu_delete_all_sched_ctxs()
 			_starpu_delete_sched_ctx(sched_ctx);
 		}
 		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
-
 		STARPU_PTHREAD_RWLOCK_DESTROY(&changing_ctx_mutex[i]);
 	}
 	return;
@@ -708,6 +695,8 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	int added_workers[nworkers_to_add];
 	int n_added_workers = 0;
 
+	_starpu_unlock_mutex_if_prev_locked();
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
@@ -727,6 +716,8 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
+	_starpu_relock_mutex_if_prev_locked();
+
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
 		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
@@ -745,6 +736,8 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
 
+	_starpu_unlock_mutex_if_prev_locked();
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
@@ -756,12 +749,16 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	}
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+
+	_starpu_relock_mutex_if_prev_locked();
+
 	return;
 }
 
 int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
 {
 	unsigned worker = 0, nworkers = 0;
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 
@@ -1334,6 +1331,29 @@ int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
 	return sched_ctx->max_priority_is_set;
 }
 
+void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
+{
+	if(nworkers != -1)
+	{
+		int w;
+		struct _starpu_worker *worker = NULL;
+		for(w = 0; w < nworkers; w++)
+		{
+			worker = _starpu_get_worker_struct(workers[w]);
+			struct _starpu_sched_ctx_list *l = NULL;
+			for (l = worker->sched_ctx_list; l; l = l->next)
+			{
+				if(l->sched_ctx == sched_ctx_id)
+				{
+					l->priority = priority;
+					break;
+				}
+			}
+		}
+	}
+	return;
+}
+
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 {
 	struct _starpu_sched_ctx_list *l = NULL;

+ 2 - 0
src/core/sched_ctx_list.c

@@ -21,6 +21,7 @@ void _starpu_sched_ctx_list_init(struct _starpu_sched_ctx_list *list)
 {
 	list->next = NULL;
 	list->sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	list->priority = 1;
 }
 
 void _starpu_sched_ctx_list_add(struct _starpu_sched_ctx_list **list, unsigned sched_ctx)
@@ -31,6 +32,7 @@ void _starpu_sched_ctx_list_add(struct _starpu_sched_ctx_list **list, unsigned s
 	{
 		struct _starpu_sched_ctx_list *l = (struct _starpu_sched_ctx_list*)malloc(sizeof(struct _starpu_sched_ctx_list));
 		l->sched_ctx = sched_ctx;
+		l->priority = 1;
 		l->next = *list;
 		*list = l;
 	}

+ 1 - 0
src/core/sched_ctx_list.h

@@ -21,6 +21,7 @@ struct _starpu_sched_ctx_list
 {
 	struct _starpu_sched_ctx_list *next;
 	unsigned sched_ctx;
+	unsigned priority;
 };
 
 void _starpu_sched_ctx_list_init(struct _starpu_sched_ctx_list *list);

+ 56 - 17
src/core/sched_policy.c

@@ -571,34 +571,69 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 	struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
 	unsigned smallest_counter =  worker->nsched_ctxs;
 	struct _starpu_sched_ctx_list *l = NULL;
-	if(!worker->reverse_phase)
+	unsigned are_2_priorities = 0;
+	for (l = worker->sched_ctx_list; l; l = l->next)
+	{
+		if(l->priority != worker->pop_ctx_priority)
+		{
+			are_2_priorities = 1;
+			break;
+		}
+	}
+
+	if(!worker->reverse_phase[worker->pop_ctx_priority])
 	{
 		/* find a context in which the worker hasn't poped yet */
 		for (l = worker->sched_ctx_list; l; l = l->next)
 		{
-			if(!worker->poped_in_ctx[l->sched_ctx])
+			if(l->priority == worker->pop_ctx_priority)
 			{
-				worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
-				return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				if(!worker->poped_in_ctx[l->sched_ctx])
+				{
+					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
+					return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				}
 			}
 		}
-		worker->reverse_phase = !worker->reverse_phase;
+		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
+		if(are_2_priorities)
+			worker->pop_ctx_priority = !worker->pop_ctx_priority;
 	}
-	if(worker->reverse_phase)
+	are_2_priorities = 0;
+	if(worker->reverse_phase[worker->pop_ctx_priority])
 	{
 		/* if the context has already poped in every one start from the begining */
 		for (l = worker->sched_ctx_list; l; l = l->next)
 		{
-			if(worker->poped_in_ctx[l->sched_ctx])
+			if(l->priority == worker->pop_ctx_priority)
 			{
-				worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
-				return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				if(worker->poped_in_ctx[l->sched_ctx])
+				{
+					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
+					return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				}
 			}
 		}
-		worker->reverse_phase = !worker->reverse_phase;
+		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
+		if(are_2_priorities)
+			worker->pop_ctx_priority = !worker->pop_ctx_priority;
 	}	
-	worker->poped_in_ctx[worker->sched_ctx_list->sched_ctx] = !worker->poped_in_ctx[worker->sched_ctx_list->sched_ctx];
-	return _starpu_get_sched_ctx_struct(worker->sched_ctx_list->sched_ctx);
+
+	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	for (l = worker->sched_ctx_list; l; l = l->next)
+	{
+		if(l->priority == worker->pop_ctx_priority)
+		{
+			first_sched_ctx = l->sched_ctx;
+			break;
+		}
+	}
+
+	if(worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
+		first_sched_ctx = worker->sched_ctx_list->sched_ctx;
+
+	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
+	return _starpu_get_sched_ctx_struct(first_sched_ctx);
 }
 
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
@@ -639,7 +674,7 @@ pick:
 				while(1)
 				{
 					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
-					
+
 					if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
 					{
 						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
@@ -651,7 +686,6 @@ pick:
 				}
 			}
 
-
 			if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
 				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
@@ -672,19 +706,24 @@ pick:
 					worker->removed_from_ctx[sched_ctx->id] = 0;
 				}
 #ifdef STARPU_USE_SC_HYPERVISOR
-				struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
-				if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
+				if(worker->pop_ctx_priority)
 				{
+					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
+					if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
+					{
 //					_STARPU_TRACE_HYPERVISOR_BEGIN();
-					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
+						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
 //					_STARPU_TRACE_HYPERVISOR_END();
+					}
 				}
 #endif //STARPU_USE_SC_HYPERVISOR
 				
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 				if(been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
 					break;
+
 				been_here[sched_ctx->id] = 1;
+
 #endif
 			}
 		}

+ 45 - 1
src/core/workers.c

@@ -444,7 +444,10 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 		workerarg->shares_tasks_lists[ctx] = 0;
 		workerarg->poped_in_ctx[ctx] = 0;
 	}
-	workerarg->reverse_phase = 0;
+	workerarg->reverse_phase[0] = 0;
+	workerarg->reverse_phase[1] = 0;
+	workerarg->pop_ctx_priority = 1;
+	workerarg->sched_mutex_locked = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1760,3 +1763,44 @@ void starpu_get_version(int *major, int *minor, int *release)
 	*minor = STARPU_MINOR_VERSION;
 	*release = STARPU_RELEASE_VERSION;
 }
+
+void _starpu_unlock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		printf("got here but not locked\n");
+		if(w->sched_mutex_locked)
+		{
+			printf("unlock in decrement \n");
+			STARPU_PTHREAD_MUTEX_UNLOCK(&w->sched_mutex);
+			starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
+		}
+	}
+	return;
+}
+
+void _starpu_relock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		if(w->sched_mutex_locked)
+			STARPU_PTHREAD_MUTEX_LOCK(&w->sched_mutex);
+	}
+	return;
+}
+
+void starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag)
+{
+	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+	w->sched_mutex_locked = flag;
+}
+
+unsigned starpu_worker_mutex_is_sched_mutex(int workerid, pthread_mutex_t *mutex)
+{
+	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+	return &w->sched_mutex == mutex;
+}

+ 12 - 2
src/core/workers.h

@@ -110,10 +110,14 @@ LIST_TYPE(_starpu_worker,
 	unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS];	  
 
        /* boolean indicating at which moment we checked all ctxs and change phase for the booleab poped_in_ctx*/
-	unsigned reverse_phase;
-
+       /* one for each of the 2 priorities*/
+	unsigned reverse_phase[2];
 
+	/* indicate which priority of ctx is currently active: the values are 0 or 1*/
+	unsigned pop_ctx_priority;
 
+	/* flag to know if sched_mutex is locked or not */
+	unsigned sched_mutex_locked;
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */
@@ -402,4 +406,10 @@ int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *worker
    the list might not be updated */
 int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
 
+/* if the current worker has the lock release it */
+void _starpu_unlock_mutex_if_prev_locked();
+
+/* if we prev released the lock relock it */
+void _starpu_relock_mutex_if_prev_locked();
+
 #endif // __WORKERS_H__