Sfoglia il codice sorgente

bug fix
modif in worker.c in combined_worker_can_execute_task

Simon Archipoff 12 anni fa
parent
commit
c04166861e
3 ha cambiato i file con 63 aggiunte e 29 eliminazioni
  1. 1 1
      src/core/workers.c
  2. 6 4
      src/sched_policies/node_heft.c
  3. 56 24
      src/sched_policies/node_worker.c

+ 1 - 1
src/core/workers.c

@@ -302,7 +302,7 @@ int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_tas
 			/* Is the worker larger than requested ? */
 			int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
 			return !!((worker_size <= task->cl->max_parallelism) &&
-				_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl));
+				  _starpu_can_use_nth_implementation(config.workers[workerid - nworkers].arch, task->cl, nimpl));
 		}
 		else
 		{

+ 6 - 4
src/sched_policies/node_heft.c

@@ -123,7 +123,9 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 			index_best_fitness = i;
 		}
 	}
+
 	STARPU_ASSERT(best_fitness != DBL_MAX);
+
 	struct _starpu_sched_node * c = node->childs[index_best_fitness];
 	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
 	task->predicted = preds[index_best_fitness].expected_length;
@@ -252,7 +254,6 @@ void _starpu_sched_node_heft_set_no_model_node(struct _starpu_sched_node * heft_
 
 struct _starpu_sched_node * _starpu_sched_node_heft_create(void * arg STARPU_ATTRIBUTE_UNUSED)
 {
-
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 
 	node->push_task = push_task;
@@ -281,12 +282,13 @@ static void initialize_heft_center_policy(unsigned sched_ctx_id)
 	{
 		struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(i);
 		STARPU_ASSERT(worker_node);
+/*
 		struct _starpu_sched_node * fifo_node = _starpu_sched_node_fifo_create(NULL);
 		_starpu_sched_node_add_child(fifo_node, worker_node);
 		_starpu_sched_node_set_father(worker_node, fifo_node, sched_ctx_id);
-
-		_starpu_sched_node_add_child(t->root, fifo_node);
-		_starpu_sched_node_set_father(fifo_node, t->root, sched_ctx_id);
+*/
+		_starpu_sched_node_add_child(t->root, worker_node);
+		_starpu_sched_node_set_father(worker_node, t->root, sched_ctx_id);
 	}
 	
 	_starpu_set_workers_bitmaps();

+ 56 - 24
src/sched_policies/node_worker.c

@@ -62,6 +62,7 @@ struct _starpu_worker_task_list
 {
 	double exp_start, exp_len, exp_end;
 	struct _starpu_task_grid *first, *last;
+	unsigned ntasks;
 	starpu_pthread_mutex_t mutex;
 };
 
@@ -94,10 +95,12 @@ static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
 }
 static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
 {
-	if(!l)
-		return;
-	STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
-	free(l);
+	if(l)
+	{
+
+		STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
+		free(l);
+	}
 }
 
 /* the task, ntasks, pntasks, left and right field members are set by the caller */
@@ -109,6 +112,7 @@ static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list
 	l->last->up = t;
 	t->up = NULL;
 	l->last = t;
+	l->ntasks++;
 }
 
 //recursively set left and right pointers to NULL
@@ -167,6 +171,7 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
 			STARPU_ATOMIC_ADD(p, -1);
 			if(*p == 0)
 				_starpu_task_grid_unset_left_right_member(t);
+			l->ntasks--;
 			return task;
 		}
 		t = t->up;
@@ -227,6 +232,7 @@ int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct
 	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
 	_starpu_worker_task_list_push(data->list, t);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
+	node->available(node);
 	return 0;
 }
 
@@ -280,7 +286,7 @@ void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
 	_worker_nodes[id] = NULL;
 }
 
-static void available_worker(struct _starpu_sched_node * worker_node)
+static void simple_worker_available(struct _starpu_sched_node * worker_node)
 {
 	(void) worker_node;
 
@@ -296,7 +302,7 @@ static void available_worker(struct _starpu_sched_node * worker_node)
 #endif
 }
 
-static void available_combined_worker(struct _starpu_sched_node * node)
+static void combined_worker_available(struct _starpu_sched_node * node)
 {
 	(void) node;
 #ifndef STARPU_NON_BLOCKING_DRIVERS
@@ -399,7 +405,7 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
 	int nimpl;
 	for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	{
-		if(starpu_worker_can_execute_task(workerid,task,nimpl))
+		if(/*starpu_worker_can_execute_task(workerid,task,nimpl) ||*/ starpu_combined_worker_can_execute_task(workerid, task, nimpl))
 		{
 			double d;
 			if(bundle)
@@ -455,7 +461,7 @@ static struct _starpu_task_execute_preds simple_worker_estimated_execute_preds(s
 }
 
 
-static double estimated_load(struct _starpu_sched_node * node)
+static double simple_worker_estimated_load(struct _starpu_sched_node * node)
 {
 	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
 	int nb_task = 0;
@@ -467,10 +473,27 @@ static double estimated_load(struct _starpu_sched_node * node)
 	    task = starpu_task_list_next(task))
 		nb_task++;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
-	return (double) nb_task
+	struct _starpu_worker_node_data * d = node->data;
+	struct _starpu_worker_task_list * l = d->list;
+	int ntasks_in_fifo = l ? l->ntasks : 0;
+	return (double) (nb_task + ntasks_in_fifo)
 		/ starpu_worker_get_relative_speedup(_starpu_bitmap_first(node->workers));
 }
 
+static double combined_worker_estimated_load(struct _starpu_sched_node * node)
+{
+	struct _starpu_worker_node_data * d = node->data;
+	struct _starpu_combined_worker * c = d->combined_worker;
+	double load = 0;
+	int i;
+	for(i = 0; i < c->worker_size; i++)
+	{
+		struct _starpu_sched_node * n = _starpu_sched_node_worker_get(c->combined_workerid[i]);
+		load += n->estimated_load(n);
+	}
+	return load;
+}
+
 static void worker_deinit_data(struct _starpu_sched_node * node)
 {
 	struct _starpu_worker_node_data * data = node->data;
@@ -529,20 +552,29 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
 	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
 
 	int workerid = starpu_worker_get_id();
-	starpu_pthread_mutex_t *worker_sched_mutex;
-	starpu_pthread_cond_t *worker_sched_cond;
-	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
-
-	/* wake up all other workers of combined worker */
-	for(i = 0; i < combined_worker->worker_size; i++)
+	if(-1 == workerid)
 	{
-		struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
-		worker_node->available(worker_node);
+		node->available(node);
 	}
+	else
+	{
+		starpu_pthread_mutex_t *worker_sched_mutex;
+		starpu_pthread_cond_t *worker_sched_cond;
+		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
+		
+		/* wake up all other workers of combined worker */
+		for(i = 0; i < combined_worker->worker_size; i++)
+		{
+			struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
+			worker_node->available(worker_node);
+		}
 
-	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
-	return 0;
+		node->available(node);
+
+		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
+	}
+	return 0;	
 }
 
 static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid)
@@ -564,8 +596,8 @@ static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid
 	node->push_task = _starpu_sched_node_worker_push_task;
 	node->pop_task = _starpu_sched_node_worker_pop_task;
 	node->estimated_execute_preds = simple_worker_estimated_execute_preds;
-	node->estimated_load = estimated_load;
-	node->available = available_worker;
+	node->estimated_load = simple_worker_estimated_load;
+	node->available = simple_worker_available;
 	node->deinit_data = worker_deinit_data;
 	node->workers = _starpu_bitmap_create();
 	_starpu_bitmap_set(node->workers, workerid);
@@ -602,8 +634,8 @@ static struct _starpu_sched_node  * _starpu_sched_node_combined_worker_create(in
 	node->push_task = _starpu_sched_node_combined_worker_push_task;
 	node->pop_task = NULL;
 	node->estimated_execute_preds = combined_worker_estimated_execute_preds;
-	node->estimated_load = estimated_load;
-	node->available = available_combined_worker;
+	node->estimated_load = combined_worker_estimated_load;
+	node->available = combined_worker_available;
 	node->deinit_data = worker_deinit_data;
 	node->workers = _starpu_bitmap_create();
 	_starpu_bitmap_set(node->workers, workerid);