Browse Source

port r19310 from 1.2: Fix locking memory replicates when e.g. invalidating data: we need to lock all nodes to prevent e.g. an eviction to disk.

Samuel Thibault 8 years ago
parent
commit
de0bb767b0

+ 19 - 1
doc/doxygen/chapters/api/data_management.doxy

@@ -302,22 +302,39 @@ Similarly to starpu_data_acquire_cb(), this function is
 non-blocking and may be called from task callbacks. Upon successful
 completion, this function returns 0.
 
+\def STARPU_ACQUIRE_NO_NODE
+\ingroup API_Data_Management
+This macro can be used to acquire data without requiring it to be available on a given node: only R/W dependencies are enforced.
+This can for instance be used to wait for tasks which produce the data, without requesting a fetch to main memory.
+
+\def STARPU_ACQUIRE_ALL_NODES
+\ingroup API_Data_Management
+This is the same as STARPU_ACQUIRE_NO_NODE, but additionally locks the data on all nodes, for instance to prevent the replicates from being evicted.
+This is mostly useful inside StarPU itself.
+
 \fn int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
 \ingroup API_Data_Management
 This is the same as starpu_data_acquire(), except that the data
-will be available on the given memory node instead of main memory.
+will be available on the given memory node instead of main
+memory.
+STARPU_ACQUIRE_NO_NODE and STARPU_ACQUIRE_ALL_NODES can be used instead of an
+explicit node number.
 
 \fn int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
 \ingroup API_Data_Management
 This is the same as starpu_data_acquire_cb(), except that the
 data will be available on the given memory node instead of main
 memory.
+STARPU_ACQUIRE_NO_NODE and STARPU_ACQUIRE_ALL_NODES can be used instead of an
+explicit node number.
 
 \fn int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
 \ingroup API_Data_Management
 This is the same as starpu_data_acquire_cb_sequential_consistency(), except that the
 data will be available on the given memory node instead of main
 memory.
+STARPU_ACQUIRE_NO_NODE and STARPU_ACQUIRE_ALL_NODES can be used instead of an
+explicit node number.
 
 \def STARPU_DATA_ACQUIRE_CB(handle, mode, code)
 \ingroup API_Data_Management
@@ -338,6 +355,7 @@ starpu_data_acquire_cb().
 \ingroup API_Data_Management
 This is the same as starpu_data_release(), except that the data
 will be available on the given memory \p node instead of main memory.
+The \p node parameter must be exactly the same as the one passed to the corresponding starpu_data_acquire_on_node* call.
 
 \fn starpu_arbiter_t starpu_arbiter_create(void)
 \ingroup API_Data_Management

+ 2 - 0
include/starpu_data.h

@@ -61,6 +61,8 @@ void starpu_data_invalidate_submit(starpu_data_handle_t handle);
 
 void starpu_data_advise_as_important(starpu_data_handle_t handle, unsigned is_important);
 
+#define STARPU_ACQUIRE_NO_NODE -1
+#define STARPU_ACQUIRE_ALL_NODES -2
 int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode);
 int starpu_data_acquire_cb(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);

+ 17 - 11
src/datawizard/coherency.c

@@ -717,7 +717,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	return requests[nhops - 1];
 }
 
-int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *dst_replicate,
+int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
 			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
 {
@@ -738,6 +738,12 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 		/* Take references which will be released by _starpu_release_data_on_node */
 		if (dst_replicate)
 			dst_replicate->refcnt++;
+		else if (node == STARPU_ACQUIRE_ALL_NODES)
+		{
+			int i;
+			for (i = 0; i < STARPU_MAXNODES; i++)
+				handle->per_node[i].refcnt++;
+		}
 		handle->busy_count++;
 	}
 
@@ -758,19 +764,19 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
         return ret;
 }
 
-static int idle_prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
 }
 
-static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
 }
 
-static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
 }
 
 uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
@@ -863,7 +869,7 @@ int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned n
 			continue;
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		prefetch_data_on_node(handle, replicate, mode, prio);
+		prefetch_data_on_node(handle, node, replicate, mode, prio);
 
 		_starpu_set_data_requested_flag_if_needed(handle, replicate);
 	}
@@ -893,7 +899,7 @@ int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsig
 			continue;
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		idle_prefetch_data_on_node(handle, replicate, mode, prio);
+		idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
 	}
 
 	return 0;
@@ -971,7 +977,7 @@ int _starpu_fetch_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, workerid, node);
 
-		ret = fetch_data(handle, local_replicate, mode, 0);
+		ret = fetch_data(handle, node, local_replicate, mode, 0);
 		if (STARPU_UNLIKELY(ret))
 			goto enomem;
 
@@ -1165,7 +1171,7 @@ void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, -1, node);
 
-		_starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
+		_starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
 	}
 
 	if (profiling && task->profiling_info)

+ 1 - 1
src/datawizard/coherency.h

@@ -279,7 +279,7 @@ void _starpu_display_msi_stats(void);
  * should thus *not* take a reference since it can not know whether the request will complete
  * async means that _starpu_fetch_data_on_node will wait for completion of the request
  */
-int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate,
+int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate,
 			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin);
 /* This releases a reference on the handle */

+ 2 - 2
src/datawizard/filters.c

@@ -533,8 +533,8 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 	initial_handle->children = NULL;
 
 	/* Make sure to wait for previous tasks working on the whole data */
-	starpu_data_acquire_on_node(initial_handle, -1, STARPU_RW);
-	starpu_data_release_on_node(initial_handle, -1);
+	starpu_data_acquire_on_node(initial_handle, STARPU_ACQUIRE_NO_NODE, STARPU_RW);
+	starpu_data_release_on_node(initial_handle, STARPU_ACQUIRE_NO_NODE);
 
 	_starpu_data_partition(initial_handle, NULL, nparts, f, 1);
 }

+ 7 - 7
src/datawizard/interfaces/data_interface.c

@@ -633,9 +633,9 @@ void _starpu_check_if_valid_and_fetch_data_on_node(starpu_data_handle_t handle,
 	}
 	if (valid)
 	{
-		int ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, 0, 0, NULL, NULL, 0, origin);
+		int ret = _starpu_fetch_data_on_node(handle, handle->home_node, replicate, STARPU_R, 0, 0, 0, NULL, NULL, 0, origin);
 		STARPU_ASSERT(!ret);
-		_starpu_release_data_on_node(handle, 0, &handle->per_node[handle->home_node]);
+		_starpu_release_data_on_node(handle, handle->home_node, replicate);
 	}
 	else
 	{
@@ -906,7 +906,7 @@ static void _starpu_data_unregister_submit_cb(void *arg)
 	STARPU_ASSERT(handle->busy_count);
         _starpu_spin_unlock(&handle->header_lock);
 
-	starpu_data_release_on_node(handle, -1);
+	starpu_data_release_on_node(handle, STARPU_ACQUIRE_ALL_NODES);
 }
 
 void starpu_data_unregister_submit(starpu_data_handle_t handle)
@@ -920,7 +920,7 @@ void starpu_data_unregister_submit(starpu_data_handle_t handle)
 	}
 
 	/* Wait for all task dependencies on this handle before putting it for free */
-	starpu_data_acquire_on_node_cb(handle, -1, STARPU_RW, _starpu_data_unregister_submit_cb, handle);
+	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_ALL_NODES, STARPU_RW, _starpu_data_unregister_submit_cb, handle);
 }
 
 static void _starpu_data_invalidate(void *data)
@@ -976,14 +976,14 @@ static void _starpu_data_invalidate(void *data)
 
 	_starpu_spin_unlock(&handle->header_lock);
 
-	starpu_data_release_on_node(handle, -1);
+	starpu_data_release_on_node(handle, STARPU_ACQUIRE_ALL_NODES);
 }
 
 void starpu_data_invalidate(starpu_data_handle_t handle)
 {
 	STARPU_ASSERT(handle);
 
-	starpu_data_acquire_on_node(handle, -1, STARPU_W);
+	starpu_data_acquire_on_node(handle, STARPU_ACQUIRE_ALL_NODES, STARPU_W);
 
 	_starpu_data_invalidate(handle);
 
@@ -994,7 +994,7 @@ void starpu_data_invalidate_submit(starpu_data_handle_t handle)
 {
 	STARPU_ASSERT(handle);
 
-	starpu_data_acquire_on_node_cb(handle, -1, STARPU_W, _starpu_data_invalidate, handle);
+	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_ALL_NODES, STARPU_W, _starpu_data_invalidate, handle);
 
 	handle->initialized = 0;
 }

+ 13 - 7
src/datawizard/user_interactions.c

@@ -97,7 +97,7 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 	struct _starpu_data_replicate *replicate =
 		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
 
-	ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 1,
+	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, wrapper->mode, 0, 0, 1,
 			_starpu_data_acquire_fetch_data_callback, wrapper, 0, "_starpu_data_acquire_continuation_non_blocking");
 	STARPU_ASSERT(!ret);
 }
@@ -214,7 +214,7 @@ static inline void _starpu_data_acquire_continuation(void *arg)
 
 	int ret;
 
-	ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0, "_starpu_data_acquire_continuation");
+	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0, "_starpu_data_acquire_continuation");
 	STARPU_ASSERT(!ret);
 
 	/* continuation of starpu_data_acquire */
@@ -300,7 +300,7 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 		struct _starpu_data_replicate *replicate =
 			node >= 0 ? &handle->per_node[node] : NULL;
 		/* no one has locked this data yet, so we proceed immediately */
-		int ret = _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
+		int ret = _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
 		STARPU_ASSERT(!ret);
 	}
 	else
@@ -344,6 +344,12 @@ void starpu_data_release_on_node(starpu_data_handle_t handle, int node)
 	else
 	{
 		_starpu_spin_lock(&handle->header_lock);
+		if (node == STARPU_ACQUIRE_ALL_NODES)
+		{
+			int i;
+			for (i = 0; i < STARPU_MAXNODES; i++)
+				handle->per_node[i].refcnt--;
+		}
 		handle->busy_count--;
 		if (!_starpu_notify_data_dependencies(handle))
 			_starpu_spin_unlock(&handle->header_lock);
@@ -362,7 +368,7 @@ static void _prefetch_data_on_node(void *arg)
         int ret;
 
 	struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
-	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, wrapper->prefetch, wrapper->async, NULL, NULL, wrapper->prio, "_prefetch_data_on_node");
+	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, STARPU_R, wrapper->async, wrapper->prefetch, wrapper->async, NULL, NULL, wrapper->prio, "_prefetch_data_on_node");
         STARPU_ASSERT(!ret);
 
 	if (wrapper->async)
@@ -409,7 +415,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 		STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
 		free(wrapper);
 
-		_starpu_fetch_data_on_node(handle, replicate, mode, async, prefetch, async, NULL, NULL, prio, "_starpu_prefetch_data_on_node_with_mode");
+		_starpu_fetch_data_on_node(handle, node, replicate, mode, async, prefetch, async, NULL, NULL, prio, "_starpu_prefetch_data_on_node_with_mode");
 
 		/* remove the "lock"/reference */
 
@@ -489,14 +495,14 @@ static void _starpu_data_wont_use(void *data)
 			_starpu_memchunk_wont_use(local->mc, starpu_worker_get_memory_node(worker));
 	}
 	_starpu_spin_unlock(&handle->header_lock);
-	starpu_data_release_on_node(handle, -1);
+	starpu_data_release_on_node(handle, STARPU_ACQUIRE_ALL_NODES);
 	if (handle->home_node != -1)
 		starpu_data_idle_prefetch_on_node(handle, handle->home_node, 1);
 }
 
 void starpu_data_wont_use(starpu_data_handle_t handle)
 {
-	starpu_data_acquire_on_node_cb(handle, -1, STARPU_R, _starpu_data_wont_use, handle);
+	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_ALL_NODES, STARPU_R, _starpu_data_wont_use, handle);
 }
 
 /*

+ 4 - 4
src/debug/latency.c

@@ -35,9 +35,9 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_0 = &handle->per_node[node0];
-		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, 0, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
+		ret = _starpu_fetch_data_on_node(handle, node0, replicate_0, STARPU_RW, 0, 0, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
 		STARPU_ASSERT(!ret);
-		_starpu_release_data_on_node(handle, 0, replicate_0);
+		_starpu_release_data_on_node(handle, node0, replicate_0);
 
 		_starpu_spin_lock(&handle->header_lock);
 		handle->refcnt++;
@@ -45,8 +45,8 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_1 = &handle->per_node[node1];
-		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, 0, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
+		ret = _starpu_fetch_data_on_node(handle, node1, replicate_1, STARPU_RW, 0, 0, 0, NULL, NULL, 0, "_starpu_benchmark_ping_pong");
 		STARPU_ASSERT(!ret);
-		_starpu_release_data_on_node(handle, 0, replicate_1);
+		_starpu_release_data_on_node(handle, node1, replicate_1);
 	}
 }