|
@@ -404,7 +404,7 @@ int _starpu_determine_request_path(starpu_data_handle_t handle,
|
|
|
/* handle->lock should be taken. r is returned locked. The node parameter
|
|
|
* indicate either the source of the request, or the destination for a
|
|
|
* write-only request. */
|
|
|
-static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
|
|
|
+static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, enum _starpu_is_prefetch is_prefetch)
|
|
|
{
|
|
|
struct _starpu_data_request *r;
|
|
|
|
|
@@ -467,7 +467,7 @@ static struct _starpu_data_request *_starpu_search_existing_data_request(struct
|
|
|
|
|
|
struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
|
|
|
struct _starpu_data_replicate *dst_replicate,
|
|
|
- enum starpu_data_access_mode mode, unsigned is_prefetch,
|
|
|
+ enum starpu_data_access_mode mode, enum _starpu_is_prefetch is_prefetch,
|
|
|
unsigned async,
|
|
|
void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
|
|
|
{
|
|
@@ -522,7 +522,13 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
|
|
|
#endif
|
|
|
|
|
|
if (dst_replicate->mc)
|
|
|
+ {
|
|
|
+ if (is_prefetch == STARPU_TASK_PREFETCH)
|
|
|
+ /* Make sure it stays there */
|
|
|
+ dst_replicate->mc->nb_tasks_prefetch++;
|
|
|
+
|
|
|
_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
@@ -567,6 +573,9 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
|
|
|
if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
|
|
|
{
|
|
|
_starpu_update_data_state(handle, dst_replicate, mode);
|
|
|
+ if (is_prefetch == STARPU_TASK_PREFETCH)
|
|
|
+ /* Make sure it stays there */
|
|
|
+ dst_replicate->mc->nb_tasks_prefetch++;
|
|
|
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
|
|
|
@@ -645,9 +654,17 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
|
|
|
STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
|
|
|
}
|
|
|
}
|
|
|
- else if (!write_invalidation)
|
|
|
- /* The last request will perform the callback after termination */
|
|
|
- _starpu_data_request_append_callback(r, callback_func, callback_arg);
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (is_prefetch == STARPU_TASK_PREFETCH)
|
|
|
+ /* Make last request add the prefetch count on the mc to keep the data
|
|
|
+ * there until the task gets to execute. */
|
|
|
+ r->nb_tasks_prefetch++;
|
|
|
+
|
|
|
+ if (!write_invalidation)
|
|
|
+ /* The last request will perform the callback after termination */
|
|
|
+ _starpu_data_request_append_callback(r, callback_func, callback_arg);
|
|
|
+ }
|
|
|
|
|
|
|
|
|
if (reused_requests[hop])
|
|
@@ -712,7 +729,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
|
|
|
}
|
|
|
|
|
|
int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
|
|
|
- enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
|
|
|
+ enum starpu_data_access_mode mode, unsigned detached, enum _starpu_is_prefetch is_prefetch, unsigned async,
|
|
|
void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
|
|
|
{
|
|
|
_STARPU_LOG_IN();
|
|
@@ -744,6 +761,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _st
|
|
|
if (src_node_mask == 0)
|
|
|
{
|
|
|
/* no valid copy, nothing to prefetch */
|
|
|
+ _STARPU_DISP("Warning: no valid copy to prefetch?! that's not supposed to happen, please report\n");
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
|
return 0;
|
|
|
}
|
|
@@ -782,17 +800,22 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _st
|
|
|
|
|
|
static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
|
|
|
{
|
|
|
- return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
|
|
|
+ return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_IDLEFETCH, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
|
|
|
}
|
|
|
|
|
|
-static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
|
|
|
+static int task_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
|
|
|
{
|
|
|
- return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
|
|
|
+ return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_TASK_PREFETCH, 1, NULL, NULL, prio, "task_prefetch_data_on_node");
|
|
|
+}
|
|
|
+
|
|
|
+static int STARPU_ATTRIBUTE_UNUSED prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
|
|
|
+{
|
|
|
+ return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_PREFETCH, 1, NULL, NULL, prio, "prefetch_data_on_node");
|
|
|
}
|
|
|
|
|
|
static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
|
|
|
{
|
|
|
- return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
|
|
|
+ return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, STARPU_FETCH, 0, NULL, NULL, prio, "fetch_data");
|
|
|
}
|
|
|
|
|
|
uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
|
|
@@ -911,10 +934,11 @@ int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned t
|
|
|
int node = _starpu_task_data_get_node_on_node(task, index, target_node);
|
|
|
|
|
|
struct _starpu_data_replicate *replicate = &handle->per_node[node];
|
|
|
- prefetch_data_on_node(handle, node, replicate, mode, prio);
|
|
|
+ task_prefetch_data_on_node(handle, node, replicate, mode, prio);
|
|
|
|
|
|
_starpu_set_data_requested_flag_if_needed(handle, replicate);
|
|
|
}
|
|
|
+ task->prefetched = 1;
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
@@ -988,10 +1012,11 @@ int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worke
|
|
|
int node = _starpu_task_data_get_node_on_worker(task, index, worker);
|
|
|
|
|
|
struct _starpu_data_replicate *replicate = &handle->per_node[node];
|
|
|
- prefetch_data_on_node(handle, node, replicate, mode, prio);
|
|
|
+ task_prefetch_data_on_node(handle, node, replicate, mode, prio);
|
|
|
|
|
|
_starpu_set_data_requested_flag_if_needed(handle, replicate);
|
|
|
}
|
|
|
+ task->prefetched = 1;
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
@@ -1133,7 +1158,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
|
|
|
|
|
|
if (async)
|
|
|
{
|
|
|
- ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1,
|
|
|
+ ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, STARPU_FETCH, 1,
|
|
|
_starpu_fetch_task_input_cb, worker, 0, "_starpu_fetch_task_input");
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
if (_starpu_simgrid_fetching_input_cost())
|
|
@@ -1223,7 +1248,19 @@ void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job
|
|
|
local_replicate = get_replicate(handle, mode, workerid, node);
|
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
|
if (local_replicate->mc)
|
|
|
+ {
|
|
|
local_replicate->mc->diduse = 1;
|
|
|
+ if (task->prefetched &&
|
|
|
+ !(mode & (STARPU_SCRATCH|STARPU_REDUX)) &&
|
|
|
+ (mode & STARPU_R))
|
|
|
+ {
|
|
|
+ /* Allocations or transfer prefetchs should have been done by now and marked
|
|
|
+ * this mc as needed for us.
|
|
|
+ * Now that we added a reference for the task, we can relieve that. */
|
|
|
+ STARPU_ASSERT(local_replicate->mc->nb_tasks_prefetch > 0);
|
|
|
+ local_replicate->mc->nb_tasks_prefetch--;
|
|
|
+ }
|
|
|
+ }
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
|
|
|
|
_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, descrs[index].index);
|
|
@@ -1372,7 +1409,7 @@ void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
|
|
|
|
|
|
local_replicate = get_replicate(handle, mode, -1, node);
|
|
|
|
|
|
- _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
|
|
|
+ _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, STARPU_FETCH, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
|
|
|
}
|
|
|
|
|
|
if (profiling && task->profiling_info)
|