|
@@ -47,13 +47,14 @@ static void _starpu_add_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_
|
|
|
}
|
|
|
|
|
|
/* Add pre_sync_task as new accessor among the existing ones, making it depend on the last synchronization task if any. */
|
|
|
-static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
|
|
|
+static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot)
|
|
|
{
|
|
|
/* Add this task to the list of readers */
|
|
|
- struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
|
|
|
- link->task = post_sync_task;
|
|
|
- link->next = handle->last_submitted_accessors;
|
|
|
- handle->last_submitted_accessors = link;
|
|
|
+ post_sync_task_dependency_slot->task = post_sync_task;
|
|
|
+ post_sync_task_dependency_slot->next = handle->last_submitted_accessors.next;
|
|
|
+ post_sync_task_dependency_slot->prev = &handle->last_submitted_accessors;
|
|
|
+ post_sync_task_dependency_slot->next->prev = post_sync_task_dependency_slot;
|
|
|
+ handle->last_submitted_accessors.next = post_sync_task_dependency_slot;
|
|
|
|
|
|
/* This task depends on the previous synchronization task if any */
|
|
|
if (handle->last_sync_task && handle->last_sync_task != post_sync_task)
|
|
@@ -103,9 +104,9 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
|
|
|
{
|
|
|
/* Count the existing accessors */
|
|
|
unsigned naccessors = 0;
|
|
|
- struct _starpu_task_wrapper_list *l;
|
|
|
- l = handle->last_submitted_accessors;
|
|
|
- while (l)
|
|
|
+ struct _starpu_task_wrapper_dlist *l;
|
|
|
+ l = handle->last_submitted_accessors.next;
|
|
|
+ while (l != &handle->last_submitted_accessors)
|
|
|
{
|
|
|
if (l->task != post_sync_task)
|
|
|
naccessors++;
|
|
@@ -118,8 +119,8 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
|
|
|
/* Put all tasks in the list into task_array */
|
|
|
struct starpu_task *task_array[naccessors];
|
|
|
unsigned i = 0;
|
|
|
- l = handle->last_submitted_accessors;
|
|
|
- while (l)
|
|
|
+ l = handle->last_submitted_accessors.next;
|
|
|
+ while (l != &handle->last_submitted_accessors)
|
|
|
{
|
|
|
STARPU_ASSERT(l->task);
|
|
|
if (l->task != post_sync_task)
|
|
@@ -129,9 +130,10 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
|
|
|
_STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
|
|
|
}
|
|
|
|
|
|
- struct _starpu_task_wrapper_list *prev = l;
|
|
|
+ struct _starpu_task_wrapper_dlist *prev = l;
|
|
|
l = l->next;
|
|
|
- free(prev);
|
|
|
+ prev->next = NULL;
|
|
|
+ prev->prev = NULL;
|
|
|
}
|
|
|
_starpu_task_declare_deps_array(pre_sync_task, naccessors, task_array, 0);
|
|
|
}
|
|
@@ -156,7 +158,8 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
|
|
|
handle->last_submitted_ghost_accessors_id = NULL;
|
|
|
}
|
|
|
|
|
|
- handle->last_submitted_accessors = NULL;
|
|
|
+ handle->last_submitted_accessors.next = &handle->last_submitted_accessors;
|
|
|
+ handle->last_submitted_accessors.prev = &handle->last_submitted_accessors;
|
|
|
handle->last_sync_task = post_sync_task;
|
|
|
|
|
|
if (!post_sync_task->cl) {
|
|
@@ -177,7 +180,7 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
|
|
|
* */
|
|
|
/* NB : handle->sequential_consistency_mutex must be hold by the caller;
|
|
|
* returns a task, to be submitted after releasing that mutex. */
|
|
|
-struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
|
|
|
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
|
|
|
starpu_data_handle_t handle, enum starpu_data_access_mode mode)
|
|
|
{
|
|
|
struct starpu_task *task = NULL;
|
|
@@ -228,15 +231,16 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
{
|
|
|
_STARPU_DEP_DEBUG("concurrently\n");
|
|
|
/* Can access concurrently with current tasks */
|
|
|
- _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
/* Can not access concurrently, have to wait for existing accessors */
|
|
|
- struct _starpu_task_wrapper_list *l = handle->last_submitted_accessors;
|
|
|
+ struct _starpu_task_wrapper_dlist *l = handle->last_submitted_accessors.next;
|
|
|
_STARPU_DEP_DEBUG("dependency\n");
|
|
|
|
|
|
- if ((l && l->next) || (handle->last_submitted_ghost_accessors_id && handle->last_submitted_ghost_accessors_id->next))
|
|
|
+ if ((l != &handle->last_submitted_accessors && l->next != &handle->last_submitted_accessors)
|
|
|
+ || (handle->last_submitted_ghost_accessors_id && handle->last_submitted_ghost_accessors_id->next))
|
|
|
{
|
|
|
/* Several previous accessors */
|
|
|
|
|
@@ -261,7 +265,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
/* Make this task wait for the previous ones */
|
|
|
_starpu_add_sync_task(handle, sync_task, sync_task);
|
|
|
/* And the requested task wait for this one */
|
|
|
- _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
|
|
|
|
|
|
task = sync_task;
|
|
|
}
|
|
@@ -270,11 +274,13 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
{
|
|
|
/* One previous accessor, make it the sync
|
|
|
* task, and start depending on it. */
|
|
|
- if (l)
|
|
|
+ if (l != &handle->last_submitted_accessors)
|
|
|
{
|
|
|
handle->last_sync_task = l->task;
|
|
|
- handle->last_submitted_accessors = NULL;
|
|
|
- free(l);
|
|
|
+ l->next = NULL;
|
|
|
+ l->prev = NULL;
|
|
|
+ handle->last_submitted_accessors.next = &handle->last_submitted_accessors;
|
|
|
+ handle->last_submitted_accessors.prev = &handle->last_submitted_accessors;
|
|
|
}
|
|
|
else if (handle->last_submitted_ghost_accessors_id)
|
|
|
{
|
|
@@ -283,7 +289,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
|
|
|
free(handle->last_submitted_ghost_accessors_id);
|
|
|
handle->last_submitted_ghost_accessors_id = NULL;
|
|
|
}
|
|
|
- _starpu_add_accessor(handle, pre_sync_task, post_sync_task);
|
|
|
+ _starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
|
|
|
}
|
|
|
}
|
|
|
handle->last_submitted_mode = mode;
|
|
@@ -307,13 +313,14 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
|
|
|
if (j->reduction_task)
|
|
|
return;
|
|
|
|
|
|
- unsigned nbuffers = task->cl->nbuffers;
|
|
|
+ unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
|
|
|
+ struct _starpu_task_wrapper_dlist *dep_slots = _STARPU_JOB_GET_DEP_SLOTS(j);
|
|
|
|
|
|
unsigned buffer;
|
|
|
for (buffer = 0; buffer < nbuffers; buffer++)
|
|
|
{
|
|
|
starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, buffer);
|
|
|
- enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(task->cl, buffer);
|
|
|
+ enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, buffer);
|
|
|
struct starpu_task *new_task;
|
|
|
|
|
|
/* Scratch memory does not introduce any deps */
|
|
@@ -321,7 +328,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
|
|
|
continue;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
|
|
|
- new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
|
|
|
+ new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, &dep_slots[buffer], handle, mode);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
|
if (new_task)
|
|
|
{
|
|
@@ -341,7 +348,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
|
|
|
* if h is submitted after the termination of f or g, StarPU will not create a
|
|
|
* dependency as this is not needed anymore. */
|
|
|
/* the sequential_consistency_mutex of the handle has to be already held */
|
|
|
-void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle)
|
|
|
+void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, struct _starpu_task_wrapper_dlist *task_dependency_slot, starpu_data_handle_t handle)
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
|
|
|
|
|
@@ -365,63 +372,35 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /* XXX can a task be both the last writer associated to a data
|
|
|
- * and be in its list of readers ? If not, we should not go
|
|
|
- * through the entire list once we have detected it was the
|
|
|
- * last writer. */
|
|
|
-
|
|
|
/* Same if this is one of the readers: we go through the list
|
|
|
* of readers and remove the task if it is found. */
|
|
|
- struct _starpu_task_wrapper_list *l;
|
|
|
- l = handle->last_submitted_accessors;
|
|
|
- struct _starpu_task_wrapper_list *prev = NULL;
|
|
|
-#ifdef STARPU_DEVEL
|
|
|
-#warning TODO: use double-linked list to make finding ourself fast
|
|
|
-#endif
|
|
|
- while (l)
|
|
|
+ if (task_dependency_slot && task_dependency_slot->next)
|
|
|
{
|
|
|
- struct _starpu_task_wrapper_list *next = l->next;
|
|
|
-
|
|
|
- if (l->task == task)
|
|
|
- {
|
|
|
- /* If we found the task in the reader list */
|
|
|
- free(l);
|
|
|
+#ifdef STARPU_DEBUG
|
|
|
+ /* Make sure we are removing ourself from the proper handle */
|
|
|
+ struct _starpu_task_wrapper_dlist *l;
|
|
|
+ for (l = task_dependency_slot->prev; l->task; l = l->prev)
|
|
|
+ ;
|
|
|
+ STARPU_ASSERT(l == &handle->last_submitted_accessors);
|
|
|
+ for (l = task_dependency_slot->next; l->task; l = l->next)
|
|
|
+ ;
|
|
|
+ STARPU_ASSERT(l == &handle->last_submitted_accessors);
|
|
|
+#endif
|
|
|
|
|
|
+ task_dependency_slot->next->prev = task_dependency_slot->prev;
|
|
|
+ task_dependency_slot->prev->next = task_dependency_slot->next;
|
|
|
#ifndef STARPU_USE_FXT
|
|
|
- if (_starpu_bound_recording)
|
|
|
+ if (_starpu_bound_recording)
|
|
|
#endif
|
|
|
- {
|
|
|
- /* Save the job id of the reader task in the ghost reader linked list list */
|
|
|
- struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
|
|
|
- struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
|
|
|
- STARPU_ASSERT(link);
|
|
|
- link->next = handle->last_submitted_ghost_accessors_id;
|
|
|
- link->id = ghost_reader_job->job_id;
|
|
|
- handle->last_submitted_ghost_accessors_id = link;
|
|
|
- }
|
|
|
-
|
|
|
- if (prev)
|
|
|
- {
|
|
|
- prev->next = next;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* This is the first element of the list */
|
|
|
- handle->last_submitted_accessors = next;
|
|
|
- }
|
|
|
-
|
|
|
- /* XXX can we really find the same task again
|
|
|
- * once we have found it ? Otherwise, we should
|
|
|
- * avoid going through the entire list and stop
|
|
|
- * as soon as we find the task. TODO: check how
|
|
|
- * duplicate dependencies are treated. */
|
|
|
- }
|
|
|
- else
|
|
|
{
|
|
|
- prev = l;
|
|
|
+ /* Save the job id of the reader task in the ghost reader linked list list */
|
|
|
+ struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
|
|
|
+ struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
|
|
|
+ STARPU_ASSERT(link);
|
|
|
+ link->next = handle->last_submitted_ghost_accessors_id;
|
|
|
+ link->id = ghost_reader_job->job_id;
|
|
|
+ handle->last_submitted_ghost_accessors_id = link;
|
|
|
}
|
|
|
-
|
|
|
- l = next;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -434,13 +413,22 @@ void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
|
|
|
{
|
|
|
struct starpu_task *task = j->task;
|
|
|
struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
|
|
|
+ struct _starpu_task_wrapper_dlist *slots = _STARPU_JOB_GET_DEP_SLOTS(j);
|
|
|
|
|
|
if (!task->cl)
|
|
|
return;
|
|
|
|
|
|
- unsigned nbuffers = task->cl->nbuffers;
|
|
|
-
|
|
|
+ unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
|
|
|
unsigned index;
|
|
|
+
|
|
|
+ /* Release all implicit dependencies */
|
|
|
+ for (index = 0; index < nbuffers; index++)
|
|
|
+ {
|
|
|
+ starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
|
|
|
+
|
|
|
+ _starpu_release_data_enforce_sequential_consistency(task, &slots[index], handle);
|
|
|
+ }
|
|
|
+
|
|
|
for (index = 0; index < nbuffers; index++)
|
|
|
{
|
|
|
starpu_data_handle_t handle = descrs[index].handle;
|
|
@@ -451,7 +439,6 @@ void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
|
|
|
* _starpu_compar_handles */
|
|
|
continue;
|
|
|
|
|
|
- _starpu_release_data_enforce_sequential_consistency(task, handle);
|
|
|
/* Release the reference acquired in _starpu_push_task_output */
|
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
|
STARPU_ASSERT(handle->busy_count > 0);
|
|
@@ -512,7 +499,7 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle)
|
|
|
while (link)
|
|
|
{
|
|
|
/* There is no need to depend on that task now, since it was already unlocked */
|
|
|
- _starpu_release_data_enforce_sequential_consistency(link->task, handle);
|
|
|
+ _starpu_release_data_enforce_sequential_consistency(link->task, &_starpu_get_job_associated_to_task(link->task)->implicit_dep_slot, handle);
|
|
|
|
|
|
int ret = _starpu_task_submit_internally(link->task);
|
|
|
STARPU_ASSERT(!ret);
|
|
@@ -540,7 +527,7 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_d
|
|
|
|
|
|
/* It is not really a RW access, but we want to make sure that
|
|
|
* all previous accesses are done */
|
|
|
- new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
|
|
|
+ new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, &_starpu_get_job_associated_to_task(sync_task)->implicit_dep_slot, handle, mode);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
|
|
|
|
if (new_task)
|