/* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2010-2013 Université de Bordeaux 1 * Copyright (C) 2010-2013 Centre National de la Recherche Scientifique * Copyright (C) 2011 INRIA * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. */ #include #include #include #include #include #include #include static int use_prefetch = 0; int starpu_get_prefetch_flag(void) { return use_prefetch; } static struct starpu_sched_policy *predefined_policies[] = { #ifdef STARPU_HAVE_HWLOC &_starpu_sched_tree_heft_hierarchical_policy, #endif &_starpu_sched_tree_eager_policy, &_starpu_sched_tree_random_policy, &_starpu_sched_tree_ws_policy, &_starpu_sched_tree_heft_policy, &_starpu_sched_eager_policy, &_starpu_sched_prio_policy, &_starpu_sched_random_policy, &_starpu_sched_ws_policy, &_starpu_sched_dm_policy, &_starpu_sched_dmda_policy, &_starpu_sched_dmda_ready_policy, &_starpu_sched_dmda_sorted_policy, &_starpu_sched_parallel_heft_policy, &_starpu_sched_peager_policy, NULL }; struct starpu_sched_policy **starpu_sched_get_predefined_policies() { return predefined_policies; } struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx) { return sched_ctx->sched_policy; } /* * Methods to initialize the scheduling policy */ static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx) { STARPU_ASSERT(sched_policy); #ifdef STARPU_VERBOSE if (sched_policy->policy_name) { if (sched_policy->policy_description) _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description); else _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name); } #endif struct starpu_sched_policy *policy = sched_ctx->sched_policy; memcpy(policy, sched_policy, sizeof(*policy)); } static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name) { if (!policy_name) return NULL; if (strncmp(policy_name, "heft", 5) == 0) { _STARPU_DISP("Warning: heft is now called \"dmda\".\n"); return &_starpu_sched_dmda_policy; } struct starpu_sched_policy **policy; for(policy=predefined_policies ; *policy!=NULL ; policy++) { struct starpu_sched_policy *p = *policy; if (p->policy_name) { if (strcmp(policy_name, p->policy_name) == 0) { /* we found a policy with the requested name */ return p; } } } fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name); /* nothing was found */ return NULL; } static void display_sched_help_message(void) { const char *sched_env = getenv("STARPU_SCHED"); if (sched_env && (strcmp(sched_env, "help") == 0)) { /* display the description of all predefined policies */ struct starpu_sched_policy **policy; fprintf(stderr, "STARPU_SCHED can be either of\n"); for(policy=predefined_policies ; *policy!=NULL ; policy++) { struct starpu_sched_policy *p = *policy; fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description); } } } struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy) { struct starpu_sched_policy *selected_policy = NULL; struct starpu_conf *user_conf = config->conf; if(required_policy) selected_policy = find_sched_policy_from_name(required_policy); /* First, we check whether the application explicitely gave a scheduling policy or not */ if (!selected_policy && user_conf && (user_conf->sched_policy)) return user_conf->sched_policy; /* Otherwise, we look if the application specified the name of a policy to load */ const char *sched_pol_name; sched_pol_name = getenv("STARPU_SCHED"); if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name) sched_pol_name = user_conf->sched_policy_name; if (!selected_policy && sched_pol_name) selected_policy = find_sched_policy_from_name(sched_pol_name); /* Perhaps there was no policy that matched the name */ if (selected_policy) return selected_policy; /* If no policy was specified, we use the greedy policy as a default */ return &_starpu_sched_eager_policy; } void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy) { /* Perhaps we have to display some help */ display_sched_help_message(); /* Prefetch is activated by default */ use_prefetch = starpu_get_env_number("STARPU_PREFETCH"); if (use_prefetch == -1) use_prefetch = 1; /* Set calibrate flag */ _starpu_set_calibrate_flag(config->conf->calibrate); load_sched_policy(selected_policy, sched_ctx); sched_ctx->sched_policy->init_sched(sched_ctx->id); } void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx) { struct starpu_sched_policy *policy = sched_ctx->sched_policy; if (policy->deinit_sched) policy->deinit_sched(sched_ctx->id); } /* Enqueue a task into the list of tasks explicitely attached to a worker. In * case workerid identifies a combined worker, a task will be enqueued into * each worker of the combination. */ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid) { int nbasic_workers = (int)starpu_worker_get_count(); /* Is this a basic worker or a combined worker ? */ int is_basic_worker = (workerid < nbasic_workers); unsigned memory_node; struct _starpu_worker *worker = NULL; struct _starpu_combined_worker *combined_worker = NULL; if (is_basic_worker) { worker = _starpu_get_worker_struct(workerid); memory_node = worker->memory_node; } else { combined_worker = _starpu_get_combined_worker_struct(workerid); memory_node = combined_worker->memory_node; } if (use_prefetch) starpu_prefetch_task_input_on_node(task, memory_node); /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */ unsigned i; struct _starpu_sched_ctx *sched_ctx; for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++) { sched_ctx = worker->sched_ctx[i]; if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify) sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id); } #ifdef STARPU_USE_SC_HYPERVISOR starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx); #endif //STARPU_USE_SC_HYPERVISOR if (is_basic_worker) { unsigned node = starpu_worker_get_memory_node(workerid); if (_starpu_task_uses_multiformat_handles(task)) { for (i = 0; i < task->cl->nbuffers; i++) { struct starpu_task *conversion_task; starpu_data_handle_t handle; handle = STARPU_TASK_GET_HANDLE(task, i); if (!_starpu_handle_needs_conversion_task(handle, node)) continue; conversion_task = _starpu_create_conversion_task(handle, node); conversion_task->mf_skip = 1; conversion_task->execute_on_a_specific_worker = 1; conversion_task->workerid = workerid; _starpu_task_submit_conversion_task(conversion_task, workerid); //_STARPU_DEBUG("Pushing a conversion task\n"); } for (i = 0; i < task->cl->nbuffers; i++) { starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i); handle->mf_node = node; } } // if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id) if(task->priority > 0) return _starpu_push_local_task(worker, task, 1); else return _starpu_push_local_task(worker, task, 0); } else { /* This is a combined worker so we create task aliases */ int worker_size = combined_worker->worker_size; int *combined_workerid = combined_worker->combined_workerid; int ret = 0; struct _starpu_job *job = _starpu_get_job_associated_to_task(task); job->task_size = worker_size; job->combined_workerid = workerid; job->active_task_alias_count = 0; STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size); STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size); /* Note: we have to call that early, or else the task may have * disappeared already */ starpu_push_task_end(task); int j; for (j = 0; j < worker_size; j++) { struct starpu_task *alias = starpu_task_dup(task); worker = _starpu_get_worker_struct(combined_workerid[j]); ret |= _starpu_push_local_task(worker, alias, 0); } return ret; } } static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx) { unsigned worker = 0, nworkers = 0; struct starpu_worker_collection *workers = sched_ctx->workers; struct starpu_sched_ctx_iterator it; if(workers->init_iterator) workers->init_iterator(workers, &it); while(workers->has_next(workers, &it)) { worker = workers->get_next(workers, &it); if (starpu_worker_can_execute_task(worker, task, 0) && starpu_sched_ctx_is_ctxs_turn(worker, sched_ctx->id)) nworkers++; } return nworkers; } /* the generic interface that call the proper underlying implementation */ int _starpu_push_task(struct _starpu_job *j) { struct starpu_task *task = j->task; struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx); unsigned nworkers = 0; int ret; _STARPU_LOG_IN(); _STARPU_TRACE_JOB_PUSH(task, task->priority > 0); _starpu_increment_nready_tasks(); task->status = STARPU_TASK_READY; #ifdef HAVE_AYUDAME_H if (AYU_event) { int id = -1; AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id); } #endif /* if the context does not have any workers save the tasks in a temp list */ if(!sched_ctx->is_initial_sched) { /*if there are workers in the ctx that are not able to execute tasks we consider the ctx empty */ nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx); if(nworkers == 0) { STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex); starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task); STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex); return 0; } } /* in case there is no codelet associated to the task (that's a control * task), we directly execute its callback and enforce the * corresponding dependencies */ if (task->cl == NULL) { _starpu_handle_job_termination(j); _STARPU_LOG_OUT_TAG("handle_job_termination"); return 0; } ret = _starpu_push_task_to_workers(task); if (ret == -EAGAIN) /* pushed to empty context, that's fine */ ret = 0; return ret; } int _starpu_push_task_to_workers(struct starpu_task *task) { struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx); unsigned nworkers = 0; /* if the contexts still does not have workers put the task back to its place in the empty ctx list */ if(!sched_ctx->is_initial_sched) { /*if there are workers in the ctx that are not able to execute tasks we consider the ctx empty */ nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx); if (nworkers == 0) { STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex); starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task); STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex); return -EAGAIN; } } _starpu_profiling_set_task_push_start_time(task); int ret; if (STARPU_UNLIKELY(task->execute_on_a_specific_worker)) { ret = _starpu_push_task_on_specific_worker(task, task->workerid); } else { STARPU_ASSERT(sched_ctx->sched_policy->push_task); /* check out if there are any workers in the context */ starpu_pthread_mutex_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id); STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex); nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id); ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task); STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex); if(ret == -1) { fprintf(stderr, "repush task \n"); _STARPU_TRACE_JOB_POP(task, task->priority > 0); ret = _starpu_push_task_to_workers(task); } } /* Note: from here, the task might have been destroyed already! */ _STARPU_LOG_OUT(); return ret; } /* This is called right after the scheduler has pushed a task to a queue * but just before releasing mutexes: we need the task to still be alive! */ int starpu_push_task_end(struct starpu_task *task) { _starpu_profiling_set_task_push_end_time(task); task->scheduled = 1; return 0; } /* * Given a handle that needs to be converted in order to be used on the given * node, returns a task that takes care of the conversion. */ struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle, unsigned int node) { return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node)); } struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle, enum starpu_node_kind node_kind) { struct starpu_task *conversion_task; #if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID) struct starpu_multiformat_interface *format_interface; #endif conversion_task = starpu_task_create(); conversion_task->synchronous = 0; STARPU_TASK_SET_HANDLE(conversion_task, handle, 0); #if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID) /* The node does not really matter here */ format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0); #endif _starpu_spin_lock(&handle->header_lock); handle->refcnt++; handle->busy_count++; _starpu_spin_unlock(&handle->header_lock); switch(node_kind) { case STARPU_CPU_RAM: case STARPU_SCC_RAM: case STARPU_SCC_SHM: switch (starpu_node_get_kind(handle->mf_node)) { case STARPU_CPU_RAM: case STARPU_SCC_RAM: case STARPU_SCC_SHM: STARPU_ABORT(); #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID) case STARPU_CUDA_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->cuda_to_cpu_cl; break; } #endif #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID) case STARPU_OPENCL_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->opencl_to_cpu_cl; break; } #endif #ifdef STARPU_USE_MIC case STARPU_MIC_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->mic_to_cpu_cl; break; } #endif default: _STARPU_ERROR("Oops : %u\n", handle->mf_node); } break; #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID) case STARPU_CUDA_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->cpu_to_cuda_cl; break; } #endif #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID) case STARPU_OPENCL_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->cpu_to_opencl_cl; break; } #endif #ifdef STARPU_USE_MIC case STARPU_MIC_RAM: { struct starpu_multiformat_data_interface_ops *mf_ops; mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface); conversion_task->cl = mf_ops->cpu_to_mic_cl; break; } #endif default: STARPU_ABORT(); } STARPU_CODELET_SET_MODE(conversion_task->cl, STARPU_RW, 0); return conversion_task; } struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker) { while(1) { struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL; unsigned smallest_counter = worker->nsched_ctxs; unsigned i; for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++) { sched_ctx = worker->sched_ctx[i]; if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && worker->removed_from_ctx[sched_ctx->id]) return sched_ctx; if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs && smallest_counter > sched_ctx->pop_counter[worker->workerid]) { good_sched_ctx = sched_ctx; smallest_counter = sched_ctx->pop_counter[worker->workerid]; } } if(good_sched_ctx == NULL) { for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++) { sched_ctx = worker->sched_ctx[i]; if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS) sched_ctx->pop_counter[worker->workerid] = 0; } continue; } return good_sched_ctx; } } struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker) { struct starpu_task *task; int worker_id; unsigned node; /* We can't tell in advance which task will be picked up, so we measure * a timestamp, and will attribute it afterwards to the task. */ int profiling = starpu_profiling_status_get(); struct timespec pop_start_time; if (profiling) _starpu_clock_gettime(&pop_start_time); pick: /* perhaps there is some local task to be executed first */ task = _starpu_pop_local_task(worker); /* get tasks from the stacks of the strategy */ if(!task) { struct _starpu_sched_ctx *sched_ctx; //unsigned lucky_ctx = STARPU_NMAX_SCHED_CTXS; int been_here[STARPU_NMAX_SCHED_CTXS]; int i; for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++) been_here[i] = 0; while(!task) { if(worker->nsched_ctxs == 1) sched_ctx = _starpu_get_initial_sched_ctx(); else sched_ctx = _get_next_sched_ctx_to_pop_into(worker); if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS) { if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task) { task = sched_ctx->sched_policy->pop_task(sched_ctx->id); //lucky_ctx = sched_ctx->id; } } if(!task && worker->removed_from_ctx[sched_ctx->id]) { _starpu_worker_gets_out_of_ctx(sched_ctx->id, worker); worker->removed_from_ctx[sched_ctx->id] = 0; } if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1) break; been_here[sched_ctx->id] = 1; sched_ctx->pop_counter[worker->workerid]++; } } if (!task) return NULL; /* Make sure we do not bother with all the multiformat-specific code if * it is not necessary. */ if (!_starpu_task_uses_multiformat_handles(task)) goto profiling; /* This is either a conversion task, or a regular task for which the * conversion tasks have already been created and submitted */ if (task->mf_skip) goto profiling; worker_id = starpu_worker_get_id(); if (!starpu_worker_can_execute_task(worker_id, task, 0)) return task; node = starpu_worker_get_memory_node(worker_id); /* * We do have a task that uses multiformat handles. Let's create the * required conversion tasks. */ unsigned i; for (i = 0; i < task->cl->nbuffers; i++) { struct starpu_task *conversion_task; starpu_data_handle_t handle; handle = STARPU_TASK_GET_HANDLE(task, i); if (!_starpu_handle_needs_conversion_task(handle, node)) continue; conversion_task = _starpu_create_conversion_task(handle, node); conversion_task->mf_skip = 1; conversion_task->execute_on_a_specific_worker = 1; conversion_task->workerid = worker_id; /* * Next tasks will need to know where these handles have gone. */ handle->mf_node = node; _starpu_task_submit_conversion_task(conversion_task, worker_id); } task->mf_skip = 1; starpu_task_list_push_back(&worker->local_tasks, task); goto pick; profiling: if (profiling) { struct starpu_profiling_task_info *profiling_info; profiling_info = task->profiling_info; /* The task may have been created before profiling was enabled, * so we check if the profiling_info structure is available * even though we already tested if profiling is enabled. */ if (profiling_info) { memcpy(&profiling_info->pop_start_time, &pop_start_time, sizeof(struct timespec)); _starpu_clock_gettime(&profiling_info->pop_end_time); } } return task; } struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx) { STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task); /* TODO set profiling info */ if(sched_ctx->sched_policy->pop_every_task) return sched_ctx->sched_policy->pop_every_task(sched_ctx->id); return NULL; } void _starpu_sched_pre_exec_hook(struct starpu_task *task) { struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx); if (sched_ctx->sched_policy->pre_exec_hook) sched_ctx->sched_policy->pre_exec_hook(task); } void _starpu_sched_post_exec_hook(struct starpu_task *task) { struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx); #ifdef STARPU_USE_SC_HYPERVISOR if(task->hypervisor_tag > 0 && sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL) sched_ctx->perf_counters->notify_post_exec_hook(sched_ctx->id, task->hypervisor_tag); #endif //STARPU_USE_SC_HYPERVISOR if (sched_ctx->sched_policy->post_exec_hook) sched_ctx->sched_policy->post_exec_hook(task); } void _starpu_wait_on_sched_event(void) { struct _starpu_worker *worker = _starpu_get_local_worker_key(); STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex); _starpu_handle_all_pending_node_data_requests(worker->memory_node); if (_starpu_machine_is_running()) { #ifndef STARPU_NON_BLOCKING_DRIVERS STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex); #endif } STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex); } /* The scheduling policy may put tasks directly into a worker's local queue so * that it is not always necessary to create its own queue when the local queue * is sufficient. If "back" not null, the task is put at the back of the queue * where the worker will pop tasks first. Setting "back" to 0 therefore ensures * a FIFO ordering. */ int starpu_push_local_task(int workerid, struct starpu_task *task, int prio) { struct _starpu_worker *worker = _starpu_get_worker_struct(workerid); return _starpu_push_local_task(worker, task, prio); }