Преглед на файлове

worker_collection: a generic structure representing the set of workers; it is possible to construct several such structures.
For the moment it is available for heft only.

Andra Hugo преди 13 години
родител
ревизия
6769aa9ea3
променени са 9 файла, в които са добавени 466 реда и са изтрити 349 реда
  1. 1 2
      examples/Makefile.am
  2. 29 7
      include/starpu_scheduler.h
  3. 39 47
      sched_ctx_hypervisor/src/sched_ctx_hypervisor.c
  4. 9 8
      src/Makefile.am
  5. 155 207
      src/core/sched_ctx.c
  6. 3 12
      src/core/sched_ctx.h
  7. 24 25
      src/core/sched_policy.c
  8. 49 41
      src/sched_policies/heft.c
  9. 157 0
      src/worker_collection/worker_list.c

+ 1 - 2
examples/Makefile.am

@@ -171,13 +171,12 @@ examplebin_PROGRAMS +=				\
 	incrementer/incrementer			\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
-	scheduler/dummy_sched			\
 	reductions/dot_product			\
 	reductions/minmax_reduction		\
 	mandelbrot/mandelbrot			\
 	ppm_downscaler/ppm_downscaler		\
 	ppm_downscaler/yuv_downscaler
-
+#	scheduler/dummy_sched			
 if STARPU_HAVE_F77_H
 examplebin_PROGRAMS +=				\
 	basic_examples/vector_scal_fortran

+ 29 - 7
include/starpu_scheduler.h

@@ -71,9 +71,6 @@ struct starpu_sched_policy_s {
 	/* Initialize the scheduling policy. */
 	void (*init_sched)(unsigned ctx_id);
 
-	/* Initialize the scheduling policy only for certain workers. */
-	void (*init_sched_for_workers)(unsigned ctx_id, int *workerids, unsigned nworkers);
-
 	/* Cleanup the scheduling policy. */
 	void (*deinit_sched)(unsigned ctx_id);
 
@@ -100,6 +97,12 @@ struct starpu_sched_policy_s {
 	/* This method is called every time a task has been executed. (optionnal) */
 	void (*post_exec_hook)(struct starpu_task *, unsigned ctx_id);
 
+	/* Initialize the scheduling policy for added workers. */
+	void (*add_workers)(unsigned ctx_id, int *workerids, unsigned nworkers);
+
+	/* Deinitialize the scheduling policy for removed workers. */
+	void (*remove_workers)(unsigned ctx_id, int *workerids, unsigned nworkers);
+
 	/* Name of the policy (optionnal) */
 	const char *policy_name;
 
@@ -107,6 +110,23 @@ struct starpu_sched_policy_s {
 	const char *policy_description;
 };
 
+struct worker_collection {
+	void *workerids;
+	unsigned nworkers;
+	pthread_key_t cursor_key;
+	int type;
+	unsigned (*has_next)(struct worker_collection *workers);
+	int (*get_next)(struct worker_collection *workers);
+	int (*add)(struct worker_collection *workers, int worker);
+	int (*remove)(struct worker_collection *workers, int worker);
+	void* (*init)(struct worker_collection *workers);
+	void (*deinit)(struct worker_collection *workers);
+	void (*init_cursor)(struct worker_collection *workers);
+	void (*deinit_cursor)(struct worker_collection *workers);
+};
+
+#define WORKER_LIST 0
+
 struct starpu_sched_ctx_hypervisor_criteria {
 	void (*update_current_idle_time)(unsigned sched_ctx, int worker, double idle_time, unsigned current_nprocs);
 	void (*update_current_working_time)(unsigned sched_ctx, int worker, double working_time, unsigned current_nprocs);
@@ -124,10 +144,6 @@ void starpu_add_workers_to_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsig
 
 void starpu_remove_workers_from_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
 
-int* starpu_get_workers_of_ctx(unsigned sched_ctx);
-
-unsigned starpu_get_nworkers_of_ctx(unsigned sched_ctx);
-
 void starpu_set_sched_ctx_policy_data(unsigned sched_ctx, void* policy_data);
 
 void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx);
@@ -140,6 +156,12 @@ void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
 
 void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
 
+void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
+
+struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id);
+
+pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id);
+
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 

+ 39 - 47
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -103,17 +103,17 @@ void sched_ctx_hypervisor_increase_priority(unsigned sched_ctx, int priority_ste
 static int compute_priority_per_sched_ctx(unsigned sched_ctx)
 {
 	struct sched_ctx_wrapper *sched_ctx_wrapper = &hypervisor.sched_ctx_wrapper[sched_ctx];
-	int i;
 	int total_priority = 0;
 
-	int nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx);
-	int *workers = starpu_get_workers_of_ctx(sched_ctx);
-
-	sched_ctx_wrapper->current_nprocs = nworkers_ctx;
-
-	for(i = 0; i < sched_ctx_wrapper->current_nprocs; i++)
-		total_priority += sched_ctx_wrapper->priority[workers[i]] + sched_ctx_wrapper->current_idle_time[workers[i]];
-
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int worker;
+	workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		total_priority += sched_ctx_wrapper->priority[worker] + sched_ctx_wrapper->current_idle_time[worker];
+	}
+	workers->init_cursor(workers);
 	return total_priority;
 }
 
@@ -139,39 +139,42 @@ static struct sched_ctx_wrapper* find_highest_priority_sched_ctx(unsigned sched_
 	return sched_ctx_wrapper;
 }
 
-static int* sort_workers_by_priority(unsigned sched_ctx, int worker)
+static int* get_first_workers_by_priority(unsigned sched_ctx, int req_worker, int nworkers)
 {
-	int nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx);
-	int *workers = starpu_get_workers_of_ctx(sched_ctx);
-	
 	struct sched_ctx_wrapper *sched_ctx_wrapper = &hypervisor.sched_ctx_wrapper[sched_ctx];
-	sched_ctx_wrapper->current_nprocs = nworkers_ctx;
 
-	int curr_workers[nworkers_ctx];
+	int curr_workers[nworkers];
+	int i;
+	for(i = 0; i < nworkers; i++)
+		curr_workers[i] = -1;
 
-	int k;
-	for(k = 0; k < nworkers_ctx; k++)
-		curr_workers[k] = workers[k];
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int index;
+	int worker;
+	int considered = 0;
 
-	unsigned i;
-	int temp;
-	for(i = 1; i < nworkers_ctx; i++)
+	workers->init_cursor(workers);
+	for(index = 0; index < nworkers; index++)
 	{
-		if(curr_workers[i] == worker)
+		while(workers->has_next(workers))
 		{
-			temp = curr_workers[0];
-			curr_workers[0] = curr_workers[i];
-			curr_workers[i] = temp;
-		}
-		else if(sched_ctx_wrapper->priority[workers[i - 1]] >
-			sched_ctx_wrapper->priority[workers[i]])
-		{
-			temp = curr_workers[i - 1];
-			curr_workers[i - 1] = curr_workers[i];
-			curr_workers[i] = temp;
+			worker = workers->get_next(workers);
+			for(i = 0; i < index; i++)
+			{
+				if(curr_workers[i] == worker)
+				{
+					considered = 1;
+					break;
+				}
+			}
+			
+			if(!considered && (curr_workers[index] < 0 || 
+					  sched_ctx_wrapper->priority[worker] >
+					  sched_ctx_wrapper->priority[curr_workers[index]]))
+				curr_workers[index] = worker;
 		}
 	}
-
+	workers->deinit_cursor(workers);
 	return curr_workers;
 }
 
@@ -181,9 +184,6 @@ static void resize_ctxs_if_possible(unsigned sched_ctx, int worker)
 	struct sched_ctx_wrapper *current_sched_ctx = &hypervisor.sched_ctx_wrapper[sched_ctx];
 	if(highest_priority_sched_ctx != NULL && current_sched_ctx->sched_ctx != STARPU_NMAX_SCHED_CTXS)
 	{	
-		/* sort workers by priority in order to find the first ones with the lowest
-		   priority in the current ctx and move them to the ctx with the highest priority*/
-		int *ordered_workers = sort_workers_by_priority(sched_ctx, worker);
 		unsigned nworkers_to_be_moved = 0;
 		
 		unsigned potential_nprocs = highest_priority_sched_ctx->current_nprocs +
@@ -193,22 +193,14 @@ static void resize_ctxs_if_possible(unsigned sched_ctx, int worker)
 		   potential_nprocs > current_sched_ctx->min_nprocs)
 			nworkers_to_be_moved = hypervisor.resize_granularity;
 		
+
 		if(nworkers_to_be_moved > 0)
 		{
-			int workers_to_be_moved[nworkers_to_be_moved];
-			
-			int i, j = 0;
-			for(i = 0; i < current_sched_ctx->current_nprocs; i++)
-				workers_to_be_moved[j++] = ordered_workers[i];
-			
-//			printf("high prio %d\n", highest_priority_sched_ctx->sched_ctx);
-//			printf("curr %d\n", current_sched_ctx->sched_ctx);
-			
-//			printf("n = %d %d\n", nworkers_to_be_moved, workers_to_be_moved[0]);
+			int *workers_to_be_moved = get_first_workers_by_priority(sched_ctx, worker, nworkers_to_be_moved);
+						
 			starpu_remove_workers_from_sched_ctx(workers_to_be_moved, nworkers_to_be_moved, sched_ctx);
 			starpu_add_workers_to_sched_ctx(workers_to_be_moved, nworkers_to_be_moved, highest_priority_sched_ctx->sched_ctx);
 			reset_ctx_wrapper_info(sched_ctx);
-//			printf("done resize \n");
 		}
 	}
 }

+ 9 - 8
src/Makefile.am

@@ -146,18 +146,11 @@ libstarpu_la_SOURCES = 						\
 	core/sched_ctx.c					\
 	core/priorities.c					\
 	core/parallel_task.c					\
-	sched_policies/eager_central_policy.c			\
-	sched_policies/eager_central_priority_policy.c		\
-	sched_policies/work_stealing_policy.c			\
-	sched_policies/deque_modeling_policy_data_aware.c	\
 	sched_policies/heft.c					\
-	sched_policies/random_policy.c				\
 	sched_policies/stack_queues.c				\
 	sched_policies/deque_queues.c				\
 	sched_policies/fifo_queues.c				\
 	sched_policies/detect_combined_workers.c		\
-	sched_policies/parallel_heft.c				\
-	sched_policies/parallel_greedy.c			\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
 	datawizard/write_back.c					\
@@ -206,7 +199,15 @@ libstarpu_la_SOURCES = 						\
 	top/starpu_top.c					\
 	top/starputop_task.c					\
 	top/starputop_message_queue.c				\
-	top/starputop_connection.c
+	top/starputop_connection.c				\
+	worker_collection/worker_list.c				
+#	sched_policies/eager_central_policy.c			
+#	sched_policies/eager_central_priority_policy.c		
+#	sched_policies/work_stealing_policy.c			
+#	sched_policies/deque_modeling_policy_data_aware.c	
+#	sched_policies/random_policy.c				
+#	sched_policies/parallel_heft.c				
+#	sched_policies/parallel_greedy.c			
 
 if STARPU_USE_CPU
 libstarpu_la_SOURCES += drivers/cpu/driver_cpu.c

+ 155 - 207
src/core/sched_ctx.c

@@ -18,6 +18,8 @@
 #include <core/sched_ctx.h>
 #include <common/utils.h>
 
+extern struct worker_collection worker_list;
+
 pthread_key_t sched_ctx_key;
 
 struct sched_ctx_info {
@@ -28,7 +30,7 @@ struct sched_ctx_info {
 
 static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config);
 static unsigned _starpu_worker_get_first_free_sched_ctx(struct starpu_worker_s *worker);
-static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkers_ctx);
+
 static unsigned _starpu_worker_get_sched_ctx_id(struct starpu_worker_s *worker, unsigned sched_ctx_id);
 
 static void change_worker_sched_ctx( struct starpu_worker_s *worker, struct starpu_sched_ctx *sched_ctx, unsigned sched_ctx_id)
@@ -49,6 +51,7 @@ static void change_worker_sched_ctx( struct starpu_worker_s *worker, struct star
 	}
 
 }
+
 static void update_workers_func(void *buffers[] __attribute__ ((unused)), void *_args)
 {
 	struct sched_ctx_info *sched_ctx_info_args = (struct sched_ctx_info*)_args;
@@ -122,6 +125,7 @@ static void _starpu_update_workers(int *workerids, int nworkers,
 			
 		}		
 	}
+
 	for (i = 0; i < nworkers; i++)
 	{
 		if (tasks[i])
@@ -133,11 +137,55 @@ static void _starpu_update_workers(int *workerids, int nworkers,
 	}
 }
 
-static void _starpu_init_workerids(int *workerids)
+
+static void _starpu_add_workers_to_sched_ctx(struct starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, 
+				       int *added_workers, int *n_added_workers)
 {
-	unsigned i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
-		workerids[i] = -1;
+	struct worker_collection *workers = sched_ctx->workers;
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	
+	int nworkers_to_add = nworkers == -1 ? config->topology.nworkers : nworkers;
+	int workers_to_add[nworkers_to_add];
+
+	int i = 0;
+	for(i = 0; i < nworkers_to_add; i++)
+	{
+		/* added_workers is NULL when this function is called at the creation of the context */
+		/* in that case there is no need to do this verification */
+		if(added_workers)
+		{
+			int worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
+			if(worker > 0)
+				added_workers[(*n_added_workers)++] = worker;		
+		}
+		else
+		{
+			int worker = (workerids == NULL ? i : workerids[i]); 
+			workers->add(workers, worker);
+			workers_to_add[i] = worker;
+		}
+	}
+	if(added_workers && *n_added_workers > 0)
+		sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);	
+	else
+		sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);		
+	return;
+}
+
+static void _starpu_remove_workers_from_sched_ctx(struct starpu_sched_ctx *sched_ctx, int *workerids, unsigned nworkers, 
+					    int *removed_workers, int *n_removed_workers)
+{
+	struct worker_collection *workers = sched_ctx->workers;
+
+	int i = 0;
+	for(i = 0; i < nworkers; i++)
+	{
+		int worker = workers->remove(workers, workerids[i]);
+		if(worker > 0)
+			removed_workers[(*n_removed_workers)++] = worker;
+	}
+					   
+	return;
 }
 
 struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerids, 
@@ -155,7 +203,6 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	
 	STARPU_ASSERT(nworkers_ctx <= nworkers);
   
-	sched_ctx->nworkers = nworkers_ctx;
 	PTHREAD_MUTEX_INIT(&sched_ctx->changing_ctx_mutex, NULL);
 
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
@@ -164,49 +211,34 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
-	_starpu_init_workerids(sched_ctx->workerids);
-	int j;
-	/* if null add all the workers are to the contex */
-	if(workerids == NULL)
-	{
-		for(j = 0; j < nworkers; j++)
-			sched_ctx->workerids[j] = j;
-
-		sched_ctx->nworkers = nworkers;
-	} 
-	else 
-	{
-		int i;
-		for(i = 0; i < nworkers_ctx; i++)
-		{
-			/* the user should not ask for a resource that does not exist */
-			STARPU_ASSERT( workerids[i] >= 0 &&  workerids[i] <= nworkers);	
-			sched_ctx->workerids[i] = workerids[i];
-			
-		}
-	}
-	
 	/* initialise all sync structures bc the number of workers can modify */
-	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS* sizeof(pthread_mutex_t*));
-	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS *sizeof(pthread_cond_t*));
-
+	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_mutex_t*));
+	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_cond_t*));
 
+	/* init the strategy structs and the worker_collection of the resources of the context */
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
 
+	/* construct the collection of workers (list/tree/etc.) */
+	sched_ctx->workers->workerids = sched_ctx->workers->init(sched_ctx->workers);
+	sched_ctx->workers->nworkers = 0;
+
+	/* after having a worker_collection of the resources, add them */
+	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
+
 	config->topology.nsched_ctxs++;	
 
 	/* if we create the initial big sched ctx we can update workers' status here
 	   because they haven't been launched yet */
 	if(is_initial_sched)
-	  {
-	    int i;
-	    for(i = 0; i < sched_ctx->nworkers; i++)
-	      {
-		struct starpu_worker_s *worker = _starpu_get_worker_struct(sched_ctx->workerids[i]);
-		worker->sched_ctx[_starpu_worker_get_first_free_sched_ctx(worker)] = sched_ctx;
-	      }
-	  }
-
+	{
+		int i;
+		for(i = 0; i < nworkers_ctx; i++)
+		{
+			struct starpu_worker_s *worker = _starpu_get_worker_struct(workerids[i]);
+			worker->sched_ctx[_starpu_worker_get_first_free_sched_ctx(worker)] = sched_ctx;
+		}
+	}
+	
 	return sched_ctx;
 }
 
@@ -215,7 +247,7 @@ unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids,
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_create_sched_ctx(policy_name, workerids, nworkers_ctx, 0, sched_name);
 
-	_starpu_update_workers(sched_ctx->workerids, sched_ctx->nworkers, -1, sched_ctx);
+	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, -1, sched_ctx);
 	return sched_ctx->id;
 }
 
@@ -231,15 +263,6 @@ unsigned starpu_create_sched_ctx_with_criteria(const char *policy_name, int *wor
 }
 #endif
 
-/* check if the worker already belongs to the context */
-static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_ctx *sched_ctx)
-{
-	int i;
-	for(i = 0; i < sched_ctx->nworkers; i++)
-	  if(sched_ctx->workerids[i] == workerid)
-		  return 1;
-	return 0;
-}
 
 /* free all structures for the context */
 static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
@@ -248,6 +271,9 @@ static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_policy = NULL;
 	sched_ctx->sched_mutex = NULL;
 	sched_ctx->sched_cond = NULL;
+	sched_ctx->workers->deinit(sched_ctx->workers);
+
+	free(sched_ctx->workers);
 	free(sched_ctx->sched_policy);
 	free(sched_ctx->sched_mutex);
 	free(sched_ctx->sched_cond);
@@ -259,52 +285,7 @@ static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
 
 static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
-	_starpu_update_workers(sched_ctx->workerids, sched_ctx->nworkers, sched_ctx->id, NULL);
-}
-
-static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
-					     struct starpu_sched_ctx *sched_ctx)
-{
-        struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-        int nworkers = config->topology.nworkers;
-	int nworkers_ctx = sched_ctx->nworkers;
-	int n_added_workers = 0;
-        int added_workers[nworkers];
-
-        /*if null add the rest of the workers which don't already belong to this ctx*/
-        if(new_workers == NULL)
-	{        
-		int j;
-                for(j = 0; j < nworkers; j++)
-                        if(!_starpu_worker_belongs_to_ctx(j, sched_ctx))
-			{
-				sched_ctx->workerids[nworkers_ctx++]= j;
-				added_workers[n_added_workers++] = j;
-			}
-	}
-        else
-	{
-                int i;
-                for(i = 0; i < nnew_workers; i++)
-		{
-                        /* take care the user does not ask for a resource that does not exist */
-                        STARPU_ASSERT(new_workers[i] >= 0 &&  new_workers[i] <= nworkers);
-
-			if(!_starpu_worker_belongs_to_ctx(new_workers[i], sched_ctx))
-			{
-				sched_ctx->workerids[nworkers_ctx++] = new_workers[i];
-				added_workers[n_added_workers++] = new_workers[i];
-			}
-		}
-	}
-	if(n_added_workers > 0)
-	{
-		sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->id, added_workers, n_added_workers);
-		
-		_starpu_update_workers(added_workers, n_added_workers, -1, sched_ctx);
-		sched_ctx->nworkers += n_added_workers;
-	}
-	return;
+	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id, NULL);
 }
 
 void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id)
@@ -318,8 +299,9 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	int nworkers = config->topology.nworkers;
 	
-	if(!(sched_ctx->nworkers == nworkers && sched_ctx->nworkers == inheritor_sched_ctx->nworkers))
-		_starpu_add_workers_to_sched_ctx(sched_ctx->workerids, sched_ctx->nworkers, inheritor_sched_ctx);
+	if(!(sched_ctx->workers->nworkers == nworkers && sched_ctx->workers->nworkers == inheritor_sched_ctx->workers->nworkers))
+		starpu_add_workers_to_sched_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, inheritor_sched_ctx_id);
+
 	if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 		free_sched_ctx_mem(sched_ctx);
@@ -345,65 +327,38 @@ void _starpu_delete_all_sched_ctxs()
 	return;
 }
 
-void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add,
-				     unsigned sched_ctx_id)
-{
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
-	_starpu_add_workers_to_sched_ctx(workers_to_add, nworkers_to_add, sched_ctx);
-	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
-	return;
-}
-
-static void _starpu_remove_workers_from_sched_ctx(int *workerids, int nworkers_to_remove, 
-						  struct starpu_sched_ctx *sched_ctx)
+static void _starpu_check_workers(int *workerids, int nworkers)
 {
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	int nworkers = config->topology.nworkers;
-	int nworkers_ctx =  sched_ctx->nworkers;
-
-	STARPU_ASSERT(nworkers_to_remove  <= nworkers_ctx);
-
-	int i, workerid;
-	int nremoved_workers = 0;
-        int removed_workers[nworkers_ctx];
+        struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+        int nworkers_conf = config->topology.nworkers;
 
-	/*if null remove all the workers that belong to this ctx*/
-	if(workerids == NULL)
-	{
-		for(i = 0; i < nworkers_ctx; i++)
-		{
-			removed_workers[i] = sched_ctx->workerids[i];
-			sched_ctx->workerids[i] = -1;
-			nremoved_workers++;
-		}
-		sched_ctx->nworkers = 0;
-	} 
-	else 
+	int i;
+	for(i = 0; i < nworkers; i++)
 	{
-		for(i = 0; i < nworkers_to_remove; i++)
-		{
-			workerid = workerids[i]; 
-			/* take care the user does not ask for a resource that does not exist */
-			STARPU_ASSERT( workerid >= 0 &&  workerid <= nworkers);
- 			removed_workers[nremoved_workers++] = workerid;
-			int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, workerid);
-			sched_ctx->workerids[workerid_ctx] = -1;
+		/* take care the user does not ask for a resource that does not exist */
+		STARPU_ASSERT(workerids[i] >= 0 &&  workerids[i] <= nworkers_conf);
+	}		
 
-		}
+}
 
-		if(nremoved_workers > 0)
-		{
-			sched_ctx->nworkers -= nremoved_workers;
-			_starpu_rearange_sched_ctx_workerids(sched_ctx, nworkers_ctx);
-		}
-	}
+void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add,
+				     unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int added_workers[nworkers_to_add];
+	int n_added_workers = 0;
+
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 
-	int j;
+	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
+	_starpu_check_workers(workers_to_add, nworkers_to_add);
 
-	if(nremoved_workers > 0)
-		_starpu_update_workers(removed_workers, nremoved_workers, sched_ctx->id, NULL);
+	_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
 
+	if(n_added_workers > 0)
+		_starpu_update_workers(added_workers, n_added_workers, -1, sched_ctx);
+       
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 	return;
 }
 
@@ -411,8 +366,22 @@ void starpu_remove_workers_from_sched_ctx(int *workers_to_remove, int nworkers_t
 					  unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int removed_workers[nworkers_to_remove];
+	int n_removed_workers = 0;
+
 	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
-	_starpu_remove_workers_from_sched_ctx(workers_to_remove, nworkers_to_remove, sched_ctx);
+
+	STARPU_ASSERT(workers_to_remove != NULL && nworkers_to_remove > 0);
+	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
+
+	_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
+	
+	if(n_removed_workers > 0)
+	{
+		sched_ctx->sched_policy->remove_workers(sched_ctx_id, removed_workers, n_removed_workers);
+		_starpu_update_workers(removed_workers, n_removed_workers, sched_ctx->id, NULL);
+	}
+       
 	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 	return;
 }
@@ -475,38 +444,6 @@ static unsigned _starpu_worker_get_sched_ctx_id(struct starpu_worker_s *worker,
 	return STARPU_NMAX_SCHED_CTXS;
 }
 
-static int _starpu_get_first_free_worker(int *workerids, int nworkers)
-{
-	int i;
-	for(i = 0; i < nworkers; i++)
-		if(workerids[i] == -1)
-			return i;
-
-	return -1;
-}
-
-/* rearange array of workerids in order not to have {-1, -1, 5, -1, 7}
-   and have instead {5, 7, -1, -1, -1} 
-   it is easier afterwards to iterate the array
-*/
-static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkers)
-{
-	int first_free_id = -1;
-	int i;
-	for(i = 0; i < old_nworkers; i++)
-	{
-		if(sched_ctx->workerids[i] != -1)
-		{
-			first_free_id = _starpu_get_first_free_worker(sched_ctx->workerids,old_nworkers);
-			if(first_free_id != -1)
-			{
-				sched_ctx->workerids[first_free_id] = sched_ctx->workerids[i];
-				sched_ctx->workerids[i] = -1;
-			}
-		}
-	  }
-}
-
 int starpu_wait_for_all_tasks_of_worker(int workerid)
 {
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
@@ -578,19 +515,6 @@ void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
 }
 
-int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
-{
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	
-	int nworkers_ctx = sched_ctx->nworkers;
-
-	int i;
-	for(i = 0; i < nworkers_ctx; i++)
-		if(sched_ctx->workerids[i] == workerid)
-			return i;
-
-	return -1;
-}
 
 pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int workerid)
 {
@@ -602,12 +526,6 @@ pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int w
 	return sched_ctx->sched_cond[workerid];
 }
 
-int* starpu_get_workers_of_ctx(unsigned sched_ctx_id)
-{
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return sched_ctx->workerids;
-}
-
 void starpu_set_sched_ctx(unsigned *sched_ctx)
 {
 	pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
@@ -626,12 +544,6 @@ unsigned _starpu_get_nsched_ctxs()
 	return config->topology.nsched_ctxs;
 }
 
-unsigned starpu_get_nworkers_of_ctx(unsigned sched_ctx_id)
-{
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return sched_ctx->nworkers;
-}
-
 void starpu_set_sched_ctx_policy_data(unsigned sched_ctx_id, void* policy_data)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -675,3 +587,39 @@ void starpu_worker_deinit_sched_condition(unsigned sched_ctx_id, int workerid)
 	free(sched_ctx->sched_mutex[workerid]);
 	free(sched_ctx->sched_cond[workerid]);
 }
+
+void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int worker_collection_type)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->workers = (struct worker_collection*)malloc(sizeof(struct worker_collection));
+
+	switch(worker_collection_type)
+	{
+	case WORKER_LIST:
+		sched_ctx->workers->has_next = worker_list.has_next;
+		sched_ctx->workers->get_next = worker_list.get_next;
+		sched_ctx->workers->add = worker_list.add;
+		sched_ctx->workers->remove = worker_list.remove;
+		sched_ctx->workers->init = worker_list.init;
+		sched_ctx->workers->deinit = worker_list.deinit;
+		sched_ctx->workers->init_cursor = worker_list.init_cursor;
+		sched_ctx->workers->deinit_cursor = worker_list.deinit_cursor;
+		sched_ctx->workers->type = WORKER_LIST; 
+		break;
+	}
+
+
+	return;
+}
+
+struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->workers;
+}
+
+pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return &sched_ctx->changing_ctx_mutex;
+}

+ 3 - 12
src/core/sched_ctx.h

@@ -39,13 +39,9 @@ struct starpu_sched_ctx {
 
 	/* data necessary for the policy */
 	void *policy_data;
-	
-	/* list of indices of workers */
-	int workerids[STARPU_NMAXWORKERS]; 
-	
-	/* number of threads in contex */
-	int nworkers; 
 
+	struct worker_collection *workers;
+	
 	/* mutext for temp_nworkers_in_ctx*/
 	pthread_mutex_t changing_ctx_mutex;
 
@@ -60,6 +56,7 @@ struct starpu_sched_ctx {
 
 	/* table of sched mutex corresponding to each worker in this ctx */
 	pthread_mutex_t **sched_mutex;
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	/* a structure containing a series of criteria determining the resize procedure */
 	struct starpu_sched_ctx_hypervisor_criteria *criteria;
@@ -98,10 +95,4 @@ unsigned _starpu_get_nsched_ctxs();
 /* Get the mutex corresponding to the global workerid */
 pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker);
 
-/* Treat add workers requests */
-void _starpu_actually_add_workers_to_sched_ctx(struct starpu_sched_ctx *sched_ctx);
-
-/* Treat remove workers requests */
-void _starpu_actually_remove_workers_from_sched_ctx(struct starpu_sched_ctx *sched_ctx);
-
 #endif // __SCHED_CONTEXT_H__

+ 24 - 25
src/core/sched_policy.c

@@ -36,30 +36,30 @@ int starpu_get_prefetch_flag(void)
  *	Predefined policies
  */
 
-extern struct starpu_sched_policy_s _starpu_sched_ws_policy;
-extern struct starpu_sched_policy_s _starpu_sched_prio_policy;
-extern struct starpu_sched_policy_s _starpu_sched_random_policy;
-extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
-extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
-extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
-extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
-extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
-extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy;
-extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy;
+/* extern struct starpu_sched_policy_s _starpu_sched_ws_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_prio_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_random_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_dm_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_dmda_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_eager_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy; */
+/* extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy; */
 extern struct starpu_sched_policy_s heft_policy;
 
 static struct starpu_sched_policy_s *predefined_policies[] = {
-	&_starpu_sched_ws_policy,
-	&_starpu_sched_prio_policy,
-	&_starpu_sched_dm_policy,
-	&_starpu_sched_dmda_policy,
-	&heft_policy,
-	&_starpu_sched_dmda_ready_policy,
-	&_starpu_sched_dmda_sorted_policy,
-	&_starpu_sched_random_policy,
-	&_starpu_sched_eager_policy,
-	&_starpu_sched_parallel_heft_policy,
-	&_starpu_sched_pgreedy_policy
+	/* &_starpu_sched_ws_policy, */
+	/* &_starpu_sched_prio_policy, */
+	/* &_starpu_sched_dm_policy, */
+	/* &_starpu_sched_dmda_policy, */
+	&heft_policy
+	/* &_starpu_sched_dmda_ready_policy, */
+	/* &_starpu_sched_dmda_sorted_policy, */
+	/* &_starpu_sched_random_policy, */
+	/* &_starpu_sched_eager_policy, */
+	/* &_starpu_sched_parallel_heft_policy, */
+	/* &_starpu_sched_pgreedy_policy */
 };
 
 struct starpu_sched_policy_s *_starpu_get_sched_policy(struct starpu_sched_ctx *sched_ctx)
@@ -95,7 +95,8 @@ static void load_sched_policy(struct starpu_sched_policy_s *sched_policy, struct
 	policy->pop_every_task = sched_policy->pop_every_task;
 	policy->push_task_notify = sched_policy->push_task_notify;
 	policy->policy_name = sched_policy->policy_name;
-	policy->init_sched_for_workers = sched_policy->init_sched_for_workers;
+	policy->add_workers = sched_policy->add_workers;
+	policy->remove_workers = sched_policy->remove_workers;
 }
 
 static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
@@ -312,9 +313,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
 
-		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 		ret = sched_ctx->sched_policy->push_task(task, sched_ctx->id);
-		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 	}
 
 	_starpu_profiling_set_task_push_end_time(task);
@@ -402,7 +401,7 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 			{
 				if(sched_ctx != NULL && sched_ctx->criteria != NULL)
 				{
-					sched_ctx->criteria->update_current_idle_time(sched_ctx->id, worker->workerid, 1.0, sched_ctx->nworkers);
+					sched_ctx->criteria->update_current_idle_time(sched_ctx->id, worker->workerid, 1.0, sched_ctx->workers->nworkers);
 				}
 			}
 		}

+ 49 - 41
src/sched_policies/heft.c

@@ -53,7 +53,9 @@ void param_modified(struct starputop_param_t* d){
 	//just to show parameter modification
 	fprintf(stderr,"%s has been modified : %f !\n", d->name, d->value);
 }
-static void heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
+
+
+static void heft_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
 {
 	int workerid;
 	unsigned i;
@@ -77,17 +79,28 @@ static void heft_init_for_workers(unsigned sched_ctx_id, int *workerids, unsigne
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
 	}
 }
+
+/* Detach the given workers from this scheduling context.  Clearing the
+   per-worker sched condition set up in heft_add_workers is all the
+   per-worker state heft keeps, so nothing else needs tearing down. */
+static void heft_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		/* NOTE(review): assumes passing NULL,NULL unregisters the
+		   worker's mutex/cond for this ctx -- confirm in sched_ctx.c */
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
+	}
+}
+
 static void heft_init(unsigned sched_ctx_id)
 {
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+
 	heft_data *hd = (heft_data*)malloc(sizeof(heft_data));
 	hd->alpha = STARPU_DEFAULT_ALPHA;
 	hd->beta = STARPU_DEFAULT_BETA;
 	hd->_gamma = STARPU_DEFAULT_GAMMA;
 	hd->idle_power = 0.0;
 	
-	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
-
 	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)hd);
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
@@ -110,27 +123,6 @@ static void heft_init(unsigned sched_ctx_id)
 	starputop_register_parameter_float("HEFT_BETA", &hd->beta, beta_minimum,beta_maximum,param_modified);
 	starputop_register_parameter_float("HEFT_GAMMA", &hd->_gamma, gamma_minimum,gamma_maximum,param_modified);
 	starputop_register_parameter_float("HEFT_IDLE_POWER", &hd->idle_power, idle_power_minimum,idle_power_maximum,param_modified);
-
-	unsigned workerid_ctx;
-
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		int workerid = workerids[workerid_ctx];
-		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
-		/* init these structures only once for each worker */
-		if(!workerarg->has_prev_init)
-		{
-			exp_start[workerid] = starpu_timing_now();
-			exp_len[workerid] = 0.0;
-			exp_end[workerid] = exp_start[workerid]; 
-			ntasks[workerid] = 0;
-			workerarg->has_prev_init = 1;
-		}
-		/* we push the tasks on the local lists of the workers
-		   therefore the synchronisations mechanisms of the strategy
-		   are the global ones */
-		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
-	}
 }
 
 static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
@@ -233,15 +225,15 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	
-	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
-
 	unsigned nimpl;
 	unsigned best_impl = 0;
-	unsigned worker, worker_ctx;
-	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
+	unsigned worker, worker_ctx = 0;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+
+	while(workers->has_next(workers))
 	{
-		worker = workerids[worker_ctx];
+		worker = workers->get_next(workers);
 		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
@@ -312,6 +304,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (local_power[worker_ctx] == -1.0)
 				local_power[worker_ctx] = 0.;
 		}
+		worker_ctx++;
 	}
 
 	*forced_best = unknown?ntasks_best:-1;
@@ -327,14 +320,15 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
 	heft_data *hd = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
-	unsigned worker, worker_ctx;
+	unsigned worker, worker_ctx = 0;
 	int best = -1, best_id_ctx = -1;
 	
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
 	int forced_best;
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
 
-	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
+	unsigned nworkers_ctx = workers->nworkers;
 	double local_task_length[nworkers_ctx];
 	double local_data_penalty[nworkers_ctx];
 	double local_power[nworkers_ctx];
@@ -350,6 +344,8 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	struct starpu_task_bundle *bundle = task->bundle;
 
+	workers->init_cursor(workers);
+
 	compute_all_performance_predictions(task, local_task_length, exp_end,
 					    &max_exp_end, &best_exp_end,
 					    local_data_penalty,
@@ -371,10 +367,9 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	double fitness[nworkers_ctx];
 	double best_fitness = -1;
 
-	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);
-	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
+	while(workers->has_next(workers))
 	{
-		worker = workerids[worker_ctx];
+		worker = workers->get_next(workers);
 
 		if (!starpu_worker_may_execute_task(worker, task, 0))
 		{
@@ -399,6 +394,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 			best = worker;
 			best_id_ctx = worker_ctx;
 		}
+		worker_ctx++;
 	}
 
 	/* By now, we must have found a solution */
@@ -431,16 +427,27 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		model_best = local_task_length[best_id_ctx];
 	}
 
+	workers->deinit_cursor(workers);
+
 	_starpu_increment_nsubmitted_tasks_of_worker(best);
 	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
 static int heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
+	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+	int ret_val = -1;
 	if (task->priority > 0)
-        	  return _heft_push_task(task, 1, sched_ctx_id);
-
-	return _heft_push_task(task, 0, sched_ctx_id);
+	{
+		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		ret_val = _heft_push_task(task, 1, sched_ctx_id);
+		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
+	PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	ret_val = _heft_push_task(task, 0, sched_ctx_id);
+	PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return ret_val;
 }
 
 static void heft_deinit(unsigned sched_ctx_id) 
@@ -457,7 +464,8 @@ struct starpu_sched_policy_s heft_policy = {
 	.pop_task = NULL,
 	.pop_every_task = NULL,
 	.post_exec_hook = heft_post_exec_hook,
+	.add_workers = heft_add_workers	,
+	.remove_workers = heft_remove_workers,
 	.policy_name = "heft",
-	.policy_description = "Heterogeneous Earliest Finish Task",
-	.init_sched_for_workers = heft_init_for_workers	
+	.policy_description = "Heterogeneous Earliest Finish Task"
 };

+ 157 - 0
src/worker_collection/worker_list.c

@@ -0,0 +1,157 @@
+#include <starpu.h>
+#include <pthread.h>
+
+/* Return non-zero while the calling thread's cursor still has workers
+   left to visit; once exhausted, rewind the cursor to 0 so the next
+   iteration pass restarts from the beginning. */
+static unsigned list_has_next(struct worker_collection *workers)
+{
+	unsigned nworkers = workers->nworkers;
+
+	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
+
+	/* the cursor indexes the NEXT worker to return, so the last valid
+	   position is nworkers - 1: compare against nworkers (with the
+	   cursor cast to unsigned) instead of the original
+	   "*cursor < nworkers - 1", which both skipped the last worker and
+	   wrapped around when nworkers == 0 */
+	unsigned ret = (unsigned)*cursor < nworkers;
+
+	if(!ret) *cursor = 0;
+
+	return ret;
+}
+
+/* Return the worker id under the calling thread's cursor and advance
+   the cursor.  Must only be called after list_has_next returned
+   non-zero. */
+static int list_get_next(struct worker_collection *workers)
+{
+	int *workerids = (int *)workers->workerids;
+	unsigned nworkers = workers->nworkers;
+
+	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
+
+	/* any index up to nworkers - 1 is a valid element to hand out; the
+	   original "*cursor < nworkers - 1" wrongly fired on the last one */
+	STARPU_ASSERT((unsigned)*cursor < nworkers);
+
+	return workerids[(*cursor)++];
+}
+
+/* Return 1 iff workerid is already stored in the collection, 0 otherwise. */
+static unsigned _worker_belongs_to_ctx(struct worker_collection *workers, int workerid)
+{
+	int *ids = (int *)workers->workerids;
+	unsigned n = workers->nworkers;
+	unsigned i;
+
+	for(i = 0; i < n; i++)
+	{
+		if(ids[i] == workerid)
+			return 1;
+	}
+
+	return 0;
+}
+
+/* Append a worker id to the collection.
+   Returns the worker id on success, or -1 if it was already present. */
+static int list_add(struct worker_collection *workers, int worker)
+{
+	int *workerids = (int *)workers->workerids;
+	unsigned *nworkers = &workers->nworkers;
+
+	/* the backing array holds STARPU_NMAXWORKERS entries, so any count
+	   strictly below that still leaves room for one more; the original
+	   "< STARPU_NMAXWORKERS - 1" wasted the last slot */
+	STARPU_ASSERT(*nworkers < STARPU_NMAXWORKERS);
+
+	if(!_worker_belongs_to_ctx(workers, worker))
+	{
+		workerids[(*nworkers)++] = worker;
+		return worker;
+	}
+	else return -1;
+}
+
+/* Return the index of the first empty slot (marked -1), or -1 when
+   every slot up to nworkers is occupied. */
+static int _get_first_free_worker(int *workerids, int nworkers)
+{
+	int idx;
+
+	for(idx = 0; idx < nworkers; idx++)
+	{
+		if(workerids[idx] == -1)
+			return idx;
+	}
+
+	return -1;
+}
+
+/* Rearrange the array of worker ids in order not to have
+   {-1, -1, 5, -1, 7} but instead {5, 7, -1, -1, -1}: valid ids are
+   packed at the front, in their original relative order, which makes
+   the array easier to iterate afterwards.
+   (The previous implementation moved each id to the first free slot,
+   which could reorder ids that were already packed at the front,
+   e.g. {5, 7, -1} ended up as {7, 5, -1}.) */
+static void _rearange_workerids(int *workerids, int old_nworkers)
+{
+	int dst = 0;	/* next front position to fill */
+	int i;
+	for(i = 0; i < old_nworkers; i++)
+	{
+		if(workerids[i] != -1)
+		{
+			/* stable compaction: shift the id down, clear its
+			   old slot (no-op when i == dst) */
+			int id = workerids[i];
+			workerids[i] = -1;
+			workerids[dst++] = id;
+		}
+	}
+}
+
+/* Remove a worker id from the collection and compact the array.
+   Returns the worker id on success, or -1 when it was not present.
+   (The original version returned nothing from an int function and
+   decremented nworkers even when the worker was not found.) */
+static int list_remove(struct worker_collection *workers, int worker)
+{
+	int *workerids = (int *)workers->workerids;
+	unsigned nworkers = workers->nworkers;
+	unsigned found = 0;
+
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+	{
+		if(workerids[i] == worker)
+		{
+			workerids[i] = -1;
+			found = 1;
+			break;
+		}
+	}
+
+	/* only shrink the collection when the worker was actually there */
+	if(!found)
+		return -1;
+
+	_rearange_workerids(workerids, nworkers);
+	workers->nworkers--;
+
+	return worker;
+}
+
+/* Mark every slot of the backing array as empty (-1 == no worker). */
+static void _init_workers(int *workerids)
+{
+	unsigned slot;
+
+	for(slot = 0; slot < STARPU_NMAXWORKERS; slot++)
+		workerids[slot] = -1;
+}
+
+/* Allocate the backing array of worker ids (all slots empty) and
+   create the per-thread cursor key used for iteration.
+   Returns the array; the caller stores it in workers->workerids. */
+static void* list_init(struct worker_collection *workers)
+{
+	int *workerids = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
+	/* the collection is unusable without its array: fail loudly on OOM
+	   rather than dereferencing NULL later */
+	STARPU_ASSERT(workerids != NULL);
+	_init_workers(workerids);
+
+	pthread_key_create(&workers->cursor_key, NULL);
+
+	return (void*)workerids;
+}
+
+/* Release the id array and the cursor key created by list_init.
+   NOTE(review): cursors malloc'd by list_init_cursor in other threads
+   are not freed here -- each thread is expected to have called
+   list_deinit_cursor already; confirm callers honour that. */
+static void list_deinit(struct worker_collection *workers)
+{
+	free(workers->workerids);
+	pthread_key_delete(workers->cursor_key);
+}
+
+/* Give the calling thread its own iteration cursor, positioned on the
+   first worker of the collection.  Each thread iterates independently
+   through the thread-specific cursor_key. */
+static void list_init_cursor(struct worker_collection *workers)
+{
+	int *cursor = (int*)malloc(sizeof(int));
+	*cursor = 0;
+	pthread_setspecific(workers->cursor_key, (void*)cursor);
+}
+
+/* Free the calling thread's cursor and clear the thread-specific slot,
+   so later lookups see NULL instead of a dangling pointer (and a
+   second deinit becomes a harmless free(NULL)). */
+static void list_deinit_cursor(struct worker_collection *workers)
+{
+	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
+	free(cursor);
+	pthread_setspecific(workers->cursor_key, NULL);
+}
+
+/* Method table for the WORKER_LIST flavour of worker_collection: a
+   flat array of worker ids iterated through a per-thread cursor.
+   sched_ctx code copies/uses this template when a context asks for a
+   WORKER_LIST collection. */
+struct worker_collection worker_list = {
+	.has_next = list_has_next,
+	.get_next = list_get_next,
+	.add = list_add,
+	.remove = list_remove,
+	.init = list_init,
+	.deinit = list_deinit,
+	.init_cursor = list_init_cursor,
+	.deinit_cursor = list_deinit_cursor,
+	.type = WORKER_LIST
+};
+