瀏覽代碼

Fix the starpu_sync_data_with_mem API, it is now required to call
starpu_release_data_from_mem once the application does not use the data
anymore. The starpu_notify_data_modification function is removed as it is
broken by design.

Cédric Augonnet 15 年之前
父節點
當前提交
c94a373e97

+ 3 - 1
examples/incrementer/incrementer.c

@@ -77,7 +77,7 @@ int main(int argc, char **argv)
 	starpu_wait_all_tasks();
 
 	/* update the array in RAM */
-	starpu_sync_data_with_mem(float_array_handle);
+	starpu_sync_data_with_mem(float_array_handle, STARPU_R);
 	
 	fprintf(stderr, "array -> %f, %f, %f\n", float_array[0], 
 			float_array[1], float_array[2]);
@@ -85,6 +85,8 @@ int main(int argc, char **argv)
 	if (float_array[0] != float_array[1] + float_array[2])
 		return 1;
 	
+	starpu_release_data_from_mem(float_array_handle);
+
 	starpu_shutdown();
 
 	return 0;

+ 3 - 3
examples/ppm-downscaler/yuv-downscaler.c

@@ -290,9 +290,9 @@ int main(int argc, char **argv)
 	/* make sure all output buffers are sync'ed */
 	for (frame = 0; frame < nframes; frame++)
 	{
-		starpu_sync_data_with_mem(new_frame_y_handle[frame]);
-		starpu_sync_data_with_mem(new_frame_u_handle[frame]);
-		starpu_sync_data_with_mem(new_frame_v_handle[frame]);
+		starpu_sync_data_with_mem(new_frame_y_handle[frame], STARPU_R);
+		starpu_sync_data_with_mem(new_frame_u_handle[frame], STARPU_R);
+		starpu_sync_data_with_mem(new_frame_v_handle[frame], STARPU_R);
 	}
 
 	/* partition the layers into smaller parts */

+ 2 - 2
include/starpu-data.h

@@ -43,8 +43,8 @@ void starpu_delete_data(struct starpu_data_state_t *state);
 
 void starpu_advise_if_data_is_important(struct starpu_data_state_t *state, unsigned is_important);
 
-int starpu_sync_data_with_mem(struct starpu_data_state_t *state);
-int starpu_notify_data_modification(struct starpu_data_state_t *state, uint32_t modifying_node);
+int starpu_sync_data_with_mem(struct starpu_data_state_t *state, starpu_access_mode mode);
+void starpu_release_data_from_mem(struct starpu_data_state_t *state);
 
 int starpu_malloc_pinned_if_possible(void **A, size_t dim);
 int starpu_free_pinned_if_possible(void *A);

+ 15 - 66
src/datawizard/user_interactions.c

@@ -34,9 +34,9 @@ int starpu_request_data_allocation(data_state *state, uint32_t node)
 	return 0;
 }
 
-
 struct state_and_node {
 	data_state *state;
+	starpu_access_mode mode;
 	unsigned node;
 	pthread_cond_t cond;
 	pthread_mutex_t lock;
@@ -52,20 +52,20 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 
 	data_state *state = statenode->state;
 
-	ret = fetch_data_on_node(state, 0, 1, 0, 0);
-	
+	unsigned r = (statenode->mode != STARPU_W);
+	unsigned w = (statenode->mode != STARPU_R);
+
+	ret = fetch_data_on_node(state, 0, r, w, 0);
 	STARPU_ASSERT(!ret);
 	
-	/* the application does not need to "lock" the data anymore */
-	release_data_on_node(state, 0, 0);
-
 	pthread_mutex_lock(&statenode->lock);
 	statenode->finished = 1;
 	pthread_cond_signal(&statenode->cond);
 	pthread_mutex_unlock(&statenode->lock);
 }
 
-int starpu_sync_data_with_mem(data_state *state)
+/* The data must be released by calling starpu_release_data_from_mem later on */
+int starpu_sync_data_with_mem(data_state *state, starpu_access_mode mode)
 {
 	/* it is forbidden to call this function from a callback or a codelet */
 	if (STARPU_UNLIKELY(!worker_may_perform_blocking_calls()))
@@ -74,7 +74,8 @@ int starpu_sync_data_with_mem(data_state *state)
 	struct state_and_node statenode =
 	{
 		.state = state,
-		.node = 0, /* unused here */
+		.mode = mode,
+		.node = 0, // unused
 		.cond = PTHREAD_COND_INITIALIZER,
 		.lock = PTHREAD_MUTEX_INITIALIZER,
 		.finished = 0
@@ -83,7 +84,7 @@ int starpu_sync_data_with_mem(data_state *state)
 	/* we try to get the data, if we do not succeed immediately, we set a
  	* callback function that will be executed automatically when the data is
  	* available again, otherwise we fetch the data directly */
-	if (!attempt_to_submit_data_request_from_apps(state, STARPU_R, 
+	if (!attempt_to_submit_data_request_from_apps(state, mode,
 			_starpu_sync_data_with_mem_continuation, &statenode))
 	{
 		/* no one has locked this data yet, so we proceed immediately */
@@ -99,67 +100,15 @@ int starpu_sync_data_with_mem(data_state *state)
 	return 0;
 }
 
-static inline void do_notify_data_modification(data_state *state, uint32_t modifying_node)
-{
-	starpu_spin_lock(&state->header_lock);
-
-	unsigned node = 0;
-	for (node = 0; node < MAXNODES; node++)
-	{
-		state->per_node[node].state =
-			(node == modifying_node?OWNER:INVALID);
-	}
-
-	starpu_spin_unlock(&state->header_lock);
-}
-
-static inline void _notify_data_modification_continuation(void *arg)
+/* This function must be called after starpu_sync_data_with_mem so that the
+ * application release the data */
+void starpu_release_data_from_mem(data_state *state)
 {
-	struct state_and_node *statenode = arg;
-
-	do_notify_data_modification(statenode->state, statenode->node);
-
-	pthread_mutex_lock(&statenode->lock);
-	statenode->finished = 1;
-	pthread_cond_signal(&statenode->cond);
-	pthread_mutex_unlock(&statenode->lock);
+	/* The application can now release the rw-lock */
+	release_data_on_node(state, 0, 0);
 }
 
-/* in case the application did modify the data ... invalidate all other copies  */
-int starpu_notify_data_modification(data_state *state, uint32_t modifying_node)
-{
-	/* It is forbidden to call this function from a callback or a codelet */
-	if (STARPU_UNLIKELY(!worker_may_perform_blocking_calls()))
-		return -EDEADLK;
-
-	struct state_and_node statenode =
-	{
-		.state = state,
-		.node = modifying_node,
-		.cond = PTHREAD_COND_INITIALIZER,
-		.lock = PTHREAD_MUTEX_INITIALIZER,
-		.finished = 0
-	};
-
-	if (!attempt_to_submit_data_request_from_apps(state, STARPU_W, _notify_data_modification_continuation, &statenode))
-	{
-		/* we can immediately proceed */
-		do_notify_data_modification(state, modifying_node);
-	}
-	else {
-		pthread_mutex_lock(&statenode.lock);
-		if (!statenode.finished)
-			pthread_cond_wait(&statenode.cond, &statenode.lock);
-		pthread_mutex_unlock(&statenode.lock);
-	}
-
-	/* remove the "lock"/reference */
-	starpu_spin_lock(&state->header_lock);
-	notify_data_dependencies(state);
-	starpu_spin_unlock(&state->header_lock);
 
-	return 0;
-}
 
 static void _prefetch_data_on_node(void *arg)
 {

+ 5 - 3
tests/errorcheck/invalid_blocking_calls.c

@@ -27,7 +27,7 @@ static void wrong_func(starpu_data_interface_t *descr, void *arg)
 
 	/* try to fetch data in the RAM while we are in a codelet, such a
 	 * blocking call is forbidden */
-	ret = starpu_sync_data_with_mem(handle);
+	ret = starpu_sync_data_with_mem(handle, STARPU_RW);
 	if (ret != -EDEADLK)
 		exit(-1);
 
@@ -49,7 +49,7 @@ static void wrong_callback(void *arg)
 {
 	int ret;
 
-	ret  = starpu_sync_data_with_mem(handle);
+	ret  = starpu_sync_data_with_mem(handle, STARPU_RW);
 	if (ret != -EDEADLK)
 		exit(-1);
 
@@ -90,10 +90,12 @@ int main(int argc, char **argv)
 
 	/* This call is valid as it is done by the application outside a
 	 * callback */
-	ret = starpu_sync_data_with_mem(handle);
+	ret = starpu_sync_data_with_mem(handle, STARPU_RW);
 	if (ret)
 		return -1;
 
+	starpu_release_data_from_mem(handle);
+
 	starpu_shutdown();
 
 	return 0;

+ 5 - 4
tests/microbenchs/sync_and_notify_data.c

@@ -116,13 +116,12 @@ int main(int argc, char **argv)
 		}
 
 		/* synchronize v in RAM */
-		starpu_sync_data_with_mem(v_handle);
+		starpu_sync_data_with_mem(v_handle, STARPU_RW);
 
 		/* increment b */
 		v[1]++;
 
-		/* inform StarPU that v was modified by the apps */
-		starpu_notify_data_modification(v_handle, 0);
+		starpu_release_data_from_mem(v_handle);
 
 		for (ind = 0; ind < N; ind++)
 		{
@@ -154,10 +153,12 @@ int main(int argc, char **argv)
 
 	}
 
-	starpu_sync_data_with_mem(v_handle);
+	starpu_sync_data_with_mem(v_handle, STARPU_RW);
 
 	fprintf(stderr, "V = { %d, %d, %d }\n", v[0], v[1], v[2]);
 
+	starpu_release_data_from_mem(v_handle);
+
 	starpu_shutdown();
 
 	if ((v[0] != N*K) || (v[1] != K) || (v[2] != N*K))