|
@@ -57,6 +57,13 @@
|
|
|
*/
|
|
|
|
|
|
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Worker Components' Data Structures *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
struct _starpu_task_grid
|
|
|
{
|
|
|
/* this member may be NULL if a worker have poped it but its a
|
|
@@ -117,6 +124,14 @@ struct _starpu_worker_component_data
|
|
|
/* this array store worker components */
|
|
|
static struct starpu_sched_component * _worker_components[STARPU_NMAXWORKERS];
|
|
|
|
|
|
+struct starpu_sched_component * starpu_sched_component_worker_get(int workerid);
|
|
|
+
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Worker Components' Task List and Grid Functions *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
|
|
|
static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
|
|
|
{
|
|
@@ -127,16 +142,27 @@ static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
|
|
|
return l;
|
|
|
}
|
|
|
+
|
|
|
static struct _starpu_task_grid * _starpu_task_grid_create(void)
|
|
|
{
|
|
|
struct _starpu_task_grid * t = malloc(sizeof(*t));
|
|
|
memset(t, 0, sizeof(*t));
|
|
|
return t;
|
|
|
}
|
|
|
+
|
|
|
+static struct _starpu_worker_task_list * _worker_get_list(void)
|
|
|
+{
|
|
|
+ int workerid = starpu_worker_get_id();
|
|
|
+ STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
|
|
|
+ struct _starpu_worker_component_data * d = starpu_sched_component_worker_get(workerid)->data;
|
|
|
+ return d->list;
|
|
|
+}
|
|
|
+
|
|
|
static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
|
|
|
{
|
|
|
free(t);
|
|
|
}
|
|
|
+
|
|
|
static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
|
|
|
{
|
|
|
if(l)
|
|
@@ -272,51 +298,11 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
|
|
|
|
|
|
|
|
|
|
|
|
+/******************************************************************************
|
|
|
+ * Worker Components' Private Helper Functions *
|
|
|
+ *****************************************************************************/
|
|
|
|
|
|
|
|
|
-static struct starpu_sched_component * starpu_sched_component_worker_create(int workerid);
|
|
|
-static struct starpu_sched_component * starpu_sched_component_combined_worker_create(int workerid);
|
|
|
-struct starpu_sched_component * starpu_sched_component_worker_get(int workerid)
|
|
|
-{
|
|
|
- STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
|
|
|
- /* we may need to take a mutex here */
|
|
|
- if(_worker_components[workerid])
|
|
|
- return _worker_components[workerid];
|
|
|
- else
|
|
|
- {
|
|
|
- struct starpu_sched_component * component;
|
|
|
- if(workerid < (int) starpu_worker_get_count())
|
|
|
- component = starpu_sched_component_worker_create(workerid);
|
|
|
- else
|
|
|
- component = starpu_sched_component_combined_worker_create(workerid);
|
|
|
- _worker_components[workerid] = component;
|
|
|
- return component;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-struct _starpu_worker * _starpu_sched_component_worker_get_worker(struct starpu_sched_component * worker_component)
|
|
|
-{
|
|
|
- STARPU_ASSERT(starpu_sched_component_is_simple_worker(worker_component));
|
|
|
- struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
- return data->worker;
|
|
|
-}
|
|
|
-struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_combined_worker(struct starpu_sched_component * worker_component)
|
|
|
-{
|
|
|
- STARPU_ASSERT(starpu_sched_component_is_combined_worker(worker_component));
|
|
|
- struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
- return data->combined_worker;
|
|
|
-}
|
|
|
-
|
|
|
-/*
|
|
|
-enum starpu_perfmodel_archtype starpu_sched_component_worker_get_perf_arch(struct starpu_sched_component * worker_component)
|
|
|
-{
|
|
|
- STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
|
|
|
- if(starpu_sched_component_is_simple_worker(worker_component))
|
|
|
- return _starpu_sched_component_worker_get_worker(worker_component)->perf_arch;
|
|
|
- else
|
|
|
- return _starpu_sched_component_combined_worker_get_combined_worker(worker_component)->perf_arch;
|
|
|
-}
|
|
|
-*/
|
|
|
|
|
|
static void _starpu_sched_component_worker_set_sleep_status(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
@@ -324,14 +310,12 @@ static void _starpu_sched_component_worker_set_sleep_status(struct starpu_sched_
|
|
|
struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
data->status = COMPONENT_STATUS_SLEEPING;
|
|
|
}
|
|
|
-
|
|
|
static void _starpu_sched_component_worker_set_changed_status(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
|
STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
|
|
|
struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
data->status = COMPONENT_STATUS_CHANGED;
|
|
|
}
|
|
|
-
|
|
|
static void _starpu_sched_component_worker_reset_status(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
|
STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
|
|
@@ -345,14 +329,12 @@ static int _starpu_sched_component_worker_is_reset_status(struct starpu_sched_co
|
|
|
struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
return (data->status == COMPONENT_STATUS_RESET);
|
|
|
}
|
|
|
-
|
|
|
static int _starpu_sched_component_worker_is_changed_status(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
|
STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
|
|
|
struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
return (data->status == COMPONENT_STATUS_CHANGED);
|
|
|
}
|
|
|
-
|
|
|
static int _starpu_sched_component_worker_is_sleeping_status(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
|
STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
|
|
@@ -360,19 +342,69 @@ static int _starpu_sched_component_worker_is_sleeping_status(struct starpu_sched
|
|
|
return (data->status == COMPONENT_STATUS_SLEEPING);
|
|
|
}
|
|
|
|
|
|
+#ifndef STARPU_NO_ASSERT
|
|
|
+static int _worker_consistant(struct starpu_sched_component * component)
|
|
|
+{
|
|
|
+ int is_a_worker = 0;
|
|
|
+ int i;
|
|
|
+ for(i = 0; i<STARPU_NMAXWORKERS; i++)
|
|
|
+ if(_worker_components[i] == component)
|
|
|
+ is_a_worker = 1;
|
|
|
+ if(!is_a_worker)
|
|
|
+ return 0;
|
|
|
+ struct _starpu_worker_component_data * data = component->data;
|
|
|
+ if(data->worker)
|
|
|
+ {
|
|
|
+ int id = data->worker->workerid;
|
|
|
+ return (_worker_components[id] == component)
|
|
|
+ && component->nchildren == 0;
|
|
|
+ }
|
|
|
+ return 1;
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Worker Components' Public Helper Functions (Part 1) *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+struct _starpu_worker * _starpu_sched_component_worker_get_worker(struct starpu_sched_component * worker_component)
|
|
|
+{
|
|
|
+ STARPU_ASSERT(starpu_sched_component_is_simple_worker(worker_component));
|
|
|
+ struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
+ return data->worker;
|
|
|
+}
|
|
|
+struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_combined_worker(struct starpu_sched_component * worker_component)
|
|
|
+{
|
|
|
+ STARPU_ASSERT(starpu_sched_component_is_combined_worker(worker_component));
|
|
|
+ struct _starpu_worker_component_data * data = worker_component->data;
|
|
|
+ return data->combined_worker;
|
|
|
+}
|
|
|
+
|
|
|
void _starpu_sched_component_lock_worker(int workerid)
|
|
|
{
|
|
|
STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
|
|
|
- struct _starpu_worker_component_data * data = starpu_sched_component_worker_create(workerid)->data;
|
|
|
+ struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
|
|
|
}
|
|
|
void _starpu_sched_component_unlock_worker(int workerid)
|
|
|
{
|
|
|
STARPU_ASSERT(0 <= workerid && workerid < (int)starpu_worker_get_count());
|
|
|
- struct _starpu_worker_component_data * data = starpu_sched_component_worker_create(workerid)->data;
|
|
|
+ struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Simple Worker Components' Interface Functions *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
static void simple_worker_can_pull(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
|
(void) worker_component;
|
|
@@ -398,33 +430,6 @@ static void simple_worker_can_pull(struct starpu_sched_component * worker_compon
|
|
|
_starpu_sched_component_unlock_worker(w->workerid);
|
|
|
}
|
|
|
|
|
|
-static void combined_worker_can_pull(struct starpu_sched_component * component)
|
|
|
-{
|
|
|
- (void) component;
|
|
|
- STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
|
|
|
- struct _starpu_worker_component_data * data = component->data;
|
|
|
- int workerid = starpu_worker_get_id();
|
|
|
- int i;
|
|
|
- for(i = 0; i < data->combined_worker->worker_size; i++)
|
|
|
- {
|
|
|
- if(i == workerid)
|
|
|
- continue;
|
|
|
- int worker = data->combined_worker->combined_workerid[i];
|
|
|
- _starpu_sched_component_lock_worker(worker);
|
|
|
- if(_starpu_sched_component_worker_is_sleeping_status(component))
|
|
|
- {
|
|
|
- starpu_pthread_mutex_t *sched_mutex;
|
|
|
- starpu_pthread_cond_t *sched_cond;
|
|
|
- starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
|
|
|
- starpu_wakeup_worker(worker, sched_cond, sched_mutex);
|
|
|
- }
|
|
|
- if(_starpu_sched_component_worker_is_reset_status(component))
|
|
|
- _starpu_sched_component_worker_set_changed_status(component);
|
|
|
-
|
|
|
- _starpu_sched_component_unlock_worker(worker);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
static int simple_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
|
|
|
{
|
|
|
STARPU_ASSERT(starpu_sched_component_is_worker(component));
|
|
@@ -449,7 +454,7 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-struct starpu_task * simple_worker_pop_task(struct starpu_sched_component *component)
|
|
|
+static struct starpu_task * simple_worker_pop_task(struct starpu_sched_component *component)
|
|
|
{
|
|
|
struct _starpu_worker_component_data * data = component->data;
|
|
|
struct _starpu_worker_task_list * list = data->list;
|
|
@@ -499,73 +504,7 @@ struct starpu_task * simple_worker_pop_task(struct starpu_sched_component *compo
|
|
|
starpu_push_task_end(task);
|
|
|
return task;
|
|
|
}
|
|
|
-void starpu_sched_component_worker_destroy(struct starpu_sched_component *component)
|
|
|
-{
|
|
|
- struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
|
|
|
- unsigned id = worker->workerid;
|
|
|
- assert(_worker_components[id] == component);
|
|
|
- int i;
|
|
|
- for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
|
|
|
- if(component->parents[i] != NULL)
|
|
|
- return;//this component is shared between several contexts
|
|
|
- starpu_sched_component_destroy(component);
|
|
|
- _worker_components[id] = NULL;
|
|
|
-}
|
|
|
|
|
|
-void _starpu_sched_component_lock_all_workers(void)
|
|
|
-{
|
|
|
- unsigned i;
|
|
|
- for(i = 0; i < starpu_worker_get_count(); i++)
|
|
|
- _starpu_sched_component_lock_worker(i);
|
|
|
-}
|
|
|
-void _starpu_sched_component_unlock_all_workers(void)
|
|
|
-{
|
|
|
- unsigned i;
|
|
|
- for(i = 0; i < starpu_worker_get_count(); i++)
|
|
|
- _starpu_sched_component_unlock_worker(i);
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-/*
|
|
|
- dont know if this may be usefull…
|
|
|
-static double worker_estimated_finish_time(struct _starpu_worker * worker)
|
|
|
-{
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
|
|
|
- double sum = 0.0;
|
|
|
- struct starpu_task_list list = worker->local_tasks;
|
|
|
- struct starpu_task * task;
|
|
|
- for(task = starpu_task_list_front(&list);
|
|
|
- task != starpu_task_list_end(&list);
|
|
|
- task = starpu_task_list_next(task))
|
|
|
- if(!isnan(task->predicted))
|
|
|
- sum += task->predicted;
|
|
|
- if(worker->current_task)
|
|
|
- {
|
|
|
- struct starpu_task * t = worker->current_task;
|
|
|
- if(t && !isnan(t->predicted))
|
|
|
- sum += t->predicted/2;
|
|
|
- }
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
|
|
|
- return sum + starpu_timing_now();
|
|
|
-}
|
|
|
-*/
|
|
|
-static double combined_worker_estimated_end(struct starpu_sched_component * component)
|
|
|
-{
|
|
|
- STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
|
|
|
- struct _starpu_worker_component_data * data = component->data;
|
|
|
- struct _starpu_combined_worker * combined_worker = data->combined_worker;
|
|
|
- double max = 0.0;
|
|
|
- int i;
|
|
|
- for(i = 0; i < combined_worker->worker_size; i++)
|
|
|
- {
|
|
|
- data = _worker_components[combined_worker->combined_workerid[i]]->data;
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
|
|
|
- double tmp = data->list->exp_end;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
|
|
|
- max = tmp > max ? tmp : max;
|
|
|
- }
|
|
|
- return max;
|
|
|
-}
|
|
|
static double simple_worker_estimated_end(struct starpu_sched_component * component)
|
|
|
{
|
|
|
struct _starpu_worker_component_data * data = component->data;
|
|
@@ -576,8 +515,6 @@ static double simple_worker_estimated_end(struct starpu_sched_component * compon
|
|
|
return tmp;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
static double simple_worker_estimated_load(struct starpu_sched_component * component)
|
|
|
{
|
|
|
struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
|
|
@@ -598,18 +535,96 @@ static double simple_worker_estimated_load(struct starpu_sched_component * compo
|
|
|
starpu_worker_get_perf_archtype(starpu_bitmap_first(component->workers)));
|
|
|
}
|
|
|
|
|
|
-static double combined_worker_estimated_load(struct starpu_sched_component * component)
|
|
|
+static void _worker_component_deinit_data(struct starpu_sched_component * component)
|
|
|
{
|
|
|
struct _starpu_worker_component_data * d = component->data;
|
|
|
- struct _starpu_combined_worker * c = d->combined_worker;
|
|
|
- double load = 0;
|
|
|
+ _starpu_worker_task_list_destroy(d->list);
|
|
|
+ if(starpu_sched_component_is_simple_worker(component))
|
|
|
+ STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
|
|
|
int i;
|
|
|
- for(i = 0; i < c->worker_size; i++)
|
|
|
+ for(i = 0; i < STARPU_NMAXWORKERS; i++)
|
|
|
+ if(_worker_components[i] == component)
|
|
|
+ {
|
|
|
+ _worker_components[i] = NULL;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ free(d);
|
|
|
+}
|
|
|
+
|
|
|
+static struct starpu_sched_component * starpu_sched_component_worker_create(int workerid)
|
|
|
+{
|
|
|
+ STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
|
|
|
+
|
|
|
+ if(_worker_components[workerid])
|
|
|
+ return _worker_components[workerid];
|
|
|
+
|
|
|
+ struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
|
|
|
+ if(worker == NULL)
|
|
|
+ return NULL;
|
|
|
+ struct starpu_sched_component * component = starpu_sched_component_create();
|
|
|
+ struct _starpu_worker_component_data * data = malloc(sizeof(*data));
|
|
|
+ memset(data, 0, sizeof(*data));
|
|
|
+
|
|
|
+ data->worker = worker;
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
|
|
|
+ data->status = COMPONENT_STATUS_SLEEPING;
|
|
|
+ data->list = _starpu_worker_task_list_create();
|
|
|
+ component->data = data;
|
|
|
+
|
|
|
+ component->push_task = simple_worker_push_task;
|
|
|
+ component->pop_task = simple_worker_pop_task;
|
|
|
+ component->can_pull = simple_worker_can_pull;
|
|
|
+ component->estimated_end = simple_worker_estimated_end;
|
|
|
+ component->estimated_load = simple_worker_estimated_load;
|
|
|
+ component->deinit_data = _worker_component_deinit_data;
|
|
|
+ starpu_bitmap_set(component->workers, workerid);
|
|
|
+ starpu_bitmap_or(component->workers_in_ctx, component->workers);
|
|
|
+ _worker_components[workerid] = component;
|
|
|
+
|
|
|
+#ifdef STARPU_HAVE_HWLOC
|
|
|
+ struct _starpu_machine_config *config = _starpu_get_machine_config();
|
|
|
+ struct _starpu_machine_topology *topology = &config->topology;
|
|
|
+ hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
|
|
|
+ STARPU_ASSERT(obj);
|
|
|
+ component->obj = obj;
|
|
|
+#endif
|
|
|
+
|
|
|
+ return component;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Combined Worker Components' Interface Functions *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+static void combined_worker_can_pull(struct starpu_sched_component * component)
|
|
|
+{
|
|
|
+ (void) component;
|
|
|
+ STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
|
|
|
+ struct _starpu_worker_component_data * data = component->data;
|
|
|
+ int workerid = starpu_worker_get_id();
|
|
|
+ int i;
|
|
|
+ for(i = 0; i < data->combined_worker->worker_size; i++)
|
|
|
{
|
|
|
- struct starpu_sched_component * n = starpu_sched_component_worker_get(c->combined_workerid[i]);
|
|
|
- load += n->estimated_load(n);
|
|
|
+ if(i == workerid)
|
|
|
+ continue;
|
|
|
+ int worker = data->combined_worker->combined_workerid[i];
|
|
|
+ _starpu_sched_component_lock_worker(worker);
|
|
|
+ if(_starpu_sched_component_worker_is_sleeping_status(component))
|
|
|
+ {
|
|
|
+ starpu_pthread_mutex_t *sched_mutex;
|
|
|
+ starpu_pthread_cond_t *sched_cond;
|
|
|
+ starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
|
|
|
+ starpu_wakeup_worker(worker, sched_cond, sched_mutex);
|
|
|
+ }
|
|
|
+ if(_starpu_sched_component_worker_is_reset_status(component))
|
|
|
+ _starpu_sched_component_worker_set_changed_status(component);
|
|
|
+
|
|
|
+ _starpu_sched_component_unlock_worker(worker);
|
|
|
}
|
|
|
- return load;
|
|
|
}
|
|
|
|
|
|
static int combined_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
|
|
@@ -683,64 +698,38 @@ static int combined_worker_push_task(struct starpu_sched_component * component,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-void _worker_component_deinit_data(struct starpu_sched_component * component)
|
|
|
+static double combined_worker_estimated_end(struct starpu_sched_component * component)
|
|
|
{
|
|
|
- struct _starpu_worker_component_data * d = component->data;
|
|
|
- _starpu_worker_task_list_destroy(d->list);
|
|
|
- if(starpu_sched_component_is_simple_worker(component))
|
|
|
- STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
|
|
|
+ STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
|
|
|
+ struct _starpu_worker_component_data * data = component->data;
|
|
|
+ struct _starpu_combined_worker * combined_worker = data->combined_worker;
|
|
|
+ double max = 0.0;
|
|
|
int i;
|
|
|
- for(i = 0; i < STARPU_NMAXWORKERS; i++)
|
|
|
- if(_worker_components[i] == component)
|
|
|
- {
|
|
|
- _worker_components[i] = NULL;
|
|
|
- return;
|
|
|
- }
|
|
|
- free(d);
|
|
|
+ for(i = 0; i < combined_worker->worker_size; i++)
|
|
|
+ {
|
|
|
+ data = _worker_components[combined_worker->combined_workerid[i]]->data;
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
|
|
|
+ double tmp = data->list->exp_end;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
|
|
|
+ max = tmp > max ? tmp : max;
|
|
|
+ }
|
|
|
+ return max;
|
|
|
}
|
|
|
|
|
|
-static struct starpu_sched_component * starpu_sched_component_worker_create(int workerid)
|
|
|
+static double combined_worker_estimated_load(struct starpu_sched_component * component)
|
|
|
{
|
|
|
- STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
|
|
|
-
|
|
|
- if(_worker_components[workerid])
|
|
|
- return _worker_components[workerid];
|
|
|
-
|
|
|
- struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
|
|
|
- if(worker == NULL)
|
|
|
- return NULL;
|
|
|
- struct starpu_sched_component * component = starpu_sched_component_create();
|
|
|
- struct _starpu_worker_component_data * data = malloc(sizeof(*data));
|
|
|
- memset(data, 0, sizeof(*data));
|
|
|
-
|
|
|
- data->worker = worker;
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
|
|
|
- data->status = COMPONENT_STATUS_SLEEPING;
|
|
|
- data->list = _starpu_worker_task_list_create();
|
|
|
- component->data = data;
|
|
|
-
|
|
|
- component->push_task = simple_worker_push_task;
|
|
|
- component->pop_task = simple_worker_pop_task;
|
|
|
- component->can_pull = simple_worker_can_pull;
|
|
|
- component->estimated_end = simple_worker_estimated_end;
|
|
|
- component->estimated_load = simple_worker_estimated_load;
|
|
|
- component->deinit_data = _worker_component_deinit_data;
|
|
|
- starpu_bitmap_set(component->workers, workerid);
|
|
|
- starpu_bitmap_or(component->workers_in_ctx, component->workers);
|
|
|
- _worker_components[workerid] = component;
|
|
|
-
|
|
|
-#ifdef STARPU_HAVE_HWLOC
|
|
|
- struct _starpu_machine_config *config = _starpu_get_machine_config();
|
|
|
- struct _starpu_machine_topology *topology = &config->topology;
|
|
|
- hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
|
|
|
- STARPU_ASSERT(obj);
|
|
|
- component->obj = obj;
|
|
|
-#endif
|
|
|
-
|
|
|
- return component;
|
|
|
+ struct _starpu_worker_component_data * d = component->data;
|
|
|
+ struct _starpu_combined_worker * c = d->combined_worker;
|
|
|
+ double load = 0;
|
|
|
+ int i;
|
|
|
+ for(i = 0; i < c->worker_size; i++)
|
|
|
+ {
|
|
|
+ struct starpu_sched_component * n = starpu_sched_component_worker_get(c->combined_workerid[i]);
|
|
|
+ load += n->estimated_load(n);
|
|
|
+ }
|
|
|
+ return load;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
static struct starpu_sched_component * starpu_sched_component_combined_worker_create(int workerid)
|
|
|
{
|
|
|
STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
|
|
@@ -778,43 +767,39 @@ static struct starpu_sched_component * starpu_sched_component_combined_worker_c
|
|
|
return component;
|
|
|
}
|
|
|
|
|
|
-int starpu_sched_component_is_simple_worker(struct starpu_sched_component * component)
|
|
|
-{
|
|
|
- return component->push_task == simple_worker_push_task;
|
|
|
-}
|
|
|
-int starpu_sched_component_is_combined_worker(struct starpu_sched_component * component)
|
|
|
+
|
|
|
+
|
|
|
+/******************************************************************************
|
|
|
+ * Worker Components' Public Helper Functions (Part 2) *
|
|
|
+ *****************************************************************************/
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+void _starpu_sched_component_lock_all_workers(void)
|
|
|
{
|
|
|
- return component->push_task == combined_worker_push_task;
|
|
|
+ unsigned i;
|
|
|
+ for(i = 0; i < starpu_worker_get_count(); i++)
|
|
|
+ _starpu_sched_component_lock_worker(i);
|
|
|
}
|
|
|
-
|
|
|
-int starpu_sched_component_is_worker(struct starpu_sched_component * component)
|
|
|
+void _starpu_sched_component_unlock_all_workers(void)
|
|
|
{
|
|
|
- return starpu_sched_component_is_simple_worker(component)
|
|
|
- || starpu_sched_component_is_combined_worker(component);
|
|
|
+ unsigned i;
|
|
|
+ for(i = 0; i < starpu_worker_get_count(); i++)
|
|
|
+ _starpu_sched_component_unlock_worker(i);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-#ifndef STARPU_NO_ASSERT
|
|
|
-static int _worker_consistant(struct starpu_sched_component * component)
|
|
|
+void starpu_sched_component_worker_destroy(struct starpu_sched_component *component)
|
|
|
{
|
|
|
- int is_a_worker = 0;
|
|
|
+ struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
|
|
|
+ unsigned id = worker->workerid;
|
|
|
+ assert(_worker_components[id] == component);
|
|
|
int i;
|
|
|
- for(i = 0; i<STARPU_NMAXWORKERS; i++)
|
|
|
- if(_worker_components[i] == component)
|
|
|
- is_a_worker = 1;
|
|
|
- if(!is_a_worker)
|
|
|
- return 0;
|
|
|
- struct _starpu_worker_component_data * data = component->data;
|
|
|
- if(data->worker)
|
|
|
- {
|
|
|
- int id = data->worker->workerid;
|
|
|
- return (_worker_components[id] == component)
|
|
|
- && component->nchildren == 0;
|
|
|
- }
|
|
|
- return 1;
|
|
|
+ for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
|
|
|
+ if(component->parents[i] != NULL)
|
|
|
+ return;//this component is shared between several contexts
|
|
|
+ starpu_sched_component_destroy(component);
|
|
|
+ _worker_components[id] = NULL;
|
|
|
}
|
|
|
-#endif
|
|
|
|
|
|
int starpu_sched_component_worker_get_workerid(struct starpu_sched_component * worker_component)
|
|
|
{
|
|
@@ -825,16 +810,6 @@ int starpu_sched_component_worker_get_workerid(struct starpu_sched_component * w
|
|
|
return starpu_bitmap_first(worker_component->workers);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-static struct _starpu_worker_task_list * _worker_get_list(void)
|
|
|
-{
|
|
|
- int workerid = starpu_worker_get_id();
|
|
|
- STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
|
|
|
- struct _starpu_worker_component_data * d = starpu_sched_component_worker_get(workerid)->data;
|
|
|
- return d->list;
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
void starpu_sched_component_worker_pre_exec_hook(struct starpu_task * task)
|
|
|
{
|
|
|
if(!isnan(task->predicted))
|
|
@@ -854,6 +829,7 @@ void starpu_sched_component_worker_pre_exec_hook(struct starpu_task * task)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task)
|
|
|
{
|
|
|
if(task->execute_on_a_specific_worker)
|
|
@@ -865,63 +841,37 @@ void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-#if 0
|
|
|
-static void starpu_sched_component_worker_push_task_notify(struct starpu_task * task, int workerid, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
|
|
|
+int starpu_sched_component_is_simple_worker(struct starpu_sched_component * component)
|
|
|
{
|
|
|
+ return component->push_task == simple_worker_push_task;
|
|
|
+}
|
|
|
+int starpu_sched_component_is_combined_worker(struct starpu_sched_component * component)
|
|
|
+{
|
|
|
+ return component->push_task == combined_worker_push_task;
|
|
|
+}
|
|
|
|
|
|
- struct starpu_sched_component * worker_component = starpu_sched_component_worker_get(workerid);
|
|
|
- /* dont work with parallel tasks */
|
|
|
- if(starpu_sched_component_is_combined_worker(worker_component))
|
|
|
- return;
|
|
|
-
|
|
|
- struct _starpu_worker_component_data * d = worker_component->data;
|
|
|
- struct _starpu_worker_task_list * list = d->list;
|
|
|
- /* Compute the expected penality */
|
|
|
- enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|
|
|
- unsigned memory_node = starpu_worker_get_memory_node(workerid);
|
|
|
-
|
|
|
- double predicted = starpu_task_expected_length(task, perf_arch,
|
|
|
- starpu_task_get_implementation(task));
|
|
|
-
|
|
|
- double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
-
|
|
|
- /* Update the predictions */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
|
|
|
- /* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
- list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
|
|
|
- list->exp_end = list->exp_start + list->exp_len;
|
|
|
+int starpu_sched_component_is_worker(struct starpu_sched_component * component)
|
|
|
+{
|
|
|
+ return starpu_sched_component_is_simple_worker(component)
|
|
|
+ || starpu_sched_component_is_combined_worker(component);
|
|
|
+}
|
|
|
|
|
|
- /* If there is no prediction available, we consider the task has a null length */
|
|
|
- if (!isnan(predicted_transfer))
|
|
|
+/* As Worker Components' creating functions are protected, this function allows
|
|
|
+ * the user to get a Worker Component from a worker id */
|
|
|
+struct starpu_sched_component * starpu_sched_component_worker_get(int workerid)
|
|
|
+{
|
|
|
+ STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
|
|
|
+ /* we may need to take a mutex here */
|
|
|
+ if(_worker_components[workerid])
|
|
|
+ return _worker_components[workerid];
|
|
|
+ else
|
|
|
{
|
|
|
- if (starpu_timing_now() + predicted_transfer < list->exp_end)
|
|
|
- {
|
|
|
- /* We may hope that the transfer will be finshied by
|
|
|
- * the start of the task. */
|
|
|
- predicted_transfer = 0;
|
|
|
- }
|
|
|
+ struct starpu_sched_component * component;
|
|
|
+ if(workerid < (int) starpu_worker_get_count())
|
|
|
+ component = starpu_sched_component_worker_create(workerid);
|
|
|
else
|
|
|
- {
|
|
|
- /* The transfer will not be finished by then, take the
|
|
|
- * remainder into account */
|
|
|
- predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
|
|
|
- }
|
|
|
- task->predicted_transfer = predicted_transfer;
|
|
|
- list->exp_end += predicted_transfer;
|
|
|
- list->exp_len += predicted_transfer;
|
|
|
- }
|
|
|
-
|
|
|
- /* If there is no prediction available, we consider the task has a null length */
|
|
|
- if (!isnan(predicted))
|
|
|
- {
|
|
|
- task->predicted = predicted;
|
|
|
- list->exp_end += predicted;
|
|
|
- list->exp_len += predicted;
|
|
|
+ component = starpu_sched_component_combined_worker_create(workerid);
|
|
|
+ _worker_components[workerid] = component;
|
|
|
+ return component;
|
|
|
}
|
|
|
-
|
|
|
- list->ntasks++;
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
}
|
|
|
-#endif
|