@@ -461,11 +461,29 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
 	return ret;
 }
 
+/* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
+ * ring buffer, indexed by order, and pulled from its head. */
+/* TODO: replace with perhaps a heap */
+
 /* This function must be called with worker->sched_mutex taken */
 struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 {
 	struct starpu_task *task = NULL;
 
+	if (worker->local_ordered_tasks_size)
+	{
+		task = worker->local_ordered_tasks[worker->current_ordered_task];
+		if (task)
+		{
+			worker->local_ordered_tasks[worker->current_ordered_task] = NULL;
+			STARPU_ASSERT(task->workerorder == worker->current_ordered_task_order);
+			/* Next ordered task is there, return it */
+			worker->current_ordered_task = (worker->current_ordered_task + 1) % worker->local_ordered_tasks_size;
+			worker->current_ordered_task_order++;
+			return task;
+		}
+	}
+
 	if (!starpu_task_list_empty(&worker->local_tasks))
 		task = starpu_task_list_pop_front(&worker->local_tasks);
 
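For review purposes, here is a minimal standalone sketch (not part of the patch) of the invariant the pop path above relies on: the slot at `head` always corresponds to order `next_order`, so a pop only ever looks at one cell, and a NULL there means the next ordered task simply has not been pushed yet. The struct and names (toy_ring, head, next_order) are hypothetical illustrations, not StarPU API.

#include <stddef.h>

struct toy_ring
{
	void **slots;        /* ring of `size` cells; NULL means "not arrived yet" */
	unsigned size;       /* number of cells, grown by doubling on push */
	unsigned head;       /* index of the slot holding order `next_order` */
	unsigned next_order; /* order value the next pop must return */
};

/* Same shape as the ordered branch of _starpu_pop_local_task() above */
static void *toy_ring_pop(struct toy_ring *r)
{
	if (!r->size)
		return NULL;
	void *t = r->slots[r->head];
	if (!t)
		/* The task with order `next_order` was not pushed yet, so
		 * nothing with a later order may be returned either. */
		return NULL;
	r->slots[r->head] = NULL;
	r->head = (r->head + 1) % r->size;
	r->next_order++;
	return t;
}
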
@@ -481,10 +499,43 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
-	if (prio)
-		starpu_task_list_push_front(&worker->local_tasks, task);
+	if (task->execute_on_a_specific_worker && task->workerorder)
+	{
+		STARPU_ASSERT_MSG(task->workerorder >= worker->current_ordered_task_order, "worker order values must not have duplicates");
+		/* Put it in the ordered task ring */
+		unsigned needed = task->workerorder - worker->current_ordered_task_order + 1;
+		if (worker->local_ordered_tasks_size < needed)
+		{
+			/* Increase the size */
+			unsigned alloc = worker->local_ordered_tasks_size;
+			struct starpu_task **new;
+			unsigned copied;
+
+			if (!alloc)
+				alloc = 1;
+			while (alloc < needed)
+				alloc *= 2;
+			new = malloc(alloc * sizeof(*new));
+
+			/* Put existing tasks at the beginning of the new ring */
+			copied = worker->local_ordered_tasks_size - worker->current_ordered_task;
+			memcpy(new, &worker->local_ordered_tasks[worker->current_ordered_task], copied * sizeof(*new));
+			memcpy(new + copied, worker->local_ordered_tasks, (worker->local_ordered_tasks_size - copied) * sizeof(*new));
+			memset(new + worker->local_ordered_tasks_size, 0, (alloc - worker->local_ordered_tasks_size) * sizeof(*new));
+			free(worker->local_ordered_tasks);
+			worker->local_ordered_tasks = new;
+			worker->local_ordered_tasks_size = alloc;
+			worker->current_ordered_task = 0;
+		}
+		worker->local_ordered_tasks[(worker->current_ordered_task + task->workerorder - worker->current_ordered_task_order) % worker->local_ordered_tasks_size] = task;
+	}
 	else
-		starpu_task_list_push_back(&worker->local_tasks, task);
+	{
+		if (prio)
+			starpu_task_list_push_front(&worker->local_tasks, task);
+		else
+			starpu_task_list_push_back(&worker->local_tasks, task);
+	}
 
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	starpu_push_task_end(task);
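And a matching standalone sketch (again hypothetical, reusing the toy_ring struct from the sketch after the first hunk) of what the push side above does: the slot for a given order is head + (order - next_order), modulo the ring size, and when the ring is too small it is doubled and unwrapped so that `head` lands back on index 0.

#include <stdlib.h>
#include <string.h>

/* Same shape as the ordered branch of _starpu_push_local_task() above;
 * assumes order >= r->next_order, as the STARPU_ASSERT_MSG enforces. */
static void toy_ring_push(struct toy_ring *r, void *t, unsigned order)
{
	unsigned needed = order - r->next_order + 1;
	if (r->size < needed)
	{
		unsigned alloc = r->size ? r->size : 1;
		while (alloc < needed)
			alloc *= 2;
		void **bigger = calloc(alloc, sizeof(*bigger));
		if (r->size)
		{
			/* Unwrap the old ring: cells head..size-1 first, then
			 * 0..head-1, so order `next_order` ends up at index 0 */
			unsigned tail = r->size - r->head;
			memcpy(bigger, r->slots + r->head, tail * sizeof(*bigger));
			memcpy(bigger + tail, r->slots, r->head * sizeof(*bigger));
		}
		free(r->slots);
		r->slots = bigger;
		r->size = alloc;
		r->head = 0;
	}
	r->slots[(r->head + order - r->next_order) % r->size] = t;
}

With these two operations a task pushed out of order waits in its slot until every earlier order has been popped; both push and pop stay O(1) at the cost of a ring as large as the largest outstanding order gap, which is what the TODO above suggests a heap could trade off differently.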