|
@@ -46,23 +46,22 @@ static void _starpu_add_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_
|
|
|
_starpu_add_ghost_dependency(handle, _starpu_get_job_associated_to_task(previous)->job_id, next);
|
|
|
}
|
|
|
|
|
|
-/* Read after Write (RAW) or Read after Read (RAR) */
|
|
|
-static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
+/* Add pre_sync_task as new accessor among the existing ones, making it depend on the last synchronization task if any. */
|
|
|
+static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
{
|
|
|
/* Add this task to the list of readers */
|
|
|
struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
|
|
|
link->task = post_sync_task;
|
|
|
- link->next = handle->last_submitted_readers;
|
|
|
- handle->last_submitted_readers = link;
|
|
|
+ link->next = handle->last_submitted_accessors;
|
|
|
+ handle->last_submitted_accessors = link;
|
|
|
|
|
|
- /* This task depends on the previous writer if any */
|
|
|
- if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
|
|
|
+ /* This task depends on the previous synchronization task if any */
|
|
|
+ if (handle->last_sync_task && handle->last_sync_task != post_sync_task)
|
|
|
{
|
|
|
- _STARPU_DEP_DEBUG("RAW %p\n", handle);
|
|
|
- struct starpu_task *task_array[1] = {handle->last_submitted_writer};
|
|
|
+ struct starpu_task *task_array[1] = {handle->last_sync_task};
|
|
|
_starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
|
|
|
- _starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
|
|
|
- _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
|
|
|
+ _starpu_add_dependency(handle, handle->last_sync_task, pre_sync_task);
|
|
|
+ _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_sync_task, pre_sync_task);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -82,12 +81,12 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
|
|
|
#ifdef HAVE_AYUDAME_H
|
|
|
|| AYU_event
|
|
|
#endif
|
|
|
- ) && handle->last_submitted_ghost_writer_id_is_valid)
|
|
|
+ ) && handle->last_submitted_ghost_sync_id_is_valid)
|
|
|
{
|
|
|
- _STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id,
|
|
|
+ _STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_sync_id,
|
|
|
_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
|
|
|
- _starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
|
|
|
- _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
|
|
|
+ _starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_sync_id, pre_sync_task);
|
|
|
+ _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_sync_id, pre_sync_task);
|
|
|
}
|
|
|
|
|
|
if (!pre_sync_task->cl) {
|
|
@@ -99,27 +98,27 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* Write after Read (WAR) */
|
|
|
-static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
+/* This adds a new synchronization task which depends on all the previous accessors */
|
|
|
+static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
{
|
|
|
- /* Count the readers */
|
|
|
- unsigned nreaders = 0;
|
|
|
+ /* Count the existing accessors */
|
|
|
+ unsigned naccessors = 0;
|
|
|
struct _starpu_task_wrapper_list *l;
|
|
|
- l = handle->last_submitted_readers;
|
|
|
+ l = handle->last_submitted_accessors;
|
|
|
while (l)
|
|
|
{
|
|
|
if (l->task != post_sync_task)
|
|
|
- nreaders++;
|
|
|
+ naccessors++;
|
|
|
l = l->next;
|
|
|
}
|
|
|
- _STARPU_DEP_DEBUG("%d readers\n", nreaders);
|
|
|
+ _STARPU_DEP_DEBUG("%d accessors\n", naccessors);
|
|
|
|
|
|
- if (nreaders > 0)
|
|
|
+ if (naccessors > 0)
|
|
|
{
|
|
|
/* Put all tasks in the list into task_array */
|
|
|
- struct starpu_task *task_array[nreaders];
|
|
|
+ struct starpu_task *task_array[naccessors];
|
|
|
unsigned i = 0;
|
|
|
- l = handle->last_submitted_readers;
|
|
|
+ l = handle->last_submitted_accessors;
|
|
|
while (l)
|
|
|
{
|
|
|
STARPU_ASSERT(l->task);
|
|
@@ -134,80 +133,31 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
|
|
|
l = l->next;
|
|
|
free(prev);
|
|
|
}
|
|
|
- _starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array, 0);
|
|
|
+ _starpu_task_declare_deps_array(pre_sync_task, naccessors, task_array, 0);
|
|
|
}
|
|
|
#ifndef STARPU_USE_FXT
|
|
|
if (_starpu_bound_recording)
|
|
|
#endif
|
|
|
{
|
|
|
- /* Declare all dependencies with ghost readers */
|
|
|
- struct _starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
|
|
|
- while (ghost_readers_id)
|
|
|
+ /* Declare all dependencies with ghost accessors */
|
|
|
+ struct _starpu_jobid_list *ghost_accessors_id = handle->last_submitted_ghost_accessors_id;
|
|
|
+ while (ghost_accessors_id)
|
|
|
{
|
|
|
- unsigned long id = ghost_readers_id->id;
|
|
|
+ unsigned long id = ghost_accessors_id->id;
|
|
|
_STARPU_TRACE_GHOST_TASK_DEPS(id,
|
|
|
_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
|
|
|
_starpu_add_ghost_dependency(handle, id, pre_sync_task);
|
|
|
_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
|
|
|
|
|
|
- struct _starpu_jobid_list *prev = ghost_readers_id;
|
|
|
- ghost_readers_id = ghost_readers_id->next;
|
|
|
+ struct _starpu_jobid_list *prev = ghost_accessors_id;
|
|
|
+ ghost_accessors_id = ghost_accessors_id->next;
|
|
|
free(prev);
|
|
|
}
|
|
|
- handle->last_submitted_ghost_readers_id = NULL;
|
|
|
+ handle->last_submitted_ghost_accessors_id = NULL;
|
|
|
}
|
|
|
|
|
|
- handle->last_submitted_readers = NULL;
|
|
|
- handle->last_submitted_writer = post_sync_task;
|
|
|
-
|
|
|
- if (!post_sync_task->cl) {
|
|
|
- /* Add a reference to be released in _starpu_handle_job_termination */
|
|
|
- _starpu_spin_lock(&handle->header_lock);
|
|
|
- handle->busy_count++;
|
|
|
- _starpu_spin_unlock(&handle->header_lock);
|
|
|
- _starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/* Write after Write (WAW) */
|
|
|
-static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
-{
|
|
|
- /* (Read) Write */
|
|
|
- /* This task depends on the previous writer */
|
|
|
- if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
|
|
|
- {
|
|
|
- struct starpu_task *task_array[1] = {handle->last_submitted_writer};
|
|
|
- _starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
|
|
|
- _starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
|
|
|
- _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- _STARPU_DEP_DEBUG("No dep\n");
|
|
|
- }
|
|
|
-
|
|
|
- /* If there is a ghost writer instead, we
|
|
|
- * should declare a ghost dependency here, and
|
|
|
- * invalidate the ghost value. */
|
|
|
-#ifndef STARPU_USE_FXT
|
|
|
- if (_starpu_bound_recording)
|
|
|
-#endif
|
|
|
- {
|
|
|
- if (handle->last_submitted_ghost_writer_id_is_valid)
|
|
|
- {
|
|
|
- _STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id,
|
|
|
- _starpu_get_job_associated_to_task(pre_sync_task)->job_id);
|
|
|
- _starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
|
|
|
- _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
|
|
|
- handle->last_submitted_ghost_writer_id_is_valid = 0;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- _STARPU_DEP_DEBUG("No dep ID\n");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- handle->last_submitted_writer = post_sync_task;
|
|
|
+ handle->last_submitted_accessors = NULL;
|
|
|
+ handle->last_sync_task = post_sync_task;
|
|
|
|
|
|
if (!post_sync_task->cl) {
|
|
|
/* Add a reference to be released in _starpu_handle_job_termination */
|
|
@@ -245,7 +195,6 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
|
|
|
return NULL;
|
|
|
|
|
|
- _STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
|
|
|
/* In case we are generating the DAG, we add an implicit
|
|
|
* dependency between the pre and the post sync tasks in case
|
|
|
* they are not the same. */
|
|
@@ -261,54 +210,75 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
|
|
|
enum starpu_data_access_mode previous_mode = handle->last_submitted_mode;
|
|
|
|
|
|
- if (mode & STARPU_W)
|
|
|
+ _STARPU_DEP_DEBUG("Handle %p Tasks %p %p %x->%x\n", handle, pre_sync_task, post_sync_task, previous_mode, mode);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Tasks can access the data concurrently only if they have the
|
|
|
+ * same access mode, which can only be either:
|
|
|
+ * - write with STARPU_COMMUTE
|
|
|
+ * - read
|
|
|
+ * - redux
|
|
|
+ *
|
|
|
+ * In other cases, the tasks have to depend on each other.
|
|
|
+ */
|
|
|
+
|
|
|
+ if ((mode & STARPU_W && mode & STARPU_COMMUTE && previous_mode & STARPU_W && previous_mode && STARPU_COMMUTE)
|
|
|
+ || (mode == STARPU_R && previous_mode == STARPU_R)
|
|
|
+ || (mode == STARPU_REDUX && previous_mode == STARPU_REDUX))
|
|
|
{
|
|
|
- _STARPU_DEP_DEBUG("W %p\n", handle);
|
|
|
- if (previous_mode & STARPU_W)
|
|
|
- {
|
|
|
- _STARPU_DEP_DEBUG("WAW %p\n", handle);
|
|
|
- _starpu_add_writer_after_writer(handle, pre_sync_task, post_sync_task);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* The task submitted previously were in read-only
|
|
|
- * mode: this task must depend on all those read-only
|
|
|
- * tasks and we get rid of the list of readers */
|
|
|
- _STARPU_DEP_DEBUG("WAR %p\n", handle);
|
|
|
- _starpu_add_writer_after_readers(handle, pre_sync_task, post_sync_task);
|
|
|
- }
|
|
|
+ _STARPU_DEP_DEBUG("concurrently\n");
|
|
|
+ /* Can access concurrently with current tasks */
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_DEP_DEBUG("R %p %d -> %d\n", handle, previous_mode, mode);
|
|
|
- /* Add a reader, after a writer or a reader. */
|
|
|
- STARPU_ASSERT(pre_sync_task);
|
|
|
- STARPU_ASSERT(post_sync_task);
|
|
|
+ /* Can not access concurrently, have to wait for existing accessors */
|
|
|
+ struct _starpu_task_wrapper_list *l = handle->last_submitted_accessors;
|
|
|
+ _STARPU_DEP_DEBUG("dependency\n");
|
|
|
|
|
|
- STARPU_ASSERT(mode & (STARPU_R|STARPU_REDUX));
|
|
|
-
|
|
|
- if (!(previous_mode & STARPU_W) && (mode != previous_mode))
|
|
|
+ if (l && l->next)
|
|
|
{
|
|
|
- /* Read after Redux or Redux after Read: we
|
|
|
- * insert a dummy synchronization task so that
|
|
|
- * we don't need to have a gigantic number of
|
|
|
- * dependencies between all readers and all
|
|
|
- * redux tasks. */
|
|
|
-
|
|
|
- /* Create an empty task */
|
|
|
- struct starpu_task *new_sync_task;
|
|
|
- new_sync_task = starpu_task_create();
|
|
|
- STARPU_ASSERT(new_sync_task);
|
|
|
- new_sync_task->cl = NULL;
|
|
|
+ /* Several previous accessors */
|
|
|
+
|
|
|
+ if (mode == STARPU_W)
|
|
|
+ {
|
|
|
+ /* Optimization: this task can not
|
|
|
+ * combine with others anyway, use it
|
|
|
+ * as synchronization task by making it
|
|
|
+ * wait for the previous ones. */
|
|
|
+ _starpu_add_sync_task(handle, pre_sync_task, post_sync_task);
|
|
|
+ } else {
|
|
|
+ _STARPU_DEP_DEBUG("several predecessors, adding sync task\n");
|
|
|
+ /* insert an empty synchronization task
|
|
|
+ * which waits for the whole set,
|
|
|
+ * instead of creating a quadratic
|
|
|
+ * number of dependencies. */
|
|
|
+ struct starpu_task *sync_task = starpu_task_create();
|
|
|
+ STARPU_ASSERT(sync_task);
|
|
|
+ sync_task->cl = NULL;
|
|
|
#ifdef STARPU_USE_FXT
|
|
|
- _starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
|
|
|
+ _starpu_get_job_associated_to_task(sync_task)->model_name = "sync_task_redux";
|
|
|
#endif
|
|
|
+ /* Make this task wait for the previous ones */
|
|
|
+ _starpu_add_sync_task(handle, sync_task, sync_task);
|
|
|
+ /* And the requested task wait for this one */
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
|
|
|
- _starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
|
|
|
-
|
|
|
- task = new_sync_task;
|
|
|
+ task = sync_task;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (l)
|
|
|
+ {
|
|
|
+ /* One previous accessor, make it the sync
|
|
|
+ * task, and start depending on it. */
|
|
|
+ handle->last_sync_task = l->task;
|
|
|
+ handle->last_submitted_accessors = NULL;
|
|
|
+ free(l);
|
|
|
+ }
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
}
|
|
|
- _starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task);
|
|
|
}
|
|
|
handle->last_submitted_mode = mode;
|
|
|
}
|
|
@@ -374,18 +344,18 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
|
|
|
|
|
|
/* If this is the last writer, there is no point in adding
|
|
|
* extra deps to that tasks that does not exists anymore */
|
|
|
- if (task == handle->last_submitted_writer)
|
|
|
+ if (task == handle->last_sync_task)
|
|
|
{
|
|
|
- handle->last_submitted_writer = NULL;
|
|
|
+ handle->last_sync_task = NULL;
|
|
|
|
|
|
#ifndef STARPU_USE_FXT
|
|
|
if (_starpu_bound_recording)
|
|
|
#endif
|
|
|
{
|
|
|
/* Save the previous writer as the ghost last writer */
|
|
|
- handle->last_submitted_ghost_writer_id_is_valid = 1;
|
|
|
+ handle->last_submitted_ghost_sync_id_is_valid = 1;
|
|
|
struct _starpu_job *ghost_job = _starpu_get_job_associated_to_task(task);
|
|
|
- handle->last_submitted_ghost_writer_id = ghost_job->job_id;
|
|
|
+ handle->last_submitted_ghost_sync_id = ghost_job->job_id;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -397,7 +367,7 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
|
|
|
/* Same if this is one of the readers: we go through the list
|
|
|
* of readers and remove the task if it is found. */
|
|
|
struct _starpu_task_wrapper_list *l;
|
|
|
- l = handle->last_submitted_readers;
|
|
|
+ l = handle->last_submitted_accessors;
|
|
|
struct _starpu_task_wrapper_list *prev = NULL;
|
|
|
#ifdef STARPU_DEVEL
|
|
|
#warning TODO: use double-linked list to make finding ourself fast
|
|
@@ -419,9 +389,9 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
|
|
|
struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
|
|
|
struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
|
|
|
STARPU_ASSERT(link);
|
|
|
- link->next = handle->last_submitted_ghost_readers_id;
|
|
|
+ link->next = handle->last_submitted_ghost_accessors_id;
|
|
|
link->id = ghost_reader_job->job_id;
|
|
|
- handle->last_submitted_ghost_readers_id = link;
|
|
|
+ handle->last_submitted_ghost_accessors_id = link;
|
|
|
}
|
|
|
|
|
|
if (prev)
|
|
@@ -431,7 +401,7 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
|
|
|
else
|
|
|
{
|
|
|
/* This is the first element of the list */
|
|
|
- handle->last_submitted_readers = next;
|
|
|
+ handle->last_submitted_accessors = next;
|
|
|
}
|
|
|
|
|
|
/* XXX can we really find the same task again
|