Browse Source

Execute parallel code (e.g. OpenMP) inside scheduling contexts -- temporary solution

Andra Hugo 11 years ago
parent
commit
6fd68c0e07

+ 118 - 25
examples/sched_ctx/parallel_code.c

@@ -16,23 +16,49 @@
  */
  */
 
 
 #include <starpu.h>
 #include <starpu.h>
+#include <omp.h>
 
 
 #ifdef STARPU_QUICK_CHECK
 #ifdef STARPU_QUICK_CHECK
 #define NTASKS 64
 #define NTASKS 64
 #else
 #else
-#define NTASKS 1000
+#define NTASKS 10
 #endif
 #endif
 
 
-int tasks_executed = 0;
+int tasks_executed[2];
 starpu_pthread_mutex_t mut;
 starpu_pthread_mutex_t mut;
 
 
-static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STARPU_ATTRIBUTE_UNUSED)
+/* Run an NTASKS-iteration OpenMP loop on the CPUs that scheduling context
+ * sched_ctx makes available to the caller, and return the number of
+ * iterations executed (0 when no CPU is available).
+ *
+ * The cpuid array is allocated by starpu_sched_ctx_get_available_cpuids()
+ * and released here. */
+int parallel_code(int sched_ctx)
 {
 {
-	starpu_pthread_mutex_lock(&mut);
-	tasks_executed++;
-	starpu_pthread_mutex_unlock(&mut);
+	int i;
+	int t = 0;
+	int *cpuids = NULL;
+	int ncpuids = 0;
+	starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+
+	/* num_threads() needs a strictly positive value and cpuids[] is
+	 * indexed by the OpenMP thread number below */
+	if (cpuids == NULL || ncpuids <= 0)
+	{
+		free(cpuids);
+		return 0;
+	}
+
+	omp_set_nested(1);
+#pragma omp parallel num_threads(1)
+	{
+#pragma omp parallel num_threads(ncpuids)
+		{
+			/* pin each thread of the inner team on one of the context's CPUs */
+			starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			/* t is shared by the whole team: accumulate it through a
+			 * reduction instead of racing on t++ */
+#pragma omp for reduction(+:t)
+			for(i = 0; i < NTASKS; i++)
+				t++;
+		}
+	}
+	free(cpuids);
+	return t;
+}
+
+static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg) /* CPU implementation of sched_ctx_codelet:
+ * runs parallel_code() for the context id carried in arg and records the result. */
+{
+	unsigned sched_ctx = (unsigned)arg; /* arg carries the context id by value (main stores the id in cl_arg), not a pointer to it.
+	 * NOTE(review): pointer-to-unsigned cast; going through uintptr_t would be the portable form -- confirm. */
+	tasks_executed[sched_ctx-1] = parallel_code(sched_ctx); /* NOTE(review): tasks_executed has 2 slots and is indexed with id-1,
+	 * so this presumably relies on the two contexts getting ids 1 and 2 -- confirm. */
 }
 }
 
 
+
 static struct starpu_codelet sched_ctx_codelet =
 static struct starpu_codelet sched_ctx_codelet =
 {
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
 	.cpu_funcs = {sched_ctx_func, NULL},
@@ -43,15 +69,10 @@ static struct starpu_codelet sched_ctx_codelet =
 	.name = "sched_ctx"
 	.name = "sched_ctx"
 };
 };
 
 
-int parallel_code(int nprocs)
+/* pthread entry point: p carries a scheduling context id by value. Runs
+ * parallel_code() through starpu_sched_ctx_exec_parallel_code() on that
+ * context and stores the result in tasks_executed[id-1].
+ * Always returns NULL; main() joins the thread without reading the result. */
+void *th(void* p)
 {
 {
-	int i;
-	int tasks = 0;
-#pragma omp parallel for num_threads(nprocs)
-	for (i = 0; i < NTASKS; i++) 
-		tasks++;
-
-	return tasks;
+	unsigned sched_ctx = (unsigned)p;
+	tasks_executed[sched_ctx-1] = (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)sched_ctx, sched_ctx);
+
+	/* a thread start routine is declared to return void*: do not fall off the end */
+	return NULL;
 }
 }
 
 
 int main(int argc, char **argv)
 int main(int argc, char **argv)
@@ -67,12 +88,12 @@ int main(int argc, char **argv)
 	starpu_pthread_mutex_init(&mut, NULL);
 	starpu_pthread_mutex_init(&mut, NULL);
 	int nprocs1 = 1;
 	int nprocs1 = 1;
 	int nprocs2 = 1;
 	int nprocs2 = 1;
-	int procs1[20], procs2[20];
-	procs1[0] = 0;
-	procs2[0] = 0;
+	int *procs1, *procs2;
 
 
 #ifdef STARPU_USE_CPU
 #ifdef STARPU_USE_CPU
 	unsigned ncpus =  starpu_cpu_worker_get_count();
 	unsigned ncpus =  starpu_cpu_worker_get_count();
+	procs1 = (int*)malloc(ncpus*sizeof(int));
+	procs2 = (int*)malloc(ncpus*sizeof(int));
 	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
 	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
 
 
 	nprocs1 = ncpus/2;
 	nprocs1 = ncpus/2;
@@ -80,14 +101,60 @@ int main(int argc, char **argv)
 	int j, k = 0;
 	int j, k = 0;
 	for(j = nprocs1; j < nprocs1+nprocs2; j++)
 	for(j = nprocs1; j < nprocs1+nprocs2; j++)
 		procs2[k++] = j;
 		procs2[k++] = j;
+#else
+	procs1 = (int*)malloc(nprocs1*sizeof(int));
+	procs2 = (int*)malloc(nprocs2*sizeof(int));
+	procs1[0] = 0:
+	procs2[0] = 0:
+
 #endif
 #endif
 
 
+	int p;
+	for(p = 0; p <nprocs1; p++)
+		printf("w %d in ctx 1 \n", procs1[p]);
+
+	for(p = 0; p <nprocs2; p++)
+		printf("w %d in ctx 2 \n", procs2[p]);
+
 	/*create contexts however you want*/
 	/*create contexts however you want*/
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 
 
 	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
 	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
-	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
+//	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
+
+	int nprocs3 = nprocs1/2;
+	int nprocs4 = nprocs1/2;
+	int nprocs5 = nprocs2/2;
+	int nprocs6 = nprocs2/2;
+	int procs3[nprocs3];
+	int procs4[nprocs4];
+	int procs5[nprocs5];
+	int procs6[nprocs6];
+
+	k = 0;
+	for(j = 0; j < nprocs3; j++)
+		procs3[k++] = procs1[j];
+	k = 0;
+	for(j = nprocs3; j < nprocs3+nprocs4; j++)
+		procs4[k++] = procs1[j];
+
+	k = 0;
+	for(j = 0; j < nprocs5; j++)
+		procs5[k++] = procs2[j];
+	k = 0;
+	for(j = nprocs5; j < nprocs5+nprocs6; j++)
+		procs6[k++] = procs2[j];
+
+	int master3 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs3, nprocs3);
+	int master4 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs4, nprocs4);
+
+	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
+	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
+
+/* 	int master1 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs1, nprocs1); */
+/* 	int master2 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs2, nprocs2); */
+
 
 
 	int i;
 	int i;
 	for (i = 0; i < ntasks; i++)
 	for (i = 0; i < ntasks; i++)
@@ -95,7 +162,7 @@ int main(int argc, char **argv)
 		struct starpu_task *task = starpu_task_create();
 		struct starpu_task *task = starpu_task_create();
 
 
 		task->cl = &sched_ctx_codelet;
 		task->cl = &sched_ctx_codelet;
-		task->cl_arg = NULL;
+		task->cl_arg = sched_ctx1;
 
 
 		/*submit tasks to context*/
 		/*submit tasks to context*/
 		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
 		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
@@ -103,23 +170,49 @@ int main(int argc, char **argv)
 		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
 		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
 	}
 	}
 
 
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx2;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+
 	/* tell starpu when you finished submitting tasks to this context
 	/* tell starpu when you finished submitting tasks to this context
 	   in order to allow moving resources from this context to the inheritor one
 	   in order to allow moving resources from this context to the inheritor one
 	   when its corresponding tasks finished executing */
 	   when its corresponding tasks finished executing */
 
 
-	starpu_sched_ctx_finished_submit(sched_ctx1);
 
 
-	/* execute an openmp code */
-	int ret_ntasks = (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)nprocs2, sched_ctx2);
-	starpu_sched_ctx_finished_submit(sched_ctx2);
 
 
 	/* wait for all tasks at the end*/
 	/* wait for all tasks at the end*/
 	starpu_task_wait_for_all();
 	starpu_task_wait_for_all();
 
 
+/* 	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master1); */
+/* 	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master2); */
+
+	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master3);
+	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master4);
+
+	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master5);
+	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master6);
+
+	pthread_t mp[2];
+	pthread_create(&mp[0], NULL, th, sched_ctx1);
+	pthread_create(&mp[1], NULL, th, sched_ctx2);
+
+	pthread_join(mp[0], NULL);
+	pthread_join(mp[1], NULL);
+
 	starpu_sched_ctx_delete(sched_ctx1);
 	starpu_sched_ctx_delete(sched_ctx1);
 	starpu_sched_ctx_delete(sched_ctx2);
 	starpu_sched_ctx_delete(sched_ctx2);
-	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_executed, ntasks);
-	printf("ctx%d: tasks openmp executed %d out of %d\n", sched_ctx2, ret_ntasks, NTASKS);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_executed[0], NTASKS);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx2, tasks_executed[1], NTASKS);
 	starpu_shutdown();
 	starpu_shutdown();
 
 
 	return 0;
 	return 0;

+ 9 - 0
include/starpu_sched_ctx.h

@@ -116,6 +116,15 @@ void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ct
 void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
 void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
 
 
 unsigned starpu_sched_ctx_get_priority(int worker, unsigned sched_ctx_id);
 unsigned starpu_sched_ctx_get_priority(int worker, unsigned sched_ctx_id);
+
+void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids);
+
+void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid);
+
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers);
+
+void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 #ifdef STARPU_USE_SC_HYPERVISOR
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif /* STARPU_USE_SC_HYPERVISOR */
 #endif /* STARPU_USE_SC_HYPERVISOR */

+ 119 - 30
src/core/sched_ctx.c

@@ -219,23 +219,22 @@ static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sch
 #ifdef STARPU_HAVE_HWLOC
 #ifdef STARPU_HAVE_HWLOC
 static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
 static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
 {
 {
-	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
 	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
 
 
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-	int worker;
+	struct _starpu_worker *worker;
 	struct starpu_sched_ctx_iterator it;
 	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 		workers->init_iterator(workers, &it);
 
 
 	while(workers->has_next(workers, &it))
 	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers, &it);
-		if(!starpu_worker_is_combined_worker(worker))
+		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
+		if(!starpu_worker_is_combined_worker(worker->workerid))
 		{
 		{
 			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
 			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
 					sched_ctx->hwloc_workers_set,
 					sched_ctx->hwloc_workers_set,
-					config->workers[worker].hwloc_cpu_set);
+					worker->hwloc_cpu_set);
 		}
 		}
 
 
 	}
 	}
@@ -1496,7 +1495,7 @@ static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
         {
         {
 		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
 		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
                 int ret;
                 int ret;
-		
+		int current_worker_id = starpu_worker_get_id();
                 ret = hwloc_set_cpubind (config->topology.hwtopology, set,
                 ret = hwloc_set_cpubind (config->topology.hwtopology, set,
                                          HWLOC_CPUBIND_THREAD);
                                          HWLOC_CPUBIND_THREAD);
 		if (ret)
 		if (ret)
@@ -1509,10 +1508,11 @@ static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
 #else
 #else
 #warning no sched ctx CPU binding support
 #warning no sched ctx CPU binding support
 #endif
 #endif
+
 	return;
 	return;
 }
 }
 
 
-void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid)
+void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
 {
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
 
@@ -1527,7 +1527,7 @@ void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid)
 	if (support->cpubind->set_thisthread_cpubind)
 	if (support->cpubind->set_thisthread_cpubind)
 	{
 	{
 		hwloc_obj_t obj = hwloc_get_obj_by_depth (config->topology.hwtopology,
 		hwloc_obj_t obj = hwloc_get_obj_by_depth (config->topology.hwtopology,
-							  config->cpu_depth, cpuid);
+							  config->pu_depth, cpuid);
 		hwloc_bitmap_t set = obj->cpuset;
 		hwloc_bitmap_t set = obj->cpuset;
 		int ret;
 		int ret;
 		
 		
@@ -1569,27 +1569,30 @@ void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid)
 
 
 }
 }
 
 
-static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id)
+static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master) /* Mark the nworkers
+ * workers listed in workerids as booked for `master` (sets their master field and their parallel_sect flag),
+ * then wait on the context's parallel_code_sem once per listed worker other than the calling one. */
 {
 {
+	int current_worker_id = starpu_worker_get_id();
+	
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_worker_collection *workers = sched_ctx->workers;
-	struct starpu_sched_ctx_iterator it;
+	int w;
 	struct _starpu_worker *worker = NULL;
 	struct _starpu_worker *worker = NULL;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
-	while(workers->has_next(workers, &it))
+	for(w = 0; w < nworkers; w++)
 	{
 	{
-		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
-		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+		worker = _starpu_get_worker_struct(workerids[w]);
+		worker->master = master; /* remember which master this worker is booked for; the wake-up path matches on this field */
+		if(current_worker_id == -1 || worker->workerid != current_worker_id) /* sched_mutex is taken for every worker except the calling worker itself */
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 		worker->parallel_sect = 1;
 		worker->parallel_sect = 1;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+		if(current_worker_id == -1 || worker->workerid != current_worker_id)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	}
 	}
 
 
-	while(workers->has_next(workers, &it))
+	int workerid;
+	for(w = 0; w < nworkers; w++)
 	{
 	{
-		workers->get_next(workers, &it);
-		sem_wait(&sched_ctx->parallel_code_sem);
+		workerid = workerids[w];
+		if(current_worker_id == -1 || workerid != current_worker_id) /* the calling worker never blocks, so it is not waited for */
+			sem_wait(&sched_ctx->parallel_code_sem); /* NOTE(review): presumably posted by _starpu_sched_ctx_signal_worker_blocked() when a worker blocks -- confirm */
 	}
 	}
 	return;
 	return;
 }
 }
@@ -1608,30 +1611,41 @@ void _starpu_sched_ctx_signal_worker_blocked(int workerid)
 	return;
 	return;
 }
 }
 
 
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master) /* Walk every worker of context sched_ctx_id and
+ * release those whose master field equals `master`: signal their parallel_sect_cond (or, for the calling worker,
+ * clear parallel_sect directly) and reset their master field to -1. */
 {
 {
+	int current_worker_id = starpu_worker_get_id();
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-	struct starpu_sched_ctx_iterator it;
 	struct _starpu_worker *worker = NULL;
 	struct _starpu_worker *worker = NULL;
+
+	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 		workers->init_iterator(workers, &it);
 
 
 	while(workers->has_next(workers, &it))
 	while(workers->has_next(workers, &it))
 	{
 	{
 		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
 		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
-		STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+		if(worker->master == master) /* only workers booked for this master are touched */
+		{
+			if(current_worker_id == -1 || worker->workerid != current_worker_id)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
+				STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond); /* the woken worker clears its own parallel_sect flag after the wait (see _starpu_get_worker_task) */
+				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+			}
+			else
+				worker->parallel_sect = 0; /* the calling worker is not waiting on the condition: just drop its flag */
+			worker->master = -1; /* -1 is the value workers are initialised with, i.e. not booked */
+		}
 	}
 	}
 	return;
 	return;
 }
 }
 
 
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 {
 {
-	/* get starpu workers to sleep */
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id);
+	int *workerids;
+	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
+	int master = starpu_sched_ctx_book_workers_for_task(sched_ctx_id, workerids, nworkers);
 
 
 	/* bind current thread on all workers of the context */
 	/* bind current thread on all workers of the context */
 	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
 	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
@@ -1640,7 +1654,82 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 	void* ret = func(param);
 	void* ret = func(param);
 
 
 	/* wake up starpu workers */
 	/* wake up starpu workers */
-	_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
+	starpu_sched_ctx_unbook_workers_for_task(sched_ctx_id, master);
 
 
 	return ret;
 	return ret;
 }
 }
+
+/* Return the binding ids of the workers of context sched_ctx_id that the
+ * calling thread may use: the calling worker itself plus every worker whose
+ * master field equals the caller's worker id.
+ *
+ * On return *cpuids points to a malloc()ed array that the caller must
+ * free(), and *ncpuids holds the number of valid entries. If the allocation
+ * fails, *cpuids is NULL and *ncpuids is 0.
+ *
+ * NOTE(review): when the caller is not a worker (id -1), the master test
+ * matches workers whose master is -1, i.e. the unbooked ones -- confirm
+ * this is the intended selection. */
+void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
+{
+	int current_worker_id = starpu_worker_get_id();
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct starpu_worker_collection *workers = sched_ctx->workers;
+	struct starpu_sched_ctx_iterator it;
+	int w = 0;
+
+	/* publish an empty result first so that a failed allocation is
+	 * reported consistently instead of being dereferenced below */
+	*ncpuids = 0;
+	*cpuids = (int*)malloc(workers->nworkers*sizeof(int));
+	if(*cpuids == NULL)
+		return;
+
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+	{
+		int workerid = workers->get_next(workers, &it);
+		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+		if(worker->master == current_worker_id || workerid == current_worker_id)
+			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
+	}
+	*ncpuids = w;
+}
+
+/* Book the nworkers workers listed in workerids (context sched_ctx_id) for
+ * one parallel task: elect a master among them and put the others to sleep.
+ *
+ * - Called from a worker that is in the list: that worker becomes the master
+ *   and every other listed worker is put to sleep.
+ * - Called from a worker that is not in the list: every listed worker is put
+ *   to sleep and no master is elected (-1).
+ * - Called from outside any worker: the last listed worker becomes the
+ *   master and the preceding ones are put to sleep. With a single listed
+ *   worker, that worker is put to sleep and no master is elected (-1).
+ *
+ * Returns the master's worker id, or -1 when there is none (including when
+ * the list is empty). The value is meant to be handed back to
+ * starpu_sched_ctx_unbook_workers_for_task(). */
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
+{
+	/* nothing to book; this also keeps the variable-length array below
+	 * from being declared with a non-positive size */
+	if(workerids == NULL || nworkers <= 0)
+		return -1;
+
+	int current_worker_id = starpu_worker_get_id();
+
+	int final_workerids[nworkers];
+	int nfinal_workerids = 0;
+	int w;
+	int master = -1;
+	for(w = 0; w < nworkers; w++)
+	{
+		if(current_worker_id == -1)
+		{
+			final_workerids[nfinal_workerids++] = workerids[w];
+			if(nfinal_workerids == nworkers - 1)
+			{
+				/* all but the last entry are asleep: the last one is the master */
+				master = workerids[nfinal_workerids];
+				break;
+			}
+		}
+		else
+		{
+			if(workerids[w] != current_worker_id)
+				final_workerids[nfinal_workerids++] = workerids[w];
+			else
+				/* the calling worker stays awake and acts as the master */
+				master = current_worker_id;
+		}
+	}
+	/* get starpu workers to sleep */
+	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, final_workerids, nfinal_workerids, master);
+
+	return master;
+}
+
+void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)
+{
+	/* Release the workers of context sched_ctx_id that were booked for
+	 * `master` (the value returned by starpu_sched_ctx_book_workers_for_task)
+	 * by waking them up; the work is delegated to
+	 * _starpu_sched_ctx_wake_up_workers(). */
+	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, master);
+}

+ 0 - 3
src/core/sched_ctx.h

@@ -180,9 +180,6 @@ starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched
    (if it is the last one awake in a context he should better keep awake) */
    (if it is the last one awake in a context he should better keep awake) */
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 
 
-/*rebind each thread on its cpu after finishing a parallel code */
-void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid);
-
 /* let the appl know that the worker blocked to execute parallel code */
 /* let the appl know that the worker blocked to execute parallel code */
 void _starpu_sched_ctx_signal_worker_blocked(int workerid);
 void _starpu_sched_ctx_signal_worker_blocked(int workerid);
 
 

+ 2 - 0
src/core/workers.c

@@ -269,6 +269,7 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
 {
+	if(config.workers[workerid].parallel_sect) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
@@ -448,6 +449,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
 	workerarg->sched_mutex_locked = 0;
+	workerarg->master = -1;
 
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
 }

+ 2 - 0
src/core/workers.h

@@ -117,6 +117,8 @@ LIST_TYPE(_starpu_worker,
 
 
 	/* flag to know if sched_mutex is locked or not */
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 	unsigned sched_mutex_locked;
+	
+	int master;
 #ifdef __GLIBC__
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */
 #endif /* __GLIBC__ */

+ 1 - 1
src/drivers/driver_common/driver_common.c

@@ -203,7 +203,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
 		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
 		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
 		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
 		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
 		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
-		_starpu_sched_ctx_rebind_thread_to_its_cpu(args->bindid);
+		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
 		args->parallel_sect = 0;
 		args->parallel_sect = 0;
 	}
 	}

+ 0 - 6
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -719,10 +719,6 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		while(workers->has_next(workers, &it))
 		while(workers->has_next(workers, &it))
 		{
 		{
 			worker = workers->get_next(workers, &it);
 			worker = workers->get_next(workers, &it);
-			if (worker >= nworkers_ctx)
-				/* This is a just-added worker, discard it */
-				continue;
-
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 			{
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
@@ -730,8 +726,6 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 					/* no one on that queue may execute this task */
 					/* no one on that queue may execute this task */
 					continue;
 					continue;
 				}
 				}
-
-
 				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
 				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
 					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
 					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
 					+ dt->_gamma*(local_power[worker_ctx][nimpl]);
 					+ dt->_gamma*(local_power[worker_ctx][nimpl]);