Browse Source

Avoid waking all workers all the time on can_pull, we only need to wake one of them

Samuel Thibault 8 years ago
parent
commit
61b1d6229a

+ 5 - 2
doc/doxygen/chapters/api/modularized_scheduler.doxy

@@ -81,14 +81,17 @@ like <c>component->push_task(component,task)</c>
 \var int (*starpu_sched_component::can_push)(struct starpu_sched_component *component)
 \var int (*starpu_sched_component::can_push)(struct starpu_sched_component *component)
      This function is called by a component which implements a queue,
      This function is called by a component which implements a queue,
      allowing it to signify to its parents that an empty slot is
      allowing it to signify to its parents that an empty slot is
-     available in its queue. The basic implementation of this function
+     available in its queue. This should return 1 if some tasks could be pushed
+     The basic implementation of this function
      is a recursive call to its parents, the user has to specify a
      is a recursive call to its parents, the user has to specify a
      personally-made function to catch those calls.
      personally-made function to catch those calls.
-\var void (*starpu_sched_component::can_pull)(struct starpu_sched_component *component)
+\var int (*starpu_sched_component::can_pull)(struct starpu_sched_component *component)
      This function allow a component to wake up a worker. It is
      This function allow a component to wake up a worker. It is
      currently called by component which implements a queue, to
      currently called by component which implements a queue, to
      signify to its children that a task have been pushed in its local
      signify to its children that a task have been pushed in its local
      queue, and is available to be popped by a worker, for example.
      queue, and is available to be popped by a worker, for example.
+     This should return 1 if some some container or worker could (or will) pull
+     some tasks.
      The basic implementation of this function is a recursive call to
      The basic implementation of this function is a recursive call to
      its children, until at least one worker have been woken up.
      its children, until at least one worker have been woken up.
 
 

+ 2 - 2
include/starpu_sched_component.h

@@ -60,7 +60,7 @@ struct starpu_sched_component
 	struct starpu_task *(*pull_task)(struct starpu_sched_component *);
 	struct starpu_task *(*pull_task)(struct starpu_sched_component *);
 
 
 	int (*can_push)(struct starpu_sched_component *component);
 	int (*can_push)(struct starpu_sched_component *component);
-	void (*can_pull)(struct starpu_sched_component *component);
+	int (*can_pull)(struct starpu_sched_component *component);
 
 
 	double (*estimated_load)(struct starpu_sched_component *component);
 	double (*estimated_load)(struct starpu_sched_component *component);
 	double (*estimated_end)(struct starpu_sched_component *component);
 	double (*estimated_end)(struct starpu_sched_component *component);
@@ -94,7 +94,7 @@ int starpu_sched_component_push_task(struct starpu_sched_component *from, struct
 struct starpu_task *starpu_sched_tree_pop_task(unsigned sched_ctx);
 struct starpu_task *starpu_sched_tree_pop_task(unsigned sched_ctx);
 struct starpu_task *starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to);
 struct starpu_task *starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to);
 struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success);
 struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success);
-void starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component);
+int starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component);
 
 
 void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers);
 void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers);
 void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers);
 void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers);

+ 2 - 2
src/sched_policies/component_eager.c

@@ -46,8 +46,8 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 						{
 						{
 							if(starpu_sched_component_is_worker(component->children[i]))
 							if(starpu_sched_component_is_worker(component->children[i]))
 							{
 							{
-								component->children[i]->can_pull(component->children[i]);
-								return 1;
+								if (component->children[i]->can_pull(component->children[i]))
+									return 1;
 							}
 							}
 							else
 							else
 							{
 							{

+ 2 - 2
src/sched_policies/component_eager_calibration.c

@@ -57,8 +57,8 @@ static int eager_calibration_push_task(struct starpu_sched_component * component
 							{
 							{
 								if(starpu_sched_component_is_worker(component->children[i]))
 								if(starpu_sched_component_is_worker(component->children[i]))
 								{
 								{
-									component->children[i]->can_pull(component->children[i]);
-									return 1;
+									if (component->children[i]->can_pull(component->children[i]))
+										return 1;
 								}
 								}
 								else
 								else
 									return starpu_sched_component_push_task(component,component->children[i],task);
 									return starpu_sched_component_push_task(component,component->children[i],task);

+ 9 - 6
src/sched_policies/component_sched.c

@@ -575,13 +575,16 @@ static int starpu_sched_component_can_push(struct starpu_sched_component * compo
  * component. It is currenly called by components which holds a queue (like fifo and prio
  * component. It is currenly called by components which holds a queue (like fifo and prio
  * components) to signify its childs that a task has been pushed on its local queue.
  * components) to signify its childs that a task has been pushed on its local queue.
  */
  */
-static void starpu_sched_component_can_pull(struct starpu_sched_component * component)
+static int starpu_sched_component_can_pull(struct starpu_sched_component * component)
 {
 {
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
 	int i;
 	int i;
-	for(i = 0; i < component->nchildren; i++)
-		component->children[i]->can_pull(component->children[i]);
+	for(i = 0; i < component->nchildren; i++) {
+		if (component->children[i]->can_pull(component->children[i]))
+			return 1;
+	}
+	return 0;
 }
 }
 
 
 
 
@@ -589,12 +592,12 @@ static void starpu_sched_component_can_pull(struct starpu_sched_component * comp
    to pull but prefers that you push. It can be used by decision
    to pull but prefers that you push. It can be used by decision
    components, in which decisions are usually taken in their push()
    components, in which decisions are usually taken in their push()
    functions */
    functions */
-void starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component)
+int starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component)
 {
 {
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
 	
 	
-	int i,ret;
+	int i,ret = 0;
 	for(i=0; i < component->nparents; i++)
 	for(i=0; i < component->nparents; i++)
 	{
 	{
 		if(component->parents[i] == NULL)
 		if(component->parents[i] == NULL)
@@ -606,7 +609,7 @@ void starpu_sched_component_send_can_push_to_parents(struct starpu_sched_compone
 				break;
 				break;
 		}
 		}
 	}
 	}
-	
+	return ret != 0;
 }
 }
 
 
 
 

+ 0 - 1
src/sched_policies/component_work_stealing.c

@@ -275,7 +275,6 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 				wsd->fifos[i]->exp_len += task->predicted;
 				wsd->fifos[i]->exp_len += task->predicted;
 			STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 			STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 
-			//we need to wake all workers
 			component->can_pull(component);
 			component->can_pull(component);
 			return ret;
 			return ret;
 		}
 		}

+ 6 - 4
src/sched_policies/component_worker.c

@@ -396,11 +396,11 @@ static int _worker_consistant(struct starpu_sched_component * component)
 
 
 
 
 
 
-static void simple_worker_can_pull(struct starpu_sched_component * worker_component)
+static int simple_worker_can_pull(struct starpu_sched_component * worker_component)
 {
 {
 	struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(worker_component);
 	struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(worker_component);
 	int workerid = worker->workerid;
 	int workerid = worker->workerid;
-	_starpu_wake_worker_relax(workerid);
+	return _starpu_wake_worker_relax(workerid);
 }
 }
 
 
 static int simple_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
 static int simple_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
@@ -599,7 +599,7 @@ static struct starpu_sched_component * starpu_sched_component_worker_create(stru
 
 
 
 
 
 
-static void combined_worker_can_pull(struct starpu_sched_component * component)
+static int combined_worker_can_pull(struct starpu_sched_component * component)
 {
 {
 	(void) component;
 	(void) component;
 	STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
 	STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
@@ -610,8 +610,10 @@ static void combined_worker_can_pull(struct starpu_sched_component * component)
 	{
 	{
 		if((unsigned) i == workerid)
 		if((unsigned) i == workerid)
 			continue;
 			continue;
-		_starpu_wake_worker_relax(workerid);
+		if (_starpu_wake_worker_relax(workerid))
+			return 1;
 	}
 	}
+	return 0;
 }
 }
 
 
 static int combined_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
 static int combined_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)