
bug fixing + update all scheduling strategies to work with the contexts -> work_stealing still doesn't work

Andra Hugo 14 years ago
parent
Commit
f746bcc3eb

+ 4 - 2
examples/cholesky_and_lu/cholesky/cholesky_implicit.c

@@ -163,7 +163,9 @@ static double cholesky(float *matA, unsigned size, unsigned ld, unsigned nblocks
 		f2.get_child_ops = NULL;
 
 	starpu_data_map_filters(dataA, 2, &f, &f2);
-	return _cholesky(dataA, nblocks, sched_ctx, timing);
+	double gflops = _cholesky(dataA, nblocks, sched_ctx, timing);
+	starpu_data_unregister(dataA);
+	return gflops;
 }
 
 double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **argv, double *timing, pthread_barrier_t *barrier)
@@ -283,7 +285,7 @@ double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **arg
 			}
 	        }
 	}
-
+	starpu_data_free_pinned_if_possible((void *)mat);
 	//	starpu_helper_cublas_shutdown();
 	//	starpu_shutdown();
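Note: the two calls added above (starpu_data_unregister and starpu_data_free_pinned_if_possible) close the data lifecycle that the benchmark opens when it registers and partitions the matrix. A rough sketch of that lifecycle, assuming the 2011-era StarPU matrix API; _cholesky and the filters f/f2 stand in for code defined elsewhere in cholesky_implicit.c:

	#include <stdint.h>
	#include <starpu.h>

	/* defined elsewhere in cholesky_implicit.c; declared here so the sketch is complete */
	static double _cholesky(starpu_data_handle dataA, unsigned nblocks,
				unsigned sched_ctx, double *timing);

	/* Illustrative sketch, not the benchmark code: register a pinned host buffer,
	 * partition it into tiles, run the factorization, then gather and unregister
	 * before the caller frees the buffer with starpu_data_free_pinned_if_possible(). */
	static double cholesky_on_buffer(float *mat, unsigned size, unsigned ld,
					 unsigned nblocks, unsigned sched_ctx, double *timing,
					 struct starpu_data_filter *f, struct starpu_data_filter *f2)
	{
		starpu_data_handle dataA;
		starpu_matrix_data_register(&dataA, 0, (uintptr_t)mat, ld, size, size, sizeof(float));
		starpu_data_map_filters(dataA, 2, f, f2);
		double gflops = _cholesky(dataA, nblocks, sched_ctx, timing);
		starpu_data_unregister(dataA);   /* gathers the tiles back into mat */
		return gflops;
	}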
 

+ 1 - 1
examples/cholesky_and_lu/cholesky_and_lu.c

@@ -113,7 +113,7 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3,
 
   //  printf("\n");
 
-  p2->ctx = starpu_create_sched_ctx("heft", procs2, ncpus2, "cholesky2");
+  p2->ctx = starpu_create_sched_ctx("prio", procs2, ncpus2, "cholesky2");
   p1->the_other_ctx = (int)p2->ctx;
   p2->procs = procs2;
   p2->ncpus = ncpus2;

+ 1 - 1
include/starpu_scheduler.h

@@ -67,7 +67,7 @@ struct starpu_sched_policy_s {
 	void (*init_sched)(unsigned);
 
 	/* Initialize the scheduling policy only for certain workers. */
-	void (*init_sched_for_workers)(unsigned, int);
+	void (*init_sched_for_workers)(unsigned, unsigned);
 
 	/* Cleanup the scheduling policy. */
 	void (*deinit_sched)(unsigned);
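Note: the second argument of init_sched_for_workers is now the number of newly added workers, passed as an unsigned. A minimal sketch of how the core side could invoke the hook when workers join a context; the helper name and call site below are assumptions for illustration, only the callback field and its signature come from this header:

	#include <core/sched_policy.h>
	#include <core/sched_ctx.h>

	/* Hypothetical caller (not part of this commit): once nnew_workers have been
	 * attached to a context, let the active policy set up per-worker state. */
	static void _notify_policy_of_new_workers(struct starpu_sched_ctx *sched_ctx,
						  unsigned nnew_workers)
	{
		struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
		if (policy->init_sched_for_workers)
			policy->init_sched_for_workers(sched_ctx->sched_ctx_id, nnew_workers);
	}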

+ 4 - 9
src/core/sched_ctx.c

@@ -1,8 +1,6 @@
 #include <core/sched_policy.h>
 #include <core/sched_ctx.h>
 
-static struct _starpu_barrier_counter_t workers_barrier[STARPU_NMAX_SCHED_CTXS];
-
 static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
 static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
 static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx);
@@ -201,6 +199,7 @@ static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
 	free(sched_ctx->sched_policy);
 	free(sched_ctx->sched_mutex);
 	free(sched_ctx->sched_cond);
+	/* just for debug in order to seg fault if we use these structures after del */
 	sched_ctx->sched_policy = NULL;
 	sched_ctx->sched_mutex = NULL;
 	sched_ctx->sched_cond = NULL;
@@ -367,7 +366,7 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
 void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
 					  unsigned sched_ctx_id)
 {
-	  /* wait for the workers concerned by the change of contex                       
+	  /* wait for the workers concerned by the change of contex    
 	   * to finish their work in the previous context */
 	if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
 	  {
@@ -385,10 +384,8 @@ void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config)
 {
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	  {
 		config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
-		_starpu_barrier_counter_init(&workers_barrier[i], 0);
-	  }
+
 	return;
 }
 
@@ -399,10 +396,8 @@ void _starpu_init_sched_ctx_for_worker(unsigned workerid)
 	worker->sched_ctx = (struct starpu_sched_ctx**)malloc(STARPU_NMAX_SCHED_CTXS * sizeof(struct starpu_sched_ctx*));
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	  {
 		worker->sched_ctx[i] = NULL;
-		worker->workers_barrier[i] = NULL;
-	  }
+
 	return;
 }
 

+ 0 - 6
src/core/sched_ctx.h

@@ -74,12 +74,6 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();
 
-/* Workers are blocked when constructing or modifying a context */
-void _starpu_increment_nblocked_ths(struct _starpu_barrier_counter_t **barrier);
-  void _starpu_decrement_nblocked_ths(struct _starpu_barrier_counter_t **barrier);
-int _starpu_wait_for_all_threads_to_block(struct _starpu_barrier_counter_t *workers_barrier);
-int _starpu_wait_for_all_threads_to_wake_up(struct _starpu_barrier_counter_t *workers_barrier);
-
 /* Keeps track of the number of tasks currently submitted to a worker */
 void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);
 void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);

+ 1 - 3
src/core/sched_policy.c

@@ -366,16 +366,14 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 		    
 		    if(sched_ctx != NULL)
 		      {
-			if(i > 2)
-			  printf("i = %d\n", i);
 			sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
 			if(sched_ctx_mutex != NULL)
 			  {
 			    PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
 			    if (sched_ctx->sched_policy->pop_task)
 			      {
-				task = sched_ctx->sched_policy->pop_task(sched_ctx->sched_ctx_id);
 				PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
+				task = sched_ctx->sched_policy->pop_task(sched_ctx->sched_ctx_id);
 				break;
 			      }
 			    PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);

+ 1 - 4
src/core/workers.c

@@ -148,9 +148,6 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 
 		workerarg->config = config;
 
-		PTHREAD_MUTEX_INIT(&workerarg->changing_ctx_mutex, NULL);
-		PTHREAD_COND_INIT(&workerarg->changing_ctx_cond, NULL);
-
 		_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
@@ -429,7 +426,7 @@ static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
 #endif
 			}
 		}
-		worker->status = STATUS_JOINED;
+		//		worker->status = STATUS_JOINED;
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		starpu_job_list_delete(worker->terminated_jobs);
 	}

+ 0 - 8
src/core/workers.h

@@ -76,20 +76,12 @@ struct starpu_worker_s {
 	unsigned worker_is_running;
 	unsigned worker_is_initialized;
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
-	starpu_worker_status blocking_status; /* blocked or not */
 	char name[32];
 
 	struct starpu_sched_ctx **sched_ctx;
 	unsigned nctxs; /* the no of contexts a worker belongs to*/
-	unsigned changing_ctx;
-	pthread_mutex_t changing_ctx_mutex;
-	pthread_cond_t changing_ctx_cond;
-	int nworkers_of_next_ctx;
 
 	struct _starpu_barrier_counter_t tasks_barrier; /* wait for the tasks submitted */
-
-	/* block workers to update their affiliation to a context */
-	struct _starpu_barrier_counter_t *workers_barrier[STARPU_NMAX_SCHED_CTXS];
        
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;

+ 3 - 1
src/drivers/cuda/driver_cuda.c

@@ -109,7 +109,9 @@ static void init_context(int devid)
 		STARPU_CUDA_REPORT_ERROR(cures);
 
 	/* force CUDA to initialize the context for real */
-	cudaFree(0);
+	cures = cudaFree(0);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
 
 	limit_gpu_mem_if_needed(devid);
 

+ 40 - 10
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -24,8 +24,6 @@
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 
-//static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
-
 /* #ifdef STARPU_VERBOSE */
 /* static long int total_task_cnt = 0; */
 /* static long int ready_task_cnt = 0; */
@@ -120,7 +118,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_
 static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 
 	struct starpu_task *task;
 
@@ -156,7 +154,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 
 	struct starpu_task *task;
 
@@ -192,7 +190,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	 dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
 
 	struct starpu_task *new_list;
 
@@ -592,6 +590,34 @@ static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 	return _dmda_push_task(task, 0, sched_ctx);
 }
 
+static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	unsigned nworkers = sched_ctx->nworkers_in_ctx;
+	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
+
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	unsigned ntotal_workers = config->topology.nworkers;
+
+	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers + nnew_workers;
+
+	unsigned workerid_ctx;
+	for (workerid_ctx = nworkers; workerid_ctx < all_workers; workerid_ctx++)
+	{
+		dt->queue_array[workerid_ctx] = _starpu_create_fifo();
+	
+		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
+		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
+		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+	}
+
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+}
+
 static void initialize_dmda_policy(unsigned sched_ctx_id) 
 {
 	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
@@ -604,7 +630,7 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;
 	sched_ctx->policy_data = (void*)dt;
 
-	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(nworkers*sizeof(struct starpu_fifo_taskq_s*));
+	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_fifo_taskq_s*));
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
@@ -669,7 +695,8 @@ struct starpu_sched_policy_s _starpu_sched_dm_policy = {
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dm",
-	.policy_description = "performance model"
+	.policy_description = "performance model",
+	.init_sched_for_workers = initialize_dmda_policy_for_workers
 };
 
 struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
@@ -681,7 +708,8 @@ struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmda",
-	.policy_description = "data-aware performance model"
+	.policy_description = "data-aware performance model",
+	.init_sched_for_workers = initialize_dmda_policy_for_workers
 };
 
 struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy = {
@@ -693,7 +721,8 @@ struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy = {
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdas",
-	.policy_description = "data-aware performance model (sorted)"
+	.policy_description = "data-aware performance model (sorted)",
+	.init_sched_for_workers = initialize_dmda_policy_for_workers
 };
 
 struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy = {
@@ -705,5 +734,6 @@ struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy = {
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdar",
-	.policy_description = "data-aware performance model (ready)"
+	.policy_description = "data-aware performance model (ready)",
+	.init_sched_for_workers = initialize_dmda_policy_for_workers
 };
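Note: each of the new *_for_workers initializers ends by publishing the enlarged worker count through temp_nworkers_in_ctx while holding changing_ctx_mutex. The consumer of that value is not part of this diff; the following is a purely hypothetical sketch of how a push path could fold the pending count into the live one (the real update lives elsewhere in sched_ctx.c):

	/* Hypothetical consumer, for illustration only: adopt the pending worker
	 * count so that the next push distributes tasks over the enlarged set. */
	static void _adopt_pending_workers(struct starpu_sched_ctx *sched_ctx)
	{
		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
		if (sched_ctx->temp_nworkers_in_ctx > sched_ctx->nworkers_in_ctx)
			sched_ctx->nworkers_in_ctx = sched_ctx->temp_nworkers_in_ctx;
		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
	}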

+ 21 - 5
src/sched_policies/eager_central_policy.c

@@ -23,11 +23,26 @@
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 
-/* the former is the actual queue, the latter some container */
-//static struct starpu_fifo_taskq_s *fifo;
+static void initialize_eager_center_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+
+	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	unsigned ntotal_workers = config->topology.nworkers;
 
-//static pthread_cond_t sched_cond;
-//static pthread_mutex_t sched_mutex;
+	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
+
+	unsigned workerid_in_ctx;
+	for (workerid_in_ctx = nworkers_ctx; workerid_in_ctx < all_workers; workerid_in_ctx++){
+		sched_ctx->sched_mutex[workerid_in_ctx] = sched_ctx->sched_mutex[0];
+		sched_ctx->sched_cond[workerid_in_ctx] = sched_ctx->sched_cond[0];
+	}
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+}
 
 static void initialize_eager_center_policy(unsigned sched_ctx_id) 
 {
@@ -71,7 +86,6 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	int nworkers_in_ctx = sched_ctx->nworkers_in_ctx;
 	int i;
 	int workerid;
 	for(i = 0; i < sched_ctx->nworkers_in_ctx; i++){
@@ -119,6 +133,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {
 	.init_sched = initialize_eager_center_policy,
+	.init_sched_for_workers = initialize_eager_center_policy_for_workers,
 	.deinit_sched = deinitialize_eager_center_policy,
 	.push_task = push_task_eager_policy,
 	.push_task_notify = NULL,
@@ -132,6 +147,7 @@ struct starpu_sched_policy_s _starpu_sched_eager_policy = {
 
 struct starpu_sched_policy_s _starpu_sched_no_prio_policy = {
 	.init_sched = initialize_eager_center_policy,
+	.init_sched_for_workers = initialize_eager_center_policy_for_workers,
 	.deinit_sched = deinitialize_eager_center_policy,
 	.push_task = push_task_eager_policy,
 	.push_task_notify = NULL,

+ 24 - 8
src/sched_policies/eager_central_priority_policy.c

@@ -40,14 +40,6 @@ struct starpu_priority_taskq_s {
 	unsigned total_ntasks;
 };
 
-/* the former is the actual queue, the latter some container */
-//static struct starpu_priority_taskq_s *taskq;
-
-/* keep track of the total number of tasks to be scheduled to avoid infinite 
- * polling when there are really few tasks in the overall queue */
-//static pthread_cond_t global_sched_cond;
-//static pthread_mutex_t global_sched_mutex;
-
 /*
  * Centralized queue with priorities 
  */
@@ -74,6 +66,29 @@ static void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *prior
 	free(priority_queue);
 }
 
+static void initialize_eager_center_priority_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
+
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	unsigned ntotal_workers = config->topology.nworkers;
+
+	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
+
+	unsigned workerid_ctx;
+	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
+	{
+		sched_ctx->sched_mutex[workerid_ctx] = sched_ctx->sched_mutex[0];
+		sched_ctx->sched_cond[workerid_ctx] = sched_ctx->sched_cond[0];
+	}
+
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+}
+
 static void initialize_eager_center_priority_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
@@ -181,6 +196,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {
 	.init_sched = initialize_eager_center_priority_policy,
+	.init_sched_for_workers = initialize_eager_center_priority_policy_for_workers,
 	.deinit_sched = deinitialize_eager_center_priority_policy,
 	/* we always use priorities in that policy */
 	.push_task = _starpu_priority_push_task,

+ 24 - 18
src/sched_policies/heft.c

@@ -30,11 +30,6 @@ typedef struct {
 	double beta;
 	double _gamma;
 	double idle_power;
-
-/* 	double *exp_start; */
-/* 	double *exp_end; */
-/* 	double *exp_len; */
-/* 	double *ntasks; */
 } heft_data;
 
 double exp_start[STARPU_NMAXWORKERS];
@@ -42,7 +37,7 @@ double exp_end[STARPU_NMAXWORKERS];
 double exp_len[STARPU_NMAXWORKERS];
 double ntasks[STARPU_NMAXWORKERS];
 
-static void heft_init_for_workers(unsigned sched_ctx_id, int nnew_workers)
+static void heft_init_for_workers(unsigned sched_ctx_id, unsigned nnew_workers)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
@@ -58,6 +53,7 @@ static void heft_init_for_workers(unsigned sched_ctx_id, int nnew_workers)
 	  {
 	    workerid = sched_ctx->workerid[workerid_ctx];
 	    struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+	    /* init these structures only once for each worker */
 	    if(workerarg->nctxs == 1)
 	      {
 		exp_start[workerid] = starpu_timing_now();
@@ -65,6 +61,10 @@ static void heft_init_for_workers(unsigned sched_ctx_id, int nnew_workers)
 		exp_end[workerid] = exp_start[workerid]; 
 		ntasks[workerid] = 0;
 	      }
+
+	    /* we push the tasks on the local lists of the workers
+	       therefore the synchronisations mechanisms of the strategy
+	       are the global ones */
 	    sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
 	    sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
 	  }
@@ -109,6 +109,7 @@ static void heft_init(unsigned sched_ctx_id)
 	  {
 	    int workerid = sched_ctx->workerid[workerid_ctx];
 	    struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+	    /* init these structures only once for each worker */
 	    if(workerarg->nctxs == 1)
 	      {
 		exp_start[workerid] = starpu_timing_now();
@@ -116,6 +117,9 @@ static void heft_init(unsigned sched_ctx_id)
 		exp_end[workerid] = exp_start[workerid]; 
 		ntasks[workerid] = 0;
 	      }
+	    /* we push the tasks on the local lists of the workers
+	       therefore the synchronisations mechanisms of the strategy
+	       are the global ones */
 	    sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
 	    sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
 
@@ -125,17 +129,20 @@ static void heft_init(unsigned sched_ctx_id)
 static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-	double model = task->predicted;
-
-	/* Once we have executed the task, we can update the predicted amount
-	 * of work. */
-	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-	exp_len[workerid] -= model;
-	exp_start[workerid] = starpu_timing_now() + model;
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
-	ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	if(workerid >= 0)
+	  {
+		struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+		double model = task->predicted;
+		
+		/* Once we have executed the task, we can update the predicted amount
+		 * of work. */
+		PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+		exp_len[workerid] -= model;
+		exp_start[workerid] = starpu_timing_now() + model;
+		exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+		ntasks[workerid]--;
+		PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
+	  }
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
@@ -207,7 +214,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
   int unknown = 0;
 
   unsigned nworkers = sched_ctx->nworkers_in_ctx;
-  heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
   unsigned worker, worker_in_ctx;
   for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
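Note: the new workerid >= 0 guard in heft_post_exec_hook reflects that starpu_worker_get_id() returns -1 when the hook is reached from a thread that is not a StarPU worker (for example the application thread). The same defensive pattern as a tiny stand-alone sketch (the helper is illustrative, not StarPU code):

	#include <starpu.h>

	/* Only touch per-worker bookkeeping when actually running on a worker thread. */
	static void update_worker_stat(double *per_worker_stat, double delta)
	{
		int workerid = starpu_worker_get_id();  /* -1 on non-worker threads */
		if (workerid < 0)
			return;
		per_worker_stat[workerid] += delta;
	}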

+ 32 - 9
src/sched_policies/random_policy.c

@@ -20,11 +20,6 @@
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 
-//static unsigned nworkers;
-
-/* static pthread_cond_t sched_cond[STARPU_NMAXWORKERS]; */
-/* static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS]; */
-
 static int _random_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
 {
 	/* find the queue */
@@ -84,6 +79,32 @@ static int random_push_task(struct starpu_task *task, unsigned sched_ctx_id)
         return _random_push_task(task, 0, sched_ctx);
 }
 
+static void initialize_random_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+
+	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
+
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	unsigned ntotal_workers = config->topology.nworkers;
+
+	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
+
+	unsigned workerid_ctx;
+	int workerid;
+	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
+	{
+		workerid = sched_ctx->workerid[workerid_ctx];
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+		sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
+		sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
+	}
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+}
+
 static void initialize_random_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
@@ -93,17 +114,19 @@ static void initialize_random_policy(unsigned sched_ctx_id)
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;	
 
 	unsigned workerid_ctx;
+	int workerid;
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	{
-		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
-		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+		workerid = sched_ctx->workerid[workerid_ctx];
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+		sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
+		sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
 	}
 }
 
 struct starpu_sched_policy_s _starpu_sched_random_policy = {
 	.init_sched = initialize_random_policy,
+	.init_sched_for_workers = initialize_random_policy_for_workers,
 	.deinit_sched = NULL,
 	.push_task = random_push_task,
 	.push_prio_task = random_push_prio_task,

+ 32 - 15
src/sched_policies/work_stealing_policy.c

@@ -20,21 +20,12 @@
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 
-//static unsigned nworkers;
-//static unsigned rr_worker;
-//static struct starpu_deque_jobq_s *queue_array[STARPU_NMAXWORKERS];
-
-/* static pthread_mutex_t global_sched_mutex; */
-/* static pthread_cond_t global_sched_cond; */
-
-/* keep track of the work performed from the beginning of the algorithm to make
- * better decisions about which queue to select when stealing or deferring work
- */
-//static unsigned performed_total = 0;
-
 typedef struct {
 	struct starpu_deque_jobq_s **queue_array;
 	unsigned rr_worker;
+	/* keep track of the work performed from the beginning of the algorithm to make
+	 * better decisions about which queue to select when stealing or deferring work
+	 */
 	unsigned performed_total;
 } work_stealing_data;
 
@@ -206,6 +197,32 @@ int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
         return 0;
 }
 
+static void initialize_ws_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers) 
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+
+	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
+
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	unsigned ntotal_workers = config->topology.nworkers;
+
+	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
+
+	unsigned workerid_ctx;
+	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
+	  {
+		ws->queue_array[workerid_ctx] = _starpu_create_deque();
+
+		sched_ctx->sched_mutex[workerid_ctx] = sched_ctx->sched_mutex[0];
+		sched_ctx->sched_cond[workerid_ctx] = sched_ctx->sched_cond[0];
+	}
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+}
+
 static void initialize_ws_policy(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
@@ -214,8 +231,7 @@ static void initialize_ws_policy(unsigned sched_ctx_id)
 	
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;
 	ws->rr_worker = 0;
-	ws->queue_array = (struct starpu_deque_jobq_s**)malloc(nworkers*sizeof(struct starpu_deque_jobq_s*));
-
+	ws->queue_array = (struct starpu_deque_jobq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_deque_jobq_s*));
 
 	pthread_mutex_t *sched_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
 	pthread_cond_t *sched_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
@@ -241,5 +257,6 @@ struct starpu_sched_policy_s _starpu_sched_ws_policy = {
 	.post_exec_hook = NULL,
 	.pop_every_task = NULL,
 	.policy_name = "ws",
-	.policy_description = "work stealing"
+	.policy_description = "work stealing",
+	.init_sched_for_workers = initialize_ws_policy_for_workers
 };