@@ -1,57 +1,291 @@
#include "node_sched.h"
+#include "fifo_queues.h"
+#include <starpu_scheduler.h>

struct _starpu_work_stealing_data
{
	/* keep track of the work performed from the beginning of the algorithm to make
-	 * better decisions about which queue to select when stealing or deferring work
+	 * better decisions about which child to select when stealing or deferring work
	 */

	unsigned performed_total;
-	unsigned last_pop_worker;
-	unsigned last_push_worker;
+	unsigned last_pop_child;
+	unsigned last_push_child;
};
-//
-///* little dirty hack here, fifo_nodes under the workstealing node need to know wich child they are in order to push task on themselfs
-// */
-//struct _starpu_fifo_ws_data {
-//	struct _starpu_fifo_taskq *fifo;
-//	int rank;
-//};
-//
-//static void destroy_fifo_ws(struct _starpu_sched_node * node)
-//{
-//	struct _starpu_fifo_ws_data * fwsd = node->data;
-//	_starpu_destroy_fifo(fwsd->fifo);
-//	free(fwsd);
-//	_starpu_sched_node_destroy(node);
-//}
-//
-//static int fifo_ws_push_task(struct _starpu_sched_node * node,
-//			     struct starpu_task * task)
-//{
-//	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-//	struct _starpu_fifo_ws_data * fwsd = node->data;
-//	int ret_val = _starpu_push_sorted_task(node->data, task);
-//	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
-//	return ret_val;
-//}
-//
-//static starpu_task * fifo_ws_pop_task(struct _starpu_sched_node * node,
-//				      unsigned sched_ctx_id)
-//{
-//	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-//	int ret_val = _starpu_push_sorted_task(node->data, task);
-//	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
-//	return ret_val;
-//}
-//
+
+#ifdef USE_OVERLOAD
+/**
+ * Minimum number of tasks we wait to be processed before we start assuming
+ * on which child the computation would be faster.
+ */
+static int calibration_value = 0;
+
+#endif /* USE_OVERLOAD */
+
+
+/**
+ * Return a child from which a task can be stolen.
+ * The child is selected in a round-robin fashion, unless
+ * the previously selected child does not own any task,
+ * in which case the first non-empty child is returned
+ * and its mutex is taken.
+ *
+ * If no child has any task, return -1 and do not take any mutex.
+ */
+static int select_victim_round_robin(struct _starpu_sched_node *node)
+{
+	struct _starpu_work_stealing_data *ws = node->data;
+	int i = (ws->last_pop_child + 1) % node->nchilds;
+
+	starpu_pthread_mutex_t *victim_sched_mutex;
+
+	/* If the child's queue is empty, let's try
+	 * the next ones */
+	while (1)
+	{
+		unsigned ntasks;
+		struct _starpu_sched_node * child = node->childs[i];
+		struct _starpu_fifo_taskq * fifo = _starpu_node_fifo_get_fifo(child);
+		victim_sched_mutex = &child->mutex;
+		STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);
+		ntasks = fifo->ntasks;
+		if (ntasks)
+			break;
+		STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
+		i = (i + 1) % node->nchilds;
+		if ((unsigned) i == ws->last_pop_child)
+		{
+			/* We got back to the first child,
+			 * don't go into an infinite loop */
+			return -1;
+		}
+	}
+
+	ws->last_pop_child = i;
+
+	return i;
+}
+
+/**
+ * Return a child to which a task can be pushed.
+ * The child is selected in a round-robin fashion.
+ */
+static unsigned select_worker_round_robin(struct _starpu_sched_node * node)
+{
+	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)node->data;
+	unsigned i = (ws->last_push_child + 1) % node->nchilds;
+	ws->last_push_child = i;
+	return i;
+}
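+/* Note: the #undef below disables the overload-based selection code in the
+ * #ifdef USE_OVERLOAD block that follows. */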
+#undef USE_OVERLOAD
+#ifdef USE_OVERLOAD
+
+/**
+ * Return a ratio helpful to determine whether a worker is suitable to steal
+ * tasks from or to put some tasks in its queue.
+ *
+ * \return a ratio with a positive or negative value, describing the current state of the worker:
+ * a smaller value implies a faster worker with a relatively emptier queue: more suitable to put tasks in
+ * a bigger value implies a slower worker with a relatively more replete queue: more suitable to steal tasks from
+ */
+static float overload_metric(unsigned sched_ctx_id, unsigned id)
+{
+	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	float execution_ratio = 0.0f;
+	float current_ratio = 0.0f;
+
+	int nprocessed = _starpu_get_deque_nprocessed(ws->queue_array[id]);
+	unsigned njobs = _starpu_get_deque_njobs(ws->queue_array[id]);
+
+	/* Did we get enough information? */
+	if (performed_total > 0 && nprocessed > 0)
+	{
+		/* How fast or slow is the worker compared to the other workers */
+		execution_ratio = (float) nprocessed / performed_total;
+		/* How replete is its queue */
+		current_ratio = (float) njobs / nprocessed;
+	}
+	else
+	{
+		return 0.0f;
+	}
+
+	return (current_ratio - execution_ratio);
+}
+
+/**
+ * Return the most suitable worker from which a task can be stolen.
+ * The number of previously processed tasks, total and local,
+ * and the number of tasks currently awaiting to be processed
+ * by the workers are taken into account to select the most suitable
+ * worker to steal tasks from.
+ */
+static unsigned select_victim_overload(unsigned sched_ctx_id)
+{
+	unsigned worker;
+	float worker_ratio;
+	unsigned best_worker = 0;
+	float best_ratio = FLT_MIN;
+
+	/* Don't try to play smart until we get
+	 * enough information. */
+	if (performed_total < calibration_value)
+		return select_victim_round_robin(sched_ctx_id);
+
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		worker_ratio = overload_metric(sched_ctx_id, worker);
+
+		if (worker_ratio > best_ratio)
+		{
+			best_worker = worker;
+			best_ratio = worker_ratio;
+		}
+	}
+
+	return best_worker;
+}
+
+/**
+ * Return the most suitable worker to add a task to.
+ * The number of previously processed tasks, total and local,
+ * and the number of tasks currently awaiting to be processed
+ * by the workers are taken into account to select the most suitable
+ * worker to add a task to.
+ */
+static unsigned select_worker_overload(unsigned sched_ctx_id)
+{
+	unsigned worker;
+	float worker_ratio;
+	unsigned best_worker = 0;
+	float best_ratio = FLT_MAX;
+
+	/* Don't try to play smart until we get
+	 * enough information. */
+	if (performed_total < calibration_value)
+		return select_worker_round_robin(sched_ctx_id);
+
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+
+		worker_ratio = overload_metric(sched_ctx_id, worker);
+
+		if (worker_ratio < best_ratio)
+		{
+			best_worker = worker;
+			best_ratio = worker_ratio;
+		}
+	}
+
+	return best_worker;
+}
+
+#endif /* USE_OVERLOAD */
+
+
+/**
+ * Return a child from which a task can be stolen.
+ * This is a phony function used to call the right
+ * function depending on the value of USE_OVERLOAD.
+ */
+static inline int select_victim(struct _starpu_sched_node * node)
+{
+#ifdef USE_OVERLOAD
+	return select_victim_overload(node);
+#else
+	return select_victim_round_robin(node);
+#endif /* USE_OVERLOAD */
+}
+
+/**
+ * Return a child to which a task can be pushed.
+ * This is a phony function used to call the right
+ * function depending on the value of USE_OVERLOAD.
+ */
+static inline unsigned select_worker(struct _starpu_sched_node * node)
+{
+#ifdef USE_OVERLOAD
+	return select_worker_overload(node);
+#else
+	return select_worker_round_robin(node);
+#endif /* USE_OVERLOAD */
+}
+
+
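+/* Pop a task for the calling worker: pick a victim child with select_victim()
+ * (which leaves the victim's mutex held), steal a task from its fifo and
+ * release the mutex.  If no child has any task, delegate to the father. */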
+static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
+{
+	int victim = select_victim(node);
+	if(victim < 0)
+	{
+		if(node->fathers[sched_ctx_id])
+			return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
+		else
+			return NULL;
+	}
+	struct _starpu_sched_node * child = node->childs[victim];
+	struct _starpu_fifo_taskq * fifo = _starpu_node_fifo_get_fifo(child);
+	struct starpu_task * task = _starpu_fifo_pop_task(fifo,
+							  starpu_worker_get_id());
+	fifo->nprocessed--;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&child->mutex);
+	return task;
+}
+
+
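+/* Push a task to one of the children: scan the children round-robin, starting
+ * after the child we last pushed to, and give the task to the first child that
+ * can execute it; return -ENODEV if no suitable child was found. */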
+static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	struct _starpu_work_stealing_data * wsd = node->data;
+	int ret = -1;
+	int start = wsd->last_push_child;
+
+	int i;
+	for(i = (start+1)%node->nchilds; i != start; i = (i+1)%node->nchilds)
+	{
+		struct _starpu_sched_node * child = node->childs[i];
+		if(_starpu_sched_node_can_execute_task(child,task))
+		{
+			ret = child->push_task(child,task);
+			break;
+		}
+	}
+	if(i == start)
+		ret = -ENODEV;
+	wsd->last_push_child = i;
+	return ret;
+}
+
+
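+/* Forward declaration: is_my_fifo_node() below recognizes the work-stealing
+ * node by comparing its add_child function pointer against this function. */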
+static void add_child(struct _starpu_sched_node *node,
+		      struct _starpu_sched_node *child,
+		      unsigned sched_ctx_id);
+//check whether the father is a work_stealing node
+static int is_my_fifo_node(struct _starpu_sched_node * node, unsigned sched_ctx_id)
+{
+	if(node->fathers[sched_ctx_id] == NULL)
+		return 0;
+	return node->fathers[sched_ctx_id]->add_child == add_child;
+}
+
//this function is special, when a worker call it, we want to push the task in his fifo
-//because he deserve it.
int _starpu_ws_push_task(struct starpu_task *task)
{
-	int workerid = starpu_get_worker_id();
+	int workerid = starpu_worker_get_id();
	if(workerid == -1)
		return _starpu_tree_push_task(task);
	unsigned sched_ctx_id = task->sched_ctx;
@@ -59,25 +293,16 @@ int _starpu_ws_push_task(struct starpu_task *task)
	while(node->fathers[sched_ctx_id] != NULL)
	{
		node = node->fathers[sched_ctx_id];
-		if(is_my_fifo_node(node))
+		if(is_my_fifo_node(node,sched_ctx_id))
		{
			STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-			int ret_val = _starpu_push_sorted_task(node->data, task);
+			int ret_val = _starpu_fifo_push_sorted_task(node->data, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
			return ret_val;
		}
	}
	//there were a problem here, dont know what to do
	return _starpu_tree_push_task(task);
-
-}
-static struct _starpu_sched_node * fifo_ws_create(void)
-{
-	struct _starpu_sched_node * node = _starpu_sched_node_create();
-	struct _starpu_fifo_ws_data * fwsd = malloc(sizeof(struct _starpu_fifo_ws_data));
-	fwsd->fifo = _starpu_create_fifo();
-	node->data = fwsd;
-	return node;
}
@@ -95,6 +320,9 @@ static void add_child(struct _starpu_sched_node *node,
				      sizeof(struct _starpu_sched_node*)
				      * (node->nchilds + 1));
	struct _starpu_sched_node * fifo_node = _starpu_sched_node_fifo_create();
+	_starpu_sched_node_add_child(fifo_node, child, sched_ctx_id);
+
+
	_starpu_sched_node_set_father(fifo_node, node, sched_ctx_id);
	node->childs[node->nchilds] = fifo_node;
	node->nchilds++;
@@ -114,7 +342,7 @@ static void remove_child(struct _starpu_sched_node *node,
	STARPU_ASSERT(pos != node->nchilds);
	struct _starpu_sched_node * fifo_node = node->childs[pos];
	node->childs[pos] = node->childs[--node->nchilds];
-	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node;
+	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node);
	fifo_node->fathers[sched_ctx_id] = NULL;
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
}
@@ -126,23 +354,70 @@ struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
	struct _starpu_sched_node * node = _starpu_sched_node_create();
	struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
	wsd->performed_total = 0;
-	wsd->last_pop_worker = 0;
-	wsd->last_push_worker = 0;
+	wsd->last_pop_child = 0;
+	wsd->last_push_child = 0;
+	node->data = wsd;
+	node->pop_task = pop_task;
+	node->push_task = push_task;
+	node->add_child = add_child;
+	node->remove_child = remove_child;
	return node;
}

-static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+
+
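+/* Build the scheduling tree for this context: a single work-stealing node as
+ * the root, stored as the scheduling policy data. */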
+static void initialize_ws_center_policy(unsigned sched_ctx_id)
{
-	struct _starpu_work_stealing_data * wsd = node->data;
-	int ret = -1;
-	int start = wsd->last_pop_worker;
-
-	int i;
-	for(i = start + 1; i != start; (i+1)%node->nchilds)
-	{
-		if(!childs[i]->fifo)
-			continue;
-		ret = _starpu_fifo_push_sorted_task(childs[i]->fifo, task);
-	}
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	data->root = _starpu_sched_node_work_stealing_create();

+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
}
+
+static void deinitialize_ws_center_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_tree *t = (struct _starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	_starpu_tree_destroy(t, sched_ctx_id);
+	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+}
+
+
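+/* Wrap each added worker in a child of the work-stealing root (add_child
+ * creates one fifo node per worker), then refresh the tree. */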
+static void add_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		t->root->add_child(t->root,
+				   _starpu_sched_node_worker_get(workerids[i]),
+				   sched_ctx_id);
+	_starpu_tree_update_after_modification(t);
+}
+
+static void remove_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		t->root->remove_child(t->root,
+				      _starpu_sched_node_worker_get(workerids[i]),
+				      sched_ctx_id);
+
+}
+
+
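+/* Scheduling policy descriptor: pushes go through _starpu_ws_push_task (which
+ * favours the calling worker's own fifo), pops go through the generic tree pop. */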
+struct starpu_sched_policy _starpu_sched_tree_ws_policy =
+{
+	.init_sched = initialize_ws_center_policy,
+	.deinit_sched = deinitialize_ws_center_policy,
+	.add_workers = add_worker_ws,
+	.remove_workers = remove_worker_ws,
+	.push_task = _starpu_ws_push_task,
+	.pop_task = _starpu_tree_pop_task,
+	.pre_exec_hook = NULL,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,
+	.policy_name = "tree-ws",
+	.policy_description = "work stealing tree policy"
+};