Browse Source

Heft: use a scheduler-specific fifo rather than pushing tasks directly to the workers' local tasks.

This makes heft look a bit more like the dm* schedulers, thus probably making a
future code factorization easier.

The "prio" variable used in many different places in heft.c seems no longer
needed, but has been kept so that heft really looks like dm*.
Cyril Roelandt 12 years ago
parent
commit
347032f30d
1 changed files with 27 additions and 8 deletions
  1. 27 8
      src/sched_policies/heft.c

+ 27 - 8
src/sched_policies/heft.c

@@ -122,10 +122,9 @@ static void heft_pre_exec_hook(struct starpu_task *task)
 	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	fifo->exp_len-= model + transfer_model;
+	fifo->exp_len-= transfer_model;
 	fifo->exp_start = starpu_timing_now() + model;
 	fifo->exp_end= fifo->exp_start + fifo->exp_len;
-	fifo->ntasks--;
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
@@ -230,9 +229,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
 
-
-	//_STARPU_DEBUG("Heft : pushing local task\n");
-	return starpu_push_local_task(best_workerid, task, prio);
+	return _starpu_fifo_push_task(queue_array[best_workerid],
+				      &sched_mutex[best_workerid],
+				      &sched_cond[best_workerid], task);
 }
 
 /* TODO: Correct the bugs in the scheduling !!! */
@@ -431,7 +430,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 			 * Our task uses multiformat handles, which may need to be converted.
 			 */
 			push_conversion_tasks(task, forced_worker);
-			prio = 0;
+			task->priority = 0;
 		}
 
 		return push_task_on_best_worker(task, forced_worker, 0.0, 0.0, prio);
@@ -513,7 +512,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 		 * Our task uses multiformat handles, which may need to be converted.
 		 */
 		push_conversion_tasks(task, forced_worker);
-		prio = 0;
+		task->priority = 0;
 	}
 
 	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio);
@@ -527,6 +526,26 @@ static int heft_push_task(struct starpu_task *task)
 	return _heft_push_task(task, 0);
 }
 
+static struct starpu_task *heft_pop_task(void)
+{
+	struct starpu_task *task;
+
+	int workerid = starpu_worker_get_id();
+	struct _starpu_fifo_taskq *fifo = queue_array[workerid];
+
+	task = _starpu_fifo_pop_local_task(fifo);
+	if (task)
+	{
+		double model = task->predicted;
+
+		fifo->exp_len -= model;
+		fifo->exp_start = starpu_timing_now() + model;
+		fifo->exp_end = fifo->exp_start + fifo->exp_len;
+	}
+
+	return task;
+}
+
 static void heft_deinit(__attribute__ ((unused)) struct starpu_machine_topology *topology,
                         __attribute__ ((unused)) struct starpu_sched_policy *_policy)
 {
@@ -545,7 +564,7 @@ struct starpu_sched_policy _starpu_sched_heft_policy =
 	.deinit_sched = heft_deinit,
 	.push_task = heft_push_task,
 	.push_task_notify = heft_push_task_notify,
-	.pop_task = NULL,
+	.pop_task = heft_pop_task,
 	.pop_every_task = NULL,
 	.pre_exec_hook = heft_pre_exec_hook,
 	.post_exec_hook = NULL,