Browse Source

Add submit_hook and do_schedule scheduler methods

Samuel Thibault 9 years ago
parent
commit
f0106e1e82

+ 1 - 0
ChangeLog

@@ -177,6 +177,7 @@ Small features:
     void starpu_opencl_load_program_source_malloc(const char *source_file_name, char **located_file_name, char **located_dir_name, char **opencl_program_source)
     which allocates the pointers located_file_name, located_dir_name
     and opencl_program_source.
+  * Add submit_hook and do_schedule scheduler methods.
 
 
 Changes:

+ 32 - 4
doc/doxygen/chapters/api/scheduling_policy.doxy

@@ -19,12 +19,34 @@ This structure contains all the methods that implement a
 scheduling policy. An application may specify which scheduling
 strategy in the field starpu_conf::sched_policy passed to the function
 starpu_init().
+
+For each task going through the scheduler, the following methods get called in the given order:
+
+<ul>
+<li>starpu_sched_policy::submit_hook when the task is submitted</li>
+<li>starpu_sched_policy::push_task when the task becomes ready. The scheduler is here <b>given</b> the task</li>
+<li>starpu_sched_policy::pop_task when a worker is idle. The scheduler here <b>gives</b> back the task to the core</li>
+<li>starpu_sched_policy::pre_exec_hook right before the worker actually starts the task computation (after transferring any missing data).</li>
+<li>starpu_sched_policy::post_exec_hook right after the worker actually completed the task computation.</li>
+</ul>
+
+For each task not going through the scheduler (because starpu_task::execute_on_a_specific_worker was set), these get called:
+
+<ul>
+<li>starpu_sched_policy::submit_hook when the task is submitted</li>
+<li>starpu_sched_policy::push_task_notify when the task becomes ready. This is just a notification, the scheduler does not have to do anything about the task.</li>
+<li>starpu_sched_policy::pre_exec_hook right before the worker actually starts the task computation (after transferring any missing data).</li>
+<li>starpu_sched_policy::post_exec_hook right after the worker actually completed the task computation.</li>
+</ul>
+
+
 \var void (*starpu_sched_policy::init_sched)(unsigned sched_ctx_id)
-        Initialize the scheduling policy.
+        Initialize the scheduling policy, called before any other method.
 \var void (*starpu_sched_policy::deinit_sched)(unsigned sched_ctx_id)
-        Cleanup the scheduling policy.
+        Cleanup the scheduling policy, called before any other method.
 \var int (*starpu_sched_policy::push_task)(struct starpu_task *)
-        Insert a task into the scheduler.
+        Insert a task into the scheduler, called when the task becomes ready for
+        execution.
 \var void (*starpu_sched_policy::push_task_notify)(struct starpu_task *, int workerid, int perf_workerid, unsigned sched_ctx_id)
         Notify the scheduler that a task was pushed on a given worker.
 	This method is called when a task that was explicitly
@@ -44,11 +66,17 @@ starpu_init().
 	chained by the means of the field starpu_task::prev and
 	starpu_task::next). The mutex associated to the worker is
 	already taken when this method is called. This is currently
-	not used.
+	not used and can be discarded.
+\var void (*starpu_sched_policy::submit_hook)(struct starpu_task *)
+        Optional field. This method is called when a task is submitted.
 \var void (*starpu_sched_policy::pre_exec_hook)(struct starpu_task *)
         Optional field. This method is called every time a task is starting.
 \var void (*starpu_sched_policy::post_exec_hook)(struct starpu_task *)
         Optional field. This method is called every time a task has been executed.
+\var void (*starpu_sched_policy::do_schedule)(unsigned sched_ctx_id)
+        Optional field. This method is called when it is a good time to start
+        scheduling tasks. This is notably called when the application calls
+        starpu_task_wait_for_all.
 \var void (*starpu_sched_policy::add_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
         Initialize scheduling structures corresponding to each worker used by the policy.
 \var void (*starpu_sched_policy::remove_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers)

+ 4 - 1
include/starpu_scheduler.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2015  Université de Bordeaux
+ * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2011  Télécom-SudParis
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -38,9 +38,12 @@ struct starpu_sched_policy
 	struct starpu_task *(*pop_task)(unsigned sched_ctx_id);
 	struct starpu_task *(*pop_every_task)(unsigned sched_ctx_id);
 
+	void (*submit_hook)(struct starpu_task *task);
 	void (*pre_exec_hook)(struct starpu_task *);
 	void (*post_exec_hook)(struct starpu_task *);
 
+	void (*do_schedule)(unsigned sched_ctx_id);
+
 	void (*add_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers);
 	void (*remove_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers);
 

+ 24 - 0
src/core/sched_policy.c

@@ -216,6 +216,30 @@ void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
 	starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
 }
 
+void _starpu_sched_task_submit(struct starpu_task *task)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	if (!sched_ctx->sched_policy)
+		return;
+	if (!sched_ctx->sched_policy->submit_hook)
+		return;
+	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
+	sched_ctx->sched_policy->submit_hook(task);
+	_STARPU_TRACE_WORKER_SCHEDULING_POP;
+}
+
+void _starpu_sched_do_schedule(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	if (!sched_ctx->sched_policy)
+		return;
+	if (!sched_ctx->sched_policy->do_schedule)
+		return;
+	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
+	sched_ctx->sched_policy->do_schedule(sched_ctx_id);
+	_STARPU_TRACE_WORKER_SCHEDULING_POP;
+}
+
 static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
 {
 	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */

+ 4 - 1
src/core/sched_policy.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2013, 2015  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2013, 2015-2016  Université de Bordeaux
  * Copyright (C) 2011  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -33,6 +33,9 @@ void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx);
 
 struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy);
 
+void _starpu_sched_task_submit(struct starpu_task *task);
+void _starpu_sched_do_schedule(unsigned sched_ctx_id);
+
 int _starpu_push_task(struct _starpu_job *task);
 int _starpu_repush_task(struct _starpu_job *task);
 

+ 14 - 0
src/core/task.c

@@ -302,6 +302,7 @@ int _starpu_submit_job(struct _starpu_job *j)
 	_starpu_bound_record(j);
 
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
+	_starpu_sched_task_submit(task);
 
 #ifdef STARPU_USE_SC_HYPERVISOR
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
@@ -717,6 +718,7 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
+	_starpu_sched_task_submit(task);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	_starpu_handle_job_submission(j);
@@ -757,6 +759,7 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
+	_starpu_sched_task_submit(task);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	_starpu_handle_job_submission(j);
@@ -831,7 +834,10 @@ int starpu_task_wait_for_all(void)
 #endif
 		struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 		if(config->topology.nsched_ctxs == 1)
+		{
+			_starpu_sched_do_schedule(0);
 			starpu_task_wait_for_all_in_ctx(0);
+		}
 		else
 		{
 			int s;
@@ -839,6 +845,13 @@ int starpu_task_wait_for_all(void)
 			{
 				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
 				{
+					_starpu_sched_do_schedule(config->sched_ctxs[s].id);
+				}
+			}
+			for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
 					starpu_task_wait_for_all_in_ctx(config->sched_ctxs[s].id);
 				}
 			}
@@ -848,6 +861,7 @@ int starpu_task_wait_for_all(void)
 	}
 	else
 	{
+		_starpu_sched_do_schedule(sched_ctx_id);
 		_STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
 		return starpu_task_wait_for_all_in_ctx(sched_ctx_id);
 	}