|
@@ -1,6 +1,6 @@
|
|
|
/* StarPU --- Runtime system for heterogeneous multicore architectures.
|
|
|
*
|
|
|
- * Copyright (C) 2010-2016 Université de Bordeaux
|
|
|
+ * Copyright (C) 2010-2017 Université de Bordeaux
|
|
|
* Copyright (C) 2010, 2011, 2012, 2013, 2015, 2016 CNRS
|
|
|
* Copyright (C) 2011 INRIA
|
|
|
* Copyright (C) 2016 Uppsala University
|
|
@@ -20,33 +20,21 @@
|
|
|
/*
|
|
|
* This is policy where every worker use the same JOB QUEUE, but taking
|
|
|
* task priorities into account
|
|
|
+ *
|
|
|
+ * TODO: merge with eager, after checking the scalability
|
|
|
*/
|
|
|
|
|
|
#include <starpu.h>
|
|
|
#include <starpu_scheduler.h>
|
|
|
#include <starpu_bitmap.h>
|
|
|
+#include "prio_deque.h"
|
|
|
|
|
|
#include <common/fxt.h>
|
|
|
#include <core/workers.h>
|
|
|
|
|
|
-#define DEFAULT_MIN_LEVEL (-5)
|
|
|
-#define DEFAULT_MAX_LEVEL (+5)
|
|
|
-
|
|
|
-struct _starpu_priority_taskq
|
|
|
-{
|
|
|
- int min_prio;
|
|
|
- int max_prio;
|
|
|
- /* the actual lists
|
|
|
- * taskq[p] is for priority [p - STARPU_MIN_PRIO] */
|
|
|
- struct starpu_task_list *taskq;
|
|
|
- unsigned *ntasks;
|
|
|
-
|
|
|
- unsigned total_ntasks;
|
|
|
-};
|
|
|
-
|
|
|
struct _starpu_eager_central_prio_data
|
|
|
{
|
|
|
- struct _starpu_priority_taskq *taskq;
|
|
|
+ struct _starpu_prio_deque taskq;
|
|
|
starpu_pthread_mutex_t policy_mutex;
|
|
|
struct starpu_bitmap *waiters;
|
|
|
};
|
|
@@ -55,54 +43,19 @@ struct _starpu_eager_central_prio_data
|
|
|
* Centralized queue with priorities
|
|
|
*/
|
|
|
|
|
|
-static struct _starpu_priority_taskq *_starpu_create_priority_taskq(int min_prio, int max_prio)
|
|
|
-{
|
|
|
- struct _starpu_priority_taskq *central_queue;
|
|
|
-
|
|
|
- _STARPU_MALLOC(central_queue, sizeof(struct _starpu_priority_taskq));
|
|
|
- central_queue->min_prio = min_prio;
|
|
|
- central_queue->max_prio = max_prio;
|
|
|
- central_queue->total_ntasks = 0;
|
|
|
- _STARPU_MALLOC(central_queue->taskq, (max_prio-min_prio+1) * sizeof(struct starpu_task_list));
|
|
|
- _STARPU_MALLOC(central_queue->ntasks, (max_prio-min_prio+1) * sizeof(unsigned));
|
|
|
-
|
|
|
- int prio;
|
|
|
- for (prio = 0; prio < (max_prio-min_prio+1); prio++)
|
|
|
- {
|
|
|
- starpu_task_list_init(¢ral_queue->taskq[prio]);
|
|
|
- central_queue->ntasks[prio] = 0;
|
|
|
- }
|
|
|
-
|
|
|
- return central_queue;
|
|
|
-}
|
|
|
-
|
|
|
-static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priority_queue)
|
|
|
-{
|
|
|
- free(priority_queue->ntasks);
|
|
|
- free(priority_queue->taskq);
|
|
|
- free(priority_queue);
|
|
|
-}
|
|
|
-
|
|
|
static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
|
|
|
{
|
|
|
struct _starpu_eager_central_prio_data *data;
|
|
|
_STARPU_MALLOC(data, sizeof(struct _starpu_eager_central_prio_data));
|
|
|
|
|
|
- /* In this policy, we support more than two levels of priority. */
|
|
|
-
|
|
|
- if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
|
|
|
- starpu_sched_ctx_set_min_priority(sched_ctx_id, DEFAULT_MIN_LEVEL);
|
|
|
- if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
|
|
|
- starpu_sched_ctx_set_max_priority(sched_ctx_id, DEFAULT_MAX_LEVEL);
|
|
|
-
|
|
|
/* only a single queue (even though there are several internaly) */
|
|
|
- data->taskq = _starpu_create_priority_taskq(starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
|
|
|
+ _starpu_prio_deque_init(&data->taskq);
|
|
|
data->waiters = starpu_bitmap_create();
|
|
|
|
|
|
/* Tell helgrind that it's fine to check for empty fifo in
|
|
|
* _starpu_priority_pop_task without actual mutex (it's just an
|
|
|
* integer) */
|
|
|
- STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
|
|
|
+ STARPU_HG_DISABLE_CHECKING(data->taskq.ntasks);
|
|
|
starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
|
|
|
}
|
|
@@ -113,7 +66,7 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
|
|
|
struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
|
|
|
/* deallocate the job queue */
|
|
|
- _starpu_destroy_priority_taskq(data->taskq);
|
|
|
+ _starpu_prio_deque_destroy(&data->taskq);
|
|
|
starpu_bitmap_destroy(data->waiters);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
|
|
@@ -124,16 +77,11 @@ static int _starpu_priority_push_task(struct starpu_task *task)
|
|
|
{
|
|
|
unsigned sched_ctx_id = task->sched_ctx;
|
|
|
struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
- struct _starpu_priority_taskq *taskq = data->taskq;
|
|
|
+ struct _starpu_prio_deque *taskq = &data->taskq;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
|
|
|
- unsigned priolevel = task->priority - starpu_sched_ctx_get_min_priority(sched_ctx_id);
|
|
|
- STARPU_ASSERT_MSG(task->priority >= starpu_sched_ctx_get_min_priority(sched_ctx_id) &&
|
|
|
- task->priority <= starpu_sched_ctx_get_max_priority(sched_ctx_id), "task priority %d is not between minimum %d and maximum %d\n", task->priority, starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
|
|
|
|
|
|
- starpu_task_list_push_back(&taskq->taskq[priolevel], task);
|
|
|
- taskq->ntasks[priolevel]++;
|
|
|
- taskq->total_ntasks++;
|
|
|
+ _starpu_prio_deque_push_back_task(taskq, task);
|
|
|
starpu_push_task_end(task);
|
|
|
|
|
|
/*if there are no tasks block */
|
|
@@ -191,19 +139,19 @@ static int _starpu_priority_push_task(struct starpu_task *task)
|
|
|
|
|
|
static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct starpu_task *chosen_task = NULL, *task, *nexttask;
|
|
|
+ struct starpu_task *chosen_task;
|
|
|
unsigned workerid = starpu_worker_get_id_check();
|
|
|
int skipped = 0;
|
|
|
|
|
|
struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
|
|
|
- struct _starpu_priority_taskq *taskq = data->taskq;
|
|
|
+ struct _starpu_prio_deque *taskq = &data->taskq;
|
|
|
|
|
|
/* block until some event happens */
|
|
|
/* Here helgrind would shout that this is unprotected, this is just an
|
|
|
* integer access, and we hold the sched mutex, so we can not miss any
|
|
|
* wake up. */
|
|
|
- if (!STARPU_RUNNING_ON_VALGRIND && taskq->total_ntasks == 0)
|
|
|
+ if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(taskq))
|
|
|
return NULL;
|
|
|
|
|
|
#ifdef STARPU_NON_BLOCKING_DRIVERS
|
|
@@ -222,34 +170,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
|
|
|
there's no need for their own mutex to be locked */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
|
|
|
|
|
|
- unsigned priolevel = taskq->max_prio - taskq->min_prio;
|
|
|
- do
|
|
|
- {
|
|
|
- if (taskq->ntasks[priolevel] > 0)
|
|
|
- {
|
|
|
- for (task = starpu_task_list_begin(&taskq->taskq[priolevel]);
|
|
|
- task != starpu_task_list_end(&taskq->taskq[priolevel]) && !chosen_task;
|
|
|
- task = nexttask)
|
|
|
- {
|
|
|
- unsigned nimpl;
|
|
|
- nexttask = starpu_task_list_next(task);
|
|
|
- if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
|
|
|
- {
|
|
|
- /* there is some task that we can grab */
|
|
|
- starpu_task_set_implementation(task, nimpl);
|
|
|
- starpu_task_list_erase(&taskq->taskq[priolevel], task);
|
|
|
- chosen_task = task;
|
|
|
- taskq->ntasks[priolevel]--;
|
|
|
- taskq->total_ntasks--;
|
|
|
- break;
|
|
|
- }
|
|
|
- else
|
|
|
- skipped = 1;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- while (!chosen_task && priolevel-- > 0);
|
|
|
-
|
|
|
+ chosen_task = _starpu_prio_deque_pop_task_for_worker(taskq, workerid, &skipped);
|
|
|
|
|
|
if (!chosen_task && skipped)
|
|
|
{
|