| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2009-2013, 2016 Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013 CNRS
- * Copyright (C) 2016 Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <starpu.h>
- #include <common/config.h>
- #include <common/utils.h>
- #include <core/dependencies/tags.h>
- #include <core/jobs.h>
- #include <core/sched_policy.h>
- #include <core/dependencies/data_concurrency.h>
- #include <profiling/bound.h>
- #include <common/uthash.h>
- #include <core/debug.h>
- #define STARPU_AYUDAME_OFFSET 4000000000000000000ULL
- struct _starpu_tag_table
- {
- UT_hash_handle hh;
- starpu_tag_t id;
- struct _starpu_tag *tag;
- };
- #define HASH_ADD_UINT64_T(head,field,add) HASH_ADD(hh,head,field,sizeof(uint64_t),add)
- #define HASH_FIND_UINT64_T(head,find,out) HASH_FIND(hh,head,find,sizeof(uint64_t),out)
- static struct _starpu_tag_table *tag_htbl = NULL;
- static starpu_pthread_rwlock_t tag_global_rwlock;
- static struct _starpu_cg *create_cg_apps(unsigned ntags)
- {
- struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
- STARPU_ASSERT(cg);
- cg->ntags = ntags;
- cg->remaining = ntags;
- cg->cg_type = STARPU_CG_APPS;
- cg->succ.succ_apps.completed = 0;
- STARPU_PTHREAD_MUTEX_INIT(&cg->succ.succ_apps.cg_mutex, NULL);
- STARPU_PTHREAD_COND_INIT(&cg->succ.succ_apps.cg_cond, NULL);
- return cg;
- }
- static struct _starpu_cg *create_cg_tag(unsigned ntags, struct _starpu_tag *tag)
- {
- struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
- STARPU_ASSERT(cg);
- cg->ntags = ntags;
- cg->remaining = ntags;
- cg->cg_type = STARPU_CG_TAG;
- cg->succ.tag = tag;
- tag->tag_successors.ndeps++;
- return cg;
- }
- static struct _starpu_tag *_starpu_tag_init(starpu_tag_t id)
- {
- struct _starpu_tag *tag;
- tag = (struct _starpu_tag *) malloc(sizeof(struct _starpu_tag));
- STARPU_ASSERT(tag);
- tag->job = NULL;
- tag->is_assigned = 0;
- tag->is_submitted = 0;
- tag->id = id;
- tag->state = STARPU_INVALID_STATE;
- _starpu_cg_list_init(&tag->tag_successors);
- _starpu_spin_init(&tag->lock);
- return tag;
- }
- static void _starpu_tag_free(void *_tag)
- {
- struct _starpu_tag *tag = (struct _starpu_tag *) _tag;
- if (tag)
- {
- _starpu_spin_lock(&tag->lock);
- unsigned nsuccs = tag->tag_successors.nsuccs;
- unsigned succ;
- for (succ = 0; succ < nsuccs; succ++)
- {
- struct _starpu_cg *cg = tag->tag_successors.succ[succ];
- unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
- unsigned remaining STARPU_ATTRIBUTE_UNUSED = STARPU_ATOMIC_ADD(&cg->remaining, -1);
- if (!ntags && (cg->cg_type == STARPU_CG_TAG))
- /* Last tag this cg depends on, cg becomes unreferenced */
- free(cg);
- }
- #ifdef STARPU_DYNAMIC_DEPS_SIZE
- free(tag->tag_successors.succ);
- #endif
- _starpu_spin_unlock(&tag->lock);
- _starpu_spin_destroy(&tag->lock);
- free(tag);
- }
- }
- /*
- * Staticly initializing tag_global_rwlock seems to lead to weird errors
- * on Darwin, so we do it dynamically.
- */
- void _starpu_init_tags(void)
- {
- STARPU_PTHREAD_RWLOCK_INIT(&tag_global_rwlock, NULL);
- }
- void starpu_tag_remove(starpu_tag_t id)
- {
- struct _starpu_tag_table *entry;
- STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_REMOVETASK(id + STARPU_AYUDAME_OFFSET);
- STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
- HASH_FIND_UINT64_T(tag_htbl, &id, entry);
- if (entry) HASH_DEL(tag_htbl, entry);
- STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
- if (entry)
- {
- _starpu_tag_free(entry->tag);
- free(entry);
- }
- }
- void _starpu_tag_clear(void)
- {
- STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
- /* XXX: _starpu_tag_free takes the tag spinlocks while we are keeping
- * the global rwlock. This contradicts the lock order of
- * starpu_tag_wait_array. Should not be a problem in practice since
- * _starpu_tag_clear is called at shutdown only. */
- struct _starpu_tag_table *entry, *tmp;
- HASH_ITER(hh, tag_htbl, entry, tmp)
- {
- HASH_DEL(tag_htbl, entry);
- _starpu_tag_free(entry->tag);
- free(entry);
- }
- STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
- }
- static struct _starpu_tag *_gettag_struct(starpu_tag_t id)
- {
- /* search if the tag is already declared or not */
- struct _starpu_tag_table *entry;
- struct _starpu_tag *tag;
- HASH_FIND_UINT64_T(tag_htbl, &id, entry);
- if (entry != NULL)
- tag = entry->tag;
- else
- {
- /* the tag does not exist yet : create an entry */
- tag = _starpu_tag_init(id);
- struct _starpu_tag_table *entry2;
- entry2 = (struct _starpu_tag_table *) malloc(sizeof(*entry2));
- STARPU_ASSERT(entry2 != NULL);
- entry2->id = id;
- entry2->tag = tag;
- HASH_ADD_UINT64_T(tag_htbl, id, entry2);
- STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_ADDTASK(id + STARPU_AYUDAME_OFFSET, NULL);
- }
- return tag;
- }
- static struct _starpu_tag *gettag_struct(starpu_tag_t id)
- {
- struct _starpu_tag *tag;
- STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
- tag = _gettag_struct(id);
- STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
- return tag;
- }
- /* lock should be taken */
- void _starpu_tag_set_ready(struct _starpu_tag *tag)
- {
- /* mark this tag as ready to run */
- tag->state = STARPU_READY;
- /* declare it to the scheduler ! */
- struct _starpu_job *j = tag->job;
- /* In case the task job is going to be scheduled immediately, and if
- * the task is "empty", calling _starpu_push_task would directly try to enforce
- * the dependencies of the task, and therefore it would try to grab the
- * lock again, resulting in a deadlock. */
- _starpu_spin_unlock(&tag->lock);
- /* enforce data dependencies */
- STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
- _starpu_enforce_deps_starting_from_task(j);
- _starpu_spin_lock(&tag->lock);
- STARPU_ASSERT(!STARPU_AYU_EVENT || tag->id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_PRERUNTASK(tag->id + STARPU_AYUDAME_OFFSET, -1);
- STARPU_AYU_POSTRUNTASK(tag->id + STARPU_AYUDAME_OFFSET);
- }
- /* the lock must be taken ! */
- static void _starpu_tag_add_succ(struct _starpu_tag *tag, struct _starpu_cg *cg)
- {
- STARPU_ASSERT(tag);
- _starpu_add_successor_to_cg_list(&tag->tag_successors, cg);
- if (tag->state == STARPU_DONE)
- {
- /* the tag was already completed sooner */
- _starpu_notify_cg(cg);
- }
- }
- void _starpu_notify_tag_dependencies(struct _starpu_tag *tag)
- {
- _starpu_spin_lock(&tag->lock);
- if (tag->state == STARPU_DONE)
- {
- _starpu_spin_unlock(&tag->lock);
- return;
- }
- tag->state = STARPU_DONE;
- _STARPU_TRACE_TAG_DONE(tag);
- _starpu_notify_cg_list(&tag->tag_successors);
- _starpu_spin_unlock(&tag->lock);
- }
- void starpu_tag_restart(starpu_tag_t id)
- {
- struct _starpu_tag *tag = gettag_struct(id);
- _starpu_spin_lock(&tag->lock);
- STARPU_ASSERT_MSG(tag->state == STARPU_DONE || tag->state == STARPU_INVALID_STATE || tag->state == STARPU_ASSOCIATED || tag->state == STARPU_BLOCKED, "Only completed tags can be restarted (%llu was %d)", (unsigned long long) id, tag->state);
- tag->state = STARPU_BLOCKED;
- _starpu_spin_unlock(&tag->lock);
- }
- void starpu_tag_notify_from_apps(starpu_tag_t id)
- {
- struct _starpu_tag *tag = gettag_struct(id);
- _starpu_notify_tag_dependencies(tag);
- }
- void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job)
- {
- _STARPU_TRACE_TAG(id, job);
- job->task->use_tag = 1;
- struct _starpu_tag *tag= gettag_struct(id);
- _starpu_spin_lock(&tag->lock);
- /* Note: a tag can be shared by several tasks, when it is used to
- * detect when either of them are finished. We however don't allow
- * several tasks to share a tag when it is used to wake them by
- * dependency */
- if (tag->job != job)
- tag->is_assigned++;
- tag->job = job;
- job->tag = tag;
- /* the tag is now associated to a job */
- /* When the same tag may be signaled several times by different tasks,
- * and it's already done, we should not reset the "done" state.
- * When the tag is simply used by the same task several times, we have
- * to do so. */
- if (job->task->regenerate || job->submitted == 2 ||
- tag->state != STARPU_DONE)
- tag->state = STARPU_ASSOCIATED;
- STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_ADDDEPENDENCY(id+STARPU_AYUDAME_OFFSET, 0, job->job_id);
- STARPU_AYU_ADDDEPENDENCY(job->job_id, 0, id+STARPU_AYUDAME_OFFSET);
- _starpu_spin_unlock(&tag->lock);
- }
- void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array)
- {
- if (!ndeps)
- return;
- unsigned i;
- /* create the associated completion group */
- struct _starpu_tag *tag_child = gettag_struct(id);
- _starpu_spin_lock(&tag_child->lock);
- struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
- _starpu_spin_unlock(&tag_child->lock);
- for (i = 0; i < ndeps; i++)
- {
- starpu_tag_t dep_id = array[i];
- /* id depends on dep_id
- * so cg should be among dep_id's successors*/
- _STARPU_TRACE_TAG_DEPS(id, dep_id);
- _starpu_bound_tag_dep(id, dep_id);
- struct _starpu_tag *tag_dep = gettag_struct(dep_id);
- STARPU_ASSERT(tag_dep != tag_child);
- _starpu_spin_lock(&tag_dep->lock);
- _starpu_spin_lock(&tag_child->lock);
- _starpu_tag_add_succ(tag_dep, cg);
- STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
- STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_ADDDEPENDENCY(dep_id+STARPU_AYUDAME_OFFSET, 0, id+STARPU_AYUDAME_OFFSET);
- _starpu_spin_unlock(&tag_child->lock);
- _starpu_spin_unlock(&tag_dep->lock);
- }
- }
- void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
- {
- if (!ndeps)
- return;
- unsigned i;
- /* create the associated completion group */
- struct _starpu_tag *tag_child = gettag_struct(id);
- _starpu_spin_lock(&tag_child->lock);
- struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
- _starpu_spin_unlock(&tag_child->lock);
- va_list pa;
- va_start(pa, ndeps);
- for (i = 0; i < ndeps; i++)
- {
- starpu_tag_t dep_id;
- dep_id = va_arg(pa, starpu_tag_t);
- /* id depends on dep_id
- * so cg should be among dep_id's successors*/
- _STARPU_TRACE_TAG_DEPS(id, dep_id);
- _starpu_bound_tag_dep(id, dep_id);
- struct _starpu_tag *tag_dep = gettag_struct(dep_id);
- STARPU_ASSERT(tag_dep != tag_child);
- _starpu_spin_lock(&tag_dep->lock);
- _starpu_spin_lock(&tag_child->lock);
- _starpu_tag_add_succ(tag_dep, cg);
- STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
- STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
- STARPU_AYU_ADDDEPENDENCY(dep_id+STARPU_AYUDAME_OFFSET, 0, id+STARPU_AYUDAME_OFFSET);
- _starpu_spin_unlock(&tag_child->lock);
- _starpu_spin_unlock(&tag_dep->lock);
- }
- va_end(pa);
- }
- /* this function may be called by the application (outside callbacks !) */
- int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
- {
- unsigned i;
- unsigned current;
- struct _starpu_tag *tag_array[ntags];
- _STARPU_LOG_IN();
- /* It is forbidden to block within callbacks or codelets */
- STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_tag_wait must not be called from a task or callback");
- STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
- /* only wait the tags that are not done yet */
- for (i = 0, current = 0; i < ntags; i++)
- {
- struct _starpu_tag *tag = _gettag_struct(id[i]);
- _starpu_spin_lock(&tag->lock);
- if (tag->state == STARPU_DONE)
- {
- /* that tag is done already */
- _starpu_spin_unlock(&tag->lock);
- }
- else
- {
- tag_array[current] = tag;
- current++;
- }
- }
- STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
- if (current == 0)
- {
- /* all deps are already fulfilled */
- _STARPU_LOG_OUT_TAG("all deps are already fulfilled");
- return 0;
- }
- /* there is at least one task that is not finished */
- struct _starpu_cg *cg = create_cg_apps(current);
- for (i = 0; i < current; i++)
- {
- _starpu_tag_add_succ(tag_array[i], cg);
- _starpu_spin_unlock(&tag_array[i]->lock);
- }
- STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
- while (!cg->succ.succ_apps.completed)
- STARPU_PTHREAD_COND_WAIT(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
- STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
- STARPU_PTHREAD_MUTEX_DESTROY(&cg->succ.succ_apps.cg_mutex);
- STARPU_PTHREAD_COND_DESTROY(&cg->succ.succ_apps.cg_cond);
- free(cg);
- _STARPU_LOG_OUT();
- return 0;
- }
- int starpu_tag_wait(starpu_tag_t id)
- {
- return starpu_tag_wait_array(1, &id);
- }
- struct starpu_task *starpu_tag_get_task(starpu_tag_t id)
- {
- struct _starpu_tag_table *entry;
- struct _starpu_tag *tag;
- STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
- HASH_FIND_UINT64_T(tag_htbl, &id, entry);
- STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
- if (!entry)
- return NULL;
- tag = entry->tag;
- if (!tag->job)
- return NULL;
- return tag->job->task;
- }
|