Browse Source

allow ctx to execute parallel codes by reusing starpu workers (keep the workers alive instead of getting them to sleep) -> TODO try to factorize with parallel tasks on combined workers (doing practically the same, duplicate tasks and barrier_wait before & after exec)

Andra Hugo 10 years ago
parent
commit
afade57efa

+ 2 - 1
examples/Makefile.am

@@ -251,7 +251,8 @@ STARPU_EXAMPLES +=				\
 	openmp/vector_scal_omp			\
 	sched_ctx/sched_ctx_without_sched_policy\
 	sched_ctx/nested_sched_ctxs		\
-	sched_ctx/sched_ctx_without_sched_policy
+	sched_ctx/sched_ctx_without_sched_policy\
+	sched_ctx/sched_ctx_without_sched_policy_awake 
 
 if STARPU_LONG_CHECK
 STARPU_EXAMPLES +=				\

+ 1 - 1
examples/sched_ctx/sched_ctx.c

@@ -93,7 +93,7 @@ int main(int argc, char **argv)
 
 	/*create contexts however you want*/
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "eager", 0);
-	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "eager",  0);
+	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "eager", 0);
 
 	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
 	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);

+ 171 - 0
examples/sched_ctx/sched_ctx_without_sched_policy_awake.c

@@ -0,0 +1,171 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <omp.h>
+
+#ifdef STARPU_QUICK_CHECK
+#define NTASKS 64
+#else
+#define NTASKS 100
+#endif
+
+
+starpu_pthread_mutex_t mut;
+
+int tasks_executed[2][STARPU_NMAXWORKERS];
+int parallel_code(int sched_ctx)
+{
+	int i;
+	int t = 0;
+	int workerid = starpu_worker_get_id();
+	for(i = 0; i < NTASKS; i++)
+		t++;
+	tasks_executed[sched_ctx-1][workerid] = t;
+//	printf("executed %d tasks on worker %d of sched_ctx %d \n", t, workerid, sched_ctx);
+
+	return t;
+}
+
+static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
+{
+	unsigned sched_ctx = (unsigned)arg;
+	parallel_code(sched_ctx);
+}
+
+
+static struct starpu_codelet sched_ctx_codelet =
+{
+	.cpu_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = { NULL},
+	.opencl_funcs = {NULL},
+	.model = NULL,
+	.nbuffers = 0,
+	.name = "sched_ctx"
+};
+
+
+int main(int argc, char **argv)
+{
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	{
+		tasks_executed[0][i] = 0;
+		tasks_executed[1][i] = 0;
+	}
+	int ntasks = NTASKS;
+	int ret, j, k;
+	unsigned ncpus = 0;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_pthread_mutex_init(&mut, NULL);
+	int nprocs1 = 1;
+	int nprocs2 = 1;
+	int *procs1, *procs2;
+
+#ifdef STARPU_USE_CPU
+	ncpus = starpu_cpu_worker_get_count();
+	procs1 = (int*)malloc(ncpus*sizeof(int));
+	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
+
+	if(ncpus > 1)
+	{
+		nprocs1 = ncpus/2;
+		nprocs2 =  ncpus-nprocs1;
+		k = 0;
+		procs2 = (int*)malloc(nprocs2*sizeof(int));
+		for(j = nprocs1; j < nprocs1+nprocs2; j++)
+			procs2[k++] = procs1[j];
+	}
+	else
+	{
+		procs2 = (int*)malloc(nprocs2*sizeof(int));
+		procs2[0] = procs1[0];
+
+	}
+#endif
+
+	if (ncpus == 0) goto enodev;
+
+	/*create contexts however you want*/
+	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_AWAKE_WORKERS, 0);
+	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_AWAKE_WORKERS, 0);
+
+
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx1;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx2;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+
+	/* tell starpu when you finished submitting tasks to this context
+	   in order to allow moving resources from this context to the inheritor one
+	   when its corresponding tasks finished executing */
+
+
+
+	/* wait for all tasks at the end*/
+	starpu_task_wait_for_all();
+
+	starpu_sched_ctx_delete(sched_ctx1);
+	starpu_sched_ctx_delete(sched_ctx2);
+
+	int tasks_per_ctx[2];
+	tasks_per_ctx[0] = 0;
+	tasks_per_ctx[1] = 0;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	{
+		tasks_per_ctx[0] += tasks_executed[0][i];
+		tasks_per_ctx[1] += tasks_executed[1][i];
+	}
+
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_per_ctx[0]/nprocs1, NTASKS);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx2, tasks_per_ctx[1]/nprocs2, NTASKS);
+
+enodev:
+#ifdef STARPU_USE_CPU
+	free(procs1);
+	free(procs2);
+#endif
+	starpu_shutdown();
+	return ncpus == 0 ? 77 : 0;
+}

+ 1 - 0
include/starpu_sched_ctx.h

@@ -30,6 +30,7 @@ extern "C"
 #define STARPU_SCHED_CTX_POLICY_MAX_PRIO	 (4<<16)
 #define STARPU_SCHED_CTX_HIERARCHY_LEVEL         (5<<16)
 #define STARPU_SCHED_CTX_NESTED                  (6<<16)
+#define STARPU_SCHED_CTX_AWAKE_WORKERS           (7<<16)
 
 unsigned starpu_sched_ctx_create(int *workerids_ctx, int nworkers_ctx, const char *sched_ctx_name, ...);
 

+ 20 - 1
src/core/jobs.c

@@ -151,7 +151,27 @@ void _starpu_wait_job(struct _starpu_job *j)
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
 	struct starpu_task *task = j->task;
+	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
+	   so pointless to continue */
+	if(!j->task) return;
+
 	unsigned sched_ctx = task->sched_ctx;
+	int workerid = starpu_worker_get_id();
+	/* if parallel task (managed by a context) only the master should execute this function */
+	struct _starpu_sched_ctx *sched_ctx_str = _starpu_get_sched_ctx_struct(sched_ctx);
+	if(!sched_ctx_str->sched_policy && sched_ctx_str->awake_workers)
+	{
+		if(sched_ctx_str->main_master != workerid)
+		{
+			return;
+		}
+		else
+		{
+			STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
+			STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
+		}
+	}
+
 	double flops = task->flops;
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
@@ -167,7 +187,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 
 #ifdef STARPU_USE_SC_HYPERVISOR
 	size_t data_size = 0;
-	int workerid = starpu_worker_get_id();
 #endif //STARPU_USE_SC_HYPERVISOR
 
 	/* We release handle reference count */

+ 44 - 9
src/core/sched_ctx.c

@@ -34,6 +34,8 @@ size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
 static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
+static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers);
+static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -257,11 +259,19 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 
 	if(!sched_ctx->sched_policy)
 	{
-		if(sched_ctx->main_master == -1)
-			sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
+		if(!sched_ctx->awake_workers)
+		{
+			if(sched_ctx->main_master == -1)
+				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
+			else
+			{
+				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
+			}
+		}
 		else
 		{
-			_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
+			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, workerids, nworkers);
+			_starpu_sched_ctx_set_master(sched_ctx, workerids, nworkers, sched_ctx->main_master);
 		}
 	}
 	else if(sched_ctx->sched_policy->add_workers)
@@ -375,7 +385,12 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 	}
 
 	if(!sched_ctx->sched_policy)
-		_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
+	{
+		if(!sched_ctx->awake_workers)
+		{
+			_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
+		}
+	}
 
 	return;
 }
@@ -428,7 +443,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 						   int nworkers_ctx, unsigned is_initial_sched,
 						   const char *sched_ctx_name,
 						   int min_prio_set, int min_prio,
-						   int max_prio_set, int max_prio)
+						   int max_prio_set, int max_prio, unsigned awake_workers)
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 
@@ -487,9 +502,15 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 
         /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)
+	{
 		_starpu_init_sched_policy(config, sched_ctx, policy);
+		sched_ctx->awake_workers = 1;
+	}
 	else
+	{
+		sched_ctx->awake_workers = awake_workers;
 		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
+	}
 
 	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
@@ -655,7 +676,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	for(i = 0; i < nw; i++)
 		printf("%d ", workers[i]);
 	printf("\n");
-	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0);
+	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1);
 	sched_ctx->min_ncpus = min_ncpus;
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
@@ -684,6 +705,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	struct starpu_sched_policy *sched_policy = NULL;
 	unsigned hierarchy_level = 0;
 	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	unsigned awake_workers = 0;
 
 	va_start(varg_list, sched_ctx_name);
 	while ((arg_type = va_arg(varg_list, int)) != 0)
@@ -716,6 +738,10 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 		{
 			nesting_sched_ctx = va_arg(varg_list, unsigned);
 		}
+		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
+		{
+			awake_workers = 1;
+		}
 		else
 		{
 			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
@@ -725,7 +751,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	va_end(varg_list);
 
 	struct _starpu_sched_ctx *sched_ctx = NULL;
-	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio);
+	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers);
 	sched_ctx->hierarchy_level = hierarchy_level;
 	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
 
@@ -1804,14 +1830,13 @@ unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned
 
 }
 
-/* if the worker is the master of a parallel context, and the job is meant to be executed on this parallel context, return a pointer to the context */
 struct _starpu_sched_ctx *_starpu_sched_ctx_get_sched_ctx_for_worker_and_job(struct _starpu_worker *worker, struct _starpu_job *j)
 {
 	struct _starpu_sched_ctx_list *l = NULL;
 	for (l = worker->sched_ctx_list; l; l = l->next)
 	{
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
-		if (sched_ctx->main_master == worker->workerid && j->task->sched_ctx == sched_ctx->id)
+		if (j->task->sched_ctx == sched_ctx->id)
 			return sched_ctx;
 	}
 	return NULL;
@@ -2115,6 +2140,16 @@ static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *
 
 }
 
+static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master)
+{
+	int i;
+	for(i = 0; i < nworkers; i++)
+	{
+		if(workerids[i] != master)
+			sched_ctx->master[workerids[i]] = master;
+	}
+}
+
 int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
 	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);

+ 6 - 1
src/core/sched_ctx.h

@@ -152,6 +152,11 @@ struct _starpu_sched_ctx
 
 	/* perf model for the device comb of the ctx */
 	struct starpu_perfmodel_arch perf_arch;
+
+	/* for ctxs without policy: flag to indicate that we want to get
+	   the threads to sleep in order to replace them with other threads or leave
+	   them awake & use them in the parallel code*/
+	unsigned awake_workers;
 };
 
 struct _starpu_machine_config;
@@ -162,7 +167,7 @@ void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config);
 /* allocate all structures belonging to a context */
 struct _starpu_sched_ctx*  _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name,
 						    int min_prio_set, int min_prio,
-						    int max_prio_set, int max_prio);
+						    int max_prio_set, int max_prio, unsigned awake_workers);
 
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();

+ 33 - 2
src/core/sched_policy.c

@@ -427,7 +427,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 	_starpu_profiling_set_task_push_start_time(task);
 
-	int ret;
+	int ret = 0;
 	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
 	{
 		unsigned node = starpu_worker_get_memory_node(task->workerid);
@@ -458,7 +458,38 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 		if(!sched_ctx->sched_policy)
 		{
-			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
+			if(!sched_ctx->awake_workers)
+				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
+			else
+			{
+				struct starpu_worker_collection *workers = sched_ctx->workers;
+				
+				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
+				
+				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
+				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
+				
+				/* Note: we have to call that early, or else the task may have
+				 * disappeared already */
+				starpu_push_task_end(task);
+
+				unsigned workerid;
+				struct starpu_sched_ctx_iterator it;
+				if(workers->init_iterator)
+					workers->init_iterator(workers, &it);
+
+				while(workers->has_next(workers, &it))
+				{
+					workerid = workers->get_next(workers, &it);
+					if(workerid != sched_ctx->main_master)
+					{
+						struct starpu_task *alias = starpu_task_dup(task);
+						ret |= _starpu_push_task_on_specific_worker(alias, workerid);
+					}
+					else
+						ret |= _starpu_push_task_on_specific_worker(task, workerid);
+				}
+			}
 		}
 		else
 		{

+ 3 - 2
src/core/workers.c

@@ -287,7 +287,8 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-	if(sched_ctx->parallel_sect[workerid]) return 0;
+//	if(sched_ctx->parallel_sect[workerid] || (!sched_ctx->policy && workerid != sched_ctx->main_master)) return 0;
+	if(!sched_ctx->sched_policy && workerid != sched_ctx->main_master) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
@@ -1109,7 +1110,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	if (!is_a_sink)
 	{
 		struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(&config, config.conf->sched_policy_name);
-		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", 0, 0, 0, 0);
+		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", 0, 0, 0, 0, 1);
 	}
 
 	_starpu_initialize_registered_performance_models();

+ 14 - 0
src/datawizard/coherency.c

@@ -25,6 +25,7 @@
 #include <math.h>
 #include <core/task.h>
 #include <starpu_scheduler.h>
+#include <core/workers.h>
 
 static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
 int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
@@ -816,6 +817,19 @@ enomem:
 
 void _starpu_push_task_output(struct _starpu_job *j)
 {
+	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
+	   so pointless to continue */
+	if(!j->task) return;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
+	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a the ctx of this job \n");
+
+	if (!sched_ctx->sched_policy)
+	{
+		int workerid = starpu_worker_get_id();
+       		if(sched_ctx->main_master != workerid)
+			return;
+	}
+
 	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();

+ 57 - 16
src/drivers/driver_common/driver_common.c

@@ -73,27 +73,42 @@ void _starpu_driver_start_job(struct _starpu_worker *args, struct _starpu_job *j
 	if (starpu_top)
 		_starpu_top_task_started(task,workerid,codelet_start);
 
-	_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 
 	// Find out if the worker is the master of a parallel context
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(args, j);
-	if (sched_ctx)
+	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", args->workerid);
+	if(!sched_ctx->sched_policy) 
 	{
-		struct starpu_worker_collection *workers = sched_ctx->workers;
-		struct starpu_sched_ctx_iterator it;
-
-		if (workers->init_iterator)
-			workers->init_iterator(workers, &it);
-		while (workers->has_next(workers, &it))
+		if(sched_ctx->awake_workers)
 		{
-			int _workerid = workers->get_next(workers, &it);
-			if (_workerid != workerid)
+			STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
+		}
+		else
+		{
+			if(sched_ctx->main_master == args->workerid)
 			{
-				struct _starpu_worker *worker = _starpu_get_worker_struct(_workerid);
-				_starpu_driver_start_job(worker, j, &worker->perf_arch, codelet_start, rank, profiling);
+				struct starpu_worker_collection *workers = sched_ctx->workers;
+				struct starpu_sched_ctx_iterator it;
+				
+				if (workers->init_iterator)
+					workers->init_iterator(workers, &it);
+				while (workers->has_next(workers, &it))
+				{
+					int _workerid = workers->get_next(workers, &it);
+					if (_workerid != workerid)
+					{
+						struct _starpu_worker *worker = _starpu_get_worker_struct(_workerid);
+						_starpu_driver_start_job(worker, j, &worker->perf_arch, codelet_start, rank, profiling);
+					}
+				}
 			}
 		}
+		if(sched_ctx->main_master == args->workerid)
+			/* if the worker is the master of a ctx trace the perf_arch of the context */
+			_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, sched_ctx->perf_arch, workerid);
 	}
+	else
+		_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 }
 
 void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch STARPU_ATTRIBUTE_UNUSED, struct timespec *codelet_end, int rank, int profiling)
@@ -105,7 +120,20 @@ void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j,
 	int workerid = args->workerid;
 	unsigned calibrate_model = 0;
 
-	_STARPU_TRACE_END_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
+	// Find out if the worker is the master of a parallel context
+	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(args, j);
+	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", args->workerid);
+
+	if(!sched_ctx->sched_policy && sched_ctx->awake_workers)
+		STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
+
+	if (!sched_ctx->sched_policy)
+	{
+		if(sched_ctx->main_master == args->workerid)
+			_STARPU_TRACE_END_CODELET_BODY(j, j->nimpl, sched_ctx->perf_arch, workerid);
+	}
+	else
+		_STARPU_TRACE_END_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 
 	if (cl && cl->model && cl->model->benchmarking)
 		calibrate_model = 1;
@@ -124,9 +152,8 @@ void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j,
 
 	args->status = STATUS_UNKNOWN;
 
-	// Find out if the worker is the master of a parallel context
-	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(args, j);
-	if (sched_ctx)
+	if(!sched_ctx->sched_policy && !sched_ctx->awake_workers && 
+	   sched_ctx->main_master == args->workerid)
 	{
 		struct starpu_worker_collection *workers = sched_ctx->workers;
 		struct starpu_sched_ctx_iterator it;
@@ -149,6 +176,20 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 					struct starpu_perfmodel_arch* perf_arch,
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling)
 {
+	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
+	   so pointless to continue */
+	if(!j->task) return;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker_args, j);
+	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker_args->workerid);
+
+	if (!sched_ctx->sched_policy)
+	{
+       		if(sched_ctx->main_master == worker_args->workerid)
+			*perf_arch = sched_ctx->perf_arch;
+		else
+			return;
+	}
+
 	struct starpu_profiling_task_info *profiling_info = j->task->profiling_info;
 	struct timespec measured_ts;
 	double measured;