@@ -931,28 +931,57 @@ struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum s
 	return &handle->per_node[node];
 }
 
-/* Synchronously fetch data for a given task (if it's not there already) */
-int _starpu_fetch_task_input(struct _starpu_job *j)
+/* Callback used when a buffer is sent asynchronously to the sink */
+static void _starpu_fetch_task_input_cb(void *arg)
 {
-	_STARPU_TRACE_START_FETCH_INPUT(NULL);
+	struct _starpu_worker * worker = (struct _starpu_worker *) arg;
+
+	/* Increase the number of buffers received */
+	STARPU_WMB();
+	(void)STARPU_ATOMIC_ADD(&worker->nb_buffers_transferred, 1);
+}
+
+
+/* Synchronously or asynchronously fetch data for a given task (if it's not
+ * there already). Returns 0 on success, or -1 if a buffer could not be allocated (enomem). */
+/* The synchronous version of _starpu_fetch_task_input must be called before
+ * executing the task. __starpu_push_task_output must be called after the
+ * execution of the task. */
+/* To improve overlapping, the driver can, before calling the synchronous
+ * version of _starpu_fetch_task_input, call _starpu_fetch_task_input with
+ * async==1, then wait for transfers to complete, then call
+ * _starpu_release_fetch_task_input_async to release them before calling the
+ * synchronous version of _starpu_fetch_task_input. */
+int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async)
+{
+	struct _starpu_worker *worker = _starpu_get_local_worker_key();
+	int workerid = worker->workerid;
+	if (async)
+	{
+		worker->task_transferring = task;
+		worker->nb_buffers_transferred = 0;
+		_STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
+	}
+	else
+		_STARPU_TRACE_START_FETCH_INPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();
-	struct starpu_task *task = j->task;
 	if (profiling && task->profiling_info)
 		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
 
 	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned nacquires;
 
 	unsigned local_memory_node = _starpu_memory_node_get_local_key();
 
-	int workerid = starpu_worker_get_id_check();
-
 #ifdef STARPU_USE_FXT
 	unsigned long total_size = 0;
 #endif
 
 	unsigned index;
+	nacquires = 0;
 	for (index = 0; index < nbuffers; index++)
 	{
 		int ret;
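
To make the overlapping protocol described in the comments above concrete, here is a minimal driver-side sketch of the intended call sequence (an illustration using the names introduced by this patch, not actual driver code; the busy-wait stands in for the driver's real progress loop):

	/* Issue all input transfers asynchronously; the completion callback counts them. */
	_starpu_fetch_task_input(task, j, 1);

	/* Wait until every issued transfer has run _starpu_fetch_task_input_cb. */
	while (worker->nb_buffers_transferred < worker->nb_buffers_totransfer)
		; /* schematic: a real driver progresses other requests here */

	/* Drop the references taken by the asynchronous pass... */
	_starpu_release_fetch_task_input_async(j, worker->workerid, worker->nb_buffers_totransfer);

	/* ...then re-acquire everything synchronously, forcing eviction if the
	 * asynchronous pass stopped early for lack of memory. */
	_starpu_fetch_task_input(task, j, 0);
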
@@ -976,13 +1005,33 @@ int _starpu_fetch_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, workerid, node);
 
-		ret = fetch_data(handle, node, local_replicate, mode, 0);
-		if (STARPU_UNLIKELY(ret))
-			goto enomem;
+		if (async)
+		{
+			ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1,
+					_starpu_fetch_task_input_cb, worker, 0, "_starpu_src_common_worker_internal_work");
+			if (STARPU_UNLIKELY(ret))
+			{
+				/* Oops, not enough memory: make the worker wait for the transfers issued so far, and let the synchronous call finish by forcing eviction */
+				worker->nb_buffers_totransfer = nacquires;
+				return 0;
+			}
+		}
+		else
+		{
+			ret = fetch_data(handle, node, local_replicate, mode, 0);
+			if (STARPU_UNLIKELY(ret))
+				goto enomem;
+		}
 
 #ifdef STARPU_USE_FXT
 		total_size += _starpu_data_get_size(handle);
 #endif
+		nacquires++;
+	}
+	if (async)
+	{
+		worker->nb_buffers_totransfer = nacquires;
+		return 0;
 	}
 
 	_STARPU_TRACE_DATA_LOAD(workerid,total_size);
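
Note the memory-ordering contract between the two sides of the transfer counter: the completion callback publishes the transferred data before advertising it, and the release path (next hunk) reads the counter before touching the buffers. Condensed, with the barrier calls quoted from this patch:

	/* completion callback, per transfer: publish the data, then the count */
	STARPU_WMB();
	(void)STARPU_ATOMIC_ADD(&worker->nb_buffers_transferred, 1);

	/* driver side, once nb_buffers_transferred == nb_buffers_totransfer:
	 * the matching barrier in _starpu_release_fetch_task_input_async makes
	 * the transferred data visible before the buffers are used */
	STARPU_RMB();
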
@@ -1043,6 +1092,55 @@ enomem:
 	return -1;
 }
 
+/* This is to be called after _starpu_fetch_task_input was called with async=1, once the callback has fired for each buffer to be transferred. */
+int _starpu_release_fetch_task_input_async(struct _starpu_job *j, int workerid, int nbtransfers)
+{
+	STARPU_RMB();
+	_STARPU_TRACE_WORKER_END_FETCH_INPUT(NULL, workerid);
+	struct starpu_task *task = j->task;
+
+	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned local_memory_node = _starpu_memory_node_get_local_key();
+	unsigned index;
+	unsigned nreleases;
+
+	nreleases = 0;
+	for (index = 0; index < nbuffers; index++)
+	{
+		if (nreleases == nbtransfers)
+			/* That was a partial fetch */
+			break;
+		starpu_data_handle_t handle = descrs[index].handle;
+		enum starpu_data_access_mode mode = descrs[index].mode;
+		int node = descrs[index].node;
+		if (node == -1)
+			node = local_memory_node;
+
+		struct _starpu_data_replicate *local_replicate;
+
+		if (index && descrs[index-1].handle == descrs[index].handle)
+			/* We have already taken this data, skip it. This
+			 * depends on the ordering putting writes before reads, see
+			 * _starpu_compar_handles */
+			continue;
+
+		local_replicate = get_replicate(handle, mode, workerid, node);
+
+		/* Release our refcnt */
+		_starpu_spin_lock(&handle->header_lock);
+		local_replicate->refcnt--;
+		STARPU_ASSERT(local_replicate->refcnt >= 0);
+		STARPU_ASSERT(handle->busy_count > 0);
+		handle->busy_count--;
+		/* If _starpu_data_check_not_busy destroyed the handle, it also released the header lock */
+		if (!_starpu_data_check_not_busy(handle))
+			_starpu_spin_unlock(&handle->header_lock);
+		nreleases++;
+	}
+	return 0;
+}
+
 /* Release task data dependencies */
 void __starpu_push_task_output(struct _starpu_job *j)
 {
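
A partial fetch is worth tracing end to end (hypothetical numbers; the calls are the ones added by this patch). With nbuffers == 4 and ENOMEM hit on the third asynchronous fetch:

	_starpu_fetch_task_input(task, j, 1);   /* issues 2 transfers, sets nb_buffers_totransfer = 2, returns 0 */
	/* ... wait until nb_buffers_transferred reaches 2 ... */
	_starpu_release_fetch_task_input_async(j, workerid, 2); /* releases the 2 acquired buffers, then breaks out */
	_starpu_fetch_task_input(task, j, 0);   /* synchronous pass: fetches all 4 buffers, evicting other data if needed */
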