瀏覽代碼

Implement a variant of the dmda strategy where the tasks are sorted.

Cédric Augonnet 15 年之前
父節點
當前提交
c0bdc7d533
共有 2 個文件被更改,包括 104 次插入9 次删除
  1. 3 1
      src/core/sched_policy.c
  2. 101 8
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 3 - 1
src/core/sched_policy.c

@@ -41,9 +41,10 @@ extern struct starpu_sched_policy_s _starpu_sched_random_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
+extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
 
-#define NPREDEFINED_POLICIES	8
+#define NPREDEFINED_POLICIES	9
 
 static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
 	&_starpu_sched_ws_policy,
@@ -52,6 +53,7 @@ static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] =
 	&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
 	&_starpu_sched_dmda_ready_policy,
+	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_random_policy,
 	&_starpu_sched_eager_policy
 };

+ 101 - 8
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -28,6 +28,10 @@ static double alpha = 1.0;
 static double beta = 1.0;
 
 #ifdef STARPU_VERBOSE
+#define DISPLAY_DEBUG
+#endif
+
+#ifdef DISPLAY_DEBUG
 static long int total_task_cnt = 0;
 static long int ready_task_cnt = 0;
 #endif
@@ -71,15 +75,18 @@ struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_taskq_s
 
 		task = starpu_task_list_back(&fifo_queue->taskq);
 
+		int first_task_priority = task->priority;
+
 		current = task;
 
 		int non_ready_best = count_non_ready_buffers(current, node);
 
 		while (current)
 		{
+			int priority = current->priority;
 			int non_ready = count_non_ready_buffers(current, node);
 
-			if (non_ready < non_ready_best)
+			if ((priority < first_task_priority) && (non_ready < non_ready_best))
 			{
 				non_ready_best = non_ready;
 				task = current;
@@ -116,7 +123,7 @@ static struct starpu_task *dmda_pop_ready_task(void)
 		fifo->exp_start = _starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-#ifdef STARPU_VERBOSE
+#ifdef DISPLAY_DEBUG
 		if (task->cl)
 		{
 			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
@@ -146,7 +153,7 @@ static struct starpu_task *dmda_pop_task(void)
 		fifo->exp_start = _starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-#ifdef STARPU_VERBOSE
+#ifdef DISPLAY_DEBUG
 		if (task->cl)
 		{
 			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
@@ -187,6 +194,72 @@ static struct starpu_task *dmda_pop_every_task(uint32_t where)
 	return new_list;
 }
 
+int _starpu_fifo_push_sorted_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
+{
+	struct starpu_task_list *list = &fifo_queue->taskq;
+
+	PTHREAD_MUTEX_LOCK(sched_mutex);
+
+	STARPU_TRACE_JOB_PUSH(task, 0);
+
+	if (list->head == NULL)
+	{
+		list->head = task;
+		list->tail = task;
+		task->prev = NULL;
+		task->next = NULL;
+	}
+	else {
+		struct starpu_task *current = list->head;
+		struct starpu_task *prev = NULL;
+
+		while (current)
+		{
+			if (current->priority >= task->priority)
+				break;
+
+			prev = current;
+			current = current->next;
+		}
+
+		if (prev == NULL)
+		{
+			/* Insert at the front of the list */
+			list->head->prev = task;
+			task->prev = NULL;
+			task->next = list->head;
+			list->head = task;
+		}
+		else {
+			if (current)
+			{
+				/* Insert between prev and current */
+				task->prev = prev;
+				prev->next = task;
+				task->next = current;
+				current->prev = task;
+			}
+			else {
+				/* Insert at the tail of the list */
+				list->tail->next = task;
+				task->next = NULL;
+				task->prev = list->tail;
+				list->tail = task;
+			}
+		}
+	}
+
+	fifo_queue->ntasks++;
+	fifo_queue->nprocessed++;
+
+	pthread_cond_signal(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+	return 0;
+}
+
+
+
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
 {
 	/* make sure someone coule execute that task ! */
@@ -205,11 +278,15 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	if (_starpu_get_prefetch_flag())
 		_starpu_prefetch_task_input_on_node(task, memory_node);
 
-	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best_workerid],
+	switch (prio) {
+		case 1:
+			return _starpu_fifo_push_prio_task(queue_array[best_workerid],
+				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+		case 2:
+			return _starpu_fifo_push_sorted_task(queue_array[best_workerid],
 				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
-	} else {
-		return _starpu_fifo_push_task(queue_array[best_workerid],
+		default:
+			return _starpu_fifo_push_task(queue_array[best_workerid],
 				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
 	}
 }
@@ -374,6 +451,11 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 
+static int dmda_push_sorted_task(struct starpu_task *task)
+{
+	return _dmda_push_task(task, 2);
+}
+
 static int dm_push_prio_task(struct starpu_task *task)
 {
 	return _dm_push_task(task, 1);
@@ -432,7 +514,7 @@ static void deinitialize_dmda_policy(struct starpu_machine_topology_s *topology,
 	for (workerid = 0; workerid < topology->nworkers; workerid++)
 		_starpu_destroy_fifo(queue_array[workerid]);
 
-#ifdef STARPU_VERBOSE
+#ifdef DISPLAY_DEBUG
 	fprintf(stderr, "total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
 #endif
 }
@@ -459,6 +541,17 @@ struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.policy_description = "data-aware performance model"
 };
 
+struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy = {
+	.init_sched = initialize_dmda_policy,
+	.deinit_sched = deinitialize_dmda_policy,
+	.push_task = dmda_push_sorted_task, 
+	.push_prio_task = dmda_push_sorted_task, 
+	.pop_task = dmda_pop_ready_task,
+	.pop_every_task = dmda_pop_every_task,
+	.policy_name = "dmdas",
+	.policy_description = "data-aware performance model (sorted)"
+};
+
 struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy = {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,