
heft change expected values from local to global

Andra Hugo, 14 years ago
parent commit 3538532df3
2 changed files with 66 additions and 102 deletions
  1. src/drivers/cuda/driver_cuda.c  +3 −2
  2. src/sched_policies/heft.c  +63 −100

+ 3 - 2
src/drivers/cuda/driver_cuda.c

@@ -246,7 +246,9 @@ void *_starpu_cuda_worker(void *arg)
 
 	_starpu_set_local_worker_key(args);
 
+	PTHREAD_MUTEX_LOCK(args->sched_mutex);
 	init_context(devid);
+	PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 
 	/* one more time to avoid hacks from third party lib :) */
 	_starpu_bind_thread_on_cpu(args->config, args->bindid);
@@ -296,10 +298,9 @@ void *_starpu_cuda_worker(void *arg)
 
                 PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
-
 		task = _starpu_pop_task(args);
 
-                if (task == NULL) 
+                if (!task) 
 		{
 			PTHREAD_MUTEX_LOCK(sched_mutex);
 			if (_starpu_worker_can_block(memnode))
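
The driver-side change brackets the CUDA context initialization with the worker's scheduling mutex, presumably so the scheduler cannot touch the worker's now globally shared slots while the worker is still starting up; the task == NULL test is also tightened to !task. A minimal sketch of the guard pattern, with a hypothetical init_device() standing in for init_context():

#include <pthread.h>

static pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical stand-in for init_context(devid). */
static void init_device(int devid) { (void)devid; }

static void worker_startup(int devid)
{
	/* Mirrors the PTHREAD_MUTEX_LOCK/UNLOCK pair added around
	 * init_context() in _starpu_cuda_worker(). */
	pthread_mutex_lock(&sched_mutex);
	init_device(devid);
	pthread_mutex_unlock(&sched_mutex);
}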

+ 63 - 100
src/sched_policies/heft.c

@@ -31,38 +31,42 @@ typedef struct {
 	double _gamma;
 	double idle_power;
 
-	double *exp_start;
-	double *exp_end;
-	double *exp_len;
-	double *ntasks;
+/* 	double *exp_start; */
+/* 	double *exp_end; */
+/* 	double *exp_len; */
+/* 	double *ntasks; */
 } heft_data;
 
+double exp_start[STARPU_NMAXWORKERS];
+double exp_end[STARPU_NMAXWORKERS];
+double exp_len[STARPU_NMAXWORKERS];
+double ntasks[STARPU_NMAXWORKERS];
+
 static void heft_init_for_workers(unsigned sched_ctx_id, int nnew_workers)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-
 	unsigned nworkers_ctx = sched_ctx->nworkers_in_ctx;
-	heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
-	//	unsigned initial_nworkers = nworkers_ctx - nnew_workers;
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-
 	unsigned ntotal_workers = config->topology.nworkers;
 
 	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers_ctx + nnew_workers;
 
 	unsigned workerid_ctx;
+	int workerid;
 	for (workerid_ctx = nworkers_ctx; workerid_ctx < all_workers; workerid_ctx++)
 	  {
-	    hd->exp_start[workerid_ctx] = starpu_timing_now();
-	    hd->exp_len[workerid_ctx] = 0.0;
-	    hd->exp_end[workerid_ctx] = hd->exp_start[workerid_ctx]; 
-	    hd->ntasks[workerid_ctx] = 0;
-	    	    
-	    sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-	    sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-	    PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
-	    PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+	    workerid = sched_ctx->workerid[workerid_ctx];
+	    struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+	    if(workerarg->nctxs == 1)
+	      {
+		exp_start[workerid] = starpu_timing_now();
+		exp_len[workerid] = 0.0;
+		exp_end[workerid] = exp_start[workerid]; 
+		ntasks[workerid] = 0;
+	      }
+	    sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
+	    sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
 	  }
 
 	/* take into account the new number of threads at the next push */
@@ -83,11 +87,6 @@ static void heft_init(unsigned sched_ctx_id)
 	unsigned nworkers = sched_ctx->nworkers_in_ctx;
 	sched_ctx->policy_data = (void*)hd;
 
-	hd->exp_start = (double*)malloc(STARPU_NMAXWORKERS*sizeof(double));
-	hd->exp_end = (double*)malloc(STARPU_NMAXWORKERS*sizeof(double));
-	hd->exp_len = (double*)malloc(STARPU_NMAXWORKERS*sizeof(double));
-	hd->ntasks = (double*)malloc(STARPU_NMAXWORKERS*sizeof(double));
-
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 		hd->alpha = atof(strval_alpha);
@@ -108,80 +107,76 @@ static void heft_init(unsigned sched_ctx_id)
 
 	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
 	  {
-	    hd->exp_start[workerid_ctx] = starpu_timing_now();
-	    hd->exp_len[workerid_ctx] = 0.0;
-	    hd->exp_end[workerid_ctx] = hd->exp_start[workerid_ctx]; 
-	    hd->ntasks[workerid_ctx] = 0;
-	    	    
-	    sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-	    sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
-	    PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
-	    PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
+	    int workerid = sched_ctx->workerid[workerid_ctx];
+	    struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+	    if(workerarg->nctxs == 1)
+	      {
+		exp_start[workerid] = starpu_timing_now();
+		exp_len[workerid] = 0.0;
+		exp_end[workerid] = exp_start[workerid]; 
+		ntasks[workerid] = 0;
+	      }
+	    sched_ctx->sched_mutex[workerid_ctx] = workerarg->sched_mutex;
+	    sched_ctx->sched_cond[workerid_ctx] = workerarg->sched_cond;
+
 	  }
 }
 
 static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
-
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	double model = task->predicted;
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	heft_data *hd = (heft_data*)sched_ctx->policy_data;
 
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid_ctx]);
-	hd->exp_len[workerid_ctx] -= model;
-	hd->exp_start[workerid_ctx] = starpu_timing_now() + model;
-	hd->exp_end[workerid_ctx] = hd->exp_start[workerid_ctx] + hd->exp_len[workerid_ctx];
-	hd->ntasks[workerid_ctx]--;
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid_ctx]);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	exp_len[workerid] -= model;
+	exp_start[workerid] = starpu_timing_now() + model;
+	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	ntasks[workerid]--;
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
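
With the globals in place, the post-execution hook updates the worker's expected-time window under that worker's own mutex instead of a per-context one. A self-contained sketch of the same bookkeeping, with a single mutex and a clock_gettime()-based now_us() standing in for the per-worker sched_mutex and starpu_timing_now():

#include <pthread.h>
#include <time.h>

#define NMAXWORKERS 64 /* illustrative stand-in for STARPU_NMAXWORKERS */

static double exp_start[NMAXWORKERS]; /* expected start of remaining work */
static double exp_len[NMAXWORKERS];   /* expected length of queued work */
static double exp_end[NMAXWORKERS];   /* expected completion time */
static double ntasks[NMAXWORKERS];    /* queued-task count */
static pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;

static double now_us(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
}

/* cf. heft_post_exec_hook: a task predicted to last `model` us has just
 * finished, so the queued length shrinks and the window is re-anchored. */
static void post_exec_update(int workerid, double model)
{
	pthread_mutex_lock(&sched_mutex);
	exp_len[workerid] -= model;
	exp_start[workerid] = now_us() + model;
	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
	ntasks[workerid]--;
	pthread_mutex_unlock(&sched_mutex);
}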
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
 {
-	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	heft_data *hd = (heft_data*)sched_ctx->policy_data;
-
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	/* Compute the expected penalty */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	double predicted = starpu_task_expected_length(task, perf_arch);
 
 	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[workerid_ctx]);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	hd->exp_start[workerid_ctx] = STARPU_MAX(hd->exp_start[workerid_ctx], starpu_timing_now());
-	hd->exp_end[workerid_ctx] = STARPU_MAX(hd->exp_start[workerid_ctx], starpu_timing_now());
+	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
 	{
 		task->predicted = predicted;
-		hd->exp_end[workerid_ctx] += predicted;
-		hd->exp_len[workerid_ctx] += predicted;
+		exp_end[workerid] += predicted;
+		exp_len[workerid] += predicted;
 	}
 
-	hd->ntasks[workerid_ctx]++;
+	ntasks[workerid]++;
 
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[workerid_ctx]);
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
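
The push-side notification follows the same discipline: clamp the window so it never starts in the past (workers sometimes pick tasks up later than expected), then grow it by the predicted length, where -1.0 is the no-prediction sentinel. Continuing the sketch above (reusing its exp_* arrays, sched_mutex and now_us()):

/* cf. heft_push_task_notify. Note that, as in the original, exp_end is
 * re-based from exp_start here rather than extended. */
static void push_notify_update(int workerid, double predicted)
{
	pthread_mutex_lock(&sched_mutex);
	double now = now_us();
	exp_start[workerid] = exp_start[workerid] > now ? exp_start[workerid] : now;
	exp_end[workerid] = exp_start[workerid];
	/* -1.0 means no performance-model prediction: count the task but
	 * treat its length as zero. */
	if (predicted != -1.0) {
		exp_end[workerid] += predicted;
		exp_len[workerid] += predicted;
	}
	ntasks[workerid]++;
	pthread_mutex_unlock(&sched_mutex);
}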
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
 {
-	heft_data *hd = (heft_data*)sched_ctx->policy_data;
-
 	/* make sure someone could execute that task! */
 	STARPU_ASSERT(best_workerid != -1);
-	int best_workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, best_workerid);
+	struct starpu_worker_s *best_worker = _starpu_get_worker_struct(best_workerid);
+
+	PTHREAD_MUTEX_LOCK(best_worker->sched_mutex);
+	exp_end[best_workerid] += predicted;
+	exp_len[best_workerid] += predicted;
 
-	PTHREAD_MUTEX_LOCK(sched_ctx->sched_mutex[best_workerid_ctx]);
-	hd->exp_end[best_workerid_ctx] += predicted;
-	hd->exp_len[best_workerid_ctx] += predicted;
-	hd->ntasks[best_workerid_ctx]++;
-	PTHREAD_MUTEX_UNLOCK(sched_ctx->sched_mutex[best_workerid_ctx]);
+	ntasks[best_workerid]++;
+	PTHREAD_MUTEX_UNLOCK(best_worker->sched_mutex);
 
 	task->predicted = predicted;
 
@@ -219,8 +214,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
     {
       worker = sched_ctx->workerid[worker_in_ctx];
       /* Sometimes workers didn't take the tasks as early as we expected */
-      hd->exp_start[worker_in_ctx] = STARPU_MAX(hd->exp_start[worker_in_ctx], starpu_timing_now());
-      exp_end[worker_in_ctx] = hd->exp_start[worker_in_ctx] + hd->exp_len[worker_in_ctx];
+      exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
+      exp_end[worker_in_ctx] = exp_start[worker] + exp_len[worker];
       if (exp_end[worker_in_ctx] > max_exp_end)
  	max_exp_end = exp_end[worker_in_ctx];
 
@@ -247,7 +242,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
       //      printf("%d: local task len = %2.2f perf model %d\n", worker, local_task_length[worker_in_ctx], task->cl->model->type);
 
-      double ntasks_end = hd->ntasks[worker_in_ctx] / starpu_worker_get_relative_speedup(perf_arch);
+      double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
       if (ntasks_best == -1
 	  || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
@@ -272,7 +267,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
       if (unknown)
 	continue;
 
-      exp_end[worker_in_ctx] = hd->exp_start[worker_in_ctx] + hd->exp_len[worker_in_ctx] + local_task_length[worker_in_ctx];
+      exp_end[worker_in_ctx] = exp_start[worker] + exp_len[worker] + local_task_length[worker_in_ctx];
 
       if (exp_end[worker_in_ctx] < best_exp_end)
 	{
@@ -325,7 +320,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	 * want to speed-up calibration time so we force this measurement */
 	if (forced_best != -1){
 		_starpu_increment_nsubmitted_tasks_of_worker(forced_best);
-		return push_task_on_best_worker(task, forced_best, 0.0, prio, sched_ctx);
+		return push_task_on_best_worker(task, forced_best, 0.0, prio);
 	}
 
 	/*
@@ -396,7 +391,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 
 	_starpu_increment_nsubmitted_tasks_of_worker(best);
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
+	return push_task_on_best_worker(task, best, model_best, prio);
 }
 
 static int heft_push_prio_task(struct starpu_task *task, unsigned sched_ctx_id)
@@ -415,42 +410,10 @@ static int heft_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 static void heft_deinit(unsigned sched_ctx_id) 
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	int workerid_in_ctx;
-	int nworkers = sched_ctx->nworkers_in_ctx;
-	heft_data *ht = (heft_data*)sched_ctx->policy_data;
-
-	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
-		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_in_ctx]);
-		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_in_ctx]);
-		free(sched_ctx->sched_mutex[workerid_in_ctx]);
-		free(sched_ctx->sched_cond[workerid_in_ctx]);
-	}
-
-	free(ht->exp_start);
-	free(ht->exp_end);
-	free(ht->exp_len);
-	free(ht->ntasks);
-	  
+	heft_data *ht = (heft_data*)sched_ctx->policy_data;	  
 	free(ht);
 }
 
-
-static void heft_deinit_for_workers(unsigned sched_ctx_id) 
-{
-  //TODO: solve pb with indexes before
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
-	int workerid_in_ctx;
-	int nworkers = sched_ctx->nworkers_in_ctx;
-
-	for (workerid_in_ctx = 0; workerid_in_ctx < nworkers; workerid_in_ctx++){
-		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_in_ctx]);
-		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_in_ctx]);
-		free(sched_ctx->sched_mutex[workerid_in_ctx]);
-		free(sched_ctx->sched_cond[workerid_in_ctx]);
-	}
-
-}
-
 struct starpu_sched_policy_s heft_policy = {
 	.init_sched = heft_init,
 	.deinit_sched = heft_deinit,