瀏覽代碼

Move scheduling helper functions to shared area

Samuel Thibault 5 年之前
父節點
當前提交
c5b931b16e
共有 3 個文件被更改,包括 103 次插入98 次删除
  1. 11 96
      src/sched_policies/deque_modeling_policy_data_aware.c
  2. 88 1
      src/sched_policies/fifo_queues.c
  3. 4 1
      src/sched_policies/fifo_queues.h

+ 11 - 96
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -165,36 +165,6 @@ void _starpu__dmda_c__register_knobs(void)
 #define _STARPU_SCHED_BETA_DEFAULT 1.0
 #define _STARPU_SCHED_GAMMA_DEFAULT 1000.0
 
-static int count_non_ready_buffers(struct starpu_task *task, unsigned worker)
-{
-	int cnt = 0;
-	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-	unsigned index;
-
-	for (index = 0; index < nbuffers; index++)
-	{
-		starpu_data_handle_t handle;
-		unsigned buffer_node = _starpu_task_data_get_node_on_worker(task, index, worker);
-
-		handle = STARPU_TASK_GET_HANDLE(task, index);
-
-		int is_valid;
-		starpu_data_query_status(handle, buffer_node, NULL, &is_valid, NULL);
-
-		if (!is_valid)
-			cnt++;
-	}
-
-	return cnt;
-}
-
-static int _normalize_prio(int priority, int num_priorities, unsigned sched_ctx_id)
-{
-	int min = starpu_sched_ctx_get_min_priority(sched_ctx_id);
-	int max = starpu_sched_ctx_get_max_priority(sched_ctx_id);
-	return ((num_priorities-1)/(max-min)) * (priority - min);
-}
-
 /* This is called when a transfer request is actually pushed to the worker */
 static void _starpu_fifo_task_transfer_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
 {
@@ -210,7 +180,7 @@ static void _starpu_fifo_task_transfer_started(struct _starpu_fifo_taskq *fifo,
 	if(num_priorities != -1)
 	{
 		int i;
-		int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
+		int task_prio = _starpu_normalize_prio(task->priority, num_priorities, task->sched_ctx);
 		for(i = 0; i <= task_prio; i++)
 			fifo->exp_len_per_priority[i] -= transfer_model;
 	}
@@ -235,7 +205,7 @@ static void _starpu_fifo_task_started(struct _starpu_fifo_taskq *fifo, struct st
 		if(num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				fifo->exp_len_per_priority[i] -= model;
 		}
@@ -254,61 +224,6 @@ static void _starpu_fifo_task_finished(struct _starpu_fifo_taskq *fifo, struct s
 
 
 
-static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned workerid, int num_priorities)
-{
-	struct starpu_task *task = NULL, *current;
-
-	if (fifo_queue->ntasks == 0)
-		return NULL;
-
-	if (fifo_queue->ntasks > 0)
-	{
-		fifo_queue->ntasks--;
-
-		task = starpu_task_list_front(&fifo_queue->taskq);
-		if (STARPU_UNLIKELY(!task))
-			return NULL;
-
-		int first_task_priority = task->priority;
-
-		current = task;
-
-		int non_ready_best = INT_MAX;
-
-		while (current)
-		{
-			int priority = current->priority;
-
-			if (priority >= first_task_priority)
-			{
-				int non_ready = count_non_ready_buffers(current, workerid);
-				if (non_ready < non_ready_best)
-				{
-					non_ready_best = non_ready;
-					task = current;
-
-					if (non_ready == 0)
-						break;
-				}
-			}
-
-			current = current->next;
-		}
-
-		if(num_priorities != -1)
-		{
-			int i;
-			int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
-			for(i = 0; i <= task_prio; i++)
-				fifo_queue->ntasks_per_priority[i]--;
-		}
-
-		starpu_task_list_erase(&fifo_queue->taskq, task);
-	}
-
-	return task;
-}
-
 static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
@@ -332,7 +247,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
-			int non_ready = count_non_ready_buffers(task, workerid);
+			int non_ready = _starpu_count_non_ready_buffers(task, workerid);
 			if (non_ready == 0)
 				dt->ready_task_cnt++;
 		}
@@ -369,7 +284,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
-			int non_ready = count_non_ready_buffers(task, workerid);
+			int non_ready = _starpu_count_non_ready_buffers(task, workerid);
 			if (non_ready == 0)
 				dt->ready_task_cnt++;
 		}
@@ -460,7 +375,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		if(dt->num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				fifo->exp_len_per_priority[i] += predicted_transfer;
 		}
@@ -473,7 +388,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		if(dt->num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				fifo->exp_len_per_priority[i] += predicted;
 		}
@@ -513,7 +428,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		if(dt->num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				dt->queue_array[best_workerid]->ntasks_per_priority[i]++;
 		}
@@ -716,7 +631,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	if(sorted_decision && dt->num_priorities != -1)
-		task_prio = _normalize_prio(task->priority, dt->num_priorities, sched_ctx_id);
+		task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, sched_ctx_id);
 
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	double now = starpu_timing_now();
@@ -1240,7 +1155,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 		if(dt->num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				fifo->exp_len_per_priority[i] += predicted_transfer;
 		}
@@ -1256,7 +1171,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 		if(dt->num_priorities != -1)
 		{
 			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
 				fifo->exp_len_per_priority[i] += predicted;
 		}
@@ -1265,7 +1180,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	if(dt->num_priorities != -1)
 	{
 		int i;
-		int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+		int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 		for(i = 0; i <= task_prio; i++)
 			fifo->ntasks_per_priority[i]++;
 	}

+ 88 - 1
src/sched_policies/fifo_queues.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2013,2015,2016                      Inria
- * Copyright (C) 2008-2017                                Université de Bordeaux
+ * Copyright (C) 2008-2017,2019                           Université de Bordeaux
  * Copyright (C) 2010,2011,2013,2015-2017                 CNRS
  * Copyright (C) 2013                                     Simon Archipoff
  * Copyright (C) 2011                                     Télécom-SudParis
@@ -25,6 +25,7 @@
 
 #include <sched_policies/fifo_queues.h>
 #include <common/fxt.h>
+#include <core/topology.h>
 /*
 static int is_sorted_task_list(struct starpu_task * task)
 {
@@ -336,3 +337,89 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_
 
 	return new_list;
 }
+
+int _starpu_normalize_prio(int priority, int num_priorities, unsigned sched_ctx_id)
+{
+	int min = starpu_sched_ctx_get_min_priority(sched_ctx_id);
+	int max = starpu_sched_ctx_get_max_priority(sched_ctx_id);
+	return ((num_priorities-1)/(max-min)) * (priority - min);
+}
+
+int _starpu_count_non_ready_buffers(struct starpu_task *task, unsigned worker)
+{
+	int cnt = 0;
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned index;
+
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle;
+		unsigned buffer_node = _starpu_task_data_get_node_on_worker(task, index, worker);
+
+		handle = STARPU_TASK_GET_HANDLE(task, index);
+
+		int is_valid;
+		starpu_data_query_status(handle, buffer_node, NULL, &is_valid, NULL);
+
+		if (!is_valid)
+			cnt++;
+	}
+
+	return cnt;
+}
+
+struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned workerid, int num_priorities)
+{
+	struct starpu_task *task = NULL, *current;
+
+	if (fifo_queue->ntasks == 0)
+		return NULL;
+
+	if (fifo_queue->ntasks > 0)
+	{
+		fifo_queue->ntasks--;
+
+		task = starpu_task_list_front(&fifo_queue->taskq);
+		if (STARPU_UNLIKELY(!task))
+			return NULL;
+
+		int first_task_priority = task->priority;
+
+		current = task;
+
+		int non_ready_best = INT_MAX;
+
+		while (current)
+		{
+			int priority = current->priority;
+
+			if (priority >= first_task_priority)
+			{
+				int non_ready = _starpu_count_non_ready_buffers(current, workerid);
+				if (non_ready < non_ready_best)
+				{
+					non_ready_best = non_ready;
+					task = current;
+
+					if (non_ready == 0)
+						break;
+				}
+			}
+
+			current = current->next;
+		}
+
+		if(num_priorities != -1)
+		{
+			int i;
+			int task_prio = _starpu_normalize_prio(task->priority, num_priorities, task->sched_ctx);
+			for(i = 0; i <= task_prio; i++)
+				fifo_queue->ntasks_per_priority[i]--;
+		}
+
+		starpu_task_list_erase(&fifo_queue->taskq, task);
+	}
+
+	return task;
+}
+

+ 4 - 1
src/sched_policies/fifo_queues.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2013,2015,2016                      Inria
- * Copyright (C) 2008-2014,2016,2017                      Université de Bordeaux
+ * Copyright (C) 2008-2014,2016,2017,2019                 Université de Bordeaux
  * Copyright (C) 2010,2011,2013,2017                      CNRS
  * Copyright (C) 2016                                     Uppsala University
  *
@@ -66,5 +66,8 @@ int _starpu_fifo_pop_this_task(struct _starpu_fifo_taskq *fifo_queue, int worker
 struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo, int workerid);
 struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo);
 struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo, int workerid);
+int _starpu_normalize_prio(int priority, int num_priorities, unsigned sched_ctx_id);
+int _starpu_count_non_ready_buffers(struct starpu_task *task, unsigned worker);
+struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned workerid, int num_priorities);
 
 #endif // __FIFO_QUEUES_H__