|
@@ -87,7 +87,7 @@ struct _starpu_work_stealing_data_per_worker
|
|
|
|
|
|
struct _starpu_work_stealing_data
|
|
|
{
|
|
|
- int (*select_victim)(unsigned, int);
|
|
|
+ int (*select_victim)(struct _starpu_work_stealing_data *, unsigned, int);
|
|
|
struct _starpu_work_stealing_data_per_worker *per_worker;
|
|
|
/* keep track of the work performed from the beginning of the algorithm to make
|
|
|
* better decisions about which queue to select when stealing or deferring work
|
|
@@ -113,9 +113,8 @@ static int calibration_value = 0;
|
|
|
* the worker previously selected doesn't own any task,
|
|
|
* then we return the first non-empty worker.
|
|
|
*/
|
|
|
-static int select_victim_round_robin(unsigned sched_ctx_id)
|
|
|
+static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
unsigned worker = ws->last_pop_worker;
|
|
|
unsigned nworkers;
|
|
|
int *workerids = NULL;
|
|
@@ -158,9 +157,8 @@ static int select_victim_round_robin(unsigned sched_ctx_id)
|
|
|
* Return a worker to whom add a task.
|
|
|
* Selecting a worker is done in a round-robin fashion.
|
|
|
*/
|
|
|
-static unsigned select_worker_round_robin(unsigned sched_ctx_id)
|
|
|
+static unsigned select_worker_round_robin(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
unsigned worker = ws->last_push_worker;
|
|
|
unsigned nworkers;
|
|
|
int *workerids;
|
|
@@ -175,9 +173,8 @@ static unsigned select_worker_round_robin(unsigned sched_ctx_id)
|
|
|
|
|
|
#ifdef USE_LOCALITY
|
|
|
/* Select a worker according to the locality of the data of the task to be scheduled */
|
|
|
-static unsigned select_worker_locality(struct starpu_task *task, unsigned sched_ctx_id)
|
|
|
+static unsigned select_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
|
|
|
if (nbuffers == 0)
|
|
|
return -1;
|
|
@@ -238,11 +235,10 @@ static void record_data_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSE
|
|
|
|
|
|
#ifdef USE_LOCALITY_TASKS
|
|
|
/* Record in the worker which data it used last with the locality flag */
|
|
|
-static void record_worker_locality(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
+static void record_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
{
|
|
|
/* Record where in locality data where the task went */
|
|
|
unsigned i;
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
|
|
|
|
|
|
data->nlast_locality = 0;
|
|
@@ -256,9 +252,8 @@ static void record_worker_locality(struct starpu_task *task, int workerid, unsig
|
|
|
}
|
|
|
}
|
|
|
/* Called when pushing a task to a queue */
|
|
|
-static void locality_pushed_task(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
+static void locality_pushed_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
|
|
|
unsigned i;
|
|
|
for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
|
|
@@ -332,9 +327,8 @@ static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, i
|
|
|
}
|
|
|
|
|
|
/* Called when popping a task from a queue */
|
|
|
-static void locality_popped_task(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
+static void locality_popped_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
|
|
|
unsigned i;
|
|
|
for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
|
|
@@ -354,11 +348,11 @@ static void locality_popped_task(struct starpu_task *task, int workerid, unsigne
|
|
|
}
|
|
|
}
|
|
|
#else
|
|
|
-static void record_worker_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
+static void record_worker_locality(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
{
|
|
|
}
|
|
|
/* Called when pushing a task to a queue */
|
|
|
-static void locality_pushed_task(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
+static void locality_pushed_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
{
|
|
|
}
|
|
|
/* Pick a task from workerid's queue, for execution on target */
|
|
@@ -367,7 +361,7 @@ static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, i
|
|
|
return _starpu_fifo_pop_task(ws->per_worker[source].queue_array, target);
|
|
|
}
|
|
|
/* Called when popping a task from a queue */
|
|
|
-static void locality_popped_task(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
+static void locality_popped_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
{
|
|
|
}
|
|
|
#endif
|
|
@@ -382,9 +376,8 @@ static void locality_popped_task(struct starpu_task *task STARPU_ATTRIBUTE_UNUSE
|
|
|
* a smaller value implies a faster worker with an relatively emptier queue : more suitable to put tasks in
|
|
|
* a bigger value implies a slower worker with an reletively more replete queue : more suitable to steal tasks from
|
|
|
*/
|
|
|
-static float overload_metric(unsigned sched_ctx_id, unsigned id)
|
|
|
+static float overload_metric(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, unsigned id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
float execution_ratio = 0.0f;
|
|
|
float current_ratio = 0.0f;
|
|
|
|
|
@@ -414,9 +407,8 @@ static float overload_metric(unsigned sched_ctx_id, unsigned id)
|
|
|
* by the tasks are taken into account to select the most suitable
|
|
|
* worker to steal task from.
|
|
|
*/
|
|
|
-static int select_victim_overload(unsigned sched_ctx_id)
|
|
|
+static int select_victim_overload(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
unsigned worker;
|
|
|
float worker_ratio;
|
|
|
unsigned best_worker = 0;
|
|
@@ -425,7 +417,7 @@ static int select_victim_overload(unsigned sched_ctx_id)
|
|
|
/* Don't try to play smart until we get
|
|
|
* enough informations. */
|
|
|
if (performed_total < calibration_value)
|
|
|
- return select_victim_round_robin(sched_ctx_id);
|
|
|
+ return select_victim_round_robin(ws, sched_ctx_id);
|
|
|
|
|
|
struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
|
|
|
|
|
@@ -435,7 +427,7 @@ static int select_victim_overload(unsigned sched_ctx_id)
|
|
|
while(workers->has_next(workers, &it))
|
|
|
{
|
|
|
worker = workers->get_next(workers, &it);
|
|
|
- worker_ratio = overload_metric(sched_ctx_id, worker);
|
|
|
+ worker_ratio = overload_metric(ws, sched_ctx_id, worker);
|
|
|
|
|
|
if (worker_ratio > best_ratio && ws->per_worker[worker].busy)
|
|
|
{
|
|
@@ -454,7 +446,7 @@ static int select_victim_overload(unsigned sched_ctx_id)
|
|
|
* by the tasks are taken into account to select the most suitable
|
|
|
* worker to add a task to.
|
|
|
*/
|
|
|
-static unsigned select_worker_overload(unsigned sched_ctx_id)
|
|
|
+static unsigned select_worker_overload(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
|
|
|
{
|
|
|
unsigned worker;
|
|
|
float worker_ratio;
|
|
@@ -475,7 +467,7 @@ static unsigned select_worker_overload(unsigned sched_ctx_id)
|
|
|
{
|
|
|
worker = workers->get_next(workers, &it);
|
|
|
|
|
|
- worker_ratio = overload_metric(sched_ctx_id, worker);
|
|
|
+ worker_ratio = overload_metric(ws, sched_ctx_id, worker);
|
|
|
|
|
|
if (worker_ratio < best_ratio)
|
|
|
{
|
|
@@ -495,13 +487,13 @@ static unsigned select_worker_overload(unsigned sched_ctx_id)
|
|
|
* This is a phony function used to call the right
|
|
|
* function depending on the value of USE_OVERLOAD.
|
|
|
*/
|
|
|
-static inline int select_victim(unsigned sched_ctx_id,
|
|
|
+static inline int select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id,
|
|
|
int workerid STARPU_ATTRIBUTE_UNUSED)
|
|
|
{
|
|
|
#ifdef USE_OVERLOAD
|
|
|
- return select_victim_overload(sched_ctx_id);
|
|
|
+ return select_victim_overload(ws, sched_ctx_id);
|
|
|
#else
|
|
|
- return select_victim_round_robin(sched_ctx_id);
|
|
|
+ return select_victim_round_robin(ws, sched_ctx_id);
|
|
|
#endif /* USE_OVERLOAD */
|
|
|
}
|
|
|
|
|
@@ -510,12 +502,12 @@ static inline int select_victim(unsigned sched_ctx_id,
|
|
|
* This is a phony function used to call the right
|
|
|
* function depending on the value of USE_OVERLOAD.
|
|
|
*/
|
|
|
-static inline unsigned select_worker(unsigned sched_ctx_id)
|
|
|
+static inline unsigned select_worker(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
|
|
|
{
|
|
|
#ifdef USE_OVERLOAD
|
|
|
- return select_worker_overload(sched_ctx_id);
|
|
|
+ return select_worker_overload(ws, sched_ctx_id);
|
|
|
#else
|
|
|
- return select_worker_round_robin(sched_ctx_id);
|
|
|
+ return select_worker_round_robin(ws, sched_ctx_id);
|
|
|
#endif /* USE_OVERLOAD */
|
|
|
}
|
|
|
|
|
@@ -537,7 +529,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
task = ws_pick_task(ws, workerid, workerid);
|
|
|
if (task)
|
|
|
- locality_popped_task(task, workerid, sched_ctx_id);
|
|
|
+ locality_popped_task(ws, task, workerid, sched_ctx_id);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
}
|
|
|
|
|
@@ -556,7 +548,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
|
|
|
|
|
|
|
|
|
/* we need to steal someone's job */
|
|
|
- int victim = ws->select_victim(sched_ctx_id, workerid);
|
|
|
+ int victim = ws->select_victim(ws, sched_ctx_id, workerid);
|
|
|
if (victim == -1)
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
@@ -579,8 +571,8 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
|
|
|
_STARPU_TRACE_WORK_STEALING(workerid, victim);
|
|
|
_STARPU_TASK_BREAK_ON(task, sched);
|
|
|
record_data_locality(task, workerid);
|
|
|
- record_worker_locality(task, workerid, sched_ctx_id);
|
|
|
- locality_popped_task(task, victim, sched_ctx_id);
|
|
|
+ record_worker_locality(ws, task, workerid, sched_ctx_id);
|
|
|
+ locality_popped_task(ws, task, victim, sched_ctx_id);
|
|
|
}
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);
|
|
|
|
|
@@ -599,7 +591,7 @@ int ws_push_task(struct starpu_task *task)
|
|
|
int workerid = -1;
|
|
|
|
|
|
#ifdef USE_LOCALITY
|
|
|
- workerid = select_worker_locality(task, sched_ctx_id);
|
|
|
+ workerid = select_worker_locality(ws, task, sched_ctx_id);
|
|
|
#endif
|
|
|
if (workerid == -1)
|
|
|
workerid = starpu_worker_get_id();
|
|
@@ -608,7 +600,7 @@ int ws_push_task(struct starpu_task *task)
|
|
|
* the main thread (-1) or the current worker is not in the target
|
|
|
* context, we find the better one to put task on its queue */
|
|
|
if (workerid == -1 || !starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
|
|
|
- workerid = select_worker(sched_ctx_id);
|
|
|
+ workerid = select_worker(ws, sched_ctx_id);
|
|
|
|
|
|
starpu_pthread_mutex_t *sched_mutex;
|
|
|
starpu_pthread_cond_t *sched_cond;
|
|
@@ -619,7 +611,7 @@ int ws_push_task(struct starpu_task *task)
|
|
|
_STARPU_TASK_BREAK_ON(task, sched);
|
|
|
record_data_locality(task, workerid);
|
|
|
_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
|
|
|
- locality_pushed_task(task, workerid, sched_ctx_id);
|
|
|
+ locality_pushed_task(ws, task, workerid, sched_ctx_id);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
|
|
|
starpu_push_task_end(task);
|
|
@@ -725,9 +717,8 @@ struct starpu_sched_policy _starpu_sched_ws_policy =
|
|
|
* the proximity list built using the info on te architecture provided by hwloc
|
|
|
*/
|
|
|
#ifdef STARPU_HAVE_HWLOC
|
|
|
-static int lws_select_victim(unsigned sched_ctx_id, int workerid)
|
|
|
+static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, int workerid)
|
|
|
{
|
|
|
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
|
|
|
int neighbor;
|
|
|
int i;
|