Browse Source

Added sched_ctxs

Andra Hugo 14 years ago
parent
commit
0ec66b2e5f

+ 12 - 5
include/starpu_scheduler.h

@@ -63,14 +63,13 @@ struct starpu_machine_topology_s {
  * field of the starpu_conf structure passed to the starpu_init function. */
  * field of the starpu_conf structure passed to the starpu_init function. */
 struct starpu_sched_policy_s {
 struct starpu_sched_policy_s {
 	/* Initialize the scheduling policy. */
 	/* Initialize the scheduling policy. */
-	void (*init_sched)(struct starpu_machine_topology_s *, struct starpu_sched_policy_s *);
+	void (*init_sched)(struct starpu_sched_ctx *);
 
 
 	/* Cleanup the scheduling policy. */
 	/* Cleanup the scheduling policy. */
-	void (*deinit_sched)(struct starpu_machine_topology_s *, struct starpu_sched_policy_s *);
+	void (*deinit_sched)(struct starpu_sched_ctx *);
 
 
 	/* Insert a task into the scheduler. */
 	/* Insert a task into the scheduler. */
-	int (*push_task)(struct starpu_task *);
-
+        int (*push_task)(struct starpu_task *, struct starpu_sched_ctx *);
 	/* Notify the scheduler that a task was pushed on the worker. This
 	/* Notify the scheduler that a task was pushed on the worker. This
 	 * method is called when a task that was explicitly assigned to a
 	 * method is called when a task that was explicitly assigned to a
 	 * worker is scheduled. This method therefore permits to keep the state
 	 * worker is scheduled. This method therefore permits to keep the state
@@ -79,7 +78,7 @@ struct starpu_sched_policy_s {
 	void (*push_task_notify)(struct starpu_task *, int workerid);
 	void (*push_task_notify)(struct starpu_task *, int workerid);
 
 
 	/* Insert a priority task into the scheduler. */
 	/* Insert a priority task into the scheduler. */
-	int (*push_prio_task)(struct starpu_task *);
+        int (*push_prio_task)(struct starpu_task *, struct starpu_sched_ctx *);
 
 
 	/* Get a task from the scheduler. The mutex associated to the worker is
 	/* Get a task from the scheduler. The mutex associated to the worker is
 	 * already taken when this method is called. */
 	 * already taken when this method is called. */
@@ -101,6 +100,14 @@ struct starpu_sched_policy_s {
 	const char *policy_description;
 	const char *policy_description;
 };
 };
 
 
+struct starpu_sched_ctx {
+	struct starpu_sched_policy_s *sched_policy;
+	int workerid[STARPU_NMAXWORKERS];
+	int nworkers_in_ctx;
+	int nworkers_of_next_ctx;
+};
+
+
 /* When there is no available task for a worker, StarPU blocks this worker on a
 /* When there is no available task for a worker, StarPU blocks this worker on a
 condition variable. This function specifies which condition variable (and the
 condition variable. This function specifies which condition variable (and the
 associated mutex) should be used to block (and to wake up) a worker. Note that
 associated mutex) should be used to block (and to wake up) a worker. Note that

+ 5 - 0
include/starpu_task.h

@@ -88,6 +88,7 @@ typedef struct starpu_codelet_t {
 } starpu_codelet;
 } starpu_codelet;
 
 
 struct starpu_task {
 struct starpu_task {
+  char *name;
 	struct starpu_codelet_t *cl;
 	struct starpu_codelet_t *cl;
 
 
 	/* arguments managed by the DSM */
 	/* arguments managed by the DSM */
@@ -156,6 +157,9 @@ struct starpu_task {
 	 * by hand (without starpu_task_create), this field should be set to
 	 * by hand (without starpu_task_create), this field should be set to
 	 * NULL. */
 	 * NULL. */
 	void *starpu_private;
 	void *starpu_private;
+     
+        struct starpu_sched_ctx *sched_ctx;
+  
 };
 };
 
 
 /* It is possible to initialize statically allocated tasks with this value.
 /* It is possible to initialize statically allocated tasks with this value.
@@ -242,6 +246,7 @@ struct starpu_task *starpu_task_create(void);
  * allocated task results in an undefined behaviour. */
  * allocated task results in an undefined behaviour. */
 void starpu_task_destroy(struct starpu_task *task);
 void starpu_task_destroy(struct starpu_task *task);
 int starpu_task_submit(struct starpu_task *task);
 int starpu_task_submit(struct starpu_task *task);
+int starpu_task_submit_to_ctx(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx);
 
 
 /* This function blocks until the task was executed. It is not possible to
 /* This function blocks until the task was executed. It is not possible to
  * synchronize with a task more than once. It is not possible to wait
  * synchronize with a task more than once. It is not possible to wait

+ 13 - 46
src/common/barrier.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010,2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -15,45 +15,20 @@
  */
  */
 
 
 #include <common/barrier.h>
 #include <common/barrier.h>
-#include <common/utils.h>
 
 
 int _starpu_barrier_init(_starpu_barrier_t *barrier, int count)
 int _starpu_barrier_init(_starpu_barrier_t *barrier, int count)
 {
 {
 	barrier->count = count;
 	barrier->count = count;
-	barrier->reached_start = 0;
-	barrier->reached_exit = 0;
-	PTHREAD_MUTEX_INIT(&barrier->mutex, NULL);
-	PTHREAD_MUTEX_INIT(&barrier->mutex_exit, NULL);
-	PTHREAD_COND_INIT(&barrier->cond, NULL);
+	barrier->reached = 0;
+	pthread_mutex_init(&barrier->mutex,NULL);
+	pthread_cond_init(&barrier->cond,NULL);
 	return 0;
 	return 0;
 }
 }
 
 
-int _starpu_barrier_test(_starpu_barrier_t *barrier)
-{
-    /*
-     * Check whether any threads are known to be waiting; report
-     * "BUSY" if so.
-     */
-        PTHREAD_MUTEX_LOCK(&barrier->mutex_exit);
-        if (barrier->reached_exit != barrier->count) {
-                PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
-                return EBUSY;
-        }
-        PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
-        return 0;
-}
-
 int _starpu_barrier_destroy(_starpu_barrier_t *barrier)
 int _starpu_barrier_destroy(_starpu_barrier_t *barrier)
 {
 {
-	int ret = _starpu_barrier_test(barrier);
-	while (ret == EBUSY) {
-		ret = _starpu_barrier_test(barrier);
-	}
-	_STARPU_DEBUG("reached_exit %d\n", barrier->reached_exit);
-
-	PTHREAD_MUTEX_DESTROY(&barrier->mutex);
-	PTHREAD_MUTEX_DESTROY(&barrier->mutex_exit);
-	PTHREAD_COND_DESTROY(&barrier->cond);
+	pthread_mutex_destroy(&barrier->mutex);
+	pthread_cond_destroy(&barrier->cond);
 	return 0;
 	return 0;
 }
 }
 
 
@@ -61,26 +36,18 @@ int _starpu_barrier_wait(_starpu_barrier_t *barrier)
 {
 {
 	int ret=0;
 	int ret=0;
 
 
-        // Wait until all threads enter the barrier
-	PTHREAD_MUTEX_LOCK(&barrier->mutex);
-	barrier->reached_exit=0;
-	barrier->reached_start++;
-	if (barrier->reached_start == barrier->count)
+	pthread_mutex_lock(&barrier->mutex);
+	barrier->reached++;
+	if (barrier->reached == barrier->count)
 	{
 	{
-		barrier->reached_start = 0;
-		PTHREAD_COND_BROADCAST(&barrier->cond);
+		barrier->reached = 0;
+		pthread_cond_broadcast(&barrier->cond);
 		ret = PTHREAD_BARRIER_SERIAL_THREAD;
 		ret = PTHREAD_BARRIER_SERIAL_THREAD;
 	}
 	}
 	else
 	else
 	{
 	{
-                PTHREAD_COND_WAIT(&barrier->cond,&barrier->mutex);
+		pthread_cond_wait(&barrier->cond,&barrier->mutex);
 	}
 	}
-	PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
-
-        // Count number of threads that exit the barrier
-	PTHREAD_MUTEX_LOCK(&barrier->mutex_exit);
-	barrier->reached_exit ++;
-	PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
-
+	pthread_mutex_unlock(&barrier->mutex);
 	return ret;
 	return ret;
 }
 }

+ 2 - 4
src/common/barrier.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -21,10 +21,8 @@
 
 
 typedef struct {
 typedef struct {
 	int count;
 	int count;
-	int reached_start;
-	int reached_exit;
+	int reached;
 	pthread_mutex_t mutex;
 	pthread_mutex_t mutex;
-	pthread_mutex_t mutex_exit;
 	pthread_cond_t cond;
 	pthread_cond_t cond;
 } _starpu_barrier_t;
 } _starpu_barrier_t;
 
 

+ 1 - 1
src/core/dependencies/implicit_data_deps.c

@@ -182,7 +182,7 @@ static void disable_last_writer_callback(void *cl_arg)
  * */
  * */
 /* NB : handle->sequential_consistency_mutex must be held by the caller */
 /* NB : handle->sequential_consistency_mutex must be held by the caller */
 void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
 void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
-						starpu_data_handle handle, starpu_access_mode mode)
+						   starpu_data_handle handle, starpu_access_mode mode)
 {
 {
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();

+ 1 - 1
src/core/dependencies/implicit_data_deps.h

@@ -22,7 +22,7 @@
 #include <common/config.h>
 #include <common/config.h>
 
 
 void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
 void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
-						starpu_data_handle handle, starpu_access_mode mode);
+						   starpu_data_handle handle, starpu_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle);
 
 

+ 5 - 1
src/core/errorcheck.h

@@ -34,7 +34,11 @@ typedef enum {
 	/* during the execution of the callback */
 	/* during the execution of the callback */
 	STATUS_CALLBACK,
 	STATUS_CALLBACK,
 	/* while sleeping because there is nothing to do */
 	/* while sleeping because there is nothing to do */
-	STATUS_SLEEPING
+	STATUS_SLEEPING,
+	/* changing ctx because a new one was created */
+	STATUS_CHANGING_CTX,
+	/* after having done join */
+	STATUS_JOINED
 } starpu_worker_status;
 } starpu_worker_status;
 
 
 /* Specify what the local worker is currently doing (eg. executing a callback).
 /* Specify what the local worker is currently doing (eg. executing a callback).

+ 1 - 1
src/core/jobs.c

@@ -186,7 +186,7 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 			starpu_clock_gettime(&task->profiling_info->callback_end_time);
 			starpu_clock_gettime(&task->profiling_info->callback_end_time);
 	}
 	}
 
 
-	_starpu_sched_post_exec_hook(task);
+	//	_starpu_sched_post_exec_hook(task);
 
 
 	STARPU_TRACE_TASK_DONE(j);
 	STARPU_TRACE_TASK_DONE(j);
 
 

+ 1 - 2
src/core/jobs.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
- * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -35,7 +35,6 @@
 #include <datawizard/datawizard.h>
 #include <datawizard/datawizard.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/errorcheck.h>
 #include <core/errorcheck.h>
-#include <common/barrier.h>
 
 
 #ifdef STARPU_USE_CUDA
 #ifdef STARPU_USE_CUDA
 #include <cuda.h>
 #include <cuda.h>

+ 186 - 33
src/core/sched_policy.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2010  Université de Bordeaux 1
  * Copyright (C) 2010  Université de Bordeaux 1
- * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -22,11 +22,13 @@
 #include <common/utils.h>
 #include <common/utils.h>
 #include <core/sched_policy.h>
 #include <core/sched_policy.h>
 #include <profiling/profiling.h>
 #include <profiling/profiling.h>
-#include <common/barrier.h>
 
 
-static struct starpu_sched_policy_s policy;
+//static struct starpu_sched_policy_s policy;
 
 
 static int use_prefetch = 0;
 static int use_prefetch = 0;
+static pthread_cond_t blocking_ths_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
+static int nblocked_ths = 0;
 
 
 int starpu_get_prefetch_flag(void)
 int starpu_get_prefetch_flag(void)
 {
 {
@@ -67,16 +69,16 @@ static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] =
 	&_starpu_sched_pgreedy_policy
 	&_starpu_sched_pgreedy_policy
 };
 };
 
 
-struct starpu_sched_policy_s *_starpu_get_sched_policy(void)
+struct starpu_sched_policy_s *_starpu_get_sched_policy(struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return &policy;
+	return sched_ctx->sched_policy;
 }
 }
 
 
 /*
 /*
  *	Methods to initialize the scheduling policy
  *	Methods to initialize the scheduling policy
  */
  */
 
 
-static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
+static void load_sched_policy(struct starpu_sched_policy_s *sched_policy, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	STARPU_ASSERT(sched_policy);
 	STARPU_ASSERT(sched_policy);
 
 
@@ -90,14 +92,16 @@ static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
 
 
 	}
 	}
 #endif
 #endif
-
-	policy.init_sched = sched_policy->init_sched;
-	policy.deinit_sched = sched_policy->deinit_sched;
-	policy.push_task = sched_policy->push_task;
-	policy.push_prio_task = sched_policy->push_prio_task;
-	policy.pop_task = sched_policy->pop_task;
-        policy.post_exec_hook = sched_policy->post_exec_hook;
-	policy.pop_every_task = sched_policy->pop_every_task;
+        struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
+
+	policy->init_sched = sched_policy->init_sched;
+	policy->deinit_sched = sched_policy->deinit_sched;
+	policy->push_task = sched_policy->push_task;
+	policy->push_prio_task = sched_policy->push_prio_task;
+	policy->pop_task = sched_policy->pop_task;
+        policy->post_exec_hook = sched_policy->post_exec_hook;
+	policy->pop_every_task = sched_policy->pop_every_task;
+	policy->policy_name = sched_policy->policy_name;
 }
 }
 
 
 static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
 static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
@@ -141,7 +145,7 @@ static void display_sched_help_message(void)
 	 }
 	 }
 }
 }
 
 
-static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config)
+static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, int init_ctx, const char *policy_name)
 {
 {
 	struct starpu_sched_policy_s *selected_policy = NULL;
 	struct starpu_sched_policy_s *selected_policy = NULL;
 	struct starpu_conf *user_conf = config->user_conf;
 	struct starpu_conf *user_conf = config->user_conf;
@@ -162,6 +166,9 @@ static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_c
 
 
 	if (sched_pol_name)
 	if (sched_pol_name)
 		selected_policy = find_sched_policy_from_name(sched_pol_name);
 		selected_policy = find_sched_policy_from_name(sched_pol_name);
+	else
+		if(policy_name)
+			selected_policy = find_sched_policy_from_name(policy_name);
 
 
 	/* Perhaps there was no policy that matched the name */
 	/* Perhaps there was no policy that matched the name */
 	if (selected_policy)
 	if (selected_policy)
@@ -171,7 +178,7 @@ static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_c
 	return &_starpu_sched_eager_policy;
 	return &_starpu_sched_eager_policy;
 }
 }
 
 
-void _starpu_init_sched_policy(struct starpu_machine_config_s *config)
+void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, int init_ctx, const char *policy_name)
 {
 {
 	/* Perhaps we have to display some help */
 	/* Perhaps we have to display some help */
 	display_sched_help_message();
 	display_sched_help_message();
@@ -180,6 +187,7 @@ void _starpu_init_sched_policy(struct starpu_machine_config_s *config)
 	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
 	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
 	if (use_prefetch == -1)
 	if (use_prefetch == -1)
 		use_prefetch = 1;
 		use_prefetch = 1;
+  
 
 
 	/* By default, we don't calibrate */
 	/* By default, we don't calibrate */
 	unsigned do_calibrate = 0;
 	unsigned do_calibrate = 0;
@@ -195,17 +203,18 @@ void _starpu_init_sched_policy(struct starpu_machine_config_s *config)
 	_starpu_set_calibrate_flag(do_calibrate);
 	_starpu_set_calibrate_flag(do_calibrate);
 
 
 	struct starpu_sched_policy_s *selected_policy;
 	struct starpu_sched_policy_s *selected_policy;
-	selected_policy = select_sched_policy(config);
+	selected_policy = select_sched_policy(config, init_ctx, policy_name);
 
 
-	load_sched_policy(selected_policy);
+	load_sched_policy(selected_policy, sched_ctx);
 
 
-	policy.init_sched(&config->topology, &policy);
+	sched_ctx->sched_policy->init_sched(sched_ctx);
 }
 }
 
 
-void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
+void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	if (policy.deinit_sched)
-		policy.deinit_sched(&config->topology, &policy);
+        struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
+	if (policy->deinit_sched)
+		policy->deinit_sched(sched_ctx);
 }
 }
 
 
 /* Enqueue a task into the list of tasks explicitly attached to a worker. In
 /* Enqueue a task into the list of tasks explicitly attached to a worker. In
@@ -236,8 +245,8 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	if (use_prefetch)
 	if (use_prefetch)
 		starpu_prefetch_task_input_on_node(task, memory_node);
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
 
-	if (policy.push_task_notify)
-		policy.push_task_notify(task, workerid);
+	if (worker->sched_ctx->sched_policy->push_task_notify)
+		worker->sched_ctx->sched_policy->push_task_notify(task, workerid);
 
 
 	if (is_basic_worker)
 	if (is_basic_worker)
 	{
 	{
@@ -296,8 +305,9 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 	}
 	}
 	else {
 	else {
-		STARPU_ASSERT(policy.push_task);
-		ret = policy.push_task(task);
+	        struct starpu_sched_ctx *sched_ctx = task->sched_ctx;
+		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
+		ret = sched_ctx->sched_policy->push_task(task, sched_ctx);
 	}
 	}
 
 
 	_starpu_profiling_set_task_push_end_time(task);
 	_starpu_profiling_set_task_push_end_time(task);
@@ -309,6 +319,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 {
 {
 	struct starpu_task *task;
 	struct starpu_task *task;
+	struct starpu_sched_ctx *sched_ctx = worker->sched_ctx;
 
 
 	/* We can't tell in advance which task will be picked up, so we measure
 	/* We can't tell in advance which task will be picked up, so we measure
 	 * a timestamp, and will attribute it afterwards to the task. */
 	 * a timestamp, and will attribute it afterwards to the task. */
@@ -320,9 +331,12 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	/* perhaps there is some local task to be executed first */
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
 	task = _starpu_pop_local_task(worker);
 
 
-	if (!task && policy.pop_task)
-		task = policy.pop_task();
+	if (!task && sched_ctx->sched_policy->pop_task)
+		task = sched_ctx->sched_policy->pop_task();
 
 
+	if(task){
+	  printf("task %s poped by th %d with strateg %s\n", task->name, worker->workerid, task->sched_ctx->sched_policy->policy_name);
+	}
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */
 	 * for some reason. */
 	if (profiling && task)
 	if (profiling && task)
@@ -344,18 +358,18 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	return task;
 	return task;
 }
 }
 
 
-struct starpu_task *_starpu_pop_every_task(void)
+struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx)
 {
 {
-	STARPU_ASSERT(policy.pop_every_task);
+	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
 
 
 	/* TODO set profiling info */
 	/* TODO set profiling info */
-	return policy.pop_every_task();
+	return sched_ctx->sched_policy->pop_every_task();
 }
 }
 
 
 void _starpu_sched_post_exec_hook(struct starpu_task *task)
 void _starpu_sched_post_exec_hook(struct starpu_task *task)
 {
 {
-	if (policy.post_exec_hook)
-		policy.post_exec_hook(task);
+	if (task->sched_ctx->sched_policy->post_exec_hook)
+		task->sched_ctx->sched_policy->post_exec_hook(task);
 }
 }
 
 
 void _starpu_wait_on_sched_event(void)
 void _starpu_wait_on_sched_event(void)
@@ -389,3 +403,142 @@ int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
 }
 }
 
 
 
 
+void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, int init_ctx)
+{
+	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
+	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
+
+
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+       
+	int j;
+	/*all the workers are in this context*/
+	if(workerids_in_ctx == NULL){
+		for(j = 0; j < nworkers; j++){
+			sched_ctx->workerid[j] = j;
+			struct starpu_worker_s *workerarg = &config->workers[j];
+			_starpu_delete_sched_ctx(workerarg->sched_ctx);
+			workerarg->sched_ctx = sched_ctx;
+		}
+		sched_ctx->nworkers_in_ctx = nworkers;
+	} else {
+		int i;
+		for(i = 0; i < nworkerids_in_ctx; i++){
+			sched_ctx->workerid[i] = workerids_in_ctx[i];
+			for(j = 0; j < nworkers; j++){
+				if(sched_ctx->workerid[i] == j){
+					struct starpu_worker_s *workerarg = &config->workers[j];
+					_starpu_delete_sched_ctx(workerarg->sched_ctx);
+					workerarg->sched_ctx = sched_ctx;
+				}
+			}
+		}
+	}
+
+	_starpu_init_sched_policy(config, sched_ctx, init_ctx, policy_name);
+
+
+	return;
+}
+
+static int _starpu_wait_for_all_threads_to_block(int nworkers)
+{
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+
+	while (nblocked_ths < nworkers)
+		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
+       
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+
+	return 0;
+}
+
+static void _starpu_decrement_nblocked_ths(void)
+{
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+
+	nblocked_ths--;
+	
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+}
+
+void _starpu_increment_nblocked_ths(int nworkers)
+{
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+
+	if (++nblocked_ths == nworkers)
+		PTHREAD_COND_BROADCAST(&blocking_ths_cond);
+
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+}
+
+int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx){
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	int i;
+	int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
+	
+	struct starpu_worker_s *worker = NULL;
+	pthread_mutex_t *changing_ctx_mutex = NULL;
+	pthread_cond_t *changing_ctx_cond = NULL;
+
+	int workerid = -1;
+	
+	for(i = 0; i < nworkers; i++){
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		worker = &config->workers[workerid];
+		
+		changing_ctx_mutex = &worker->changing_ctx_mutex;
+		changing_ctx_cond = &worker->changing_ctx_cond;
+
+		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		worker->status = changing_ctx;
+		worker->sched_ctx->nworkers_of_next_ctx = nworkers;
+		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		
+		if(changing_ctx == STATUS_UNKNOWN){
+			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+			PTHREAD_COND_SIGNAL(changing_ctx_cond);
+			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+			_starpu_decrement_nblocked_ths();
+		}
+	}
+
+	if(changing_ctx == STATUS_CHANGING_CTX){
+		_starpu_wait_for_all_threads_to_block(nworkers);
+	}
+
+	return 0;
+}
+
+void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx){
+    	  if(!starpu_task_wait_for_all()){
+		/*block the workers until the context is switched*/
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
+		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	  }
+	  return;
+}
+
+void _starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+       
+	unsigned used_sched_ctx = 0;
+	int i;
+	for(i = 0; i < nworkers; i++){
+		struct starpu_worker_s *workerarg = &config->workers[i];
+		if(sched_ctx != NULL && workerarg->sched_ctx == sched_ctx && workerarg->status != STATUS_JOINED)
+			used_sched_ctx++;
+	}
+
+	if(used_sched_ctx < 2  && sched_ctx != NULL){
+	  printf("free \n");
+		free(sched_ctx->sched_policy);
+		sched_ctx->sched_policy = NULL;
+		sched_ctx = NULL;
+	}
+}

+ 10 - 4
src/core/sched_policy.h

@@ -23,18 +23,24 @@
 #include <starpu_scheduler.h>
 #include <starpu_scheduler.h>
 
 
 struct starpu_machine_config_s;
 struct starpu_machine_config_s;
-struct starpu_sched_policy_s *_starpu_get_sched_policy(void);
+struct starpu_sched_policy_s *_starpu_get_sched_policy( struct starpu_sched_ctx *sched_ctx);
 
 
-void _starpu_init_sched_policy(struct starpu_machine_config_s *config);
-void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config);
+//void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx);
+void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx);
 
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 /* pop a task that can be executed on the worker */
 /* pop a task that can be executed on the worker */
 struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker);
 struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker);
 /* pop every task that can be executed on the worker */
 /* pop every task that can be executed on the worker */
-struct starpu_task *_starpu_pop_every_task(void);
+struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 
 
 void _starpu_wait_on_sched_event(void);
 void _starpu_wait_on_sched_event(void);
 
 
+void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerid, int nworkerids, int init_ctx);
+
+void _starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
+
+void _starpu_increment_nblocked_ths(int nworkers);
+
 #endif // __SCHED_POLICY_H__
 #endif // __SCHED_POLICY_H__

+ 10 - 1
src/core/task.c

@@ -76,6 +76,8 @@ void starpu_task_init(struct starpu_task *task)
 	task->predicted = -1.0;
 	task->predicted = -1.0;
 
 
 	task->starpu_private = NULL;
 	task->starpu_private = NULL;
+
+	task->sched_ctx = NULL;
 }
 }
 
 
 /* Free all the resources allocated for a task, without deallocating the task
 /* Free all the resources allocated for a task, without deallocating the task
@@ -212,8 +214,11 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 }
 }
 
 
 /* application should submit new tasks to StarPU through this function */
 /* application should submit new tasks to StarPU through this function */
-int starpu_task_submit(struct starpu_task *task)
+int starpu_task_submit_to_ctx(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
+        if(task->sched_ctx == NULL && sched_ctx != NULL)
+	  task->sched_ctx = sched_ctx;
+
 	int ret;
 	int ret;
 	unsigned is_sync = task->synchronous;
 	unsigned is_sync = task->synchronous;
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();
@@ -279,6 +284,10 @@ int starpu_task_submit(struct starpu_task *task)
 	return ret;
 	return ret;
 }
 }
 
 
+int starpu_task_submit(struct starpu_task *task){
+  starpu_task_submit_to_ctx(task, _starpu_get_initial_sched_ctx());
+}
+
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)
 {
 {
 	unsigned worker;
 	unsigned worker;

+ 19 - 3
src/core/workers.c

@@ -40,6 +40,8 @@ static pthread_key_t worker_key;
 
 
 static struct starpu_machine_config_s config;
 static struct starpu_machine_config_s config;
 
 
+static struct starpu_sched_ctx sched_ctx;
+
 struct starpu_machine_config_s *_starpu_get_machine_config(void)
 struct starpu_machine_config_s *_starpu_get_machine_config(void)
 {
 {
 	return &config;
 	return &config;
@@ -144,6 +146,9 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 
 
 		workerarg->config = config;
 		workerarg->config = config;
 
 
+		PTHREAD_MUTEX_INIT(&workerarg->changing_ctx_mutex, NULL);
+		PTHREAD_COND_INIT(&workerarg->changing_ctx_cond, NULL);
+
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
 
 
@@ -344,7 +349,13 @@ int starpu_init(struct starpu_conf *user_conf)
 	_starpu_initialize_current_task_key();	
 	_starpu_initialize_current_task_key();	
 
 
 	/* initialize the scheduling policy */
 	/* initialize the scheduling policy */
-	_starpu_init_sched_policy(&config);
+
+	if(user_conf == NULL)
+	  _starpu_create_sched_ctx(&sched_ctx, NULL, NULL, -1, 1);
+	else
+	  _starpu_create_sched_ctx(&sched_ctx, user_conf->sched_policy_name, NULL, -1, 1);
+
+	//_starpu_init_sched_policy(&config, &sched_ctx);
 
 
 	_starpu_initialize_registered_performance_models();
 	_starpu_initialize_registered_performance_models();
 
 
@@ -406,7 +417,7 @@ static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
 #endif
 #endif
 			}
 			}
 		}
 		}
-
+		worker->status = STATUS_JOINED;
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		starpu_job_list_delete(worker->terminated_jobs);
 		starpu_job_list_delete(worker->terminated_jobs);
 	}
 	}
@@ -476,7 +487,8 @@ void starpu_shutdown(void)
 	/* wait for their termination */
 	/* wait for their termination */
 	_starpu_terminate_workers(&config);
 	_starpu_terminate_workers(&config);
 
 
-	_starpu_deinit_sched_policy(&config);
+	//	_starpu_deinit_sched_policy(&config);
+	
 
 
 	_starpu_destroy_topology(&config);
 	_starpu_destroy_topology(&config);
 
 
@@ -640,3 +652,7 @@ void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond,
 	config.workers[workerid].sched_cond = sched_cond;
 	config.workers[workerid].sched_cond = sched_cond;
 	config.workers[workerid].sched_mutex = sched_mutex;
 	config.workers[workerid].sched_mutex = sched_mutex;
 }
 }
+
+/* Return the address of the file-scope scheduling context that
+ * starpu_init() sets up through _starpu_create_sched_ctx(). */
+struct starpu_sched_ctx *_starpu_get_initial_sched_ctx(void)
+{
+	struct starpu_sched_ctx *initial_ctx = &sched_ctx;
+
+	return initial_ctx;
+}

+ 6 - 1
src/core/workers.h

@@ -77,7 +77,10 @@ struct starpu_worker_s {
 	unsigned worker_is_initialized;
 	unsigned worker_is_initialized;
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
 	char name[32];
 	char name[32];
-
+	struct starpu_sched_ctx *sched_ctx;
+	unsigned changing_ctx;
+	pthread_mutex_t changing_ctx_mutex;
+	pthread_cond_t changing_ctx_cond;
 #ifdef __GLIBC__
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;
 	cpu_set_t current_cpu_set;
@@ -207,4 +210,6 @@ void _starpu_worker_set_status(int workerid, starpu_worker_status status);
 /* TODO move */
 /* TODO move */
 unsigned _starpu_execute_registered_progression_hooks(void);
 unsigned _starpu_execute_registered_progression_hooks(void);
 
 
+/* We keep an initial sched ctx which might be used in case no other ctx is available */
+struct starpu_sched_ctx* _starpu_get_initial_sched_ctx(void);
 #endif // __WORKERS_H__
 #endif // __WORKERS_H__

+ 1 - 1
src/datawizard/filters.c

@@ -86,7 +86,7 @@ starpu_data_handle starpu_data_get_sub_data(starpu_data_handle root_handle, unsi
 	{
 	{
 		unsigned next_child;
 		unsigned next_child;
 		next_child = va_arg(pa, unsigned);
 		next_child = va_arg(pa, unsigned);
-
+		printf("next child = %d \n", next_child);
 		STARPU_ASSERT(next_child < current_handle->nchildren);
 		STARPU_ASSERT(next_child < current_handle->nchildren);
 
 
 		current_handle = &current_handle->children[next_child];
 		current_handle = &current_handle->children[next_child];

+ 20 - 6
src/drivers/cpu/driver_cpu.c

@@ -144,27 +144,41 @@ void *_starpu_cpu_worker(void *arg)
 
 
 	int res;
 	int res;
 
 
+	pthread_cond_t *sched_cond = cpu_arg->sched_cond;
+	pthread_mutex_t *sched_mutex = cpu_arg->sched_mutex;
+	pthread_cond_t *changing_ctx_cond = &cpu_arg->changing_ctx_cond;
+	pthread_mutex_t *changing_ctx_mutex = &cpu_arg->changing_ctx_mutex;
+
+
 	while (_starpu_machine_is_running())
 	while (_starpu_machine_is_running())
 	{
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
 		STARPU_TRACE_START_PROGRESS(memnode);
 		_starpu_datawizard_progress(memnode, 1);
 		_starpu_datawizard_progress(memnode, 1);
 		STARPU_TRACE_END_PROGRESS(memnode);
 		STARPU_TRACE_END_PROGRESS(memnode);
 
 
-		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
+		/* when the context is changing, block the threads belonging to it */
+		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		if(cpu_arg->status == STATUS_CHANGING_CTX){
+			_starpu_increment_nblocked_ths(cpu_arg->sched_ctx->nworkers_of_next_ctx);
+			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
+		}
+		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+  
+		PTHREAD_MUTEX_LOCK(sched_mutex);
 
 
 		task = _starpu_pop_task(cpu_arg);
 		task = _starpu_pop_task(cpu_arg);
 	
 	
                 if (!task) 
                 if (!task) 
 		{
 		{
-			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, cpu_arg->sched_cond, cpu_arg->sched_mutex);
-
-			PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);
+		   if (_starpu_worker_can_block(memnode))
+				_starpu_block_worker(workerid, sched_cond, sched_mutex);
 
 
+			PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			continue;
 			continue;
 		};
 		};
 
 
-		PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);	
+		PTHREAD_MUTEX_UNLOCK(sched_mutex);	
+
 
 
 		STARPU_ASSERT(task);
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 2 - 2
src/drivers/gordon/driver_gordon.c

@@ -390,7 +390,7 @@ void *gordon_worker_inject(struct starpu_worker_set_s *arg)
 #else
 #else
 			/* gordon should accept a little more work */
 			/* gordon should accept a little more work */
 			starpu_job_t j;
 			starpu_job_t j;
-			j =  _starpu_pop_task();
+			j =  _starpu_pop_task(arg->current_sched_ctx);
 	//		_STARPU_DEBUG("pop task %p\n", j);
 	//		_STARPU_DEBUG("pop task %p\n", j);
 			if (j) {
 			if (j) {
 				if (STARPU_GORDON_MAY_PERFORM(j)) {
 				if (STARPU_GORDON_MAY_PERFORM(j)) {
@@ -399,7 +399,7 @@ void *gordon_worker_inject(struct starpu_worker_set_s *arg)
 					inject_task(j, &arg->workers[0]);
 					inject_task(j, &arg->workers[0]);
 				}
 				}
 				else {
 				else {
-					_starpu_push_task(j, 0);
+				  _starpu_push_task(j, 0, arg->current_sched_ctx);
 				}
 				}
 			}
 			}
 #endif
 #endif

+ 39 - 36
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -298,11 +298,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 	}
 }
 }
 
 
-static int _dm_push_task(struct starpu_task *task, unsigned prio)
+static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	/* find the queue */
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	struct starpu_fifo_taskq_s *fifo;
-	unsigned worker;
+	unsigned worker, worker_in_ctx;
 	int best = -1;
 	int best = -1;
 
 
 	double best_exp_end = 0.0;
 	double best_exp_end = 0.0;
@@ -315,8 +315,9 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	/* A priori, we know all estimations */
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	int unknown = 0;
 
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
 		double exp_end;
 		double exp_end;
 		
 		
 		fifo = queue_array[worker];
 		fifo = queue_array[worker];
@@ -378,11 +379,11 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 }
 
 
-static int _dmda_push_task(struct starpu_task *task, unsigned prio)
+static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	/* find the queue */
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	struct starpu_fifo_taskq_s *fifo;
-	unsigned worker;
+	unsigned worker, worker_in_ctx;
 	int best = -1;
 	int best = -1;
 	
 	
 	/* this flag is set if the corresponding worker is selected because
 	/* this flag is set if the corresponding worker is selected because
@@ -408,8 +409,10 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	/* A priori, we know all estimations */
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	int unknown = 0;
 
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
+
 		fifo = queue_array[worker];
 		fifo = queue_array[worker];
 
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		/* Sometimes workers didn't take the tasks as early as we expected */
@@ -475,8 +478,10 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	
 	
 	if (forced_best == -1)
 	if (forced_best == -1)
 	{
 	{
-		for (worker = 0; worker < nworkers; worker++)
-		{
+	        for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
+	        {
+		        worker = sched_ctx->workerid[worker_in_ctx];
+
 			fifo = queue_array[worker];
 			fifo = queue_array[worker];
 	
 	
 			if (!starpu_worker_may_execute_task(worker, task))
 			if (!starpu_worker_may_execute_task(worker, task))
@@ -527,41 +532,40 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 }
 
 
-static int dmda_push_sorted_task(struct starpu_task *task)
+static int dmda_push_sorted_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _dmda_push_task(task, 2);
+  return _dmda_push_task(task, 2, sched_ctx);
 }
 }
 
 
-static int dm_push_prio_task(struct starpu_task *task)
+static int dm_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _dm_push_task(task, 1);
+	return _dm_push_task(task, 1, sched_ctx);
 }
 }
 
 
-static int dm_push_task(struct starpu_task *task)
+static int dm_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	if (task->priority > 0)
 	if (task->priority > 0)
-		return _dm_push_task(task, 1);
+		return _dm_push_task(task, 1, sched_ctx);
 
 
-	return _dm_push_task(task, 0);
+	return _dm_push_task(task, 0, sched_ctx);
 }
 }
 
 
-static int dmda_push_prio_task(struct starpu_task *task)
+static int dmda_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _dmda_push_task(task, 1);
+	return _dmda_push_task(task, 1, sched_ctx);
 }
 }
 
 
-static int dmda_push_task(struct starpu_task *task)
+static int dmda_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	if (task->priority > 0)
 	if (task->priority > 0)
-		return _dmda_push_task(task, 1);
+		return _dmda_push_task(task, 1, sched_ctx);
 
 
-	return _dmda_push_task(task, 0);
+	return _dmda_push_task(task, 0, sched_ctx);
 }
 }
 
 
-static void initialize_dmda_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_dmda_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
-	nworkers = topology->nworkers;
+	nworkers = sched_ctx->nworkers_in_ctx;
 
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 	if (strval_alpha)
@@ -575,13 +579,10 @@ static void initialize_dmda_policy(struct starpu_machine_topology_s *topology,
 	if (strval_gamma)
 	if (strval_gamma)
 		_gamma = atof(strval_gamma);
 		_gamma = atof(strval_gamma);
 
 
-	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
-	if (strval_idle_power)
-		idle_power = atof(strval_idle_power);
-
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
+	unsigned workerid, workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
 	{
+                workerid = sched_ctx->workerid[workerid_ctx];
 		queue_array[workerid] = _starpu_create_fifo();
 		queue_array[workerid] = _starpu_create_fifo();
 	
 	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
@@ -591,22 +592,24 @@ static void initialize_dmda_policy(struct starpu_machine_topology_s *topology,
 	}
 	}
 }
 }
 
 
-static void initialize_dmda_sorted_policy(struct starpu_machine_topology_s *topology,
-					struct starpu_sched_policy_s *_policy)
+static void initialize_dmda_sorted_policy(struct starpu_sched_ctx *sched_ctx)
 {
 {
-	initialize_dmda_policy(topology, _policy);
+	initialize_dmda_policy(sched_ctx);
 
 
 	/* The application may use any integer */
 	/* The application may use any integer */
 	starpu_sched_set_min_priority(INT_MIN);
 	starpu_sched_set_min_priority(INT_MIN);
 	starpu_sched_set_max_priority(INT_MAX);
 	starpu_sched_set_max_priority(INT_MAX);
 }
 }
 
 
-static void deinitialize_dmda_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void deinitialize_dmda_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
-	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
+        unsigned workerid;
+	int workerid_in_ctx;
+        int nworkers = sched_ctx->nworkers_in_ctx;
+	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
+                workerid = sched_ctx->workerid[workerid_in_ctx];
 		_starpu_destroy_fifo(queue_array[workerid]);
 		_starpu_destroy_fifo(queue_array[workerid]);
+	}
 
 
 	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
 	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
 }
 }

+ 13 - 8
src/sched_policies/eager_central_policy.c

@@ -29,8 +29,7 @@ static struct starpu_fifo_taskq_s *fifo;
 static pthread_cond_t sched_cond;
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;
 static pthread_mutex_t sched_mutex;
 
 
-static void initialize_eager_center_policy(struct starpu_machine_topology_s *topology, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_eager_center_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* there is only a single queue in that trivial design */
 	/* there is only a single queue in that trivial design */
 	fifo = _starpu_create_fifo();
 	fifo = _starpu_create_fifo();
@@ -38,13 +37,17 @@ static void initialize_eager_center_policy(struct starpu_machine_topology_s *top
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 
 
+
 	unsigned workerid;
 	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
+	int workerid_in_ctx;
+	int nworkers = sched_ctx->nworkers_in_ctx;
+	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
+	        workerid = sched_ctx->workerid[workerid_in_ctx];
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
+	}
 }
 }
 
 
-static void deinitialize_eager_center_policy(__attribute__ ((unused)) struct starpu_machine_topology_s *topology, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void deinitialize_eager_center_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* TODO check that there is no task left in the queue */
 	/* TODO check that there is no task left in the queue */
 
 
@@ -52,19 +55,21 @@ static void deinitialize_eager_center_policy(__attribute__ ((unused)) struct sta
 	_starpu_destroy_fifo(fifo);
 	_starpu_destroy_fifo(fifo);
 }
 }
 
 
-static int push_task_eager_policy(struct starpu_task *task)
+static int push_task_eager_policy(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 }
 
 
-static int push_prio_task_eager_policy(struct starpu_task *task)
+static int push_prio_task_eager_policy(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 }
 }
 
 
 static struct starpu_task *pop_every_task_eager_policy(void)
 static struct starpu_task *pop_every_task_eager_policy(void)
 {
 {
-	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, starpu_worker_get_id());
+        int workerid = starpu_worker_get_id();
+        struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, workerid);
 }
 }
 
 
 static struct starpu_task *pop_task_eager_policy(void)
 static struct starpu_task *pop_task_eager_policy(void)

+ 9 - 6
src/sched_policies/eager_central_priority_policy.c

@@ -74,8 +74,7 @@ static void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *prior
 	free(priority_queue);
 	free(priority_queue);
 }
 }
 
 
-static void initialize_eager_center_priority_policy(struct starpu_machine_topology_s *topology, 
-			__attribute__ ((unused))	struct starpu_sched_policy_s *_policy) 
+static void initialize_eager_center_priority_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* In this policy, we support more than two levels of priority. */
 	/* In this policy, we support more than two levels of priority. */
 	starpu_sched_set_min_priority(MIN_LEVEL);
 	starpu_sched_set_min_priority(MIN_LEVEL);
@@ -87,13 +86,17 @@ static void initialize_eager_center_priority_policy(struct starpu_machine_topolo
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
 
+	int nworkers = sched_ctx->nworkers_in_ctx;
 	unsigned workerid;
 	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
+	int workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
+	{
+                workerid = sched_ctx->workerid[workerid_ctx];
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
+	}
 }
 }
 
 
-static void deinitialize_eager_center_priority_policy(struct starpu_machine_topology_s *topology __attribute__ ((unused)),
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void deinitialize_eager_center_priority_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* TODO check that there is no task left in the queue */
 	/* TODO check that there is no task left in the queue */
 
 
@@ -101,7 +104,7 @@ static void deinitialize_eager_center_priority_policy(struct starpu_machine_topo
 	_starpu_destroy_priority_taskq(taskq);
 	_starpu_destroy_priority_taskq(taskq);
 }
 }
 
 
-static int _starpu_priority_push_task(struct starpu_task *task)
+static int _starpu_priority_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	/* wake people waiting for a task */
 	/* wake people waiting for a task */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);

+ 0 - 1
src/sched_policies/fifo_queues.c

@@ -94,7 +94,6 @@ struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue
 		
 		
 		STARPU_TRACE_JOB_POP(task, 0);
 		STARPU_TRACE_JOB_POP(task, 0);
 	}
 	}
-	
 	return task;
 	return task;
 }
 }
 
 

+ 100 - 98
src/sched_policies/heft.c

@@ -39,10 +39,9 @@ static double exp_end[STARPU_NMAXWORKERS];
 static double exp_len[STARPU_NMAXWORKERS];
 static double exp_len[STARPU_NMAXWORKERS];
 static double ntasks[STARPU_NMAXWORKERS];
 static double ntasks[STARPU_NMAXWORKERS];
 
 
-static void heft_init(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void heft_init(struct starpu_sched_ctx *sched_ctx)
 {
 {
-	nworkers = topology->nworkers;
+	nworkers = sched_ctx->nworkers_in_ctx;
 
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 	if (strval_alpha)
@@ -60,9 +59,10 @@ static void heft_init(struct starpu_machine_topology_s *topology,
 	if (strval_idle_power)
 	if (strval_idle_power)
 		idle_power = atof(strval_idle_power);
 		idle_power = atof(strval_idle_power);
 
 
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
+	unsigned workerid, workerid_ctx;
+        for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
+	  {
+	        workerid = sched_ctx->workerid[workerid_ctx];
 		exp_start[workerid] = starpu_timing_now();
 		exp_start[workerid] = starpu_timing_now();
 		exp_len[workerid] = 0.0;
 		exp_len[workerid] = 0.0;
 		exp_end[workerid] = exp_start[workerid]; 
 		exp_end[workerid] = exp_start[workerid]; 
@@ -139,97 +139,97 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 }
 }
 
 
 static void compute_all_performance_predictions(struct starpu_task *task,
 static void compute_all_performance_predictions(struct starpu_task *task,
-					double *local_task_length, double *exp_end,
-					double *max_exp_endp, double *best_exp_endp,
-					double *local_data_penalty,
-					double *local_power, int *forced_best,
-					struct starpu_task_bundle *bundle)
+						double *local_task_length, double *exp_end,
+						double *max_exp_endp, double *best_exp_endp,
+						double *local_data_penalty,
+						double *local_power, int *forced_best,
+						struct starpu_task_bundle *bundle)
 {
 {
-	int calibrating = 0;
-	double max_exp_end = DBL_MIN;
-	double best_exp_end = DBL_MAX;
-	int ntasks_best = -1;
-	double ntasks_best_end = 0.0;
-
-	/* A priori, we know all estimations */
-	int unknown = 0;
-
-	unsigned worker;
-	for (worker = 0; worker < nworkers; worker++)
+  int calibrating = 0;
+  double max_exp_end = DBL_MIN;
+  double best_exp_end = DBL_MAX;
+  int ntasks_best = -1;
+  double ntasks_best_end = 0.0;
+
+  /* A priori, we know all estimations */
+  int unknown = 0;
+
+  unsigned worker;
+  for (worker = 0; worker < nworkers; worker++)
+    {
+      /* Sometimes workers didn't take the tasks as early as we expected */
+      exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
+      exp_end[worker] = exp_start[worker] + exp_len[worker];
+      if (exp_end[worker] > max_exp_end)
+	max_exp_end = exp_end[worker];
+
+      if (!starpu_worker_may_execute_task(worker, task))
 	{
 	{
-		/* Sometimes workers didn't take the tasks as early as we expected */
-		exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-		exp_end[worker] = exp_start[worker] + exp_len[worker];
-		if (exp_end[worker] > max_exp_end)
-			max_exp_end = exp_end[worker];
-
-		if (!starpu_worker_may_execute_task(worker, task))
-		{
-			/* no one on that queue may execute this task */
-			continue;
-		}
-
-		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		unsigned memory_node = starpu_worker_get_memory_node(worker);
-
-		if (bundle)
-		{
-			local_task_length[worker] = starpu_task_bundle_expected_length(bundle, perf_arch);
-			local_data_penalty[worker] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
-			local_power[worker] = starpu_task_bundle_expected_power(bundle, perf_arch);
-		}
-		else {
-			local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
-			local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
-			local_power[worker] = starpu_task_expected_power(task, perf_arch);
-		}
-
-		double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
-
-		if (ntasks_best == -1
-				|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
-				|| (!calibrating && local_task_length[worker] == -1.0) /* Not calibrating but this worker is being calibrated */
-				|| (calibrating && local_task_length[worker] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
-				) {
-			ntasks_best_end = ntasks_end;
-			ntasks_best = worker;
-		}
-
-		if (local_task_length[worker] == -1.0)
-			/* we are calibrating, we want to speed-up calibration time
-			 * so we privilege non-calibrated tasks (but still
-			 * greedily distribute them to avoid dumb schedules) */
-			calibrating = 1;
-
-		if (local_task_length[worker] <= 0.0)
-			/* there is no prediction available for that task
-			 * with that arch yet, so switch to a greedy strategy */
-			unknown = 1;
-
-		if (unknown)
-			continue;
+	  /* no one on that queue may execute this task */
+	  continue;
+	}
 
 
-		exp_end[worker] = exp_start[worker] + exp_len[worker] + local_task_length[worker];
+      enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+      unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 
-		if (exp_end[worker] < best_exp_end)
-		{
-			/* a better solution was found */
-			best_exp_end = exp_end[worker];
-		}
-
-		if (local_power[worker] == -1.0)
-			local_power[worker] = 0.;
+      if (bundle)
+	{
+	  local_task_length[worker] = starpu_task_bundle_expected_length(bundle, perf_arch);
+	  local_data_penalty[worker] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+	  local_power[worker] = starpu_task_bundle_expected_power(bundle, perf_arch);
 	}
 	}
+      else {
+	local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
+	local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
+	local_power[worker] = starpu_task_expected_power(task, perf_arch);
+      }
+
+      double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+
+      if (ntasks_best == -1
+	  || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
+	  || (!calibrating && local_task_length[worker] == -1.0) /* Not calibrating but this worker is being calibrated */
+	  || (calibrating && local_task_length[worker] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
+	  ) {
+	ntasks_best_end = ntasks_end;
+	ntasks_best = worker;
+      }
+
+      if (local_task_length[worker] == -1.0)
+	/* we are calibrating, we want to speed-up calibration time
+	 * so we privilege non-calibrated tasks (but still
+	 * greedily distribute them to avoid dumb schedules) */
+	calibrating = 1;
+
+      if (local_task_length[worker] <= 0.0)
+	/* there is no prediction available for that task
+	 * with that arch yet, so switch to a greedy strategy */
+	unknown = 1;
+
+      if (unknown)
+	continue;
+
+      exp_end[worker] = exp_start[worker] + exp_len[worker] + local_task_length[worker];
+
+      if (exp_end[worker] < best_exp_end)
+	{
+	  /* a better solution was found */
+	  best_exp_end = exp_end[worker];
+	}
+
+      if (local_power[worker] == -1.0)
+	local_power[worker] = 0.;
+    }
 
 
-	*forced_best = unknown?ntasks_best:-1;
+  *forced_best = unknown?ntasks_best:-1;
 
 
-	*best_exp_endp = best_exp_end;
-	*max_exp_endp = max_exp_end;
+  *best_exp_endp = best_exp_end;
+  *max_exp_endp = max_exp_end;
 }
 }
 
 
-static int _heft_push_task(struct starpu_task *task, unsigned prio)
+static int _heft_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	unsigned worker;
+	unsigned worker, worker_in_ctx;
 	int best = -1;
 	int best = -1;
 	
 	
 	/* this flag is set if the corresponding worker is selected because
 	/* this flag is set if the corresponding worker is selected because
@@ -270,8 +270,9 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	double fitness[nworkers];
 	double fitness[nworkers];
 	double best_fitness = -1;
 	double best_fitness = -1;
 
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+		worker = sched_ctx->workerid[worker_in_ctx];
 		if (!starpu_worker_may_execute_task(worker, task))
 		if (!starpu_worker_may_execute_task(worker, task))
 		{
 		{
 			/* no one on that queue may execute this task */
 			/* no one on that queue may execute this task */
@@ -328,25 +329,26 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 }
 
 
-static int heft_push_prio_task(struct starpu_task *task)
+static int heft_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _heft_push_task(task, 1);
+        return _heft_push_task(task, 1, sched_ctx);
 }
 }
 
 
-static int heft_push_task(struct starpu_task *task)
+static int heft_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	if (task->priority > 0)
 	if (task->priority > 0)
-		return _heft_push_task(task, 1);
+        	  return _heft_push_task(task, 1, sched_ctx);
 
 
-	return _heft_push_task(task, 0);
+	return _heft_push_task(task, 0, sched_ctx);
 }
 }
 
 
-static void heft_deinit(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void heft_deinit(struct starpu_sched_ctx *sched_ctx) 
 {
 {
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
+        unsigned workerid;
+	int workerid_in_ctx;
+	int nworkers = sched_ctx->nworkers_in_ctx;
+	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
+	        workerid = sched_ctx->workerid[workerid_in_ctx];
 		PTHREAD_MUTEX_DESTROY(&sched_mutex[workerid]);
 		PTHREAD_MUTEX_DESTROY(&sched_mutex[workerid]);
 		PTHREAD_COND_DESTROY(&sched_cond[workerid]);
 		PTHREAD_COND_DESTROY(&sched_cond[workerid]);
 	}
 	}

+ 25 - 17
src/sched_policies/parallel_greedy.c

@@ -16,7 +16,6 @@
 
 
 #include <core/workers.h>
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 #include <sched_policies/fifo_queues.h>
-#include <common/barrier.h>
 
 
 /* the former is the actual queue, the latter some container */
 /* the former is the actual queue, the latter some container */
 static struct starpu_fifo_taskq_s *fifo;
 static struct starpu_fifo_taskq_s *fifo;
@@ -35,26 +34,31 @@ static int possible_combinations_cnt[STARPU_NMAXWORKERS];
 static int possible_combinations[STARPU_NMAXWORKERS][10];
 static int possible_combinations[STARPU_NMAXWORKERS][10];
 static int possible_combinations_size[STARPU_NMAXWORKERS][10];
 static int possible_combinations_size[STARPU_NMAXWORKERS][10];
 
 
-static void initialize_pgreedy_policy(struct starpu_machine_topology_s *topology, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_pgreedy_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* masters pick tasks from that queue */
 	/* masters pick tasks from that queue */
 	fifo = _starpu_create_fifo();
 	fifo = _starpu_create_fifo();
 
 
-	_starpu_sched_find_worker_combinations(topology);
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+        struct starpu_machine_topology_s *topology = &config->topology;
 
 
-	unsigned workerid;
-	unsigned ncombinedworkers, nworkers;
+	_starpu_sched_find_worker_combinations(topology);
 
 
+	unsigned workerid, workerid_ctx;;
+	unsigned ncombinedworkers, nworkers, nworkers_in_ctx;
+	
 	nworkers = topology->nworkers;
 	nworkers = topology->nworkers;
+	nworkers_in_ctx = sched_ctx->nworkers_in_ctx;
 	ncombinedworkers = starpu_combined_worker_get_count();
 	ncombinedworkers = starpu_combined_worker_get_count();
 
 
 	/* Find the master of each worker. We first assign the worker as its
 	/* Find the master of each worker. We first assign the worker as its
 	 * own master, and then iterate over the different worker combinations
 	 * own master, and then iterate over the different worker combinations
 	 * to find the biggest combination containing this worker. */
 	 * to find the biggest combination containing this worker. */
 
 
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
+	for (workerid_ctx = 0; workerid_ctx < nworkers_in_ctx; workerid_ctx++)
+	  {
+    	        workerid = sched_ctx->workerid[workerid_ctx];
+
 		int cnt = possible_combinations_cnt[workerid]++;
 		int cnt = possible_combinations_cnt[workerid]++;
 		possible_combinations[workerid][cnt] = workerid;
 		possible_combinations[workerid][cnt] = workerid;
 		possible_combinations_size[workerid][cnt] = 1;
 		possible_combinations_size[workerid][cnt] = 1;
@@ -90,14 +94,17 @@ static void initialize_pgreedy_policy(struct starpu_machine_topology_s *topology
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 
 
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
+	for (workerid_ctx = 0; workerid_ctx < nworkers_in_ctx; workerid_ctx++)
+	  {
+                workerid = sched_ctx->workerid[workerid_ctx];
+
 		PTHREAD_MUTEX_INIT(&master_sched_mutex[workerid], NULL);
 		PTHREAD_MUTEX_INIT(&master_sched_mutex[workerid], NULL);
 		PTHREAD_COND_INIT(&master_sched_cond[workerid], NULL);
 		PTHREAD_COND_INIT(&master_sched_cond[workerid], NULL);
 	}
 	}
+	for (workerid_ctx = 0; workerid_ctx < nworkers_in_ctx; workerid_ctx++)
+          {
+	        workerid = sched_ctx->workerid[workerid_ctx];
 
 
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
 		/* slaves pick up tasks from their local queue, their master
 		/* slaves pick up tasks from their local queue, their master
 		 * will put tasks directly in that local list when a parallel
 		 * will put tasks directly in that local list when a parallel
 		 * tasks comes. */
 		 * tasks comes. */
@@ -119,15 +126,16 @@ static void initialize_pgreedy_policy(struct starpu_machine_topology_s *topology
 	}
 	}
 
 
 #if 0
 #if 0
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
+	for (workerid_ctx = 0; workerid_ctx < nworkers_in_ctx; workerid_ctx++)
+          {
+                workerid = sched_ctx->workerid[workerid_ctx];
+
 		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
 		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
 	}
 	}
 #endif
 #endif
 }
 }
 
 
-static void deinitialize_pgreedy_policy(__attribute__ ((unused)) struct starpu_machine_topology_s *topology, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void deinitialize_pgreedy_policy(__attribute__ ((unused)) struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	/* TODO check that there is no task left in the queue */
 	/* TODO check that there is no task left in the queue */
 
 
@@ -135,7 +143,7 @@ static void deinitialize_pgreedy_policy(__attribute__ ((unused)) struct starpu_m
 	_starpu_destroy_fifo(fifo);
 	_starpu_destroy_fifo(fifo);
 }
 }
 
 
-static int push_task_pgreedy_policy(struct starpu_task *task)
+static int push_task_pgreedy_policy(struct starpu_task *task, __attribute__ ((unused)) struct starpu_sched_ctx *sched_ctx)
 {
 {
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 }

+ 67 - 50
src/sched_policies/parallel_heft.c

@@ -22,7 +22,6 @@
 #include <sched_policies/fifo_queues.h>
 #include <sched_policies/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 #include <starpu_parameters.h>
-#include <common/barrier.h>
 
 
 static pthread_mutex_t big_lock;
 static pthread_mutex_t big_lock;
 
 
@@ -46,7 +45,6 @@ static struct starpu_task *parallel_heft_pop_task(void)
 
 
 	int workerid = starpu_worker_get_id();
 	int workerid = starpu_worker_get_id();
 	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
-
 	task = _starpu_fifo_pop_task(fifo, -1);
 	task = _starpu_fifo_pop_task(fifo, -1);
 	if (task) {
 	if (task) {
 		double model = task->predicted;
 		double model = task->predicted;
@@ -186,40 +184,41 @@ static double compute_expected_end(int workerid, double length)
 
 
 static double compute_ntasks_end(int workerid)
 static double compute_ntasks_end(int workerid)
 {
 {
-	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
-	if (workerid < (int)nworkers)
-	{
-		/* This is a basic worker */
-		struct starpu_fifo_taskq_s *fifo;
-		fifo = queue_array[workerid];
-		return fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
-	}
-	else {
-		/* This is a combined worker, the expected end is the end for the latest worker */
-		int worker_size;
-		int *combined_workerid;
-		starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
-
-		int ntasks_end=0;
-
-		int i;
-		for (i = 0; i < worker_size; i++)
-		{
-			struct starpu_fifo_taskq_s *fifo;
-			fifo = queue_array[combined_workerid[i]];
-			/* XXX: this is actually bogus: not all pushed tasks are necessarily parallel... */
-			ntasks_end = STARPU_MAX(ntasks_end, fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch));
-		}
+  enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+  if (workerid < (int)nworkers)
+    {
+      /* This is a basic worker */
+      struct starpu_fifo_taskq_s *fifo;
+      fifo = queue_array[workerid];
+      return fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
+    }
+  else {
+    /* This is a combined worker, the expected end is the end for the latest worker */
+    int worker_size;
+    int *combined_workerid;
+    starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
+
+    int ntasks_end;
+
+    int i;
+    for (i = 0; i < worker_size; i++)
+      {
+	struct starpu_fifo_taskq_s *fifo;
+	fifo = queue_array[combined_workerid[i]];
+	/* XXX: this is actually bogus: not all pushed tasks are necessarily parallel... */
+	ntasks_end = STARPU_MAX(ntasks_end, fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch));
+      }
 
 
-		return ntasks_end;
-	}
+    return ntasks_end;
+  }
 }
 }
 
 
-static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
+static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	/* find the queue */
 	/* find the queue */
 	struct starpu_fifo_taskq_s *fifo;
 	struct starpu_fifo_taskq_s *fifo;
-	unsigned worker;
+	unsigned worker, worker_in_ctx;
+
 	int best = -1;
 	int best = -1;
 	
 	
 	/* this flag is set if the corresponding worker is selected because
 	/* this flag is set if the corresponding worker is selected because
@@ -244,11 +243,13 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	double ntasks_best_end = 0.0;
 	double ntasks_best_end = 0.0;
 	int calibrating = 0;
 	int calibrating = 0;
 
 
-	/* A priori, we know all estimations */
+        /* A priori, we know all estimations */
 	int unknown = 0;
 	int unknown = 0;
 
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
+
 		fifo = queue_array[worker];
 		fifo = queue_array[worker];
 
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		/* Sometimes workers didn't take the tasks as early as we expected */
@@ -258,8 +259,10 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 			max_exp_end = fifo->exp_end;
 			max_exp_end = fifo->exp_end;
 	}
 	}
 
 
-	for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
+
 		if (!starpu_combined_worker_may_execute_task(worker, task))
 		if (!starpu_combined_worker_may_execute_task(worker, task))
 		{
 		{
 			/* no one on that queue may execute this task */
 			/* no one on that queue may execute this task */
@@ -321,8 +324,10 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	
 	
 	if (forced_best == -1)
 	if (forced_best == -1)
 	{
 	{
-		for (worker = 0; worker < nworkers+ncombinedworkers; worker++)
-		{
+
+	        for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
+	        {
+		        worker = sched_ctx->workerid[worker_in_ctx];
 
 
 			if (skip_worker[worker])
 			if (skip_worker[worker])
 			{
 			{
@@ -370,23 +375,26 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 }
 
 
-static int parallel_heft_push_prio_task(struct starpu_task *task)
+static int parallel_heft_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _parallel_heft_push_task(task, 1);
+  return _parallel_heft_push_task(task, 1, sched_ctx);
 }
 }
 
 
-static int parallel_heft_push_task(struct starpu_task *task)
+static int parallel_heft_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
+  printf("pheft: push task non null = %d into ctx non null = %d\n", task != NULL, sched_ctx != NULL);
 	if (task->priority == STARPU_MAX_PRIO)
 	if (task->priority == STARPU_MAX_PRIO)
-		return _parallel_heft_push_task(task, 1);
+	  return _parallel_heft_push_task(task, 1, sched_ctx);
 
 
-	return _parallel_heft_push_task(task, 0);
+	return _parallel_heft_push_task(task, 0, sched_ctx);
 }
 }
 
 
-static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_parallel_heft_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
-	nworkers = topology->nworkers;
+  printf("initialize parallel heft\n");
+	nworkers = sched_ctx->nworkers_in_ctx;
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	struct starpu_machine_topology_s *topology = &config->topology;
 
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 	if (strval_alpha)
@@ -408,9 +416,10 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 
 
 	ncombinedworkers = topology->ncombinedworkers;
 	ncombinedworkers = topology->ncombinedworkers;
 
 
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
+	unsigned workerid, workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
 	{
+                workerid = sched_ctx->workerid[workerid_ctx];
 		queue_array[workerid] = _starpu_create_fifo();
 		queue_array[workerid] = _starpu_create_fifo();
 	
 	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
@@ -423,13 +432,18 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 
 
 	/* We pre-compute an array of all the perfmodel archs that are applicable */
 	/* We pre-compute an array of all the perfmodel archs that are applicable */
 	unsigned total_worker_count = nworkers + ncombinedworkers;
 	unsigned total_worker_count = nworkers + ncombinedworkers;
-
+	printf("ncombinedworkers = %d\n", ncombinedworkers);
 	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
 	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
 	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
 	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
 
 
-	for (workerid = 0; workerid < total_worker_count; workerid++)
+	int nworkers_machine = topology->nworkers;
+
+	for (workerid_ctx = 0; workerid_ctx < total_worker_count; workerid_ctx++)
 	{
 	{
+	  workerid = workerid_ctx >= nworkers ? (nworkers_machine + workerid_ctx - nworkers) : sched_ctx->workerid[workerid_ctx];
+	  printf("workerid = %d\n", workerid);
 		enum starpu_perf_archtype perf_archtype = starpu_worker_get_perf_archtype(workerid);
 		enum starpu_perf_archtype perf_archtype = starpu_worker_get_perf_archtype(workerid);
+		printf("perf_archtype = %d\n", perf_archtype);
 		used_perf_archtypes[perf_archtype] = 1;
 		used_perf_archtypes[perf_archtype] = 1;
 	}
 	}
 
 
@@ -443,12 +457,15 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 	}
 	}
 }
 }
 
 
-static void deinitialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void deinitialize_parallel_heft_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	unsigned workerid;
 	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
+	int workerid_in_ctx;
+        int nworkers = sched_ctx->nworkers_in_ctx;
+	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
+                workerid = sched_ctx->workerid[workerid_in_ctx];
 		_starpu_destroy_fifo(queue_array[workerid]);
 		_starpu_destroy_fifo(queue_array[workerid]);
+	}
 }
 }
 
 
 /* TODO: use post_exec_hook to fix the expected start */
 /* TODO: use post_exec_hook to fix the expected start */

+ 18 - 13
src/sched_policies/random_policy.c

@@ -25,17 +25,19 @@ static unsigned nworkers;
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 
 
-static int _random_push_task(struct starpu_task *task, unsigned prio)
+static int _random_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	/* find the queue */
 	/* find the queue */
-	unsigned worker;
+        unsigned worker, worker_in_ctx;
 
 
 	unsigned selected = 0;
 	unsigned selected = 0;
 
 
 	double alpha_sum = 0.0;
 	double alpha_sum = 0.0;
 
 
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
+
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
 		alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
 	}
 	}
@@ -44,8 +46,10 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 //	_STARPU_DEBUG("my rand is %e\n", random);
 //	_STARPU_DEBUG("my rand is %e\n", random);
 
 
 	double alpha = 0.0;
 	double alpha = 0.0;
-	for (worker = 0; worker < nworkers; worker++)
+	for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
 	{
 	{
+                worker = sched_ctx->workerid[worker_in_ctx];
+
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
 		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
 
 
@@ -62,26 +66,27 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 	return starpu_push_local_task(selected, task, prio);
 	return starpu_push_local_task(selected, task, prio);
 }
 }
 
 
-static int random_push_prio_task(struct starpu_task *task)
+static int random_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _random_push_task(task, 1);
+        return _random_push_task(task, 1, sched_ctx);
 }
 }
 
 
-static int random_push_task(struct starpu_task *task)
+static int random_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
-	return _random_push_task(task, 0);
+        return _random_push_task(task, 0, sched_ctx);
 }
 }
 
 
-static void initialize_random_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_random_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
 	starpu_srand48(time(NULL));
 	starpu_srand48(time(NULL));
 
 
-	nworkers = topology->nworkers;
+	nworkers = sched_ctx->nworkers_in_ctx;	
 
 
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
+	unsigned workerid, workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
 	{
+                workerid = sched_ctx->workerid[workerid_ctx];
+	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
 		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
 	
 	

+ 6 - 6
src/sched_policies/work_stealing_policy.c

@@ -168,7 +168,7 @@ static struct starpu_task *ws_pop_task(void)
 	return task;
 	return task;
 }
 }
 
 
-int ws_push_task(struct starpu_task *task)
+int ws_push_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
 	starpu_job_t j = _starpu_get_job_associated_to_task(task);
 	starpu_job_t j = _starpu_get_job_associated_to_task(task);
 
 
@@ -192,18 +192,18 @@ int ws_push_task(struct starpu_task *task)
         return 0;
         return 0;
 }
 }
 
 
-static void initialize_ws_policy(struct starpu_machine_topology_s *topology, 
-				__attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static void initialize_ws_policy(struct starpu_sched_ctx *sched_ctx) 
 {
 {
-	nworkers = topology->nworkers;
+	nworkers = sched_ctx->nworkers_in_ctx;
 	rr_worker = 0;
 	rr_worker = 0;
 
 
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
 
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
+	unsigned workerid, workerid_ctx;
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
 	{
+                workerid = sched_ctx->workerid[workerid_ctx];
 		queue_array[workerid] = _starpu_create_deque();
 		queue_array[workerid] = _starpu_create_deque();
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
 	}
 	}