Explorar o código

change all node->available(node) to starpu_sched_node_available(node)

Simon Archipoff %!s(int64=11) %!d(string=hai) anos
pai
achega
697d92474c

+ 3 - 10
include/starpu_sched_node.h

@@ -35,15 +35,6 @@ struct starpu_sched_node
 	struct starpu_task * (*pop_task)(struct starpu_sched_node *,
 					 unsigned sched_ctx_id);
 
-	/* this function notify underlying worker that a task as been pushed
-	 * and would be returned by a pop_task call
-	 * it should be called each time a node localy store a task
-
-	 * default implementation simply perform a recursive call on childrens
-	 * this function can be called by a worker as it doesn't try to wake up himself
-	 */
-	void (*available)(struct starpu_sched_node *);
-
 	/* this function is an heuristic that compute load of subtree, basicaly
 	 * it compute
 	 * estimated_load(node) = sum(estimated_load(node_childs)) +
@@ -198,7 +189,9 @@ void starpu_sched_tree_update_workers(struct starpu_sched_tree * t);
  *
  */
 void starpu_sched_tree_update_workers_in_ctx(struct starpu_sched_tree * t);
-
+/* wake up underlaying workers of node
+ */
+void starpu_sched_node_available(struct starpu_sched_node * node);
 
 int starpu_sched_tree_push_task(struct starpu_task * task);
 struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx_id);

+ 3 - 3
src/sched_policies/node_composed.c

@@ -104,13 +104,13 @@ struct starpu_task * composed_node_pop_task(struct starpu_sched_node *node, unsi
 	return NULL;
 }
 
-
+/*
 void composed_node_available(struct starpu_sched_node *node)
 {
 	struct composed_node * c = node->data;
 	c->top->available(c->top);
 }
-	
+*/	
 double composed_node_estimated_load(struct starpu_sched_node * node)
 {
 	struct composed_node * c = node->data;
@@ -187,7 +187,7 @@ struct starpu_sched_node * starpu_sched_node_composed_node_create(struct _starpu
 	node->data = c;
 	node->push_task = composed_node_push_task;
 	node->pop_task = composed_node_pop_task;
-	node->available = composed_node_available;
+//	node->available = composed_node_available;
 	node->estimated_load = composed_node_estimated_load;
 	node->add_child = composed_node_add_child;
 	node->remove_child = composed_node_remove_child;

+ 3 - 1
src/sched_policies/node_fifo.c

@@ -90,7 +90,9 @@ static int push_task(struct starpu_sched_node * node, struct starpu_task * task)
 	STARPU_ASSERT(!isnan(fifo->exp_start));
 	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 
-	node->available(node);
+
+	starpu_sched_node_available(node);
+//	node->available(node);
 	return ret;
 }
 

+ 0 - 1
src/sched_policies/node_random.c

@@ -54,7 +54,6 @@ static int push_task(struct starpu_sched_node * node, struct starpu_task * task)
 	}
 	STARPU_ASSERT(select != NULL);
 	int ret_val = select->push_task(select,task);
-	node->available(node);
 
 	return ret_val;
 }

+ 40 - 4
src/sched_policies/node_sched.c

@@ -32,15 +32,52 @@ double starpu_sched_compute_expected_time(double now, double predicted_end, doub
 	return predicted_end;
 }
 
-static void available(struct starpu_sched_node * node)
+
+static void _wake_simple_worker(int workerid)
+{
+	STARPU_ASSERT(0 <= workerid && workerid < starpu_worker_get_count());
+	starpu_pthread_mutex_t * sched_mutex;
+	starpu_pthread_cond_t * sched_cond;
+	if(workerid == starpu_worker_get_id())
+		return;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+}
+
+static void _wake_combined_worker(int workerid)
+{
+	STARPU_ASSERT(starpu_worker_get_count() <= workerid
+		      && workerid < starpu_worker_get_count() + starpu_combined_worker_get_count());
+	int me = starpu_worker_get_id();
+	struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
+	int * list = combined_worker->combined_workerid;
+	int size = combined_worker->worker_size;
+	int i;
+	for(i = 0; i < size; i++)
+		_wake_simple_worker(list[i]);
+}
+
+void starpu_sched_node_available(struct starpu_sched_node * node)
 {
 	(void)node;
+	STARPU_ASSERT(!starpu_sched_node_is_worker(node));
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	int i;
-	for(i = 0; i < node->nchilds; i++)
-		node->childs[i]->available(node->childs[i]);
+	for(i = starpu_bitmap_first(node->workers_in_ctx);
+	    i != -1;
+	    i = starpu_bitmap_next(node->workers_in_ctx, i))
+	{
+		if(i < starpu_worker_get_count())
+			_wake_simple_worker(i);
+		else
+			_wake_combined_worker(i);
+	}
 #endif
 }
+
+
 static struct starpu_task * pop_task_node(struct starpu_sched_node * node, unsigned sched_ctx_id)
 {
 	if(node->fathers[sched_ctx_id] == NULL)
@@ -390,7 +427,6 @@ struct starpu_sched_node * starpu_sched_node_create(void)
 	memset(node,0,sizeof(*node));
 	node->workers = starpu_bitmap_create();
 	node->workers_in_ctx = starpu_bitmap_create();
-	node->available = available;
 	node->add_child = starpu_sched_node_add_child;
 	node->remove_child = starpu_sched_node_remove_child;
 	node->notify_change_workers = take_node_and_does_nothing;

+ 5 - 3
src/sched_policies/node_work_stealing.c

@@ -193,7 +193,8 @@ static int push_task(struct starpu_sched_node * node, struct starpu_task * task)
 	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 	wsd->last_push_child = i;
-	node->available(node);
+//	node->available(node);
+	starpu_sched_node_available(node);
 	return ret;
 }
 
@@ -229,14 +230,15 @@ int _starpu_ws_push_task(struct starpu_task *task)
 			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 			
 			//we need to wake all workers
-			int j;
+/*			int j;
 			for(j = 0; j < node->nchilds; j++)
 			{
 				if(j == i)
 					continue;
 				node->childs[j]->available(node->childs[j]);
 			}
-
+*/
+			starpu_sched_node_available(node);
 			return ret;
 		}
 	}

+ 45 - 43
src/sched_policies/node_worker.c

@@ -284,6 +284,44 @@ enum starpu_perfmodel_archtype starpu_sched_node_worker_get_perf_arch(struct sta
 		return _starpu_sched_node_combined_worker_get_combined_worker(worker_node)->perf_arch;
 }
 
+static void simple_worker_available(struct starpu_sched_node * worker_node)
+{
+	(void) worker_node;
+
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
+	if(w->workerid == starpu_worker_get_id())
+		return;
+	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
+	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
+	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+#endif
+}
+
+static void combined_worker_available(struct starpu_sched_node * node)
+{
+	(void) node;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
+	struct _starpu_worker_node_data * data = node->data;
+	int workerid = starpu_worker_get_id();
+	int i;
+	for(i = 0; i < data->combined_worker->worker_size; i++)
+	{
+		if(i == workerid)
+			continue;
+		int worker = data->combined_worker->combined_workerid[i];
+		starpu_pthread_mutex_t *sched_mutex;
+		starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+#endif
+}
 
 int starpu_sched_node_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
 {
@@ -304,7 +342,7 @@ int starpu_sched_node_worker_push_task(struct starpu_sched_node * node, struct s
 	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
 	_starpu_worker_task_list_push(data->list, t);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
-	node->available(node);
+	simple_worker_available(node);	
 	return 0;
 }
 
@@ -388,44 +426,7 @@ void _starpu_sched_node_unlock_all_workers(void)
 
 
 
-static void simple_worker_available(struct starpu_sched_node * worker_node)
-{
-	(void) worker_node;
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
-	if(w->workerid == starpu_worker_get_id())
-		return;
-	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
-	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
-	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-#endif
-}
-
-static void combined_worker_available(struct starpu_sched_node * node)
-{
-	(void) node;
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
-	struct _starpu_worker_node_data * data = node->data;
-	int workerid = starpu_worker_get_id();
-	int i;
-	for(i = 0; i < data->combined_worker->worker_size; i++)
-	{
-		if(i == workerid)
-			continue;
-		int worker = data->combined_worker->combined_workerid[i];
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-	}
-#endif
-}
 
 static double estimated_transfer_length(struct starpu_sched_node * node,
 					struct starpu_task * task)
@@ -569,7 +570,7 @@ static int starpu_sched_node_combined_worker_push_task(struct starpu_sched_node
 	int workerid = starpu_worker_get_id();
 	if(-1 == workerid)
 	{
-		node->available(node);
+		combined_worker_available(node);
 	}
 	else
 	{
@@ -582,13 +583,14 @@ static int starpu_sched_node_combined_worker_push_task(struct starpu_sched_node
 		for(i = 0; i < combined_worker->worker_size; i++)
 		{
 			struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
-			worker_node->available(worker_node);
+			simple_worker_available(worker_node);
 		}
 
-		node->available(node);
+		combined_worker_available(node);
 
 		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
 	}
+
 	return 0;
 }
 
@@ -631,7 +633,7 @@ static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
 	node->pop_task = starpu_sched_node_worker_pop_task;
 	node->estimated_end = simple_worker_estimated_end;
 	node->estimated_load = simple_worker_estimated_load;
-	node->available = simple_worker_available;
+//	node->available = simple_worker_available;
 	node->deinit_data = _worker_node_deinit_data;
 	starpu_bitmap_set(node->workers, workerid);
 	starpu_bitmap_or(node->workers_in_ctx, node->workers);
@@ -669,7 +671,7 @@ static struct starpu_sched_node  * starpu_sched_node_combined_worker_create(int
 	node->pop_task = NULL;
 	node->estimated_end = combined_worker_estimated_end;
 	node->estimated_load = combined_worker_estimated_load;
-	node->available = combined_worker_available;
+//	node->available = combined_worker_available;
 	node->deinit_data = _worker_node_deinit_data;
 	starpu_bitmap_set(node->workers, workerid);
 	starpu_bitmap_or(node->workers_in_ctx, node->workers);