Browse Source

Add a new dmda scheduler, dmdasd, that considers task priority when deciding on which worker to schedule
(precise algorithm that considers a dynamically detected number of priorities and that takes the locks).
TODO: use a statically detected number of priorities and iterate on them (less overhead) and the scheduler

Andra Hugo 10 years ago
parent
commit
ef3659f515

+ 1 - 0
src/core/sched_policy.c

@@ -55,6 +55,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_dmda_policy,
 	&_starpu_sched_dmda_policy,
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_dmda_sorted_policy,
+	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_peager_policy,
 	&_starpu_sched_peager_policy,
 	NULL
 	NULL

+ 1 - 0
src/core/sched_policy.h

@@ -67,6 +67,7 @@ extern struct starpu_sched_policy _starpu_sched_dm_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
+extern struct starpu_sched_policy _starpu_sched_dmda_sorted_decision_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_peager_policy;
 extern struct starpu_sched_policy _starpu_sched_peager_policy;

+ 61 - 11
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -530,7 +530,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 						double *best_exp_endp,
 						double *best_exp_endp,
 						double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS],
 						double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS],
 						double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS],
 						double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS],
-						int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
+						int *forced_worker, int *forced_impl, unsigned sched_ctx_id, unsigned sorted_decision)
 {
 {
 	int calibrating = 0;
 	int calibrating = 0;
 	double max_exp_end = DBL_MIN;
 	double max_exp_end = DBL_MIN;
@@ -573,9 +573,23 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				/* no one on that queue may execute this task */
 				/* no one on that queue may execute this task */
 				continue;
 				continue;
 			}
 			}
-
 			STARPU_ASSERT_MSG(fifo != NULL, "worker %d ctx %d\n", worker, sched_ctx_id);
 			STARPU_ASSERT_MSG(fifo != NULL, "worker %d ctx %d\n", worker, sched_ctx_id);
-			exp_end[worker_ctx][nimpl] = exp_start + fifo->exp_len;
+
+			int fifo_ntasks = fifo->ntasks;
+			double prev_exp_len = fifo->exp_len;
+			/* consider the priority of the task when deciding on which worker to schedule, 
+			   compute the expected_end of the task if it is inserted before other tasks already scheduled */
+			if(sorted_decision)
+			{
+				starpu_pthread_mutex_t *sched_mutex;
+				starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+				prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, worker, nimpl, &fifo_ntasks);
+				STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+			}
+				
+			exp_end[worker_ctx][nimpl] = exp_start + prev_exp_len;
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 				max_exp_end = exp_end[worker_ctx][nimpl];
 				max_exp_end = exp_end[worker_ctx][nimpl];
 
 
@@ -598,7 +612,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				if (conversion_time > 0.0)
 				if (conversion_time > 0.0)
 					local_task_length[worker_ctx][nimpl] += conversion_time;
 					local_task_length[worker_ctx][nimpl] += conversion_time;
 			}
 			}
-			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
+			double ntasks_end = fifo_ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 
 			/*
 			/*
 			 * This implements a default greedy scheduler for the
 			 * This implements a default greedy scheduler for the
@@ -650,7 +664,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (unknown)
 			if (unknown)
 				continue;
 				continue;
 
 
-			exp_end[worker_ctx][nimpl] = exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
+			exp_end[worker_ctx][nimpl] = exp_start + prev_exp_len + local_task_length[worker_ctx][nimpl];
 
 
 			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			{
 			{
@@ -673,7 +687,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	*max_exp_endp = max_exp_end;
 	*max_exp_endp = max_exp_end;
 }
 }
 
 
-static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned simulate)
+static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned simulate, unsigned sorted_decision)
 {
 {
 	/* find the queue */
 	/* find the queue */
 	unsigned worker, worker_ctx = 0;
 	unsigned worker, worker_ctx = 0;
@@ -705,6 +719,7 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 
 
 	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 
 
+
 	compute_all_performance_predictions(task,
 	compute_all_performance_predictions(task,
 					    nworkers_ctx,
 					    nworkers_ctx,
 					    local_task_length,
 					    local_task_length,
@@ -714,7 +729,8 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 					    local_data_penalty,
 					    local_data_penalty,
 					    local_power,
 					    local_power,
 					    &forced_best,
 					    &forced_best,
-					    &forced_impl, sched_ctx_id);
+					    &forced_impl, sched_ctx_id, sorted_decision);
+	
 	
 	
 	double best_fitness = -1;
 	double best_fitness = -1;
 
 
@@ -805,12 +821,17 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 	}
 	}
 }
 }
 
 
+static int dmda_push_sorted_decision_task(struct starpu_task *task) /* push_task hook of the "dmdasd" policy */
+{
+	return _dmda_push_task(task, 1, task->sched_ctx, 0, 1); /* prio = 1, simulate = 0, sorted_decision = 1; the double result is converted to int */
+}
+
 static int dmda_push_sorted_task(struct starpu_task *task)
 static int dmda_push_sorted_task(struct starpu_task *task)
 {
 {
 #ifdef STARPU_DEVEL
 #ifdef STARPU_DEVEL
 #warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
 #warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
 #endif
 #endif
-	return _dmda_push_task(task, 1, task->sched_ctx, 0);
+	return _dmda_push_task(task, 1, task->sched_ctx, 0, 0);
 }
 }
 
 
 static int dm_push_task(struct starpu_task *task)
 static int dm_push_task(struct starpu_task *task)
@@ -821,12 +842,24 @@ static int dm_push_task(struct starpu_task *task)
 static int dmda_push_task(struct starpu_task *task)
 static int dmda_push_task(struct starpu_task *task)
 {
 {
 	STARPU_ASSERT(task);
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 0, task->sched_ctx, 0);
+	return _dmda_push_task(task, 0, task->sched_ctx, 0, 0);
 }
 }
 static double dmda_simulate_push_task(struct starpu_task *task)
 static double dmda_simulate_push_task(struct starpu_task *task)
 {
 {
 	STARPU_ASSERT(task);
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 0, task->sched_ctx, 1);
+	return _dmda_push_task(task, 0, task->sched_ctx, 1, 0);
+}
+
+static double dmda_simulate_push_sorted_task(struct starpu_task *task) /* simulate_push_task hook of _starpu_sched_dmda_sorted_policy */
+{
+	STARPU_ASSERT(task); /* task is dereferenced just below */
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0); /* prio = 1, simulate = 1, sorted_decision = 0 */
+}
+
+static double dmda_simulate_push_sorted_decision_task(struct starpu_task *task) /* simulate_push_task hook of the "dmdasd" policy */
+{
+	STARPU_ASSERT(task); /* task is dereferenced just below */
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1); /* prio = 1, simulate = 1, sorted_decision = 1 */
 }
 }
 
 
 static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
@@ -1056,7 +1089,7 @@ struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
 	.add_workers = dmda_add_workers ,
 	.add_workers = dmda_add_workers ,
 	.remove_workers = dmda_remove_workers,
 	.remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_sorted_task,
 	.push_task = dmda_push_sorted_task,
-	.simulate_push_task = NULL,
+	.simulate_push_task = dmda_simulate_push_sorted_task,
 	.push_task_notify = dmda_push_task_notify,
 	.push_task_notify = dmda_push_task_notify,
 	.pop_task = dmda_pop_ready_task,
 	.pop_task = dmda_pop_ready_task,
 	.pre_exec_hook = dmda_pre_exec_hook,
 	.pre_exec_hook = dmda_pre_exec_hook,
@@ -1066,6 +1099,23 @@ struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
 	.policy_description = "data-aware performance model (sorted)"
 	.policy_description = "data-aware performance model (sorted)"
 };
 };
 
 
+struct starpu_sched_policy _starpu_sched_dmda_sorted_decision_policy = /* descriptor of the "dmdasd" policy; listed in predefined_policies[] in src/core/sched_policy.c */
+{
+	.init_sched = initialize_dmda_sorted_policy,
+	.deinit_sched = deinitialize_dmda_policy,
+	.add_workers = dmda_add_workers ,
+	.remove_workers = dmda_remove_workers,
+	.push_task = dmda_push_sorted_decision_task, /* calls _dmda_push_task with sorted_decision = 1 */
+	.simulate_push_task = dmda_simulate_push_sorted_decision_task, /* same call with simulate = 1 */
+	.push_task_notify = dmda_push_task_notify,
+	.pop_task = dmda_pop_ready_task,
+	.pre_exec_hook = dmda_pre_exec_hook,
+	.post_exec_hook = dmda_post_exec_hook,
+	.pop_every_task = dmda_pop_every_task,
+	.policy_name = "dmdasd", /* NOTE(review): presumably the name used to select this policy at run time - confirm */
+	.policy_description = "data-aware performance model (sorted decision)"
+};
+
 struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
 struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
 {
 {
 	.init_sched = initialize_dmda_policy,
 	.init_sched = initialize_dmda_policy,

+ 46 - 0
src/sched_policies/fifo_queues.c

@@ -69,6 +69,52 @@ int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
 	return fifo->ntasks == 0;
 	return fifo->ntasks == 0;
 }
 }
 
 
+/* Expected length of the tasks queued ahead of the slot that `task' would
+ * get in the priority-sorted list of `fifo_queue'. Lengths are predicted for
+ * worker `workerid' and implementation `nimpl'.
+ * *fifo_ntasks is updated to the number of tasks ahead, except when the task
+ * would be first (or the list is empty), where it is left untouched.
+ * The visible caller holds the worker's scheduling mutex around this call. */
+double
+_starpu_fifo_get_exp_len_prev_task_list(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task, int workerid, int nimpl, int *fifo_ntasks)
+{
+	struct starpu_task_list *list = &fifo_queue->taskq;
+	struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, task->sched_ctx);
+	struct starpu_task *current = list->head;
+	struct starpu_task *it;
+	double exp_len = 0.0;
+
+	/* skip every queued task whose priority is not lower than task's */
+	while (current != NULL && current->priority >= task->priority)
+		current = current->next;
+
+	if (current == list->head)
+	{
+		/* empty list, or the task's place is at the head: nothing runs
+		 * before it, and *fifo_ntasks is left as the caller set it */
+		return 0.0;
+	}
+
+	if (current == NULL)
+	{
+		/* the task's place is at the tail of the list */
+		*fifo_ntasks = fifo_queue->ntasks;
+		return fifo_queue->exp_len;
+	}
+
+	/* the task's place is between two tasks: the visible caller pre-loads
+	 * *fifo_ntasks with the whole queue size, so restart the count from
+	 * zero instead of adding the preceding tasks on top of it */
+	*fifo_ntasks = 0;
+	for (it = list->head; it != current; it = it->next)
+	{
+		exp_len += starpu_task_expected_length(it, perf_arch, nimpl);
+		(*fifo_ntasks)++;
+	}
+
+	return exp_len;
+}
+
 int
 int
 _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 {
 {

+ 3 - 0
src/sched_policies/fifo_queues.h

@@ -45,6 +45,9 @@ void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo);
 
 
 int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo);
 int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo);
 
 
+double _starpu_fifo_get_exp_len_prev_task_list(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task, 
+					       int workerid, int nimpl, int *fifo_ntasks);
+
 int _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task);
 int _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task);
 
 
 int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo, struct starpu_task *task);
 int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo, struct starpu_task *task);