Browse Source

The push_task_notify method permits to keep the scheduler's structures up to
date when a task is scheduled without the help of the scheduling strategy (ie.
when we have a task that was explicitely assigned to a scheduler).

Cédric Augonnet 14 years ago
parent
commit
918f77005c
4 changed files with 37 additions and 3 deletions
  1. 5 0
      doc/starpu.texi
  2. 7 0
      include/starpu_scheduler.h
  3. 4 3
      src/core/sched_policy.c
  4. 21 0
      src/sched_policies/heft.c

+ 5 - 0
doc/starpu.texi

@@ -4326,6 +4326,11 @@ Cleanup the scheduling policy.
 Insert a task into the scheduler.
 @item @code{push_prio_task}:
 Insert a priority task into the scheduler.
+@item @code{push_prio_notify}:
+Notify the scheduler that a task was pushed on the worker. This method is
+called when a task that was explicitely assigned to a worker is scheduled. This
+method therefore permits to keep the state of of the scheduler coherent even
+when StarPU bypasses the scheduling strategy.
 @item @code{pop_task}:
 Get a task from the scheduler. The mutex associated to the worker is already
 taken when this method is called. If this method is defined as @code{NULL}, the

+ 7 - 0
include/starpu_scheduler.h

@@ -71,6 +71,13 @@ struct starpu_sched_policy_s {
 	/* Insert a task into the scheduler. */
 	int (*push_task)(struct starpu_task *);
 
+	/* Notify the scheduler that a task was pushed on the worker. This
+	 * method is called when a task that was explicitely assigned to a
+	 * worker is scheduled. This method therefore permits to keep the state
+	 * of of the scheduler coherent even when StarPU bypasses the
+	 * scheduling strategy. */
+	void (*push_task_notify)(struct starpu_task *, int workerid);
+
 	/* Insert a priority task into the scheduler. */
 	int (*push_prio_task)(struct starpu_task *);
 

+ 4 - 3
src/core/sched_policy.c

@@ -235,6 +235,9 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	if (use_prefetch)
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
+	if (policy.push_task_notify)
+		policy.push_task_notify(task, workerid);
+
 	if (is_basic_worker)
 	{
 		return _starpu_push_local_task(worker, task, 0);
@@ -350,9 +353,7 @@ struct starpu_task *_starpu_pop_every_task(void)
 
 void _starpu_sched_post_exec_hook(struct starpu_task *task)
 {
-	/* We only execute the hook if the task was put here by the scheduling
-	 * policy */
-	if (!task->execute_on_a_specific_worker && policy.post_exec_hook)
+	if (policy.post_exec_hook)
 		policy.post_exec_hook(task);
 }
 

+ 21 - 0
src/sched_policies/heft.c

@@ -87,6 +87,26 @@ static void heft_post_exec_hook(struct starpu_task *task)
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
+static void heft_push_task_notify(struct starpu_task *task, int workerid)
+{
+	/* Compute the expected penality */
+	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+	double predicted = starpu_task_expected_length(task, perf_arch);
+
+	/* Update the predictions */
+	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
+
+	/* Sometimes workers didn't take the tasks as early as we expected */
+	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+
+	exp_end[workerid] += predicted;
+	exp_len[workerid] += predicted;
+	ntasks[workerid]++;
+
+	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
+}
+
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
 {
 	/* make sure someone coule execute that task ! */
@@ -290,6 +310,7 @@ struct starpu_sched_policy_s heft_policy = {
 	.deinit_sched = heft_deinit,
 	.push_task = heft_push_task, 
 	.push_prio_task = heft_push_prio_task, 
+	.push_task_notify = heft_push_task_notify,
 	.pop_task = NULL,
 	.pop_every_task = NULL,
 	.post_exec_hook = heft_post_exec_hook,