123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- * Copyright (C) 2013 Simon Archipoff
- * Copyright (C) 2020 Télécom-Sud Paris
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- /* Heteroprio, which sorts tasks by acceleration factor into buckets, and makes
- * GPUs take accelerated tasks first and CPUs take non-accelerated tasks first */
- #include <starpu_sched_component.h>
- #include "prio_deque.h"
- #include <starpu_perfmodel.h>
- #include "helper_mct.h"
- #include <float.h>
- #include <core/sched_policy.h>
- #include <core/task.h>
/* Approximation ratio for acceleration factor bucketing.
 * A task joins an existing bucket when its acceleration factor lies within
 * +-10% of that bucket's factor, i.e. in
 * [accel * (1 - APPROX), accel * (1 + APPROX)]. */
#define APPROX 0.10
/* Private state of a heteroprio scheduling component. */
struct _starpu_heteroprio_data
{
	/* This is an array of priority queues.
	 * The array is sorted by acceleration factor, most accelerated first */
	struct _starpu_prio_deque **bucket;
	/* Acceleration factor of each bucket (same indexing as bucket[]);
	 * the factor is the ratio between the slowest and the fastest arch's
	 * expected task length */
	float *accel;
	/* Number of entries in bucket[] and accel[] */
	unsigned naccel;
	/* This contains tasks which are not supported on all archs. */
	struct _starpu_prio_deque no_accel;
	/* This protects all queues */
	starpu_pthread_mutex_t mutex;
	/* MCT parameters; its scheduling_mutex serializes placement decisions */
	struct _starpu_mct_data *mct_data;
	/* Batch mode: when non-zero, tasks pushed to the children are given
	 * priority INT_MAX (the fifos below do not use priorities) */
	unsigned batch;
};
- static int heteroprio_progress_accel(struct starpu_sched_component *component, struct _starpu_heteroprio_data *data, enum starpu_worker_archtype archtype, int front)
- {
- struct starpu_task *task = NULL;
- starpu_pthread_mutex_t * mutex = &data->mutex;
- int j, ret = 1;
- double acceleration = INFINITY;
- struct _starpu_mct_data * d = data->mct_data;
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- if (front)
- /* Pick up accelerated tasks first */
- for (j = 0; j < (int) data->naccel; j++)
- {
- task = _starpu_prio_deque_pop_task(data->bucket[j]);
- if (task)
- break;
- }
- else
- /* Pick up accelerated tasks last */
- for (j = (int) data->naccel-1; j >= 0; j--)
- {
- if (data->batch && 0)
- task = _starpu_prio_deque_pop_back_task(data->bucket[j]);
- else
- task = _starpu_prio_deque_pop_task(data->bucket[j]);
- if (task)
- break;
- }
- if (task)
- {
- acceleration = data->accel[j];
- //fprintf(stderr, "for %s thus %s, found task %p in bucket %d: %f\n", starpu_worker_get_type_as_string(archtype), front?"front":"back", task, j, acceleration);
- }
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- if (!task)
- return 1;
- if (data->batch)
- /* In batch mode the fifos below do not use priorities. Do not
- * leak a priority for the data prefetches either */
- task->priority = INT_MAX;
- /* TODO: we might want to prefer to pick up a task whose data is already on some GPU */
- struct starpu_sched_component * best_component;
- /* Estimated task duration for each child */
- double estimated_lengths[component->nchildren];
- /* Estimated transfer duration for each child */
- double estimated_transfer_length[component->nchildren];
- /* Estimated transfer+task termination for each child */
- double estimated_ends_with_task[component->nchildren];
- /* provided local energy */
- double local_energy[component->nchildren];
- /* Minimum transfer+task termination of the task over all workers */
- double min_exp_end_of_task;
- /* Maximum termination of the already-scheduled tasks over all workers */
- double max_exp_end_of_workers;
- unsigned suitable_components[component->nchildren];
- unsigned nsuitable_components;
- nsuitable_components = starpu_mct_compute_execution_times(component, task,
- estimated_lengths,
- estimated_transfer_length,
- suitable_components);
- if (data->batch && 0)
- {
- /* In batch mode, we may want to insist on filling workers with tasks
- * by ignoring when other workers would finish this. */
- unsigned i;
- for (i = 0; i < component->nchildren; i++)
- {
- int idworker;
- for(idworker = starpu_bitmap_first(&component->children[i]->workers);
- idworker != -1;
- idworker = starpu_bitmap_next(&component->children[i]->workers, idworker))
- {
- if (starpu_worker_get_type(idworker) == archtype)
- break;
- }
- if (idworker == -1)
- {
- /* Not the targetted arch, avoid it */
- /* XXX: INFINITY doesn't seem to be working properly */
- estimated_lengths[i] = 1000000000;
- estimated_transfer_length[i] = 1000000000;
- }
- }
- }
- /* Entering critical section to make sure no two workers
- make scheduling decisions at the same time */
- STARPU_COMPONENT_MUTEX_LOCK(&d->scheduling_mutex);
- starpu_mct_compute_expected_times(component, task,
- estimated_lengths,
- estimated_transfer_length,
- estimated_ends_with_task,
- &min_exp_end_of_task, &max_exp_end_of_workers,
- suitable_components, nsuitable_components);
- /* Compute the energy, if provided*/
- starpu_mct_compute_energy(component, task, local_energy, suitable_components, nsuitable_components);
- /* And now find out which worker suits best for this task,
- * including data transfer */
- int best_icomponent = starpu_mct_get_best_component(d, task,
- estimated_lengths,
- estimated_transfer_length,
- estimated_ends_with_task,
- local_energy,
- min_exp_end_of_task, max_exp_end_of_workers,
- suitable_components, nsuitable_components);
- if (best_icomponent == -1)
- goto out;
- best_component = component->children[best_icomponent];
- int idworker;
- for(idworker = starpu_bitmap_first(&best_component->workers);
- idworker != -1;
- idworker = starpu_bitmap_next(&best_component->workers, idworker))
- {
- if (starpu_worker_get_type(idworker) == archtype)
- break;
- }
- if (idworker == -1)
- goto out;
- /* Ok, we do have a worker there of that type, try to push it there. */
- STARPU_ASSERT(!starpu_sched_component_is_worker(best_component));
- starpu_sched_task_break(task);
- ret = starpu_sched_component_push_task(component,best_component,task);
- /* I can now exit the critical section: Pushing the task above ensures that its execution
- time will be taken into account for subsequent scheduling decisions */
- if (!ret)
- {
- STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
- //fprintf(stderr, "pushed %p to %d\n", task, best_icomponent);
- /* Great! */
- return 0;
- }
- out:
- STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
- /* No such kind of worker there, or it refused our task, abort */
- //fprintf(stderr, "could not push %p to %d actually\n", task, best_icomponent);
- /* Could not push to child actually, push that one back */
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- for (j = 0; j < (int) data->naccel; j++)
- {
- if (acceleration == data->accel[j])
- {
- _starpu_prio_deque_push_front_task(data->bucket[j], task);
- break;
- }
- }
- STARPU_ASSERT(j != (int) data->naccel);
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- //fprintf(stderr, "finished pushing to %d\n", archtype);
- return 1;
- }
- static int heteroprio_progress_noaccel(struct starpu_sched_component *component, struct _starpu_heteroprio_data *data, struct starpu_task *task)
- {
- struct _starpu_mct_data * d = data->mct_data;
- int ret;
- struct starpu_sched_component * best_component;
- /* Estimated task duration for each child */
- double estimated_lengths[component->nchildren];
- /* Estimated transfer duration for each child */
- double estimated_transfer_length[component->nchildren];
- /* Estimated transfer+task termination for each child */
- double estimated_ends_with_task[component->nchildren];
- /* estimated energy */
- double local_energy[component->nchildren];
- /* Minimum transfer+task termination of the task over all workers */
- double min_exp_end_of_task;
- /* Maximum termination of the already-scheduled tasks over all workers */
- double max_exp_end_of_workers;
- unsigned suitable_components[component->nchildren];
- unsigned nsuitable_components;
- nsuitable_components = starpu_mct_compute_execution_times(component, task,
- estimated_lengths,
- estimated_transfer_length,
- suitable_components);
- /* If no suitable components were found, it means that the perfmodel of
- * the task had been purged since it has been pushed on the mct component.
- * We should send a push_fail message to its parent so that it will
- * be able to reschedule the task properly. */
- if(nsuitable_components == 0)
- return 1;
- /* Entering critical section to make sure no two workers
- make scheduling decisions at the same time */
- STARPU_COMPONENT_MUTEX_LOCK(&d->scheduling_mutex);
- starpu_mct_compute_expected_times(component, task,
- estimated_lengths,
- estimated_transfer_length,
- estimated_ends_with_task,
- &min_exp_end_of_task, &max_exp_end_of_workers,
- suitable_components, nsuitable_components);
- /* Compute the energy, if provided*/
- starpu_mct_compute_energy(component, task, local_energy, suitable_components, nsuitable_components);
-
- /* And now find out which worker suits best for this task,
- * including data transfer */
- int best_icomponent = starpu_mct_get_best_component(d, task,
- estimated_lengths,
- estimated_transfer_length,
- estimated_ends_with_task,
- local_energy,
- min_exp_end_of_task, max_exp_end_of_workers,
- suitable_components, nsuitable_components);
- /* If no best component is found, it means that the perfmodel of
- * the task had been purged since it has been pushed on the mct component.
- * We should send a push_fail message to its parent so that it will
- * be able to reschedule the task properly. */
- if(best_icomponent == -1)
- {
- STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
- return 1;
- }
- best_component = component->children[best_icomponent];
- STARPU_ASSERT(!starpu_sched_component_is_worker(best_component));
- ret = starpu_sched_component_push_task(component,best_component,task);
- STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
- return ret;
- }
- static int heteroprio_progress_one(struct starpu_sched_component *component)
- {
- struct _starpu_heteroprio_data * data = component->data;
- starpu_pthread_mutex_t * mutex = &data->mutex;
- struct starpu_task *task;
- struct _starpu_prio_deque * no_accel = &data->no_accel;
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- task = _starpu_prio_deque_pop_task(no_accel);
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- if (task)
- {
- if (heteroprio_progress_noaccel(component, data, task))
- {
- /* Could not push to child actually, push that one back */
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- _starpu_prio_deque_push_front_task(no_accel, task);
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- }
- }
- /* Note: this hardcodes acceleration order */
- if (!heteroprio_progress_accel(component, data, STARPU_CUDA_WORKER, 1))
- return 0;
- if (!heteroprio_progress_accel(component, data, STARPU_OPENCL_WORKER, 1))
- return 0;
- if (!heteroprio_progress_accel(component, data, STARPU_MIC_WORKER, 1))
- return 0;
- if (!heteroprio_progress_accel(component, data, STARPU_MPI_MS_WORKER, 0))
- return 0;
- if (!heteroprio_progress_accel(component, data, STARPU_CPU_WORKER, 0))
- return 0;
- return 1;
- }
/* Try to push some tasks below: keep running scheduling rounds until one of
 * them reports that no task could be dispatched. */
static void heteroprio_progress(struct starpu_sched_component *component)
{
	STARPU_ASSERT(component && starpu_sched_component_is_heteroprio(component));
	for (;;)
	{
		int stalled = heteroprio_progress_one(component);
		if (stalled)
			break;
	}
}
- static int heteroprio_push_task(struct starpu_sched_component * component, struct starpu_task * task)
- {
- STARPU_ASSERT(component && task && starpu_sched_component_is_heteroprio(component));
- struct _starpu_heteroprio_data * data = component->data;
- starpu_pthread_mutex_t * mutex = &data->mutex;
- unsigned nimpl;
- double min_expected = INFINITY, max_expected = -INFINITY;
- double acceleration;
- if (data->batch && 0)
- /* Batch mode, we may want to ignore priorities completely */
- task->priority = INT_MAX;
- /* Compute acceleration between best-performing arch and least-performing arch */
- int workerid;
- for(workerid = starpu_bitmap_first(&component->workers_in_ctx);
- workerid != -1;
- workerid = starpu_bitmap_next(&component->workers_in_ctx, workerid))
- {
- unsigned impl_mask;
- if (!starpu_worker_can_execute_task_impl(workerid, task, &impl_mask))
- break;
- struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, task->sched_ctx);
- double min_arch = INFINITY;
- for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
- {
- if (!(impl_mask & (1U << nimpl)))
- continue;
- double expected = starpu_task_expected_length(task, perf_arch, nimpl);
- if (isnan(expected) || expected == 0.)
- {
- min_arch = expected;
- break;
- }
- if (expected < min_arch)
- min_arch = expected;
- }
- if (isnan(min_arch) || min_arch == 0.)
- /* No known execution time, can't do anything here */
- break;
- STARPU_ASSERT(min_arch != INFINITY);
- if (min_arch < min_expected)
- min_expected = min_arch;
- if (min_arch > max_expected)
- max_expected = min_arch;
- }
- if (workerid == -1)
- {
- /* All archs can run it */
- STARPU_ASSERT(!isnan(min_expected));
- STARPU_ASSERT(!isnan(max_expected));
- STARPU_ASSERT(min_expected != INFINITY);
- STARPU_ASSERT(max_expected != -INFINITY);
- acceleration = max_expected / min_expected;
- STARPU_ASSERT(!isnan(acceleration));
- //fprintf(stderr,"%s: acceleration %f\n", starpu_task_get_name(task), acceleration);
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- unsigned i, j;
- /* Try to find a bucket with similar acceleration */
- for (i = 0; i < data->naccel; i++)
- {
- if (acceleration >= data->accel[i] * (1 - APPROX) &&
- acceleration <= data->accel[i] * (1 + APPROX))
- break;
- }
- if (i == data->naccel)
- {
- /* Didn't find it, add one */
- data->naccel++;
- float *newaccel;
- _STARPU_MALLOC(newaccel, data->naccel * sizeof(*newaccel));
- struct _starpu_prio_deque **newbuckets;
- _STARPU_MALLOC(newbuckets, data->naccel * sizeof(*newbuckets));
- struct _starpu_prio_deque *newbucket;
- _STARPU_MALLOC(newbucket, sizeof(*newbucket));
- _starpu_prio_deque_init(newbucket);
- int inserted = 0;
- for (j = 0; j < data->naccel-1; j++)
- {
- if (!inserted && acceleration > data->accel[j])
- {
- /* Insert the new bucket here */
- i = j;
- newbuckets[j] = newbucket;
- newaccel[j] = acceleration;
- inserted = 1;
- }
- newbuckets[j+inserted] = data->bucket[j];
- newaccel[j+inserted] = data->accel[j];
- }
- if (!inserted)
- {
- /* Insert it last */
- newbuckets[data->naccel-1] = newbucket;
- newaccel[data->naccel-1] = acceleration;
- }
- free(data->bucket);
- free(data->accel);
- data->bucket = newbuckets;
- data->accel = newaccel;
- }
- #if 0
- fprintf(stderr,"buckets:");
- for (j = 0; j < data->naccel; j++)
- {
- fprintf(stderr, " %f", data->accel[j]);
- }
- fprintf(stderr,"\ninserting %p %f to %d\n", task, acceleration, i);
- #endif
- _starpu_prio_deque_push_back_task(data->bucket[i],task);
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- }
- else
- {
- /* Not all archs can run it, will resort to HEFT strategy */
- acceleration = INFINITY;
- //fprintf(stderr,"%s: some archs can't do it\n", starpu_task_get_name(task));
- struct _starpu_prio_deque * no_accel = &data->no_accel;
- STARPU_COMPONENT_MUTEX_LOCK(mutex);
- _starpu_prio_deque_push_back_task(no_accel,task);
- STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
- }
- heteroprio_progress(component);
- return 0;
- }
- static int heteroprio_can_push(struct starpu_sched_component *component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
- {
- heteroprio_progress(component);
- int ret = 0;
- unsigned j;
- for(j=0; j < component->nparents; j++)
- {
- if(component->parents[j] == NULL)
- continue;
- else
- {
- ret = component->parents[j]->can_push(component->parents[j], component);
- if(ret)
- break;
- }
- }
- return ret;
- }
- static void heteroprio_component_deinit_data(struct starpu_sched_component * component)
- {
- STARPU_ASSERT(starpu_sched_component_is_heteroprio(component));
- struct _starpu_heteroprio_data * d = component->data;
- struct _starpu_mct_data * mct_d = d->mct_data;
- unsigned i;
- for (i = 0; i < d->naccel; i++)
- {
- _starpu_prio_deque_destroy(d->bucket[i]);
- free(d->bucket[i]);
- }
- free(d->bucket);
- free(d->accel);
- _starpu_prio_deque_destroy(&d->no_accel);
- STARPU_PTHREAD_MUTEX_DESTROY(&d->mutex);
- STARPU_PTHREAD_MUTEX_DESTROY(&mct_d->scheduling_mutex);
- free(mct_d);
- free(d);
- }
- int starpu_sched_component_is_heteroprio(struct starpu_sched_component * component)
- {
- return component->push_task == heteroprio_push_task;
- }
- struct starpu_sched_component * starpu_sched_component_heteroprio_create(struct starpu_sched_tree *tree, struct starpu_sched_component_heteroprio_data * params)
- {
- struct starpu_sched_component * component = starpu_sched_component_create(tree, "heteroprio");
- struct _starpu_mct_data *mct_data = starpu_mct_init_parameters(params ? params->mct : NULL);
- struct _starpu_heteroprio_data *data;
- _STARPU_MALLOC(data, sizeof(*data));
- data->bucket = NULL;
- data->accel = NULL;
- data->naccel = 0;
- _starpu_prio_deque_init(&data->no_accel);
- STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
- data->mct_data = mct_data;
- STARPU_PTHREAD_MUTEX_INIT(&mct_data->scheduling_mutex,NULL);
- if (params)
- data->batch = params->batch;
- else
- data->batch = 1;
- component->data = data;
- component->push_task = heteroprio_push_task;
- component->can_push = heteroprio_can_push;
- component->deinit_data = heteroprio_component_deinit_data;
- return component;
- }
|