Browse Source

Experiment with a small optimization in the dmda scheduling policy: in the pop
method, instead of always taking the first task, we take the first task for
which all data are already available on the worker's memory node (i.e. executing
that task will not block the driver); if no such task exists, we take the task
with the fewest missing buffers.

Cédric Augonnet 14 years ago
parent
commit
e96dd83619

+ 3 - 1
src/core/sched_policy.c

@@ -40,9 +40,10 @@ extern struct starpu_sched_policy_s _starpu_sched_no_prio_policy;
 extern struct starpu_sched_policy_s _starpu_sched_random_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
 extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
+extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
 extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
 
-#define NPREDEFINED_POLICIES	7
+#define NPREDEFINED_POLICIES	8
 
 static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
 	&_starpu_sched_ws_policy,
@@ -50,6 +51,7 @@ static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] =
 	&_starpu_sched_no_prio_policy,
 	&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
+	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_random_policy,
 	&_starpu_sched_eager_policy
 };

+ 3 - 2
src/datawizard/user_interactions.c

@@ -383,7 +383,8 @@ void starpu_data_set_default_sequential_consistency_flag(unsigned flag)
 /* Query the status of the handle on the specified memory node. */
 void starpu_data_query_status(starpu_data_handle handle, int memory_node, int *is_allocated, int *is_valid, int *is_requested)
 {
-	_starpu_spin_lock(&handle->header_lock);
+#warning FIXME
+//	_starpu_spin_lock(&handle->header_lock);
 
 	if (is_allocated)
 		*is_allocated = handle->per_node[memory_node].allocated;
@@ -394,5 +395,5 @@ void starpu_data_query_status(starpu_data_handle handle, int memory_node, int *i
 	if (is_requested)
 		*is_requested = handle->per_node[memory_node].requested;
 
-	_starpu_spin_unlock(&handle->header_lock);
+//	_starpu_spin_unlock(&handle->header_lock);
 }

+ 133 - 1
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -27,6 +27,110 @@ static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 static double alpha = 1.0;
 static double beta = 1.0;
 
+#ifdef STARPU_VERBOSE
+static long int total_task_cnt = 0; /* bumped once for every task successfully popped by the dmda pop methods */
+static long int ready_task_cnt = 0; /* bumped when a popped task has a codelet and zero non-ready buffers */
+#endif
+
+/*
+ * Count how many buffers of a task are not reported as valid on a memory node.
+ *
+ * task: task whose buffer descriptors (task->buffers) are inspected; the
+ *       number of buffers is taken from its codelet (task->cl->nbuffers).
+ * node: memory node passed to starpu_data_query_status() for every buffer.
+ *
+ * Returns the number of buffers whose validity flag came back as 0. A task
+ * without a codelet has no buffer count to consult and is reported as having
+ * no missing buffer (0).
+ */
+static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
+{
+	int cnt = 0;
+	unsigned index;
+
+	/* Other code in this file guards against tasks with no codelet before
+	 * counting; do the check here so that every caller is covered. */
+	if (!task->cl)
+		return 0;
+
+	starpu_buffer_descr *descrs = task->buffers;
+	unsigned nbuffers = task->cl->nbuffers;
+
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle handle = descrs[index].handle;
+
+		/* Start from "not valid" so that the value read below is never
+		 * indeterminate, whatever the query writes. */
+		int is_valid = 0;
+		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);
+
+		if (!is_valid)
+			cnt++;
+	}
+
+	return cnt;
+}
+
+/*
+ * Remove and return the task of the queue that has the fewest buffers missing
+ * on the given memory node.
+ *
+ * fifo_queue: queue to pop from; its ntasks counter is decremented and the
+ *             selected task is erased from fifo_queue->taskq.
+ * node:       memory node against which buffer validity is evaluated.
+ *
+ * The scan starts at starpu_task_list_back() and follows the ->prev links.
+ * Among tasks with the same number of missing buffers, the one met first in
+ * that order is kept. The scan stops as soon as a task with no missing buffer
+ * has been found.
+ *
+ * Returns NULL when the queue holds no task.
+ */
+struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_taskq_s *fifo_queue, unsigned node)
+{
+	struct starpu_task *task, *current;
+	int non_ready_best;
+
+	if (fifo_queue->ntasks <= 0)
+		return NULL;
+
+	fifo_queue->ntasks--;
+
+	/* The task at the back is the default choice; it is evaluated once. */
+	task = starpu_task_list_back(&fifo_queue->taskq);
+	non_ready_best = count_non_ready_buffers(task, node);
+
+	/* Only look further while the current choice still misses data: once
+	 * a fully ready task is selected no other task can beat it, so there
+	 * is no point in querying the remaining ones. */
+	for (current = task->prev; current && non_ready_best > 0; current = current->prev)
+	{
+		int non_ready = count_non_ready_buffers(current, node);
+
+		if (non_ready < non_ready_best)
+		{
+			non_ready_best = non_ready;
+			task = current;
+		}
+	}
+
+	starpu_task_list_erase(&fifo_queue->taskq, task);
+
+	STARPU_TRACE_JOB_POP(task, 0);
+
+	return task;
+}
+
+/*
+ * pop_task method of the "dmdar" policy.
+ *
+ * Pops a task from the calling worker's queue through
+ * _starpu_fifo_pop_first_ready_task(), using the worker's memory node, and
+ * updates the queue's expected length/start/end with the task's predicted
+ * duration. With STARPU_VERBOSE, the popped/ready counters are updated too.
+ *
+ * Returns the popped task, or NULL when nothing could be popped.
+ */
+static struct starpu_task *dmda_pop_ready_task(void)
+{
+	int me = starpu_worker_get_id();
+	struct starpu_fifo_taskq_s *queue = queue_array[me];
+	unsigned memory_node = starpu_worker_get_memory_node(me);
+
+	struct starpu_task *popped = _starpu_fifo_pop_first_ready_task(queue, memory_node);
+	if (!popped)
+		return NULL;
+
+	/* The task leaves the queue: remove its predicted duration from the
+	 * backlog and re-derive the expected start and end dates. */
+	double predicted = popped->predicted;
+
+	queue->exp_len -= predicted;
+	queue->exp_start = _starpu_timing_now() + predicted;
+	queue->exp_end = queue->exp_start + queue->exp_len;
+
+#ifdef STARPU_VERBOSE
+	/* Statistics only: was the chosen task fully ready on this node? */
+	if (popped->cl && count_non_ready_buffers(popped, starpu_worker_get_memory_node(me)) == 0)
+		ready_task_cnt++;
+
+	total_task_cnt++;
+#endif
+
+	return popped;
+}
+
 static struct starpu_task *dmda_pop_task(void)
 {
 	struct starpu_task *task;
@@ -41,11 +145,24 @@ static struct starpu_task *dmda_pop_task(void)
 		fifo->exp_len -= model;
 		fifo->exp_start = _starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-	}	
+
+#ifdef STARPU_VERBOSE
+		if (task->cl)
+		{
+			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
+			if (non_ready == 0)
+				ready_task_cnt++;
+		}
+
+		total_task_cnt++;
+#endif
+	}
 
 	return task;
 }
 
+
+
 static struct starpu_task *dmda_pop_every_task(uint32_t where)
 {
 	struct starpu_task *new_list;
@@ -314,6 +431,10 @@ static void deinitialize_dmda_policy(struct starpu_machine_topology_s *topology,
 	unsigned workerid;
 	for (workerid = 0; workerid < topology->nworkers; workerid++)
 		_starpu_destroy_fifo(queue_array[workerid]);
+
+#ifdef STARPU_VERBOSE
+	fprintf(stderr, "total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
+#endif
 }
 
 struct starpu_sched_policy_s _starpu_sched_dm_policy = {
@@ -337,3 +458,14 @@ struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.policy_name = "dmda",
 	.policy_description = "data-aware performance model"
 };
+
+/*
+ * "dmdar" policy descriptor: shares every method with the dmda policy except
+ * pop_task, which is dmda_pop_ready_task.
+ */
+struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy = {
+	/* identification */
+	.policy_name = "dmdar",
+	.policy_description = "data-aware performance model (ready)",
+
+	/* set-up and tear-down */
+	.init_sched = initialize_dmda_policy,
+	.deinit_sched = deinitialize_dmda_policy,
+
+	/* task submission */
+	.push_task = dmda_push_task,
+	.push_prio_task = dmda_push_prio_task,
+
+	/* task retrieval */
+	.pop_task = dmda_pop_ready_task,
+	.pop_every_task = dmda_pop_every_task
+};