123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- /*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- /* Distributed queues using performance modeling to assign tasks */
- #include <float.h>
- #include <limits.h>
- #include <core/workers.h>
- #include <sched_policies/fifo_queues.h>
- #include <core/perfmodel/perfmodel.h>
- static pthread_mutex_t big_lock;
- static unsigned nworkers, ncombinedworkers;
- static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
- static unsigned napplicable_perf_archtypes = 0;
- static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
- static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
- static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
- static double alpha = 1.0;
- static double beta = 1.0;
- static struct starpu_task *parallel_heft_pop_task(void)
- {
- struct starpu_task *task;
- int workerid = starpu_worker_get_id();
- struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
- task = _starpu_fifo_pop_task(fifo, -1);
- if (task) {
- double model = task->predicted;
-
- fifo->exp_len -= model;
- fifo->exp_start = _starpu_timing_now() + model;
- fifo->exp_end = fifo->exp_start + fifo->exp_len;
- }
- return task;
- }
- static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
- {
- /* make sure someone coule execute that task ! */
- STARPU_ASSERT(best_workerid != -1);
- /* Is this a basic worker or a combined worker ? */
- int nbasic_workers = (int)starpu_worker_get_count();
- int is_basic_worker = (best_workerid < nbasic_workers);
- unsigned memory_node;
- memory_node = starpu_worker_get_memory_node(best_workerid);
- if (_starpu_get_prefetch_flag())
- _starpu_prefetch_task_input_on_node(task, memory_node);
- if (is_basic_worker)
- {
- PTHREAD_MUTEX_LOCK(&big_lock);
- struct starpu_fifo_taskq_s *fifo;
- fifo = queue_array[best_workerid];
-
- fifo->exp_end += predicted;
- fifo->exp_len += predicted;
-
- task->predicted = predicted;
-
- int ret;
- if (prio)
- {
- ret = _starpu_fifo_push_prio_task(queue_array[best_workerid],
- &sched_mutex[best_workerid], &sched_cond[best_workerid], task);
- }
- else {
- ret = _starpu_fifo_push_task(queue_array[best_workerid],
- &sched_mutex[best_workerid], &sched_cond[best_workerid], task);
- }
- PTHREAD_MUTEX_UNLOCK(&big_lock);
- return ret;
- }
- else {
- /* This is a combined worker so we create task aliases */
- struct starpu_combined_worker_s *combined_worker;
- combined_worker = _starpu_get_combined_worker_struct(best_workerid);
- int worker_size = combined_worker->worker_size;
- int *combined_workerid = combined_worker->combined_workerid;
- int ret = 0;
- int i;
-
- task->predicted = predicted;
- starpu_job_t j = _starpu_get_job_associated_to_task(task);
- j->task_size = worker_size;
- j->combined_workerid = best_workerid;
- j->active_task_alias_count = 0;
- PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
- PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
- PTHREAD_MUTEX_LOCK(&big_lock);
- for (i = 0; i < worker_size; i++)
- {
- struct starpu_task *alias = _starpu_create_task_alias(task);
- int local_worker = combined_workerid[i];
- struct starpu_fifo_taskq_s *fifo;
- fifo = queue_array[local_worker];
-
- fifo->exp_end += predicted;
- fifo->exp_len += predicted;
-
- alias->predicted = predicted;
-
- if (prio)
- {
- ret |= _starpu_fifo_push_prio_task(queue_array[local_worker],
- &sched_mutex[local_worker], &sched_cond[local_worker], alias);
- }
- else {
- ret |= _starpu_fifo_push_task(queue_array[local_worker],
- &sched_mutex[local_worker], &sched_cond[local_worker], alias);
- }
- }
- PTHREAD_MUTEX_UNLOCK(&big_lock);
- return ret;
- }
- }
- static double compute_expected_end(int workerid, double length)
- {
- if (workerid < (int)nworkers)
- {
- /* This is a basic worker */
- struct starpu_fifo_taskq_s *fifo;
- fifo = queue_array[workerid];
- return (fifo->exp_start + fifo->exp_len + length);
- }
- else {
- /* This is a combined worker, the expected end is the end for the latest worker */
- int worker_size;
- int *combined_workerid;
- starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
- double exp_end = DBL_MIN;
- int i;
- for (i = 0; i < worker_size; i++)
- {
- struct starpu_fifo_taskq_s *fifo;
- fifo = queue_array[combined_workerid[i]];
- double local_exp_end = (fifo->exp_start + fifo->exp_len + length);
- exp_end = STARPU_MAX(exp_end, local_exp_end);
- }
- return exp_end;
- }
- }
- static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
- {
- /* find the queue */
- struct starpu_fifo_taskq_s *fifo;
- unsigned worker;
- int best = -1;
-
- /* this flag is set if the corresponding worker is selected because
- there is no performance prediction available yet */
- int forced_best = -1;
- double local_task_length[nworkers+ncombinedworkers];
- double local_data_penalty[nworkers+ncombinedworkers];
- double exp_end[nworkers+ncombinedworkers];
- double fitness[nworkers+ncombinedworkers];
- int skip_worker[nworkers+ncombinedworkers];
- double best_exp_end = DBL_MAX;
- double model_best = 0.0;
- double penality_best = 0.0;
- for (worker = 0; worker < nworkers; worker++)
- {
- fifo = queue_array[worker];
- fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
- fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
- }
- for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
- {
- if (!_starpu_combined_worker_may_execute_task(worker, task))
- {
- /* no one on that queue may execute this task */
- skip_worker[worker] = 1;
- continue;
- }
- else {
- skip_worker[worker] = 0;
- }
- enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
- local_task_length[worker] = _starpu_task_expected_length(task, perf_arch);
- unsigned memory_node = starpu_worker_get_memory_node(worker);
- local_data_penalty[worker] = _starpu_data_expected_penalty(memory_node, task);
- if (local_task_length[worker] == -1.0)
- {
- forced_best = worker;
- break;
- }
- exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
- if (exp_end[worker] < best_exp_end)
- {
- /* a better solution was found */
- best_exp_end = exp_end[worker];
- }
- }
- double best_fitness = -1;
-
- if (forced_best == -1)
- {
- for (worker = 0; worker < nworkers+ncombinedworkers; worker++)
- {
- if (skip_worker[worker])
- {
- /* no one on that queue may execute this task */
- continue;
- }
-
- fitness[worker] = alpha*(exp_end[worker] - best_exp_end)
- + beta*(local_data_penalty[worker]);
- if (best == -1 || fitness[worker] < best_fitness)
- {
- /* we found a better solution */
- best_fitness = fitness[worker];
- best = worker;
- }
- }
- }
- STARPU_ASSERT(forced_best != -1 || best != -1);
-
- if (forced_best != -1)
- {
- /* there is no prediction available for that task
- * with that arch we want to speed-up calibration time
- * so we force this measurement */
- best = worker;
- model_best = 0.0;
- penality_best = 0.0;
- }
- else
- {
- model_best = local_task_length[best];
- penality_best = local_data_penalty[best];
- }
- /* we should now have the best worker in variable "best" */
- return push_task_on_best_worker(task, best, model_best, prio);
- }
- static int parallel_heft_push_prio_task(struct starpu_task *task)
- {
- return _parallel_heft_push_task(task, 1);
- }
- static int parallel_heft_push_task(struct starpu_task *task)
- {
- if (task->priority == STARPU_MAX_PRIO)
- return _parallel_heft_push_task(task, 1);
- return _parallel_heft_push_task(task, 0);
- }
- static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *topology,
- __attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
- {
- nworkers = topology->nworkers;
- const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
- if (strval_alpha)
- beta = atof(strval_alpha);
- const char *strval_beta = getenv("STARPU_SCHED_BETA");
- if (strval_beta)
- beta = atof(strval_beta);
- _starpu_sched_find_worker_combinations(topology);
- ncombinedworkers = topology->ncombinedworkers;
- unsigned workerid;
- for (workerid = 0; workerid < nworkers; workerid++)
- {
- queue_array[workerid] = _starpu_create_fifo();
-
- PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
- PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-
- starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
- }
- PTHREAD_MUTEX_INIT(&big_lock, NULL);
- /* We pre-compute an array of all the perfmodel archs that are applicable */
- unsigned total_worker_count = nworkers + ncombinedworkers;
- unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
- memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));
- for (workerid = 0; workerid < total_worker_count; workerid++)
- {
- enum starpu_perf_archtype perf_archtype = starpu_worker_get_perf_archtype(workerid);
- used_perf_archtypes[perf_archtype] = 1;
- }
- napplicable_perf_archtypes = 0;
- int arch;
- for (arch = 0; arch < STARPU_NARCH_VARIATIONS; arch++)
- {
- if (used_perf_archtypes[arch])
- applicable_perf_archtypes[napplicable_perf_archtypes++] = arch;
- }
- }
- static void deinitialize_parallel_heft_policy(struct starpu_machine_topology_s *topology,
- __attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
- {
- unsigned workerid;
- for (workerid = 0; workerid < topology->nworkers; workerid++)
- _starpu_destroy_fifo(queue_array[workerid]);
- }
- struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy = {
- .init_sched = initialize_parallel_heft_policy,
- .deinit_sched = deinitialize_parallel_heft_policy,
- .push_task = parallel_heft_push_task,
- .push_prio_task = parallel_heft_push_prio_task,
- .pop_task = parallel_heft_pop_task,
- .post_exec_hook = NULL,
- .pop_every_task = NULL,
- .policy_name = "pheft",
- .policy_description = "parallel HEFT"
- };
|