Browse Source

The starpu_push_local_task function puts a task into the local queue associated
to a specific worker. The scheduling policy may now defined pop_task as NULL,
so that the worker would only pick tasks from this local queue in case its
sufficient.

Cédric Augonnet 14 years ago
parent
commit
6a644744e4

+ 22 - 3
doc/starpu.texi

@@ -4270,6 +4270,7 @@ the StarPU sources in the directory @code{examples/scheduler/}.
 * starpu_worker_set_sched_condition::
 * starpu_worker_set_sched_condition::
 * starpu_sched_set_min_priority::       Set the minimum priority level
 * starpu_sched_set_min_priority::       Set the minimum priority level
 * starpu_sched_set_max_priority::       Set the maximum priority level
 * starpu_sched_set_max_priority::       Set the maximum priority level
+* starpu_push_local_task::		Assign a task to a worker
 * Source code::                 
 * Source code::                 
 @end menu
 @end menu
 
 
@@ -4294,7 +4295,10 @@ Insert a task into the scheduler.
 Insert a priority task into the scheduler.
 Insert a priority task into the scheduler.
 @item @code{pop_task}:
 @item @code{pop_task}:
 Get a task from the scheduler. The mutex associated to the worker is already
 Get a task from the scheduler. The mutex associated to the worker is already
-taken when this method is called.
+taken when this method is called. If this method is defined as @code{NULL}, the
+worker will only execute tasks from its local queue. In this case, the
+@code{push_task} method should use the @code{starpu_push_local_task} method to
+assign tasks to the different workers.
 @item @code{pop_every_task}:
 @item @code{pop_every_task}:
 Remove all available tasks from the scheduler (tasks are chained by the means
 Remove all available tasks from the scheduler (tasks are chained by the means
 of the prev and next fields of the starpu_task structure). The mutex associated
 of the prev and next fields of the starpu_task structure). The mutex associated
@@ -4337,7 +4341,7 @@ is 0 by convention.  The application may access that value by calling the
 called from the initialization method of the scheduling policy, and should not
 called from the initialization method of the scheduling policy, and should not
 be used directly from the application.
 be used directly from the application.
 @item @emph{Prototype}:
 @item @emph{Prototype}:
-@code{void starpu_sched_set_min_priority(int min_prio)}
+@code{void starpu_sched_set_min_priority(int min_prio);}
 @end table
 @end table
 
 
 @node starpu_sched_set_max_priority
 @node starpu_sched_set_max_priority
@@ -4350,9 +4354,24 @@ calling the @code{starpu_sched_get_max_priority} function. This function should
 only be called from the initialization method of the scheduling policy, and
 only be called from the initialization method of the scheduling policy, and
 should not be used directly from the application.
 should not be used directly from the application.
 @item @emph{Prototype}:
 @item @emph{Prototype}:
-@code{void starpu_sched_set_min_priority(int max_prio)}
+@code{void starpu_sched_set_min_priority(int max_prio);}
 @end table
 @end table
 
 
+@node starpu_push_local_task
+@subsection @code{starpu_push_local_task}
+@table @asis
+@item @emph{Description}:
+The scheduling policy may put tasks directly into a worker's local queue so
+that it is not always necessary to create its own queue when the local queue
+is sufficient. If "back" not null, the task is put at the back of the queue
+where the worker will pop tasks first. Setting "back" to 0 therefore ensures
+a FIFO ordering. 
+
+@item @emph{Prototype}:
+@code{int starpu_push_local_task(int workerid, struct starpu_task *task, int back);}
+@end table
+
+
 
 
 
 
 @node Source code
 @node Source code

+ 7 - 0
include/starpu_scheduler.h

@@ -136,4 +136,11 @@ void _starpu_sched_find_worker_combinations(struct starpu_machine_topology_s *to
 
 
 int starpu_combined_worker_get_description(int workerid, int *worker_size, int **combined_workerid);
 int starpu_combined_worker_get_description(int workerid, int *worker_size, int **combined_workerid);
 
 
+/* The scheduling policy may put tasks directly into a worker's local queue so
+ * that it is not always necessary to create its own queue when the local queue
+ * is sufficient. If "back" not null, the task is put at the back of the queue
+ * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
+ * a FIFO ordering. */
+int starpu_push_local_task(int workerid, struct starpu_task *task, int back);
+
 #endif // __STARPU_SCHEDULER_H__
 #endif // __STARPU_SCHEDULER_H__

+ 7 - 2
src/core/jobs.c

@@ -363,7 +363,7 @@ struct starpu_task *_starpu_pop_local_task(struct starpu_worker_s *worker)
 	return task;
 	return task;
 }
 }
 
 
-int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task)
+int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task, int back)
 {
 {
 	/* Check that the worker is able to execute the task ! */
 	/* Check that the worker is able to execute the task ! */
 	STARPU_ASSERT(task && task->cl);
 	STARPU_ASSERT(task && task->cl);
@@ -371,7 +371,12 @@ int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *
 		return -ENODEV;
 		return -ENODEV;
 
 
 	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
 	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-	starpu_task_list_push_front(&worker->local_tasks, task);
+
+	if (back)
+		starpu_task_list_push_back(&worker->local_tasks, task);
+	else
+		starpu_task_list_push_front(&worker->local_tasks, task);
+
 	PTHREAD_COND_BROADCAST(worker->sched_cond);
 	PTHREAD_COND_BROADCAST(worker->sched_cond);
 	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 
 

+ 4 - 2
src/core/jobs.h

@@ -153,8 +153,10 @@ size_t _starpu_job_get_data_size(starpu_job_t j);
 struct starpu_task *_starpu_pop_local_task(struct starpu_worker_s *worker);
 struct starpu_task *_starpu_pop_local_task(struct starpu_worker_s *worker);
 
 
 /* Put a task into the pool of tasks that are explicitly attributed to the
 /* Put a task into the pool of tasks that are explicitly attributed to the
- * specified worker. */
-int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task);
+ * specified worker. If "back" is set, the task is put at the back of the list.
+ * Considering the tasks are popped from the back, this value should be 0 to
+ * enforce a FIFO ordering. */
+int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task, int back);
 
 
 /* Returns the symbol associated to that job if any. */
 /* Returns the symbol associated to that job if any. */
 const char *_starpu_get_model_name(starpu_job_t j);
 const char *_starpu_get_model_name(starpu_job_t j);

+ 22 - 4
src/core/sched_policy.c

@@ -235,7 +235,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 
 
 	if (is_basic_worker)
 	if (is_basic_worker)
 	{
 	{
-		return _starpu_push_local_task(worker, task);
+		return _starpu_push_local_task(worker, task, 0);
 	}
 	}
 	else {
 	else {
 		/* This is a combined worker so we create task aliases */
 		/* This is a combined worker so we create task aliases */
@@ -258,7 +258,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 			struct starpu_task *alias = _starpu_create_task_alias(task);
 			struct starpu_task *alias = _starpu_create_task_alias(task);
 
 
 			worker = _starpu_get_worker_struct(combined_workerid[i]);
 			worker = _starpu_get_worker_struct(combined_workerid[i]);
-			ret |= _starpu_push_local_task(worker, alias);
+			ret |= _starpu_push_local_task(worker, alias, 0);
 		}
 		}
 
 
 		return ret;
 		return ret;
@@ -300,7 +300,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
         return ret;
         return ret;
 }
 }
 
 
-struct starpu_task *_starpu_pop_task(void)
+struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 {
 {
 	struct starpu_task *task;
 	struct starpu_task *task;
 
 
@@ -311,7 +311,11 @@ struct starpu_task *_starpu_pop_task(void)
 	if (profiling)
 	if (profiling)
 		starpu_clock_gettime(&pop_start_time);
 		starpu_clock_gettime(&pop_start_time);
 
 
-	task = policy.pop_task();
+	/* perhaps there is some local task to be executed first */
+	task = _starpu_pop_local_task(worker);
+
+	if (!task && policy.pop_task)
+		task = policy.pop_task();
 
 
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */
 	 * for some reason. */
@@ -365,3 +369,17 @@ void _starpu_wait_on_sched_event(void)
 
 
 	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
 }
+
+/* The scheduling policy may put tasks directly into a worker's local queue so
+ * that it is not always necessary to create its own queue when the local queue
+ * is sufficient. If "back" not null, the task is put at the back of the queue
+ * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
+ * a FIFO ordering. */
+int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
+	return _starpu_push_local_task(worker, task, back);
+}
+
+

+ 1 - 1
src/core/sched_policy.h

@@ -30,7 +30,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config);
 
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 /* pop a task that can be executed on the worker */
 /* pop a task that can be executed on the worker */
-struct starpu_task *_starpu_pop_task(void);
+struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker);
 /* pop every task that can be executed on the worker */
 /* pop every task that can be executed on the worker */
 struct starpu_task *_starpu_pop_every_task(void);
 struct starpu_task *_starpu_pop_every_task(void);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);

+ 1 - 6
src/drivers/cpu/driver_cpu.c

@@ -153,12 +153,7 @@ void *_starpu_cpu_worker(void *arg)
 
 
 		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
 		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
 
 
-		/* perhaps there is some local task to be executed first */
-		task = _starpu_pop_local_task(cpu_arg);
-
-		/* otherwise ask a task to the scheduler */
-		if (!task)
-			task = _starpu_pop_task();
+		task = _starpu_pop_task(cpu_arg);
 	
 	
                 if (!task) 
                 if (!task) 
 		{
 		{

+ 1 - 6
src/drivers/cuda/driver_cuda.c

@@ -266,12 +266,7 @@ void *_starpu_cuda_worker(void *arg)
 
 
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 
-		/* perhaps there is some local task to be executed first */
-		task = _starpu_pop_local_task(args);
-
-		/* otherwise ask a task to the scheduler */
-		if (!task)
-			task = _starpu_pop_task();
+		task = _starpu_pop_task(args);
 	
 	
                 if (task == NULL) 
                 if (task == NULL) 
 		{
 		{

+ 1 - 6
src/drivers/opencl/driver_opencl.c

@@ -402,12 +402,7 @@ void *_starpu_opencl_worker(void *arg)
 
 
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 
-		/* perhaps there is some local task to be executed first */
-		task = _starpu_pop_local_task(args);
-
-		/* otherwise ask a task to the scheduler */
-		if (!task)
-			task = _starpu_pop_task();
+		task = _starpu_pop_task(args);
 		
 		
                 if (task == NULL) 
                 if (task == NULL) 
 		{
 		{

+ 2 - 20
src/sched_policies/random_policy.c

@@ -21,22 +21,10 @@
 #include <sched_policies/fifo_queues.h>
 #include <sched_policies/fifo_queues.h>
 
 
 static unsigned nworkers;
 static unsigned nworkers;
-static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
 
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 
 
-static struct starpu_task *random_pop_task(void)
-{
-	struct starpu_task *task;
-
-	int workerid = starpu_worker_get_id();
-
-	task = _starpu_fifo_pop_task(queue_array[workerid], -1);
-
-	return task;
-}
-
 static int _random_push_task(struct starpu_task *task, unsigned prio)
 static int _random_push_task(struct starpu_task *task, unsigned prio)
 {
 {
 	/* find the queue */
 	/* find the queue */
@@ -71,11 +59,7 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 	}
 	}
 
 
 	/* we should now have the best worker in variable "selected" */
 	/* we should now have the best worker in variable "selected" */
-	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[selected], &sched_mutex[selected], &sched_cond[selected], task);
-	} else {
-		return _starpu_fifo_push_task(queue_array[selected],&sched_mutex[selected], &sched_cond[selected],  task);
-	}
+	return starpu_push_local_task(selected, task, prio);
 }
 }
 
 
 static int random_push_prio_task(struct starpu_task *task)
 static int random_push_prio_task(struct starpu_task *task)
@@ -98,8 +82,6 @@ static void initialize_random_policy(struct starpu_machine_topology_s *topology,
 	unsigned workerid;
 	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
 	{
-		queue_array[workerid] = _starpu_create_fifo();
-	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
 		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
 		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
 	
 	
@@ -112,7 +94,7 @@ struct starpu_sched_policy_s _starpu_sched_random_policy = {
 	.deinit_sched = NULL,
 	.deinit_sched = NULL,
 	.push_task = random_push_task,
 	.push_task = random_push_task,
 	.push_prio_task = random_push_prio_task,
 	.push_prio_task = random_push_prio_task,
-	.pop_task = random_pop_task,
+	.pop_task = NULL,
 	.post_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = NULL,
 	.pop_every_task = NULL,
 	.policy_name = "random",
 	.policy_name = "random",