/* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2010-2011 Université de Bordeaux 1 * Copyright (C) 2010 Centre National de la Recherche Scientifique * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. */ #include #include #include #include #include #if 0 # define _STARPU_DEP_DEBUG(fmt, args ...) fprintf(stderr, fmt, ##args); #else # define _STARPU_DEP_DEBUG(fmt, args ...) #endif /* Read after Write (RAW) or Read after Read (RAR) */ static void _starpu_add_reader_after_writer(starpu_data_handle handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task) { /* Add this task to the list of readers */ struct starpu_task_wrapper_list *link = (struct starpu_task_wrapper_list *) malloc(sizeof(struct starpu_task_wrapper_list)); link->task = post_sync_task; link->next = handle->last_submitted_readers; handle->last_submitted_readers = link; /* This task depends on the previous writer if any */ if (handle->last_submitted_writer) { _STARPU_DEP_DEBUG("RAW %p\n", handle); struct starpu_task *task_array[1] = {handle->last_submitted_writer}; _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task); starpu_task_declare_deps_array(pre_sync_task, 1, task_array); } else { _STARPU_DEP_DEBUG("No dep\n"); } /* There was perhaps no last submitted writer but a * ghost one, we should report that here, and keep the * ghost writer valid */ if ( #ifndef STARPU_USE_FXT _starpu_bound_recording && #endif handle->last_submitted_ghost_writer_id_is_valid) { starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task); STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id); _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id); _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task); } } /* Write after Read (WAR) */ static void _starpu_add_writer_after_readers(starpu_data_handle handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task) { /* Count the readers */ unsigned nreaders = 0; struct starpu_task_wrapper_list *l; l = handle->last_submitted_readers; while (l) { nreaders++; l = l->next; } _STARPU_DEP_DEBUG("%d readers\n", nreaders); /* Put all tasks in the list into task_array */ struct starpu_task *task_array[nreaders]; unsigned i = 0; l = handle->last_submitted_readers; while (l) { STARPU_ASSERT(l->task); task_array[i++] = l->task; _STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task); struct starpu_task_wrapper_list *prev = l; l = l->next; free(prev); } #ifndef STARPU_USE_FXT if (_starpu_bound_recording) #endif { /* Declare all dependencies with ghost readers */ starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task); struct starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id; while (ghost_readers_id) { unsigned long id = ghost_readers_id->id; STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id); _starpu_bound_job_id_dep(pre_sync_job, id); _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task); struct starpu_jobid_list *prev = ghost_readers_id; ghost_readers_id = ghost_readers_id->next; free(prev); } handle->last_submitted_ghost_readers_id = NULL; } handle->last_submitted_readers = NULL; handle->last_submitted_writer = post_sync_task; starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array); } /* Write after Write (WAW) */ static void _starpu_add_writer_after_writer(starpu_data_handle handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task) { /* (Read) Write */ /* This task depends on the previous writer */ if (handle->last_submitted_writer) { struct starpu_task *task_array[1] = {handle->last_submitted_writer}; starpu_task_declare_deps_array(pre_sync_task, 1, task_array); _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task); } else { _STARPU_DEP_DEBUG("No dep\n"); } /* If there is a ghost writer instead, we * should declare a ghost dependency here, and * invalidate the ghost value. */ #ifndef STARPU_USE_FXT if (_starpu_bound_recording) #endif { if (handle->last_submitted_ghost_writer_id_is_valid) { starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task); STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id); _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id); _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task); handle->last_submitted_ghost_writer_id_is_valid = 0; } else { _STARPU_DEP_DEBUG("No dep ID\n"); } } handle->last_submitted_writer = post_sync_task; } static void disable_last_writer_callback(void *cl_arg) { starpu_data_handle handle = (starpu_data_handle) cl_arg; /* NB: we don't take the handle->sequential_consistency_mutex mutex * because the empty task that is used for synchronization is going to * be unlock in the context of a call to * _starpu_detect_implicit_data_deps_with_handle. It will therefore * already have been locked. */ handle->last_submitted_writer = NULL; } /* This function adds the implicit task dependencies introduced by data * sequential consistency. Two tasks are provided: pre_sync and post_sync which * respectively indicates which task is going to depend on the previous deps * and on which task future deps should wait. In the case of a dependency * introduced by a task submission, both tasks are just the submitted task, but * in the case of user interactions with the DSM, these may be different tasks. * */ /* NB : handle->sequential_consistency_mutex must be hold by the caller */ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, starpu_data_handle handle, starpu_access_mode mode) { STARPU_ASSERT(!(mode & STARPU_SCRATCH)); _STARPU_LOG_IN(); if (handle->sequential_consistency) { starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task); starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task); /* Skip tasks that are associated to a reduction phase so that * they do not interfere with the application. */ if (pre_sync_job->reduction_task || post_sync_job->reduction_task) return; _STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task); /* In case we are generating the DAG, we add an implicit * dependency between the pre and the post sync tasks in case * they are not the same. */ if (pre_sync_task != post_sync_task #ifndef STARPU_USE_FXT && _starpu_bound_recording #endif ) { STARPU_TRACE_GHOST_TASK_DEPS(pre_sync_job->job_id, post_sync_job->job_id); _starpu_bound_task_dep(post_sync_job, pre_sync_job); } starpu_access_mode previous_mode = handle->last_submitted_mode; if (mode & STARPU_W) { _STARPU_DEP_DEBUG("W %p\n", handle); if (previous_mode & STARPU_W) { _STARPU_DEP_DEBUG("WAW %p\n", handle); _starpu_add_writer_after_writer(handle, pre_sync_task, post_sync_task); } else { /* The task submitted previously were in read-only * mode: this task must depend on all those read-only * tasks and we get rid of the list of readers */ _STARPU_DEP_DEBUG("WAR %p\n", handle); _starpu_add_writer_after_readers(handle, pre_sync_task, post_sync_task); } } else { _STARPU_DEP_DEBUG("R %p %d -> %d\n", handle, previous_mode, mode); /* Add a reader, after a writer or a reader. */ STARPU_ASSERT(pre_sync_task); STARPU_ASSERT(post_sync_task); STARPU_ASSERT(mode & (STARPU_R|STARPU_REDUX)); if (!(previous_mode & STARPU_W) && (mode != previous_mode)) { /* Read after Redux or Redux after Read: we * insert a dummy synchronization task so that * we don't need to have a gigantic number of * dependencies between all readers and all * redux tasks. */ /* Create an empty task */ struct starpu_task *new_sync_task; new_sync_task = starpu_task_create(); STARPU_ASSERT(new_sync_task); new_sync_task->cl = NULL; new_sync_task->callback_func = disable_last_writer_callback; new_sync_task->callback_arg = handle; #ifdef STARPU_USE_FXT _starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux"; #endif _starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task); _starpu_task_submit_internal(new_sync_task); } _starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task); } handle->last_submitted_mode = mode; } _STARPU_LOG_OUT(); } /* Create the implicit dependencies for a newly submitted task */ void _starpu_detect_implicit_data_deps(struct starpu_task *task) { STARPU_ASSERT(task->cl); _STARPU_LOG_IN(); /* We don't want to enforce a sequential consistency for tasks that are * not visible to the application. */ starpu_job_t j = _starpu_get_job_associated_to_task(task); if (j->reduction_task) return; unsigned nbuffers = task->cl->nbuffers; unsigned buffer; for (buffer = 0; buffer < nbuffers; buffer++) { starpu_data_handle handle = task->buffers[buffer].handle; starpu_access_mode mode = task->buffers[buffer].mode; /* Scratch memory does not introduce any deps */ if (mode & STARPU_SCRATCH) continue; PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode); PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); } _STARPU_LOG_OUT(); } /* This function is called when a task has been executed so that we don't * create dependencies to task that do not exist anymore. */ /* NB: We maintain a list of "ghost deps" in case FXT is enabled. Ghost * dependencies are the dependencies that are implicitely enforced by StarPU * even if they do not imply a real dependency. For instance in the following * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but * if h is submitted after the termination of f or g, StarPU will not create a * dependency as this is not needed anymore. */ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle) { PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); if (handle->sequential_consistency) { /* If this is the last writer, there is no point in adding * extra deps to that tasks that does not exists anymore */ if (task == handle->last_submitted_writer) { handle->last_submitted_writer = NULL; #ifndef STARPU_USE_FXT if (_starpu_bound_recording) #endif { /* Save the previous writer as the ghost last writer */ handle->last_submitted_ghost_writer_id_is_valid = 1; starpu_job_t ghost_job = _starpu_get_job_associated_to_task(task); handle->last_submitted_ghost_writer_id = ghost_job->job_id; } } /* XXX can a task be both the last writer associated to a data * and be in its list of readers ? If not, we should not go * through the entire list once we have detected it was the * last writer. */ /* Same if this is one of the readers: we go through the list * of readers and remove the task if it is found. */ struct starpu_task_wrapper_list *l; l = handle->last_submitted_readers; struct starpu_task_wrapper_list *prev = NULL; while (l) { struct starpu_task_wrapper_list *next = l->next; if (l->task == task) { /* If we found the task in the reader list */ free(l); #ifndef STARPU_USE_FXT if (_starpu_bound_recording) #endif { /* Save the job id of the reader task in the ghost reader linked list list */ starpu_job_t ghost_reader_job = _starpu_get_job_associated_to_task(task); struct starpu_jobid_list *link = (struct starpu_jobid_list *) malloc(sizeof(struct starpu_jobid_list)); STARPU_ASSERT(link); link->next = handle->last_submitted_ghost_readers_id; link->id = ghost_reader_job->job_id; handle->last_submitted_ghost_readers_id = link; } if (prev) { prev->next = next; } else { /* This is the first element of the list */ handle->last_submitted_readers = next; } /* XXX can we really find the same task again * once we have found it ? Otherwise, we should * avoid going through the entire list and stop * as soon as we find the task. TODO: check how * duplicate dependencies are treated. */ } else { prev = l; } l = next; } } PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); } void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle handle) { _STARPU_LOG_IN(); PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); if (handle->sequential_consistency) { handle->post_sync_tasks_cnt++; struct starpu_task_wrapper_list *link = (struct starpu_task_wrapper_list *) malloc(sizeof(struct starpu_task_wrapper_list)); link->task = post_sync_task; link->next = handle->post_sync_tasks; handle->post_sync_tasks = link; } PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); _STARPU_LOG_OUT(); } void _starpu_unlock_post_sync_tasks(starpu_data_handle handle) { struct starpu_task_wrapper_list *post_sync_tasks = NULL; unsigned do_submit_tasks = 0; PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); if (handle->sequential_consistency) { STARPU_ASSERT(handle->post_sync_tasks_cnt > 0); if (--handle->post_sync_tasks_cnt == 0) { /* unlock all tasks : we need not hold the lock while unlocking all these tasks */ do_submit_tasks = 1; post_sync_tasks = handle->post_sync_tasks; handle->post_sync_tasks = NULL; } } PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); if (do_submit_tasks) { struct starpu_task_wrapper_list *link = post_sync_tasks; while (link) { /* There is no need to depend on that task now, since it was already unlocked */ _starpu_release_data_enforce_sequential_consistency(link->task, handle); int ret = _starpu_task_submit_internal(link->task); STARPU_ASSERT(!ret); link = link->next; } } } /* If sequential consistency mode is enabled, this function blocks until the * handle is available in the requested access mode. */ int _starpu_data_wait_until_available(starpu_data_handle handle, starpu_access_mode mode) { /* If sequential consistency is enabled, wait until data is available */ PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); int sequential_consistency = handle->sequential_consistency; if (sequential_consistency) { struct starpu_task *sync_task; sync_task = starpu_task_create(); sync_task->detach = 0; sync_task->destroy = 1; #ifdef STARPU_USE_FXT _starpu_get_job_associated_to_task(sync_task)->model_name = "sync_task"; #endif /* It is not really a RW access, but we want to make sure that * all previous accesses are done */ _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode); PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); /* TODO detect if this is superflous */ int ret = _starpu_task_submit_internal(sync_task); STARPU_ASSERT(!ret); starpu_task_wait(sync_task); } else { PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex); } return 0; }