瀏覽代碼

Add "to" parameters to pull_task and can_push methods of sched components

so they can know what kind of worker etc. they are pushing to
Samuel Thibault 7 年之前
父節點
當前提交
a77b534d96

+ 11 - 3
doc/doxygen/chapters/api/modularized_scheduler.doxy

@@ -82,13 +82,13 @@ like <c>component->push_task(component,task)</c>
      This method must either return 0 if it the task was properly stored or
      This method must either return 0 if it the task was properly stored or
      passed over to a child component, or return a value different from 0 if the
      passed over to a child component, or return a value different from 0 if the
      task could not be consumed (e.g. the queue is full).
      task could not be consumed (e.g. the queue is full).
-\var struct starpu_task * (*starpu_sched_component::pull_task)(struct starpu_sched_component *)
+\var struct starpu_task * (*starpu_sched_component::pull_task)(struct starpu_sched_component *component, struct starpu_sched_component *to)
      pop a task from the scheduler module. this function is called by workers to get a task from their
      pop a task from the scheduler module. this function is called by workers to get a task from their
      parents. this function should first return a locally stored task
      parents. this function should first return a locally stored task
      or perform a recursive call on the parents.
      or perform a recursive call on the parents.
-     the task returned by this function is executable by the caller
+     the task returned by this function should be executable by the caller
 
 
-\var int (*starpu_sched_component::can_push)(struct starpu_sched_component *component)
+\var int (*starpu_sched_component::can_push)(struct starpu_sched_component *component, struct starpu_sched_component *to)
      This function is called by a component which implements a queue,
      This function is called by a component which implements a queue,
      allowing it to signify to its parents that an empty slot is
      allowing it to signify to its parents that an empty slot is
      available in its queue. This should return 1 if some tasks could be pushed
      available in its queue. This should return 1 if some tasks could be pushed
@@ -251,6 +251,14 @@ The actual scheduler
 @name Flow-control Fifo Component API
 @name Flow-control Fifo Component API
 \ingroup API_Modularized_Scheduler
 \ingroup API_Modularized_Scheduler
 
 
+\fn int starpu_sched_component_can_push(struct starpu_sched_component * component)
+\ingroup API_Modularized_Scheduler
+default function for the can_push component method, just calls can_push of parents until one of them returns non-zero
+
+\fn int starpu_sched_component_can_pull(struct starpu_sched_component * component)
+\ingroup API_Modularized_Scheduler
+default function for the can_pull component method, just calls can_pull of children until one of them returns non-zero
+
 \fn double starpu_sched_component_estimated_load(struct starpu_sched_component * component);
 \fn double starpu_sched_component_estimated_load(struct starpu_sched_component * component);
 \ingroup API_Modularized_Scheduler
 \ingroup API_Modularized_Scheduler
 default function for the estimated_load component method, just sums up the loads
 default function for the estimated_load component method, just sums up the loads

+ 5 - 2
include/starpu_sched_component.h

@@ -58,9 +58,9 @@ struct starpu_sched_component
 	void (*remove_parent)(struct starpu_sched_component *component, struct starpu_sched_component *parent);
 	void (*remove_parent)(struct starpu_sched_component *component, struct starpu_sched_component *parent);
 
 
 	int (*push_task)(struct starpu_sched_component *, struct starpu_task *);
 	int (*push_task)(struct starpu_sched_component *, struct starpu_task *);
-	struct starpu_task *(*pull_task)(struct starpu_sched_component *);
+	struct starpu_task *(*pull_task)(struct starpu_sched_component *from, struct starpu_sched_component *to);
 
 
-	int (*can_push)(struct starpu_sched_component *component);
+	int (*can_push)(struct starpu_sched_component *from, struct starpu_sched_component *to);
 	int (*can_pull)(struct starpu_sched_component *component);
 	int (*can_pull)(struct starpu_sched_component *component);
 
 
 	double (*estimated_load)(struct starpu_sched_component *component);
 	double (*estimated_load)(struct starpu_sched_component *component);
@@ -94,6 +94,7 @@ int starpu_sched_tree_push_task(struct starpu_task *task);
 int starpu_sched_component_push_task(struct starpu_sched_component *from, struct starpu_sched_component *to, struct starpu_task *task);
 int starpu_sched_component_push_task(struct starpu_sched_component *from, struct starpu_sched_component *to, struct starpu_task *task);
 struct starpu_task *starpu_sched_tree_pop_task(unsigned sched_ctx);
 struct starpu_task *starpu_sched_tree_pop_task(unsigned sched_ctx);
 struct starpu_task *starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to);
 struct starpu_task *starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to);
+struct starpu_task* starpu_sched_component_pump_to(struct starpu_sched_component *component, struct starpu_sched_component *to, int* success);
 struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success);
 struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success);
 int starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component);
 int starpu_sched_component_send_can_push_to_parents(struct starpu_sched_component * component);
 
 
@@ -120,6 +121,8 @@ int starpu_sched_component_is_combined_worker(struct starpu_sched_component *com
 void starpu_sched_component_worker_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id);
 void starpu_sched_component_worker_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id);
 void starpu_sched_component_worker_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id);
 void starpu_sched_component_worker_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id);
 
 
+int starpu_sched_component_can_push(struct starpu_sched_component * component, struct starpu_sched_component * to);
+int starpu_sched_component_can_pull(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_load(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_load(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_end_min(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_end_min(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_end_average(struct starpu_sched_component * component);
 double starpu_sched_component_estimated_end_average(struct starpu_sched_component * component);

+ 1 - 1
src/sched_policies/component_best_implementation.c

@@ -86,7 +86,7 @@ int starpu_sched_component_is_best_implementation(struct starpu_sched_component
 	return component->push_task == best_implementation_push_task;
 	return component->push_task == best_implementation_push_task;
 }
 }
 
 
-static struct starpu_task * best_implementation_pull_task(struct starpu_sched_component * component)
+static struct starpu_task * best_implementation_pull_task(struct starpu_sched_component * component, struct starpu_sched_component * from STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	struct starpu_task * task = NULL;
 	struct starpu_task * task = NULL;
 	unsigned i;
 	unsigned i;

+ 1 - 1
src/sched_policies/component_composed.c

@@ -125,7 +125,7 @@ static int composed_component_push_task(struct starpu_sched_component * componen
 	struct composed_component *c = component->data;
 	struct composed_component *c = component->data;
 	return starpu_sched_component_push_task(component,c->top,task);
 	return starpu_sched_component_push_task(component,c->top,task);
 }
 }
-struct starpu_task * composed_component_pull_task(struct starpu_sched_component *component)
+struct starpu_task * composed_component_pull_task(struct starpu_sched_component *component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	struct composed_component *c = component->data;
 	struct composed_component *c = component->data;
 	struct starpu_task * task = NULL;
 	struct starpu_task * task = NULL;

+ 2 - 2
src/sched_policies/component_fifo.c

@@ -150,7 +150,7 @@ static int fifo_push_task(struct starpu_sched_component * component, struct star
 	return fifo_push_local_task(component, task, 0);
 	return fifo_push_local_task(component, task, 0);
 }
 }
 
 
-static struct starpu_task * fifo_pull_task(struct starpu_sched_component * component)
+static struct starpu_task * fifo_pull_task(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
 	struct _starpu_fifo_data * data = component->data;
@@ -191,7 +191,7 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
  * push fails, which means that the worker fifo_components are
  * push fails, which means that the worker fifo_components are
  * currently "full".
  * currently "full".
  */
  */
-static int fifo_can_push(struct starpu_sched_component * component)
+static int fifo_can_push(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component && starpu_sched_component_is_fifo(component));
 	STARPU_ASSERT(component && starpu_sched_component_is_fifo(component));
 	int res = 0;
 	int res = 0;

+ 2 - 2
src/sched_policies/component_heft.c

@@ -206,7 +206,7 @@ static int heft_push_task(struct starpu_sched_component * component, struct star
 	return 0;
 	return 0;
 }
 }
 
 
-static int heft_can_push(struct starpu_sched_component *component)
+static int heft_can_push(struct starpu_sched_component *component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	heft_progress(component);
 	heft_progress(component);
 	int ret = 0;
 	int ret = 0;
@@ -217,7 +217,7 @@ static int heft_can_push(struct starpu_sched_component *component)
 			continue;
 			continue;
 		else
 		else
 		{
 		{
-			ret = component->parents[j]->can_push(component->parents[j]);
+			ret = component->parents[j]->can_push(component->parents[j], component);
 			if(ret)
 			if(ret)
 				break;
 				break;
 		}
 		}

+ 2 - 2
src/sched_policies/component_prio.c

@@ -182,7 +182,7 @@ static int prio_push_task(struct starpu_sched_component * component, struct star
 	return ret;
 	return ret;
 }
 }
 
 
-static struct starpu_task * prio_pull_task(struct starpu_sched_component * component)
+static struct starpu_task * prio_pull_task(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_prio_data * data = component->data;
 	struct _starpu_prio_data * data = component->data;
@@ -249,7 +249,7 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
  * push fails, which means that the worker prio_components are
  * push fails, which means that the worker prio_components are
  * currently "full".
  * currently "full".
  */
  */
-static int prio_can_push(struct starpu_sched_component * component)
+static int prio_can_push(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component && starpu_sched_component_is_prio(component));
 	STARPU_ASSERT(component && starpu_sched_component_is_prio(component));
 	int res = 0;
 	int res = 0;

+ 15 - 11
src/sched_policies/component_sched.c

@@ -358,9 +358,9 @@ struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx)
 	return task;
 	return task;
 }
 }
 
 
-struct starpu_task * starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to STARPU_ATTRIBUTE_UNUSED)
+struct starpu_task * starpu_sched_component_pull_task(struct starpu_sched_component *from, struct starpu_sched_component *to)
 {
 {
-	struct starpu_task *task = from->pull_task(from);
+	struct starpu_task *task = from->pull_task(from, to);
 	if (task)
 	if (task)
 		_STARPU_TRACE_SCHED_COMPONENT_PULL(from, to, task);
 		_STARPU_TRACE_SCHED_COMPONENT_PULL(from, to, task);
 	return task;
 	return task;
@@ -369,17 +369,15 @@ struct starpu_task * starpu_sched_component_pull_task(struct starpu_sched_compon
 
 
 /* Pump mechanic to get the task flow rolling. Takes tasks from component and send them to the child.
 /* Pump mechanic to get the task flow rolling. Takes tasks from component and send them to the child.
    To be used by components with only one child */
    To be used by components with only one child */
-struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success)
+struct starpu_task* starpu_sched_component_pump_to(struct starpu_sched_component *component, struct starpu_sched_component *child, int* success)
 {
 {
 	int ret = 0;
 	int ret = 0;
 
 
-	STARPU_ASSERT(component->nchildren == 1);
-	struct starpu_sched_component * child = component->children[0];
 	struct starpu_task * task;
 	struct starpu_task * task;
 
 
 	while (1)
 	while (1)
 	{
 	{
-		task = starpu_sched_component_pull_task(component,component);
+		task = starpu_sched_component_pull_task(component,child);
 		if (!task)
 		if (!task)
 			break;
 			break;
 		ret = starpu_sched_component_push_task(component,child,task);
 		ret = starpu_sched_component_push_task(component,child,task);
@@ -395,6 +393,12 @@ struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_c
 
 
 }
 }
 
 
+struct starpu_task* starpu_sched_component_pump_downstream(struct starpu_sched_component *component, int* success)
+{
+	STARPU_ASSERT(component->nchildren == 1);
+	return starpu_sched_component_pump_to(component, component->children[0], success);
+}
+
 void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
 {
 	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
 	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
@@ -529,7 +533,7 @@ static void starpu_sched_component_remove_parent(struct starpu_sched_component *
 /* default implementation for component->pull_task()
 /* default implementation for component->pull_task()
  * just perform a recursive call on parent
  * just perform a recursive call on parent
  */
  */
-static struct starpu_task * starpu_sched_component_parents_pull_task(struct starpu_sched_component * component)
+static struct starpu_task * starpu_sched_component_parents_pull_task(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(component);
 	struct starpu_task * task = NULL;
 	struct starpu_task * task = NULL;
@@ -552,7 +556,7 @@ static struct starpu_task * starpu_sched_component_parents_pull_task(struct star
  * A personally-made can_push in a component (like in prio components) is necessary to catch
  * A personally-made can_push in a component (like in prio components) is necessary to catch
  * this recursive call somewhere, if the user wants to exploit it.
  * this recursive call somewhere, if the user wants to exploit it.
  */
  */
-static int starpu_sched_component_can_push(struct starpu_sched_component * component)
+int starpu_sched_component_can_push(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(component);
 	int ret = 0;
 	int ret = 0;
@@ -563,7 +567,7 @@ static int starpu_sched_component_can_push(struct starpu_sched_component * compo
 		{
 		{
 			struct starpu_sched_component * parent = component->parents[i];
 			struct starpu_sched_component * parent = component->parents[i];
 			if(parent != NULL)
 			if(parent != NULL)
-				ret = parent->can_push(parent);
+				ret = parent->can_push(parent, component);
 			if(ret)
 			if(ret)
 				break;
 				break;
 		}
 		}
@@ -575,7 +579,7 @@ static int starpu_sched_component_can_push(struct starpu_sched_component * compo
  * component. It is currenly called by components which holds a queue (like fifo and prio
  * component. It is currenly called by components which holds a queue (like fifo and prio
  * components) to signify its childs that a task has been pushed on its local queue.
  * components) to signify its childs that a task has been pushed on its local queue.
  */
  */
-static int starpu_sched_component_can_pull(struct starpu_sched_component * component)
+int starpu_sched_component_can_pull(struct starpu_sched_component * component)
 {
 {
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(component);
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
 	STARPU_ASSERT(!starpu_sched_component_is_worker(component));
@@ -606,7 +610,7 @@ int starpu_sched_component_send_can_push_to_parents(struct starpu_sched_componen
 			continue;
 			continue;
 		else
 		else
 		{
 		{
-			ret = component->parents[i]->can_push(component->parents[i]);
+			ret = component->parents[i]->can_push(component->parents[i], component);
 			if(ret)
 			if(ret)
 				break;
 				break;
 		}
 		}

+ 1 - 1
src/sched_policies/component_work_stealing.c

@@ -130,7 +130,7 @@ static int is_worker_of_component(struct starpu_sched_component * component, int
 
 
 
 
 
 
-static struct starpu_task * pull_task(struct starpu_sched_component * component)
+static struct starpu_task * pull_task(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
 {
 {
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned i;
 	unsigned i;

+ 2 - 2
src/sched_policies/component_worker.c

@@ -428,7 +428,7 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
 	return 0;
 	return 0;
 }
 }
 
 
-static struct starpu_task * simple_worker_pull_task(struct starpu_sched_component *component)
+static struct starpu_task * simple_worker_pull_task(struct starpu_sched_component *component, struct starpu_sched_component * to)
 {
 {
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
@@ -484,7 +484,7 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 		struct starpu_sched_component * combined_worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, workerid);
 		struct starpu_sched_component * combined_worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, workerid);
 		starpu_sched_component_push_task(component, combined_worker_component, task);
 		starpu_sched_component_push_task(component, combined_worker_component, task);
 		/* we have pushed a task in queue, so can make a recursive call */
 		/* we have pushed a task in queue, so can make a recursive call */
-		task = simple_worker_pull_task(component);
+		task = simple_worker_pull_task(component, to);
 		goto ret;
 		goto ret;
 
 
 	}
 	}