/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2015 INRIA
 * Copyright (C) 2016 CNRS
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/* Heteroprio: distributed queues assigning tasks to architectures through
 * user-defined priority buckets */

#include <starpu_config.h>
#include <starpu_scheduler.h>
#include <schedulers/starpu_heteroprio.h>
#include <common/fxt.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/debug.h>
#include <sched_policies/fifo_queues.h>
#include <limits.h>
#include <float.h>  /* for DBL_MIN/DBL_MAX; <limits.h> does not provide them */
#include <string.h> /* for memset */

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif
/* A bucket corresponds to a priority level.
 * When a task is pushed with a priority X, it is stored
 * in bucket X.
 * All the tasks stored in the fifo must be computable by the archs
 * in valid_archs.
 * For example, if valid_archs = (STARPU_CPU|STARPU_CUDA),
 * then task->cl->where must include at least (STARPU_CPU|STARPU_CUDA).
 */
struct _heteroprio_bucket
{
	/* The tasks of the current bucket */
	struct _starpu_fifo_taskq* tasks_queue;
	/* The valid archs for the current bucket */
	unsigned valid_archs;
	/* The slow factors for each arch */
	float slow_factors_per_index[STARPU_NB_TYPES];
	/* The base arch for the slow factor (the fastest arch for the tasks in the bucket) */
	unsigned factor_base_arch_index;
};
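
/* Illustrative example (not from the original source): with
 * factor_base_arch_index = STARPU_CUDA_IDX and
 * slow_factors_per_index[STARPU_CPU_IDX] = 10.0f, a CPU worker only takes
 * tasks from this bucket while the queue still holds at least 10 tasks per
 * CUDA worker; below that threshold the remaining tasks are kept for the
 * faster arch (see the prefetch condition in pop_task_heteroprio_policy). */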
/* Init a bucket */
static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket)
{
	memset(bucket, 0, sizeof(*bucket));
	bucket->tasks_queue = _starpu_create_fifo();
}

/* Release a bucket */
static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket)
{
	STARPU_ASSERT(_starpu_fifo_empty(bucket->tasks_queue) != 0);
	_starpu_destroy_fifo(bucket->tasks_queue);
}
/* A worker is mainly composed of a fifo for the tasks
 * and some direct access to worker properties.
 * The fifo is now a _starpu_fifo_taskq (see the note below); it used to be
 * an array where reading a task meant accessing
 * tasks_queue[tasks_queue_index] and writing a task meant accessing
 * tasks_queue[(tasks_queue_index+tasks_queue_size)%HETEROPRIO_MAX_PREFETCH].
 */
/* ANDRA_MODIF: can use starpu fifo + starpu sched_mutex */
struct _heteroprio_worker_wrapper
{
	unsigned arch_type;
	unsigned arch_index;
	struct _starpu_fifo_taskq *tasks_queue;
};
struct _starpu_heteroprio_data
{
	starpu_pthread_mutex_t policy_mutex;
	struct starpu_bitmap *waiters;
	/* The buckets that store the tasks */
	struct _heteroprio_bucket buckets[STARPU_HETEROPRIO_MAX_PRIO];
	/* The number of buckets for each arch */
	unsigned nb_prio_per_arch_index[STARPU_NB_TYPES];
	/* The mapping to the corresponding buckets */
	unsigned prio_mapping_per_arch_index[STARPU_NB_TYPES][STARPU_HETEROPRIO_MAX_PRIO];
	/* The number of available tasks for a given arch (not prefetched) */
	unsigned nb_remaining_tasks_per_arch_index[STARPU_NB_TYPES];
	/* The total number of tasks in the buckets (not prefetched) */
	unsigned total_tasks_in_buckets;
	/* The total number of prefetched tasks for a given arch */
	unsigned nb_prefetched_tasks_per_arch_index[STARPU_NB_TYPES];
	/* The information for all the workers */
	struct _heteroprio_worker_wrapper workers_heteroprio[STARPU_NMAXWORKERS];
	/* The number of workers for a given arch */
	unsigned nb_workers_per_arch_index[STARPU_NB_TYPES];
};
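
/* Illustrative note (not from the original source): the mapping arrays let
 * each arch see its own ordering of the shared buckets. For instance, with
 * nb_prio_per_arch_index[STARPU_CPU_IDX] = 2 and
 * prio_mapping_per_arch_index[STARPU_CPU_IDX] = {1, 0},
 * a CPU worker scans bucket 1 first, then bucket 0, while
 * prio_mapping_per_arch_index[STARPU_CUDA_IDX] = {0, 1}
 * makes CUDA workers scan the same buckets in the opposite order. */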
/** Tell how many priorities there are for a given arch */
void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned max_prio)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_ASSERT(max_prio < STARPU_HETEROPRIO_MAX_PRIO);
	hp->nb_prio_per_arch_index[arch] = max_prio;
}

/** Set the mapping for a given arch prio=>bucket */
inline void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned source_prio, unsigned dest_bucket_id)
{
	STARPU_ASSERT(dest_bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->prio_mapping_per_arch_index[arch][source_prio] = dest_bucket_id;
	hp->buckets[dest_bucket_id].valid_archs |= starpu_heteroprio_types_to_arch[arch];
	_STARPU_DEBUG("Adding arch %d to bucket %d\n", arch, dest_bucket_id);
}

/** Tell which arch is the fastest for the tasks of a bucket (optional) */
inline void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id)
{
	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->buckets[bucket_id].factor_base_arch_index = arch;
	hp->buckets[bucket_id].slow_factors_per_index[arch] = 0;
}

/** Tell how slow an arch is for the tasks of a bucket (optional) */
inline void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id, float slow_factor)
{
	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->buckets[bucket_id].slow_factors_per_index[arch] = slow_factor;
}
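
/* Illustrative sketch (not part of the original file): a user-provided init
 * callback would typically combine the three setters above, e.g.
 *
 *   void my_init_sched(unsigned sched_ctx_id)
 *   {
 *       // 2 priority levels for CPUs, 1 for CUDA
 *       starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CPU_IDX, 2);
 *       starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CUDA_IDX, 1);
 *       starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CPU_IDX, 0, 0);
 *       starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CPU_IDX, 1, 1);
 *       starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CUDA_IDX, 0, 1);
 *       // Assume CUDA is 4x faster than CPU on bucket 1
 *       starpu_heteroprio_set_faster_arch(sched_ctx_id, STARPU_CUDA_IDX, 1);
 *       starpu_heteroprio_set_arch_slow_factor(sched_ctx_id, STARPU_CPU_IDX, 1, 4.0f);
 *   }
 *
 * The callback is retrieved through starpu_sched_ctx_get_sched_policy_init()
 * in initialize_heteroprio_policy() below. */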
/** If the user does not provide an init callback we create one bucket
    per priority, shared by all architectures */
static inline void default_init_sched(unsigned sched_ctx_id)
{
	int min_prio = starpu_sched_ctx_get_min_priority(sched_ctx_id);
	int max_prio = starpu_sched_ctx_get_max_priority(sched_ctx_id);

	// By default each type of device uses every priority level and no slow factor
#ifdef STARPU_USE_CPU
	starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CPU_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_CUDA
	starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CUDA_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_OPENCL
	starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_OPENCL_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_MIC
	starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_MIC_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_SCC
	starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_SCC_IDX, max_prio-min_prio+1);
#endif

	// Direct mapping: priority X goes to bucket X
	int prio;
	for(prio=min_prio ; prio<=max_prio ; prio++)
	{
#ifdef STARPU_USE_CPU
		starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CPU_IDX, prio, prio);
#endif
#ifdef STARPU_USE_CUDA
		starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CUDA_IDX, prio, prio);
#endif
#ifdef STARPU_USE_OPENCL
		starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_OPENCL_IDX, prio, prio);
#endif
#ifdef STARPU_USE_MIC
		starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_MIC_IDX, prio, prio);
#endif
#ifdef STARPU_USE_SCC
		starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_SCC_IDX, prio, prio);
#endif
	}
}
static void initialize_heteroprio_policy(unsigned sched_ctx_id)
{
	/* Alloc the scheduler data */
	struct _starpu_heteroprio_data *hp;
	_STARPU_MALLOC(hp, sizeof(struct _starpu_heteroprio_data));
	memset(hp, 0, sizeof(*hp));

	hp->waiters = starpu_bitmap_create();

	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)hp);

	STARPU_PTHREAD_MUTEX_INIT(&hp->policy_mutex, NULL);

	unsigned idx_prio;
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
		_heteroprio_bucket_init(&hp->buckets[idx_prio]);

	void (*init_sched)(unsigned) = starpu_sched_ctx_get_sched_policy_init(sched_ctx_id);

	if(init_sched)
		init_sched(sched_ctx_id);
	else
		default_init_sched(sched_ctx_id);

	/* Ensure that the information has been correctly filled in */
	unsigned check_all_archs[STARPU_HETEROPRIO_MAX_PRIO];
	memset(check_all_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		STARPU_ASSERT(hp->nb_prio_per_arch_index[arch_index] <= STARPU_HETEROPRIO_MAX_PRIO);

		unsigned check_archs[STARPU_HETEROPRIO_MAX_PRIO];
		memset(check_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);

		for(idx_prio = 0; idx_prio < hp->nb_prio_per_arch_index[arch_index]; ++idx_prio)
		{
			const unsigned mapped_prio = hp->prio_mapping_per_arch_index[arch_index][idx_prio];
			/* mapped_prio indexes hp->buckets, so it must be strictly below the array size */
			STARPU_ASSERT(mapped_prio < STARPU_HETEROPRIO_MAX_PRIO);
			STARPU_ASSERT(hp->buckets[mapped_prio].slow_factors_per_index[arch_index] >= 0.0);
			STARPU_ASSERT(hp->buckets[mapped_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index]);
			check_archs[mapped_prio]      = 1;
			check_all_archs[mapped_prio] += 1;
		}
		for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
		{
			/* Ensure that the current arch uses this bucket, or that another arch can use it */
			STARPU_ASSERT(check_archs[idx_prio] == 1 || hp->buckets[idx_prio].valid_archs == 0
				      || (hp->buckets[idx_prio].valid_archs & ~starpu_heteroprio_types_to_arch[arch_index]) != 0);
		}
	}
	/* Ensure that, for example, if valid_archs == (STARPU_CPU|STARPU_CUDA) then check_all_archs[] == 2 */
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
	{
		unsigned nb_arch_on_bucket = 0;
		for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
		{
			if(hp->buckets[idx_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index])
			{
				nb_arch_on_bucket += 1;
			}
		}
		STARPU_ASSERT_MSG(check_all_archs[idx_prio] == nb_arch_on_bucket, "check_all_archs[idx_prio(%u)] = %u != nb_arch_on_bucket = %u\n", idx_prio, check_all_archs[idx_prio], nb_arch_on_bucket);
	}
}
static void deinitialize_heteroprio_policy(unsigned sched_ctx_id)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* Ensure there are no more tasks */
	STARPU_ASSERT(hp->total_tasks_in_buckets == 0);
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		STARPU_ASSERT(hp->nb_remaining_tasks_per_arch_index[arch_index] == 0);
		STARPU_ASSERT(hp->nb_prefetched_tasks_per_arch_index[arch_index] == 0);
	}

	unsigned idx_prio;
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
	{
		_heteroprio_bucket_release(&hp->buckets[idx_prio]);
	}

	starpu_bitmap_destroy(hp->waiters);

	STARPU_PTHREAD_MUTEX_DESTROY(&hp->policy_mutex);
	free(hp);
}
static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		/* If the worker already belonged to this context, its queue and
		   synchronization variables have already been initialized; only
		   reset the wrapper when the queue does not exist yet (resetting
		   it unconditionally would leak an existing queue) */
		if(hp->workers_heteroprio[workerid].tasks_queue == NULL)
		{
			memset(&hp->workers_heteroprio[workerid], 0, sizeof(hp->workers_heteroprio[workerid]));
			hp->workers_heteroprio[workerid].tasks_queue = _starpu_create_fifo();
			switch(starpu_worker_get_type(workerid))
			{
			case STARPU_CPU_WORKER:
				hp->workers_heteroprio[workerid].arch_type = STARPU_CPU;
				hp->workers_heteroprio[workerid].arch_index = STARPU_CPU_IDX;
				break;
			case STARPU_CUDA_WORKER:
				hp->workers_heteroprio[workerid].arch_type = STARPU_CUDA;
				hp->workers_heteroprio[workerid].arch_index = STARPU_CUDA_IDX;
				break;
			case STARPU_OPENCL_WORKER:
				hp->workers_heteroprio[workerid].arch_type = STARPU_OPENCL;
				hp->workers_heteroprio[workerid].arch_index = STARPU_OPENCL_IDX;
				break;
			case STARPU_MIC_WORKER:
				hp->workers_heteroprio[workerid].arch_type = STARPU_MIC;
				hp->workers_heteroprio[workerid].arch_index = STARPU_MIC_IDX;
				break;
			case STARPU_SCC_WORKER:
				hp->workers_heteroprio[workerid].arch_type = STARPU_SCC;
				hp->workers_heteroprio[workerid].arch_index = STARPU_SCC_IDX;
				break;
			default:
				STARPU_ASSERT(0);
			}
		}
		hp->nb_workers_per_arch_index[hp->workers_heteroprio[workerid].arch_index]++;
	}
}
static void remove_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;

	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		if(hp->workers_heteroprio[workerid].tasks_queue != NULL)
		{
			_starpu_destroy_fifo(hp->workers_heteroprio[workerid].tasks_queue);
			hp->workers_heteroprio[workerid].tasks_queue = NULL;
		}
	}
}
/* Push a new task (simply store it and update counters) */
static int push_task_heteroprio_policy(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* Only one worker at a time may manipulate the heteroprio data */
	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);

	/* Retrieve the correct bucket */
	STARPU_ASSERT(task->priority < STARPU_HETEROPRIO_MAX_PRIO);
	struct _heteroprio_bucket* bucket = &hp->buckets[task->priority];
	/* Ensure that any worker that checks this bucket can compute the task:
	   valid_archs must be a subset of task->cl->where */
	STARPU_ASSERT_MSG(bucket->valid_archs, "The bucket %d does not have any archs\n", task->priority);
	STARPU_ASSERT(((bucket->valid_archs ^ task->cl->where) & bucket->valid_archs) == 0);
	/* Save the task */
	_starpu_fifo_push_back_task(bucket->tasks_queue,task);

	/* Increment the counters */
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		/* We test the archs on the bucket and not on task->cl->where since the latter is more restrictive */
		if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
			hp->nb_remaining_tasks_per_arch_index[arch_index] += 1;
	}
	hp->total_tasks_in_buckets += 1;

	starpu_push_task_end(task);

	/* If no tasks were available, workers may be blocked:
	   wake those waiting for a task */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	char dowake[STARPU_NMAXWORKERS] = { 0 };
#endif
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);

#ifdef STARPU_NON_BLOCKING_DRIVERS
		if (!starpu_bitmap_get(hp->waiters, worker))
			/* This worker is not waiting for a task */
			continue;
#endif

		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
		{
			/* It can execute this one, tell it! */
#ifdef STARPU_NON_BLOCKING_DRIVERS
			starpu_bitmap_unset(hp->waiters, worker);
			/* We really woke at least somebody, no need to wake somebody else */
			break;
#else
			dowake[worker] = 1;
#endif
		}
	}
	/* Release the mutex; the task is now visible to the workers */
	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);

#ifndef STARPU_NON_BLOCKING_DRIVERS
	/* Now that we have a list of potential workers, try to wake one */
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		if (dowake[worker])
			if (starpu_wake_worker(worker))
				break; /* wake up a single worker */
	}
#endif

	return 0;
}
static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
{
	const unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _heteroprio_worker_wrapper* worker = &hp->workers_heteroprio[workerid];

#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* If there is no task available for this worker's arch, none in its
	   local queue and none prefetched, just return NULL */
	if (!STARPU_RUNNING_ON_VALGRIND
	    && (hp->total_tasks_in_buckets == 0 || hp->nb_remaining_tasks_per_arch_index[worker->arch_index] == 0)
	    && worker->tasks_queue->ntasks == 0 && hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] == 0)
	{
		return NULL;
	}

	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(hp->waiters, workerid))
	{
		/* Nobody woke us, avoid bothering the mutex */
		return NULL;
	}
#endif
	starpu_pthread_mutex_t *worker_sched_mutex;
	starpu_pthread_cond_t *worker_sched_cond;
	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);

	/* Note: we release this mutex before taking the victim mutex, to avoid a deadlock */
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(worker_sched_mutex);

	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);

	/* Keep track of the newly added tasks to perform a real prefetch on the node */
	unsigned nb_added_tasks = 0;

	/* Check whether some tasks are available for the current worker arch */
	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 )
	{
		/* Ideally we would like to fill the prefetch array */
		unsigned nb_tasks_to_prefetch = (STARPU_HETEROPRIO_MAX_PREFETCH-worker->tasks_queue->ntasks);
		/* But there may be fewer tasks than that! */
		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index])
		{
			nb_tasks_to_prefetch = hp->nb_remaining_tasks_per_arch_index[worker->arch_index];
		}
		/* And if there are fewer tasks than workers, we take at most one */
		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id))
		{
			if(worker->tasks_queue->ntasks == 0)
				nb_tasks_to_prefetch = 1;
			else
				nb_tasks_to_prefetch = 0;
		}
		unsigned idx_prio, arch_index;
		/* We iterate until we have found all the tasks we need */
		for(idx_prio = 0; nb_tasks_to_prefetch && idx_prio < hp->nb_prio_per_arch_index[worker->arch_index]; ++idx_prio)
		{
			/* Retrieve the bucket using the mapping */
			struct _heteroprio_bucket* bucket = &hp->buckets[hp->prio_mapping_per_arch_index[worker->arch_index][idx_prio]];
			/* Ensure we can compute tasks from this bucket */
			STARPU_ASSERT(bucket->valid_archs & worker->arch_type);
			/* Take nb_tasks_to_prefetch tasks if possible.
			   A slower arch only takes tasks from this bucket while the number
			   of queued tasks per worker of the base (fastest) arch stays at
			   or above its slow factor, so the fastest arch is not starved. */
			while(!_starpu_fifo_empty(bucket->tasks_queue) && nb_tasks_to_prefetch &&
			      (bucket->factor_base_arch_index == 0 ||
			       worker->arch_index == bucket->factor_base_arch_index ||
			       (((float)bucket->tasks_queue->ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
			{
				struct starpu_task* task = _starpu_fifo_pop_local_task(bucket->tasks_queue);
				STARPU_ASSERT(starpu_worker_can_execute_task(workerid, task, 0));
				/* Save the task */
				STARPU_AYU_ADDTOTASKQUEUE(_starpu_get_job_associated_to_task(task)->job_id, workerid);
				_starpu_fifo_push_task(worker->tasks_queue, task);
				/* Update the general counters */
				hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] += 1;
				hp->total_tasks_in_buckets -= 1;

				for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
				{
					/* We test the archs on the bucket and not on task->cl->where since the latter is more restrictive */
					if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
					{
						hp->nb_remaining_tasks_per_arch_index[arch_index] -= 1;
					}
				}
				/* Decrease the number of tasks left to find */
				nb_tasks_to_prefetch -= 1;
				nb_added_tasks       += 1;
				/* TODO starpu_prefetch_task_input_on_node(task, workerid); */
			}
		}
	}
	struct starpu_task* task = NULL;

	/* The worker has some tasks in its queue */
	if(worker->tasks_queue->ntasks)
	{
		task = _starpu_fifo_pop_task(worker->tasks_queue, workerid);
		hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] -= 1;
	}
	/* Otherwise look whether we can steal some work */
	else if(hp->nb_prefetched_tasks_per_arch_index[worker->arch_index])
	{
		/* If HETEROPRIO_MAX_PREFETCH == 1 it should not be possible to steal work */
		STARPU_ASSERT(STARPU_HETEROPRIO_MAX_PREFETCH != 1);

		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);
		unsigned victim;
		unsigned current_worker;

		/* Start stealing from just after ourselves */
		while(workers->has_next(workers, &it))
		{
			current_worker = workers->get_next(workers, &it);
			if(current_worker == workerid)
				break;
		}

		/* Circular loop */
		while (1)
		{
			if (!workers->has_next(workers, &it))
			{
				/* End of the list, restart from the beginning */
				workers->init_iterator(workers, &it);
			}
			while(workers->has_next(workers, &it))
			{
				victim = workers->get_next(workers, &it);
				/* When we get back to ourselves, we are done trying to find work */
				if(victim == workerid)
					goto done;

				/* If it is the same arch and there is a task to steal */
				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
				   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
				{
					starpu_pthread_mutex_t *victim_sched_mutex;
					starpu_pthread_cond_t *victim_sched_cond;
					starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);

					/* Ensure the victim is not currently prefetching its data,
					   then re-check the condition under its lock */
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(victim_sched_mutex);
					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
					   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
					{
						/* Steal the last added task */
						task = _starpu_fifo_pop_task(hp->workers_heteroprio[victim].tasks_queue, workerid);
						/* We stole a task, update the global counter */
						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;

						STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
						goto done;
					}
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
				}
			}
		}
done:		;
	}

	if (!task)
	{
		/* Tell pushers that we are waiting for tasks */
		starpu_bitmap_set(hp->waiters, workerid);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(worker_sched_mutex);

	if(task)
	{
		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
		{
			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
			return NULL;
		}
	}

	/* If we got a task (task), we may have some freshly added ones
	   (nb_added_tasks) left in the queue (worker->tasks_queue) to prefetch */
	if(task && worker->tasks_queue->ntasks && nb_added_tasks && starpu_get_prefetch_flag())
	{
		const unsigned memory_node = starpu_worker_get_memory_node(workerid);

		/* TODO berenger: iterate in the other direction */
		struct starpu_task *task_to_prefetch = NULL;
		for (task_to_prefetch  = starpu_task_list_begin(&worker->tasks_queue->taskq);
		     (task_to_prefetch != starpu_task_list_end(&worker->tasks_queue->taskq) &&
		      nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0);
		     task_to_prefetch  = starpu_task_list_next(task_to_prefetch))
		{
			/* Prefetch, from the task closest to the end */
			starpu_prefetch_task_input_on_node(task_to_prefetch, memory_node);
			nb_added_tasks -= 1;
		}
	}

	return task;
}
struct starpu_sched_policy _starpu_sched_heteroprio_policy =
{
	.init_sched = initialize_heteroprio_policy,
	.deinit_sched = deinitialize_heteroprio_policy,
	.add_workers = add_workers_heteroprio_policy,
	.remove_workers = remove_workers_heteroprio_policy,
	.push_task = push_task_heteroprio_policy,
	.simulate_push_task = NULL,
	.push_task_notify = NULL,
	.pop_task = pop_task_heteroprio_policy,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "heteroprio",
	.policy_description = "heteroprio",
	.worker_type = STARPU_WORKER_LIST,
};
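
/* Illustrative note (not from the original file): since policy_name is
 * "heteroprio", the policy can typically be selected at runtime with the
 * STARPU_SCHED=heteroprio environment variable, or programmatically when
 * creating a scheduling context, e.g. (sketch, assuming the usual StarPU
 * sched_ctx API):
 *
 *   unsigned ctx = starpu_sched_ctx_create(workerids, nworkers, "heteroprio_ctx",
 *                                          STARPU_SCHED_CTX_POLICY_NAME, "heteroprio",
 *                                          0);
 */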