Преглед на файлове

change global variables into local variables

Andra Hugo преди 14 години
родител
ревизия
30f2913472
променени са 7 файла, в които са добавени 128 реда и са изтрити 82 реда
  1. 4 3
      src/core/sched_ctx.c
  2. 27 8
      src/core/sched_ctx.h
  3. 2 2
      src/core/sched_policy.c
  4. 2 4
      src/core/task.c
  5. 1 0
      src/core/workers.c
  6. 2 1
      src/drivers/cpu/driver_cpu.c
  7. 90 64
      src/sched_policies/heft.c

+ 4 - 3
src/core/sched_ctx.c

@@ -18,6 +18,8 @@ int _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
 
 	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[config->topology.nsched_ctxs];
 
+	sched_ctx->sched_ctx_id = config->topology.nsched_ctxs;
+
 	int nworkers = config->topology.nworkers;
 	
 	STARPU_ASSERT(nworkerids_in_ctx <= nworkers);
@@ -64,9 +66,8 @@ int _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
 	  }
 
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
-	
-	sched_ctx->sched_ctx_id = config->topology.nsched_ctxs;
-	config->topology.nsched_ctxs++;
+
+	config->topology.nsched_ctxs++;	
 
  	return sched_ctx->sched_ctx_id;
 }

+ 27 - 8
src/core/sched_ctx.h

@@ -21,14 +21,33 @@
 #include <starpu_scheduler.h>
 
 struct starpu_sched_ctx {
-	int sched_ctx_id;
-	struct starpu_sched_policy_s *sched_policy; /* policy of the contex */
-	int workerid[STARPU_NMAXWORKERS]; /* list of indices of workers */
-	int nworkers_in_ctx; /* number of threads in contex */
-	unsigned is_initial_sched; /* we keep an initial sched which we never delete */
-	pthread_cond_t submitted_cond; /* cond used for no of submitted tasks to a sched_ctx */
-	pthread_mutex_t submitted_mutex; /* mut used for no of submitted tasks to a sched_ctx */
-	int nsubmitted;	 /* counter used for no of submitted tasks to a sched_ctx */
+	unsigned sched_ctx_id;
+
+	/* policy of the context */
+	struct starpu_sched_policy_s *sched_policy;
+
+	/* data necessary for the policy */
+	void *policy_data;
+	
+	/* list of indices of workers */
+	int workerid[STARPU_NMAXWORKERS]; 
+	
+	/* number of threads in context */
+	int nworkers_in_ctx; 
+
+	/* we keep an initial sched which we never delete */
+	unsigned is_initial_sched; 
+
+	/* cond used for no of submitted tasks to a sched_ctx */
+	pthread_cond_t submitted_cond; 
+	
+	/* mut used for no of submitted tasks to a sched_ctx */
+	pthread_mutex_t submitted_mutex; 
+	
+	/* counter used for no of submitted tasks to a sched_ctx */
+	int nsubmitted;	 
+	
+	/* name of context */
 	const char *sched_name;
 };
 

+ 2 - 2
src/core/sched_policy.c

@@ -247,7 +247,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	unsigned i;
 	for(i = 0; i < worker->nctxs; i++){
 		if (worker->sched_ctx[i]->sched_policy->push_task_notify){
-			worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid);
+		  worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid, worker->sched_ctx[i]->sched_ctx_id);
 		}
 	}
 
@@ -382,7 +382,7 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(task->sched_ctx);
 	if (sched_ctx->sched_policy->post_exec_hook)
-		sched_ctx->sched_policy->post_exec_hook(task);
+		sched_ctx->sched_policy->post_exec_hook(task, sched_ctx->sched_ctx_id);
 }
 
 void _starpu_wait_on_sched_event(void)

+ 2 - 4
src/core/task.c

@@ -219,10 +219,8 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 /* application should submit new tasks to StarPU through this function */
 int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx)
 {
-	unsigned init_sched_ctx = _starpu_get_initial_sched_ctx()->sched_ctx_id;
-        if(task->sched_ctx ==  init_sched_ctx && sched_ctx != init_sched_ctx)
-	  task->sched_ctx = sched_ctx;
-
+	task->sched_ctx = sched_ctx;
+	  
 	int ret;
 	unsigned is_sync = task->synchronous;
         _STARPU_LOG_IN();

+ 1 - 0
src/core/workers.c

@@ -622,6 +622,7 @@ struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
 
 struct starpu_sched_ctx *_starpu_get_sched_ctx(unsigned id)
 {
+	STARPU_ASSERT(id <= config.topology.nsched_ctxs);
 	return &config.sched_ctxs[id];
 }
 

+ 2 - 1
src/drivers/cpu/driver_cpu.c

@@ -50,6 +50,7 @@ static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args,
 		{
 			/* there was not enough memory so the codelet cannot be executed right now ... */
 			/* push the codelet back and try another one ... */
+			STARPU_ASSERT(ret == 0);
 			return -EAGAIN;
 		}
 	}
@@ -171,7 +172,6 @@ void *_starpu_cpu_worker(void *arg)
 		PTHREAD_MUTEX_LOCK(sched_mutex);
 
 		task = _starpu_pop_task(cpu_arg);
-	
                 if (!task) 
 		{
 		   if (_starpu_worker_can_block(memnode))
@@ -184,6 +184,7 @@ void *_starpu_cpu_worker(void *arg)
 		PTHREAD_MUTEX_UNLOCK(sched_mutex);	
 
 		STARPU_ASSERT(task);
+		STARPU_ASSERT(task->sched_ctx < 2);
 		j = _starpu_get_job_associated_to_task(task);
 	
 		/* can a cpu perform that task ? */

+ 90 - 64
src/sched_policies/heft.c

@@ -25,109 +25,130 @@
 #include <starpu_parameters.h>
 #include <starpu_task_bundle.h>
 
-//static unsigned nworkers;
+typedef struct {
+  double alpha;
+  double beta;
+  double _gamma;
+  double idle_power;
 
-static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+  double exp_start[STARPU_NMAXWORKERS];
+  double exp_end[STARPU_NMAXWORKERS];
+  double exp_len[STARPU_NMAXWORKERS];
+  double ntasks[STARPU_NMAXWORKERS];
 
-static double alpha = STARPU_DEFAULT_ALPHA;
-static double beta = STARPU_DEFAULT_BETA;
-static double _gamma = STARPU_DEFAULT_GAMMA;
-static double idle_power = 0.0;
-
-static double exp_start[STARPU_NMAXWORKERS];
-static double exp_end[STARPU_NMAXWORKERS];
-static double exp_len[STARPU_NMAXWORKERS];
-static double ntasks[STARPU_NMAXWORKERS];
+} heft_data;
 
 static void heft_init(unsigned sched_ctx_id)
 {
+	heft_data *hd = (heft_data*)malloc(sizeof(heft_data));
+	hd->alpha = STARPU_DEFAULT_ALPHA;
+	hd->beta = STARPU_DEFAULT_BETA;
+	hd->_gamma = STARPU_DEFAULT_GAMMA;
+	hd->idle_power = 0.0;
+	
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;
+	sched_ctx->policy_data = (void*)hd;
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
-		alpha = atof(strval_alpha);
+		hd->alpha = atof(strval_alpha);
 
 	const char *strval_beta = getenv("STARPU_SCHED_BETA");
 	if (strval_beta)
-		beta = atof(strval_beta);
+		hd->beta = atof(strval_beta);
 
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
-		_gamma = atof(strval_gamma);
+		hd->_gamma = atof(strval_gamma);
 
 	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
 	if (strval_idle_power)
-		idle_power = atof(strval_idle_power);
+		hd->idle_power = atof(strval_idle_power);
 
 	unsigned workerid, workerid_ctx;
-        for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
+
+	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	  {
-	        workerid = sched_ctx->workerid[workerid_ctx];
-		exp_start[workerid] = starpu_timing_now();
-		exp_len[workerid] = 0.0;
-		exp_end[workerid] = exp_start[workerid]; 
-		ntasks[workerid] = 0;
-
-		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-	
-		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
-	}
+	    workerid = sched_ctx->workerid[workerid_ctx];
+	    hd->exp_start[workerid] = starpu_timing_now();
+	    hd->exp_len[workerid] = 0.0;
+	    hd->exp_end[workerid] = hd->exp_start[workerid]; 
+	    hd->ntasks[workerid] = 0;
+	    
+	    pthread_cond_t *sched_cond = malloc(sizeof(pthread_cond_t));
+	    pthread_mutex_t *sched_mutex = malloc(sizeof(pthread_mutex_t));
+	    
+	    PTHREAD_MUTEX_INIT(sched_mutex, NULL);
+	    PTHREAD_COND_INIT(sched_cond, NULL);
+	    
+	    starpu_worker_set_sched_condition(workerid, sched_cond, sched_mutex);
+	  }
 }
 
-static void heft_post_exec_hook(struct starpu_task *task)
+static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
-	
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	heft_data *hd = (heft_data*)sched_ctx->policy_data;
+
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	exp_len[workerid] -= model;
-	exp_start[workerid] = starpu_timing_now() + model;
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
-	ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	hd->exp_len[workerid] -= model;
+	hd->exp_start[workerid] = starpu_timing_now() + model;
+	hd->exp_end[workerid] = hd->exp_start[workerid] + hd->exp_len[workerid];
+	hd->ntasks[workerid]--;
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
 
-static void heft_push_task_notify(struct starpu_task *task, int workerid)
+static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	heft_data *hd = (heft_data*)sched_ctx->policy_data;
+
 	/* Compute the expected penalty */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	double predicted = starpu_task_expected_length(task, perf_arch);
 
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
-	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+	hd->exp_start[workerid] = STARPU_MAX(hd->exp_start[workerid], starpu_timing_now());
+	hd->exp_end[workerid] = STARPU_MAX(hd->exp_start[workerid], starpu_timing_now());
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
 	{
 		task->predicted = predicted;
-		exp_end[workerid] += predicted;
-		exp_len[workerid] += predicted;
+		hd->exp_end[workerid] += predicted;
+		hd->exp_len[workerid] += predicted;
 	}
 
-	ntasks[workerid]++;
+	hd->ntasks[workerid]++;
 
-	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
 {
+	heft_data *hd = (heft_data*)sched_ctx->policy_data;
+
 	/* make sure someone could execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(best_workerid);
 
-	PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
-	exp_end[best_workerid] += predicted;
-	exp_len[best_workerid] += predicted;
-	ntasks[best_workerid]++;
-	PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	hd->exp_end[best_workerid] += predicted;
+	hd->exp_len[best_workerid] += predicted;
+	hd->ntasks[best_workerid]++;
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 
 	task->predicted = predicted;
 
@@ -158,14 +179,15 @@ static void compute_all_performance_predictions(struct starpu_task *task,
   int unknown = 0;
 
   unsigned nworkers = sched_ctx->nworkers_in_ctx;
+  heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
   unsigned worker, worker_in_ctx;
   for (worker_in_ctx = 0; worker_in_ctx < nworkers; worker_in_ctx++)
     {
       worker = sched_ctx->workerid[worker_in_ctx];
       /* Sometimes workers didn't take the tasks as early as we expected */
-      exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-      exp_end[worker_in_ctx] = exp_start[worker] + exp_len[worker];
+      hd->exp_start[worker] = STARPU_MAX(hd->exp_start[worker], starpu_timing_now());
+      exp_end[worker_in_ctx] = hd->exp_start[worker] + hd->exp_len[worker];
       if (exp_end[worker_in_ctx] > max_exp_end)
  	max_exp_end = exp_end[worker_in_ctx];
 
@@ -190,9 +212,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	local_power[worker_in_ctx] = starpu_task_expected_power(task, perf_arch);
       }
 
-      /* printf("%d(in ctx%d): local task len = %2.2f locald data penalty = %2.2f local_power = %2.2f\n", worker, worker_in_ctx, local_task_length[worker_in_ctx], local_data_penalty[worker_in_ctx], local_power[worker_in_ctx]); */
+/*       printf("%d(in ctx%d): local task len = %2.2f locald data penalty = %2.2f local_power = %2.2f\n", worker, worker_in_ctx, local_task_length[worker_in_ctx], local_data_penalty[worker_in_ctx], local_power[worker_in_ctx]); */
 
-      double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+      double ntasks_end = hd->ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
       if (ntasks_best == -1
 	  || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
@@ -217,7 +239,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
       if (unknown)
 	continue;
 
-      exp_end[worker_in_ctx] = exp_start[worker] + exp_len[worker] + local_task_length[worker_in_ctx];
+      exp_end[worker_in_ctx] = hd->exp_start[worker] + hd->exp_len[worker] + local_task_length[worker_in_ctx];
 
       if (exp_end[worker_in_ctx] < best_exp_end)
 	{
@@ -237,6 +259,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	heft_data *hd = (heft_data*)sched_ctx->policy_data;
 	unsigned worker, worker_in_ctx;
 	int best = -1, best_id_in_ctx = -1;
 	
@@ -269,7 +292,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	 * want to speed-up calibration time so we force this measurement */
 	if (forced_best != -1){
 		_starpu_increment_nsubmitted_tasks_of_worker(forced_best);
-		return push_task_on_best_worker(task, forced_best, 0.0, prio);
+		return push_task_on_best_worker(task, forced_best, 0.0, prio, sched_ctx);
 	}
 
 	/*
@@ -290,15 +313,15 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 			continue;
 		}
 
-		fitness[worker_in_ctx] = alpha*(exp_end[worker_in_ctx] - best_exp_end) 
-				+ beta*(local_data_penalty[worker_in_ctx])
-				+ _gamma*(local_power[worker_in_ctx]);
+		fitness[worker_in_ctx] = hd->alpha*(exp_end[worker_in_ctx] - best_exp_end) 
+				+ hd->beta*(local_data_penalty[worker_in_ctx])
+				+ hd->_gamma*(local_power[worker_in_ctx]);
 
 		if (exp_end[worker_in_ctx] > max_exp_end)
 			/* This placement will make the computation
 			 * longer, take into account the idle
 			 * consumption of other cpus */
-			fitness[worker_in_ctx] += _gamma * idle_power * (exp_end[worker_in_ctx] - max_exp_end) / 1000000.0;
+			fitness[worker_in_ctx] += hd->_gamma * hd->idle_power * (exp_end[worker_in_ctx] - max_exp_end) / 1000000.0;
 
 		if (best == -1 || fitness[worker_in_ctx] < best_fitness)
 		{
@@ -307,7 +330,6 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 			best = worker;
 			best_id_in_ctx = worker_in_ctx;
 		}
-
 	}
 
 
@@ -341,7 +363,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 
 	_starpu_increment_nsubmitted_tasks_of_worker(best);
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
 }
 
 static int heft_push_prio_task(struct starpu_task *task, unsigned sched_ctx_id)
@@ -365,9 +387,13 @@ static void heft_deinit(unsigned sched_ctx_id)
 	int nworkers = sched_ctx->nworkers_in_ctx;
 	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
 	        workerid = sched_ctx->workerid[workerid_in_ctx];
-		PTHREAD_MUTEX_DESTROY(&sched_mutex[workerid]);
-		PTHREAD_COND_DESTROY(&sched_cond[workerid]);
+		struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+		PTHREAD_MUTEX_DESTROY(worker->sched_mutex);
+		PTHREAD_COND_DESTROY(worker->sched_cond);
+		free(worker->sched_mutex);
+		free(worker->sched_cond);
 	}
+	free(sched_ctx->policy_data);
 }
 
 struct starpu_sched_policy_s heft_policy = {