sharing workers btw ctxs with no sched_policy

Andra Hugo, 11 years ago
parent
commit
c8def5adac

+ 4 - 4
examples/sched_ctx/parallel_code.c

@@ -139,11 +139,11 @@ int main(int argc, char **argv)
 	for(j = nprocs5; j < nprocs5+nprocs6; j++)
 		procs6[k++] = procs2[j];
 
-	int master3 = starpu_sched_ctx_book_workers_for_task(procs3, nprocs3);
-	int master4 = starpu_sched_ctx_book_workers_for_task(procs4, nprocs4);
+	int master3 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs3, nprocs3);
+	int master4 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs4, nprocs4);
 
-	int master5 = starpu_sched_ctx_book_workers_for_task(procs5, nprocs5);
-	int master6 = starpu_sched_ctx_book_workers_for_task(procs6, nprocs6);
+	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
+	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
 
 /* 	int master1 = starpu_sched_ctx_book_workers_for_task(procs1, nprocs1); */
 /* 	int master2 = starpu_sched_ctx_book_workers_for_task(procs2, nprocs2); */
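(For reference, a minimal usage sketch of the changed call, not part of the commit. sched_ctx1 stands for a context created without a scheduling policy, as in this example; the worker list is queried from StarPU and assumed to be allocated by it.)

int *workerids;
int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx1, &workerids);

/* new signature: the target context id is now passed explicitly */
int master = starpu_sched_ctx_book_workers_for_task(sched_ctx1, workerids, nworkers);

/* ... run the hand-written parallel section on the booked CPUs ... */

starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master);
free(workerids); /* assumed: the list returned above is malloc'd for the caller */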

+ 16 - 5
examples/sched_ctx/sched_ctx_without_sched_policy.c

@@ -90,11 +90,22 @@ int main(int argc, char **argv)
 	procs2 = (int*)malloc(ncpus*sizeof(int));
 	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
 
-	nprocs1 = ncpus/2;
-	nprocs2 =  nprocs1;
-	k = 0;
-	for(j = nprocs1; j < nprocs1+nprocs2; j++)
-		procs2[k++] = j;
+	if(ncpus > 1)
+	{
+		nprocs1 = ncpus/2;
+		nprocs2 =  nprocs1;
+		k = 0;
+		for(j = nprocs1; j < nprocs1+nprocs2; j++)
+			procs2[k++] = j;
+	}
+	else
+	{
+		procs1 = (int*)malloc(nprocs1*sizeof(int));
+		procs2 = (int*)malloc(nprocs2*sizeof(int));
+		procs1[0] = 0;
+		procs2[0] = 0;
+
+	}
 #else
 	procs1 = (int*)malloc(nprocs1*sizeof(int));
 	procs2 = (int*)malloc(nprocs2*sizeof(int));

+ 1 - 1
include/starpu_sched_ctx.h

@@ -121,7 +121,7 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid);
 
-int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers);
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers);
 
 void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master);
 

+ 115 - 109
src/core/sched_ctx.c

@@ -32,8 +32,8 @@ double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
-static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master);
-static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers);
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -137,6 +137,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
 	int workers_to_add[nworkers_to_add];
 
+
 	int i = 0;
 	for(i = 0; i < nworkers_to_add; i++)
 	{
@@ -165,16 +166,24 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			int worker = (workerids == NULL ? i : workerids[i]);
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
+			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
+			if(!str_worker->tmp_sched_ctx_list)
+			{
+				str_worker->tmp_sched_ctx_list = (struct _starpu_sched_ctx_list*)malloc(sizeof(struct _starpu_sched_ctx_list));
+				_starpu_sched_ctx_list_init(str_worker->tmp_sched_ctx_list);
+			}
+			_starpu_sched_ctx_list_add(&str_worker->tmp_sched_ctx_list, sched_ctx->id);
+
 		}
 	}
 
 	if(!sched_ctx->sched_policy)
 	{
-		if(sched_ctx->master == -1)
-			sched_ctx->master = starpu_sched_ctx_book_workers_for_task(workerids, nworkers);
+		if(sched_ctx->main_master == -1)
+			sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
 		else
 		{
-			_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, sched_ctx->master);
+			_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
 		}
 	}
 	else if(sched_ctx->sched_policy->add_workers)
@@ -211,7 +220,7 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 	}
 
 	if(!sched_ctx->sched_policy)
-		_starpu_sched_ctx_wake_these_workers_up(removed_workers, *n_removed_workers);
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
 
 	return;
 }
@@ -298,7 +307,22 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
 
 	sched_ctx->ready_flops = 0.0;
-	sched_ctx->master = -1;
+	sched_ctx->main_master = -1;
+	
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
+		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
+
+		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
+		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
+		
+		sched_ctx->master[w] = -1;
+		sched_ctx->parallel_sect[w] = 0;
+		sched_ctx->sleeping[w] = 0;
+	}
+
 	
         /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)
@@ -582,7 +606,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 		sched_ctx->sched_policy = NULL;
 	}
 	else
-		starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->master);
+		starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -1507,33 +1531,6 @@ unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 	return 0;
 }
 
-static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
-
-#ifdef STARPU_HAVE_HWLOC	
-	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
-        if (support->cpubind->set_thisthread_cpubind)
-        {
-		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
-                int ret;
-                ret = hwloc_set_cpubind (config->topology.hwtopology, set,
-                                         HWLOC_CPUBIND_THREAD);
-		if (ret)
-                {
-                        perror("binding thread");
-			STARPU_ABORT();
-                }
-	}
-
-#else
-#warning no sched ctx CPU binding support
-#endif
-
-	return;
-}
-
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
@@ -1594,68 +1591,75 @@ void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBU
 
 }
 
-static void _starpu_sched_ctx_get_workers_to_sleep(int *workerids, int nworkers, int master)
+static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
 {
+	int s;
+	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+	{
+		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
+		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
+		{
+			if(sched_ctx->parallel_sect[workerid])
+				return 1;
+		}
+	}
+	return 0;
+
+}
+static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
-	
+	unsigned sleeping[nworkers];
 	int w;
-	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-
-		worker->master = master;
-		worker->parallel_sect = 1;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
+		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
+		sched_ctx->master[workerids[w]] = master;
+		sched_ctx->parallel_sect[workerids[w]] = 1;
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
 	}
 
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 	int workerid;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
-		if(current_worker_id == -1 || workerid != current_worker_id)
+		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
 		{
-			sem_wait(&master_worker->fall_asleep_sem);
+			sched_ctx->sleeping[workerids[w]] = 1;
+			sem_wait(&sched_ctx->fall_asleep_sem[master]);
 		}
 	}
 	return;
 }
 
-void _starpu_sched_ctx_signal_worker_blocked(int workerid)
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
-	struct _starpu_sched_ctx *sched_ctx = NULL;
-	struct _starpu_sched_ctx_list *l = NULL;
-	sem_post(&master_worker->fall_asleep_sem);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->fall_asleep_sem[master]);
 
 	return;
 }
 
-void _starpu_sched_ctx_signal_worker_woke_up(int workerid)
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
-	struct _starpu_sched_ctx *sched_ctx = NULL;
-	struct _starpu_sched_ctx_list *l = NULL;
-	
-	sem_post(&master_worker->wake_up_sem);
-
-	worker->master = -1;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->wake_up_sem[master]);
+	sched_ctx->sleeping[workerid] = 0;
+	sched_ctx->master[workerid] = -1;
 	return;
 }
 
 static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 {
-	int current_worker_id = starpu_worker_get_id();
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int current_worker_id = starpu_worker_get_id();
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-	struct _starpu_worker *worker = NULL;
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 
 	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
@@ -1663,18 +1667,19 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 
 	while(workers->has_next(workers, &it))
 	{
-		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
-		if(worker->master == master)
+		int workerid = workers->get_next(workers, &it);
+		int curr_master = sched_ctx->master[workerid];
+		if(curr_master == master && sched_ctx->parallel_sect[workerid])
 		{
-			if(current_worker_id == -1 || worker->workerid != current_worker_id)
+			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-				STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
-				sem_wait(&master_worker->wake_up_sem);
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				sem_wait(&sched_ctx->wake_up_sem[master]);
 			}
 			else
-				worker->parallel_sect = 0;
+				sched_ctx->parallel_sect[workerid] = 0;
 		}
 	}
 
@@ -1685,11 +1690,8 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 {
 	int *workerids;
 	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
-	_starpu_sched_ctx_get_workers_to_sleep(workerids, nworkers, workerids[nworkers-1]);
+	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
 
-	/* bind current thread on all workers of the context */
-//	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
-	
 	/* execute parallel code */
 	void* ret = func(param);
 
@@ -1717,8 +1719,8 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	while(workers->has_next(workers, &it))
 	{
 		workerid = workers->get_next(workers, &it);
-		worker = _starpu_get_worker_struct(workerid);
-		if(worker->master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
+		int master = sched_ctx->master[workerid];
+		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
 		{
 			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
 		}
@@ -1727,8 +1729,9 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	return;
 }
 
-static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers)
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
 
 	int masters[nworkers];
@@ -1736,37 +1739,37 @@ static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers
 	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		masters[w] = worker->master;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
+		int workerid = workerids[w];
+		masters[w] = sched_ctx->master[workerid];
+		if(current_worker_id == -1 || workerid != current_worker_id)
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-			STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 		}
 		else
-			worker->parallel_sect = 0;
-		worker->master = -1;
+			sched_ctx->parallel_sect[workerid] = 0;
+		sched_ctx->master[workerid] = -1;
 	}
 
 	int workerid;
-	struct _starpu_worker *master_worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
 		if(masters[w] != -1)
 		{
-			master_worker = _starpu_get_worker_struct(masters[w]);
+			int master = sched_ctx->master[workerid];
 			if(current_worker_id == -1 || workerid != current_worker_id)
-				sem_wait(&master_worker->wake_up_sem);
+				sem_wait(&sched_ctx->wake_up_sem[master]);
 		}
 	}
 
 	return;
 }
 
-static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
+static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int new_master = workerids[nworkers-1];
         int current_worker_id = starpu_worker_get_id();
         int current_is_in_section = 0;
@@ -1783,23 +1786,23 @@ static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
                 if (current_worker_id == workerids[w])
                         current_is_in_section = 1;
 
-                struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
-                if (worker->master > -1)
+		int master = sched_ctx->master[workerids[w]];
+                if (master > -1)
 		{
                         int already_seen = 0;
                         //Could create a function for this. Basically searching an element in an array.                                                                                                             
-                        for (i=0 ; i<npotential_masters ; i++)
+                        for (i = 0 ; i < npotential_masters; i++)
                         {
-                                if (potential_masters[i] == worker->master)
+                                if (potential_masters[i] == master)
 				{
                                         already_seen = 1;
                                         break;
 				}
                         }
                         if (!already_seen)
-				potential_masters[npotential_masters++] = worker->master;
+				potential_masters[npotential_masters++] = master;
                 }
-                else if (worker->master == -1)
+                else if (master == -1)
                         awake_workers[nawake_workers++] = workerids[w];
         }
 
@@ -1833,8 +1836,9 @@ static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
 	return new_master;
 }
 
-static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master)
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int w;
 	int nput_to_sleep = 0;
 	int nwake_up = 0;
@@ -1843,22 +1847,24 @@ static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers
 	
 	for(w = 0 ; w < nworkers ; w++)
 	{
-		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
-		if (worker->master == -1 && workerids[w] != new_master)
+		int master = sched_ctx->master[workerids[w]];
+		if (master == -1 && workerids[w] != new_master)
 			put_to_sleep[nput_to_sleep++] = workerids[w];
-		else if(worker->master != -1 && workerids[w] == new_master)
+		else if(master != -1 && workerids[w] == new_master)
 			wake_up[nwake_up++] = workerids[w];
 	}
 
-	_starpu_sched_ctx_wake_these_workers_up(wake_up, nwake_up);
-	_starpu_sched_ctx_get_workers_to_sleep(put_to_sleep, nput_to_sleep, new_master);
+	if(nwake_up > 0)
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
+	if(nput_to_sleep > 0)
+		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
 
 }
 
-int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers)
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
 { 
-	int new_master = _starpu_sched_ctx_find_master(workerids, nworkers);	
-	_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, new_master);
+	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);	
+	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
 	return new_master;
 }
 

+ 26 - 3
src/core/sched_ctx.h

@@ -123,7 +123,30 @@ struct _starpu_sched_ctx
 	/* if we execute non-StarPU code inside the context 
 	   we have a single master worker that stays awake, 
 	   if not master is -1 */
-	int master;
+	int main_master;
+
+	/* conditions variables used when parallel sections are executed in contexts */
+	starpu_pthread_cond_t parallel_sect_cond[STARPU_NMAXWORKERS];
+	starpu_pthread_mutex_t parallel_sect_mutex[STARPU_NMAXWORKERS];
+
+	/* boolean indicating that workers should block in order to allow
+	   parallel sections to be executed on their allocated resources */
+	unsigned parallel_sect[STARPU_NMAXWORKERS];
+
+	/* id of the master worker */
+	int master[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all blocked and ready to exec the parallel code */
+	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all woke up and ready continue appl */
+	sem_t wake_up_sem[STARPU_NMAXWORKERS];
+       
+	/* bool indicating if the workers is sleeping in this ctx */
+	unsigned sleeping[STARPU_NMAXWORKERS];
+
 };
 
 struct _starpu_machine_config;
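(This struct change is the heart of the commit: the parallel-section state, previously one set of fields per worker in struct _starpu_worker, becomes arrays indexed by worker id inside each context, so a worker shared by several contexts can be blocked by one of them while the others keep their own view of it. An illustrative internal helper, not part of the commit, using only the fields added in this hunk:)

/* does THIS context currently ask the given worker to block for a
 * parallel section?  Before the commit the flag lived in _starpu_worker
 * and was shared by every context the worker belonged to. */
static unsigned _ctx_blocked_worker(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->parallel_sect[workerid];
}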
@@ -182,10 +205,10 @@ starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 
 /* let the appl know that the worker blocked to execute parallel code */
-void _starpu_sched_ctx_signal_worker_blocked(int workerid);
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid);
 
 /* let the appl know that the worker woke up */
-void _starpu_sched_ctx_signal_worker_woke_up(int workerid);
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid);
 
 /* If starpu_sched_ctx_set_context() has been called, returns the context
  * id set by its last call, or the id of the initial context */

+ 1 - 1
src/core/sched_policy.c

@@ -446,7 +446,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 		if(!sched_ctx->sched_policy)
 		{
-			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->master);
+			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
 		}
 		else
 		{

+ 4 - 8
src/core/workers.c

@@ -270,7 +270,8 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	if(config.workers[workerid].parallel_sect) return 0;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	if(sched_ctx->parallel_sect[workerid]) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
@@ -412,8 +413,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
 	workerarg->set = NULL;
-	sem_init(&workerarg->fall_asleep_sem, 0, 0);
-	sem_init(&workerarg->wake_up_sem, 0, 0);
 
 	/* if some codelet's termination cannot be handled directly :
 	 * for instance in the Gordon driver, Gordon tasks' callbacks
@@ -430,6 +429,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->run_by_starpu = 1;
 
 	workerarg->sched_ctx_list = NULL;
+	workerarg->tmp_sched_ctx_list = NULL;
 	workerarg->nsched_ctxs = 0;
 	_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
@@ -441,10 +441,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 
 	workerarg->spinning_backoff = 1;
 
-	STARPU_PTHREAD_COND_INIT(&workerarg->parallel_sect_cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&workerarg->parallel_sect_mutex, NULL);
-
-	workerarg->parallel_sect = 0;
 
 	for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
 	{
@@ -455,7 +451,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
-	workerarg->master = -1;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1183,6 +1178,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 out:
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
+		_starpu_sched_ctx_list_delete(&worker->tmp_sched_ctx_list);
 		_starpu_job_list_delete(worker->terminated_jobs);
 	}
 }

+ 1 - 18
src/core/workers.h

@@ -84,6 +84,7 @@ LIST_TYPE(_starpu_worker,
 	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */
 
 	struct _starpu_sched_ctx_list *sched_ctx_list;
+	struct _starpu_sched_ctx_list *tmp_sched_ctx_list;
 	unsigned nsched_ctxs; /* the no of contexts a worker belongs to*/
 	struct _starpu_barrier_counter tasks_barrier; /* wait for the tasks submitted */
 
@@ -93,13 +94,6 @@ LIST_TYPE(_starpu_worker,
 
 	unsigned spinning_backoff ; /* number of cycles to pause when spinning  */
 
-	/* conditions variables used when parallel sections are executed in contexts */
-	starpu_pthread_cond_t parallel_sect_cond;
-	starpu_pthread_mutex_t parallel_sect_mutex;
-
-	/* boolean indicating that workers should block in order to allow
-	   parallel sections to be executed on their allocated resources */
-	unsigned parallel_sect;
 
 	/* indicate whether the workers shares tasks lists with other workers*/
 	/* in this case when removing him from a context it disapears instantly */
@@ -118,17 +112,6 @@ LIST_TYPE(_starpu_worker,
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 
-	/* id of the master worker */
-	int master;
-
-	/* semaphore that block appl thread until starpu threads are 
-	   all blocked and ready to exec the parallel code */
-	sem_t fall_asleep_sem;
-
-	/* semaphore that block appl thread until starpu threads are 
-	   all woke up and ready continue appl */
-	sem_t wake_up_sem;
-
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 44 - 10
src/drivers/driver_common/driver_common.c

@@ -196,18 +196,52 @@ static void _starpu_exponential_backoff(struct _starpu_worker *args)
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode)
 {
 	struct starpu_task *task;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
-	if(args->parallel_sect)
+	unsigned needed = 1;
+	while(needed)
 	{
-		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
-		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
-		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);
-		_starpu_sched_ctx_signal_worker_woke_up(workerid);
-		args->parallel_sect = 0;
-	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
+		struct _starpu_sched_ctx_list *l = NULL;
+		for (l = args->sched_ctx_list; l; l = l->next)
+		{
+			struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(sched_ctx->parallel_sect[workerid])
+				{
+					needed = 0;
+					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					sched_ctx->parallel_sect[workerid] = 0;
+				}
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			}
+			if(!needed)
+				break;
+		}
 
+		for (l = args->tmp_sched_ctx_list; l; l = l->next)
+		{
+			struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(sched_ctx->parallel_sect[workerid])
+				{
+					needed = 0;
+					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					sched_ctx->parallel_sect[workerid] = 0;
+				}
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			}
+			if(!needed)
+				break;
+		}
+
+		needed = !needed;
+	}
 	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	task = _starpu_pop_task(args);