
One mutex per list of tasks (one for the local list, one for each strategy) instead of a single one for all. (Note: parallel_greedy is still left to be modified.)

Andra Hugo 14 years ago
parent
commit
d689285904
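
The idea of the commit: instead of one mutex and condition variable shared by every task list, each worker keeps a dedicated mutex/cond for its local list, and every scheduling context keeps a table of per-worker mutexes/conds that its strategy allocates. Below is a minimal standalone sketch of that data layout in plain pthreads; the names (sched_ctx_sketch, NMAXWORKERS_SKETCH, ...) are illustrative stand-ins, not the actual StarPU structures.

	#include <pthread.h>
	#include <stdlib.h>

	#define NMAXWORKERS_SKETCH 16

	/* one mutex/cond per worker, used only for the worker's local task list */
	static pthread_mutex_t local_list_mutex[NMAXWORKERS_SKETCH];
	static pthread_cond_t  local_list_cond[NMAXWORKERS_SKETCH];

	/* each scheduling context owns one mutex/cond slot per worker it contains;
	 * the strategy initializing the context decides what the slots point to */
	struct sched_ctx_sketch {
		int nworkers_in_ctx;
		pthread_mutex_t **sched_mutex;  /* indexed by position in the ctx, not by global worker id */
		pthread_cond_t  **sched_cond;
		void *policy_data;              /* strategy-private state, see the dmda sketch further down */
	};

	static void init_ctx_tables_sketch(struct sched_ctx_sketch *ctx, int nworkers)
	{
		ctx->nworkers_in_ctx = nworkers;
		ctx->sched_mutex = malloc(nworkers * sizeof(*ctx->sched_mutex));
		ctx->sched_cond  = malloc(nworkers * sizeof(*ctx->sched_cond));
		/* the strategy fills these tables: one mutex per worker, or a shared one */
	}

	static void init_local_lists_sketch(int nworkers)
	{
		for (int i = 0; i < nworkers; i++) {
			pthread_mutex_init(&local_list_mutex[i], NULL);
			pthread_cond_init(&local_list_cond[i], NULL);
		}
	}

The diffs below fill the per-context tables either with one mutex per worker (heft, dmda, random) or with a single shared pair (eager, priority, work stealing).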

+ 1 - 1
examples/cholesky_and_lu/cholesky/cholesky.h

@@ -71,7 +71,7 @@ void chol_cublas_codelet_update_u21(void *descr[], void *_args);
 void chol_cublas_codelet_update_u22(void *descr[], void *_args);
 #endif
 
-double run_cholesky_implicit(int sched_ctx, int start, int argc, char **argv, double *timing, pthread_barrier_t *barrier);
+double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **argv, double *timing, pthread_barrier_t *barrier);
 
 extern struct starpu_perfmodel_t chol_model_11;
 extern struct starpu_perfmodel_t chol_model_21;

+ 23 - 44
examples/cholesky_and_lu/cholesky/cholesky_implicit.c

@@ -69,7 +69,7 @@ static void callback_turn_spmd_on(void *arg __attribute__ ((unused)))
 	cl22.type = STARPU_SPMD;
 }
 
-static double _cholesky(starpu_data_handle dataA, unsigned nblocks, int sched_ctx, double *timing)
+static double _cholesky(starpu_data_handle dataA, unsigned nblocks, unsigned sched_ctx, double *timing)
 {
 	struct timeval start;
 	struct timeval end;
@@ -85,36 +85,22 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, int sched_ct
 	{
                 starpu_data_handle sdatakk = starpu_data_get_sub_data(dataA, 2, k, k);
 
-		if(sched_ctx != -1)
-			starpu_insert_task(&cl11,
-					   STARPU_PRIORITY, prio_level,
-					   STARPU_RW, sdatakk,
-					   STARPU_CALLBACK, (k == 3*nblocks/4)?callback_turn_spmd_on:NULL,
-					   STARPU_CTX, sched_ctx,
-					   0);
-		else
-			starpu_insert_task(&cl11,
-					   STARPU_PRIORITY, prio_level,
-					   STARPU_RW, sdatakk,
-					   STARPU_CALLBACK, (k == 3*nblocks/4)?callback_turn_spmd_on:NULL,
-					   0);
+		starpu_insert_task(&cl11,
+				   STARPU_PRIORITY, prio_level,
+				   STARPU_RW, sdatakk,
+				   STARPU_CALLBACK, (k == 3*nblocks/4)?callback_turn_spmd_on:NULL,
+				   STARPU_CTX, sched_ctx,
+				   0);
 
 		for (j = k+1; j<nblocks; j++)
 		{
                         starpu_data_handle sdatakj = starpu_data_get_sub_data(dataA, 2, k, j);
-			if(sched_ctx != -1)
-				starpu_insert_task(&cl21,
-						   STARPU_PRIORITY, (j == k+1)?prio_level:STARPU_DEFAULT_PRIO,
-						   STARPU_R, sdatakk,
-						   STARPU_RW, sdatakj,
-						   STARPU_CTX, sched_ctx,
-						   0);
-			else
-				starpu_insert_task(&cl21,
-						   STARPU_PRIORITY, (j == k+1)?prio_level:STARPU_DEFAULT_PRIO,
-						   STARPU_R, sdatakk,
-						   STARPU_RW, sdatakj,
-						   0);
+			starpu_insert_task(&cl21,
+					   STARPU_PRIORITY, (j == k+1)?prio_level:STARPU_DEFAULT_PRIO,
+					   STARPU_R, sdatakk,
+					   STARPU_RW, sdatakj,
+					   STARPU_CTX, sched_ctx,
+					   0);
 
 			for (i = k+1; i<nblocks; i++)
 			{
@@ -122,21 +108,14 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, int sched_ct
                                 {
 					starpu_data_handle sdataki = starpu_data_get_sub_data(dataA, 2, k, i);
 					starpu_data_handle sdataij = starpu_data_get_sub_data(dataA, 2, i, j);
-					if(sched_ctx != -1)
-						starpu_insert_task(&cl22,
-								   STARPU_PRIORITY, ((i == k+1) && (j == k+1))?prio_level:STARPU_DEFAULT_PRIO,
-								   STARPU_R, sdataki,
-								   STARPU_R, sdatakj,
-								   STARPU_RW, sdataij,
-								   STARPU_CTX, sched_ctx,
-								   0);
-					else 
-						starpu_insert_task(&cl22,
-								   STARPU_PRIORITY, ((i == k+1) && (j == k+1))?prio_level:STARPU_DEFAULT_PRIO,
-								   STARPU_R, sdataki,
-								   STARPU_R, sdatakj,
-								   STARPU_RW, sdataij,
-								   0);
+					
+					starpu_insert_task(&cl22,
+							   STARPU_PRIORITY, ((i == k+1) && (j == k+1))?prio_level:STARPU_DEFAULT_PRIO,
+							   STARPU_R, sdataki,
+							   STARPU_R, sdatakj,
+							   STARPU_RW, sdataij,
+							   STARPU_CTX, sched_ctx,
+							   0);
                                 }
 			}
 		}
@@ -160,7 +139,7 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, int sched_ct
 	return (flop/(*timing)/1000.0f);
 }
 
-static double cholesky(float *matA, unsigned size, unsigned ld, unsigned nblocks, int sched_ctx, double *timing)
+static double cholesky(float *matA, unsigned size, unsigned ld, unsigned nblocks, unsigned sched_ctx, double *timing)
 {
 	starpu_data_handle dataA;
 
@@ -184,7 +163,7 @@ static double cholesky(float *matA, unsigned size, unsigned ld, unsigned nblocks
 	return _cholesky(dataA, nblocks, sched_ctx, timing);
 }
 
-double run_cholesky_implicit(int sched_ctx, int start, int argc, char **argv, double *timing, pthread_barrier_t *barrier)
+double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **argv, double *timing, pthread_barrier_t *barrier)
 {
 	/* create a simple definite positive symetric matrix example
 	 *

+ 5 - 5
examples/cholesky_and_lu/cholesky_and_lu.c

@@ -5,7 +5,7 @@ typedef struct {
   int start;
   int argc;
   char **argv;
-  int ctx;
+  unsigned ctx;
 } params;
 
 typedef struct {
@@ -19,7 +19,7 @@ pthread_barrier_t barrier;
 
 void* func_cholesky(void *val){
   params *p = (params*)val;
-  int sched_ctx = p->ctx;
+  unsigned sched_ctx = p->ctx;
 
   int i;
   retvals *rv  = (retvals*)malloc(sizeof(retvals));
@@ -28,7 +28,7 @@ void* func_cholesky(void *val){
   double timing = 0;
   for(i = 0; i < NSAMPLES; i++)
     {
-      rv->flops += run_cholesky_implicit(sched_ctx, p->start, p->argc, p->argv, &timing, (sched_ctx == -1 ? NULL : &barrier));
+      rv->flops += run_cholesky_implicit(sched_ctx, p->start, p->argc, p->argv, &timing, &barrier);
       rv->avg_timing += timing;
     }
 
@@ -50,7 +50,7 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3){
 		 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66,
 		 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77,  78,
 		 79, 80, 81, 82, 83, 84, 85};
-  int id = starpu_create_sched_ctx("heft", NULL, -1, "cholesky");
+  unsigned id = starpu_create_sched_ctx("heft", NULL, -1, "cholesky");
   p1->ctx = id;
 
   int procs2[] =  {86, 87, 88, 89, 90,
@@ -127,7 +127,7 @@ int main(int argc, char **argv)
   params p3;
   p3.argc = argc;
   p3.argv = argv;
-  p3.ctx = -1;
+  p3.ctx = 0;
   cholesky_vs_cholesky(&p1, &p2,&p3);
 
   return 0;

+ 5 - 5
include/starpu_scheduler.h

@@ -76,23 +76,23 @@ struct starpu_sched_policy_s {
 	 * worker is scheduled. This method therefore permits to keep the state
 	 * of of the scheduler coherent even when StarPU bypasses the
 	 * scheduling strategy. */
-	void (*push_task_notify)(struct starpu_task *, int workerid);
+	void (*push_task_notify)(struct starpu_task *, int workerid, unsigned);
 
 	/* Insert a priority task into the scheduler. */
         int (*push_prio_task)(struct starpu_task *, unsigned);
 
 	/* Get a task from the scheduler. The mutex associated to the worker is
 	 * already taken when this method is called. */
-	struct starpu_task *(*pop_task)(void);
+	struct starpu_task *(*pop_task)(unsigned);
 
 	 /* Remove all available tasks from the scheduler (tasks are chained by
 	  * the means of the prev and next fields of the starpu_task
 	  * structure). The mutex associated to the worker is already taken
 	  * when this method is called. */
-	struct starpu_task *(*pop_every_task)(void);
+	struct starpu_task *(*pop_every_task)(unsigned);
 
 	/* This method is called every time a task has been executed. (optionnal) */
-	void (*post_exec_hook)(struct starpu_task *);
+	void (*post_exec_hook)(struct starpu_task *, unsigned);
 
 	/* Name of the policy (optionnal) */
 	const char *policy_name;
@@ -101,7 +101,7 @@ struct starpu_sched_policy_s {
 	const char *policy_description;
 };
 
-int starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name);
+unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name);
 
 void starpu_delete_sched_ctx(unsigned sched_ctx_id);
 

+ 43 - 11
src/core/sched_ctx.c

@@ -9,7 +9,7 @@ static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int nblocked_ths = 0;
 
-int _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
+unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			     int nworkerids_in_ctx, unsigned is_initial_sched,
 			     const char *sched_name)
 {
@@ -65,6 +65,10 @@ int _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
 		  }
 	  }
 
+	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(sched_ctx->nworkers_in_ctx * sizeof(pthread_mutex_t*));
+	sched_ctx->sched_cond = (pthread_cond_t**)malloc(sched_ctx->nworkers_in_ctx *sizeof(pthread_cond_t*));
+
+
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
 
 	config->topology.nsched_ctxs++;	
@@ -161,10 +165,10 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
   return 0;
 }
 
-int starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
+unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			    int nworkerids_in_ctx, const char *sched_name)
 {
-	int ret;
+	unsigned ret;
 	/* block the workers until the contex is switched */
 	set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
 	ret = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
@@ -222,6 +226,8 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id)
 		  }
 	
 		free(sched_ctx->sched_policy);
+		free(sched_ctx->sched_mutex);
+		free(sched_ctx->sched_cond);
 		sched_ctx->sched_policy = NULL;
 	  }		
 	return;	
@@ -485,19 +491,45 @@ int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
-  PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
-
-  if (--sched_ctx->nsubmitted == 0)
-    PTHREAD_COND_BROADCAST(&sched_ctx->submitted_cond);
+	PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
 
-  PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+	if (--sched_ctx->nsubmitted == 0)
+		PTHREAD_COND_BROADCAST(&sched_ctx->submitted_cond);
+	
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
 }
 
 void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
-  PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+	PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+
+	sched_ctx->nsubmitted++;
+	
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+}
+
+int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	
+	int nworkers_in_ctx = sched_ctx->nworkers_in_ctx;
 
-  sched_ctx->nsubmitted++;
+	int i;
+	for(i = 0; i < nworkers_in_ctx; i++)
+		if(sched_ctx->workerid[i] == (int)workerid)
+			return i;
+	
+	return -1;
+}
 
-  PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker)
+{
+	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, worker);
+	return sched_ctx->sched_mutex[workerid_ctx];
+}
+
+pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker)
+{
+	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, worker);
+	return sched_ctx->sched_cond[workerid_ctx];
 }

+ 19 - 4
src/core/sched_ctx.h

@@ -21,8 +21,12 @@
 #include <starpu_scheduler.h>
 
 struct starpu_sched_ctx {
+	/* id of the context used in user mode*/
 	unsigned sched_ctx_id;
 
+	/* name of context */
+	const char *sched_name;
+
 	/* policy of the context */
 	struct starpu_sched_policy_s *sched_policy;
 
@@ -46,12 +50,15 @@ struct starpu_sched_ctx {
 	
 	/* counter used for no of submitted tasks to a sched_ctx */
 	int nsubmitted;	 
-	
-	/* name of context */
-	const char *sched_name;
+
+	/* table of sched cond corresponding to each worker in this ctx */
+	pthread_cond_t **sched_cond;
+
+	/* table of sched mutex corresponding to each worker in this ctx */
+	pthread_mutex_t **sched_mutex;
 };
 
-int _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
+unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
 
 void _starpu_delete_all_sched_ctxs();
 
@@ -67,4 +74,12 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 
+/* Return the corresponding index of the workerid in the ctx table */
+int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);
+
+/* Get the mutex corresponding to the global workerid */
+pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker);
+
+/* Get the cond corresponding to the global workerid */
+pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker);
 #endif // __SCHED_CONTEXT_H__
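
The new helpers declared above (_starpu_get_index_in_ctx_of_workerid, _starpu_get_sched_mutex, _starpu_get_sched_cond) translate a global worker id into that worker's position inside the context, which is how the per-context tables are indexed. A rough sketch of the lookup logic, reusing the sched_ctx_sketch layout from the sketch above; this is a simplification, not the actual StarPU code.

	/* position of a global worker id inside a context's worker table */
	static int index_in_ctx_sketch(const int *workerids, int nworkers_in_ctx, int workerid)
	{
		for (int i = 0; i < nworkers_in_ctx; i++)
			if (workerids[i] == workerid)
				return i;
		return -1; /* the worker does not belong to this context */
	}

	/* fetch the per-context mutex stored at that position */
	static pthread_mutex_t *ctx_mutex_for_worker_sketch(struct sched_ctx_sketch *ctx,
							    const int *workerids, int workerid)
	{
		int idx = index_in_ctx_sketch(workerids, ctx->nworkers_in_ctx, workerid);
		return idx >= 0 ? ctx->sched_mutex[idx] : NULL;
	}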

+ 10 - 2
src/core/sched_policy.c

@@ -331,21 +331,29 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	if (profiling)
 		starpu_clock_gettime(&pop_start_time);
 
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 
 	if(!task)
 	  {
 		struct starpu_sched_ctx *sched_ctx;
+		pthread_mutex_t *sched_mutex_ctx;
+
 		unsigned i;
 		for(i = 0; i < worker->nctxs; i++)
 		  {
 			sched_ctx = worker->sched_ctx[i];
+			sched_mutex_ctx = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
+			PTHREAD_MUTEX_LOCK(sched_mutex_ctx);
 			if (sched_ctx->sched_policy->pop_task)
 			  {
-				task = sched_ctx->sched_policy->pop_task();
+				task = sched_ctx->sched_policy->pop_task(sched_ctx->sched_ctx_id);
+				PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
 				break;
 			  }
+			PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
 		  }
 	  }
 
@@ -375,7 +383,7 @@ struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx)
 	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
 
 	/* TODO set profiling info */
-	return sched_ctx->sched_policy->pop_every_task();
+	return sched_ctx->sched_policy->pop_every_task(sched_ctx->sched_ctx_id);
 }
 
 void _starpu_sched_post_exec_hook(struct starpu_task *task)

+ 11 - 0
src/core/workers.c

@@ -33,6 +33,10 @@
 /* acquire/release semantic for concurrent initialization/de-initialization */
 static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_mutex_t local_list_sched_mutex[STARPU_NMAXWORKERS]; 
+static pthread_cond_t local_list_sched_cond[STARPU_NMAXWORKERS];
+
 static int init_count;
 static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
 
@@ -159,6 +163,13 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 		workerarg->combined_workerid = workerarg->workerid;
 		workerarg->current_rank = 1;
 
+		/* mutex + cond only for the local list */
+		/* we have a single local list */
+		/* afterwards there would be a mutex + cond for the list of each strategy */
+		PTHREAD_MUTEX_INIT(&local_list_sched_mutex[worker], NULL);
+		PTHREAD_COND_INIT(&local_list_sched_cond[worker], NULL);
+
+		starpu_worker_set_sched_condition(worker, &local_list_sched_cond[worker], &local_list_sched_mutex[worker]);
 		/* if some codelet's termination cannot be handled directly :
 		 * for instance in the Gordon driver, Gordon tasks' callbacks
 		 * may be executed by another thread than that of the Gordon

+ 6 - 6
src/drivers/cpu/driver_cpu.c

@@ -168,23 +168,23 @@ void *_starpu_cpu_worker(void *arg)
 		}
 
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-  
-		PTHREAD_MUTEX_LOCK(sched_mutex);
 
+		/* take the mutex inside pop because it depends what mutex:
+		   the one of the local task or the one of one of the strategies */
 		task = _starpu_pop_task(cpu_arg);
+
                 if (!task) 
 		{
-		   if (_starpu_worker_can_block(memnode))
+			PTHREAD_MUTEX_LOCK(sched_mutex);
+			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, sched_cond, sched_mutex);
 
 			PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			continue;
 		};
 
-		PTHREAD_MUTEX_UNLOCK(sched_mutex);	
-
 		STARPU_ASSERT(task);
-		STARPU_ASSERT(task->sched_ctx < 2);
+
 		j = _starpu_get_job_associated_to_task(task);
 	
 		/* can a cpu perform that task ? */
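
The driver no longer wraps the whole pop in the worker's scheduling mutex: _starpu_pop_task takes whatever lock it needs internally (the local list's mutex first, then the per-context mutex of each strategy), and the driver only locks when it is about to block because no task was found. A hedged sketch of that reworked loop; every name here is an illustrative stand-in for the real driver internals.

	struct starpu_task; /* opaque here */

	struct worker_sketch {
		pthread_mutex_t sched_mutex; /* the worker's own condition lock */
		pthread_cond_t  sched_cond;
	};

	/* assumed helpers, stand-ins for the real driver internals */
	extern struct starpu_task *pop_task_sketch(struct worker_sketch *w); /* locks internally */
	extern void execute_task_sketch(struct starpu_task *task);
	extern int worker_can_block_sketch(void);
	extern int machine_is_running_sketch(void);

	static void worker_loop_sketch(struct worker_sketch *w)
	{
		while (machine_is_running_sketch()) {
			/* the pop path decides which mutex to take: the local
			 * list's, or the one owned by the strategy of the context */
			struct starpu_task *task = pop_task_sketch(w);

			if (!task) {
				/* only lock here, when the worker may have to block */
				pthread_mutex_lock(&w->sched_mutex);
				if (worker_can_block_sketch())
					pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
				pthread_mutex_unlock(&w->sched_mutex);
				continue;
			}

			execute_task_sketch(task);
		}
	}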

+ 1 - 2
src/drivers/cuda/driver_cuda.c

@@ -296,12 +296,12 @@ void *_starpu_cuda_worker(void *arg)
 
                 PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
-		PTHREAD_MUTEX_LOCK(sched_mutex);
 
 		task = _starpu_pop_task(args);
 
                 if (task == NULL) 
 		{
+			PTHREAD_MUTEX_LOCK(sched_mutex);
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, sched_cond, sched_mutex);
 		  
@@ -311,7 +311,6 @@ void *_starpu_cuda_worker(void *arg)
 			continue;
 		};
 
-		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 116 - 78
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -24,21 +24,25 @@
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 
-static unsigned nworkers;
-static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
+//static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
 
-static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+/* #ifdef STARPU_VERBOSE */
+/* static long int total_task_cnt = 0; */
+/* static long int ready_task_cnt = 0; */
+/* #endif */
 
-static double alpha = STARPU_DEFAULT_ALPHA;
-static double beta = STARPU_DEFAULT_BETA;
-static double _gamma = STARPU_DEFAULT_GAMMA;
-static double idle_power = 0.0;
+typedef struct {
+	double alpha;
+	double beta;
+	double _gamma;
+	double idle_power;
+
+	struct starpu_fifo_taskq_s **queue_array;
+
+	long int total_task_cnt;
+	long int ready_task_cnt;
+} dmda_data;
 
-#ifdef STARPU_VERBOSE
-static long int total_task_cnt = 0;
-static long int ready_task_cnt = 0;
-#endif
 
 static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
 {
@@ -113,12 +117,16 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_
 	return task;
 }
 
-static struct starpu_task *dmda_pop_ready_task(void)
+static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
 	unsigned node = starpu_worker_get_memory_node(workerid);
 
@@ -135,22 +143,26 @@ static struct starpu_task *dmda_pop_ready_task(void)
 		{
 			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
 			if (non_ready == 0)
-				ready_task_cnt++;
+				dt->ready_task_cnt++;
 		}
 
-		total_task_cnt++;
+		dt->total_task_cnt++;
 #endif
 	}
 
 	return task;
 }
 
-static struct starpu_task *dmda_pop_task(void)
+static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
 	task = _starpu_fifo_pop_task(fifo, -1);
 	if (task) {
@@ -165,10 +177,10 @@ static struct starpu_task *dmda_pop_task(void)
 		{
 			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
 			if (non_ready == 0)
-				ready_task_cnt++;
+				dt->ready_task_cnt++;
 		}
 
-		total_task_cnt++;
+		dt->total_task_cnt++;
 #endif
 	}
 
@@ -177,15 +189,18 @@ static struct starpu_task *dmda_pop_task(void)
 
 
 
-static struct starpu_task *dmda_pop_every_task(void)
+static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+
 	struct starpu_task *new_list;
 
 	int workerid = starpu_worker_get_id();
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
+	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
-
-	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], workerid);
+	new_list = _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[workerid_ctx], workerid);
 
 	while (new_list)
 	{
@@ -267,13 +282,16 @@ int _starpu_fifo_push_sorted_task(struct starpu_fifo_taskq_s *fifo_queue, pthrea
 
 
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
 {
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
+	int best_workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, best_workerid);
+
 	struct starpu_fifo_taskq_s *fifo;
-	fifo = queue_array[best_workerid];
+	fifo = dt->queue_array[best_workerid_ctx];
 
 	fifo->exp_end += predicted;
 	fifo->exp_len += predicted;
@@ -287,19 +305,20 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	switch (prio) {
 		case 1:
-			return _starpu_fifo_push_prio_task(queue_array[best_workerid],
-				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+			return _starpu_fifo_push_prio_task(dt->queue_array[best_workerid_ctx],
+				sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
 		case 2:
-			return _starpu_fifo_push_sorted_task(queue_array[best_workerid],
-				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+			return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid_ctx],
+				sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
 		default:
-			return _starpu_fifo_push_task(queue_array[best_workerid],
-				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+			return _starpu_fifo_push_task(dt->queue_array[best_workerid_ctx],
+				sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
 	}
 }
 
 static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker, worker_in_ctx;
@@ -315,12 +334,13 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_
 	/* A priori, we know all estimations */
 	int unknown = 0;
 
+	unsigned nworkers = sched_ctx->nworkers_in_ctx;
 	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
                 worker = sched_ctx->workerid[worker_in_ctx];
 		double exp_end;
 		
-		fifo = queue_array[worker];
+		fifo = dt->queue_array[worker_in_ctx];
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
@@ -375,28 +395,31 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_
 		model_best = 0.0;
 	}
 	
+	_starpu_increment_nsubmitted_tasks_of_worker(best);
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
 }
 
 static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker, worker_in_ctx;
-	int best = -1;
+	int best = -1, best_in_ctx = -1;
 	
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
 	int forced_best = -1;
 
-	double local_task_length[nworkers];
-	double local_data_penalty[nworkers];
-	double local_power[nworkers];
-	double exp_end[nworkers];
+	unsigned nworkers_in_ctx = sched_ctx->nworkers_in_ctx;
+	double local_task_length[nworkers_in_ctx];
+	double local_data_penalty[nworkers_in_ctx];
+	double local_power[nworkers_in_ctx];
+	double exp_end[nworkers_in_ctx];
 	double max_exp_end = 0.0;
 
-	double fitness[nworkers];
+	double fitness[nworkers_in_ctx];
 
 	double best_exp_end = 10e240;
 	double model_best = 0.0;
@@ -409,11 +432,11 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	/* A priori, we know all estimations */
 	int unknown = 0;
 
-	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers_in_ctx; worker_in_ctx++)
 	{
                 worker = sched_ctx->workerid[worker_in_ctx];
 
-		fifo = queue_array[worker];
+		fifo = dt->queue_array[worker_in_ctx];
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
@@ -428,10 +451,10 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 		}
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
+		local_task_length[worker_in_ctx] = starpu_task_expected_length(task, perf_arch);
 
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
-		local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
+		local_data_penalty[worker_in_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
@@ -444,13 +467,13 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 			ntasks_best = worker;
 		}
 
-		if (local_task_length[worker] == -1.0)
+		if (local_task_length[worker_in_ctx] == -1.0)
 			/* we are calibrating, we want to speed-up calibration time
 			 * so we privilege non-calibrated tasks (but still
 			 * greedily distribute them to avoid dumb schedules) */
 			calibrating = 1;
 
-		if (local_task_length[worker] <= 0.0)
+		if (local_task_length[worker_in_ctx] <= 0.0)
 			/* there is no prediction available for that task
 			 * with that arch yet, so switch to a greedy strategy */
 			unknown = 1;
@@ -458,17 +481,17 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 		if (unknown)
 			continue;
 
-		exp_end[worker] = fifo->exp_start + fifo->exp_len + local_task_length[worker];
+		exp_end[worker_in_ctx] = fifo->exp_start + fifo->exp_len + local_task_length[worker_in_ctx];
 
-		if (exp_end[worker] < best_exp_end)
+		if (exp_end[worker_in_ctx] < best_exp_end)
 		{
 			/* a better solution was found */
-			best_exp_end = exp_end[worker];
+			best_exp_end = exp_end[worker_in_ctx];
 		}
 
-		local_power[worker] = starpu_task_expected_power(task, perf_arch);
-		if (local_power[worker] == -1.0)
-			local_power[worker] = 0.;
+		local_power[worker_in_ctx] = starpu_task_expected_power(task, perf_arch);
+		if (local_power[worker_in_ctx] == -1.0)
+			local_power[worker_in_ctx] = 0.;
 	}
 
 	if (unknown)
@@ -478,11 +501,11 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	
 	if (forced_best == -1)
 	{
-	        for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
+	        for (worker_in_ctx = 0; worker_in_ctx < nworkers_in_ctx; worker_in_ctx++)
 	        {
 		        worker = sched_ctx->workerid[worker_in_ctx];
 
-			fifo = queue_array[worker];
+			fifo = dt->queue_array[worker_in_ctx];
 	
 			if (!starpu_worker_may_execute_task(worker, task))
 			{
@@ -490,21 +513,22 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 				continue;
 			}
 	
-			fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
-					+ beta*(local_data_penalty[worker])
-					+ _gamma*(local_power[worker]);
+			fitness[worker_in_ctx] = dt->alpha*(exp_end[worker_in_ctx] - best_exp_end) 
+					+ dt->beta*(local_data_penalty[worker_in_ctx])
+					+ dt->_gamma*(local_power[worker_in_ctx]);
 
-			if (exp_end[worker] > max_exp_end)
+			if (exp_end[worker_in_ctx] > max_exp_end)
 				/* This placement will make the computation
 				 * longer, take into account the idle
 				 * consumption of other cpus */
-				fitness[worker] += _gamma * idle_power * (exp_end[worker] - max_exp_end) / 1000000.0;
+				fitness[worker_in_ctx] += dt->_gamma * dt->idle_power * (exp_end[worker_in_ctx] - max_exp_end) / 1000000.0;
 
-			if (best == -1 || fitness[worker] < best_fitness)
+			if (best == -1 || fitness[worker_in_ctx] < best_fitness)
 			{
 				/* we found a better solution */
-				best_fitness = fitness[worker];
+				best_fitness = fitness[worker_in_ctx];
 				best = worker;
+				best_in_ctx = worker_in_ctx;
 
 	//			_STARPU_DEBUG("best fitness (worker %d) %le = alpha*(%le) + beta(%le) +gamma(%le)\n", worker, best_fitness, exp_end[worker] - best_exp_end, local_data_penalty[worker], local_power[worker]);
 			}
@@ -524,12 +548,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starp
 	}
 	else 
 	{
-		model_best = local_task_length[best];
-		penality_best = local_data_penalty[best];
+		model_best = local_task_length[best_in_ctx];
+		penality_best = local_data_penalty[best_in_ctx];
 	}
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
 }
 
 static int dmda_push_sorted_task(struct starpu_task *task, unsigned sched_ctx_id)
@@ -570,31 +594,39 @@ static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 
 static void initialize_dmda_policy(unsigned sched_ctx_id) 
 {
+	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
+	dt->alpha = STARPU_DEFAULT_ALPHA;
+	dt->beta = STARPU_DEFAULT_BETA;
+	dt->_gamma = STARPU_DEFAULT_GAMMA;
+	dt->idle_power = 0.0;
+
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	nworkers = sched_ctx->nworkers_in_ctx;
+	unsigned nworkers = sched_ctx->nworkers_in_ctx;
+	sched_ctx->policy_data = (void*)dt;
+
+	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(nworkers*sizeof(struct starpu_fifo_taskq_s*));
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
-		alpha = atof(strval_alpha);
+		dt->alpha = atof(strval_alpha);
 
 	const char *strval_beta = getenv("STARPU_SCHED_BETA");
 	if (strval_beta)
-		beta = atof(strval_beta);
+		dt->beta = atof(strval_beta);
 
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
-		_gamma = atof(strval_gamma);
+		dt->_gamma = atof(strval_gamma);
 
-	unsigned workerid, workerid_ctx;
+	unsigned workerid_ctx;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-                workerid = sched_ctx->workerid[workerid_ctx];
-		queue_array[workerid] = _starpu_create_fifo();
+		dt->queue_array[workerid_ctx] = _starpu_create_fifo();
 	
-		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-	
-		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
 	}
 }
 
@@ -610,14 +642,20 @@ static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
 static void deinitialize_dmda_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-        unsigned workerid;
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 	int workerid_in_ctx;
         int nworkers = sched_ctx->nworkers_in_ctx;
 	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
-                workerid = sched_ctx->workerid[workerid_in_ctx];
-		_starpu_destroy_fifo(queue_array[workerid]);
+		_starpu_destroy_fifo(dt->queue_array[workerid_in_ctx]);
+		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_in_ctx]);
+                PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_in_ctx]);
+		free(sched_ctx->sched_mutex[workerid_in_ctx]);
+                free(sched_ctx->sched_cond[workerid_in_ctx]);
 	}
 
+	free(dt->queue_array);
+	free(dt);
+
 	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
 }
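
The other thread of change in this file: what used to be file-scope statics (alpha, beta, _gamma, idle_power, the per-worker queue array, the task counters) now lives in a per-context structure hung off sched_ctx->policy_data, so two contexts running the same strategy no longer share state. A condensed sketch of the pattern, reusing sched_ctx_sketch from the first sketch; field names and default values are placeholders, not the real dmda code or the real StarPU defaults.

	struct task_fifo_sketch; /* stand-in for starpu_fifo_taskq_s */

	struct dm_state_sketch {
		double alpha, beta, gamma_, idle_power;
		struct task_fifo_sketch **queue_array; /* one FIFO per worker in the ctx */
		long total_task_cnt, ready_task_cnt;
	};

	static void init_dm_state_sketch(struct sched_ctx_sketch *ctx)
	{
		struct dm_state_sketch *dt = malloc(sizeof(*dt));
		dt->alpha = 1.0;      /* placeholder default */
		dt->beta = 1.0;       /* placeholder default */
		dt->gamma_ = 1.0;     /* placeholder default */
		dt->idle_power = 0.0;
		dt->total_task_cnt = 0;
		dt->ready_task_cnt = 0;
		dt->queue_array = malloc(ctx->nworkers_in_ctx * sizeof(*dt->queue_array));
		ctx->policy_data = dt; /* mirrors sched_ctx->policy_data in the real code */
	}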
 

+ 36 - 22
src/sched_policies/eager_central_policy.c

@@ -24,43 +24,54 @@
 #include <sched_policies/fifo_queues.h>
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_fifo_taskq_s *fifo;
+//static struct starpu_fifo_taskq_s *fifo;
 
-static pthread_cond_t sched_cond;
-static pthread_mutex_t sched_mutex;
+//static pthread_cond_t sched_cond;
+//static pthread_mutex_t sched_mutex;
 
 static void initialize_eager_center_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 
 	/* there is only a single queue in that trivial design */
-	fifo = _starpu_create_fifo();
+	struct starpu_fifo_taskq_s *fifo =  _starpu_create_fifo();
+	sched_ctx->policy_data = (void*)fifo;
 
-	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
-	PTHREAD_COND_INIT(&sched_cond, NULL);
+	pthread_mutex_t *sched_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
+	pthread_cond_t *sched_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
+	PTHREAD_MUTEX_INIT(sched_mutex, NULL);
+	PTHREAD_COND_INIT(sched_cond, NULL);
 
-
-	unsigned workerid;
 	int workerid_in_ctx;
 	int nworkers = sched_ctx->nworkers_in_ctx;
 	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
-	        workerid = sched_ctx->workerid[workerid_in_ctx];
-		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
+		sched_ctx->sched_mutex[workerid_in_ctx] = sched_mutex;
+		sched_ctx->sched_cond[workerid_in_ctx] = sched_cond;
 	}
 }
 
-static void deinitialize_eager_center_policy(__attribute__ ((unused)) unsigned sched_ctx_id) 
+static void deinitialize_eager_center_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
 
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(fifo);
+
+	PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[0]);
+	PTHREAD_COND_DESTROY(sched_ctx->sched_cond[0]);
+	free(sched_ctx->sched_mutex[0]);
+	free(sched_ctx->sched_cond[0]);
+	
+	free(fifo);
 }
 
 static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-
+	int nworkers_in_ctx = sched_ctx->nworkers_in_ctx;
 	int i;
 	int workerid;
 	for(i = 0; i < sched_ctx->nworkers_in_ctx; i++){
@@ -68,30 +79,33 @@ static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_i
 		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
 	}
 
-
-	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
+	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+	return _starpu_fifo_push_task(fifo, sched_ctx->sched_mutex[0], sched_ctx->sched_cond[0], task);
 }
 
-static int push_prio_task_eager_policy(struct starpu_task *task, __attribute__ ((unused)) unsigned sched_ctx_id)
+static int push_prio_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+	return _starpu_fifo_push_prio_task(fifo, sched_ctx->sched_mutex[0], sched_ctx->sched_cond[0], task);
 }
 
-static struct starpu_task *pop_every_task_eager_policy(void)
+static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
-	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, starpu_worker_get_id());
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
+	return _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[0], starpu_worker_get_id());
 }
 
-static struct starpu_task *pop_task_eager_policy(void)
+static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 {
         unsigned workerid = starpu_worker_get_id();
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_fifo_taskq_s *fifo = (struct starpu_fifo_taskq_s*)sched_ctx->policy_data;
 	struct starpu_task *task =  _starpu_fifo_pop_task(fifo, workerid);
 
 	if(task)
 	  {
-	    struct starpu_sched_ctx *sched_ctx = NULL;
-		  _starpu_get_sched_ctx(task->sched_ctx);
-
 		int i;
 		for(i = 0; i <sched_ctx->nworkers_in_ctx; i++)
 		  {

+ 37 - 19
src/sched_policies/eager_central_priority_policy.c

@@ -41,12 +41,12 @@ struct starpu_priority_taskq_s {
 };
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_priority_taskq_s *taskq;
+//static struct starpu_priority_taskq_s *taskq;
 
 /* keep track of the total number of tasks to be scheduled to avoid infinite 
  * polling when there are really few tasks in the overall queue */
-static pthread_cond_t global_sched_cond;
-static pthread_mutex_t global_sched_mutex;
+//static pthread_cond_t global_sched_cond;
+//static pthread_mutex_t global_sched_mutex;
 
 /*
  * Centralized queue with priorities 
@@ -83,33 +83,48 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	starpu_sched_set_max_priority(MAX_LEVEL);
 
 	/* only a single queue (even though there are several internaly) */
-	taskq = _starpu_create_priority_taskq();
+	struct starpu_priority_taskq_s *taskq = _starpu_create_priority_taskq();
+	sched_ctx->policy_data = (void*) taskq;
 
-	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
-	PTHREAD_COND_INIT(&global_sched_cond, NULL);
+	pthread_cond_t *global_sched_cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+	pthread_mutex_t *global_sched_mutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+
+	PTHREAD_MUTEX_INIT(global_sched_mutex, NULL);
+	PTHREAD_COND_INIT(global_sched_cond, NULL);
 
 	int nworkers = sched_ctx->nworkers_in_ctx;
-	unsigned workerid;
 	int workerid_ctx;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-                workerid = sched_ctx->workerid[workerid_ctx];
-		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
+		sched_ctx->sched_mutex[workerid_ctx] = global_sched_mutex;
+		sched_ctx->sched_cond[workerid_ctx] = global_sched_cond;
 	}
 }
 
-static void deinitialize_eager_center_priority_policy(__attribute__ ((unused)) unsigned sched_ctx_id) 
+static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
 
 	/* deallocate the task queue */
 	_starpu_destroy_priority_taskq(taskq);
+
+	PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[0]);
+        PTHREAD_COND_DESTROY(sched_ctx->sched_cond[0]);
+        free(sched_ctx->sched_mutex[0]);
+        free(sched_ctx->sched_cond[0]);
+
+        free(taskq);
 }
 
-static int _starpu_priority_push_task(struct starpu_task *task, __attribute__ ((unused)) unsigned sched_ctx_id)
+static int _starpu_priority_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
+
 	/* wake people waiting for a task */
-	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
 
 	STARPU_TRACE_JOB_PUSH(task, 1);
 	
@@ -119,26 +134,29 @@ static int _starpu_priority_push_task(struct starpu_task *task, __attribute__ ((
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 
-	PTHREAD_COND_SIGNAL(&global_sched_cond);
-	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+	PTHREAD_COND_SIGNAL(sched_ctx->sched_cond[0]);
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 
 	return 0;
 }
 
-static struct starpu_task *_starpu_priority_pop_task(void)
+static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 {
 	struct starpu_task *task = NULL;
 
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	struct starpu_priority_taskq_s *taskq = (struct starpu_priority_taskq_s*)sched_ctx->policy_data;
+
 	/* block until some event happens */
-	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
 
 	if ((taskq->total_ntasks == 0) && _starpu_machine_is_running())
 	{
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-		PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+		PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 		return NULL;
 #else
-		PTHREAD_COND_WAIT(&global_sched_cond, &global_sched_mutex);
+		PTHREAD_COND_WAIT(sched_ctx->sched_cond[0], sched_ctx->sched_mutex[0]);
 #endif
 	}
 
@@ -156,7 +174,7 @@ static struct starpu_task *_starpu_priority_pop_task(void)
 		} while (!task && priolevel-- > 0);
 	}
 
-	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 
 	return task;
 }
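
For the centralized strategies (eager and the priority queue above, work stealing below) there is still a single queue per context, so every worker slot of the context's tables simply aliases one shared mutex/cond pair, and sched_mutex[0] serves as the lock for the whole queue. A small sketch of that initialization, again on the illustrative sched_ctx_sketch layout rather than the real StarPU structures.

	static void init_centralized_ctx_sketch(struct sched_ctx_sketch *ctx)
	{
		pthread_mutex_t *shared_mutex = malloc(sizeof(*shared_mutex));
		pthread_cond_t  *shared_cond  = malloc(sizeof(*shared_cond));
		pthread_mutex_init(shared_mutex, NULL);
		pthread_cond_init(shared_cond, NULL);

		/* every worker slot of the context points at the same pair,
		 * so slot 0 is effectively the lock of the central queue */
		for (int i = 0; i < ctx->nworkers_in_ctx; i++) {
			ctx->sched_mutex[i] = shared_mutex;
			ctx->sched_cond[i]  = shared_cond;
		}
	}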

+ 63 - 60
src/sched_policies/heft.c

@@ -26,16 +26,15 @@
 #include <starpu_task_bundle.h>
 
 typedef struct {
-  double alpha;
-  double beta;
-  double _gamma;
-  double idle_power;
-
-  double exp_start[STARPU_NMAXWORKERS];
-  double exp_end[STARPU_NMAXWORKERS];
-  double exp_len[STARPU_NMAXWORKERS];
-  double ntasks[STARPU_NMAXWORKERS];
-
+	double alpha;
+	double beta;
+	double _gamma;
+	double idle_power;
+
+	double *exp_start;
+	double *exp_end;
+	double *exp_len;
+	double *ntasks;
 } heft_data;
 
 static void heft_init(unsigned sched_ctx_id)
@@ -51,6 +50,11 @@ static void heft_init(unsigned sched_ctx_id)
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;
 	sched_ctx->policy_data = (void*)hd;
 
+	hd->exp_start = (double*)malloc(nworkers*sizeof(double));
+	hd->exp_end = (double*)malloc(nworkers*sizeof(double));
+	hd->exp_len = (double*)malloc(nworkers*sizeof(double));
+	hd->ntasks = (double*)malloc(nworkers*sizeof(double));
+
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 		hd->alpha = atof(strval_alpha);
@@ -67,47 +71,44 @@ static void heft_init(unsigned sched_ctx_id)
 	if (strval_idle_power)
 		hd->idle_power = atof(strval_idle_power);
 
-	unsigned workerid, workerid_ctx;
+	unsigned workerid_ctx;
 
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	  {
-	    workerid = sched_ctx->workerid[workerid_ctx];
-	    hd->exp_start[workerid] = starpu_timing_now();
-	    hd->exp_len[workerid] = 0.0;
-	    hd->exp_end[workerid] = hd->exp_start[workerid]; 
-	    hd->ntasks[workerid] = 0;
-	    
-	    pthread_cond_t *sched_cond = malloc(sizeof(pthread_cond_t));
-	    pthread_mutex_t *sched_mutex = malloc(sizeof(pthread_mutex_t));
-	    
-	    PTHREAD_MUTEX_INIT(sched_mutex, NULL);
-	    PTHREAD_COND_INIT(sched_cond, NULL);
-	    
-	    starpu_worker_set_sched_condition(workerid, sched_cond, sched_mutex);
+	    hd->exp_start[workerid_ctx] = starpu_timing_now();
+	    hd->exp_len[workerid_ctx] = 0.0;
+	    hd->exp_end[workerid_ctx] = hd->exp_start[workerid_ctx]; 
+	    hd->ntasks[workerid_ctx] = 0;
+	    	    
+	    sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+	    sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+	    PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
+	    PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
 	  }
 }
 
 static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
+
 	double model = task->predicted;
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 	heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-	hd->exp_len[workerid] -= model;
-	hd->exp_start[workerid] = starpu_timing_now() + model;
-	hd->exp_end[workerid] = hd->exp_start[workerid] + hd->exp_len[workerid];
-	hd->ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid_ctx]);
+	hd->exp_len[workerid_ctx] -= model;
+	hd->exp_start[workerid_ctx] = starpu_timing_now() + model;
+	hd->exp_end[workerid_ctx] = hd->exp_start[workerid_ctx] + hd->exp_len[workerid_ctx];
+	hd->ntasks[workerid_ctx]--;
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid_ctx]);
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
 {
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 	heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
@@ -115,25 +116,24 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid, unsign
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	double predicted = starpu_task_expected_length(task, perf_arch);
 
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid_ctx]);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	hd->exp_start[workerid] = STARPU_MAX(hd->exp_start[workerid], starpu_timing_now());
-	hd->exp_end[workerid] = STARPU_MAX(hd->exp_start[workerid], starpu_timing_now());
+	hd->exp_start[workerid_ctx] = STARPU_MAX(hd->exp_start[workerid_ctx], starpu_timing_now());
+	hd->exp_end[workerid_ctx] = STARPU_MAX(hd->exp_start[workerid_ctx], starpu_timing_now());
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
 	{
 		task->predicted = predicted;
-		hd->exp_end[workerid] += predicted;
-		hd->exp_len[workerid] += predicted;
+		hd->exp_end[workerid_ctx] += predicted;
+		hd->exp_len[workerid_ctx] += predicted;
 	}
 
-	hd->ntasks[workerid]++;
+	hd->ntasks[workerid_ctx]++;
 
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid_ctx]);
 }
 
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
@@ -142,13 +142,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(best_workerid);
+	int best_workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, best_workerid);
 
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-	hd->exp_end[best_workerid] += predicted;
-	hd->exp_len[best_workerid] += predicted;
-	hd->ntasks[best_workerid]++;
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[best_workerid_ctx]);
+	hd->exp_end[best_workerid_ctx] += predicted;
+	hd->exp_len[best_workerid_ctx] += predicted;
+	hd->ntasks[best_workerid_ctx]++;
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[best_workerid_ctx]);
 
 	task->predicted = predicted;
 
@@ -157,7 +157,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
-
 	return starpu_push_local_task(best_workerid, task, prio);
 }
 
@@ -186,8 +185,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
     {
       worker = sched_ctx->workerid[worker_in_ctx];
       /* Sometimes workers didn't take the tasks as early as we expected */
-      hd->exp_start[worker] = STARPU_MAX(hd->exp_start[worker], starpu_timing_now());
-      exp_end[worker_in_ctx] = hd->exp_start[worker] + hd->exp_len[worker];
+      hd->exp_start[worker_in_ctx] = STARPU_MAX(hd->exp_start[worker_in_ctx], starpu_timing_now());
+      exp_end[worker_in_ctx] = hd->exp_start[worker_in_ctx] + hd->exp_len[worker_in_ctx];
       if (exp_end[worker_in_ctx] > max_exp_end)
  	max_exp_end = exp_end[worker_in_ctx];
 
@@ -214,7 +213,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 /*       printf("%d(in ctx%d): local task len = %2.2f locald data penalty = %2.2f local_power = %2.2f\n", worker, worker_in_ctx, local_task_length[worker_in_ctx], local_data_penalty[worker_in_ctx], local_power[worker_in_ctx]); */
 
-      double ntasks_end = hd->ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+      double ntasks_end = hd->ntasks[worker_in_ctx] / starpu_worker_get_relative_speedup(perf_arch);
 
       if (ntasks_best == -1
 	  || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
@@ -239,7 +238,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
       if (unknown)
 	continue;
 
-      exp_end[worker_in_ctx] = hd->exp_start[worker] + hd->exp_len[worker] + local_task_length[worker_in_ctx];
+      exp_end[worker_in_ctx] = hd->exp_start[worker_in_ctx] + hd->exp_len[worker_in_ctx] + local_task_length[worker_in_ctx];
 
       if (exp_end[worker_in_ctx] < best_exp_end)
 	{
@@ -332,7 +331,6 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		}
 	}
 
-
 	/* By now, we must have found a solution */
 	STARPU_ASSERT(best != -1);
 	
@@ -382,18 +380,23 @@ static int heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 static void heft_deinit(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-        unsigned workerid;
 	int workerid_in_ctx;
 	int nworkers = sched_ctx->nworkers_in_ctx;
+	heft_data *ht = (heft_data*)sched_ctx->policy_data;
+
 	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
-	        workerid = sched_ctx->workerid[workerid_in_ctx];
-		struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-		PTHREAD_MUTEX_DESTROY(worker->sched_mutex);
-		PTHREAD_COND_DESTROY(worker->sched_cond);
-		free(worker->sched_mutex);
-		free(worker->sched_cond);
+		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_in_ctx]);
+		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_in_ctx]);
+		free(sched_ctx->sched_mutex[workerid_in_ctx]);
+		free(sched_ctx->sched_cond[workerid_in_ctx]);
 	}
-	free(sched_ctx->policy_data);
+
+	free(ht->exp_start);
+	free(ht->exp_end);
+	free(ht->exp_len);
+	free(ht->ntasks);
+	  
+	free(ht);
 }
 
 struct starpu_sched_policy_s heft_policy = {

+ 7 - 9
src/sched_policies/random_policy.c

@@ -22,8 +22,8 @@
 
 //static unsigned nworkers;
 
-static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+/* static pthread_cond_t sched_cond[STARPU_NMAXWORKERS]; */
+/* static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS]; */
 
 static int _random_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
@@ -92,15 +92,13 @@ static void initialize_random_policy(unsigned sched_ctx_id)
 
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;	
 
-	unsigned workerid, workerid_ctx;
+	unsigned workerid_ctx;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-                workerid = sched_ctx->workerid[workerid_ctx];
-	
-		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-	
-		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
 	}
 }
 

+ 69 - 48
src/sched_policies/work_stealing_policy.c

@@ -20,49 +20,55 @@
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 
-static unsigned nworkers;
-static unsigned rr_worker;
-static struct starpu_deque_jobq_s *queue_array[STARPU_NMAXWORKERS];
+//static unsigned nworkers;
+//static unsigned rr_worker;
+//static struct starpu_deque_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
-static pthread_mutex_t global_sched_mutex;
-static pthread_cond_t global_sched_cond;
+/* static pthread_mutex_t global_sched_mutex; */
+/* static pthread_cond_t global_sched_cond; */
 
 /* keep track of the work performed from the beginning of the algorithm to make
  * better decisions about which queue to select when stealing or deferring work
  */
-static unsigned performed_total = 0;
+//static unsigned performed_total = 0;
+
+typedef struct {
+	struct starpu_deque_jobq_s **queue_array;
+	unsigned rr_worker;
+	unsigned performed_total;
+} work_stealing_data;
 
 #ifdef USE_OVERLOAD
-static float overload_metric(unsigned id)
+static float overload_metric(struct starpu_deque_jobq_s *dequeue_queue, unsigned *performed_total)
 {
 	float execution_ratio = 0.0f;
-	if (performed_total > 0) {
-		execution_ratio = _starpu_get_deque_nprocessed(queue_array[id])/performed_total;
+	if (*performed_total > 0) {
+		execution_ratio = _starpu_get_deque_nprocessed(dequeue_queue)/ *performed_total;
 	}
 
 	unsigned performed_queue;
-	performed_queue = _starpu_get_deque_nprocessed(queue_array[id]);
+	performed_queue = _starpu_get_deque_nprocessed(dequeue_queue);
 
 	float current_ratio = 0.0f;
 	if (performed_queue > 0) {
-		current_ratio = _starpu_get_deque_njobs(queue_array[id])/performed_queue;
+		current_ratio = _starpu_get_deque_njobs(dequeue_queue)/performed_queue;
 	}
 	
 	return (current_ratio - execution_ratio);
 }
 
 /* who to steal work to ? */
-static struct starpu_deque_jobq_s *select_victimq(void)
+static struct starpu_deque_jobq_s *select_victimq(work_stealing_data *ws, unsigned nworkers)
 {
 	struct starpu_deque_jobq_s *q;
 
 	unsigned attempts = nworkers;
 
-	unsigned worker = rr_worker;
+	unsigned worker = ws->rr_worker;
 	do {
 		if (overload_metric(worker) > 0.0f)
 		{
-			q = queue_array[worker];
+			q = ws->queue_array[worker];
 			return q;
 		}
 		else {
@@ -71,23 +77,23 @@ static struct starpu_deque_jobq_s *select_victimq(void)
 	} while(attempts-- > 0);
 
 	/* take one anyway ... */
-	q = queue_array[rr_worker];
-	rr_worker = (rr_worker + 1 )%nworkers;
+	q = ws->queue_array[ws->rr_worker];
+	ws->rr_worker = (ws->rr_worker + 1 )%nworkers;
 
 	return q;
 }
 
-static struct starpu_deque_jobq_s *select_workerq(void)
+static struct starpu_deque_jobq_s *select_workerq(work_stealing_data *ws, unsigned nworkers)
 {
 	struct starpu_deque_jobq_s *q;
 
 	unsigned attempts = nworkers;
 
-	unsigned worker = rr_worker;
+	unsigned worker = ws->rr_worker;
 	do {
 		if (overload_metric(worker) < 0.0f)
 		{
-			q = queue_array[worker];
+			q = ws->queue_array[worker];
 			return q;
 		}
 		else {
@@ -96,8 +102,8 @@ static struct starpu_deque_jobq_s *select_workerq(void)
 	} while(attempts-- > 0);
 
 	/* take one anyway ... */
-	q = queue_array[rr_worker];
-	rr_worker = (rr_worker + 1 )%nworkers;
+	q = ws->queue_array[ws->rr_worker];
+	ws->rr_worker = (ws->rr_worker + 1 )%nworkers;
 
 	return q;
 }
@@ -105,13 +111,13 @@ static struct starpu_deque_jobq_s *select_workerq(void)
 #else
 
 /* who to steal work to ? */
-static struct starpu_deque_jobq_s *select_victimq(void)
+static struct starpu_deque_jobq_s *select_victimq(work_stealing_data *ws, unsigned nworkers)
 {
 	struct starpu_deque_jobq_s *q;
 
-	q = queue_array[rr_worker];
+	q = ws->queue_array[ws->rr_worker];
 
-	rr_worker = (rr_worker + 1 )%nworkers;
+	ws->rr_worker = (ws->rr_worker + 1 )%nworkers;
 
 	return q;
 }
@@ -119,13 +125,13 @@ static struct starpu_deque_jobq_s *select_victimq(void)
 
 /* when anonymous threads submit tasks, 
  * we need to select a queue where to dispose them */
-static struct starpu_deque_jobq_s *select_workerq(void)
+static struct starpu_deque_jobq_s *select_workerq(work_stealing_data *ws, unsigned nworkers)
 {
 	struct starpu_deque_jobq_s *q;
 
-	q = queue_array[rr_worker];
+	q = ws->queue_array[ws->rr_worker];
 
-	rr_worker = (rr_worker + 1 )%nworkers;
+	ws->rr_worker = (ws->rr_worker + 1 )%nworkers;
 
 	return q;
 }
@@ -133,51 +139,59 @@ static struct starpu_deque_jobq_s *select_workerq(void)
 #endif
 
 #warning TODO rewrite ... this will not scale at all now
-static struct starpu_task *ws_pop_task(void)
+static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 
 	struct starpu_deque_jobq_s *q;
 
-	q = queue_array[workerid];
+	q = ws->queue_array[workerid_ctx];
 
-	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
 
 	task = _starpu_deque_pop_task(q, -1);
 	if (task) {
 		/* there was a local task */
-		performed_total++;
-		PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+		ws->performed_total++;
+		PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 		return task;
 	}
 	
 	/* we need to steal someone's job */
 	struct starpu_deque_jobq_s *victimq;
-	victimq = select_victimq();
+	victimq = select_victimq(ws, sched_ctx->nworkers_in_ctx);
 
 	task = _starpu_deque_pop_task(victimq, workerid);
 	if (task) {
 		STARPU_TRACE_WORK_STEALING(q, victimq);
-		performed_total++;
+		ws->performed_total++;
 	}
 
-	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 
 	return task;
 }
 
-int ws_push_task(struct starpu_task *task, __attribute__ ((unused)) unsigned sched_ctx_id)
+int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	starpu_job_t j = _starpu_get_job_associated_to_task(task);
 
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+
 	int workerid = starpu_worker_get_id();
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 
         struct starpu_deque_jobq_s *deque_queue;
-	deque_queue = queue_array[workerid];
+	deque_queue = ws->queue_array[workerid_ctx];
 
-        PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+        PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[0]);
 	// XXX reuse ?
         //total_number_of_jobs++;
 
@@ -186,8 +200,8 @@ int ws_push_task(struct starpu_task *task, __attribute__ ((unused)) unsigned sch
         deque_queue->njobs++;
         deque_queue->nprocessed++;
 
-        PTHREAD_COND_SIGNAL(&global_sched_cond);
-        PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+        PTHREAD_COND_SIGNAL(sched_ctx->sched_cond[0]);
+        PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[0]);
 
         return 0;
 }
@@ -195,19 +209,26 @@ int ws_push_task(struct starpu_task *task, __attribute__ ((unused)) unsigned sch
 static void initialize_ws_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	work_stealing_data *ws = (work_stealing_data*)malloc(sizeof(work_stealing_data));
+	sched_ctx->policy_data = (void*)ws;
+	
+	unsigned nworkers = sched_ctx->nworkers_in_ctx;
+	ws->rr_worker = 0;
+	ws->queue_array = (struct starpu_deque_jobq_s**)malloc(nworkers*sizeof(struct starpu_deque_jobq_s*));
 
-	nworkers = sched_ctx->nworkers_in_ctx;
-	rr_worker = 0;
 
-	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
-	PTHREAD_COND_INIT(&global_sched_cond, NULL);
+	pthread_mutex_t *sched_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
+	pthread_cond_t *sched_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
+	PTHREAD_MUTEX_INIT(sched_mutex, NULL);
+	PTHREAD_COND_INIT(sched_cond, NULL);
 
-	unsigned workerid, workerid_ctx;
+	unsigned workerid_ctx;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-                workerid = sched_ctx->workerid[workerid_ctx];
-		queue_array[workerid] = _starpu_create_deque();
-		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
+		ws->queue_array[workerid_ctx] = _starpu_create_deque();
+
+		sched_ctx->sched_mutex[workerid_ctx] = sched_mutex;
+		sched_ctx->sched_cond[workerid_ctx] = sched_cond;
 	}
 }
 

+ 4 - 4
src/util/starpu_insert_task_utils.c

@@ -131,7 +131,7 @@ int _starpu_pack_cl_args(size_t arg_buffer_size, char **arg_buffer, va_list varg
 	va_end(varg_list);
 	return 0;
 }
-static void _starpu_prepare_task(char *arg_buffer, starpu_codelet *cl, struct starpu_task **task, va_list varg_list, int *ctx) {
+static void _starpu_prepare_task(char *arg_buffer, starpu_codelet *cl, struct starpu_task **task, va_list varg_list, unsigned *ctx) {
         int arg_type;
 	unsigned current_buffer = 0;
 
@@ -180,7 +180,7 @@ static void _starpu_prepare_task(char *arg_buffer, starpu_codelet *cl, struct st
 			va_arg(varg_list, int);
 		}
 		else if (arg_type==STARPU_CTX) {
-			*ctx = va_arg(varg_list, int);
+			*ctx = va_arg(varg_list, unsigned);
 		}
 
 	}
@@ -199,10 +199,10 @@ static void _starpu_prepare_task(char *arg_buffer, starpu_codelet *cl, struct st
 }
 
 int _starpu_insert_task_create_and_submit(char *arg_buffer, starpu_codelet *cl, struct starpu_task **task, va_list varg_list) {
-	int ctx = -1;
+	unsigned ctx = 0;
 	_starpu_prepare_task(arg_buffer, cl, task, varg_list, &ctx);
 	
-	 int ret = ctx == -1 ? starpu_task_submit(*task) : starpu_task_submit_to_ctx(*task, ctx);
+	 int ret = ctx == 0 ? starpu_task_submit(*task) : starpu_task_submit_to_ctx(*task, ctx);
 
 	if (STARPU_UNLIKELY(ret == -ENODEV))
           fprintf(stderr, "No one can execute task %p wih cl %p (symbol %s)\n", *task, (*task)->cl, ((*task)->cl->model && (*task)->cl->model->symbol)?(*task)->cl->model->symbol:"none");