Samuel Thibault преди 9 години
родител
ревизия
badedd961f
променени са 1 файла, в които са добавени 19 реда и са изтрити 10 реда
  1. 19 10
      src/sched_policies/graph_test_policy.c

+ 19 - 10
src/sched_policies/graph_test_policy.c

@@ -83,11 +83,11 @@ static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
 }
 
 /* Push the given task on CPU or GPU prio list, using a dumb heuristic */
-static void push_task_to_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
+static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
 {
 	int cpu_can = 0, gpu_can = 0;
-	double cpu_speed;
-	double gpu_speed;
+	double cpu_speed = 0.;
+	double gpu_speed = 0.;
 
 	/* Compute how fast CPUs can compute it, and how fast GPUs can compute it */
 	unsigned worker;
@@ -134,9 +134,9 @@ static void push_task_to_prio(unsigned sched_ctx_id, struct _starpu_graph_test_p
 
 	/* Decide to push on CPUs or GPUs depending on the overall computation power */
 	if (!gpu_can || (cpu_can && cpu_speed > gpu_speed))
-		_starpu_prio_deque_push_back_task(&data->prio_cpu, task);
+		return &data->prio_cpu;
 	else
-		_starpu_prio_deque_push_back_task(&data->prio_gpu, task);
+		return &data->prio_gpu;
 
 }
 
@@ -156,22 +156,24 @@ static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
 
 	/* Now that we have priorities, move tasks from bag to priority queue */
 	while(!_starpu_fifo_empty(data->fifo)) {
-		push_task_to_prio(sched_ctx_id, data, _starpu_fifo_pop_task(data->fifo, -1));
+		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, -1);
+		struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
+		_starpu_prio_deque_push_back_task(prio, task);
 	}
 
 	/* And unleash the beast! */
 	unsigned worker;
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_iterator it;
+#ifdef STARPU_NON_BLOCKING_DRIVERS
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
 		/* Tell each worker is shouldn't sleep any more */
 		worker = workers->get_next(workers, &it);
-#ifdef STARPU_NON_BLOCKING_DRIVERS
-			starpu_bitmap_unset(data->waiters, worker);
-#endif
+		starpu_bitmap_unset(data->waiters, worker);
 	}
+#endif
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
@@ -203,7 +205,8 @@ static int push_task_graph_test_policy(struct starpu_task *task)
 	}
 
 	/* Priorities are computed, we can push to execution */
-	push_task_to_prio(sched_ctx_id, data, task);
+	struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
+	_starpu_prio_deque_push_back_task(prio, task);
 
 	starpu_push_task_end(task);
 
@@ -227,6 +230,12 @@ static int push_task_graph_test_policy(struct starpu_task *task)
 			/* This worker is not waiting for a task */
 			continue;
 #endif
+		if (prio == &data->prio_cpu && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
+			/* This worker doesn't pop from the queue we have filled */
+			continue;
+		if (prio == &data->prio_gpu && starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
+			/* This worker doesn't pop from the queue we have filled */
+			continue;
 
 		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
 		{