Browse Source

encapsulated sched_ctx struct

Andra Hugo 13 years ago
parent
commit
58267baf3a

+ 4 - 4
examples/sched_ctx_utils/sched_ctx_utils.c

@@ -151,8 +151,8 @@ void start_1stbench(void (*bench)(unsigned, unsigned))
 	double timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 	timing /= 1000000;
 
-	printf("%2.2f %2.2f ", rv[0].flops);
-	printf("%2.2f %2.2f %2.2f\n", rv[0].avg_timing, timing);
+	printf("%2.2f ", rv[0].flops);
+	printf("%2.2f %2.2f\n", rv[0].avg_timing, timing);
 }
 
 void start_2ndbench(void (*bench)(unsigned, unsigned))
@@ -175,8 +175,8 @@ void start_2ndbench(void (*bench)(unsigned, unsigned))
 	double timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 	timing /= 1000000;
 
-	printf("%2.2f %2.2f ", rv[1].flops);
-	printf("%2.2f %2.2f %2.2f\n", rv[1].avg_timing, timing);
+	printf("%2.2f ", rv[1].flops);
+	printf("%2.2f %2.2f\n", rv[1].avg_timing, timing);
 }
 
 void construct_contexts(void (*bench)(unsigned, unsigned))

+ 68 - 24
examples/scheduler/dummy_sched.c

@@ -19,66 +19,110 @@
 #include <starpu.h>
 
 #define NTASKS	32000
+//#define NTASKS	20
 #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
 
-struct starpu_task_list sched_list;
+typedef struct dummy_sched_data {
+	struct starpu_task_list sched_list;
+	pthread_mutex_t sched_mutex;
+	pthread_cond_t sched_cond;
+} dummy_sched_data;
 
-static pthread_cond_t sched_cond;
-static pthread_mutex_t sched_mutex;
+static void init_dummy_sched_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
+{
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	
+	unsigned i;
+	int workerid;
+	for(i = 0; i < nnew_workers; i++)
+	{
+		workerid = workerids[i];
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &data->sched_mutex,  &data->sched_cond);
+	}
+}
 
-static void init_dummy_sched(struct starpu_machine_topology_s *topology,
-			struct starpu_sched_policy_s *policy)
+static void init_dummy_sched(unsigned sched_ctx_id)
 {
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+
+	struct dummy_sched_data *data = (struct dummy_sched_data*)malloc(sizeof(struct dummy_sched_data));
+	
+
 	/* Create a linked-list of tasks and a condition variable to protect it */
-	starpu_task_list_init(&sched_list);
+	starpu_task_list_init(&data->sched_list);
+
+	pthread_mutex_init(&data->sched_mutex, NULL);
+	pthread_cond_init(&data->sched_cond, NULL);
 
-	pthread_mutex_init(&sched_mutex, NULL);
-	pthread_cond_init(&sched_cond, NULL);
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)data);
 
-	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
-		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = workerids[workerid_ctx];
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &data->sched_mutex,  &data->sched_cond);
+	}
+		
 
 	FPRINTF(stderr, "Initialising Dummy scheduler\n");
 }
 
-static void deinit_dummy_sched(struct starpu_machine_topology_s *topology,
-				struct starpu_sched_policy_s *policy)
+static void deinit_dummy_sched(unsigned sched_ctx_id)
 {
-	STARPU_ASSERT(starpu_task_list_empty(&sched_list));
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+
+	STARPU_ASSERT(starpu_task_list_empty(&data->sched_list));
+
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = workerids[workerid_ctx];
 
-	pthread_cond_destroy(&sched_cond);
-	pthread_mutex_destroy(&sched_mutex);
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
+	}
 
+	pthread_cond_destroy(&data->sched_cond);
+	pthread_mutex_destroy(&data->sched_mutex);
+	free(data);
+	
 	FPRINTF(stderr, "Destroying Dummy scheduler\n");
 }
 
-static int push_task_dummy(struct starpu_task *task)
+static int push_task_dummy(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	pthread_mutex_lock(&sched_mutex);
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+
+	pthread_mutex_lock(&data->sched_mutex);
 
-	starpu_task_list_push_front(&sched_list, task);
+	starpu_task_list_push_front(&data->sched_list, task);
 
-	pthread_cond_signal(&sched_cond);
+	pthread_cond_signal(&data->sched_cond);
 
-	pthread_mutex_unlock(&sched_mutex);
+	pthread_mutex_unlock(&data->sched_mutex);
 
 	return 0;
 }
 
 /* The mutex associated to the calling worker is already taken by StarPU */
-static struct starpu_task *pop_task_dummy(void)
+static struct starpu_task *pop_task_dummy(unsigned sched_ctx_id)
 {
 	/* NB: In this simplistic strategy, we assume that all workers are able
 	 * to execute all tasks, otherwise, it would have been necessary to go
 	 * through the entire list until we find a task that is executable from
 	 * the calling worker. So we just take the head of the list and give it
 	 * to the worker. */
-	return starpu_task_list_pop_back(&sched_list);
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	return starpu_task_list_pop_back(&data->sched_list);
 }
 
 static struct starpu_sched_policy_s dummy_sched_policy = {
 	.init_sched = init_dummy_sched,
+	.init_sched_for_workers = init_dummy_sched_for_workers,
 	.deinit_sched = deinit_dummy_sched,
 	.push_task = push_task_dummy,
 	.pop_task = pop_task_dummy,
@@ -133,7 +177,7 @@ int main(int argc, char **argv)
 	
 		task->cl = &dummy_codelet;
 		task->cl_arg = NULL;
-	
+
 		starpu_task_submit(task);
 	}
 

+ 12 - 12
include/starpu_scheduler.h

@@ -124,21 +124,21 @@ void starpu_add_workers_to_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsig
 
 void starpu_remove_workers_from_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
 
-int* starpu_get_workers_of_ctx(unsigned sched_ctx, int *nworkers);
+int* starpu_get_workers_of_ctx(unsigned sched_ctx);
 
-void starpu_require_resize(unsigned sched_ctx, int *workers_to_be_moved, unsigned nworkers_to_be_moved);
+unsigned starpu_get_nworkers_of_ctx(unsigned sched_ctx);
 
+void starpu_set_sched_ctx_policy_data(unsigned sched_ctx, void* policy_data);
 
-/* When there is no available task for a worker, StarPU blocks this worker on a
-condition variable. This function specifies which condition variable (and the
-associated mutex) should be used to block (and to wake up) a worker. Note that
-multiple workers may use the same condition variable. For instance, in the case
-of a scheduling strategy with a single task queue, the same condition variable
-would be used to block and wake up all workers.  The initialization method of a
-scheduling strategy (init_sched) must call this function once per worker. */
-#if !defined(_MSC_VER)
-void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex);
-#endif
+void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx);
+
+void starpu_worker_set_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond);
+
+void starpu_worker_get_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond);
+
+void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
+
+void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
 
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);

File diff suppressed because it is too large
+ 1 - 1
sched_ctx_hypervisor/examples/cholesky_2ctxs/cholesky_2ctxs


+ 5 - 4
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -106,8 +106,9 @@ static int compute_priority_per_sched_ctx(unsigned sched_ctx)
 	int i;
 	int total_priority = 0;
 
-	int nworkers_ctx = 0;
-	int *workers = starpu_get_workers_of_ctx(sched_ctx, &nworkers_ctx);
+	int nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx);
+	int *workers = starpu_get_workers_of_ctx(sched_ctx);
+
 	sched_ctx_wrapper->current_nprocs = nworkers_ctx;
 
 	for(i = 0; i < sched_ctx_wrapper->current_nprocs; i++)
@@ -140,8 +141,8 @@ static struct sched_ctx_wrapper* find_highest_priority_sched_ctx(unsigned sched_
 
 static int* sort_workers_by_priority(unsigned sched_ctx, int worker)
 {
-	int nworkers_ctx = 0;
-	int *workers = starpu_get_workers_of_ctx(sched_ctx, &nworkers_ctx);
+	int nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx);
+	int *workers = starpu_get_workers_of_ctx(sched_ctx);
 	
 	struct sched_ctx_wrapper *sched_ctx_wrapper = &hypervisor.sched_ctx_wrapper[sched_ctx];
 	sched_ctx_wrapper->current_nprocs = nworkers_ctx;

+ 5 - 5
src/core/jobs.c

@@ -237,12 +237,12 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 	else {
 		_starpu_decrement_nsubmitted_tasks();
 	}
+
 	if(workerid >= 0)
 	{
 		_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
 		_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
-	}
-			
+	}			
 }
 
 /* This function is called when a new task is submitted to StarPU 
@@ -388,15 +388,15 @@ int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *
 	if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
 		return -ENODEV;
 
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
 	if (back)
 		starpu_task_list_push_back(&worker->local_tasks, task);
 	else
 		starpu_task_list_push_front(&worker->local_tasks, task);
 
-	PTHREAD_COND_BROADCAST(worker->sched_cond);
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 
 	return 0;
 }

+ 56 - 6
src/core/sched_ctx.c

@@ -332,12 +332,16 @@ void _starpu_delete_all_sched_ctxs()
 {
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	  {
+	{
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
-		_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
-			free_sched_ctx_mem(sched_ctx);
-	  }
+		{
+			_starpu_deinit_sched_policy(sched_ctx);		
+			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
+			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
+				free_sched_ctx_mem(sched_ctx);
+		}
+	}
 	return;
 }
 
@@ -598,10 +602,9 @@ pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int w
 	return sched_ctx->sched_cond[workerid];
 }
 
-int* starpu_get_workers_of_ctx(unsigned sched_ctx_id, int *nworkers)
+int* starpu_get_workers_of_ctx(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	*nworkers = sched_ctx->nworkers;
 	return sched_ctx->workerids;
 }
 
@@ -623,5 +626,52 @@ unsigned _starpu_get_nsched_ctxs()
 	return config->topology.nsched_ctxs;
 }
 
+unsigned starpu_get_nworkers_of_ctx(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->nworkers;
+}
+
+void starpu_set_sched_ctx_policy_data(unsigned sched_ctx_id, void* policy_data)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->policy_data = policy_data;
+}
 
+void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->policy_data;
+}
 
+void starpu_worker_set_sched_condition(unsigned sched_ctx_id, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->sched_mutex[workerid] = sched_mutex;
+	sched_ctx->sched_cond[workerid] = sched_cond;
+}
+
+void starpu_worker_get_sched_condition(unsigned sched_ctx_id, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	*sched_mutex = sched_ctx->sched_mutex[workerid];
+	*sched_cond = sched_ctx->sched_cond[workerid];
+}
+
+void starpu_worker_init_sched_condition(unsigned sched_ctx_id, int workerid)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->sched_mutex[workerid] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+	sched_ctx->sched_cond[workerid] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+	PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
+	PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
+}
+
+void starpu_worker_deinit_sched_condition(unsigned sched_ctx_id, int workerid)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
+	PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
+	free(sched_ctx->sched_mutex[workerid]);
+	free(sched_ctx->sched_cond[workerid]);
+}

+ 3 - 6
src/core/sched_ctx.h

@@ -92,15 +92,12 @@ void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
 /* Return the corresponding index of the workerid in the ctx table */
 int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);
 
-/* Get the mutex corresponding to the global workerid */
-pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker);
-
-/* Get the cond corresponding to the global workerid */
-pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker);
-
 /* Get the total number of sched_ctxs created till now */
 unsigned _starpu_get_nsched_ctxs();
 
+/* Get the mutex corresponding to the global workerid */
+pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker);
+
 /* Treat add workers requests */
 void _starpu_actually_add_workers_to_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 

+ 7 - 7
src/core/sched_policy.c

@@ -205,7 +205,7 @@ void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct st
 	sched_ctx->sched_policy->init_sched(sched_ctx->id);
 }
 
-void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx)
+void _starpu_deinit_sched_policy(struct starpu_sched_ctx *sched_ctx)
 {
         struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
 	if (policy->deinit_sched)
@@ -334,10 +334,10 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	if (profiling)
 		starpu_clock_gettime(&pop_start_time);
 	
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	
 	
 	/* get tasks from the stacks of the strategy */
@@ -359,8 +359,8 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 					PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
 					if (sched_ctx->sched_policy->pop_task)
 					{
-						PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 						task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
+						PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 						break;
 					}
 					PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
@@ -431,18 +431,18 @@ void _starpu_wait_on_sched_event(void)
 {
  	struct starpu_worker_s *worker = _starpu_get_local_worker_key();
 
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
 	_starpu_handle_all_pending_node_data_requests(worker->memory_node);
 
 	if (_starpu_machine_is_running())
 	{
 #ifndef STARPU_NON_BLOCKING_DRIVERS
-		pthread_cond_wait(worker->sched_cond, worker->sched_mutex);
+		pthread_cond_wait(&worker->sched_cond, &worker->sched_mutex);
 #endif
 	}
 
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 }
 
 /* The scheduling policy may put tasks directly into a worker's local queue so

+ 1 - 1
src/core/sched_policy.h

@@ -29,7 +29,7 @@ struct starpu_sched_policy_s *_starpu_get_sched_policy( struct starpu_sched_ctx
 void _starpu_init_sched_policy(struct starpu_machine_config_s *config, 
 			       struct starpu_sched_ctx *sched_ctx, const char *policy_name);
 
-void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx);
+void _starpu_deinit_sched_policy(struct starpu_sched_ctx *sched_ctx);
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 /* pop a task that can be executed on the worker */

+ 5 - 14
src/core/workers.c

@@ -36,9 +36,6 @@
 static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
 
-static pthread_mutex_t local_list_sched_mutex[STARPU_NMAXWORKERS]; 
-static pthread_cond_t local_list_sched_cond[STARPU_NMAXWORKERS];
-
 static int init_count;
 static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
 
@@ -150,8 +147,8 @@ static struct starpu_worker_set_s gordon_worker_set;
 
 static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
 {
-	pthread_cond_t *cond = workerarg->sched_cond;
-	pthread_mutex_t *mutex = workerarg->sched_mutex;
+	pthread_cond_t *cond = &workerarg->sched_cond;
+	pthread_mutex_t *mutex = &workerarg->sched_mutex;
 
 	unsigned memory_node = workerarg->memory_node;
 
@@ -186,10 +183,10 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 		/* mutex + cond only for the local list */
 		/* we have a single local list */
 		/* afterwards there would be a mutex + cond for the list of each strategy */
-		PTHREAD_MUTEX_INIT(&local_list_sched_mutex[worker], NULL);
-		PTHREAD_COND_INIT(&local_list_sched_cond[worker], NULL);
 
-		starpu_worker_set_sched_condition(worker, &local_list_sched_cond[worker], &local_list_sched_mutex[worker]);
+		PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
+		PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
+
 		/* if some codelet's termination cannot be handled directly :
 		 * for instance in the Gordon driver, Gordon tasks' callbacks
 		 * may be executed by another thread than that of the Gordon
@@ -738,12 +735,6 @@ void _starpu_worker_set_status(int workerid, starpu_worker_status status)
 	config.workers[workerid].status = status;
 }
 
-void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
-{
-	config.workers[workerid].sched_cond = sched_cond;
-	config.workers[workerid].sched_mutex = sched_mutex;
-}
-
 struct starpu_sched_ctx* _starpu_get_initial_sched_ctx(void){
 	return &config.sched_ctxs[0];
 }

+ 3 - 2
src/core/workers.h

@@ -69,8 +69,8 @@ struct starpu_worker_s {
 	int worker_size; /* size of the worker in case we use a combined worker */
         pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is associated that worker to ? */
-	pthread_cond_t *sched_cond; /* condition variable used when the worker waits for tasks. */
-	pthread_mutex_t *sched_mutex; /* mutex protecting sched_cond */
+	pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
+	pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_worker_set_s *set; /* in case this worker belongs to a set */
 	struct starpu_job_list_s *terminated_jobs; /* list of pending jobs which were executed */
@@ -225,4 +225,5 @@ unsigned _starpu_execute_registered_progression_hooks(void);
 
 /* We keep an initial sched ctx which might be used in case no other ctx is available */
 struct starpu_sched_ctx* _starpu_get_initial_sched_ctx(void);
+
 #endif // __WORKERS_H__

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -128,8 +128,8 @@ void *_starpu_cpu_worker(void *arg)
 
 	int res;
 
-	pthread_cond_t *sched_cond = cpu_arg->sched_cond;
-	pthread_mutex_t *sched_mutex = cpu_arg->sched_mutex;
+	pthread_cond_t *sched_cond = &cpu_arg->sched_cond;
+	pthread_mutex_t *sched_mutex = &cpu_arg->sched_mutex;
 
 	while (_starpu_machine_is_running())
 	{

+ 2 - 2
src/drivers/cuda/driver_cuda.c

@@ -275,8 +275,8 @@ void *_starpu_cuda_worker(void *arg)
 	struct starpu_task *task;
 	int res;
 
-	pthread_cond_t *sched_cond = args->sched_cond;
-	pthread_mutex_t *sched_mutex = args->sched_mutex;
+	pthread_cond_t *sched_cond = &args->sched_cond;
+	pthread_mutex_t *sched_mutex = &args->sched_mutex;
 
 	while (_starpu_machine_is_running())
 	{

+ 3 - 3
src/drivers/opencl/driver_opencl.c

@@ -425,8 +425,8 @@ void *_starpu_opencl_worker(void *arg)
 	struct starpu_task *task;
 	int res;
 
-	pthread_cond_t *sched_cond = args->sched_cond;
-        pthread_mutex_t *sched_mutex = args->sched_mutex;
+	pthread_cond_t *sched_cond = &args->sched_cond;
+        pthread_mutex_t *sched_mutex = &args->sched_mutex;
 
 	while (_starpu_machine_is_running())
 	{
@@ -447,7 +447,7 @@ void *_starpu_opencl_worker(void *arg)
 			continue;
 		};
 
-		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
+		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 67 - 75
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -119,14 +119,13 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_
 
 static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
-	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
+
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid];
 
 	unsigned node = starpu_worker_get_memory_node(workerid);
 
@@ -155,14 +154,12 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 
 static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
-	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid];
 
 	task = _starpu_fifo_pop_task(fifo, -1);
 	if (task) {
@@ -191,16 +188,17 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 
 static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	struct starpu_task *new_list;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
-	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid];
 
-	new_list = _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[workerid_ctx], workerid);
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+	new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);
 
 	while (new_list)
 	{
@@ -283,16 +281,14 @@ int _starpu_fifo_push_sorted_task(struct starpu_fifo_taskq_s *fifo_queue, pthrea
 
 
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
-	int best_workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, best_workerid);
-
 	struct starpu_fifo_taskq_s *fifo;
-	fifo = dt->queue_array[best_workerid_ctx];
+	fifo = dt->queue_array[best_workerid];
 
 	fifo->exp_end += predicted;
 	fifo->exp_len += predicted;
@@ -304,17 +300,20 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	if (starpu_get_prefetch_flag())
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
 	if (prio)
-		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid_ctx],
-			sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
+		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
+			sched_mutex, sched_cond, task);
 	else
-		return _starpu_fifo_push_task(dt->queue_array[best_workerid_ctx],
-			sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
+		return _starpu_fifo_push_task(dt->queue_array[best_workerid],
+			sched_mutex, sched_cond, task);
 }
 
-static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
+static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker, worker_ctx;
@@ -332,15 +331,16 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_
 
 	unsigned best_impl = 0;
 	unsigned nimpl;
-	unsigned nworkers = sched_ctx->nworkers;
+	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
 	for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
 	{
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-        	worker = sched_ctx->workerids[worker_ctx];
+			worker = workerids[worker_ctx];
 			double exp_end;
 		
-			fifo = dt->queue_array[worker_ctx];
+			fifo = dt->queue_array[worker];
 
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
@@ -406,12 +406,12 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_
 	 _starpu_get_job_associated_to_task(task)->nimpl = 0;//best_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
-static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
+static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker, worker_ctx;
@@ -421,7 +421,8 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	   there is no performance prediction available yet */
 	int forced_best = -1;
 
-	unsigned nworkers_ctx = sched_ctx->nworkers;
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
 	double local_task_length[nworkers_ctx];
 	double local_data_penalty[nworkers_ctx];
 	double local_power[nworkers_ctx];
@@ -445,10 +446,10 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	unsigned nimpl=0;
 	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
 	{
-		worker = sched_ctx->workerids[worker_ctx];
+		worker = workerids[worker_ctx];
 		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	 	{
-			fifo = dt->queue_array[worker_ctx];
+			fifo = dt->queue_array[worker];
 
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
@@ -521,9 +522,9 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 		{
 	        for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
 	        {
-		        worker = sched_ctx->workerids[worker_ctx];
+		        worker = workerids[worker_ctx];
 
-				fifo = dt->queue_array[worker_ctx];
+			fifo = dt->queue_array[worker];
 	
 			if (!starpu_worker_may_execute_task(worker, task, 0))
 			{
@@ -575,47 +576,40 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
 static int dmda_push_sorted_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return _dmda_push_task(task, 2, sched_ctx);
+	return _dmda_push_task(task, 2, sched_ctx_id);
 }
 
 static int dm_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return _dm_push_task(task, 0, sched_ctx);
+	return _dm_push_task(task, 0, sched_ctx_id);
 }
 
 static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return _dmda_push_task(task, 0, sched_ctx);
+	return _dmda_push_task(task, 0, sched_ctx_id);
 }
 
-static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers = sched_ctx->nworkers;
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
-
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned ntotal_workers = config->topology.nworkers;
-
-	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers + nnew_workers;
+	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
-	unsigned workerid_ctx;
-	for (workerid_ctx = nworkers; workerid_ctx < all_workers; workerid_ctx++)
+	
+	int workerid;
+	unsigned i;
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	for (i = 0; i < nnew_workers; i++)
 	{
-		dt->queue_array[workerid_ctx] = _starpu_create_fifo();
+		workerid = workerids[i];
+		dt->queue_array[workerid] = _starpu_create_fifo();
 	
-		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
-		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
 	}
 
 }
@@ -628,9 +622,9 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 	dt->_gamma = STARPU_DEFAULT_GAMMA;
 	dt->idle_power = 0.0;
 
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers = sched_ctx->nworkers;
-	sched_ctx->policy_data = (void*)dt;
+	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)dt);
 
 	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_fifo_taskq_s*));
 
@@ -647,14 +641,13 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 		dt->_gamma = atof(strval_gamma);
 
 	unsigned workerid_ctx;
+	int workerid;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-		dt->queue_array[workerid_ctx] = _starpu_create_fifo();
+		workerid = workerids[workerid_ctx];
+		dt->queue_array[workerid] = _starpu_create_fifo();
 	
-		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
-		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
 	}
 }
 
@@ -669,16 +662,15 @@ static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
 
 static void deinitialize_dmda_policy(unsigned sched_ctx_id) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
-	int workerid_ctx;
-        int nworkers = sched_ctx->nworkers;
+
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	int workerid_ctx, workerid;
+        int nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++){
-		_starpu_destroy_fifo(dt->queue_array[workerid_ctx]);
-		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_ctx]);
-                PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_ctx]);
-		free(sched_ctx->sched_mutex[workerid_ctx]);
-                free(sched_ctx->sched_cond[workerid_ctx]);
+		workerid = workerids[workerid_ctx];
+		_starpu_destroy_fifo(dt->queue_array[workerid]);
+		starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
 	}
 
 	free(dt->queue_array);

+ 59 - 36
src/sched_policies/eager_central_policy.c

@@ -24,20 +24,24 @@
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 
-static void initialize_eager_center_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+typedef struct eager_center_policy_data {
+	struct starpu_fifo_taskq_s *fifo;
+	pthread_mutex_t sched_mutex;
+	pthread_cond_t sched_cond;
+} eager_center_policy_data;
+
+static void initialize_eager_center_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
 
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned ntotal_workers = config->topology.nworkers;
-
-	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
-
-	unsigned workerid_ctx;
-	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++){
-		sched_ctx->sched_mutex[workerid_ctx] = sched_ctx->sched_mutex[0];
-		sched_ctx->sched_cond[workerid_ctx] = sched_ctx->sched_cond[0];
+	unsigned i;
+	int workerid;
+	for (i = 0; i < nnew_workers; i++)
+	{
+		workerid = workerids[i];
+		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &data->sched_cond;
 	}
 }
 
@@ -45,20 +49,23 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
+
 	/* there is only a single queue in that trivial design */
-	struct starpu_fifo_taskq_s *fifo =  _starpu_create_fifo();
-	sched_ctx->policy_data = (void*)fifo;
-
-	pthread_mutex_t *sched_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
-	pthread_cond_t *sched_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
-	PTHREAD_MUTEX_INIT(sched_mutex, NULL);
-	PTHREAD_COND_INIT(sched_cond, NULL);
-
-	int workerid_ctx;
-	int nworkers = sched_ctx->nworkers;
-	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++){
-		sched_ctx->sched_mutex[workerid_ctx] = sched_mutex;
-		sched_ctx->sched_cond[workerid_ctx] = sched_cond;
+	data->fifo =  _starpu_create_fifo();
+
+	PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	PTHREAD_COND_INIT(&data->sched_cond, NULL);
+
+	sched_ctx->policy_data = (void*)data;
+
+	int workerid;
+	unsigned workerid_ctx;
+	int nworkers_ctx = sched_ctx->nworkers;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++){
+		workerid = sched_ctx->workerids[workerid_ctx];
+		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &data->sched_cond;
 	}
 }
 
@@ -67,22 +74,34 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 	/* TODO check that there is no task left in the queue */
 
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
+
 
 	/* deallocate the job queue */
-	_starpu_destroy_fifo(fifo);
+	_starpu_destroy_fifo(data->fifo);
 
-	PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[0]);
-	PTHREAD_COND_DESTROY(sched_ctx->sched_cond[0]);
-	free(sched_ctx->sched_mutex[0]);
-	free(sched_ctx->sched_cond[0]);
+	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+	PTHREAD_COND_DESTROY(&data->sched_cond);
+	
+	free(data);	
 	
-	free(fifo);
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = sched_ctx->workerids[workerid_ctx];
+		sched_ctx->sched_mutex[workerid] = NULL;
+		sched_ctx->sched_cond[workerid] = NULL;
+	}
+
 }
 
 static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
+
 	int i;
 	int workerid;
 	for(i = 0; i < sched_ctx->nworkers; i++){
@@ -90,22 +109,26 @@ static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_i
 		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
 	}
 
-	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
-	return _starpu_fifo_push_task(fifo, sched_ctx->sched_mutex[0], sched_ctx->sched_cond[0], task);
+	struct starpu_fifo_taskq_s *fifo = data->fifo;
+	return _starpu_fifo_push_task(fifo, &data->sched_mutex, &data->sched_cond, task);
 }
 
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
-	return _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[0], starpu_worker_get_id());
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
+
+	struct starpu_fifo_taskq_s *fifo = data->fifo;
+	return _starpu_fifo_pop_every_task(fifo, &data->sched_mutex, starpu_worker_get_id());
 }
 
 static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 {
         unsigned workerid = starpu_worker_get_id();
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
+
+	struct starpu_fifo_taskq_s *fifo = data->fifo;
 	struct starpu_task *task =  _starpu_fifo_pop_task(fifo, workerid);
 
 	if(task)

+ 53 - 35
src/sched_policies/eager_central_priority_policy.c

@@ -41,6 +41,12 @@ struct starpu_priority_taskq_s {
 	unsigned total_ntasks;
 };
 
+typedef struct eager_central_prio_data{
+	struct starpu_priority_taskq_s *taskq;
+	pthread_mutex_t sched_mutex;
+	pthread_cond_t sched_cond;
+} eager_central_prio_data;
+
 /*
  * Centralized queue with priorities 
  */
@@ -67,21 +73,20 @@ static void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *prior
 	free(priority_queue);
 }
 
-static void initialize_eager_center_priority_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+static void initialize_eager_center_priority_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers_ctx = sched_ctx->nworkers;
+	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
 
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned ntotal_workers = config->topology.nworkers;
-
-	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
+	unsigned nworkers_ctx = sched_ctx->nworkers;
 
-	unsigned workerid_ctx;
-	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
+	unsigned i;
+	int workerid;
+	for (i = 0; i < nnew_workers; i++)
 	{
-		sched_ctx->sched_mutex[workerid_ctx] = sched_ctx->sched_mutex[0];
-		sched_ctx->sched_cond[workerid_ctx] = sched_ctx->sched_cond[0];
+		workerid = workerids[i];
+		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &data->sched_cond;
 	}
 
 }
@@ -89,27 +94,27 @@ static void initialize_eager_center_priority_policy_for_workers(unsigned sched_c
 static void initialize_eager_center_priority_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct eager_central_prio_data *data = (struct eager_central_prio_data*)malloc(sizeof(struct eager_central_prio_data));
 
 	/* In this policy, we support more than two levels of priority. */
 	starpu_sched_set_min_priority(MIN_LEVEL);
 	starpu_sched_set_max_priority(MAX_LEVEL);
 
 	/* only a single queue (even though there are several internaly) */
-	struct starpu_priority_taskq_s *taskq = _starpu_create_priority_taskq();
-	sched_ctx->policy_data = (void*) taskq;
-
-	pthread_cond_t *global_sched_cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-	pthread_mutex_t *global_sched_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+	data->taskq = _starpu_create_priority_taskq();
+	sched_ctx->policy_data = (void*) data;
 
-	PTHREAD_MUTEX_INIT(global_sched_mutex, NULL);
-	PTHREAD_COND_INIT(global_sched_cond, NULL);
+	PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	PTHREAD_COND_INIT(&data->sched_cond, NULL);
 
 	int nworkers = sched_ctx->nworkers;
 	int workerid_ctx;
+	int workerid;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-		sched_ctx->sched_mutex[workerid_ctx] = global_sched_mutex;
-		sched_ctx->sched_cond[workerid_ctx] = global_sched_cond;
+		workerid = sched_ctx->workerids[workerid_ctx];
+		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &data->sched_cond;
 	}
 }
 
@@ -117,26 +122,37 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 {
 	/* TODO check that there is no task left in the queue */
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
+	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
 
 	/* deallocate the task queue */
-	_starpu_destroy_priority_taskq(taskq);
+	_starpu_destroy_priority_taskq(data->taskq);
+
+	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+        PTHREAD_COND_DESTROY(&data->sched_cond);
 
-	PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[0]);
-        PTHREAD_COND_DESTROY(sched_ctx->sched_cond[0]);
-        free(sched_ctx->sched_mutex[0]);
-        free(sched_ctx->sched_cond[0]);
+        free(data);
 
-        free(taskq);
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = sched_ctx->workerids[workerid_ctx];
+		sched_ctx->sched_mutex[workerid] = NULL;
+		sched_ctx->sched_cond[workerid] = NULL;
+	}
+	
 }
 
 static int _starpu_priority_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
+	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
+
+	struct starpu_priority_taskq_s *taskq = data->taskq;
 
 	/* wake people waiting for a task */
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_MUTEX_LOCK(&data->sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 1);
 	
@@ -146,8 +162,8 @@ static int _starpu_priority_push_task(struct starpu_task *task, unsigned sched_c
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 
-	PTHREAD_COND_SIGNAL(sched_ctx->sched_cond[0]);
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_COND_SIGNAL(&data->sched_cond);
+	PTHREAD_MUTEX_UNLOCK(&data->sched_mutex);
 
 	return 0;
 }
@@ -157,18 +173,20 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	struct starpu_task *task = NULL;
 
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
+	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
+	
+	struct starpu_priority_taskq_s *taskq = data->taskq;
 
 	/* block until some event happens */
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_MUTEX_LOCK(&data->sched_mutex);
 
 	if ((taskq->total_ntasks == 0) && _starpu_machine_is_running())
 	{
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-		PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+		PTHREAD_MUTEX_UNLOCK(&data->sched_mutex);
 		return NULL;
 #else
-		PTHREAD_COND_WAIT(sched_ctx->sched_cond[0], sched_ctx->sched_mutex[0]);
+		PTHREAD_COND_WAIT(&data->sched_cond, &data->sched_mutex);
 #endif
 	}
 
@@ -186,7 +204,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		} while (!task && priolevel-- > 0);
 	}
 
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_MUTEX_UNLOCK(&data->sched_mutex);
 
 	return task;
 }

+ 40 - 42
src/sched_policies/heft.c

@@ -22,7 +22,6 @@
 #include <float.h>
 
 #include <core/workers.h>
-#include <core/sched_ctx.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 #include <starpu_task_bundle.h>
@@ -56,14 +55,6 @@ void param_modified(struct starputop_param_t* d){
 }
 static void heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned nworkers = config->topology.nworkers;
-
-	
-	unsigned workerid_ctx;
 	int workerid;
 	unsigned i;
 	for (i = 0; i < nnew_workers; i++)
@@ -83,8 +74,7 @@ static void heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigne
 		/* we push the tasks on the local lists of the workers
 		   therefore the synchronisations mechanisms of the strategy
 		   are the global ones */
-		sched_ctx->sched_mutex[workerid] = workerarg->sched_mutex;
-		sched_ctx->sched_cond[workerid] = workerarg->sched_cond;
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
 	}
 }
 static void heft_init(unsigned sched_ctx_id)
@@ -95,9 +85,10 @@ static void heft_init(unsigned sched_ctx_id)
 	hd->_gamma = STARPU_DEFAULT_GAMMA;
 	hd->idle_power = 0.0;
 	
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	sched_ctx->policy_data = (void*)hd;
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)hd);
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
@@ -124,7 +115,7 @@ static void heft_init(unsigned sched_ctx_id)
 
 	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
 	{
-		int workerid = sched_ctx->workerids[workerid_ctx];
+		int workerid = workerids[workerid_ctx];
 		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
 		/* init these structures only once for each worker */
 		if(!workerarg->has_prev_init)
@@ -138,9 +129,7 @@ static void heft_init(unsigned sched_ctx_id)
 		/* we push the tasks on the local lists of the workers
 		   therefore the synchronisations mechanisms of the strategy
 		   are the global ones */
-		sched_ctx->sched_mutex[workerid] = workerarg->sched_mutex;
-		sched_ctx->sched_cond[workerid] = workerarg->sched_cond;
-		
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
 	}
 }
 
@@ -148,30 +137,36 @@ static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
 	STARPU_ASSERT(workerid >= 0);
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
 	double model = task->predicted;
-	
+
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	exp_len[workerid] -= model;
 	exp_start[workerid] = starpu_timing_now() + model;
 	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
 	ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
 {
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	/* Compute the expected penality */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 
 	double predicted = starpu_task_expected_length(task, perf_arch,
 			_starpu_get_job_associated_to_task(task)->nimpl);
 
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+
 	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
@@ -187,20 +182,23 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid, unsign
 
 	ntasks[workerid]++;
 
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
-	struct starpu_worker_s *best_worker = _starpu_get_worker_struct(best_workerid);
 
-	PTHREAD_MUTEX_LOCK(best_worker->sched_mutex);
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	exp_end[best_workerid] += predicted;
 	exp_len[best_workerid] += predicted;
 	ntasks[best_workerid]++;
-	PTHREAD_MUTEX_UNLOCK(best_worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	task->predicted = predicted;
 
@@ -224,7 +222,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 					double *local_data_penalty,
 					double *local_power, int *forced_best,
 					struct starpu_task_bundle *bundle,
-					struct starpu_sched_ctx *sched_ctx )
+					unsigned sched_ctx_id)
 {
 	int calibrating = 0;
 	double max_exp_end = DBL_MIN;
@@ -235,14 +233,15 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
+
 	unsigned nimpl;
 	unsigned best_impl = 0;
 	unsigned worker, worker_ctx;
 	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
 	{
-		worker = sched_ctx->workerids[worker_ctx];
+		worker = workerids[worker_ctx];
 		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
@@ -327,8 +326,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	heft_data *hd = (heft_data*)sched_ctx->policy_data;
+	heft_data *hd = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	unsigned worker, worker_ctx;
 	int best = -1, best_id_ctx = -1;
 	
@@ -336,7 +334,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	   there is no performance prediction available yet */
 	int forced_best;
 
-	unsigned nworkers_ctx = sched_ctx->nworkers;
+	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
 	double local_task_length[nworkers_ctx];
 	double local_data_penalty[nworkers_ctx];
 	double local_power[nworkers_ctx];
@@ -355,13 +353,13 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	compute_all_performance_predictions(task, local_task_length, exp_end,
 					    &max_exp_end, &best_exp_end,
 					    local_data_penalty,
-					    local_power, &forced_best, bundle, sched_ctx);
+					    local_power, &forced_best, bundle, sched_ctx_id);
 	
 	/* If there is no prediction available for that task with that arch we
 	 * want to speed-up calibration time so we force this measurement */
 	if (forced_best != -1){
 		_starpu_increment_nsubmitted_tasks_of_worker(forced_best);
-		return push_task_on_best_worker(task, forced_best, 0.0, prio);
+		return push_task_on_best_worker(task, forced_best, 0.0, prio, sched_ctx_id);
 	}
 	
 	/*
@@ -373,9 +371,10 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	double fitness[nworkers_ctx];
 	double best_fitness = -1;
 
+	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
 	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
 	{
-		worker = sched_ctx->workerids[worker_ctx];
+		worker = workerids[worker_ctx];
 
 		if (!starpu_worker_may_execute_task(worker, task, 0))
 		{
@@ -433,7 +432,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 
 	_starpu_increment_nsubmitted_tasks_of_worker(best);
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
 static int heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
@@ -446,8 +445,7 @@ static int heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 
 static void heft_deinit(unsigned sched_ctx_id) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	heft_data *ht = (heft_data*)sched_ctx->policy_data;	  
+	heft_data *ht = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	free(ht);
 }
 

+ 73 - 48
src/sched_policies/parallel_greedy.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2011  Université de Bordeaux 1
  * Copyright (C) 2011  Télécom-SudParis
+ * Copyright (C) 2011  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -19,17 +20,15 @@
 #include <sched_policies/fifo_queues.h>
 #include <common/barrier.h>
 
-/* the former is the actual queue, the latter some container */
-static struct starpu_fifo_taskq_s *fifo;
-static struct starpu_fifo_taskq_s *local_fifo[STARPU_NMAXWORKERS];
+typedef struct pgreedy_data {
+	struct starpu_fifo_taskq_s *fifo;
+	struct starpu_fifo_taskq_s *local_fifo[STARPU_NMAXWORKERS];
 
-static int master_id[STARPU_NMAXWORKERS];
+	int master_id[STARPU_NMAXWORKERS];
 
-static pthread_cond_t sched_cond;
-static pthread_mutex_t sched_mutex;
-
-static pthread_cond_t master_sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t master_sched_mutex[STARPU_NMAXWORKERS];
+	pthread_cond_t sched_cond;
+	pthread_mutex_t sched_mutex;
+} pgreedy_data;
 
 /* XXX instead of 10, we should use some "MAX combination .."*/
 static int possible_combinations_cnt[STARPU_NMAXWORKERS];
@@ -39,16 +38,16 @@ static int possible_combinations_size[STARPU_NMAXWORKERS][10];
 static void initialize_pgreedy_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-
+	struct pgreedy_data *data = (struct pgreedy_data*)malloc(sizeof(pgreedy_data));
 	/* masters pick tasks from that queue */
-	fifo = _starpu_create_fifo();
+	data->fifo = _starpu_create_fifo();
 
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-    struct starpu_machine_topology_s *topology = &config->topology;
+	struct starpu_machine_topology_s *topology = &config->topology;
 
 	_starpu_sched_find_worker_combinations(topology);
 
-	unsigned workerid, workerid_ctx;;
+	unsigned workerid, workerid_ctx;
 	unsigned ncombinedworkers, nworkers, nworkers_ctx;
 	
 	nworkers = topology->nworkers;
@@ -60,16 +59,16 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 	 * to find the biggest combination containing this worker. */
 
 	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	  {
+	{
     	        workerid = sched_ctx->workerids[workerid_ctx];
-
+		
 		int cnt = possible_combinations_cnt[workerid]++;
 		possible_combinations[workerid][cnt] = workerid;
 		possible_combinations_size[workerid][cnt] = 1;
-
-		master_id[workerid] = workerid;
+		
+		data->master_id[workerid] = workerid;
 	}
-
+	
 	unsigned i;
 	
 	for (i = 0; i < ncombinedworkers; i++)
@@ -86,8 +85,8 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 		int j;
 		for (j = 0; j < size; j++)
 		{
-			if (master_id[workers[j]] > master)
-				master_id[workers[j]] = master;
+			if (data->master_id[workers[j]] > master)
+				data->master_id[workers[j]] = master;
 
 			int cnt = possible_combinations_cnt[workers[j]]++;
 			possible_combinations[workers[j]][cnt] = workerid;
@@ -95,77 +94,103 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 		}
 	}
 
-	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
-	PTHREAD_COND_INIT(&sched_cond, NULL);
+	PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	PTHREAD_COND_INIT(&data->sched_cond, NULL);
 
 	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
 	{
-      	workerid = sched_ctx->workerids[workerid_ctx];
+		workerid = sched_ctx->workerids[workerid_ctx];
 
-		PTHREAD_MUTEX_INIT(&master_sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&master_sched_cond[workerid], NULL);
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
 	}
+
 	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-    {
+	{
 		workerid = sched_ctx->workerids[workerid_ctx];
 
 		/* slaves pick up tasks from their local queue, their master
 		 * will put tasks directly in that local list when a parallel
 		 * tasks comes. */
-		local_fifo[workerid] = _starpu_create_fifo();
+		data->local_fifo[workerid] = _starpu_create_fifo();
 
-		unsigned master = master_id[workerid];
+		unsigned master = data->master_id[workerid];
 
 		/* All masters use the same condition/mutex */
 		if (master == workerid)
 		{
-			starpu_worker_set_sched_condition(workerid,
-				&sched_cond, &sched_mutex);
-		}
-		else {
-			starpu_worker_set_sched_condition(workerid,
-				&master_sched_cond[master],
-				&master_sched_mutex[master]);
+			sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+			sched_ctx->sched_cond[workerid] = &data->sched_cond;
 		}
 	}
+	sched_ctx->policy_data = (void*)data;
 
 #if 0
 	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
 	{
-        workerid = sched_ctx->workerids[workerid_ctx];
+		workerid = sched_ctx->workerids[workerid_ctx];
 
 		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
 	}
 #endif
 }
 
-static void deinitialize_pgreedy_policy(__attribute__ ((unused)) unsigned sched_ctx_id) 
+static void deinitialize_pgreedy_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
+
 
 	/* deallocate the job queue */
-	_starpu_destroy_fifo(fifo);
+	_starpu_destroy_fifo(data->fifo);
+
+	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+	PTHREAD_COND_DESTROY(&data->sched_cond);
+	
+	free(data);	
+	
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = sched_ctx->workerids[workerid_ctx];
+		_starpu_destroy_fifo(data->local_fifo[workerid]);
+		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
+		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
+
+		sched_ctx->sched_mutex[workerid] = NULL;
+		sched_ctx->sched_cond[workerid] = NULL;
+	}
+
 }
 
-static int push_task_pgreedy_policy(struct starpu_task *task, __attribute__ ((unused)) unsigned sched_ctx_id)
+static int push_task_pgreedy_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
+
+	return _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
 }
 
-static struct starpu_task *pop_task_pgreedy_policy(void)
+static struct starpu_task *pop_task_pgreedy_policy(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
+
 	int workerid = starpu_worker_get_id();
 
 	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
-		return  _starpu_fifo_pop_task(fifo, workerid);
+		return  _starpu_fifo_pop_task(data->fifo, workerid);
 
-	int master = master_id[workerid];
+	int master = data->master_id[workerid];
 
 	if (master == workerid)
 	{
 		/* The worker is a master */
-		struct starpu_task *task = _starpu_fifo_pop_task(fifo, workerid);
+		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, workerid);
 
 		if (!task)
 			return NULL;
@@ -226,9 +251,9 @@ static struct starpu_task *pop_task_pgreedy_policy(void)
 				struct starpu_task *alias = _starpu_create_task_alias(task);
 				int local_worker = combined_workerid[i];
 
-				_starpu_fifo_push_task(local_fifo[local_worker],
-					&master_sched_mutex[master],
-					&master_sched_cond[master], alias);
+				_starpu_fifo_push_task(data->local_fifo[local_worker],
+					sched_ctx->sched_mutex[master],
+					sched_ctx->sched_cond[master], alias);
 			}
 
 			/* The master also manipulated an alias */
@@ -238,7 +263,7 @@ static struct starpu_task *pop_task_pgreedy_policy(void)
 	}
 	else {
 		/* The worker is a slave */
-		return _starpu_fifo_pop_task(local_fifo[workerid], workerid);
+		return _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
 	}
 }
 

+ 150 - 78
src/sched_policies/parallel_heft.c

@@ -26,51 +26,51 @@
 
 static pthread_mutex_t big_lock;
 
-static unsigned nworkers, ncombinedworkers;
+static unsigned  ncombinedworkers;
 //static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
 //static unsigned napplicable_perf_archtypes = 0;
 
-static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
-
-static double alpha = STARPU_DEFAULT_ALPHA;
-static double beta = STARPU_DEFAULT_BETA;
-static double _gamma = STARPU_DEFAULT_GAMMA;
-static double idle_power = 0.0;
+typedef struct {
+	double alpha;
+	double beta;
+	double _gamma;
+	double idle_power;
+} pheft_data;
 
 static double worker_exp_start[STARPU_NMAXWORKERS];
 static double worker_exp_end[STARPU_NMAXWORKERS];
 static double worker_exp_len[STARPU_NMAXWORKERS];
 static int ntasks[STARPU_NMAXWORKERS];
 
-static void parallel_heft_post_exec_hook(struct starpu_task *task)
+static void parallel_heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	if (!task->cl || task->execute_on_a_specific_worker)
 		return;
 
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
-	
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	if (model < 0.0)
 		model = 0.0;
 	
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid]);
 	worker_exp_len[workerid] -= model;
 	worker_exp_start[workerid] = starpu_timing_now();
 	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid]);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, struct starpu_sched_ctx *sched_ctx)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
 	/* Is this a basic worker or a combined worker ? */
-	int nbasic_workers = (int)starpu_worker_get_count();
+//	int nbasic_workers = (int)starpu_worker_get_count();
+	int nbasic_workers = sched_ctx->nworkers;
 	int is_basic_worker = (best_workerid < nbasic_workers);
 
 	unsigned memory_node; 
@@ -133,7 +133,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	return ret;
 }
 
-static double compute_expected_end(int workerid, double length)
+static double compute_expected_end(int workerid, double length, int nworkers)
 {
 	if (workerid < (int)nworkers)
 	{
@@ -161,7 +161,7 @@ static double compute_expected_end(int workerid, double length)
 	}
 }
 
-static double compute_ntasks_end(int workerid)
+static double compute_ntasks_end(int workerid, int nworkers)
 {
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	if (workerid < (int)nworkers)
@@ -188,37 +188,42 @@ static double compute_ntasks_end(int workerid)
 	}
 }
 
-static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
+static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	unsigned worker;
-	int best = -1;
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+
+	pheft_data *hd = (pheft_data*)sched_ctx->policy_data;
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	unsigned worker, worker_ctx;
+	int best = -1, best_id_ctx = -1;
 	
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
-	int forced_best = -1;
+	int forced_best = -1, forced_best_ctx = -1;
 
-	double local_task_length[nworkers+ncombinedworkers];
-	double local_data_penalty[nworkers+ncombinedworkers];
-	double local_power[nworkers+ncombinedworkers];
-	double local_exp_end[nworkers+ncombinedworkers];
-	double fitness[nworkers+ncombinedworkers];
+	double local_task_length[nworkers_ctx + ncombinedworkers];
+	double local_data_penalty[nworkers_ctx + ncombinedworkers];
+	double local_power[nworkers_ctx + ncombinedworkers];
+	double local_exp_end[nworkers_ctx + ncombinedworkers];
+	double fitness[nworkers_ctx + ncombinedworkers];
 
 	double max_exp_end = 0.0;
 
-	int skip_worker[nworkers+ncombinedworkers];
+	int skip_worker[nworkers_ctx + ncombinedworkers];
 
 	double best_exp_end = DBL_MAX;
 	//double penality_best = 0.0;
 
-	int ntasks_best = -1;
+	int ntasks_best = -1, ntasks_best_ctx = -1;
 	double ntasks_best_end = 0.0;
 	int calibrating = 0;
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
 	{
+		worker = sched_ctx->workerids[worker_ctx];
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
 		worker_exp_end[worker] = worker_exp_start[worker] + worker_exp_len[worker];
@@ -228,8 +233,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 
 	unsigned nimpl;
 	unsigned best_impl = 0;
-	for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
-	{
+	for (worker_ctx = 0; worker_ctx < (nworkers_ctx + ncombinedworkers); worker_ctx++)
+ 	{
+		worker = sched_ctx->workerids[worker_ctx];
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			if (!starpu_combined_worker_may_execute_task(worker, task, nimpl))
@@ -244,12 +250,12 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 
 			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 
-			local_task_length[worker] = starpu_task_expected_length(task, perf_arch,nimpl);
+			local_task_length[worker_ctx] = starpu_task_expected_length(task, perf_arch,nimpl);
 
 			unsigned memory_node = starpu_worker_get_memory_node(worker);
-			local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
+			local_data_penalty[worker_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);
 
-			double ntasks_end = compute_ntasks_end(worker);
+			double ntasks_end = compute_ntasks_end(worker, nworkers_ctx);
 
 			if (ntasks_best == -1
 					|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
@@ -258,15 +264,16 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 					) {
 				ntasks_best_end = ntasks_end;
 				ntasks_best = worker;
+				ntasks_best_ctx = worker_ctx;
 			}
 
-			if (local_task_length[worker] == -1.0)
+			if (local_task_length[worker_ctx] == -1.0)
 				/* we are calibrating, we want to speed-up calibration time
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
 
-			if (local_task_length[worker] <= 0.0)
+			if (local_task_length[worker_ctx] <= 0.0)
 				/* there is no prediction available for that task
 				 * with that arch yet, so switch to a greedy strategy */
 				unknown = 1;
@@ -274,59 +281,67 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 			if (unknown)
 				continue;
 
-			local_exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
+			local_exp_end[worker_ctx] = compute_expected_end(worker, local_task_length[worker], nworkers_ctx);
 
 			//fprintf(stderr, "WORKER %d -> length %e end %e\n", worker, local_task_length[worker], local_exp_end[worker]);
 
-			if (local_exp_end[worker] < best_exp_end)
+			if (local_exp_end[worker_ctx] < best_exp_end)
 			{
 				/* a better solution was found */
-				best_exp_end = local_exp_end[worker];
+				best_exp_end = local_exp_end[worker_ctx];
 				best_impl = nimpl;
 			}
 
 
-			local_power[worker] = starpu_task_expected_power(task, perf_arch,nimpl);
+			local_power[worker_ctx] = starpu_task_expected_power(task, perf_arch,nimpl);
 			//_STARPU_DEBUG("Scheduler parallel heft: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker],local_power[worker],worker,nimpl);
 
-			if (local_power[worker] == -1.0)
-				local_power[worker] = 0.;
+			if (local_power[worker_ctx] == -1.0)
+				local_power[worker_ctx] = 0.;
 
 		} //end for
 	}
 
 	if (unknown)
+	{
 		forced_best = ntasks_best;
+		forced_best_ctx = ntasks_best_ctx;
+	}
+
 
 	double best_fitness = -1;
 
 
 	if (forced_best == -1)
 	{
-		for (worker = 0; worker < nworkers+ncombinedworkers; worker++)
+		for (worker_ctx = 0; worker_ctx < nworkers_ctx + ncombinedworkers; worker_ctx++)
 		{
+			/* if combinedworker don't search the id in the ctx */
+			worker = worker_ctx >= nworkers_ctx ? worker_ctx : 
+				sched_ctx->workerids[worker_ctx];
 
-			if (skip_worker[worker])
+			if (skip_worker[worker_ctx])
 			{
 				/* no one on that queue may execute this task */
 				continue;
 			}
 	
-			fitness[worker] = alpha*(local_exp_end[worker] - best_exp_end) 
-					+ beta*(local_data_penalty[worker])
-					+ _gamma*(local_power[worker]);
+			fitness[worker_ctx] = hd->alpha*(local_exp_end[worker_ctx] - best_exp_end) 
+					+ hd->beta*(local_data_penalty[worker_ctx])
+					+ hd->_gamma*(local_power[worker_ctx]);
 
-			if (local_exp_end[worker] > max_exp_end)
+			if (local_exp_end[worker_ctx] > max_exp_end)
 				/* This placement will make the computation
 				 * longer, take into account the idle
 				 * consumption of other cpus */
-				fitness[worker] += _gamma * idle_power * (local_exp_end[worker] - max_exp_end) / 1000000.0;
+				fitness[worker_ctx] += hd->_gamma * hd->idle_power * (local_exp_end[worker_ctx] - max_exp_end) / 1000000.0;
 
-			if (best == -1 || fitness[worker] < best_fitness)
+			if (best == -1 || fitness[worker_ctx] < best_fitness)
 			{
 				/* we found a better solution */
-				best_fitness = fitness[worker];
+				best_fitness = fitness[worker_ctx];
 				best = worker;
+				best_id_ctx = worker_ctx;
 			}
 
 		//	fprintf(stderr, "FITNESS worker %d -> %e local_exp_end %e - local_data_penalty %e\n", worker, fitness[worker], local_exp_end[worker] - best_exp_end, local_data_penalty[worker]);
@@ -341,73 +356,110 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		 * with that arch we want to speed-up calibration time
 		 * so we force this measurement */
 		best = forced_best;
+		best_id_ctx = forced_best_ctx;
 		//penality_best = 0.0;
-		best_exp_end = local_exp_end[best];
+		best_exp_end = local_exp_end[best_id_ctx];
 	}
 	else 
 	{
                 //penality_best = local_data_penalty[best];
-		best_exp_end = local_exp_end[best];
+		best_exp_end = local_exp_end[best_id_ctx];
 	}
 
 
 	//_STARPU_DEBUG("Scheduler parallel heft: kernel (%u)\n", best_impl);
 	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, best_exp_end, prio);
+	return push_task_on_best_worker(task, best, best_exp_end, prio, sched_ctx);
 }
 
-static int parallel_heft_push_task(struct starpu_task *task)
+static int parallel_heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	if (task->priority == STARPU_MAX_PRIO)
-		return _parallel_heft_push_task(task, 1);
+		return _parallel_heft_push_task(task, 1, sched_ctx_id);
 
-	return _parallel_heft_push_task(task, 0);
+	return _parallel_heft_push_task(task, 0, sched_ctx_id);
 }
 
-static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void parallel_heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
 {
-	nworkers = topology->nworkers;
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nnew_workers; i++)
+	{
+		workerid = workerids[i];
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+		/* init these structures only once for each worker */
+		if(!workerarg->has_prev_init)
+		{
+			worker_exp_start[workerid] = starpu_timing_now();
+			worker_exp_len[workerid] = 0.0;
+			worker_exp_end[workerid] = worker_exp_start[workerid]; 
+			ntasks[workerid] = 0;
+			workerarg->has_prev_init = 1;
+		}
+
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
+	}
+}
+
+static void initialize_parallel_heft_policy(unsigned sched_ctx_id) 
+{	
+	pheft_data *hd = (pheft_data*)malloc(sizeof(pheft_data));
+	hd->alpha = STARPU_DEFAULT_ALPHA;
+	hd->beta = STARPU_DEFAULT_BETA;
+	hd->_gamma = STARPU_DEFAULT_GAMMA;
+	hd->idle_power = 0.0;
+	
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	sched_ctx->policy_data = (void*)hd;
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
-		alpha = atof(strval_alpha);
+		hd->alpha = atof(strval_alpha);
 
 	const char *strval_beta = getenv("STARPU_SCHED_BETA");
 	if (strval_beta)
-		beta = atof(strval_beta);
+		hd->beta = atof(strval_beta);
 
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
-		_gamma = atof(strval_gamma);
+		hd->_gamma = atof(strval_gamma);
 
 	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
 	if (strval_idle_power)
-		idle_power = atof(strval_idle_power);
+		hd->idle_power = atof(strval_idle_power);
 
-	_starpu_sched_find_worker_combinations(topology);
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	_starpu_sched_find_worker_combinations(&config->topology);
 
-	ncombinedworkers = topology->ncombinedworkers;
+	ncombinedworkers = config->topology.ncombinedworkers;
 
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
+	unsigned workerid_ctx;
+	int workerid;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
 	{
-		worker_exp_start[workerid] = starpu_timing_now();
-		worker_exp_len[workerid] = 0.0;
-		worker_exp_end[workerid] = worker_exp_start[workerid]; 
-		ntasks[workerid] = 0;
-	
-		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-	
-		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+		workerid = sched_ctx->workerids[workerid_ctx];
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+		if(!workerarg->has_prev_init)
+		{
+			worker_exp_start[workerid] = starpu_timing_now();
+			worker_exp_len[workerid] = 0.0;
+			worker_exp_end[workerid] = worker_exp_start[workerid]; 
+			ntasks[workerid] = 0;
+		}
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
 	}
 
 	PTHREAD_MUTEX_INIT(&big_lock, NULL);
 
 	/* We pre-compute an array of all the perfmodel archs that are applicable */
-	unsigned total_worker_count = nworkers + ncombinedworkers;
+	unsigned total_worker_count = nworkers_ctx + ncombinedworkers;
 
 	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
 	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
@@ -428,10 +480,30 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 //	}
 }
 
+static void parallel_heft_deinit(unsigned sched_ctx_id) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	pheft_data *ht = (pheft_data*)sched_ctx->policy_data;	  
+	unsigned workerid_ctx;
+	int workerid;
+
+	unsigned nworkers_ctx = sched_ctx->nworkers;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{	
+		workerid = sched_ctx->workerids[workerid_ctx];
+		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
+		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
+	}
+
+	free(ht);
+	PTHREAD_MUTEX_DESTROY(&big_lock);
+}
+
 /* TODO: use post_exec_hook to fix the expected start */
 struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy = {
 	.init_sched = initialize_parallel_heft_policy,
-	.deinit_sched = NULL,
+	.init_sched_for_workers = parallel_heft_init_for_workers,
+	.deinit_sched = parallel_heft_deinit,
 	.push_task = parallel_heft_push_task, 
 	.pop_task = NULL,
 	.post_exec_hook = parallel_heft_post_exec_hook,

+ 10 - 16
src/sched_policies/random_policy.c

@@ -19,6 +19,7 @@
 /* Policy attributing tasks randomly to workers */
 
 #include <core/workers.h>
+#include <core/sched_ctx.h>
 #include <sched_policies/fifo_queues.h>
 
 static int _random_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
@@ -46,7 +47,7 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
 	double alpha = 0.0;
 	for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
 	{
-        worker = sched_ctx->workerids[worker_ctx];
+		worker = sched_ctx->workerids[worker_ctx];
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
@@ -74,25 +75,18 @@ static int random_push_task(struct starpu_task *task, unsigned sched_ctx_id)
     return _random_push_task(task, 0, sched_ctx);
 }
 
-static void initialize_random_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+static void initialize_random_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned ntotal_workers = config->topology.nworkers;
-
-	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
-
-	unsigned workerid_ctx;
+	unsigned i;
 	int workerid;
-	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
+	for (i = 0; i < nnew_workers; i++)
 	{
-		workerid = sched_ctx->workerids[workerid_ctx];
+		workerid = sched_ctx->workerids[i];
 		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
-		sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
-		sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
+		sched_ctx->sched_mutex[workerid] = &workerarg->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &workerarg->sched_cond;
 	}
 }
 
@@ -110,8 +104,8 @@ static void initialize_random_policy(unsigned sched_ctx_id)
 	{
 		workerid = sched_ctx->workerids[workerid_ctx];
 		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
-		sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
-		sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
+		sched_ctx->sched_mutex[workerid] = &workerarg->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &workerarg->sched_cond;
 	}
 }
 

+ 52 - 33
src/sched_policies/work_stealing_policy.c

@@ -21,13 +21,15 @@
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 
-typedef struct {
+typedef struct work_stealing_data{
 	struct starpu_deque_jobq_s **queue_array;
 	unsigned rr_worker;
 	/* keep track of the work performed from the beginning of the algorithm to make
 	 * better decisions about which queue to select when stealing or deferring work
 	 */
 	unsigned performed_total;
+	pthread_mutex_t sched_mutex;
+	pthread_cond_t sched_cond;
 } work_stealing_data;
 
 #ifdef USE_OVERLOAD
@@ -141,19 +143,18 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 
 	struct starpu_deque_jobq_s *q;
 
-	q = ws->queue_array[workerid_ctx];
+	q = ws->queue_array[workerid];
 
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
 
 	task = _starpu_deque_pop_task(q, -1);
 	if (task) {
 		/* there was a local task */
 		ws->performed_total++;
-		PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+		PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
 		return task;
 	}
 	
@@ -167,7 +168,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		ws->performed_total++;
 	}
 
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+	PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
 
 	return task;
 }
@@ -180,12 +181,12 @@ int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
+
 
         struct starpu_deque_jobq_s *deque_queue;
-	deque_queue = ws->queue_array[workerid_ctx];
+	deque_queue = ws->queue_array[workerid];
 
-        PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
+        PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
 	// XXX reuse ?
         //total_number_of_jobs++;
 
@@ -194,31 +195,27 @@ int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
         deque_queue->njobs++;
         deque_queue->nprocessed++;
 
-        PTHREAD_COND_SIGNAL(sched_ctx->sched_cond[0]);
-        PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
+        PTHREAD_COND_SIGNAL(&ws->sched_cond);
+        PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
 
         return 0;
 }
 
-static void initialize_ws_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+static void initialize_ws_policy_for_workers(unsigned sched_ctx_id, int *workerids,unsigned nnew_workers) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
 
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	unsigned ntotal_workers = config->topology.nworkers;
-
-	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
-
-	unsigned workerid_ctx;
-	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
-	  {
-		ws->queue_array[workerid_ctx] = _starpu_create_deque();
+	unsigned i;
+	int workerid;
+	
+	for (i = 0; i < nnew_workers; i++)
+	{
+		workerid = workerids[i];
+		ws->queue_array[workerid] = _starpu_create_deque();
 
-		sched_ctx->sched_mutex[workerid_ctx] = sched_ctx->sched_mutex[0];
-		sched_ctx->sched_cond[workerid_ctx] = sched_ctx->sched_cond[0];
+		sched_ctx->sched_mutex[workerid] = &ws->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &ws->sched_cond;
 	}
 }
 
@@ -232,24 +229,46 @@ static void initialize_ws_policy(unsigned sched_ctx_id)
 	ws->rr_worker = 0;
 	ws->queue_array = (struct starpu_deque_jobq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_deque_jobq_s*));
 
-	pthread_mutex_t *sched_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
-	pthread_cond_t *sched_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
-	PTHREAD_MUTEX_INIT(sched_mutex, NULL);
-	PTHREAD_COND_INIT(sched_cond, NULL);
+	PTHREAD_MUTEX_INIT(&ws->sched_mutex, NULL);
+	PTHREAD_COND_INIT(&ws->sched_cond, NULL);
 
 	unsigned workerid_ctx;
+	int workerid;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-		ws->queue_array[workerid_ctx] = _starpu_create_deque();
+		workerid = sched_ctx->workerids[workerid_ctx];
+		ws->queue_array[workerid] = _starpu_create_deque();
+
+		sched_ctx->sched_mutex[workerid] = &ws->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &ws->sched_cond;
+	}
+}
+
+static void deinit_ws_policy(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	unsigned nworkers_ctx = sched_ctx->nworkers;
 
-		sched_ctx->sched_mutex[workerid_ctx] = sched_mutex;
-		sched_ctx->sched_cond[workerid_ctx] = sched_cond;
+	struct work_stealing_data *data = (struct work_stealing_data*)malloc(sizeof(work_stealing_data));
+	
+
+	pthread_mutex_init(&data->sched_mutex, NULL);
+	pthread_cond_init(&data->sched_cond, NULL);
+
+	int workerid;
+	unsigned workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	{
+		workerid = sched_ctx->workerids[workerid_ctx];
+		_starpu_destroy_deque(&data->queue_array[workerid]);
+		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
+		sched_ctx->sched_cond[workerid] = &data->sched_cond;
 	}
 }
 
 struct starpu_sched_policy_s _starpu_sched_ws_policy = {
 	.init_sched = initialize_ws_policy,
-	.deinit_sched = NULL,
+	.deinit_sched = deinit_ws_policy,
 	.push_task = ws_push_task,
 	.pop_task = ws_pop_task,
 	.post_exec_hook = NULL,

+ 1 - 1
tests/cholesky_ctxs/all_sched.sh

@@ -20,7 +20,7 @@
 #export STARPU_DIR=$HOME/sched_ctx/build
 
 #source sched.sh isole 0 0 0 
-source sched_no_ctxs.sh
+#source sched_no_ctxs.sh
 source sched_no_ctxs.sh 1stchole -chole1
 source sched_no_ctxs.sh 2ndchole -chole2