/* Worker scheduling nodes: per-worker task queues for simple and combined workers. */
#include "node_sched.h"
#include <core/workers.h>
#include <float.h>
#include <stdlib.h>
- static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
/* The data structure backing the workers' queues looks like this:
 * W = worker
 * T = simple task
 * P = parallel task
 *
 *
 *           P--P  T
 *           |  | \|
 *     P--P  T  T  P  T
 *     |  |  |  |  |  |
 *     T  T  P--P--P  T
 *     |  |  |  |  |  |
 *     W  W  W  W  W  W
 *
 *
 *
 * It is possible for a _starpu_task_grid not to hold a task:
 *
 * N = no task
 *
 *     T  T  T
 *     |  |  |
 *     P--N--N
 *     |  |  |
 *     W  W  W
 *
 *
 * This API is slightly asymmetric: _starpu_task_grid cells are allocated by
 * the caller and freed by the data structure.
 *
 * exp_{start,end,len} are filled in by the caller.
 */
/* One cell of a worker's queue. Cells are chained vertically (up/down)
 * inside a single worker's queue and horizontally (left/right) across the
 * workers that share one parallel task. */
struct _starpu_task_grid
{
	/* Task carried by this cell. It becomes NULL once a worker has popped
	 * it; for a parallel task the cell is kept in place afterwards so that
	 * its neighbours never hold a dangling pointer. */
	struct starpu_task *task;

	/* Neighbouring cells; a missing neighbour is NULL. */
	struct _starpu_task_grid *up;
	struct _starpu_task_grid *down;
	struct _starpu_task_grid *left;
	struct _starpu_task_grid *right;

	/* Number of tasks still to be popped by workers for this row.
	 * The leftmost cell (left == NULL) owns the counter in ntasks; every
	 * other cell of the row reaches it through pntasks.
	 *
	 * When the counter drops to 0, the left and right members of the whole
	 * row are reset to NULL, which marks the cells as ready to be freed. */
	union
	{
		int ntasks;
		int *pntasks;
	};
};
- struct _starpu_worker_task_list
- {
- double exp_start, exp_len, exp_end;
- struct _starpu_task_grid *first, *last;
- starpu_pthread_mutex_t mutex;
- };
/* Private data hung on a worker scheduling node (node->data). */
struct _starpu_worker_node_data
{
	/* Set for a simple worker node, NULL for a combined worker node. */
	struct _starpu_worker *worker;

	/* Set for a combined worker node, NULL for a simple worker node. */
	struct _starpu_combined_worker *combined_worker;

	/* Task queue of the worker; only simple worker nodes allocate one. */
	struct _starpu_worker_task_list *list;
};
- static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
- {
- struct _starpu_worker_task_list * l = malloc(sizeof(*l));
- memset(l, 0, sizeof(*l));
- l->exp_len = 0.0;
- l->exp_start = l->exp_end = starpu_timing_now();
- STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
- return l;
- }
- static struct _starpu_task_grid * _starpu_task_grid_create(void)
- {
- struct _starpu_task_grid * t = malloc(sizeof(*t));
- memset(t, 0, sizeof(*t));
- return t;
- }
/* Release a cell obtained from _starpu_task_grid_create().
 * The cell is not unlinked from its neighbours here; passing NULL is
 * harmless since the pointer goes straight to free(). */
static void _starpu_task_grid_destroy(struct _starpu_task_grid *t)
{
	free(t);
}
- static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
- {
- if(!l)
- return;
- STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
- free(l);
- }
- //the task, ntasks, pntasks, left and right field members are set by the caller
- static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
- {
- if(l->first == NULL)
- l->first = l->last = t;
- t->down = l->last;
- l->last->up = t;
- t->up = NULL;
- l->last = t;
- }
- //recursively set left and right pointers to NULL
- static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
- {
- STARPU_ASSERT(t->task == NULL);
- struct _starpu_task_grid * t_left = t->left;
- struct _starpu_task_grid * t_right = t->right;
- t->left = t->right = NULL;
- while(t_left)
- {
- STARPU_ASSERT(t_left->task == NULL);
- t = t_left;
- t_left = t_left->left;
- t->left = NULL;
- t->right = NULL;
- }
- while(t_right)
- {
- STARPU_ASSERT(t_right->task == NULL);
- t = t_right;
- t_right = t_right->right;
- t->left = NULL;
- t->right = NULL;
- }
- }
- static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
- {
- if(!l->first)
- {
- l->exp_start = l->exp_end = starpu_timing_now();
- l->exp_len = 0;
- return NULL;
- }
- struct _starpu_task_grid * t = l->first;
- if(t->task == NULL && t->right == NULL && t->left == NULL)
- {
- l->first = t->up;
- if(l->first)
- l->first->down = NULL;
- if(l->last == t)
- l->last = NULL;
- _starpu_task_grid_destroy(t);
- return _starpu_worker_task_list_pop(l);
- }
-
- while(t)
- {
- if(t->task)
- {
- struct starpu_task * task = t->task;
- t->task = NULL;
- int * p = t->left ? t->pntasks : &t->ntasks;
- STARPU_ATOMIC_ADD(p, -1);
- if(*p == 0)
- _starpu_task_grid_unset_left_right_member(t);
- return task;
- }
- t = t->up;
- }
- return NULL;
- }
- static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid);
- static struct _starpu_sched_node * _starpu_sched_node_combined_worker_create(int workerid);
- struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
- {
- STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
- /* we may need to take a mutex here */
- if(_worker_nodes[workerid])
- return _worker_nodes[workerid];
- else
- return _worker_nodes[workerid] =
- (workerid < (int) starpu_worker_get_count() ?
- _starpu_sched_node_worker_create:
- _starpu_sched_node_combined_worker_create)(workerid);
- }
- struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct _starpu_sched_node * worker_node)
- {
- STARPU_ASSERT(_starpu_sched_node_is_worker(worker_node));
- struct _starpu_worker_node_data * data = worker_node->data;
- return data->worker;
- }
- int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct starpu_task *task)
- {
- /*this function take the worker's mutex */
- struct _starpu_worker_node_data * data = node->data;
- struct _starpu_task_grid * t = _starpu_task_grid_create();
- t->task = task;
- t->ntasks = 1;
- STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
- _starpu_worker_task_list_push(data->list, t);
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
- return 0;
- }
- struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_node *node,unsigned sched_ctx_id)
- {
- struct _starpu_worker_node_data * data = node->data;
- struct _starpu_worker_task_list * list = data->list;
- STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
- struct starpu_task * task = _starpu_worker_task_list_pop(list);
- STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
- if(task)
- {
- starpu_push_task_end(task);
- return task;
- }
-
- struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
- if(father == NULL)
- return NULL;
- task = father->pop_task(father,sched_ctx_id);
- if(!task)
- return NULL;
- if(task->cl->type == STARPU_SPMD)
- {
- int combined_workerid = starpu_combined_worker_get_id();
- if(combined_workerid < 0)
- {
- starpu_push_task_end(task);
- return task;
- }
- struct _starpu_sched_node * combined_worker_node = _starpu_sched_node_worker_get(combined_workerid);
- (void)combined_worker_node->push_task(combined_worker_node, task);
- //we have pushed a task in queue, so can make a recursive call
- return _starpu_sched_node_worker_pop_task(node, sched_ctx_id);
-
- }
- if(task)
- starpu_push_task_end(task);
- return task;
- }
- void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
- {
- struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
- unsigned id = worker->workerid;
- assert(_worker_nodes[id] == node);
- int i;
- for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
- if(node->fathers[i] != NULL)
- return;//this node is shared between several contexts
- _starpu_sched_node_destroy(node);
- _worker_nodes[id] = NULL;
- }
- static void available_worker(struct _starpu_sched_node * worker_node)
- {
- (void) worker_node;
-
- #ifndef STARPU_NON_BLOCKING_DRIVERS
- struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
- // if(w->workerid == starpu_worker_get_id())
- // return;
- starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
- starpu_pthread_cond_t *sched_cond = &w->sched_cond;
- STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
- STARPU_PTHREAD_COND_SIGNAL(sched_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
- #endif
- }
- static void available_combined_worker(struct _starpu_sched_node * node)
- {
- STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
- struct _starpu_worker_node_data * data = node->data;
- int workerid = starpu_worker_get_id();
- int i;
- for(i = 0; i < data->combined_worker->worker_size; i++)
- {
- if(i == workerid)
- continue;
- int worker = data->combined_worker->combined_workerid[i];
- starpu_pthread_mutex_t *sched_mutex;
- starpu_pthread_cond_t *sched_cond;
- starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
- STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
- STARPU_PTHREAD_COND_SIGNAL(sched_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
- }
- }
- static double estimated_transfer_length(struct _starpu_sched_node * node,
- struct starpu_task * task)
- {
- STARPU_ASSERT(_starpu_sched_node_is_worker(node));
- starpu_task_bundle_t bundle = task->bundle;
- struct _starpu_worker_node_data * data = node->data;
- unsigned memory_node = data->worker ? data->worker->memory_node : data->combined_worker->memory_node;
- if(bundle)
- return starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
- else
- return starpu_task_expected_data_transfer_time(memory_node, task);
- }
- static double worker_estimated_finish_time(struct _starpu_worker * worker)
- {
- STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
- double sum = 0.0;
- struct starpu_task_list list = worker->local_tasks;
- struct starpu_task * task;
- for(task = starpu_task_list_front(&list);
- task != starpu_task_list_end(&list);
- task = starpu_task_list_next(task))
- if(!isnan(task->predicted))
- sum += task->predicted;
- if(worker->current_task)
- {
- struct starpu_task * t = worker->current_task;
- if(t && !isnan(t->predicted))
- sum += t->predicted/2;
- }
- STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
- return sum + starpu_timing_now();
- }
- static double combined_worker_expected_finish_time(struct _starpu_sched_node * node)
- {
- STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
- struct _starpu_worker_node_data * data = node->data;
- struct _starpu_combined_worker * combined_worker = data->combined_worker;
- double max = 0.0;
- int i;
- for(i = 0; i < combined_worker->worker_size; i++)
- {
- data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
- STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
- double tmp = data->list->exp_end;
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
- max = tmp > max ? tmp : max;
- }
- return max;
- }
- static double simple_worker_expected_finish_time(struct _starpu_sched_node * node)
- {
- struct _starpu_worker_node_data * data = node->data;
- STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
- double tmp = data->list->exp_end;
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
- return tmp;
- }
- static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task,
- double (*estimated_finish_time)(struct _starpu_sched_node*))
- {
- STARPU_ASSERT(_starpu_sched_node_is_worker(node));
- starpu_task_bundle_t bundle = task->bundle;
- struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
-
- struct _starpu_task_execute_preds preds =
- {
- .state = CANNOT_EXECUTE,
- .archtype = worker->perf_arch,
- .expected_length = DBL_MAX,
- .expected_finish_time = estimated_finish_time(node),
- .expected_transfer_length = estimated_transfer_length(node, task),
- .expected_power = 0.0
- };
- int nimpl;
- for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
- {
- if(starpu_worker_can_execute_task(worker->workerid,task,nimpl))
- {
- double d;
- if(bundle)
- d = starpu_task_bundle_expected_length(bundle, worker->perf_arch, nimpl);
- else
- d = starpu_task_expected_length(task, worker->perf_arch, nimpl);
- if(isnan(d))
- {
- preds.state = CALIBRATING;
- preds.expected_length = d;
- preds.impl = nimpl;
- return preds;
- }
- if(_STARPU_IS_ZERO(d) && preds.state == CANNOT_EXECUTE)
- {
- preds.state = NO_PERF_MODEL;
- preds.impl = nimpl;
- continue;
- }
- if(d < preds.expected_length)
- {
- preds.state = PERF_MODEL;
- preds.expected_length = d;
- preds.impl = nimpl;
- }
- }
- }
- if(preds.state == PERF_MODEL)
- {
- preds.expected_finish_time = _starpu_compute_expected_time(starpu_timing_now(),
- preds.expected_finish_time,
- preds.expected_length,
- preds.expected_transfer_length);
- if(bundle)
- preds.expected_power = starpu_task_bundle_expected_power(bundle, worker->perf_arch, preds.impl);
- else
- preds.expected_power = starpu_task_expected_power(task, worker->perf_arch,preds.impl);
- }
- return preds;
- }
- static struct _starpu_task_execute_preds combined_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
- {
- return estimated_execute_preds(node,task,combined_worker_expected_finish_time);
- }
- static struct _starpu_task_execute_preds simple_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
- {
- return estimated_execute_preds(node,task,simple_worker_expected_finish_time);
- }
- static double estimated_load(struct _starpu_sched_node * node)
- {
- struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
- int nb_task = 0;
- STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
- struct starpu_task_list list = worker->local_tasks;
- struct starpu_task * task;
- for(task = starpu_task_list_front(&list);
- task != starpu_task_list_end(&list);
- task = starpu_task_list_next(task))
- nb_task++;
- STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
- return (double) nb_task
- / starpu_worker_get_relative_speedup(_starpu_bitmap_first(node->workers));
- }
- static void worker_deinit_data(struct _starpu_sched_node * node)
- {
- struct _starpu_worker_node_data * data = node->data;
- if(data->list)
- _starpu_worker_task_list_destroy(data->list);
- free(data);
- node->data = NULL;
- int i;
- for(i = 0; i < STARPU_NMAXWORKERS; i++)
- if(_worker_nodes[i] == node)
- break;
- STARPU_ASSERT(i < STARPU_NMAXWORKERS);
- _worker_nodes[i] = NULL;
- }
- static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_node * node, struct starpu_task *task)
- {
- STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
- struct _starpu_worker_node_data * data = node->data;
- STARPU_ASSERT(data->combined_worker && !data->worker);
- struct _starpu_combined_worker * combined_worker = data->combined_worker;
- STARPU_ASSERT(combined_worker->worker_size >= 1);
- struct _starpu_task_grid * task_alias[combined_worker->worker_size];
- starpu_parallel_task_barrier_init(task, _starpu_bitmap_first(node->workers));
- task_alias[0] = _starpu_task_grid_create();
- task_alias[0]->task = task;
- task_alias[0]->left = NULL;
- task_alias[0]->ntasks = combined_worker->worker_size;
- int i;
- for(i = 1; i < combined_worker->worker_size; i++)
- {
- task_alias[i] = _starpu_task_grid_create();
- task_alias[i]->task = starpu_task_dup(task);
- task_alias[i]->left = task_alias[i-1];
- task_alias[i - 1]->right = task_alias[i];
- task_alias[i]->pntasks = &task_alias[0]->ntasks;
- }
- starpu_pthread_mutex_t * mutex_to_unlock = NULL;
- i = 0;
- do
- {
- struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
- struct _starpu_worker_node_data * worker_data = worker_node->data;
- struct _starpu_worker_task_list * list = worker_data->list;
- STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
- if(mutex_to_unlock)
- STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
- mutex_to_unlock = &list->mutex;
-
- _starpu_worker_task_list_push(list, task_alias[i]);
- worker_node->available(worker_node);
- i++;
- }
- while(i < combined_worker->worker_size);
- STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
- return 0;
- }
- static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid)
- {
- STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
- if(_worker_nodes[workerid])
- return _worker_nodes[workerid];
- struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
- if(worker == NULL)
- return NULL;
- struct _starpu_sched_node * node = _starpu_sched_node_create();
- struct _starpu_worker_node_data * data = malloc(sizeof(*data));
- memset(data, 0, sizeof(*data));
- data->worker = worker;
- data->list = _starpu_worker_task_list_create();
- node->data = data;
- node->push_task = _starpu_sched_node_worker_push_task;
- node->pop_task = _starpu_sched_node_worker_pop_task;
- node->estimated_execute_preds = simple_worker_estimated_execute_preds;
- node->estimated_load = estimated_load;
- node->available = available_worker;
- node->deinit_data = worker_deinit_data;
- node->workers = _starpu_bitmap_create();
- _starpu_bitmap_set(node->workers, workerid);
- _worker_nodes[workerid] = node;
- #ifdef STARPU_HAVE_HWLOC
- struct _starpu_machine_config *config = _starpu_get_machine_config();
- struct _starpu_machine_topology *topology = &config->topology;
- hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
- STARPU_ASSERT(obj);
- node->obj = obj;
- #endif
- return node;
- }
- static struct _starpu_sched_node * _starpu_sched_node_combined_worker_create(int workerid)
- {
- STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
- if(_worker_nodes[workerid])
- return _worker_nodes[workerid];
- struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
- if(combined_worker == NULL)
- return NULL;
- struct _starpu_sched_node * node = _starpu_sched_node_create();
- struct _starpu_worker_node_data * data = malloc(sizeof(*data));
- memset(data, 0, sizeof(*data));
- data->combined_worker = combined_worker;
- node->data = data;
- node->push_task = _starpu_sched_node_combined_worker_push_task;
- node->pop_task = NULL;
- node->estimated_execute_preds = combined_worker_estimated_execute_preds;
- node->estimated_load = estimated_load;
- node->available = available_combined_worker;
- node->deinit_data = worker_deinit_data;
- node->workers = _starpu_bitmap_create();
- _starpu_bitmap_set(node->workers, workerid);
- _worker_nodes[workerid] = node;
- #ifdef STARPU_HAVE_HWLOC
- struct _starpu_machine_config *config = _starpu_get_machine_config();
- struct _starpu_machine_topology *topology = &config->topology;
- hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
- STARPU_ASSERT(obj);
- node->obj = obj;
- #endif
- return node;
- }
- int _starpu_sched_node_is_simple_worker(struct _starpu_sched_node * node)
- {
- return node->push_task == _starpu_sched_node_worker_push_task;
- }
- int _starpu_sched_node_is_combined_worker(struct _starpu_sched_node * node)
- {
- return node->push_task == _starpu_sched_node_combined_worker_push_task;
- }
/* Return 1 when node is a worker node of either kind (simple or combined),
 * 0 otherwise. */
int _starpu_sched_node_is_worker(struct _starpu_sched_node *node)
{
	if (_starpu_sched_node_is_simple_worker(node))
		return 1;
	return _starpu_sched_node_is_combined_worker(node) ? 1 : 0;
}
- #ifndef STARPU_NO_ASSERT
- static int _worker_consistant(struct _starpu_sched_node * node)
- {
- int is_a_worker = 0;
- int i;
- for(i = 0; i<STARPU_NMAXWORKERS; i++)
- if(_worker_nodes[i] == node)
- is_a_worker = 1;
- if(!is_a_worker)
- return 0;
- struct _starpu_worker_node_data * data = node->data;
- if(data->worker)
- {
- int id = data->worker->workerid;
- return (_worker_nodes[id] == node)
- && node->nchilds == 0;
- }
- return 1;
- }
- #endif
- int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_node)
- {
- #ifndef STARPU_NO_ASSERT
- STARPU_ASSERT(_worker_consistant(worker_node));
- #endif
- return _starpu_sched_node_worker_get_worker(worker_node)->workerid;
- }
|