Browse Source

update prediction function to new data structure

Simon Archipoff 12 years ago
parent
commit
befbeaa725
2 changed files with 45 additions and 8 deletions
  1. 0 1
      src/sched_policies/node_sched.c
  2. 45 7
      src/sched_policies/node_worker.c

+ 0 - 1
src/sched_policies/node_sched.c

@@ -368,7 +368,6 @@ void _starpu_sched_node_destroy(struct _starpu_sched_node *node)
 	}
 	free(node->childs);
 	_starpu_bitmap_destroy(node->workers);
-
 	free(node);
 }
 

+ 45 - 7
src/sched_policies/node_worker.c

@@ -161,7 +161,7 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
 		{
 			struct starpu_task * task = t->task;
 			t->task = NULL;
-			int * p = t->left ? t->ntasks : &t->ntasks;
+			int * p = t->left ? t->pntasks : &t->ntasks;
 			STARPU_ATOMIC_ADD(p, -1);
 			if(*p == 0)
 				_starpu_task_grid_unset_left_right_member(t);
@@ -303,15 +303,15 @@ static double estimated_transfer_length(struct _starpu_sched_node * node,
 {
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	starpu_task_bundle_t bundle = task->bundle;
-	unsigned memory_node = starpu_worker_get_memory_node(_starpu_bitmap_first(node->workers));
+	struct _starpu_worker_node_data * data = node->data;
+	unsigned memory_node = data->worker ? data->worker->memory_node : data->combined_worker->memory_node;
 	if(bundle)
 		return starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
 	else
 		return starpu_task_expected_data_transfer_time(memory_node, task);
 }
-static double estimated_finish_time(struct _starpu_sched_node * node)
+static double worker_estimated_finish_time(struct _starpu_worker * worker)
 {
-	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
 	double sum = 0.0;
 	struct starpu_task_list list = worker->local_tasks;
@@ -332,7 +332,34 @@ static double estimated_finish_time(struct _starpu_sched_node * node)
 	return sum + starpu_timing_now();
 }
 
-struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
+static double combined_worker_expected_finish_time(struct _starpu_sched_node * node)
+{
+	STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
+	struct _starpu_worker_node_data * data = node->data;
+	struct _starpu_combined_worker * combined_worker = data->combined_worker;
+	double max = 0.0;
+	int i;
+	for(i = 0; i < combined_worker->worker_size; i++)
+	{
+		data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
+		STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
+		double tmp = data->list->exp_end;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
+		max = tmp > max ? tmp : max;
+	}
+	return max;
+}
+static double simple_worker_expected_finish_time(struct _starpu_sched_node * node)
+{
+	struct _starpu_worker_node_data * data = node->data;
+	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
+	double tmp = data->list->exp_end;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
+	return tmp;
+}
+
+static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task,
+								 double (*estimated_finish_time)(struct _starpu_sched_node*))
 {
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	starpu_task_bundle_t bundle = task->bundle;
@@ -396,6 +423,17 @@ struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_n
 	return preds;
 }
 
+static struct _starpu_task_execute_preds combined_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	return estimated_execute_preds(node,task,combined_worker_expected_finish_time);
+}
+
+static struct _starpu_task_execute_preds simple_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	return estimated_execute_preds(node,task,simple_worker_expected_finish_time);
+}
+
+
 static double estimated_load(struct _starpu_sched_node * node)
 {
 	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
@@ -490,7 +528,7 @@ static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid
 	node->data = data;
 	node->push_task = _starpu_sched_node_worker_push_task;
 	node->pop_task = _starpu_sched_node_worker_pop_task;
-	node->estimated_execute_preds = estimated_execute_preds;
+	node->estimated_execute_preds = simple_worker_estimated_execute_preds;
 	node->estimated_load = estimated_load;
 	node->available = available_worker;
 	node->deinit_data = worker_deinit_data;
@@ -528,7 +566,7 @@ static struct _starpu_sched_node  * _starpu_sched_node_combined_worker_create(in
 	node->data = data;
 	node->push_task = _starpu_sched_node_combined_worker_push_task;
 	node->pop_task = NULL;
-	node->estimated_execute_preds = estimated_execute_preds;
+	node->estimated_execute_preds = combined_worker_estimated_execute_preds;
 	node->estimated_load = estimated_load;
 	node->available = available_combined_worker;
 	node->deinit_data = worker_deinit_data;