Browse Source

src/sched_policies/heft.c: use an array of _starpu_fifo_taskq* rather than multiple global variables.

This way, we get rid of 4 global variables (exp_start, exp_len, exp_end and
ntasks), while adding a new one (queue_array). The whole thing is a bit easier
to read.
Cyril Roelandt 12 years ago
parent
commit
c68b925ce5
1 changed files with 45 additions and 37 deletions
  1. 45 37
      src/sched_policies/heft.c

+ 45 - 37
src/sched_policies/heft.c

@@ -28,6 +28,7 @@
 #include <starpu_top.h>
 #include <core/jobs.h>
 #include <top/starpu_top_core.h>
+#include <sched_policies/fifo_queues.h>
 
 #ifndef DBL_MIN
 #define DBL_MIN __DBL_MIN__
@@ -38,6 +39,7 @@
 #endif
 
 static unsigned nworkers;
+static struct _starpu_fifo_taskq *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -47,11 +49,6 @@ static double beta = _STARPU_DEFAULT_BETA;
 static double _gamma = _STARPU_DEFAULT_GAMMA;
 static double idle_power = 0.0;
 
-static double exp_start[STARPU_NMAXWORKERS]; /* of the first queued task */
-static double exp_end[STARPU_NMAXWORKERS];   /* of the set of queued tasks */
-static double exp_len[STARPU_NMAXWORKERS];   /* of the last queued task */
-static double ntasks[STARPU_NMAXWORKERS];
-
 const float alpha_minimum=0;
 const float alpha_maximum=10.0;
 const float beta_minimum=0;
@@ -97,10 +94,13 @@ static void heft_init(struct starpu_machine_topology *topology,
 	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
-		exp_start[workerid] = starpu_timing_now();
-		exp_len[workerid] = 0.0;
-		exp_end[workerid] = exp_start[workerid];
-		ntasks[workerid] = 0;
+		queue_array[workerid] = _starpu_create_fifo();
+		struct _starpu_fifo_taskq *fifo = queue_array[workerid];
+		
+		fifo->exp_start = starpu_timing_now();
+		fifo->exp_len = 0.0;
+		fifo->exp_end = fifo->exp_start;
+		fifo->ntasks = 0;
 
 		_STARPU_PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		_STARPU_PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
@@ -115,21 +115,24 @@ static void heft_init(struct starpu_machine_topology *topology,
 static void heft_pre_exec_hook(struct starpu_task *task)
 {
 	int workerid = starpu_worker_get_id();
+	struct _starpu_fifo_taskq *fifo = queue_array[workerid];
 	double model = task->predicted;
 	double transfer_model = task->predicted_transfer;
 
 	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	exp_len[workerid] -= model + transfer_model;
-	exp_start[workerid] = starpu_timing_now() + model;
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
-	ntasks[workerid]--;
+	fifo->exp_len-= model + transfer_model;
+	fifo->exp_start = starpu_timing_now() + model;
+	fifo->exp_end= fifo->exp_start + fifo->exp_len;
+	fifo->ntasks--;
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid)
 {
+	struct _starpu_fifo_taskq *fifo = queue_array[workerid];
+
 	/* Compute the expected penality */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	unsigned memory_node = starpu_worker_get_memory_node(workerid);
@@ -143,21 +146,21 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (!isnan(predicted))
 	{
 		task->predicted = predicted;
-		exp_end[workerid] += predicted;
-		exp_len[workerid] += predicted;
+		fifo->exp_end += predicted;
+		fifo->exp_len += predicted;
 	}
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (!isnan(predicted_transfer))
 	{
-		if (starpu_timing_now() + predicted_transfer < exp_end[workerid])
+		if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
 		{
 			/* We may hope that the transfer will be finished by
 			 * the start of the task. */
@@ -167,33 +170,35 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 		{
 			/* The transfer will not be finished by then, take the
 			 * remainder into account */
-			predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[workerid];
+			predicted_transfer = (starpu_timing_now() + predicted_transfer) - fifo->exp_end;
 		}
 		task->predicted_transfer = predicted_transfer;
-		exp_end[workerid] += predicted_transfer;
-		exp_len[workerid] += predicted_transfer;
+		fifo->exp_end += predicted_transfer;
+		fifo->exp_len += predicted_transfer;
 	}
 
-	ntasks[workerid]++;
+	fifo->ntasks++;
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, double predicted_transfer, int prio)
 {
+	struct _starpu_fifo_taskq *fifo = queue_array[best_workerid];
+
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	exp_start[best_workerid] = STARPU_MAX(exp_start[best_workerid], starpu_timing_now());
-	exp_end[best_workerid] = exp_start[best_workerid] + exp_len[best_workerid];
+	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	exp_end[best_workerid] += predicted;
-	exp_len[best_workerid] += predicted;
+	fifo->exp_end += predicted;
+	fifo->exp_len += predicted;
 
-	if (starpu_timing_now() + predicted_transfer < exp_end[best_workerid])
+	if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
 	{
 		/* We may hope that the transfer will be finished by
 		 * the start of the task. */
@@ -203,12 +208,12 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	{
 		/* The transfer will not be finished by then, take the
 		 * remainder into account */
-		predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[best_workerid];
+		predicted_transfer = (starpu_timing_now() + predicted_transfer) - fifo->exp_end;
 	}
-	exp_end[best_workerid] += predicted_transfer;
-	exp_len[best_workerid] += predicted_transfer;
+	fifo->exp_end += predicted_transfer;
+	fifo->exp_len += predicted_transfer;
 
-	ntasks[best_workerid]++;
+	fifo->ntasks++;
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
 
 	task->predicted = predicted;
@@ -216,8 +221,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	if (_starpu_top_status_get())
 		_starpu_top_task_prevision(task, best_workerid,
-					(unsigned long long)(exp_end[best_workerid]-predicted)/1000,
-					(unsigned long long)exp_end[best_workerid]/1000);
+					(unsigned long long)(fifo->exp_end-predicted)/1000,
+					(unsigned long long)fifo->exp_end/1000);
 
 	if (starpu_get_prefetch_flag())
 	{
@@ -257,6 +262,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
+		struct _starpu_fifo_taskq *fifo = queue_array[worker];
+
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
@@ -267,8 +274,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
-			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-			exp_end[worker][nimpl] = exp_start[worker] + exp_len[worker];
+			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+			exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len;
 			if (exp_end[worker][nimpl] > max_exp_end)
 				max_exp_end = exp_end[worker][nimpl];
 			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
@@ -297,7 +304,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			}
 
-			double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 			if (ntasks_best == -1
 			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
@@ -325,7 +332,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (unknown)
 				continue;
 
-			exp_end[worker][nimpl] = exp_start[worker] + exp_len[worker] + local_task_length[worker][nimpl];
+			exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker][nimpl];
 
 			if (exp_end[worker][nimpl] < best_exp_end)
 			{
@@ -526,6 +533,7 @@ static void heft_deinit(__attribute__ ((unused)) struct starpu_machine_topology
 	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
+		_starpu_destroy_fifo(queue_array[workerid]);
 		_STARPU_PTHREAD_MUTEX_DESTROY(&sched_mutex[workerid]);
 		_STARPU_PTHREAD_COND_DESTROY(&sched_cond[workerid]);
 	}