|
@@ -107,7 +107,7 @@ static double prio_estimated_load(struct starpu_sched_node * node)
|
|
|
return load;
|
|
|
}
|
|
|
|
|
|
-static int prio_push_task(struct starpu_sched_node * node, struct starpu_task * task)
|
|
|
+static int prio_push_local_task(struct starpu_sched_node * node, struct starpu_task * task, unsigned is_pushback)
|
|
|
{
|
|
|
STARPU_ASSERT(node && node->data && task);
|
|
|
STARPU_ASSERT(starpu_sched_node_can_execute_task(node,task));
|
|
@@ -116,93 +116,40 @@ static int prio_push_task(struct starpu_sched_node * node, struct starpu_task *
|
|
|
starpu_pthread_mutex_t * mutex = &data->mutex;
|
|
|
int ret;
|
|
|
|
|
|
- STARPU_ASSERT(node->nchilds == 1);
|
|
|
- struct starpu_sched_node * child = node->childs[0];
|
|
|
-
|
|
|
- if(starpu_sched_node_is_worker(child))
|
|
|
- ret = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(mutex);
|
|
|
+ double exp_len;
|
|
|
+ if(!isnan(task->predicted))
|
|
|
+ exp_len = prio->exp_len + task->predicted;
|
|
|
else
|
|
|
- /* TODO: no, don't try to push it immediately to the child.
|
|
|
- * Just push it to the queue, pop one task from the queue and
|
|
|
- * try to push that task to the child. */
|
|
|
- ret = child->push_task(child,task);
|
|
|
+ exp_len = prio->exp_len;
|
|
|
|
|
|
- if(ret)
|
|
|
+ if((data->ntasks_threshold != 0) && (data->exp_len_threshold != 0.0) &&
|
|
|
+ ((prio->ntasks >= data->ntasks_threshold) || (exp_len >= data->exp_len_threshold)))
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(mutex);
|
|
|
- double exp_len;
|
|
|
- if(!isnan(task->predicted))
|
|
|
- exp_len = prio->exp_len + task->predicted;
|
|
|
- else
|
|
|
- exp_len = prio->exp_len;
|
|
|
-
|
|
|
- if((data->ntasks_threshold != 0) && (data->exp_len_threshold != 0.0) &&
|
|
|
- ((prio->ntasks >= data->ntasks_threshold) || (exp_len >= data->exp_len_threshold)))
|
|
|
+ static int warned;
|
|
|
+ if(task->predicted > data->exp_len_threshold && !warned)
|
|
|
{
|
|
|
- static int warned;
|
|
|
- if(task->predicted > data->exp_len_threshold && !warned)
|
|
|
- {
|
|
|
- _STARPU_DISP("Warning : a predicted task length (%lf) exceeds the expected length threshold (%lf) of a prio node queue, you should reconsider the value of this threshold. This message will not be printed again for further thresholds exceeding.\n",task->predicted,data->exp_len_threshold);
|
|
|
- warned = 1;
|
|
|
- }
|
|
|
- ret = 1;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
|
|
|
+ _STARPU_DISP("Warning : a predicted task length (%lf) exceeds the expected length threshold (%lf) of a prio node queue, you should reconsider the value of this threshold. This message will not be printed again for further thresholds exceeding.\n",task->predicted,data->exp_len_threshold);
|
|
|
+ warned = 1;
|
|
|
}
|
|
|
+ ret = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if(is_pushback)
|
|
|
+ ret = _starpu_prio_deque_push_back_task(prio,task);
|
|
|
else
|
|
|
{
|
|
|
- starpu_sched_node_prefetch_on_node(node, task);
|
|
|
ret = _starpu_prio_deque_push_task(prio,task);
|
|
|
+ starpu_sched_node_prefetch_on_node(node, task);
|
|
|
STARPU_TRACE_SCHED_NODE_PUSH_PRIO(node, prio->ntasks, exp_len);
|
|
|
- if(!isnan(task->predicted))
|
|
|
- {
|
|
|
- prio->exp_len = exp_len;
|
|
|
- prio->exp_end = prio->exp_start + prio->exp_len;
|
|
|
- }
|
|
|
- STARPU_ASSERT(!isnan(prio->exp_end));
|
|
|
- STARPU_ASSERT(!isnan(prio->exp_len));
|
|
|
- STARPU_ASSERT(!isnan(prio->exp_start));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
|
|
|
-
|
|
|
- // When a task is pushed onto the local queue, we signify to our children
|
|
|
- // that a task has been pushed, and that if everyone is sleeping, someone
|
|
|
- // needs to wake up to come and take it.
|
|
|
- node->avail(node);
|
|
|
}
|
|
|
- }
|
|
|
- return ret;
|
|
|
-}
|
|
|
-
|
|
|
-static int prio_push_back_task(struct starpu_sched_node * node, struct starpu_task * task)
|
|
|
-{
|
|
|
- STARPU_ASSERT(node && node->data && task);
|
|
|
- STARPU_ASSERT(starpu_sched_node_can_execute_task(node,task));
|
|
|
- struct _starpu_prio_data * data = node->data;
|
|
|
- struct _starpu_prio_deque * prio = &data->prio;
|
|
|
- starpu_pthread_mutex_t * mutex = &data->mutex;
|
|
|
- int ret;
|
|
|
-
|
|
|
- STARPU_ASSERT(node->nchilds == 1);
|
|
|
- struct starpu_sched_node * child = node->childs[0];
|
|
|
- if(starpu_sched_node_is_worker(child))
|
|
|
- ret = 1;
|
|
|
- else
|
|
|
- ret = child->push_task(child,task);
|
|
|
|
|
|
- if(ret)
|
|
|
- {
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(mutex);
|
|
|
- double exp_len;
|
|
|
- if(!isnan(task->predicted))
|
|
|
- exp_len = prio->exp_len + task->predicted;
|
|
|
- else
|
|
|
- exp_len = prio->exp_len;
|
|
|
-
|
|
|
- _starpu_prio_deque_push_back_task(prio,task);
|
|
|
- STARPU_TRACE_SCHED_NODE_PUSH_PRIO(node, prio->ntasks, exp_len);
|
|
|
if(!isnan(task->predicted))
|
|
|
{
|
|
|
prio->exp_len = exp_len;
|
|
|
- prio->exp_end = prio->exp_start + exp_len;
|
|
|
+ prio->exp_end = prio->exp_start + prio->exp_len;
|
|
|
}
|
|
|
STARPU_ASSERT(!isnan(prio->exp_end));
|
|
|
STARPU_ASSERT(!isnan(prio->exp_len));
|
|
@@ -212,17 +159,24 @@ static int prio_push_back_task(struct starpu_sched_node * node, struct starpu_ta
|
|
|
// When a task is pushed onto the local queue, we signify to our children
|
|
|
// that a task has been pushed, and that if everyone is sleeping, someone
|
|
|
// needs to wake up to come and take it.
|
|
|
- node->avail(node);
|
|
|
+ if(!is_pushback)
|
|
|
+ node->avail(node);
|
|
|
}
|
|
|
+
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+static int prio_push_task(struct starpu_sched_node * node, struct starpu_task * task)
|
|
|
+{
|
|
|
+ return prio_push_local_task(node, task, 0);
|
|
|
+}
|
|
|
+
|
|
|
int starpu_sched_node_is_prio(struct starpu_sched_node * node)
|
|
|
{
|
|
|
return node->push_task == prio_push_task;
|
|
|
}
|
|
|
|
|
|
-static struct starpu_task * prio_pop_task(struct starpu_sched_node * node, unsigned sched_ctx_id)
|
|
|
+static struct starpu_task * prio_pop_task(struct starpu_sched_node * node)
|
|
|
{
|
|
|
STARPU_ASSERT(node && node->data);
|
|
|
struct _starpu_prio_data * data = node->data;
|
|
@@ -250,9 +204,14 @@ static struct starpu_task * prio_pop_task(struct starpu_sched_node * node, unsig
|
|
|
|
|
|
// When a pop is called, a room is called for pushing tasks onto
|
|
|
// the empty place of the queue left by the popped task.
|
|
|
- struct starpu_sched_node * father = node->fathers[sched_ctx_id];
|
|
|
- if(father != NULL)
|
|
|
- father->room(father, sched_ctx_id);
|
|
|
+ int i;
|
|
|
+ for(i=0; i < node->nfathers; i++)
|
|
|
+ {
|
|
|
+ if(node->fathers[i] == NULL)
|
|
|
+ continue;
|
|
|
+ else
|
|
|
+ node->fathers[i]->room(node->fathers[i]);
|
|
|
+ }
|
|
|
|
|
|
if(task)
|
|
|
return task;
|
|
@@ -265,20 +224,27 @@ static struct starpu_task * prio_pop_task(struct starpu_sched_node * node, unsig
|
|
|
* push fails, which means that the worker prio_nodes are
|
|
|
* currently "full".
|
|
|
*/
|
|
|
-static void prio_room(struct starpu_sched_node * node, unsigned sched_ctx_id)
|
|
|
+static int prio_room(struct starpu_sched_node * node)
|
|
|
{
|
|
|
STARPU_ASSERT(node && starpu_sched_node_is_prio(node));
|
|
|
int ret = 0;
|
|
|
|
|
|
- struct starpu_task * task = node->pop_task(node, sched_ctx_id);
|
|
|
+ STARPU_ASSERT(node->nchilds == 1);
|
|
|
+ struct starpu_sched_node * child = node->childs[0];
|
|
|
+
|
|
|
+ struct starpu_task * task = node->pop_task(node);
|
|
|
if(task)
|
|
|
- ret = node->push_back_task(node,task);
|
|
|
+ ret = child->push_task(child,task);
|
|
|
while(task && !ret)
|
|
|
{
|
|
|
- task = node->pop_task(node, sched_ctx_id);
|
|
|
+ task = node->pop_task(node);
|
|
|
if(task)
|
|
|
- ret = node->push_back_task(node,task);
|
|
|
+ ret = child->push_task(child,task);
|
|
|
}
|
|
|
+ if(task && ret)
|
|
|
+ prio_push_local_task(node,task,1);
|
|
|
+
|
|
|
+ return 1;
|
|
|
}
|
|
|
|
|
|
struct starpu_sched_node * starpu_sched_node_prio_create(struct starpu_prio_data * params)
|
|
@@ -291,7 +257,6 @@ struct starpu_sched_node * starpu_sched_node_prio_create(struct starpu_prio_data
|
|
|
node->estimated_end = prio_estimated_end;
|
|
|
node->estimated_load = prio_estimated_load;
|
|
|
node->push_task = prio_push_task;
|
|
|
- node->push_back_task = prio_push_back_task;
|
|
|
node->pop_task = prio_pop_task;
|
|
|
node->room = prio_room;
|
|
|
node->deinit_data = prio_node_deinit_data;
|