Quellcode durchsuchen

starpu_fetch_data and starpu_release_data now take data replicates

Cédric Augonnet vor 14 Jahren
Ursprung
Commit
118472e5a7

+ 16 - 14
src/datawizard/coherency.c

@@ -143,14 +143,14 @@ void _starpu_update_data_state(starpu_data_handle handle,
  * 		    else (invalid,owner->shared)
  */
 
-int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_node,
+int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *dst_replicate,
 				starpu_access_mode mode, unsigned is_prefetch,
 				void (*callback_func)(void *), void *callback_arg)
 {
 	uint32_t local_node = _starpu_get_local_memory_node();
         _STARPU_LOG_IN();
 
-	struct starpu_data_replicate_s *dst_replicate = &handle->per_node[requesting_node];
+	unsigned requesting_node = dst_replicate->memory_node;
 
 	while (_starpu_spin_trylock(&handle->header_lock))
 		_starpu_datawizard_progress(local_node, 1);
@@ -297,18 +297,16 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
         return ret;
 }
 
-static int prefetch_data_on_node(starpu_data_handle handle, starpu_access_mode mode, uint32_t node)
+static int prefetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
 {
-	return _starpu_fetch_data_on_node(handle, node, mode, 1, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, NULL, NULL);
 }
 
-static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
+static int fetch_data(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
 {
-	uint32_t requesting_node = _starpu_get_local_memory_node(); 
-
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 
-	return _starpu_fetch_data_on_node(handle, requesting_node, mode, 0, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, NULL, NULL);
 }
 
 inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)
@@ -328,7 +326,7 @@ uint32_t _starpu_data_get_footprint(starpu_data_handle handle)
 
 /* in case the data was accessed on a write mode, do not forget to 
  * make it accessible again once it is possible ! */
-void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt_mask, uint32_t memory_node)
+void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt_mask, struct starpu_data_replicate_s *replicate)
 {
 	uint32_t wt_mask;
 	wt_mask = default_wt_mask | handle->wt_mask;
@@ -337,7 +335,7 @@ void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt
 	 * starpu_data_invalidate was called for instance). In that case, we do
 	 * not enforce any write-through mechanism. */
 
-	struct starpu_data_replicate_s *replicate = &handle->per_node[memory_node];
+	unsigned memory_node = replicate->memory_node;
 
 	if (replicate->state != STARPU_INVALID)
 	if ((wt_mask & ~(1<<memory_node)))
@@ -382,8 +380,9 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 
 		if (mode & STARPU_SCRATCH)
 			continue;
-	
-		prefetch_data_on_node(handle, mode, node);
+
+		struct starpu_data_replicate_s *replicate = &handle->per_node[node];
+		prefetch_data_on_node(handle, replicate, mode);
 
 		_starpu_set_data_requested_flag_if_needed(handle, node);
 	}
@@ -439,7 +438,9 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 		}
 		else {
 			/* That's a "normal" buffer (R/W) */
-			ret = fetch_data(handle, mode);
+			struct starpu_data_replicate_s *local_replicate;
+			local_replicate = &handle->per_node[local_memory_node];
+			ret = fetch_data(handle, local_replicate, mode);
 			if (STARPU_UNLIKELY(ret))
 				goto enomem;
 
@@ -484,7 +485,8 @@ void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
 			_starpu_memchunk_cache_insert(local_node, j->scratch_memchunks[index]);
 		}
 		else {
-			_starpu_release_data_on_node(handle, mask, local_node);
+			struct starpu_data_replicate_s *replicate = &handle->per_node[local_node];
+			_starpu_release_data_on_node(handle, mask, replicate);
 			_starpu_release_data_enforce_sequential_consistency(task, handle);
 		}
 	}

+ 3 - 2
src/datawizard/coherency.h

@@ -153,10 +153,11 @@ struct starpu_data_state_t {
 
 void _starpu_display_msi_stats(void);
 
-int _starpu_fetch_data_on_node(struct starpu_data_state_t *state, uint32_t requesting_node,
+int _starpu_fetch_data_on_node(struct starpu_data_state_t *state, struct starpu_data_replicate_s *replicate,
 				starpu_access_mode mode, unsigned is_prefetch,
 				void (*callback_func)(void *), void *callback_arg);
-void _starpu_release_data_on_node(struct starpu_data_state_t *state, uint32_t default_wt_mask, unsigned memory_node);
+void _starpu_release_data_on_node(struct starpu_data_state_t *state, uint32_t default_wt_mask,
+				struct starpu_data_replicate_s *replicate);
 
 void _starpu_update_data_state(starpu_data_handle handle,
 				struct starpu_data_replicate_s *requesting_replicate,

+ 5 - 3
src/datawizard/filters.c

@@ -179,12 +179,14 @@ void starpu_data_unpartition(starpu_data_handle root_handle, uint32_t gathering_
 	/* first take all the children lock (in order !) */
 	for (child = 0; child < root_handle->nchildren; child++)
 	{
+		struct starpu_data_state_t *child_handle = &root_handle->children[child];
+
 		/* make sure the intermediate children is unpartitionned as well */
-		if (root_handle->children[child].nchildren > 0)
-			starpu_data_unpartition(&root_handle->children[child], gathering_node);
+		if (child_handle->nchildren > 0)
+			starpu_data_unpartition(child_handle, gathering_node);
 
 		int ret;
-		ret = _starpu_fetch_data_on_node(&root_handle->children[child], gathering_node, STARPU_R, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(child_handle, &child_handle->per_node[gathering_node], STARPU_R, 0, NULL, NULL);
 		/* for now we pretend that the RAM is almost unlimited and that gathering 
 		 * data should be possible from the node that does the unpartionning ... we
 		 * don't want to have the programming deal with memory shortage at that time,

+ 5 - 2
src/datawizard/interfaces/data_interface.c

@@ -159,7 +159,9 @@ static void _starpu_data_unregister_fetch_data_callback(void *_arg)
 
 	STARPU_ASSERT(handle);
 
-	ret = _starpu_fetch_data_on_node(handle, arg->memory_node, STARPU_R, 0, NULL, NULL);
+	struct starpu_data_replicate_s *replicate = &handle->per_node[arg->memory_node];
+
+	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, NULL, NULL);
 	STARPU_ASSERT(!ret);
 	
 	/* unlock the caller */
@@ -193,7 +195,8 @@ void starpu_data_unregister(starpu_data_handle handle)
 				_starpu_data_unregister_fetch_data_callback, &arg))
 		{
 			/* no one has locked this data yet, so we proceed immediately */
-			int ret = _starpu_fetch_data_on_node(handle, home_node, STARPU_R, 0, NULL, NULL);
+			struct starpu_data_replicate_s *home_replicate = &handle->per_node[home_node];
+			int ret = _starpu_fetch_data_on_node(handle, home_replicate, STARPU_R, 0, NULL, NULL);
 			STARPU_ASSERT(!ret);
 		}
 		else {

+ 17 - 9
src/datawizard/user_interactions.c

@@ -82,7 +82,9 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	ret = _starpu_fetch_data_on_node(handle, 0, wrapper->mode, 1,
+	struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];
+
+	ret = _starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 1,
 			_starpu_data_acquire_fetch_data_callback, wrapper);
 	STARPU_ASSERT(!ret);
 }
@@ -172,7 +174,9 @@ static inline void _starpu_data_acquire_continuation(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	_starpu_fetch_data_on_node(handle, 0, wrapper->mode, 0, NULL, NULL);
+	struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];
+
+	_starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 0, NULL, NULL);
 	
 	/* continuation of starpu_data_acquire */
 	PTHREAD_MUTEX_LOCK(&wrapper->lock);
@@ -240,7 +244,8 @@ int starpu_data_acquire(starpu_data_handle handle, starpu_access_mode mode)
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
 	{
 		/* no one has locked this data yet, so we proceed immediately */
-		int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0, NULL, NULL);
+		struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];
+		int ret = _starpu_fetch_data_on_node(handle, ram_replicate, mode, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 	}
 	else {
@@ -267,7 +272,7 @@ void starpu_data_release(starpu_data_handle handle)
 	STARPU_ASSERT(handle);
 
 	/* The application can now release the rw-lock */
-	_starpu_release_data_on_node(handle, 0, 0);
+	_starpu_release_data_on_node(handle, 0, &handle->per_node[0]);
 
 	/* In case there are some implicit dependencies, unlock the "post sync" tasks */
 	_starpu_unlock_post_sync_tasks(handle);
@@ -276,9 +281,11 @@ void starpu_data_release(starpu_data_handle handle)
 static void _prefetch_data_on_node(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = arg;
+	starpu_data_handle handle = wrapper->handle;
         int ret;
 
-	ret = _starpu_fetch_data_on_node(wrapper->handle, wrapper->node, STARPU_R, wrapper->async, NULL, NULL);
+	struct starpu_data_replicate_s *replicate = &handle->per_node[wrapper->node];
+	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, NULL, NULL);
         STARPU_ASSERT(!ret);
 
         PTHREAD_MUTEX_LOCK(&wrapper->lock);
@@ -288,9 +295,9 @@ static void _prefetch_data_on_node(void *arg)
 
 	if (!wrapper->async)
 	{
-		_starpu_spin_lock(&wrapper->handle->header_lock);
-		_starpu_notify_data_dependencies(wrapper->handle);
-		_starpu_spin_unlock(&wrapper->handle->header_lock);
+		_starpu_spin_lock(&handle->header_lock);
+		_starpu_notify_data_dependencies(handle);
+		_starpu_spin_unlock(&handle->header_lock);
 	}
 
 }
@@ -316,7 +323,8 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &wrapper))
 	{
 		/* we can immediately proceed */
-		_starpu_fetch_data_on_node(handle, node, mode, async, NULL, NULL);
+		struct starpu_data_replicate_s *replicate = &handle->per_node[node];
+		_starpu_fetch_data_on_node(handle, replicate, mode, async, NULL, NULL);
 
 		/* remove the "lock"/reference */
 		if (!async)

+ 6 - 2
src/debug/latency.c

@@ -26,9 +26,13 @@ void _starpu_benchmark_ping_pong(starpu_data_handle handle,
 	for (iter = 0; iter < niter; iter++)
 	{
 		int ret;
-		ret = _starpu_fetch_data_on_node(handle, node0, STARPU_RW, 0, NULL, NULL);
+
+		struct starpu_data_replicate_s *replicate_0 = &handle->per_node[node0];
+		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
-		ret = _starpu_fetch_data_on_node(handle, node1, STARPU_RW, 0, NULL, NULL);
+
+		struct starpu_data_replicate_s *replicate_1 = &handle->per_node[node1];
+		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 	}
 }