123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2013,2015,2017,2018                Inria
 * Copyright (C) 2011-2016                               Université de Bordeaux
 * Copyright (C) 2011-2014,2016-2018                     CNRS
 * Copyright (C) 2013                                    Thibaut Lambert
 * Copyright (C) 2011                                    Télécom-SudParis
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
- #include <sched_policies/fifo_queues.h>
- #include <core/detect_combined_workers.h>
- #include <starpu_scheduler.h>
- #include <core/workers.h>
- struct _starpu_peager_data
- {
- struct _starpu_fifo_taskq *fifo;
- struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
- starpu_pthread_mutex_t policy_mutex;
- int possible_combinations_cnt[STARPU_NMAXWORKERS];
- int *possible_combinations[STARPU_NMAXWORKERS];
- int *possible_combinations_size[STARPU_NMAXWORKERS];
- int max_combination_size[STARPU_NMAXWORKERS];
- };
- static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
- {
- _starpu_sched_find_worker_combinations(workerids, nworkers);
- struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
- const unsigned nbasic_workers = starpu_worker_get_count();
- const unsigned ncombined_workers = starpu_combined_worker_get_count();
- unsigned i;
- for(i = 0; i < nworkers; i++)
- {
- unsigned workerid = workerids[i];
- starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
- data->possible_combinations_cnt[workerid] = 0;
- int cnt = data->possible_combinations_cnt[workerid]++;
- _STARPU_CALLOC(data->possible_combinations[workerid], ncombined_workers, sizeof(int));
- _STARPU_CALLOC(data->possible_combinations_size[workerid], ncombined_workers, sizeof(int));
- data->possible_combinations[workerid][cnt] = workerid;
- data->possible_combinations_size[workerid][cnt] = 1;
- data->max_combination_size[workerid] = 1;
- }
- for (i = 0; i < ncombined_workers; i++)
- {
- unsigned combined_workerid = nbasic_workers + i;
- int *workers;
- int size;
- starpu_combined_worker_get_description(combined_workerid, &size, &workers);
- int master = workers[0];
- if (size > data->max_combination_size[master])
- {
- data->max_combination_size[master] = size;
- }
- int cnt = data->possible_combinations_cnt[master]++;
- data->possible_combinations[master][cnt] = combined_workerid;
- data->possible_combinations_size[master][cnt] = size;
- }
- for(i = 0; i < nworkers; i++)
- {
- unsigned workerid = workerids[i];
- /* slaves pick up tasks from their local queue, their master
- * will put tasks directly in that local list when a parallel
- * tasks comes. */
- data->local_fifo[workerid] = _starpu_create_fifo();
- }
- }
- static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
- {
- struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
- unsigned i;
- for(i = 0; i < nworkers; i++)
- {
- int workerid = workerids[i];
- if(!starpu_worker_is_combined_worker(workerid))
- {
- _starpu_destroy_fifo(data->local_fifo[workerid]);
- free(data->possible_combinations[workerid]);
- data->possible_combinations[workerid] = NULL;
- free(data->possible_combinations_size[workerid]);
- data->possible_combinations_size[workerid] = NULL;
- }
- }
- }
- static void initialize_peager_policy(unsigned sched_ctx_id)
- {
- struct _starpu_peager_data *data;
- _STARPU_MALLOC(data, sizeof(struct _starpu_peager_data));
- /* masters pick tasks from that queue */
- data->fifo = _starpu_create_fifo();
- starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
- STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
- }
- static void deinitialize_peager_policy(unsigned sched_ctx_id)
- {
- /* TODO check that there is no task left in the queue */
- struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
- /* deallocate the job queue */
- _starpu_destroy_fifo(data->fifo);
- STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
- free(data);
- }
- static int push_task_peager_policy(struct starpu_task *task)
- {
- unsigned sched_ctx_id = task->sched_ctx;
- int ret_val;
- struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
- STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
- ret_val = _starpu_fifo_push_task(data->fifo, task);
- #ifndef STARPU_NON_BLOCKING_DRIVERS
- int is_parallel_task = task->cl && task->cl->max_parallelism > 1;
- #endif
- starpu_push_task_end(task);
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- #ifndef STARPU_NON_BLOCKING_DRIVERS
- /* if there are no tasks block */
- /* wake people waiting for a task */
- struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
- struct starpu_sched_ctx_iterator it;
- const unsigned ncombined_workers = starpu_combined_worker_get_count();
- workers->init_iterator(workers, &it);
- while(workers->has_next(workers, &it))
- {
- int worker = workers->get_next(workers, &it);
- /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
- if (starpu_worker_is_combined_worker(worker))
- {
- continue;
- }
- if (starpu_worker_get_type(worker) != STARPU_MIC_WORKER
- && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
- {
- _starpu_wake_worker_relax_light(worker);
- continue;
- }
- if ((!is_parallel_task) /* This is not a parallel task, can wake any worker */
- || (ncombined_workers == 0) /* There is no combined worker */
- || (data->max_combination_size[worker] > 1) /* This is a combined worker master and the task is parallel */
- )
- {
- _starpu_wake_worker_relax_light(worker);
- }
- }
- #endif
- return ret_val;
- }
- static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
- {
- struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
- int workerid = starpu_worker_get_id_check();
- /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
- if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER && starpu_worker_get_type(workerid) != STARPU_MIC_WORKER)
- {
- struct starpu_task *task = NULL;
- _starpu_worker_relax_on();
- STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
- _starpu_worker_relax_off();
- task = _starpu_fifo_pop_task(data->fifo, workerid);
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- return task;
- }
- const unsigned ncombined_workers = starpu_combined_worker_get_count();
- struct starpu_task *task = NULL;
- int slave_task = 0;
- _starpu_worker_relax_on();
- STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
- _starpu_worker_relax_off();
- /* check if a slave task is available in the local queue */
- task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
- if (!task)
- {
- /* no slave task, try to pop a task as master */
- task = _starpu_fifo_pop_task(data->fifo, workerid);
- if (task)
- {
- _STARPU_DEBUG("poping master task %p\n", task);
- }
- #if 1
- /* Optional heuristic to filter out purely slave workers for parallel tasks */
- if (task && task->cl && task->cl->max_parallelism > 1 && data->max_combination_size[workerid] == 1 && ncombined_workers > 0)
- {
- /* task is potentially parallel, leave it for a combined worker master */
- _starpu_fifo_push_back_task(data->fifo, task);
- task = NULL;
- }
- #endif
- }
- else
- {
- slave_task = 1;
- _STARPU_DEBUG("poping slave task %p\n", task);
- }
- if (!task || slave_task)
- {
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- goto ret;
- }
- /* Find the largest compatible worker combination */
- int best_size = -1;
- int best_workerid = -1;
- int i;
- for (i = 0; i < data->possible_combinations_cnt[workerid]; i++)
- {
- if (data->possible_combinations_size[workerid][i] > best_size)
- {
- int combined_worker = data->possible_combinations[workerid][i];
- if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
- {
- best_size = data->possible_combinations_size[workerid][i];
- best_workerid = combined_worker;
- }
- }
- }
- _STARPU_DEBUG("task %p, best_workerid=%d, best_size=%d\n", task, best_workerid, best_size);
- /* In case nobody can execute this task, we let the master
- * worker take it anyway, so that it can discard it afterward.
- * */
- if (best_workerid == -1)
- {
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- goto ret;
- }
- /* Is this a basic worker or a combined worker ? */
- if (best_workerid < starpu_worker_get_count())
- {
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- /* The master is alone */
- goto ret;
- }
- starpu_parallel_task_barrier_init(task, best_workerid);
- int worker_size = 0;
- int *combined_workerid;
- starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
- _STARPU_DEBUG("dispatching task %p on combined worker %d of size %d\n", task, best_workerid, worker_size);
- /* Dispatch task aliases to the different slaves */
- for (i = 1; i < worker_size; i++)
- {
- struct starpu_task *alias = starpu_task_dup(task);
- int local_worker = combined_workerid[i];
- alias->destroy = 1;
- _starpu_fifo_push_task(data->local_fifo[local_worker], alias);
- }
- /* The master also manipulated an alias */
- struct starpu_task *master_alias = starpu_task_dup(task);
- master_alias->destroy = 1;
- task = master_alias;
- STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
- for (i = 1; i < worker_size; i++)
- {
- int local_worker = combined_workerid[i];
- _starpu_worker_lock(local_worker);
- #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
- starpu_wake_worker_locked(local_worker);
- #endif
- _starpu_worker_unlock(local_worker);
- }
- ret:
- return task;
- }
- struct starpu_sched_policy _starpu_sched_peager_policy =
- {
- .init_sched = initialize_peager_policy,
- .deinit_sched = deinitialize_peager_policy,
- .add_workers = peager_add_workers,
- .remove_workers = peager_remove_workers,
- .push_task = push_task_peager_policy,
- .pop_task = pop_task_peager_policy,
- .pre_exec_hook = NULL,
- .post_exec_hook = NULL,
- .pop_every_task = NULL,
- .policy_name = "peager",
- .policy_description = "parallel eager policy",
- .worker_type = STARPU_WORKER_LIST,
- };
|