123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- /*
- * StarPU
- * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <core/workers.h>
- /* XXX quick and dirty implementation for now ... */
- pthread_key_t local_workers_key;
- static struct machine_config_s config;
- /* in case a task is submitted, we may check whether there exists a worker
- that may execute the task or not */
- static uint32_t worker_mask = 0;
- inline uint32_t worker_exists(uint32_t task_mask)
- {
- return (task_mask & worker_mask);
- }
- inline uint32_t may_submit_cuda_task(void)
- {
- return ((CUDA|CUBLAS) & worker_mask);
- }
- inline uint32_t may_submit_core_task(void)
- {
- return (CORE & worker_mask);
- }
/*
 * Runtime initialization methods
 */

/* Number of workers of each kind; filled in by init_machine_config(). */
#ifdef USE_CUDA
/* provided outside of this file */
extern unsigned get_cuda_device_count(void);
static unsigned ncudagpus;
#endif

#ifdef USE_GORDON
static unsigned ngordon_spus;
#endif

#ifdef USE_CPUS
static unsigned ncores;
#endif
- static void init_machine_config(struct machine_config_s *config)
- {
- int envval __attribute__((unused));
- unsigned use_accelerator = 0;
- config->nworkers = 0;
- #ifdef USE_CUDA
- /* we need to initialize CUDA early to count the number of devices */
- init_cuda();
- envval = starpu_get_env_number("NCUDA");
- if (envval < 0) {
- ncudagpus = STARPU_MIN(get_cuda_device_count(), MAXCUDADEVS);
- } else {
- /* use the specified value */
- ncudagpus = (unsigned)envval;
- STARPU_ASSERT(ncudagpus <= MAXCUDADEVS);
- }
- STARPU_ASSERT(ncudagpus + config->nworkers <= NMAXWORKERS);
- if (ncudagpus > 0)
- use_accelerator = 1;
- unsigned cudagpu;
- for (cudagpu = 0; cudagpu < ncudagpus; cudagpu++)
- {
- config->workers[config->nworkers + cudagpu].arch = CUDA_WORKER;
- config->workers[config->nworkers + cudagpu].perf_arch = STARPU_CUDA_DEFAULT;
- config->workers[config->nworkers + cudagpu].id = cudagpu;
- worker_mask |= (CUDA|CUBLAS);
- }
- config->nworkers += ncudagpus;
- #endif
-
- #ifdef USE_GORDON
- envval = starpu_get_env_number("NGORDON");
- if (envval < 0) {
- ngordon_spus = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
- } else {
- /* use the specified value */
- ngordon_spus = (unsigned)envval;
- STARPU_ASSERT(ngordon_spus <= NMAXGORDONSPUS);
- }
- STARPU_ASSERT(ngordon_spus + config->nworkers <= NMAXWORKERS);
- if (ngordon_spus > 0)
- use_accelerator = 1;
- unsigned spu;
- for (spu = 0; spu < ngordon_spus; spu++)
- {
- config->workers[config->nworkers + spu].arch = GORDON_WORKER;
- config->workers[config->nworkers + spu].perf_arch = STARPU_GORDON_DEFAULT;
- config->workers[config->nworkers + spu].id = spu;
- config->workers[config->nworkers + spu].worker_is_running = 0;
- worker_mask |= GORDON;
- }
- config->nworkers += ngordon_spus;
- #endif
- /* we put the CPU section after the accelerator : in case there was an
- * accelerator found, we devote one core */
- #ifdef USE_CPUS
- envval = starpu_get_env_number("NCPUS");
- if (envval < 0) {
- long avail_cores = sysconf(_SC_NPROCESSORS_ONLN)
- - (use_accelerator?1:0);
- ncores = STARPU_MIN(avail_cores, NMAXCORES);
- } else {
- /* use the specified value */
- ncores = (unsigned)envval;
- STARPU_ASSERT(ncores <= NMAXCORES);
- }
- STARPU_ASSERT(ncores + config->nworkers <= NMAXWORKERS);
- unsigned core;
- for (core = 0; core < ncores; core++)
- {
- config->workers[config->nworkers + core].arch = CORE_WORKER;
- config->workers[config->nworkers + core].perf_arch = STARPU_CORE_DEFAULT;
- config->workers[config->nworkers + core].id = core;
- worker_mask |= CORE;
- }
- config->nworkers += ncores;
- #endif
- if (config->nworkers == 0)
- {
- fprintf(stderr, "No worker found, aborting ...\n");
- exit(-1);
- }
- }
- static void init_workers_binding(struct machine_config_s *config)
- {
- /* launch one thread per CPU */
- unsigned ram_memory_node;
- int current_bindid = 0;
- /* a single core is dedicated for the accelerators */
- int accelerator_bindid = -1;
- /* note that even if the CPU core are not used, we always have a RAM node */
- /* TODO : support NUMA ;) */
- ram_memory_node = register_memory_node(RAM);
- unsigned worker;
- for (worker = 0; worker < config->nworkers; worker++)
- {
- unsigned memory_node = -1;
- unsigned is_an_accelerator = 0;
- struct worker_s *workerarg = &config->workers[worker];
-
- /* select the memory node that contains worker's memory */
- switch (workerarg->arch) {
- case CORE_WORKER:
- /* "dedicate" a cpu core to that worker */
- is_an_accelerator = 0;
- memory_node = ram_memory_node;
- break;
- #ifdef USE_GORDON
- case GORDON_WORKER:
- is_an_accelerator = 1;
- memory_node = ram_memory_node;
- break;
- #endif
- #ifdef USE_CUDA
- case CUDA_WORKER:
- is_an_accelerator = 1;
- memory_node = register_memory_node(CUDA_RAM);
- break;
- #endif
- default:
- STARPU_ASSERT(0);
- }
- if (is_an_accelerator) {
- if (accelerator_bindid == -1)
- accelerator_bindid = (current_bindid++) % (sysconf(_SC_NPROCESSORS_ONLN));
- workerarg->bindid = accelerator_bindid;
- }
- else {
- workerarg->bindid = (current_bindid++) % (sysconf(_SC_NPROCESSORS_ONLN));
- }
- workerarg->memory_node = memory_node;
- }
- }
#ifdef USE_GORDON
/* All the SPU workers are handled by one single Gordon thread, described by
 * this set; the flag tells init_workers() that the thread was already
 * launched. */
struct worker_set_s gordon_worker_set;
unsigned gordon_inited = 0;
#endif
- static void init_workers(struct machine_config_s *config)
- {
- config->running = 1;
- pthread_key_create(&local_workers_key, NULL);
- unsigned worker;
- for (worker = 0; worker < config->nworkers; worker++)
- {
- struct worker_s *workerarg = &config->workers[worker];
- pthread_mutex_init(&workerarg->mutex, NULL);
- pthread_cond_init(&workerarg->ready_cond, NULL);
- /* if some codelet's termination cannot be handled directly :
- * for instance in the Gordon driver, Gordon tasks' callbacks
- * may be executed by another thread than that of the Gordon
- * driver so that we cannot call the push_codelet_output method
- * directly */
- workerarg->terminated_jobs = job_list_new();
-
- switch (workerarg->arch) {
- #ifdef USE_CPUS
- case CORE_WORKER:
- workerarg->set = NULL;
- pthread_create(&workerarg->worker_thread,
- NULL, core_worker, workerarg);
- pthread_mutex_lock(&workerarg->mutex);
- pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
- pthread_mutex_unlock(&workerarg->mutex);
- break;
- #endif
- #ifdef USE_CUDA
- case CUDA_WORKER:
- workerarg->set = NULL;
- pthread_create(&workerarg->worker_thread,
- NULL, cuda_worker, workerarg);
- pthread_mutex_lock(&workerarg->mutex);
- pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
- pthread_mutex_unlock(&workerarg->mutex);
- break;
- #endif
- #ifdef USE_GORDON
- case GORDON_WORKER:
- /* we will only launch gordon once, but it will handle
- * the different SPU workers */
- if (!gordon_inited)
- {
- gordon_worker_set.nworkers = ngordon_spus;
- gordon_worker_set.workers = &config->workers[worker];
- pthread_create(&gordon_worker_set.worker_thread, NULL,
- gordon_worker, &gordon_worker_set);
- pthread_mutex_lock(&gordon_worker_set.mutex);
- pthread_cond_wait(&gordon_worker_set.ready_cond, &gordon_worker_set.mutex);
- pthread_mutex_unlock(&gordon_worker_set.mutex);
- gordon_inited = 1;
- }
-
- workerarg->set = &gordon_worker_set;
- gordon_worker_set.joined = 0;
- workerarg->worker_is_running = 1;
- break;
- #endif
- default:
- STARPU_ASSERT(0);
- }
- }
- }
- void starpu_init(void)
- {
- srand(2008);
- #ifdef USE_FXT
- start_fxt_profiling();
- #endif
- timing_init();
- init_machine_config(&config);
- /* for the data wizard */
- init_memory_nodes();
- init_workers_binding(&config);
- /* initialize the scheduler */
- /* initialize the queue containing the jobs */
- init_sched_policy(&config);
- init_workers(&config);
- }
- /*
- * Handle runtime termination
- */
- void terminate_workers(struct machine_config_s *config)
- {
- int status;
- unsigned workerid;
- for (workerid = 0; workerid < config->nworkers; workerid++)
- {
- wake_all_blocked_workers();
-
- #ifdef VERBOSE
- fprintf(stderr, "wait for worker %d\n", workerid);
- #endif
- struct worker_set_s *set = config->workers[workerid].set;
- /* in case StarPU termination code is called from a callback,
- * we have to check if pthread_self() is the worker itself */
- if (set){
- if (!set->joined) {
- if (pthread_self() != set->worker_thread)
- {
- status = pthread_join(set->worker_thread, NULL);
- #ifdef VERBOSE
- if (status)
- fprintf(stderr, "pthread_join -> %d\n", status);
- #endif
- }
- set->joined = 1;
- }
- }
- else {
- struct worker_s *worker = &config->workers[workerid];
- if (pthread_self() != worker->worker_thread)
- {
- status = pthread_join(worker->worker_thread, NULL);
- #ifdef VERBOSE
- if (status)
- fprintf(stderr, "pthread_join -> %d\n", status);
- #endif
- }
- }
- }
- }
- unsigned machine_is_running(void)
- {
- return config.running;
- }
- void kill_all_workers(struct machine_config_s *config)
- {
- /* set the flag which will tell workers to stop */
- config->running = 0;
- /* in case some workers are waiting on some event
- wake them up ... */
- wake_all_blocked_workers();
- }
- void starpu_shutdown(void)
- {
- display_msi_stats();
- display_alloc_cache_stats();
- /* tell all workers to shutdown */
- kill_all_workers(&config);
- #ifdef DATA_STATS
- display_comm_ammounts();
- #endif
- if (starpu_get_env_number("CALIBRATE") != -1)
- dump_registered_models();
- /* wait for their termination */
- terminate_workers(&config);
- }
|