|
@@ -596,6 +596,17 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+static struct _starpu_data_replicate *get_replicate(struct starpu_buffer_descr *descr, int workerid, unsigned local_memory_node) {
|
|
|
+ starpu_data_handle_t handle = descr->handle;
|
|
|
+ enum starpu_access_mode mode = descr->mode;
|
|
|
+
|
|
|
+ if (mode & (STARPU_SCRATCH|STARPU_REDUX))
|
|
|
+ return &handle->per_worker[workerid];
|
|
|
+ else
|
|
|
+ /* That's a "normal" buffer (R/W) */
|
|
|
+ return &handle->per_node[local_memory_node];
|
|
|
+}
|
|
|
+
|
|
|
int _starpu_fetch_task_input(struct _starpu_job *j, uint32_t mask)
|
|
|
{
|
|
|
_STARPU_TRACE_START_FETCH_INPUT(NULL);
|
|
@@ -627,19 +638,24 @@ int _starpu_fetch_task_input(struct _starpu_job *j, uint32_t mask)
|
|
|
* _starpu_compar_handles */
|
|
|
continue;
|
|
|
|
|
|
- if (mode & (STARPU_SCRATCH|STARPU_REDUX))
|
|
|
- {
|
|
|
- local_replicate = &handle->per_worker[workerid];
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* That's a "normal" buffer (R/W) */
|
|
|
- local_replicate = &handle->per_node[local_memory_node];
|
|
|
- }
|
|
|
+ local_replicate = get_replicate(&descrs[index], workerid, local_memory_node);
|
|
|
|
|
|
ret = fetch_data(handle, local_replicate, mode);
|
|
|
if (STARPU_UNLIKELY(ret))
|
|
|
goto enomem;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order. */
|
|
|
+ descrs = task->buffers;
|
|
|
+
|
|
|
+ for (index = 0; index < nbuffers; index++)
|
|
|
+ {
|
|
|
+ starpu_data_handle_t handle = descrs[index].handle;
|
|
|
+ enum starpu_access_mode mode = descrs[index].mode;
|
|
|
+
|
|
|
+ struct _starpu_data_replicate *local_replicate;
|
|
|
+
|
|
|
+ local_replicate = get_replicate(&descrs[index], workerid, local_memory_node);
|
|
|
|
|
|
task->interfaces[index] = local_replicate->data_interface;
|
|
|
|
|
@@ -679,13 +695,15 @@ void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask)
|
|
|
struct starpu_buffer_descr *descrs = j->ordered_buffers;
|
|
|
unsigned nbuffers = task->cl->nbuffers;
|
|
|
|
|
|
+ int workerid = starpu_worker_get_id();
|
|
|
+ unsigned local_memory_node = _starpu_get_local_memory_node();
|
|
|
+
|
|
|
unsigned index;
|
|
|
for (index = 0; index < nbuffers; index++)
|
|
|
{
|
|
|
starpu_data_handle_t handle = descrs[index].handle;
|
|
|
- enum starpu_access_mode mode = descrs[index].mode;
|
|
|
|
|
|
- struct _starpu_data_replicate *replicate;
|
|
|
+ struct _starpu_data_replicate *local_replicate;
|
|
|
|
|
|
if (index && descrs[index-1].handle == descrs[index].handle)
|
|
|
/* We have already released this data, skip it. This
|
|
@@ -693,16 +711,7 @@ void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask)
|
|
|
* _starpu_compar_handles */
|
|
|
continue;
|
|
|
|
|
|
- if (mode & STARPU_RW)
|
|
|
- {
|
|
|
- unsigned local_node = _starpu_get_local_memory_node();
|
|
|
- replicate = &handle->per_node[local_node];
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- int workerid = starpu_worker_get_id();
|
|
|
- replicate = &handle->per_worker[workerid];
|
|
|
- }
|
|
|
+ local_replicate = get_replicate(&descrs[index], workerid, local_memory_node);
|
|
|
|
|
|
/* In case there was a temporary handle (eg. used for
|
|
|
* reduction), this handle may have requested to be destroyed
|
|
@@ -710,7 +719,7 @@ void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask)
|
|
|
* */
|
|
|
unsigned handle_was_destroyed = handle->lazy_unregister;
|
|
|
|
|
|
- _starpu_release_data_on_node(handle, mask, replicate);
|
|
|
+ _starpu_release_data_on_node(handle, mask, local_replicate);
|
|
|
if (!handle_was_destroyed)
|
|
|
_starpu_release_data_enforce_sequential_consistency(task, handle);
|
|
|
}
|