123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2013 INRIA
- * Copyright (C) 2013 Simon Archipoff
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
#include <starpu_sched_component.h>
#include <starpu_scheduler.h>
#include <starpu.h>
#include <float.h>
#include <stdlib.h>
#include "prio_deque.h"
- struct _starpu_work_stealing_data
- {
- /* keep track of the work performed from the beginning of the algorithm to make
- * better decisions about which queue to child when stealing or deferring work
- */
- unsigned performed_total, last_pop_child, last_push_child;
- struct _starpu_prio_deque ** fifos;
- starpu_pthread_mutex_t ** mutexes;
- int size;
- };
- /**
- * steal a task in a round robin way
- * return NULL if none available
- */
- static struct starpu_task * steal_task_round_robin(struct starpu_sched_component *component, int workerid)
- {
- struct _starpu_work_stealing_data *wsd = component->data;
- unsigned i = wsd->last_pop_child;
- wsd->last_pop_child = (wsd->last_pop_child + 1) % component->nchildren;
- /* If the worker's queue have no suitable tasks, let's try
- * the next ones */
- struct starpu_task * task = NULL;
- while (1)
- {
- struct _starpu_prio_deque * fifo = wsd->fifos[i];
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
- if(task && !isnan(task->predicted))
- {
- fifo->exp_len -= task->predicted;
- fifo->nprocessed--;
- }
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- if(task)
- break;
- if (i == wsd->last_pop_child)
- {
- /* We got back to the first worker,
- * don't go in infinite loop */
- return NULL;
- }
- i = (i + 1) % component->nchildren;
-
- }
- return task;
- }
- /**
- * Return a worker to whom add a task.
- * Selecting a worker is done in a round-robin fashion.
- */
- static unsigned select_worker_round_robin(struct starpu_sched_component * component)
- {
- struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)component->data;
- unsigned i = (ws->last_push_child + 1) % component->nchildren ;
- ws->last_push_child = i;
- return i;
- }
/**
 * Steal a task for worker @workerid from the children's queues.
 * Single dispatch point for the stealing policy; the only policy available
 * is round-robin (steal_task_round_robin).
 * Return the stolen task, or NULL when nothing could be stolen.
 */
static inline struct starpu_task * steal_task(struct starpu_sched_component * component, int workerid)
{
	struct starpu_task * stolen = steal_task_round_robin(component, workerid);
	return stolen;
}
/**
 * Return the index of the child queue that should receive the next task.
 * Single dispatch point for the push policy; the only policy available is
 * round-robin (select_worker_round_robin).
 */
static inline unsigned select_worker(struct starpu_sched_component * component)
{
	unsigned chosen = select_worker_round_robin(component);
	return chosen;
}
- static int is_worker_of_component(struct starpu_sched_component * component, int workerid)
- {
- return starpu_bitmap_get(component->workers, workerid);
- }
- static struct starpu_task * pull_task(struct starpu_sched_component * component)
- {
- int workerid = starpu_worker_get_id();
- int i;
- for(i = 0; i < component->nchildren; i++)
- {
- if(is_worker_of_component(component->children[i], workerid))
- break;
- }
- STARPU_ASSERT(i < component->nchildren);
- struct _starpu_work_stealing_data * wsd = component->data;
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
- if(task)
- {
- if(!isnan(task->predicted))
- {
- wsd->fifos[i]->exp_len -= task->predicted;
- wsd->fifos[i]->exp_start = starpu_timing_now() + task->predicted;
- }
- }
- else
- wsd->fifos[i]->exp_len = 0.0;
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- if(task)
- {
- return task;
- }
-
- task = steal_task(component, workerid);
- if(task)
- {
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- wsd->fifos[i]->nprocessed++;
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- return task;
- }
- for(i=0; i < component->nparents; i++)
- {
- if(component->parents[i] == NULL)
- continue;
- else
- {
- task = component->parents[i]->pull_task(component->parents[i]);
- if(task)
- break;
- }
- }
- if(task)
- return task;
- else
- return NULL;
- }
- double _ws_estimated_end(struct starpu_sched_component * component)
- {
- STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
- struct _starpu_work_stealing_data * wsd = component->data;
- double sum_len = 0.0;
- double sum_start = 0.0;
- int i;
- for(i = 0; i < component->nchildren; i++)
- {
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- sum_len += wsd->fifos[i]->exp_len;
- wsd->fifos[i]->exp_start = STARPU_MAX(starpu_timing_now(), wsd->fifos[i]->exp_start);
- sum_start += wsd->fifos[i]->exp_start;
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- }
- int nb_workers = starpu_bitmap_cardinal(component->workers_in_ctx);
- return (sum_start + sum_len) / nb_workers;
- }
- double _ws_estimated_load(struct starpu_sched_component * component)
- {
- STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
- struct _starpu_work_stealing_data * wsd = component->data;
- int ntasks = 0;
- int i;
- for(i = 0; i < component->nchildren; i++)
- {
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- ntasks += wsd->fifos[i]->ntasks;
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- }
- double speedup = 0.0;
- int workerid;
- for(workerid = starpu_bitmap_first(component->workers_in_ctx);
- -1 != workerid;
- workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
- {
- speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(workerid));
- }
-
- return ntasks / speedup;
- }
- static int push_task(struct starpu_sched_component * component, struct starpu_task * task)
- {
- struct _starpu_work_stealing_data * wsd = component->data;
- int ret = -1;
- int i = wsd->last_push_child;
- i = (i+1)%component->nchildren;
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- ret = _starpu_prio_deque_push_task(wsd->fifos[i], task);
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
- wsd->last_push_child = i;
- component->can_pull(component);
- return ret;
- }
- //this function is special, when a worker call it, we want to push the task in his fifo
- int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
- {
- int workerid = starpu_worker_get_id();
- if(workerid == -1)
- return starpu_sched_tree_push_task(task);
- unsigned sched_ctx_id = task->sched_ctx;
- struct starpu_sched_component * component =starpu_sched_component_worker_get(workerid);
- while(component->parents[sched_ctx_id] != NULL)
- {
- component = component->parents[sched_ctx_id];
- if(starpu_sched_component_is_work_stealing(component))
- {
- if(!starpu_sched_component_can_execute_task(component, task))
- return starpu_sched_tree_push_task(task);
- int i;
- for(i = 0; i < component->nchildren; i++)
- if(is_worker_of_component(component->children[i], workerid))
- break;
- STARPU_ASSERT(i < component->nchildren);
-
- struct _starpu_work_stealing_data * wsd = component->data;
- STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
- int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
- if(ret == 0 && !isnan(task->predicted))
- wsd->fifos[i]->exp_len += task->predicted;
- STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
-
- //we need to wake all workers
- component->can_pull(component);
- return ret;
- }
- }
- /* this should not be reached */
- return starpu_sched_tree_push_task(task);
- }
- void _ws_add_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
- {
- struct _starpu_work_stealing_data * wsd = component->data;
- component->add_child(component, child);
- if(wsd->size < component->nchildren)
- {
- STARPU_ASSERT(wsd->size == component->nchildren - 1);
- wsd->fifos = realloc(wsd->fifos, component->nchildren * sizeof(*wsd->fifos));
- wsd->mutexes = realloc(wsd->mutexes, component->nchildren * sizeof(*wsd->mutexes));
- wsd->size = component->nchildren;
- }
- struct _starpu_prio_deque * fifo = malloc(sizeof(*fifo));
- _starpu_prio_deque_init(fifo);
- wsd->fifos[component->nchildren - 1] = fifo;
- starpu_pthread_mutex_t * mutex = malloc(sizeof(*mutex));
- STARPU_PTHREAD_MUTEX_INIT(mutex,NULL);
- wsd->mutexes[component->nchildren - 1] = mutex;
- }
- void _ws_remove_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
- {
- struct _starpu_work_stealing_data * wsd = component->data;
- STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes[component->nchildren - 1]);
- free(wsd->mutexes[component->nchildren - 1]);
- int i_component;
- for(i_component = 0; i_component < component->nchildren; i_component++)
- {
- if(component->children[i_component] == child)
- break;
- }
- STARPU_ASSERT(i_component != component->nchildren);
- struct _starpu_prio_deque * tmp_fifo = wsd->fifos[i_component];
- wsd->fifos[i_component] = wsd->fifos[component->nchildren - 1];
-
- component->children[i_component] = component->children[component->nchildren - 1];
- component->nchildren--;
- struct starpu_task * task;
- while((task = _starpu_prio_deque_pop_task(tmp_fifo)))
- {
- component->push_task(component, task);
- }
- _starpu_prio_deque_destroy(tmp_fifo);
- free(tmp_fifo);
- }
- void _work_stealing_component_deinit_data(struct starpu_sched_component * component)
- {
- free(component->data);
- }
- int starpu_sched_component_is_work_stealing(struct starpu_sched_component * component)
- {
- return component->push_task == push_task;
- }
- struct starpu_sched_component * starpu_sched_component_work_stealing_create(void * arg STARPU_ATTRIBUTE_UNUSED)
- {
- struct starpu_sched_component * component = starpu_sched_component_create();
- struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
- memset(wsd, 0, sizeof(*wsd));
- component->pull_task = pull_task;
- component->push_task = push_task;
- component->add_child = _ws_add_child;
- component->remove_child = _ws_remove_child;
- component->estimated_end = _ws_estimated_end;
- component->estimated_load = _ws_estimated_load;
- component->deinit_data = _work_stealing_component_deinit_data;
- component->data = wsd;
- return component;
- }
|