Browse Source

Factorize code

Samuel Thibault 8 years ago
parent
commit
05a423147e
1 changed files with 101 additions and 112 deletions
  1. 101 112
      src/datawizard/user_interactions.c

+ 101 - 112
src/datawizard/user_interactions.c

@@ -54,8 +54,9 @@ struct user_interaction_wrapper
 	starpu_pthread_cond_t cond;
 	starpu_pthread_mutex_t lock;
 	unsigned finished;
-	unsigned async;
+	unsigned detached;
 	unsigned prefetch;
+	unsigned async;
 	int prio;
 	void (*callback)(void *);
 	void (*callback_fetch_data)(void *); // called after fetch_data
@@ -64,10 +65,71 @@ struct user_interaction_wrapper
 	struct starpu_task *post_sync_task;
 };
 
+static inline void _starpu_data_acquire_wrapper_init(struct user_interaction_wrapper *wrapper, starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
+{
+	memset(wrapper, 0, sizeof(*wrapper));
+	wrapper->handle = handle;
+	wrapper->node = node;
+	wrapper->mode = mode;
+	wrapper->finished = 0;
+	STARPU_PTHREAD_COND_INIT(&wrapper->cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
+}
+
+/* Called to signal completion of asynchronous data acquisition */
+static inline void _starpu_data_acquire_wrapper_finished(struct user_interaction_wrapper *wrapper)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
+	wrapper->finished = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&wrapper->cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
+}
+
+/* Called to wait for completion of asynchronous data acquisition */
+static inline void _starpu_data_acquire_wrapper_wait(struct user_interaction_wrapper *wrapper)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
+	while (!wrapper->finished)
+		STARPU_PTHREAD_COND_WAIT(&wrapper->cond, &wrapper->lock);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
+}
+
+static inline void _starpu_data_acquire_wrapper_fini(struct user_interaction_wrapper *wrapper)
+{
+	STARPU_PTHREAD_COND_DESTROY(&wrapper->cond);
+	STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
+}
+
+/* Called when the fetch into target memory is done, we're done! */
+static inline void _starpu_data_acquire_fetch_done(struct user_interaction_wrapper *wrapper)
+{
+	if (wrapper->node >= 0)
+	{
+		struct _starpu_data_replicate *replicate = &wrapper->handle->per_node[wrapper->node];
+		if (replicate->mc)
+			replicate->mc->diduse = 1;
+	}
+}
+
+/* Called when the data acquisition is done, to launch the fetch into target memory */
+static inline void _starpu_data_acquire_launch_fetch(struct user_interaction_wrapper *wrapper, int async, void (*callback)(void *), void *callback_arg)
+{
+	int node = wrapper->node;
+	starpu_data_handle_t handle = wrapper->handle;
+	struct _starpu_data_replicate *replicate = node >= 0 ? &handle->per_node[node] : NULL;
+
+	int ret = _starpu_fetch_data_on_node(handle, node, replicate, wrapper->mode, wrapper->detached, wrapper->prefetch, async, callback, callback_arg, wrapper->prio, "_starpu_data_acquire_launch_fetch");
+	STARPU_ASSERT(!ret);
+}
+
+
+
 /*
  *	Non Blocking data request from application
  */
-/* put the current value of the data into RAM */
+
+
+/* Called when fetch is done, call the callback */
 static void _starpu_data_acquire_fetch_data_callback(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
@@ -80,33 +142,21 @@ static void _starpu_data_acquire_fetch_data_callback(void *arg)
 	if (wrapper->post_sync_task)
 		_starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
 
-	struct _starpu_data_replicate *replicate =
-		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
-	if (replicate && replicate->mc)
-		replicate->mc->diduse = 1;
+	_starpu_data_acquire_fetch_done(wrapper);
 
 	wrapper->callback(wrapper->callback_arg);
 
+	_starpu_data_acquire_wrapper_fini(wrapper);
 	free(wrapper);
 }
 
+/* Called when the data acquisition is done, launch the fetch into target memory */
 static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 {
-	int ret;
-	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
-
-	starpu_data_handle_t handle = wrapper->handle;
-
-	STARPU_ASSERT(handle);
-
-	struct _starpu_data_replicate *replicate =
-		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
-
-	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, wrapper->mode, 0, 0, 1,
-			_starpu_data_acquire_fetch_data_callback, wrapper, 0, "_starpu_data_acquire_continuation_non_blocking");
-	STARPU_ASSERT(!ret);
+	_starpu_data_acquire_launch_fetch(arg, 1, _starpu_data_acquire_fetch_data_callback, arg);
 }
 
+/* Called when the implicit data dependencies are done, launch the data acquisition */
 static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
@@ -134,14 +184,11 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(struct user_interaction_wrapper));
 
-	wrapper->handle = handle;
-	wrapper->node = node;
-	wrapper->mode = mode;
+	_starpu_data_acquire_wrapper_init(wrapper, handle, node, mode);
+	wrapper->async = 1;
+
 	wrapper->callback = callback;
 	wrapper->callback_arg = arg;
-	STARPU_PTHREAD_COND_INIT(&wrapper->cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
-	wrapper->finished = 0;
 	wrapper->pre_sync_task = NULL;
 	wrapper->post_sync_task = NULL;
 
@@ -203,32 +250,22 @@ int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
 	return starpu_data_acquire_on_node_cb_sequential_consistency(handle, STARPU_MAIN_RAM, mode, callback, arg, sequential_consistency);
 }
 
+
 /*
- *	Block data request from application
+ *	Blockin data request from application
  */
+
+
 static inline void _starpu_data_acquire_continuation(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
 
 	starpu_data_handle_t handle = wrapper->handle;
-
 	STARPU_ASSERT(handle);
 
-	struct _starpu_data_replicate *replicate =
-		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
-
-	int ret;
-
-	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0, "_starpu_data_acquire_continuation");
-	STARPU_ASSERT(!ret);
-	if (replicate && replicate->mc)
-		replicate->mc->diduse = 1;
-
-	/* continuation of starpu_data_acquire */
-	STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
-	wrapper->finished = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&wrapper->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
+	_starpu_data_acquire_launch_fetch(wrapper, 0, NULL, NULL);
+	_starpu_data_acquire_fetch_done(wrapper);
+	_starpu_data_acquire_wrapper_finished(wrapper);
 }
 
 /* The data must be released by calling starpu_data_release later on */
@@ -256,16 +293,8 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 		STARPU_ASSERT(!ret);
 	}
 
-	struct user_interaction_wrapper wrapper =
-	{
-		.handle = handle,
-		.mode = mode,
-		.node = node,
-		.finished = 0
-	};
-
-	STARPU_PTHREAD_COND_INIT(&wrapper.cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&wrapper.lock, NULL);
+	struct user_interaction_wrapper wrapper;
+	_starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
 
 //	_STARPU_DEBUG("TAKE sequential_consistency_mutex starpu_data_acquire\n");
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -304,23 +333,15 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
  	* available again, otherwise we fetch the data directly */
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
 	{
-		struct _starpu_data_replicate *replicate =
-			node >= 0 ? &handle->per_node[node] : NULL;
 		/* no one has locked this data yet, so we proceed immediately */
-		int ret = _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
-		STARPU_ASSERT(!ret);
-		if (replicate && replicate->mc)
-			replicate->mc->diduse = 1;
+		_starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
+		_starpu_data_acquire_fetch_done(&wrapper);
 	}
 	else
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&wrapper.lock);
-		while (!wrapper.finished)
-			STARPU_PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
+		_starpu_data_acquire_wrapper_wait(&wrapper);
 	}
-	STARPU_PTHREAD_COND_DESTROY(&wrapper.cond);
-	STARPU_PTHREAD_MUTEX_DESTROY(&wrapper.lock);
+	_starpu_data_acquire_wrapper_fini(&wrapper);
 
 	/* At that moment, the caller holds a reference to the piece of data.
 	 * We enqueue the "post" sync task in the list associated to the handle
@@ -353,39 +374,23 @@ int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum
 	if (ret)
 		return ret;
 
-	struct user_interaction_wrapper wrapper =
-	{
-		.handle = handle,
-		.mode = mode,
-		.node = node,
-		.finished = 0
-	};
-
-	STARPU_PTHREAD_COND_INIT(&wrapper.cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&wrapper.lock, NULL);
+	struct user_interaction_wrapper wrapper;
+	_starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
 
 	/* we try to get the data, if we do not succeed immediately, we set a
  	* callback function that will be executed automatically when the data is
  	* available again, otherwise we fetch the data directly */
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
 	{
-		struct _starpu_data_replicate *replicate =
-			node >= 0 ? &handle->per_node[node] : NULL;
 		/* no one has locked this data yet, so we proceed immediately */
-		ret = _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
-		STARPU_ASSERT(!ret);
-		if (replicate && replicate->mc)
-			replicate->mc->diduse = 1;
+		_starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
+		_starpu_data_acquire_fetch_done(&wrapper);
 	}
 	else
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&wrapper.lock);
-		while (!wrapper.finished)
-			STARPU_PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
+		_starpu_data_acquire_wrapper_wait(&wrapper);
 	}
-	STARPU_PTHREAD_COND_DESTROY(&wrapper.cond);
-	STARPU_PTHREAD_MUTEX_DESTROY(&wrapper.lock);
+	_starpu_data_acquire_wrapper_fini(&wrapper);
 
 	return 0;
 }
@@ -431,21 +436,13 @@ static void _prefetch_data_on_node(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
 	starpu_data_handle_t handle = wrapper->handle;
-        int ret;
 
-	struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
-	ret = _starpu_fetch_data_on_node(handle, wrapper->node, replicate, STARPU_R, wrapper->async, wrapper->prefetch, wrapper->async, NULL, NULL, wrapper->prio, "_prefetch_data_on_node");
-        STARPU_ASSERT(!ret);
+	_starpu_data_acquire_launch_fetch(wrapper, wrapper->async, NULL, NULL);
 
 	if (wrapper->async)
 		free(wrapper);
 	else
-	{
-		STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
-		wrapper->finished = 1;
-		STARPU_PTHREAD_COND_SIGNAL(&wrapper->cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
-	}
+		_starpu_data_acquire_wrapper_finished(wrapper);
 
 	_starpu_spin_lock(&handle->header_lock);
 	if (!_starpu_notify_data_dependencies(handle))
@@ -463,26 +460,22 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(*wrapper));
 
-	wrapper->handle = handle;
-	wrapper->node = node;
-	wrapper->async = async;
+	_starpu_data_acquire_wrapper_init(wrapper, handle, node, STARPU_R);
+
+	wrapper->detached = async;
 	wrapper->prefetch = prefetch;
+	wrapper->async = async;
 	wrapper->prio = prio;
-	STARPU_PTHREAD_COND_INIT(&wrapper->cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
-	wrapper->finished = 0;
 
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, wrapper))
 	{
 		/* we can immediately proceed */
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
+		_starpu_data_acquire_launch_fetch(wrapper, async, NULL, NULL);
 
-		STARPU_PTHREAD_COND_DESTROY(&wrapper->cond);
-		STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
+		_starpu_data_acquire_wrapper_fini(wrapper);
 		free(wrapper);
 
-		_starpu_fetch_data_on_node(handle, node, replicate, mode, async, prefetch, async, NULL, NULL, prio, "_starpu_prefetch_data_on_node_with_mode");
-
 		/* remove the "lock"/reference */
 
 		_starpu_spin_lock(&handle->header_lock);
@@ -504,12 +497,8 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	}
 	else if (!async)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
-		while (!wrapper->finished)
-			STARPU_PTHREAD_COND_WAIT(&wrapper->cond, &wrapper->lock);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
-		STARPU_PTHREAD_COND_DESTROY(&wrapper->cond);
-		STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
+		_starpu_data_acquire_wrapper_wait(wrapper);
+		_starpu_data_acquire_wrapper_fini(wrapper);
 		free(wrapper);
 	}