Andra Hugo лет назад: 13
Родитель
Сommit
60a84c3725

+ 2 - 2
include/starpu_scheduler.h

@@ -92,13 +92,13 @@ struct starpu_sched_policy
 
 	/* Get a task from the scheduler. The mutex associated to the worker is
 	 * already taken when this method is called. */
-	struct starpu_task *(*pop_task)(void);
+	struct starpu_task *(*pop_task)(unsigned sched_ctx);
 
 	 /* Remove all available tasks from the scheduler (tasks are chained by
 	  * the means of the prev and next fields of the starpu_task
 	  * structure). The mutex associated to the worker is already taken
 	  * when this method is called. */
-	struct starpu_task *(*pop_every_task)(void);
+	struct starpu_task *(*pop_every_task)(unsigned sched_ctx);
 
 	/* This method is called every time a task is starting. (optional) */
 	void (*pre_exec_hook)(struct starpu_task *);

+ 1 - 1
src/Makefile.am

@@ -156,6 +156,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/stack_queues.c				\
 	sched_policies/deque_queues.c				\
 	sched_policies/fifo_queues.c				\
+	sched_policies/deque_modeling_policy_data_aware.c	\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
@@ -211,7 +212,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 #	sched_policies/eager_central_policy.c			
 #	sched_policies/eager_central_priority_policy.c		
 #	sched_policies/work_stealing_policy.c			
-#	sched_policies/deque_modeling_policy_data_aware.c	
 #	sched_policies/random_policy.c				
 #	sched_policies/parallel_heft.c				
 #	sched_policies/parallel_greedy.c			

+ 6 - 0
src/core/jobs.c

@@ -207,7 +207,13 @@ void _starpu_handle_job_termination(struct _starpu_job *j, int workerid)
 
 	/* control task should not execute post_exec_hook */
 	if(task->cl != NULL && !task->control_task)
+	{
 	  _starpu_sched_post_exec_hook(task);
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+	  starpu_call_poped_task_cb(workerid, sched_ctx, task->flops);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
+	}
+
 
 	_STARPU_TRACE_TASK_DONE(j);
 

+ 1 - 1
src/core/sched_ctx.h

@@ -105,6 +105,6 @@ int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);
 unsigned _starpu_get_nsched_ctxs();
 
 /* Get the mutex corresponding to the global workerid */
-pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker);
+pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int worker);
 
 #endif // __SCHED_CONTEXT_H__

+ 18 - 11
src/core/sched_policy.c

@@ -39,10 +39,10 @@ int starpu_get_prefetch_flag(void)
 /* extern struct starpu_sched_policy _starpu_sched_ws_policy; */
 /* extern struct starpu_sched_policy _starpu_sched_prio_policy; */
 /* extern struct starpu_sched_policy _starpu_sched_random_policy; */
-/* extern struct starpu_sched_policy _starpu_sched_dm_policy; */
-/* extern struct starpu_sched_policy _starpu_sched_dmda_policy; */
-/* extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy; */
-/* extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy; */
+extern struct starpu_sched_policy _starpu_sched_dm_policy;
+extern struct starpu_sched_policy _starpu_sched_dmda_policy;
+extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
+extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 /* extern struct starpu_sched_policy _starpu_sched_eager_policy; */
 /* extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy; */
 /* extern struct starpu_sched_policy _starpu_sched_pgreedy_policy; */
@@ -143,7 +143,7 @@ static void display_sched_help_message(void)
 	 }
 }
 
-static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, char *required_policy)
+static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
 {
 	struct starpu_sched_policy *selected_policy = NULL;
 	struct starpu_conf *user_conf = config->user_conf;
@@ -244,7 +244,13 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	{
 		sched_ctx = worker->sched_ctx[i];
 		if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
+		{
 			sched_ctx->sched_policy->push_task_notify(task, workerid);
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+			starpu_call_pushed_task_cb(workerid, sched_ctx->id);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
+		}
+
 	}
 	
 	if (is_basic_worker)
@@ -282,7 +288,6 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		int *combined_workerid = combined_worker->combined_workerid;
 
 		int ret = 0;
-		int i;
 
 		struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 		j->task_size = worker_size;
@@ -292,11 +297,12 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
 		_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
 
-		for (i = 0; i < worker_size; i++)
+		int k;
+		for (k = 0; k < worker_size; k++)
 		{
 			struct starpu_task *alias = _starpu_create_task_alias(task);
 
-			worker = _starpu_get_worker_struct(combined_workerid[i]);
+			worker = _starpu_get_worker_struct(combined_workerid[k]);
 			ret |= _starpu_push_local_task(worker, alias, 0);
 		}
 
@@ -329,7 +335,6 @@ int _starpu_push_task(struct _starpu_job *j)
 	struct starpu_task *task = j->task;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 	int workerid = starpu_worker_get_id();
-	unsigned no_workers = 0;
 	unsigned nworkers = 0; 
 
 	if(!sched_ctx->is_initial_sched)
@@ -500,7 +505,7 @@ pick:
 					_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
 					if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
 					{
-						task = sched_ctx->sched_policy->pop_task();
+						task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
 						_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 						break;
 					}
@@ -604,7 +609,9 @@ struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
 	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
 
 	/* TODO set profiling info */
-	return sched_ctx->sched_policy->pop_every_task();
+	if(sched_ctx->sched_policy->pop_every_task)
+		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
+	return NULL;
 }
 
 void _starpu_sched_pre_exec_hook(struct starpu_task *task)

+ 4 - 4
src/core/sched_policy.h

@@ -24,18 +24,18 @@
 #include <starpu_scheduler.h>
 
 struct starpu_machine_config;
-struct starpu_sched_policy *_starpu_get_sched_policy( struct starpu_sched_ctx *sched_ctx);
+struct starpu_sched_policy *_starpu_get_sched_policy( struct _starpu_sched_ctx *sched_ctx);
 
 void _starpu_init_sched_policy(struct _starpu_machine_config *config, 
-			       struct starpu_sched_ctx *sched_ctx, const char *required_policy);
+			       struct _starpu_sched_ctx *sched_ctx, const char *required_policy);
 
-void _starpu_deinit_sched_policy(struct starpu_sched_ctx *sched_ctx);
+void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx);
 
 int _starpu_push_task(struct _starpu_job *task);
 /* pop a task that can be executed on the worker */
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker);
 /* pop every task that can be executed on the worker */
-struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx);
+struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 
 void _starpu_wait_on_sched_event(void);

+ 151 - 85
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -32,7 +32,7 @@ typedef struct {
 	double _gamma;
 	double idle_power;
 
-	struct starpu_fifo_taskq_s **queue_array;
+	struct _starpu_fifo_taskq **queue_array;
 
 	long int total_task_cnt;
 	long int ready_task_cnt;
@@ -300,6 +300,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	pthread_mutex_t *sched_mutex;
 	pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+        starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR 
+
 	if (prio)
 		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
 			sched_mutex, sched_cond, task);
@@ -329,13 +334,16 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	unsigned best_impl = 0;
 	unsigned nimpl;
-	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
-	for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
-	{
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+
+	if(workers->init_cursor)
+                workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+        {
+                worker = workers->get_next(workers);
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			worker = workerids[worker_ctx];
 			double exp_end;
 		
 			fifo = dt->queue_array[worker];
@@ -347,6 +355,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
+				worker_ctx++;
 				continue;
 			}
 
@@ -392,6 +401,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 				best_impl = nimpl;
 			}
 		}
+		worker_ctx++;
 	}
 
 	if (unknown)
@@ -404,6 +414,9 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
 
+	if(workers->init_cursor)
+                workers->deinit_cursor(workers);
+
 	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 
 	/* we should now have the best worker in variable "best" */
@@ -416,14 +429,16 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	/* find the queue */
 	struct _starpu_fifo_taskq *fifo;
 	unsigned worker, worker_ctx;
-	int best = -1, best_ctx = -1;
+	int best = -1, best_in_ctx = -1;
 	
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
 	int forced_best = -1;
 
-	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+
+	unsigned nworkers_ctx = workers->nworkers;
+
 	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double local_power[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
@@ -445,9 +460,14 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	unsigned best_impl = 0;
 	unsigned nimpl=0;
-	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
-	{
-		worker = workerids[worker_ctx];
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+
+	while(workers->has_next(workers))
+        {
+                worker = workers->get_next(workers);
 		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	 	{
 			fifo = dt->queue_array[worker];
@@ -461,6 +481,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
+				worker_ctx++;
 				continue;
 			}
 
@@ -498,7 +519,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				unknown = 1;
 
 			if (unknown)
-					continue;
+				continue;
 
 			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
 
@@ -514,6 +535,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				local_power[worker_ctx][nimpl] = 0.;
 
 		 }
+		worker_ctx++;
 	}
 
 	if (unknown)
@@ -521,44 +543,47 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	double best_fitness = -1;
 	
+	worker_ctx = 0;
 	if (forced_best == -1)
 	{
-		for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
+		while(workers->has_next(workers))
 		{
-			worker = workerids[worker_ctx];
+			worker = workers->get_next(workers);
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{	
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
 				{
 					/* no one on that queue may execute this task */
+					worker_ctx++;
 					continue;
 				}
-
-					fifo = dt->queue_array[worker];
-	
-					fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end) 
+				
+				fifo = dt->queue_array[worker];
+				
+				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end) 
 					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
 					+ dt->_gamma*(local_power[worker_ctx][nimpl]);
-
-			if (exp_end[worker_ctx][nimpl] > max_exp_end)
-				/* This placement will make the computation
-				 * longer, take into account the idle
-				 * consumption of other cpus */
-				fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
-
-			if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
-			{
-				/* we found a better solution */
-				best_fitness = fitness[worker_ctx][nimpl];
-				best = worker;
-				best_ctx = worker_ctx;
-				best_impl = nimpl;
-
-				//			_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
+				
+				if (exp_end[worker_ctx][nimpl] > max_exp_end)
+					/* This placement will make the computation
+					 * longer, take into account the idle
+					 * consumption of other cpus */
+					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
+				
+				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
+				{
+					/* we found a better solution */
+					best_fitness = fitness[worker_ctx][nimpl];
+					best = worker;
+					best_in_ctx = worker_ctx;
+					best_impl = nimpl;
+					
+					//			_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
+				}
 			}
 		}
 	}
-
+		
 	STARPU_ASSERT(forced_best != -1 || best != -1);
 
 	if (forced_best != -1)
@@ -572,10 +597,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 	else
 	{
-		model_best = local_task_length[best_ctx][best_impl];
-		//penality_best = local_data_penalty[best_ctx][best_impl];
+		model_best = local_task_length[best_in_ctx][best_impl];
+		//penality_best = local_data_penalty[best_in_ctx][best_impl];
 	}
 
+        if(workers->init_cursor)
+                workers->deinit_cursor(workers);
 
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
 	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
@@ -584,54 +611,108 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
-static int dmda_push_sorted_task(struct starpu_task *task, unsigned sched_ctx_id)
+static int dmda_push_sorted_task(struct starpu_task *task)
 {
-	return _dmda_push_task(task, 1, sched_ctx_id);
+	unsigned sched_ctx_id = task->sched_ctx;
+        pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+        unsigned nworkers;
+        int ret_val = -1;
+
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+	if(nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
+
+	ret_val = _dmda_push_task(task, 1, sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return ret_val;
+
 }
 
-static int dm_push_task(struct starpu_task *task, unsigned sched_ctx_id)
+static int dm_push_task(struct starpu_task *task)
 {
-	return _dm_push_task(task, 0, sched_ctx_id);
+	unsigned sched_ctx_id = task->sched_ctx;
+        pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+        unsigned nworkers;
+        int ret_val = -1;
+
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+	if(nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
+
+	ret_val = _dm_push_task(task, 0, sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return ret_val;
 }
 
-static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
+static int dmda_push_task(struct starpu_task *task)
 {
-	return _dmda_push_task(task, 0, sched_ctx_id);
+	unsigned sched_ctx_id = task->sched_ctx;
+        pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+        unsigned nworkers;
+        int ret_val = -1;
+
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+	if(nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
+
+	ret_val = _dmda_push_task(task, 0, sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return ret_val;
 }
 
-static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
+static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers) 
 {
-	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
-	
 	int workerid;
 	unsigned i;
-	pthread_mutex_t *sched_mutex;
-	pthread_cond_t *sched_cond;
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
 		dt->queue_array[workerid] = _starpu_create_fifo();
-	
 		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
 	}
+}
+
+static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		_starpu_destroy_fifo(dt->queue_array[workerid]);
+		starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
+	}
 }
 
 static void initialize_dmda_policy(unsigned sched_ctx_id) 
 {
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+
 	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
-	dt->alpha = STARPU_DEFAULT_ALPHA;
-	dt->beta = STARPU_DEFAULT_BETA;
-	dt->_gamma = STARPU_DEFAULT_GAMMA;
+	dt->alpha = _STARPU_DEFAULT_ALPHA;
+	dt->beta = _STARPU_DEFAULT_BETA;
+	dt->_gamma = _STARPU_DEFAULT_GAMMA;
 	dt->idle_power = 0.0;
 
-	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
 	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)dt);
 
-	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_fifo_taskq_s*));
+	dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
@@ -644,16 +725,6 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
 		dt->_gamma = atof(strval_gamma);
-
-	unsigned workerid_ctx;
-	int workerid;
-	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
-	{
-		workerid = workerids[workerid_ctx];
-		dt->queue_array[workerid] = _starpu_create_fifo();
-	
-		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
-	}
 }
 
 static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
@@ -669,19 +740,10 @@ static void deinitialize_dmda_policy(unsigned sched_ctx_id)
 {
 
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
-	int workerid_ctx, workerid;
-        int nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
-	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++){
-		workerid = workerids[workerid_ctx];
-		_starpu_destroy_fifo(dt->queue_array[workerid]);
-		starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
-	}
-
 	free(dt->queue_array);
 	free(dt);
 
-	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
+	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);
 }
 
 /* TODO: use post_exec_hook to fix the expected start */
@@ -689,54 +751,58 @@ struct starpu_sched_policy _starpu_sched_dm_policy =
 {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
+	.add_workers = dmda_add_workers ,
+        .remove_workers = dmda_remove_workers,
 	.push_task = dm_push_task,
 	.pop_task = dmda_pop_task,
 	.pre_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dm",
-	.policy_description = "performance model",
-	.init_sched_for_workers = initialize_dmda_policy_for_workers
+	.policy_description = "performance model"
 };
 
 struct starpu_sched_policy _starpu_sched_dmda_policy =
 {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
+	.add_workers = dmda_add_workers ,
+        .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_task,
 	.pop_task = dmda_pop_task,
 	.pre_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmda",
-	.policy_description = "data-aware performance model",
-	.init_sched_for_workers = initialize_dmda_policy_for_workers
+	.policy_description = "data-aware performance model"
 };
 
 struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
 {
 	.init_sched = initialize_dmda_sorted_policy,
 	.deinit_sched = deinitialize_dmda_policy,
+	.add_workers = dmda_add_workers ,
+        .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_sorted_task,
 	.pop_task = dmda_pop_ready_task,
 	.pre_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdas",
-	.policy_description = "data-aware performance model (sorted)",
-	.init_sched_for_workers = initialize_dmda_policy_for_workers
+	.policy_description = "data-aware performance model (sorted)"
 };
 
 struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
 {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
+	.add_workers = dmda_add_workers ,
+        .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_task,
 	.pop_task = dmda_pop_ready_task,
 	.pre_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdar",
-	.policy_description = "data-aware performance model (ready)",
-	.init_sched_for_workers = initialize_dmda_policy_for_workers
+	.policy_description = "data-aware performance model (ready)"
 };

+ 15 - 23
src/sched_policies/heft.c

@@ -65,11 +65,11 @@ static void param_modified(struct starpu_top_param* d)
 }
 
 
-static void heft_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
+static void heft_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
 	int workerid;
 	unsigned i;
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
 		struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
@@ -160,16 +160,6 @@ static void heft_pre_exec_hook(struct starpu_task *task)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-static void heft_post_exec_hook(struct starpu_task *task)
-{
-#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	unsigned sched_ctx_id = task->sched_ctx;
-	int workerid = starpu_worker_get_id();
-
-	starpu_call_poped_task_cb(workerid, sched_ctx_id, task->flops);
-#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
-}
-
 static void heft_push_task_notify(struct starpu_task *task, int workerid)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
@@ -185,9 +175,6 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 	pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
 
-#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_pushed_task_cb(workerid, sched_ctx_id);
-#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
 	/* Update the predictions */
 	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
@@ -328,6 +315,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
+				worker_ctx++;
 				continue;
 			}
 
@@ -442,12 +430,12 @@ static int push_conversion_tasks(struct starpu_task *task, unsigned int workerid
 	return 0;
 }
 
-
+/* TODO: factorize with dmda */
 static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
 	heft_data *hd = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	unsigned worker, nimpl, worker_ctx = 0;
-	int best = -1, best_id_ctx = -1;
+	int best = -1, best_in_ctx = -1;
 	int selected_impl= -1;
 
 	/* this flag is set if the corresponding worker is selected because
@@ -472,6 +460,9 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	starpu_task_bundle_t bundle = task->bundle;
 
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
 	compute_all_performance_predictions(task, local_task_length, exp_end,
 					&max_exp_end, &best_exp_end,
 					local_data_penalty,
@@ -512,8 +503,8 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		{
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
-				worker_ctx++;
 				/* no one on that queue may execute this task */
+				worker_ctx++;
 				continue;
 			}
 
@@ -533,7 +524,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				/* we found a better solution */
 				best_fitness = fitness[worker_ctx][nimpl];
 				best = worker;
-				best_id_ctx = worker_ctx;
+				best_in_ctx = worker_ctx;
 				selected_impl = nimpl;
 			}
 		}
@@ -562,8 +553,8 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 	else 
 	{
-		model_best = local_task_length[best_id_ctx][selected_impl];
-		transfer_model_best = local_data_penalty[best_id_ctx][selected_impl];
+		model_best = local_task_length[best_in_ctx][selected_impl];
+		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
 	}
 
 	if(workers->init_cursor)
@@ -623,7 +614,8 @@ static void heft_deinit(unsigned sched_ctx_id)
 	free(ht);
 }
 
-struct starpu_sched_policy heft_policy = {
+struct starpu_sched_policy heft_policy = 
+{
 	.init_sched = heft_init,
 	.deinit_sched = heft_deinit,
 	.push_task = heft_push_task,
@@ -631,7 +623,7 @@ struct starpu_sched_policy heft_policy = {
 	.pop_task = NULL,
 	.pop_every_task = NULL,
 	.pre_exec_hook = heft_pre_exec_hook,
-	.post_exec_hook = heft_post_exec_hook,
+	.post_exec_hook = NULL,
 	.add_workers = heft_add_workers	,
 	.remove_workers = heft_remove_workers,
 	.policy_name = "heft",