Pārlūkot izejas kodu

parallel heft - it doesn't work for several ctxs because of combined
workers

Andra Hugo 13 gadi atpakaļ
vecāks
revīzija
b5231de2d4
3 mainīti faili ar 170 papildinājumiem un 119 dzēšanām
  1. 1 1
      src/Makefile.am
  2. 14 6
      src/sched_policies/heft.c
  3. 155 112
      src/sched_policies/parallel_heft.c

+ 1 - 1
src/Makefile.am

@@ -162,6 +162,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/random_policy.c				\
 	sched_policies/work_stealing_policy.c			\
 	sched_policies/parallel_greedy.c			\
+	sched_policies/parallel_heft.c				\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
@@ -214,7 +215,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_message_queue.c				\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
-#	sched_policies/parallel_heft.c				
 
 if STARPU_USE_CPU
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/cpu/driver_cpu.c

+ 14 - 6
src/sched_policies/heft.c

@@ -304,22 +304,26 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
+		unsigned incremented = 0;
 		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			pthread_mutex_t *sched_mutex;
 			pthread_cond_t *sched_cond;
 			starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
-			_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
 			exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker];
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
  				max_exp_end = exp_end[worker_ctx][nimpl];
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
-				worker_ctx++;
+
+				if(!incremented)
+					worker_ctx++;
+				incremented = 1;
 				continue;
 			}
 
@@ -385,7 +389,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				local_power[worker_ctx][nimpl] = 0.;
 
 		}
-		worker_ctx++;
+		if(!incremented)
+			worker_ctx++;
 	}
 
 	*forced_worker = unknown?ntasks_best:-1;
@@ -503,12 +508,14 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
+		unsigned incremented = 0;
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
-				worker_ctx++;
+				if(!incremented)
+					worker_ctx++;
 				continue;
 			}
 
@@ -532,7 +539,8 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				selected_impl = nimpl;
 			}
 		}
-		worker_ctx++;
+		if(!incremented)
+			worker_ctx++;
 	}
 
 	/* By now, we must have found a solution */

+ 155 - 112
src/sched_policies/parallel_heft.c

@@ -34,7 +34,7 @@
 #define DBL_MAX __DBL_MAX__
 #endif
 
-static unsigned nworkers, ncombinedworkers;
+static unsigned ncombinedworkers;
 //static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
 //static unsigned napplicable_perf_archtypes = 0;
 
@@ -50,37 +50,47 @@ static double worker_exp_end[STARPU_NMAXWORKERS];
 static double worker_exp_len[STARPU_NMAXWORKERS];
 static int ntasks[STARPU_NMAXWORKERS];
 
-static void parallel_heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
+
+/*!!!!!!! It doesn't work with several contexts because the combined workers are constructed         
+  from the workers available to the program, and not to the context !!!!!!!!!!!!!!!!!!!!!!!          
+*/
+
+static void parallel_heft_post_exec_hook(struct starpu_task *task)
 {
 	if (!task->cl || task->execute_on_a_specific_worker)
 		return;
 
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	unsigned sched_ctx_id = task->sched_ctx;
 	double transfer_model = task->predicted_transfer;
 
 	if (isnan(model))
 		model = 0.0;
 
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid]);
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	worker_exp_len[workerid] -= model + transfer_model;
 	worker_exp_start[workerid] = starpu_timing_now();
 	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
-	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid]);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, struct starpu_sched_ctx *sched_ctx)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, unsigned sched_ctx_id)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
+	_starpu_increment_nsubmitted_tasks_of_worker(best_workerid);
+
 	/* Is this a basic worker or a combined worker ? */
-//	int nbasic_workers = (int)starpu_worker_get_count();
-	int nbasic_workers = sched_ctx->nworkers;
+	int nbasic_workers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 	int is_basic_worker = (best_workerid < nbasic_workers);
 
 	unsigned memory_node;
@@ -96,13 +106,17 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
 		/* TODO */
 		task->predicted_transfer = 0;
-		_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[best_workerid]);
+		pthread_mutex_t *sched_mutex;
+		pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 		worker_exp_len[best_workerid] += task->predicted;
 		worker_exp_end[best_workerid] = exp_end_predicted;
 		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
 
 		ntasks[best_workerid]++;
-		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[best_workerid]);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 		ret = starpu_push_local_task(best_workerid, task, prio);
 	}
@@ -132,14 +146,16 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			alias->predicted = exp_end_predicted - worker_exp_end[local_worker];
 			/* TODO */
 			alias->predicted_transfer = 0;
-
-			_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[local_worker]);
+			pthread_mutex_t *sched_mutex;
+			pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(sched_ctx_id, local_worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			worker_exp_len[local_worker] += alias->predicted;
 			worker_exp_end[local_worker] = exp_end_predicted;
 			worker_exp_start[local_worker] = exp_end_predicted - worker_exp_len[local_worker];
 
 			ntasks[local_worker]++;
-			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[local_worker]);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 			ret |= starpu_push_local_task(local_worker, alias, prio);
 		}
@@ -149,8 +165,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	return ret;
 }
 
-static double compute_expected_end(int workerid, double length)
+static double compute_expected_end(int workerid, double length, unsigned sched_ctx_id)
 {
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 	if (workerid < (int)nworkers)
 	{
 		/* This is a basic worker */
@@ -178,8 +195,9 @@ static double compute_expected_end(int workerid, double length)
 	}
 }
 
-static double compute_ntasks_end(int workerid)
+static double compute_ntasks_end(int workerid, unsigned sched_ctx_id)
 {
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	if (workerid < (int)nworkers)
 	{
@@ -208,11 +226,12 @@ static double compute_ntasks_end(int workerid)
 
 static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	pheft_data *hd = (pheft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
-	pheft_data *hd = (pheft_data*)sched_ctx->policy_data;
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	unsigned worker, worker_ctx;
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+	unsigned nworkers_ctx = workers->nworkers;
+
+	unsigned worker, worker_ctx = 0;
 	int best = -1, best_id_ctx = -1;
 	
 	/* this flag is set if the corresponding worker is selected because
@@ -238,29 +257,38 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
+	if(workers->init_cursor)
+                workers->init_cursor(workers);
 
-	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
-	{
-		worker = sched_ctx->workerids[worker_ctx];
+	while(workers->has_next(workers))
+        {
+                worker = workers->get_next(workers);
+
+		pthread_mutex_t *sched_mutex;
+		pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[worker]);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 		worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
 		worker_exp_end[worker] = worker_exp_start[worker] + worker_exp_len[worker];
 		if (worker_exp_end[worker] > max_exp_end)
 			max_exp_end = worker_exp_end[worker];
-		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[worker]);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 
 	unsigned nimpl;
-	for (worker_ctx = 0; worker_ctx < (nworkers_ctx + ncombinedworkers); worker_ctx++)
- 	{
-		worker = sched_ctx->workerids[worker_ctx];
+	while(workers->has_next(workers) || worker_ctx < (nworkers_ctx + ncombinedworkers))
+	{
+                worker = workers->has_next(workers) ? workers->get_next(workers) : worker_ctx;
+		unsigned incremented = 0;
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			if (!starpu_combined_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
 				skip_worker[worker][nimpl] = 1;
+				if(!incremented)
+					worker_ctx++;
 				continue;
 			}
 			else
@@ -275,7 +303,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			unsigned memory_node = starpu_worker_get_memory_node(worker);
 			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
 
-			double ntasks_end = compute_ntasks_end(worker);
+			double ntasks_end = compute_ntasks_end(worker, sched_ctx_id);
 
 			if (ntasks_best == -1
 			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
@@ -304,7 +332,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			if (unknown)
 				continue;
 
-			local_exp_end[worker_ctx][nimpl] = compute_expected_end(worker, local_task_length[worker_ctx][nimpl]);
+			local_exp_end[worker_ctx][nimpl] = compute_expected_end(worker, local_task_length[worker_ctx][nimpl], sched_ctx_id);
 
 			//fprintf(stderr, "WORKER %d -> length %e end %e\n", worker, local_task_length[worker_ctx][nimpl], local_exp_end[worker][nimpl]);
 
@@ -322,7 +350,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			if (isnan(local_power[worker_ctx][nimpl]))
 				local_power[worker_ctx][nimpl] = 0.;
 
-		} //end for
+		}
+		if(!incremented)
+			worker_ctx++;
 	}
 
 	if (unknown) {
@@ -333,20 +363,23 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 	double best_fitness = -1;
 
-
 	if (forced_best == -1)
 	{
-		for (worker_ctx = 0; worker_ctx < nworkers_ctx + ncombinedworkers; worker_ctx++)
+		worker_ctx = 0;
+		while(workers->has_next(workers) || worker_ctx < (nworkers_ctx + ncombinedworkers))
 		{
-			/* if combinedworker don't search the id in the ctx */
-			worker = worker_ctx >= nworkers_ctx ? worker_ctx : 
-				sched_ctx->workerids[worker_ctx];
+			worker = workers->has_next(workers) ? workers->get_next(workers) : worker_ctx;
 
+			unsigned incremented = 0;
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 				if (skip_worker[worker_ctx][nimpl])
 				{
 					/* no one on that queue may execute this task */
+					if(!incremented)
+						worker_ctx++;
+
+					incremented = 1;
 					continue;
 				}
 
@@ -360,7 +393,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 					 * consumption of other cpus */
 					fitness[worker_ctx][nimpl] += hd->_gamma * hd->idle_power * (local_exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
 
-				if (best == -1 || fitness[worker_ctx] < best_fitness)
+				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
 				{
 					/* we found a better solution */
 					best_fitness = fitness[worker_ctx][nimpl];
@@ -371,6 +404,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 			//	fprintf(stderr, "FITNESS worker %d -> %e local_exp_end %e - local_data_penalty %e\n", worker, fitness[worker][nimpl], local_exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl]);
 			}
+			if(!incremented)
+				worker_ctx++;
+		}
 	}
 
 	STARPU_ASSERT(forced_best != -1 || best != -1);
@@ -384,7 +420,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 		best_id_ctx = forced_best_ctx;
 		nimpl_best = forced_nimpl;
 		//penality_best = 0.0;
-		best_exp_end = compute_expected_end(best, 0);
+		best_exp_end = compute_expected_end(best, 0, sched_ctx_id);
 	}
 	else
 	{
@@ -396,27 +432,52 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 	//_STARPU_DEBUG("Scheduler parallel heft: kernel (%u)\n", nimpl_best);
 	_starpu_get_job_associated_to_task(task)->nimpl = nimpl_best;
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, best_exp_end, prio, sched_ctx);
+	return push_task_on_best_worker(task, best, best_exp_end, prio, sched_ctx_id);
 }
 
-static int parallel_heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
+static int parallel_heft_push_task(struct starpu_task *task)
 {
+	unsigned sched_ctx_id = task->sched_ctx;
+	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+	unsigned nworkers;
+	int ret_val = -1;
+
 	if (task->priority == STARPU_MAX_PRIO)
-		return _parallel_heft_push_task(task, 1, sched_ctx_id);
+	{  _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+                nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+                if(nworkers == 0)
+                {
+                        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                        return ret_val;
+                }
+
+		ret_val = _parallel_heft_push_task(task, 1, sched_ctx_id);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                return ret_val;
+        }
+
+
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+        if(nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                return ret_val;
+        }
 
-	return _parallel_heft_push_task(task, 0, sched_ctx_id);
+        ret_val = _parallel_heft_push_task(task, 0, sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return ret_val;
 }
 
-static void parallel_heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
+static void parallel_heft_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	
 	int workerid;
 	unsigned i;
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+		struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
 		/* init these structures only once for each worker */
 		if(!workerarg->has_prev_init)
 		{
@@ -427,64 +488,16 @@ static void parallel_heft_init_for_workers(unsigned sched_ctx_id, int *workerids
 			workerarg->has_prev_init = 1;
 		}
 
-		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
+		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
 	}
-}
 
-static void initialize_parallel_heft_policy(unsigned sched_ctx_id) 
-{	
-	pheft_data *hd = (pheft_data*)malloc(sizeof(pheft_data));
-	hd->alpha = STARPU_DEFAULT_ALPHA;
-	hd->beta = STARPU_DEFAULT_BETA;
-	hd->_gamma = STARPU_DEFAULT_GAMMA;
-	hd->idle_power = 0.0;
-	
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	sched_ctx->policy_data = (void*)hd;
-
-	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
-	if (strval_alpha)
-		hd->alpha = atof(strval_alpha);
-
-	const char *strval_beta = getenv("STARPU_SCHED_BETA");
-	if (strval_beta)
-		hd->beta = atof(strval_beta);
-
-	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
-	if (strval_gamma)
-		hd->_gamma = atof(strval_gamma);
-
-	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
-	if (strval_idle_power)
-		hd->idle_power = atof(strval_idle_power);
-
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	_starpu_sched_find_worker_combinations(&config->topology);
 
 	ncombinedworkers = config->topology.ncombinedworkers;
 
-	unsigned workerid_ctx;
-	int workerid;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
-		if(!workerarg->has_prev_init)
-		{
-			worker_exp_start[workerid] = starpu_timing_now();
-			worker_exp_len[workerid] = 0.0;
-			worker_exp_end[workerid] = worker_exp_start[workerid]; 
-			ntasks[workerid] = 0;
-		}
-		_STARPU_PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
-		_STARPU_PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
-	}
-
-
 	/* We pre-compute an array of all the perfmodel archs that are applicable */
-	unsigned total_worker_count = nworkers_ctx + ncombinedworkers;
+	unsigned total_worker_count = nworkers + ncombinedworkers;
 
 	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
 	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
@@ -503,33 +516,63 @@ static void initialize_parallel_heft_policy(unsigned sched_ctx_id)
 //		if (used_perf_archtypes[arch])
 //			applicable_perf_archtypes[napplicable_perf_archtypes++] = arch;
 //	}
+
 }
 
-static void parallel_heft_deinit(unsigned sched_ctx_id) 
+static void parallel_heft_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	pheft_data *ht = (pheft_data*)sched_ctx->policy_data;	  
-	unsigned workerid_ctx;
-	int workerid;
-
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{	
-		workerid = sched_ctx->workerids[workerid_ctx];
-		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
-		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
+	unsigned i;
+	int worker;
+	for(i = 0; i < nworkers; i++)
+	{
+		worker = workerids[i];
+		starpu_worker_deinit_sched_condition(sched_ctx_id, worker);
 	}
+}
+static void initialize_parallel_heft_policy(unsigned sched_ctx_id) 
+{	
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+	pheft_data *hd = (pheft_data*)malloc(sizeof(pheft_data));
+	hd->alpha = _STARPU_DEFAULT_ALPHA;
+	hd->beta = _STARPU_DEFAULT_BETA;
+	hd->_gamma = _STARPU_DEFAULT_GAMMA;
+	hd->idle_power = 0.0;
+	
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)hd);
 
-	free(ht);
-	PTHREAD_MUTEX_DESTROY(&big_lock);
+	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
+	if (strval_alpha)
+		hd->alpha = atof(strval_alpha);
+
+	const char *strval_beta = getenv("STARPU_SCHED_BETA");
+	if (strval_beta)
+		hd->beta = atof(strval_beta);
+
+	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
+	if (strval_gamma)
+		hd->_gamma = atof(strval_gamma);
+
+	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
+	if (strval_idle_power)
+		hd->idle_power = atof(strval_idle_power);
+
+
+}
+
+static void parallel_heft_deinit(unsigned sched_ctx_id) 
+{
+	pheft_data *hd = (pheft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
+	free(hd);
 }
 
 /* TODO: use post_exec_hook to fix the expected start */
 struct starpu_sched_policy _starpu_sched_parallel_heft_policy =
 {
 	.init_sched = initialize_parallel_heft_policy,
-	.init_sched_for_workers = parallel_heft_init_for_workers,
 	.deinit_sched = parallel_heft_deinit,
+	.add_workers = parallel_heft_add_workers,
+	.remove_workers = parallel_heft_remove_workers,
 	.push_task = parallel_heft_push_task, 
 	.pop_task = NULL,
 	.pre_exec_hook = NULL,