Browse Source

Merge from trunk @11789:11910

Marc Sergent 11 years ago
parent
commit
638f30cba2

+ 17 - 16
ChangeLog

@@ -40,22 +40,6 @@ New features:
     handle (sequential consistency will be enabled or disabled based
     on the value of the function parameter and the value of the
     sequential consistency defined for the given data)
-  * Performance model files are now stored in a directory whose name
-    includes the version of the performance model format. The version
-    number is also written in the file itself.
-    When updating the format, the internal variable
-    _STARPU_PERFMODEL_VERSION should be updated. It is then possible
-    to switch easily between different versions of StarPU having
-    different performance model formats.
-  * Tasks can now define an optional prologue callback which is executed
-    on the host when the task becomes ready for execution, before getting
-    scheduled.
-  * Small CUDA allocations (<= 4MiB) are now batched to avoid the huge
-    cudaMalloc overhead.
-  * Prefetching is now done for all schedulers whenever it can be done
-    regardless of the scheduling decision.
-  * Add a watchdog which makes it easy to trigger a crash when StarPU
-    gets stuck.
  * New hierarchical schedulers which allow users to easily build
    their own scheduler, by coding each "box" they want themselves, or by
    combining existing boxes in StarPU to build it. Hierarchical
@@ -199,6 +183,23 @@ New features:
     the Simgrid StarPU features should use it.
  * Allow to have a dynamically allocated number of buffers per task,
    and so override the value defined by --enable-maxbuffers=XXX
+  * Performance model files are now stored in a directory whose name
+    includes the version of the performance model format. The version
+    number is also written in the file itself.
+    When updating the format, the internal variable
+    _STARPU_PERFMODEL_VERSION should be updated. It is then possible
+    to switch easily between different versions of StarPU having
+    different performance model formats.
+  * Tasks can now define an optional prologue callback which is executed
+    on the host when the task becomes ready for execution, before getting
+    scheduled (see the sketch after this file's diff).
+  * Small CUDA allocations (<= 4MiB) are now batched to avoid the huge
+    cudaMalloc overhead.
+  * Prefetching is now done for all schedulers whenever it can be done
+    regardless of the scheduling decision.
+  * Add a watchdog which makes it easy to trigger a crash when StarPU
+    gets stuck.
+  * Document how to migrate data over MPI.
 
 Small features:
   * Add starpu_worker_get_by_type and starpu_worker_get_by_devid
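
A minimal sketch of the prologue-callback entry above, assuming the
starpu_task fields prologue_callback_func and prologue_callback_arg
introduced around this release (the field names are an assumption, not
taken from this commit's hunks):

\code{.c}
#include <stdio.h>
#include <starpu.h>

/* Executed on the host when the task becomes ready, before scheduling. */
static void ready_cb(void *arg)
{
	fprintf(stderr, "task \"%s\" is ready\n", (const char *) arg);
}

static void cpu_func(void *buffers[], void *cl_arg)
{
	(void) buffers; (void) cl_arg;
}

static struct starpu_codelet cl =
{
	.cpu_funcs = { cpu_func },
	.nbuffers = 0
};

int submit_with_prologue(void)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = &cl;
	task->prologue_callback_func = ready_cb;           /* assumed field */
	task->prologue_callback_arg = (void *) "example";  /* assumed field */
	return starpu_task_submit(task);
}
\endcode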

+ 58 - 1
doc/doxygen/chapters/08mpi_support.doxy

@@ -271,7 +271,7 @@ data which will be needed by the tasks that we will execute.
     for(x = 0; x < X; x++) {
         for (y = 0; y < Y; y++) {
             int mpi_rank = my_distrib(x, y, size);
-             if (mpi_rank == my_rank)
+            if (mpi_rank == my_rank)
                 /* Owning data */
                 starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                               (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
@@ -318,6 +318,63 @@ application can prune the task for loops according to the data distribution,
 so as to only submit tasks on nodes which have to care about them (either to
 execute them, or to send the required data).
 
+\section MPIMigration MPI Data migration
+
+The application can dynamically change its mind about the data distribution,
+for instance to balance the load over MPI nodes. This can be done very simply
+by requesting an explicit move and then changing the registered rank. For
+instance, we here switch to a new distribution function <c>my_distrib2</c>: we
+first register any not-yet-registered data that will be needed, then migrate
+the data, and register the new location.
+
+\code{.c}
+    for(x = 0; x < X; x++) {
+        for (y = 0; y < Y; y++) {
+            int mpi_rank = my_distrib2(x, y, size);
+            if (!data_handles[x][y] && (mpi_rank == my_rank
+                  || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
+                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
+                /* Register newly-needed data */
+                starpu_variable_data_register(&data_handles[x][y], -1,
+                                              (uintptr_t)NULL, sizeof(unsigned));
+            if (data_handles[x][y]) {
+                /* Migrate the data */
+                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
+                /* And register the new rank of the matrix */
+                starpu_data_set_rank(data_handles[x][y], mpi_rank);
+            }
+        }
+    }
+\endcode
+
+From then on, further task submissions will use the new data distribution,
+which will thus change both MPI communications and task assignments.
+
+Very importantly, since all nodes have to agree on which node owns which data
+so as to determine MPI communications and task assignments the same way, all
+nodes have to perform the same data migration, at the same point in the
+sequence of task submissions. This does not require strict synchronization,
+just a clear separation of task submissions before and after the data redistribution.
+
+Before unregistering data, it has to be migrated back to its original home
+node (the value, at least), since that is where the user-provided buffer
+resides. Otherwise unregistration will complain that it does not have the
+latest value on the original home node.
+
+\code{.c}
+    for(x = 0; x < X; x++) {
+        for (y = 0; y < Y; y++) {
+            if (data_handles[x][y]) {
+                int mpi_rank = my_distrib(x, y, size);
+                /* Get the data back to the original place where the user-provided buffer is. */
+                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
+                /* And unregister it */
+                starpu_data_unregister(data_handles[x][y]);
+            }
+        }
+    }
+\endcode
+
 \section MPICollective MPI Collective Operations
 
 The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
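
Condensed from the MPIMigration section above, the per-handle migration
recipe is three steps (a sketch reusing the same functions; new_rank stands
for whatever rank the new distribution assigns to this handle):

\code{.c}
/* 1. Make sure the handle exists on every node that will need the data
 *    (register with no local buffer on non-owning nodes). */
if (!handle)
	starpu_variable_data_register(&handle, -1, (uintptr_t) NULL, sizeof(unsigned));

/* 2. Move the value to its new owner; "detached" means asynchronous. */
starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, new_rank, NULL, NULL);

/* 3. Record the new owner so that future starpu_mpi_task_insert calls on
 *    all nodes agree on communications and task placement. */
starpu_data_set_rank(handle, new_rank);
\endcode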

+ 2 - 0
include/starpu_sched_ctx.h

@@ -106,6 +106,8 @@ int starpu_get_nready_tasks_of_sched_ctx(unsigned sched_ctx_id);
 
 double starpu_get_nready_flops_of_sched_ctx(unsigned sched_ctx_id);
 
+void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif //STARPU_USE_SC_HYPERVISOR
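
A usage sketch for the new prototype above, based on its implementation in
the src/core/sched_ctx.c hunk further down (priority is 0 or 1, recorded per
worker and per context; the ctx handle is assumed to come from an earlier
starpu_sched_ctx_create call):

\code{.c}
int workers[] = { 0, 1, 2 };
int nworkers = sizeof(workers) / sizeof(workers[0]);

/* Mark these workers as low priority (0) in ctx: per the pop logic in the
 * src/core/sched_policy.c hunk, they then pop tasks from ctx only after
 * serving their priority-1 contexts. */
starpu_sched_ctx_set_priority(workers, nworkers, ctx, 0);
\endcode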

+ 2 - 2
include/starpu_thread_util.h

@@ -69,8 +69,8 @@
 	}                                                                      \
 } while(0)
 
-#define STARPU_PTHREAD_MUTEX_LOCK(mutex) do {                                 \
-	int p_ret = starpu_pthread_mutex_lock(mutex);                          \
+#define STARPU_PTHREAD_MUTEX_LOCK(mutex) do {				      \
+	int p_ret = starpu_pthread_mutex_lock(mutex);			      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
 		fprintf(stderr,                                                \
 			"%s:%d starpu_pthread_mutex_lock: %s\n",               \

+ 4 - 0
include/starpu_worker.h

@@ -96,6 +96,10 @@ int starpu_worker_get_mp_nodeid(int id);
 
 int starpu_worker_get_nsched_ctxs(int workerid);
 
+void starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag);
+
+unsigned starpu_worker_mutex_is_sched_mutex(int workerid, pthread_mutex_t *mutex);
+
 #ifdef __cplusplus
 }
 #endif

+ 58 - 0
mpi/examples/stencil/stencil5.c

@@ -58,6 +58,12 @@ int my_distrib(int x, int y, int nb_nodes)
 	return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
 }
 
+/* Shifted distribution, for migration example */
+int my_distrib2(int x, int y, int nb_nodes)
+{
+	return (my_distrib(x, y, nb_nodes) + 1) % nb_nodes;
+}
+
 
 static void parse_args(int argc, char **argv)
 {
@@ -91,6 +97,7 @@ int main(int argc, char **argv)
 
 	parse_args(argc, argv);
 
+	/* Initial data values */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
@@ -102,6 +109,7 @@ int main(int argc, char **argv)
 	}
 	mean /= value;
 
+	/* Initial distribution */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
@@ -132,6 +140,49 @@ int main(int argc, char **argv)
 		}
 	}
 
+	/* First computation with initial distribution */
+	for(loop=0 ; loop<niter; loop++)
+	{
+		for (x = 1; x < X-1; x++)
+		{
+			for (y = 1; y < Y-1; y++)
+			{
+				starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl, STARPU_RW, data_handles[x][y],
+						       STARPU_R, data_handles[x-1][y], STARPU_R, data_handles[x+1][y],
+						       STARPU_R, data_handles[x][y-1], STARPU_R, data_handles[x][y+1],
+						       0);
+			}
+		}
+	}
+	FPRINTF(stderr, "Waiting ...\n");
+	starpu_task_wait_for_all();
+
+	/* Now migrate data to a new distribution */
+
+	/* First register newly needed data */
+	for(x = 0; x < X; x++)
+	{
+		for (y = 0; y < Y; y++)
+		{
+			int mpi_rank = my_distrib2(x, y, size);
+			if (!data_handles[x][y] && (mpi_rank == my_rank
+				 || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
+				 || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
+			{
+				/* Register newly-needed data */
+				starpu_variable_data_register(&data_handles[x][y], -1, (uintptr_t)NULL, sizeof(unsigned));
+			}
+			if (data_handles[x][y] && mpi_rank != starpu_data_get_rank(data_handles[x][y]))
+			{
+				/* Migrate the data */
+				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
+				/* And register the new rank of the matrix */
+				starpu_data_set_rank(data_handles[x][y], mpi_rank);
+			}
+		}
+	}
+
+	/* Second computation with new distribution */
 	for(loop=0 ; loop<niter; loop++)
 	{
 		for (x = 1; x < X-1; x++)
@@ -148,12 +199,19 @@ int main(int argc, char **argv)
 	FPRINTF(stderr, "Waiting ...\n");
 	starpu_task_wait_for_all();
 
+	/* Unregister data */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
 		{
 			if (data_handles[x][y])
 			{
+				int mpi_rank = my_distrib(x, y, size);
+				/* Get the data back to the original place where the user-provided buffer is. */
+				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
+				/* Register the original rank of the matrix (not strictly needed) */
+				starpu_data_set_rank(data_handles[x][y], mpi_rank);
+				/* And unregister it */
 				starpu_data_unregister(data_handles[x][y]);
 			}
 		}

+ 47 - 1
sc_hypervisor/src/sc_hypervisor.c

@@ -586,6 +586,7 @@ void sc_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned nworke
 			printf(" %d", workers_to_add[j]);
 		printf("\n");
 		starpu_sched_ctx_add_workers(workers_to_add, nworkers_to_add, sched_ctx);
+		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx, 1);
 		struct sc_hypervisor_policy_config *new_config = sc_hypervisor_get_config(sched_ctx);
 		unsigned i;
 		for(i = 0; i < nworkers_to_add; i++)
@@ -945,6 +946,25 @@ static void notify_pushed_task(unsigned sched_ctx, int worker)
 		hypervisor.policy.handle_pushed_task(sched_ctx, worker);
 }
 
+/* Pick, among the contexts this worker does not belong to, the one with
+   the most ready tasks; returns STARPU_NMAX_SCHED_CTXS if there is none. */
+unsigned choose_ctx_to_steal(int worker)
+{
+	int j;
+	int ns = hypervisor.nsched_ctxs;
+	int max_ready_tasks = 0;
+	unsigned chosen_ctx = STARPU_NMAX_SCHED_CTXS;
+	for(j = 0; j < ns; j++)
+	{
+		unsigned other_ctx = hypervisor.sched_ctxs[j];
+		int nready = starpu_get_nready_tasks_of_sched_ctx(other_ctx);
+		if(!starpu_sched_ctx_contains_worker(worker, other_ctx) && max_ready_tasks < nready)
+		{
+			max_ready_tasks = nready;
+			chosen_ctx = other_ctx;
+		}
+	}
+	return chosen_ctx;
+}
+
 /* notifies the hypervisor that the worker spent another cycle in idle time */
 static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 {
@@ -969,7 +989,33 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 		{
 			if(sc_hypervisor_check_idle(sched_ctx, worker))
 			{
-				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
+				int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
+				if(ret != EBUSY)
+				{
+					int j;
+					int ns = hypervisor.nsched_ctxs;
+					unsigned idle_everywhere = 1;
+					for(j = 0; j < ns; j++)
+					{
+						if(starpu_sched_ctx_contains_worker(worker, hypervisor.sched_ctxs[j]))
+						{
+							if(!sc_hypervisor_check_idle(hypervisor.sched_ctxs[j], worker))
+								idle_everywhere = 0;
+						}
+					}
+					if(idle_everywhere)
+					{
+						unsigned other_ctx = choose_ctx_to_steal(worker);
+						if(other_ctx != STARPU_NMAX_SCHED_CTXS)
+						{
+							sc_hypervisor_add_workers_to_sched_ctx(&worker, 1, other_ctx);
+							starpu_sched_ctx_set_priority(&worker, 1, other_ctx, 0);
+							_sc_hypervisor_allow_compute_idle(other_ctx, worker, 0);
+						}
+					}
+					starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+				}
+//				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
 			}
 		}
 	}

+ 12 - 0
src/common/thread.c

@@ -267,6 +267,9 @@ int starpu_pthread_mutex_lock(starpu_pthread_mutex_t *mutex)
 	_STARPU_TRACE_LOCKING_MUTEX();
 
 	int p_ret = pthread_mutex_lock(mutex);
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1 && starpu_worker_mutex_is_sched_mutex(workerid, mutex))
+		starpu_worker_set_flag_sched_mutex_locked(workerid, 1);	 
 
 	_STARPU_TRACE_MUTEX_LOCKED();
 
@@ -278,6 +281,9 @@ int starpu_pthread_mutex_unlock(starpu_pthread_mutex_t *mutex)
 	_STARPU_TRACE_UNLOCKING_MUTEX();
 
 	int p_ret = pthread_mutex_unlock(mutex);
+	int workerid = starpu_worker_get_id(); 
+	if(workerid != -1 && starpu_worker_mutex_is_sched_mutex(workerid, mutex)) 
+		starpu_worker_set_flag_sched_mutex_locked(workerid, 0);	
 
 	_STARPU_TRACE_MUTEX_UNLOCKED();
 
@@ -292,7 +298,13 @@ int starpu_pthread_mutex_trylock(starpu_pthread_mutex_t *mutex)
 	ret = pthread_mutex_trylock(mutex);
 
 	if (!ret)
+	{
+		int workerid = starpu_worker_get_id();	
+		if(workerid != -1 && starpu_worker_mutex_is_sched_mutex(workerid, mutex)) 
+			starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
+
 		_STARPU_TRACE_MUTEX_LOCKED();
+	}
 
 	return ret;
 }

+ 48 - 28
src/core/sched_ctx.c

@@ -74,16 +74,9 @@ static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sc
 	{
 		worker = _starpu_get_worker_struct(workerids[i]);
 
-		/* if the current thread requires resize it's no need
-		   to lock it in order to change its  sched_ctx info */
-		if(curr_worker && curr_worker == worker)
-			_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
-		else
-		{
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-			_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-		}
+		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	}
 
 	return;
@@ -100,25 +93,15 @@ static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int
 		worker = _starpu_get_worker_struct(workerids[i]);
 		if(now)
 		{
-			if(curr_worker && curr_worker == worker)
-				_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
-			else
-			{
-					STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-					_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-			}
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 		else
 		{
-			if(curr_worker && curr_worker == worker)
-				worker->removed_from_ctx[sched_ctx_id] = 1;
-			else
-			{
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-				worker->removed_from_ctx[sched_ctx_id] = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-			}
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			worker->removed_from_ctx[sched_ctx_id] = 1;
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 	}
 	return;
@@ -480,8 +463,9 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
 	sched_ctx->max_ngpus = max_ngpus;
-
+	_starpu_unlock_mutex_if_prev_locked();
 	_starpu_update_workers_without_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id, 0);
+	_starpu_relock_mutex_if_prev_locked();
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -533,7 +517,9 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio);
 
+	_starpu_unlock_mutex_if_prev_locked();
 	_starpu_update_workers_with_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+	_starpu_relock_mutex_if_prev_locked();
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -594,6 +580,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
 	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
 
+	_starpu_unlock_mutex_if_prev_locked();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
 
@@ -625,6 +612,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	   you don't use it anymore */
 	free(workerids);
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+	_starpu_relock_mutex_if_prev_locked();
 	return;
 }
 
@@ -644,7 +632,6 @@ void _starpu_delete_all_sched_ctxs()
 			_starpu_delete_sched_ctx(sched_ctx);
 		}
 		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
-
 		STARPU_PTHREAD_RWLOCK_DESTROY(&changing_ctx_mutex[i]);
 	}
 	return;
@@ -708,6 +695,8 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	int added_workers[nworkers_to_add];
 	int n_added_workers = 0;
 
+	_starpu_unlock_mutex_if_prev_locked();
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
@@ -727,6 +716,8 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
+	_starpu_relock_mutex_if_prev_locked();
+
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
 		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
@@ -745,6 +736,8 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
 
+	_starpu_unlock_mutex_if_prev_locked();
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
@@ -756,12 +749,16 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	}
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+
+	_starpu_relock_mutex_if_prev_locked();
+
 	return;
 }
 
 int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
 {
 	unsigned worker = 0, nworkers = 0;
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 
@@ -1334,6 +1331,29 @@ int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
 	return sched_ctx->max_priority_is_set;
 }
 
+/* Set the pop priority (0 or 1) of the given workers within the given context */
+void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
+{
+	if(nworkers != -1)
+	{
+		int w;
+		struct _starpu_worker *worker = NULL;
+		for(w = 0; w < nworkers; w++)
+		{
+			worker = _starpu_get_worker_struct(workers[w]);
+			struct _starpu_sched_ctx_list *l = NULL;
+			for (l = worker->sched_ctx_list; l; l = l->next)
+			{
+				if(l->sched_ctx == sched_ctx_id)
+				{
+					l->priority = priority;
+					break;
+				}
+			}
+		}
+	}
+	return;
+}
+
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 {
 	struct _starpu_sched_ctx_list *l = NULL;

+ 2 - 0
src/core/sched_ctx_list.c

@@ -21,6 +21,7 @@ void _starpu_sched_ctx_list_init(struct _starpu_sched_ctx_list *list)
 {
 	list->next = NULL;
 	list->sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	list->priority = 1;
 }
 
 void _starpu_sched_ctx_list_add(struct _starpu_sched_ctx_list **list, unsigned sched_ctx)
@@ -31,6 +32,7 @@ void _starpu_sched_ctx_list_add(struct _starpu_sched_ctx_list **list, unsigned s
 	{
 		struct _starpu_sched_ctx_list *l = (struct _starpu_sched_ctx_list*)malloc(sizeof(struct _starpu_sched_ctx_list));
 		l->sched_ctx = sched_ctx;
+		l->priority = 1;
 		l->next = *list;
 		*list = l;
 	}

+ 1 - 0
src/core/sched_ctx_list.h

@@ -21,6 +21,7 @@ struct _starpu_sched_ctx_list
 {
 	struct _starpu_sched_ctx_list *next;
 	unsigned sched_ctx;
+	unsigned priority;
 };
 
 void _starpu_sched_ctx_list_init(struct _starpu_sched_ctx_list *list);

+ 56 - 17
src/core/sched_policy.c

@@ -582,34 +582,69 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 	struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
 	unsigned smallest_counter =  worker->nsched_ctxs;
 	struct _starpu_sched_ctx_list *l = NULL;
-	if(!worker->reverse_phase)
+	unsigned are_2_priorities = 0;
+	for (l = worker->sched_ctx_list; l; l = l->next)
+	{
+		if(l->priority != worker->pop_ctx_priority)
+		{
+			are_2_priorities = 1;
+			break;
+		}
+	}
+
+	if(!worker->reverse_phase[worker->pop_ctx_priority])
 	{
		/* find a context from which the worker hasn't popped yet */
 		for (l = worker->sched_ctx_list; l; l = l->next)
 		{
-			if(!worker->poped_in_ctx[l->sched_ctx])
+			if(l->priority == worker->pop_ctx_priority)
 			{
-				worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
-				return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				if(!worker->poped_in_ctx[l->sched_ctx])
+				{
+					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
+					return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				}
 			}
 		}
-		worker->reverse_phase = !worker->reverse_phase;
+		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
+		if(are_2_priorities)
+			worker->pop_ctx_priority = !worker->pop_ctx_priority;
 	}
-	if(worker->reverse_phase)
+	are_2_priorities = 0;
+	if(worker->reverse_phase[worker->pop_ctx_priority])
 	{
		/* if the worker has already popped from every context, start from the beginning */
 		for (l = worker->sched_ctx_list; l; l = l->next)
 		{
-			if(worker->poped_in_ctx[l->sched_ctx])
+			if(l->priority == worker->pop_ctx_priority)
 			{
-				worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
-				return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				if(worker->poped_in_ctx[l->sched_ctx])
+				{
+					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
+					return	_starpu_get_sched_ctx_struct(l->sched_ctx);
+				}
 			}
 		}
-		worker->reverse_phase = !worker->reverse_phase;
+		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
+		if(are_2_priorities)
+			worker->pop_ctx_priority = !worker->pop_ctx_priority;
 	}	
-	worker->poped_in_ctx[worker->sched_ctx_list->sched_ctx] = !worker->poped_in_ctx[worker->sched_ctx_list->sched_ctx];
-	return _starpu_get_sched_ctx_struct(worker->sched_ctx_list->sched_ctx);
+
+	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	for (l = worker->sched_ctx_list; l; l = l->next)
+	{
+		if(l->priority == worker->pop_ctx_priority)
+		{
+			first_sched_ctx = l->sched_ctx;
+			break;
+		}
+	}
+
+	if(worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
+		first_sched_ctx = worker->sched_ctx_list->sched_ctx;
+
+	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
+	return _starpu_get_sched_ctx_struct(first_sched_ctx);
 }
 
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
@@ -650,7 +685,7 @@ pick:
 				while(1)
 				{
 					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
-					
+
 					if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
 					{
 						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
@@ -662,7 +697,6 @@ pick:
 				}
 			}
 
-
 			if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
 				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
@@ -683,19 +717,24 @@ pick:
 					worker->removed_from_ctx[sched_ctx->id] = 0;
 				}
 #ifdef STARPU_USE_SC_HYPERVISOR
-				struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
-				if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
+				if(worker->pop_ctx_priority)
 				{
+					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
+					if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
+					{
 //					_STARPU_TRACE_HYPERVISOR_BEGIN();
-					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
+						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
 //					_STARPU_TRACE_HYPERVISOR_END();
+					}
 				}
 #endif //STARPU_USE_SC_HYPERVISOR
 				
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 				if(been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
 					break;
+
 				been_here[sched_ctx->id] = 1;
+
 #endif
 			}
 		}

+ 1 - 1
src/core/task.c

@@ -199,7 +199,7 @@ int starpu_task_wait(struct starpu_task *task)
 
 	if (task->detach || task->synchronous)
 	{
-		_STARPU_DEBUG("Task is detached or asynchronous. Waiting returns immediately\n");
+		_STARPU_DEBUG("Task is detached or synchronous. Waiting returns immediately\n");
 		_STARPU_LOG_OUT_TAG("einval");
 		return -EINVAL;
 	}
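
For context on this message fix: per the condition above, starpu_task_wait()
returns -EINVAL for detached or synchronous tasks, so only non-detached
asynchronous tasks are waitable (a sketch; cl is an assumed codelet):

\code{.c}
struct starpu_task *task = starpu_task_create();
task->cl = &cl;          /* assumed codelet */
task->detach = 0;        /* must be non-detached to be waitable */
task->synchronous = 0;   /* a synchronous submission already waits */
starpu_task_submit(task);
starpu_task_wait(task);  /* would return -EINVAL if detach or synchronous were set */
\endcode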

+ 43 - 1
src/core/workers.c

@@ -444,7 +444,10 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 		workerarg->shares_tasks_lists[ctx] = 0;
 		workerarg->poped_in_ctx[ctx] = 0;
 	}
-	workerarg->reverse_phase = 0;
+	workerarg->reverse_phase[0] = 0;
+	workerarg->reverse_phase[1] = 0;
+	workerarg->pop_ctx_priority = 1;
+	workerarg->sched_mutex_locked = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1760,3 +1763,42 @@ void starpu_get_version(int *major, int *minor, int *release)
 	*minor = STARPU_MINOR_VERSION;
 	*release = STARPU_RELEASE_VERSION;
 }
+
+void _starpu_unlock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		if(w->sched_mutex_locked)
+		{
+			STARPU_PTHREAD_MUTEX_UNLOCK(&w->sched_mutex);
+			/* the unlock wrapper just cleared the flag; set it back so the
+			   matching relock call knows the mutex was held on entry */
+			starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
+		}
+	}
+	return;
+}
+
+void _starpu_relock_mutex_if_prev_locked()
+{
+	int workerid = starpu_worker_get_id();
+	if(workerid != -1)
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+		if(w->sched_mutex_locked)
+			STARPU_PTHREAD_MUTEX_LOCK(&w->sched_mutex);
+	}
+	return;
+}
+
+void starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag)
+{
+	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+	w->sched_mutex_locked = flag;
+}
+
+unsigned starpu_worker_mutex_is_sched_mutex(int workerid, pthread_mutex_t *mutex)
+{
+	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+	return &w->sched_mutex == mutex;
+}

+ 12 - 2
src/core/workers.h

@@ -110,10 +110,14 @@ LIST_TYPE(_starpu_worker,
 	unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS];	  
 
       /* boolean indicating at which moment we checked all ctxs and changed phase for the boolean poped_in_ctx */
-	unsigned reverse_phase;
-
+       /* one for each of the 2 priorities*/
+	unsigned reverse_phase[2];
 
+	/* indicates which context priority is currently being popped: 0 or 1 */
+	unsigned pop_ctx_priority;
 
+	/* flag to know if sched_mutex is locked or not */
+	unsigned sched_mutex_locked;
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */
@@ -405,4 +409,10 @@ int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *worker
    the list might not be updated */
 int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
 
+/* if the current worker holds its sched_mutex, release it */
+void _starpu_unlock_mutex_if_prev_locked();
+
+/* if we previously released the lock, take it again */
+void _starpu_relock_mutex_if_prev_locked();
+
 #endif // __WORKERS_H__

+ 1 - 0
tests/microbenchs/tasks_size_overhead.c

@@ -223,6 +223,7 @@ int main(int argc, char **argv)
 		starpu_shutdown();
 
 		FPRINTF(stdout, "\n");
+		fflush(stdout);
 	}
 
 	free(tasks);