浏览代码

data_request: Record the origin task in the request

Samuel Thibault 4 年之前
父节点
当前提交
c5d75b6685

+ 21 - 18
src/datawizard/coherency.c

@@ -468,7 +468,9 @@ static struct _starpu_data_request *_starpu_search_existing_data_request(struct
 
 struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
 								  struct _starpu_data_replicate *dst_replicate,
-								  enum starpu_data_access_mode mode, enum starpu_is_prefetch is_prefetch,
+								  enum starpu_data_access_mode mode,
+								  struct starpu_task *task,
+								  enum starpu_is_prefetch is_prefetch,
 								  unsigned async,
 								  void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
 {
@@ -643,7 +645,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			/* Create a new request if there was no request to reuse */
 			r = _starpu_create_data_request(handle, hop_src_replicate,
 							hop_dst_replicate, hop_handling_node,
-							mode, ndeps, is_prefetch, prio, 0, origin);
+							mode, ndeps, task, is_prefetch, prio, 0, origin);
 			nwait++;
 		}
 
@@ -689,7 +691,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		 */
 		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
 							dst_replicate, requesting_node,
-							STARPU_W, nwait, is_prefetch, prio, 1, origin);
+							STARPU_W, nwait, task, is_prefetch, prio, 1, origin);
 
 		/* and perform the callback after termination */
 		_starpu_data_request_append_callback(r, callback_func, callback_arg);
@@ -739,7 +741,8 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 }
 
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
-			       enum starpu_data_access_mode mode, unsigned detached, enum starpu_is_prefetch is_prefetch, unsigned async,
+			       enum starpu_data_access_mode mode, unsigned detached,
+			       struct starpu_task *task, enum starpu_is_prefetch is_prefetch, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
 {
         _STARPU_LOG_IN();
@@ -793,7 +796,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _st
 
 	struct _starpu_data_request *r;
 	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
-						 is_prefetch, async, callback_func, callback_arg, prio, origin);
+						 task, is_prefetch, async, callback_func, callback_arg, prio, origin);
 
 	/* If no request was created, the handle was already up-to-date on the
 	 * node. In this case, _starpu_create_request_to_fetch_data has already
@@ -808,24 +811,24 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _st
         return ret;
 }
 
-static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, struct starpu_task *task, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_IDLEFETCH, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, task, STARPU_IDLEFETCH, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
 }
 
-static int task_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int task_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, struct starpu_task *task, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_TASK_PREFETCH, 1, NULL, NULL, prio, "task_prefetch_data_on_node");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, task, STARPU_TASK_PREFETCH, 1, NULL, NULL, prio, "task_prefetch_data_on_node");
 }
 
-static int STARPU_ATTRIBUTE_UNUSED prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int STARPU_ATTRIBUTE_UNUSED prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, struct starpu_task *task, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_PREFETCH, 1, NULL, NULL, prio, "prefetch_data_on_node");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, task, STARPU_PREFETCH, 1, NULL, NULL, prio, "prefetch_data_on_node");
 }
 
-static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, struct starpu_task *task, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, STARPU_FETCH, 0, NULL, NULL, prio, "fetch_data");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, task, STARPU_FETCH, 0, NULL, NULL, prio, "fetch_data");
 }
 
 uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
@@ -935,9 +938,9 @@ int _starpu_prefetch_task_input_prio(struct starpu_task *task, int target_node,
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
 		if (prefetch == STARPU_PREFETCH)
-			task_prefetch_data_on_node(handle, node, replicate, mode, prio);
+			task_prefetch_data_on_node(handle, node, replicate, mode, task, prio);
 		else
-			idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
+			idle_prefetch_data_on_node(handle, node, replicate, mode, task, prio);
 	}
 
 	if (prefetch == STARPU_PREFETCH)
@@ -1104,7 +1107,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 
 		if (async)
 		{
-			ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, STARPU_FETCH, 1,
+			ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, task, STARPU_FETCH, 1,
 					_starpu_fetch_task_input_cb, worker, 0, "_starpu_fetch_task_input");
 #ifdef STARPU_SIMGRID
 			if (_starpu_simgrid_fetching_input_cost())
@@ -1120,7 +1123,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 		}
 		else
 		{
-			ret = fetch_data(handle, node, local_replicate, mode, 0);
+			ret = fetch_data(handle, node, local_replicate, mode, task, 0);
 #ifdef STARPU_SIMGRID
 			if (_starpu_simgrid_fetching_input_cost())
 				starpu_sleep(0.000001);
@@ -1358,7 +1361,7 @@ void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, -1, node);
 
-		_starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, STARPU_FETCH, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
+		_starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, task, STARPU_FETCH, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
 	}
 
 	if (profiling && task->profiling_info)

+ 4 - 2
src/datawizard/coherency.h

@@ -321,7 +321,8 @@ struct _starpu_data_state
  * async means that _starpu_fetch_data_on_node will wait for completion of the request
  */
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate,
-			       enum starpu_data_access_mode mode, unsigned detached, enum starpu_is_prefetch is_prefetch, unsigned async,
+			       enum starpu_data_access_mode mode, unsigned detached,
+			       struct starpu_task *task, enum starpu_is_prefetch is_prefetch, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin);
 /** This releases a reference on the handle */
 void _starpu_release_data_on_node(struct _starpu_data_state *state, uint32_t default_wt_mask,
@@ -368,7 +369,8 @@ int _starpu_determine_request_path(starpu_data_handle_t handle,
  */
 struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
 								  struct _starpu_data_replicate *dst_replicate,
-								  enum starpu_data_access_mode mode, enum starpu_is_prefetch is_prefetch,
+								  enum starpu_data_access_mode mode,
+								  struct starpu_task *task, enum starpu_is_prefetch is_prefetch,
 								  unsigned async,
 								  void (*callback_func)(void *), void *callback_arg, int prio, const char *origin);
 

+ 2 - 0
src/datawizard/data_request.c

@@ -134,6 +134,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 int handling_node,
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
+							 struct starpu_task *task,
 							 enum starpu_is_prefetch is_prefetch,
 							 int prio,
 							 unsigned is_write_invalidation,
@@ -185,6 +186,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	r->completed = 0;
 	r->added_ref = 0;
 	r->prefetch = is_prefetch;
+	r->task = task;
 	r->nb_tasks_prefetch = 0;
 	r->prio = prio;
 	r->retval = -1;

+ 4 - 0
src/datawizard/data_request.h

@@ -93,6 +93,9 @@ LIST_TYPE(_starpu_data_request,
 	/** Whether this is just a prefetch request */
 	enum starpu_is_prefetch prefetch;
 
+	/** Task this request is for */
+	struct starpu_task *task;
+
 	/** Number of tasks which used this as a prefetch */
 	unsigned nb_tasks_prefetch;
 
@@ -161,6 +164,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 int handling_node,
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
+							 struct starpu_task *task,
 							 enum starpu_is_prefetch is_prefetch,
 							 int prio,
 							 unsigned is_write_invalidation,

+ 1 - 1
src/datawizard/interfaces/data_interface.c

@@ -784,7 +784,7 @@ void _starpu_check_if_valid_and_fetch_data_on_node(starpu_data_handle_t handle,
 	}
 	if (valid)
 	{
-		int ret = _starpu_fetch_data_on_node(handle, handle->home_node, replicate, STARPU_R, 0, STARPU_FETCH, 0, NULL, NULL, 0, origin);
+		int ret = _starpu_fetch_data_on_node(handle, handle->home_node, replicate, STARPU_R, 0, NULL, STARPU_FETCH, 0, NULL, NULL, 0, origin);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, STARPU_NONE, replicate);
 	}

+ 2 - 2
src/datawizard/memalloc.c

@@ -330,7 +330,7 @@ static int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT transfer_subtree_to_node(starpu_d
 		{
 			/* This is the only copy, push it to destination */
 			struct _starpu_data_request *r;
-			r = _starpu_create_request_to_fetch_data(handle, dst_replicate, STARPU_R, STARPU_FETCH, 0, NULL, NULL, 0, "transfer_subtree_to_node");
+			r = _starpu_create_request_to_fetch_data(handle, dst_replicate, STARPU_R, NULL, STARPU_FETCH, 0, NULL, NULL, 0, "transfer_subtree_to_node");
 			/* There is no way we don't need a request, since
 			 * source is OWNER, destination can't be having it */
 			STARPU_ASSERT(r);
@@ -1235,7 +1235,7 @@ void starpu_memchunk_tidy(unsigned node)
 			}
 
 			_starpu_spin_unlock(&mc_lock[node]);
-			if (!_starpu_create_request_to_fetch_data(handle, &handle->per_node[target_node], STARPU_R, STARPU_IDLEFETCH, 1, NULL, NULL, 0, "starpu_memchunk_tidy"))
+			if (!_starpu_create_request_to_fetch_data(handle, &handle->per_node[target_node], STARPU_R, NULL, STARPU_IDLEFETCH, 1, NULL, NULL, 0, "starpu_memchunk_tidy"))
 			{
 				/* No request was actually needed??
 				 * Odd, but cope with it.  */

+ 2 - 2
src/datawizard/user_interactions.c

@@ -53,7 +53,7 @@ int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
 
 	_starpu_spin_lock(&handle->header_lock);
 
-	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, STARPU_PREFETCH, 0, 0, "starpu_data_request_allocation");
+	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, NULL, STARPU_PREFETCH, 0, 0, "starpu_data_request_allocation");
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */
@@ -126,7 +126,7 @@ static inline void _starpu_data_acquire_launch_fetch(struct user_interaction_wra
 	starpu_data_handle_t handle = wrapper->handle;
 	struct _starpu_data_replicate *replicate = node >= 0 ? &handle->per_node[node] : NULL;
 
-	int ret = _starpu_fetch_data_on_node(handle, node, replicate, wrapper->mode, wrapper->detached, wrapper->prefetch, async, callback, callback_arg, wrapper->prio, "_starpu_data_acquire_launch_fetch");
+	int ret = _starpu_fetch_data_on_node(handle, node, replicate, wrapper->mode, wrapper->detached, NULL, wrapper->prefetch, async, callback, callback_arg, wrapper->prio, "_starpu_data_acquire_launch_fetch");
 	STARPU_ASSERT(!ret);
 }
 

+ 1 - 1
src/datawizard/write_back.c

@@ -64,7 +64,7 @@ void _starpu_write_through_data(starpu_data_handle_t handle, unsigned requesting
 
 				struct _starpu_data_request *r;
 				r = _starpu_create_request_to_fetch_data(handle, &handle->per_node[node],
-									 STARPU_R, STARPU_IDLEFETCH, 1, wt_callback, handle, 0, "_starpu_write_through_data");
+									 STARPU_R, NULL, STARPU_IDLEFETCH, 1, wt_callback, handle, 0, "_starpu_write_through_data");
 
 			        /* If no request was created, the handle was already up-to-date on the
 			         * node */

+ 2 - 2
src/debug/latency.c

@@ -34,7 +34,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_0 = &handle->per_node[node0];
-		ret = _starpu_fetch_data_on_node(handle, node0, replicate_0, STARPU_RW, 0, STARPU_FETCH, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
+		ret = _starpu_fetch_data_on_node(handle, node0, replicate_0, STARPU_RW, 0, NULL, STARPU_FETCH, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, STARPU_NONE, replicate_0);
 
@@ -44,7 +44,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_1 = &handle->per_node[node1];
-		ret = _starpu_fetch_data_on_node(handle, node1, replicate_1, STARPU_RW, 0, STARPU_FETCH, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
+		ret = _starpu_fetch_data_on_node(handle, node1, replicate_1, STARPU_RW, 0, NULL, STARPU_FETCH, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, STARPU_NONE, replicate_1);
 	}