123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- /*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <starpu.h>
- #include <common/config.h>
- #include <core/task.h>
- #include <datawizard/datawizard.h>
- #include <profiling/bound.h>
- #if 0
- # define _STARPU_DEP_DEBUG(fmt, args ...) fprintf(stderr, fmt, ##args);
- #else
- # define _STARPU_DEP_DEBUG(fmt, args ...)
- #endif
- /* This function adds the implicit task dependencies introduced by data
- * sequential consistency. Two tasks are provided: pre_sync and post_sync which
- * respectively indicates which task is going to depend on the previous deps
- * and on which task future deps should wait. In the case of a dependency
- * introduced by a task submission, both tasks are just the submitted task, but
- * in the case of user interactions with the DSM, these may be different tasks.
- * */
- /* NB : handle->sequential_consistency_mutex must be hold by the caller */
- void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
- starpu_data_handle handle, starpu_access_mode mode)
- {
- STARPU_ASSERT(!(mode & STARPU_SCRATCH));
- _STARPU_LOG_IN();
- if (handle->sequential_consistency)
- {
- /* Skip tasks that are associated to a reduction phase so that
- * they do not interfere with the application. */
- {
- starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
- starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
-
- if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
- return;
- }
-
-
- _STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
- /* In case we are generating the DAG, we add an implicit
- * dependency between the pre and the post sync tasks in case
- * they are not the same. */
- if (pre_sync_task != post_sync_task
- #ifndef STARPU_USE_FXT
- && _starpu_bound_recording
- #endif
- )
- {
- starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
- starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
- STARPU_TRACE_GHOST_TASK_DEPS(pre_sync_job->job_id, post_sync_job->job_id);
- _starpu_bound_task_dep(post_sync_job, pre_sync_job);
- }
- starpu_access_mode previous_mode = handle->last_submitted_mode;
-
- if (mode & STARPU_W)
- {
- _STARPU_DEP_DEBUG("W %p\n", handle);
- if (previous_mode & STARPU_W)
- {
- _STARPU_DEP_DEBUG("WAW %p\n", handle);
- /* (Read) Write */
- /* This task depends on the previous writer */
- if (handle->last_submitted_writer)
- {
- struct starpu_task *task_array[1] = {handle->last_submitted_writer};
- starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
- _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
- }
- else
- {
- _STARPU_DEP_DEBUG("No dep\n");
- }
- /* If there is a ghost writer instead, we
- * should declare a ghost dependency here, and
- * invalidate the ghost value. */
- #ifndef STARPU_USE_FXT
- if (_starpu_bound_recording)
- #endif
- {
- if (handle->last_submitted_ghost_writer_id_is_valid)
- {
- starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
- STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
- _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
- _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
- handle->last_submitted_ghost_writer_id_is_valid = 0;
- }
- else
- {
- _STARPU_DEP_DEBUG("No dep ID\n");
- }
- }
-
- handle->last_submitted_writer = post_sync_task;
- }
- else {
- /* The task submitted previously were in read-only
- * mode: this task must depend on all those read-only
- * tasks and we get rid of the list of readers */
-
- _STARPU_DEP_DEBUG("WAR %p\n", handle);
- /* Count the readers */
- unsigned nreaders = 0;
- struct starpu_task_wrapper_list *l;
- l = handle->last_submitted_readers;
- while (l)
- {
- nreaders++;
- l = l->next;
- }
- _STARPU_DEP_DEBUG("%d readers\n", nreaders);
- struct starpu_task *task_array[nreaders];
- unsigned i = 0;
- l = handle->last_submitted_readers;
- while (l)
- {
- STARPU_ASSERT(l->task);
- task_array[i++] = l->task;
- _STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
- struct starpu_task_wrapper_list *prev = l;
- l = l->next;
- free(prev);
- }
- #ifndef STARPU_USE_FXT
- if (_starpu_bound_recording)
- #endif
- {
- /* Declare all dependencies with ghost readers */
- starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
- struct starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
- while (ghost_readers_id)
- {
- unsigned long id = ghost_readers_id->id;
- STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id);
- _starpu_bound_job_id_dep(pre_sync_job, id);
- _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
- struct starpu_jobid_list *prev = ghost_readers_id;
- ghost_readers_id = ghost_readers_id->next;
- free(prev);
- }
- handle->last_submitted_ghost_readers_id = NULL;
- }
- handle->last_submitted_readers = NULL;
- handle->last_submitted_writer = post_sync_task;
-
- starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array);
- }
-
- }
- else {
- _STARPU_DEP_DEBUG("R %p\n", handle);
- /* Add a reader */
- STARPU_ASSERT(pre_sync_task);
- STARPU_ASSERT(post_sync_task);
-
- /* Add this task to the list of readers */
- struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
- link->task = post_sync_task;
- link->next = handle->last_submitted_readers;
- handle->last_submitted_readers = link;
- /* This task depends on the previous writer if any */
- if (handle->last_submitted_writer)
- {
- _STARPU_DEP_DEBUG("RAW %p\n", handle);
- struct starpu_task *task_array[1] = {handle->last_submitted_writer};
- _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
- starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
- }
- else
- {
- _STARPU_DEP_DEBUG("No dep\n");
- }
- /* There was perhaps no last submitted writer but a
- * ghost one, we should report that here, and keep the
- * ghost writer valid */
- if (
- #ifndef STARPU_USE_FXT
- _starpu_bound_recording &&
- #endif
- handle->last_submitted_ghost_writer_id_is_valid)
- {
- starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
- STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
- _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
- _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
- }
- }
-
- handle->last_submitted_mode = mode;
- }
- _STARPU_LOG_OUT();
- }
- /* Create the implicit dependencies for a newly submitted task */
- void _starpu_detect_implicit_data_deps(struct starpu_task *task)
- {
- STARPU_ASSERT(task->cl);
- _STARPU_LOG_IN();
- /* We don't want to enforce a sequential consistency for tasks that are
- * not visible to the application. */
- starpu_job_t j = _starpu_get_job_associated_to_task(task);
- if (j->reduction_task)
- return;
- unsigned nbuffers = task->cl->nbuffers;
- unsigned buffer;
- for (buffer = 0; buffer < nbuffers; buffer++)
- {
- starpu_data_handle handle = task->buffers[buffer].handle;
- starpu_access_mode mode = task->buffers[buffer].mode;
- /* Scratch memory does not introduce any deps */
- if (mode & STARPU_SCRATCH)
- continue;
- PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- }
- _STARPU_LOG_OUT();
- }
- /* This function is called when a task has been executed so that we don't
- * create dependencies to task that do not exist anymore. */
- /* NB: We maintain a list of "ghost deps" in case FXT is enabled. Ghost
- * dependencies are the dependencies that are implicitely enforced by StarPU
- * even if they do not imply a real dependency. For instance in the following
- * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
- * if h is submitted after the termination of f or g, StarPU will not create a
- * dependency as this is not needed anymore. */
- void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle)
- {
- PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- if (handle->sequential_consistency)
- {
- /* If this is the last writer, there is no point in adding
- * extra deps to that tasks that does not exists anymore */
- if (task == handle->last_submitted_writer)
- {
- handle->last_submitted_writer = NULL;
-
- #ifndef STARPU_USE_FXT
- if (_starpu_bound_recording)
- #endif
- {
- /* Save the previous writer as the ghost last writer */
- handle->last_submitted_ghost_writer_id_is_valid = 1;
- starpu_job_t ghost_job = _starpu_get_job_associated_to_task(task);
- handle->last_submitted_ghost_writer_id = ghost_job->job_id;
- }
-
- }
-
- /* XXX can a task be both the last writer associated to a data
- * and be in its list of readers ? If not, we should not go
- * through the entire list once we have detected it was the
- * last writer. */
- /* Same if this is one of the readers: we go through the list
- * of readers and remove the task if it is found. */
- struct starpu_task_wrapper_list *l;
- l = handle->last_submitted_readers;
- struct starpu_task_wrapper_list *prev = NULL;
- while (l)
- {
- struct starpu_task_wrapper_list *next = l->next;
- if (l->task == task)
- {
- /* If we found the task in the reader list */
- free(l);
- #ifndef STARPU_USE_FXT
- if (_starpu_bound_recording)
- #endif
- {
- /* Save the job id of the reader task in the ghost reader linked list list */
- starpu_job_t ghost_reader_job = _starpu_get_job_associated_to_task(task);
- struct starpu_jobid_list *link = malloc(sizeof(struct starpu_jobid_list));
- STARPU_ASSERT(link);
- link->next = handle->last_submitted_ghost_readers_id;
- link->id = ghost_reader_job->job_id;
- handle->last_submitted_ghost_readers_id = link;
- }
- if (prev)
- {
- prev->next = next;
- }
- else {
- /* This is the first element of the list */
- handle->last_submitted_readers = next;
- }
- /* XXX can we really find the same task again
- * once we have found it ? Otherwise, we should
- * avoid going through the entire list and stop
- * as soon as we find the task. TODO: check how
- * duplicate dependencies are treated. */
- }
- else {
- prev = l;
- }
- l = next;
- }
- }
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- }
- void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle handle)
- {
- _STARPU_LOG_IN();
- PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- if (handle->sequential_consistency)
- {
- handle->post_sync_tasks_cnt++;
- struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
- link->task = post_sync_task;
- link->next = handle->post_sync_tasks;
- handle->post_sync_tasks = link;
- }
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- _STARPU_LOG_OUT();
- }
- void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
- {
- struct starpu_task_wrapper_list *post_sync_tasks = NULL;
- unsigned do_submit_tasks = 0;
- PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- if (handle->sequential_consistency)
- {
- STARPU_ASSERT(handle->post_sync_tasks_cnt > 0);
- if (--handle->post_sync_tasks_cnt == 0)
- {
- /* unlock all tasks : we need not hold the lock while unlocking all these tasks */
- do_submit_tasks = 1;
- post_sync_tasks = handle->post_sync_tasks;
- handle->post_sync_tasks = NULL;
- }
- }
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- if (do_submit_tasks)
- {
- struct starpu_task_wrapper_list *link = post_sync_tasks;
- while (link) {
- /* There is no need to depend on that task now, since it was already unlocked */
- _starpu_release_data_enforce_sequential_consistency(link->task, handle);
- int ret = starpu_task_submit(link->task);
- STARPU_ASSERT(!ret);
- link = link->next;
- }
- }
- }
- /* If sequential consistency mode is enabled, this function blocks until the
- * handle is available in the requested access mode. */
- int _starpu_data_wait_until_available(starpu_data_handle handle, starpu_access_mode mode)
- {
- /* If sequential consistency is enabled, wait until data is available */
- PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- int sequential_consistency = handle->sequential_consistency;
- if (sequential_consistency)
- {
- struct starpu_task *sync_task;
- sync_task = starpu_task_create();
- sync_task->detach = 0;
- sync_task->destroy = 1;
- /* It is not really a RW access, but we want to make sure that
- * all previous accesses are done */
- _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- /* TODO detect if this is superflous */
- int ret = starpu_task_submit(sync_task);
- STARPU_ASSERT(!ret);
- starpu_task_wait(sync_task);
- }
- else {
- PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- }
- return 0;
- }
|