Преглед на файлове

Directly use workers' local queues instead of reimplementing queues and a pop
method. When updating the expected start/end, make sure the prediction was not
a negative value (which indicates that no model is available).

Cédric Augonnet преди 14 години
родител
ревизия
71952e4c0c
променени са 1 файла, в които са добавени 56 реда и са изтрити 93 реда
  1. 56 93
      src/sched_policies/parallel_heft.c

+ 56 - 93
src/sched_policies/parallel_heft.c

@@ -19,7 +19,6 @@
 #include <float.h>
 #include <limits.h>
 #include <core/workers.h>
-#include <sched_policies/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 #include <common/barrier.h>
@@ -30,8 +29,6 @@ static unsigned nworkers, ncombinedworkers;
 static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
 static unsigned napplicable_perf_archtypes = 0;
 
-static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
-
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 
@@ -40,25 +37,31 @@ static double beta = STARPU_DEFAULT_BETA;
 static double _gamma = STARPU_DEFAULT_GAMMA;
 static double idle_power = 0.0;
 
-static struct starpu_task *parallel_heft_pop_task(void)
-{
-	struct starpu_task *task;
+static double exp_start[STARPU_NMAXWORKERS];
+static double exp_end[STARPU_NMAXWORKERS];
+static double exp_len[STARPU_NMAXWORKERS];
+static double ntasks[STARPU_NMAXWORKERS];
 
+static void parallel_heft_post_exec_hook(struct starpu_task *task)
+{
 	int workerid = starpu_worker_get_id();
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
-
-	task = _starpu_fifo_pop_task(fifo, -1);
-	if (task) {
-		double model = task->predicted;
+	double model = task->predicted;
 	
-		fifo->exp_len -= model;
-		fifo->exp_start = starpu_timing_now() + model;
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-	}
-
-	return task;
+	if (model < 0.0)
+		model = 0.0;
+	
+	/* Once we have executed the task, we can update the predicted amount
+	 * of work. */
+	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
+	exp_len[workerid] -= model;
+	exp_start[workerid] = starpu_timing_now() + model;
+	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	ntasks[workerid]--;
+	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
+
+
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
 {
 	/* make sure someone coule execute that task ! */
@@ -74,33 +77,21 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	if (starpu_get_prefetch_flag())
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
+	int ret = 0;
+
+	PTHREAD_MUTEX_LOCK(&big_lock);
+
 	if (is_basic_worker)
 	{
-		PTHREAD_MUTEX_LOCK(&big_lock);
-
-		struct starpu_fifo_taskq_s *fifo;
-		fifo = queue_array[best_workerid];
-	
-		fifo->exp_end += predicted;
-		fifo->exp_len += predicted;
+		if (predicted > 0.0)
+		{
+			exp_end[best_workerid] += predicted;
+			exp_len[best_workerid] += predicted;
+		}
 	
 		task->predicted = predicted;
 	
-		int ret;
-
-		if (prio)
-		{
-			ret = _starpu_fifo_push_prio_task(queue_array[best_workerid],
-				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
-		}
-		else {
-			ret = _starpu_fifo_push_task(queue_array[best_workerid],
-				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
-		}
-
-		PTHREAD_MUTEX_UNLOCK(&big_lock);
-
-		return ret;
+		ret = starpu_push_local_task(best_workerid, task, prio);
 	}
 	else {
 		/* This is a combined worker so we create task aliases */
@@ -109,9 +100,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		int worker_size = combined_worker->worker_size;
 		int *combined_workerid = combined_worker->combined_workerid;
 
-		int ret = 0;
-		int i;
-		
 		task->predicted = predicted;
 
 		starpu_job_t j = _starpu_get_job_associated_to_task(task);
@@ -122,36 +110,28 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
 		PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
 
-		PTHREAD_MUTEX_LOCK(&big_lock);
-
+		int i;
 		for (i = 0; i < worker_size; i++)
 		{
 			struct starpu_task *alias = _starpu_create_task_alias(task);
 			int local_worker = combined_workerid[i];
 
-			struct starpu_fifo_taskq_s *fifo;
-			fifo = queue_array[local_worker];
-		
-			fifo->exp_end += predicted;
-			fifo->exp_len += predicted;
+			if (predicted > 0.0)
+			{
+				exp_end[local_worker] += predicted;
+				exp_len[local_worker] += predicted;
+			}
 		
 			alias->predicted = predicted;
 		
-			if (prio)
-			{
-				ret |= _starpu_fifo_push_prio_task(queue_array[local_worker],
-					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
-			}
-			else {
-				ret |= _starpu_fifo_push_task(queue_array[local_worker],
-					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
-			}
+			ret |= starpu_push_local_task(local_worker, alias, prio);
 		}
 
-		PTHREAD_MUTEX_UNLOCK(&big_lock);
-
-		return ret;
 	}
+
+	PTHREAD_MUTEX_UNLOCK(&big_lock);
+
+	return ret;
 }
 
 static double compute_expected_end(int workerid, double length)
@@ -159,9 +139,7 @@ static double compute_expected_end(int workerid, double length)
 	if (workerid < (int)nworkers)
 	{
 		/* This is a basic worker */
-		struct starpu_fifo_taskq_s *fifo;
-		fifo = queue_array[workerid];
-		return (fifo->exp_start + fifo->exp_len + length);
+		return exp_start[workerid] + exp_len[workerid] + length;
 	}
 	else {
 		/* This is a combined worker, the expected end is the end for the latest worker */
@@ -174,9 +152,7 @@ static double compute_expected_end(int workerid, double length)
 		int i;
 		for (i = 0; i < worker_size; i++)
 		{
-			struct starpu_fifo_taskq_s *fifo;
-			fifo = queue_array[combined_workerid[i]];
-			double local_exp_end = (fifo->exp_start + fifo->exp_len + length);
+			double local_exp_end = (exp_start[combined_workerid[i]] + exp_len[combined_workerid[i]] + length);
 			exp_end = STARPU_MAX(exp_end, local_exp_end);
 		}
 
@@ -190,9 +166,7 @@ static double compute_ntasks_end(int workerid)
 	if (workerid < (int)nworkers)
 	{
 		/* This is a basic worker */
-		struct starpu_fifo_taskq_s *fifo;
-		fifo = queue_array[workerid];
-		return fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
+		return ntasks[workerid] / starpu_worker_get_relative_speedup(perf_arch);
 	}
 	else {
 		/* This is a combined worker, the expected end is the end for the latest worker */
@@ -205,10 +179,8 @@ static double compute_ntasks_end(int workerid)
 		int i;
 		for (i = 0; i < worker_size; i++)
 		{
-			struct starpu_fifo_taskq_s *fifo;
-			fifo = queue_array[combined_workerid[i]];
 			/* XXX: this is actually bogus: not all pushed tasks are necessarily parallel... */
-			ntasks_end = STARPU_MAX(ntasks_end, fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch));
+			ntasks_end = STARPU_MAX(ntasks_end, ntasks[combined_workerid[i]] / starpu_worker_get_relative_speedup(perf_arch));
 		}
 
 		return ntasks_end;
@@ -217,8 +189,6 @@ static double compute_ntasks_end(int workerid)
 
 static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 {
-	/* find the queue */
-	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker;
 	int best = -1;
 	
@@ -249,13 +219,11 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
-		fifo = queue_array[worker];
-
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-		if (fifo->exp_end > max_exp_end)
-			max_exp_end = fifo->exp_end;
+		exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
+		exp_end[worker] = exp_start[worker] + exp_len[worker];
+		if (exp_end[worker] > max_exp_end)
+			max_exp_end = exp_end[worker];
 	}
 
 	for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
@@ -411,7 +379,10 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
-		queue_array[workerid] = _starpu_create_fifo();
+		exp_start[workerid] = starpu_timing_now();
+		exp_len[workerid] = 0.0;
+		exp_end[workerid] = exp_start[workerid]; 
+		ntasks[workerid] = 0;
 	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
@@ -443,22 +414,14 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 	}
 }
 
-static void deinitialize_parallel_heft_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
-{
-	unsigned workerid;
-	for (workerid = 0; workerid < topology->nworkers; workerid++)
-		_starpu_destroy_fifo(queue_array[workerid]);
-}
-
 /* TODO: use post_exec_hook to fix the expected start */
 struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy = {
 	.init_sched = initialize_parallel_heft_policy,
-	.deinit_sched = deinitialize_parallel_heft_policy,
+	.deinit_sched = NULL,
 	.push_task = parallel_heft_push_task, 
 	.push_prio_task = parallel_heft_push_prio_task, 
-	.pop_task = parallel_heft_pop_task,
-	.post_exec_hook = NULL,
+	.pop_task = NULL,
+	.post_exec_hook = parallel_heft_post_exec_hook,
 	.pop_every_task = NULL,
 	.policy_name = "pheft",
 	.policy_description = "parallel HEFT"