Bladeren bron

* sched_ctxs without sched_policy
* terry's algo to chose the master of a group of workers (ctx without sched_policy now)
* bug fix to block workers belonging to a ctx without sched_policy
* example for sched_ctxs without sched_policy

Andra Hugo 11 jaren geleden
bovenliggende
commit
3ca3790ef1

+ 5 - 0
examples/Makefile.am

@@ -189,6 +189,7 @@ examplebin_PROGRAMS +=				\
 	sched_ctx/parallel_code			\
 	sched_ctx/dummy_sched_with_ctx		\
 	sched_ctx/prio				\
+	sched_ctx/sched_ctx_without_sched_policy\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -268,6 +269,7 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/sched_ctx			\
 	sched_ctx/prio				\
 	sched_ctx/dummy_sched_with_ctx		\
+	sched_ctx/sched_ctx_without_sched_policy\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -920,6 +922,9 @@ openmp_vector_scal_omp_CFLAGS = \
 sched_ctx_parallel_code_CFLAGS = \
 	$(AM_CFLAGS) -fopenmp
 
+sched_ctx_sched_ctx_without_sched_policy_CFLAGS = \
+	$(AM_CFLAGS) -fopenmp
+
 endif
 
 showcheck:

+ 13 - 15
examples/sched_ctx/parallel_code.c

@@ -44,14 +44,17 @@ int parallel_code(int sched_ctx)
 		for(i = 0; i < NTASKS; i++)
 			t++;
 	}
+
 	free(cpuids);
 	return t;
 }
 
 static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 {
+	int w = starpu_worker_get_id();
 	unsigned sched_ctx = (unsigned)arg;
-	tasks_executed[sched_ctx-1] = parallel_code(sched_ctx);
+	int n = parallel_code(sched_ctx);
+	printf("w %d executed %d it \n", w, n);
 }
 
 
@@ -68,11 +71,13 @@ static struct starpu_codelet sched_ctx_codelet =
 void *th(void* p)
 {
 	unsigned sched_ctx = (unsigned)p;
-	tasks_executed[sched_ctx-1] = (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)sched_ctx, sched_ctx); 
+	tasks_executed[sched_ctx-1] += (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)sched_ctx, sched_ctx); 
 }
 
 int main(int argc, char **argv)
 {
+	tasks_executed[0] = 0;
+	tasks_executed[1] = 0;
 	int ntasks = NTASKS;
 	int ret, j, k;
 
@@ -104,13 +109,6 @@ int main(int argc, char **argv)
 	procs2[0] = 0;
 #endif
 
-	int p;
-	for(p = 0; p <nprocs1; p++)
-		printf("w %d in ctx 1 \n", procs1[p]);
-
-	for(p = 0; p <nprocs2; p++)
-		printf("w %d in ctx 2 \n", procs2[p]);
-
 	/*create contexts however you want*/
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
@@ -141,14 +139,14 @@ int main(int argc, char **argv)
 	for(j = nprocs5; j < nprocs5+nprocs6; j++)
 		procs6[k++] = procs2[j];
 
-	int master3 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs3, nprocs3);
-	int master4 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs4, nprocs4);
+	int master3 = starpu_sched_ctx_book_workers_for_task(procs3, nprocs3);
+	int master4 = starpu_sched_ctx_book_workers_for_task(procs4, nprocs4);
 
-	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
-	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
+	int master5 = starpu_sched_ctx_book_workers_for_task(procs5, nprocs5);
+	int master6 = starpu_sched_ctx_book_workers_for_task(procs6, nprocs6);
 
-/* 	int master1 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs1, nprocs1); */
-/* 	int master2 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs2, nprocs2); */
+/* 	int master1 = starpu_sched_ctx_book_workers_for_task(procs1, nprocs1); */
+/* 	int master2 = starpu_sched_ctx_book_workers_for_task(procs2, nprocs2); */
 
 
 	int i;

+ 1 - 1
include/starpu_sched_ctx.h

@@ -121,7 +121,7 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid);
 
-int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers);
+int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers);
 
 void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master);
 

+ 163 - 89
src/core/sched_ctx.c

@@ -32,6 +32,8 @@ double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
+static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master);
+static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -166,7 +168,16 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 		}
 	}
 
-	if(sched_ctx->sched_policy->add_workers)
+	if(!sched_ctx->sched_policy)
+	{
+		if(sched_ctx->master == -1)
+			sched_ctx->master = starpu_sched_ctx_book_workers_for_task(workerids, nworkers);
+		else
+		{
+			_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, sched_ctx->master);
+		}
+	}
+	else if(sched_ctx->sched_policy->add_workers)
 	{
 		if(added_workers)
 		{
@@ -199,6 +210,9 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 		}
 	}
 
+	if(!sched_ctx->sched_policy)
+		_starpu_sched_ctx_wake_these_workers_up(removed_workers, *n_removed_workers);
+
 	return;
 }
 
@@ -269,7 +283,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 
 	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
 
-	sched_ctx->sched_policy = (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy));
+	sched_ctx->sched_policy = policy ? (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy)) : NULL;
 	sched_ctx->is_initial_sched = is_initial_sched;
 	sched_ctx->name = sched_ctx_name;
 	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
@@ -284,12 +298,18 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
 
 	sched_ctx->ready_flops = 0.0;
-	/*init the strategy structs and the worker_collection of the ressources of the context */
-	_starpu_init_sched_policy(config, sched_ctx, policy);
-
-	/* construct the collection of workers(list/tree/etc.) */
+	sched_ctx->master = -1;
+	
+        /*init the strategy structs and the worker_collection of the ressources of the context */
+	if(policy)
+		_starpu_init_sched_policy(config, sched_ctx, policy);
+	else
+		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
+	
+        /* construct the collection of workers(list/tree/etc.) */
 	sched_ctx->workers->init(sched_ctx->workers);
 
+
 	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
@@ -315,7 +335,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 			worker->nsched_ctxs++;
 		}
 	}
-
+	
 	return sched_ctx;
 }
 
@@ -555,9 +575,14 @@ void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counte
 static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 {
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
-	_starpu_deinit_sched_policy(sched_ctx);
-	free(sched_ctx->sched_policy);
-	sched_ctx->sched_policy = NULL;
+	if(sched_ctx->sched_policy)
+	{
+		_starpu_deinit_sched_policy(sched_ctx);
+		free(sched_ctx->sched_policy);
+		sched_ctx->sched_policy = NULL;
+	}
+	else
+		starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->master);
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -616,7 +641,6 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 		_starpu_delete_sched_ctx(sched_ctx);
 
 	}
-
 	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list, don't forget to free it when
 	   you don't use it anymore */
 	free(workerids);
@@ -1494,7 +1518,6 @@ static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
         {
 		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
                 int ret;
-		int current_worker_id = starpu_worker_get_id();
                 ret = hwloc_set_cpubind (config->topology.hwtopology, set,
                                          HWLOC_CPUBIND_THREAD);
 		if (ret)
@@ -1571,22 +1594,22 @@ void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBU
 
 }
 
-static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
+static void _starpu_sched_ctx_get_workers_to_sleep(int *workerids, int nworkers, int master)
 {
 	int current_worker_id = starpu_worker_get_id();
 	
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int w;
 	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
 		worker = _starpu_get_worker_struct(workerids[w]);
-		worker->master = master;
 		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
+
+		worker->master = master;
 		worker->parallel_sect = 1;
 		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
 	}
 
 	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
@@ -1595,7 +1618,9 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *w
 	{
 		workerid = workerids[w];
 		if(current_worker_id == -1 || workerid != current_worker_id)
-			sem_wait(&master_worker->parallel_code_sem);
+		{
+			sem_wait(&master_worker->fall_asleep_sem);
+		}
 	}
 	return;
 }
@@ -1606,12 +1631,21 @@ void _starpu_sched_ctx_signal_worker_blocked(int workerid)
 	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	struct _starpu_sched_ctx_list *l = NULL;
-	for (l = worker->sched_ctx_list; l; l = l->next)
-	{
-		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
-		if(sched_ctx->id != 0)
-			sem_post(&master_worker->parallel_code_sem);
-	}	
+	sem_post(&master_worker->fall_asleep_sem);
+
+	return;
+}
+
+void _starpu_sched_ctx_signal_worker_woke_up(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	struct _starpu_sched_ctx_list *l = NULL;
+	
+	sem_post(&master_worker->wake_up_sem);
+
+	worker->master = -1;
 	return;
 }
 
@@ -1621,6 +1655,7 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct _starpu_worker *worker = NULL;
+	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 
 	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
@@ -1636,12 +1671,13 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 				STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
 				STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
 				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+				sem_wait(&master_worker->wake_up_sem);
 			}
 			else
 				worker->parallel_sect = 0;
-			worker->master = -1;
 		}
 	}
+
 	return;
 }
 
@@ -1649,10 +1685,10 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 {
 	int *workerids;
 	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
+	_starpu_sched_ctx_get_workers_to_sleep(workerids, nworkers, workerids[nworkers-1]);
 
 	/* bind current thread on all workers of the context */
-	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
+//	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
 	
 	/* execute parallel code */
 	void* ret = func(param);
@@ -1683,70 +1719,25 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 		workerid = workers->get_next(workers, &it);
 		worker = _starpu_get_worker_struct(workerid);
 		if(worker->master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
+		{
 			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
+		}
 	}
 	*ncpuids = w;
 	return;
 }
 
-/* int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers) */
-/* { */
-/* 	int current_worker_id = starpu_worker_get_id(); */
-
-/* 	int final_workerids[nworkers]; */
-/* 	int nfinal_workerids = 0; */
-/* 	int w; */
-/* 	int master = -1; */
-/* 	for(w = 0; w < nworkers; w++) */
-/* 	{ */
-/* 		if(current_worker_id == -1) */
-/* 		{ */
-/* 			final_workerids[nfinal_workerids++] = workerids[w];                           */
-/* 			if(nfinal_workerids == nworkers - 1)                          */
-/* 			{ */
-/* 				master = workerids[nfinal_workerids];   */
-/* 				break;   */
-/* 			} */
-/* 		} */
-/* 		else */
-/* 		{ */
-/* 			if(workerids[w] != current_worker_id) */
-/* 				final_workerids[nfinal_workerids++] = workerids[w]; */
-/* 			else */
-/* 			{ */
-/* 				if(nfinal_workerids == nworkers - 1) */
-/* 				{ */
-/* 					master = workerids[nfinal_workerids]; */
-/* 					break; */
-/* 				} */
-/* 				else */
-/* 					master = current_worker_id; */
-/* 			}	 */
-/* 		} */
-/* 	} */
-/* 	if(master == -1 && nfinal_workerids > 0) */
-/* 	{ */
-/* 		nfinal_workerids--; */
-/* 		master = final_workerids[nfinal_workerids]; */
-/* 	} */
-/* 	/\* get starpu workers to sleep *\/ */
-/* 	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, final_workerids, nfinal_workerids, master); */
-
-/* 	/\* bind current thread on all workers of the context *\/ */
-/* //	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id); */
-/* 	return master; */
-/* } */
-
-static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
+static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers)
 {
 	int current_worker_id = starpu_worker_get_id();
-	
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+
+	int masters[nworkers];
 	int w;
 	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
 		worker = _starpu_get_worker_struct(workerids[w]);
+		masters[w] = worker->master;
 		if(current_worker_id == -1 || worker->workerid != current_worker_id)
 		{
 			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
@@ -1755,14 +1746,95 @@ static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *
 		}
 		else
 			worker->parallel_sect = 0;
+		worker->master = -1;
+	}
+
+	int workerid;
+	struct _starpu_worker *master_worker = NULL;
+	for(w = 0; w < nworkers; w++)
+	{
+		workerid = workerids[w];
+		if(masters[w] != -1)
+		{
+			master_worker = _starpu_get_worker_struct(masters[w]);
+			if(current_worker_id == -1 || workerid != current_worker_id)
+				sem_wait(&master_worker->wake_up_sem);
+		}
 	}
+
 	return;
 }
 
-
-int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
-{ 
+static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
+{
 	int new_master = workerids[nworkers-1];
+        int current_worker_id = starpu_worker_get_id();
+        int current_is_in_section = 0;
+        int npotential_masters = 0;
+        int nawake_workers = 0;
+        int ntrue_masters = 0;
+        int potential_masters[nworkers];
+        int awake_workers[nworkers];
+        int true_masters[nworkers];
+
+        int i,w;
+        for(w = 0 ; w < nworkers ; w++)
+        {
+                if (current_worker_id == workerids[w])
+                        current_is_in_section = 1;
+
+                struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
+                if (worker->master > -1)
+		{
+                        int already_seen = 0;
+                        //Could create a function for this. Basically searching an element in an array.                                                                                                             
+                        for (i=0 ; i<npotential_masters ; i++)
+                        {
+                                if (potential_masters[i] == worker->master)
+				{
+                                        already_seen = 1;
+                                        break;
+				}
+                        }
+                        if (!already_seen)
+				potential_masters[npotential_masters++] = worker->master;
+                }
+                else if (worker->master == -1)
+                        awake_workers[nawake_workers++] = workerids[w];
+        }
+
+        for (i = 0 ; i < npotential_masters ; i++) {
+		int master_is_in_section = 0;
+		//Could create a function for this. Basically searching an element in an array.                                                                                                                     
+		for (w = 0 ; w < nworkers ; w++)
+		{
+			if (workerids[w] == potential_masters[i])
+			{
+				master_is_in_section = 1;
+				break;
+			}
+		}
+                if (master_is_in_section)
+			true_masters[ntrue_masters++] = potential_masters[i];
+        }
+
+        if (current_is_in_section)
+                new_master = current_worker_id;
+        else
+        {
+                if (ntrue_masters > 1)
+		{
+                        if (nawake_workers > 0)
+                                new_master = awake_workers[nawake_workers - 1];
+                        else
+                                new_master = true_masters[ntrue_masters - 1];
+		}
+	}
+	return new_master;
+}
+
+static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master)
+{
 	int w;
 	int nput_to_sleep = 0;
 	int nwake_up = 0;
@@ -1776,15 +1848,17 @@ int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids
 			put_to_sleep[nput_to_sleep++] = workerids[w];
 		else if(worker->master != -1 && workerids[w] == new_master)
 			wake_up[nwake_up++] = workerids[w];
-		
-		if (workerids[w] != new_master)
-			worker->master = new_master;
-		else
-			worker->master = -1;
 	}
-	_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
-	
+
+	_starpu_sched_ctx_wake_these_workers_up(wake_up, nwake_up);
+	_starpu_sched_ctx_get_workers_to_sleep(put_to_sleep, nput_to_sleep, new_master);
+
+}
+
+int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers)
+{ 
+	int new_master = _starpu_sched_ctx_find_master(workerids, nworkers);	
+	_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, new_master);
 	return new_master;
 }
 

+ 8 - 0
src/core/sched_ctx.h

@@ -119,6 +119,11 @@ struct _starpu_sched_ctx
 	
 	/* value placing the contexts in their hierarchy */
 	unsigned hierarchy_level;
+
+	/* if we execute non-StarPU code inside the context 
+	   we have a single master worker that stays awake, 
+	   if not master is -1 */
+	int master;
 };
 
 struct _starpu_machine_config;
@@ -179,6 +184,9 @@ unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 /* let the appl know that the worker blocked to execute parallel code */
 void _starpu_sched_ctx_signal_worker_blocked(int workerid);
 
+/* let the appl know that the worker woke up */
+void _starpu_sched_ctx_signal_worker_woke_up(int workerid);
+
 /* If starpu_sched_ctx_set_context() has been called, returns the context
  * id set by its last call, or the id of the initial context */
 unsigned _starpu_sched_ctx_get_current_context();

+ 24 - 14
src/core/sched_policy.c

@@ -444,13 +444,20 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
 		}
 
-		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
-		/* check out if there are any workers in the context */
-		starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
-		nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
-		ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
+		if(!sched_ctx->sched_policy)
+		{
+			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->master);
+		}
+		else
+		{
+			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
+			/* check out if there are any workers in the context */
+			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
+			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
+			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
+			ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
+			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
+		}
 
 		if(ret == -1)
 		{
@@ -853,18 +860,21 @@ profiling:
 
 struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
 {
-	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
-
-	/* TODO set profiling info */
-	if(sched_ctx->sched_policy->pop_every_task)
-		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
+	if(sched_ctx->sched_policy)
+	{
+		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
+		
+		/* TODO set profiling info */
+		if(sched_ctx->sched_policy->pop_every_task)
+			return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
+	}
 	return NULL;
 }
 
 void _starpu_sched_pre_exec_hook(struct starpu_task *task)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-	if (sched_ctx->sched_policy->pre_exec_hook)
+	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
 		sched_ctx->sched_policy->pre_exec_hook(task);
 }
 
@@ -872,7 +882,7 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 
-	if (sched_ctx->sched_policy->post_exec_hook)
+	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
 		sched_ctx->sched_policy->post_exec_hook(task);
 }
 

+ 2 - 1
src/core/workers.c

@@ -412,7 +412,8 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
 	workerarg->set = NULL;
-	sem_init(&workerarg->parallel_code_sem, 0, 0);
+	sem_init(&workerarg->fall_asleep_sem, 0, 0);
+	sem_init(&workerarg->wake_up_sem, 0, 0);
 
 	/* if some codelet's termination cannot be handled directly :
 	 * for instance in the Gordon driver, Gordon tasks' callbacks

+ 7 - 3
src/core/workers.h

@@ -121,9 +121,13 @@ LIST_TYPE(_starpu_worker,
 	/* id of the master worker */
 	int master;
 
-	/* semaphore that block appl thread until threads are ready 
-	   to exec the parallel code */
-	sem_t parallel_code_sem;
+	/* semaphore that block appl thread until starpu threads are 
+	   all blocked and ready to exec the parallel code */
+	sem_t fall_asleep_sem;
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all woke up and ready continue appl */
+	sem_t wake_up_sem;
 
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;

+ 4 - 4
src/drivers/driver_common/driver_common.c

@@ -197,18 +197,18 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 {
 	struct starpu_task *task;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
 	if(args->parallel_sect)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
 		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
 		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
 		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
+		_starpu_sched_ctx_signal_worker_woke_up(workerid);
 		args->parallel_sect = 0;
 	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
 
+	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	task = _starpu_pop_task(args);
 
 	if (task == NULL)