Browse Source

Fix and test the case where we try to invalidate data that still has transfers pending. Also make sure that eviction does not evict data that is being invalidated.

Samuel Thibault 8 years ago
parent
commit
c0b04e4d0d

+ 22 - 18
src/datawizard/coherency.c

@@ -483,7 +483,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	/* This function is called with handle's header lock taken */
 	_starpu_spin_checklocked(&handle->header_lock);
 
-	unsigned requesting_node = dst_replicate->memory_node;
+	unsigned requesting_node = dst_replicate ? dst_replicate->memory_node : -1;
 	unsigned nwait = 0;
 
 	if (mode & STARPU_W)
@@ -506,27 +506,30 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		 */
 	}
 
-	if (dst_replicate->state != STARPU_INVALID && (!nwait || is_prefetch))
+	if ((!dst_replicate || dst_replicate->state != STARPU_INVALID) && (!nwait || is_prefetch))
 	{
+		if (dst_replicate)
+		{
 #ifdef STARPU_MEMORY_STATS
-		enum _starpu_cache_state old_state = dst_replicate->state;
+			enum _starpu_cache_state old_state = dst_replicate->state;
 #endif
-		/* the data is already available and we don't have to wait for
-		 * any request, so we can stop */
-		_starpu_update_data_state(handle, dst_replicate, mode);
-		_starpu_msi_cache_hit(requesting_node);
+			/* the data is already available and we don't have to wait for
+			 * any request, so we can stop */
+			_starpu_update_data_state(handle, dst_replicate, mode);
+			_starpu_msi_cache_hit(requesting_node);
 
 #ifdef STARPU_MEMORY_STATS
-		_starpu_memory_handle_stats_cache_hit(handle, requesting_node);
+			_starpu_memory_handle_stats_cache_hit(handle, requesting_node);
 
-		/* XXX Broken ? */
-		if (old_state == STARPU_SHARED
-		    && dst_replicate->state == STARPU_OWNER)
-			_starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
+			/* XXX Broken ? */
+			if (old_state == STARPU_SHARED
+			    && dst_replicate->state == STARPU_OWNER)
+				_starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
 #endif
 
 		if (dst_replicate->mc)
 			_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
+		}
 
 		_starpu_spin_unlock(&handle->header_lock);
 
@@ -540,12 +543,12 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	_starpu_msi_cache_miss(requesting_node);
 
 	/* the only remaining situation is that the local copy was invalid */
-	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID || nwait);
+	STARPU_ASSERT((dst_replicate && dst_replicate->state == STARPU_INVALID) || nwait);
 
 	/* find someone who already has the data */
 	int src_node = -1;
 
-	if (mode & STARPU_R)
+	if (dst_replicate && mode & STARPU_R)
 	{
 		if (dst_replicate->state == STARPU_INVALID)
 			src_node = _starpu_select_src_node(handle, requesting_node);
@@ -557,7 +560,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			mode &= ~STARPU_R;
 		}
 	}
-	else
+	else if (dst_replicate)
 	{
 		/* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
 		if (mode & STARPU_W)
@@ -731,9 +734,10 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 
 	if (!detached)
 	{
-		/* Take a reference which will be released by _starpu_release_data_on_node */
-		dst_replicate->refcnt++;
-		dst_replicate->handle->busy_count++;
+		/* Take references which will be released by _starpu_release_data_on_node */
+		if (dst_replicate)
+			dst_replicate->refcnt++;
+		handle->busy_count++;
 	}
 
 	struct _starpu_data_request *r;

+ 33 - 23
src/datawizard/data_request.c

@@ -152,6 +152,8 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	r->dst_replicate = dst_replicate;
 	r->mode = mode;
 	r->async_channel.type = STARPU_UNUSED;
+	if (handling_node == -1)
+		handling_node = STARPU_MAIN_RAM;
 	r->handling_node = handling_node;
 	STARPU_ASSERT(handling_node == STARPU_MAIN_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
@@ -166,7 +168,8 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	_starpu_spin_lock(&r->lock);
 
 	/* Take a reference on the target for the request to be able to write it */
-	dst_replicate->refcnt++;
+	if (dst_replicate)
+		dst_replicate->refcnt++;
 	handle->busy_count++;
 
 	if (is_write_invalidation)
@@ -337,32 +340,35 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 	struct _starpu_data_replicate *dst_replicate = r->dst_replicate;
 
 
+	if (dst_replicate)
+	{
 #ifdef STARPU_MEMORY_STATS
-	enum _starpu_cache_state old_src_replicate_state = src_replicate->state;
+		enum _starpu_cache_state old_src_replicate_state = src_replicate->state;
 #endif
 
-	_starpu_spin_checklocked(&handle->header_lock);
-	_starpu_update_data_state(handle, r->dst_replicate, mode);
+		_starpu_spin_checklocked(&handle->header_lock);
+		_starpu_update_data_state(handle, r->dst_replicate, mode);
 
 #ifdef STARPU_MEMORY_STATS
-	if (src_replicate->state == STARPU_INVALID)
-	{
-		if (old_src_replicate_state == STARPU_OWNER)
-			_starpu_memory_handle_stats_invalidated(handle, src_replicate->memory_node);
-		else
+		if (src_replicate->state == STARPU_INVALID)
 		{
-			/* XXX Currently only ex-OWNER are tagged as invalidated */
-			/* XXX Have to check all old state of every node in case a SHARED data become OWNED by the dst_replicate */
-		}
+			if (old_src_replicate_state == STARPU_OWNER)
+				_starpu_memory_handle_stats_invalidated(handle, src_replicate->memory_node);
+			else
+			{
+				/* XXX Currently only ex-OWNER are tagged as invalidated */
+				/* XXX Have to check all old state of every node in case a SHARED data become OWNED by the dst_replicate */
+			}
 
-	}
-	if (dst_replicate->state == STARPU_SHARED)
-		_starpu_memory_handle_stats_loaded_shared(handle, dst_replicate->memory_node);
-	else if (dst_replicate->state == STARPU_OWNER)
-	{
-		_starpu_memory_handle_stats_loaded_owner(handle, dst_replicate->memory_node);
-	}
+		}
+		if (dst_replicate->state == STARPU_SHARED)
+			_starpu_memory_handle_stats_loaded_shared(handle, dst_replicate->memory_node);
+		else if (dst_replicate->state == STARPU_OWNER)
+		{
+			_starpu_memory_handle_stats_loaded_owner(handle, dst_replicate->memory_node);
+		}
 #endif
+	}
 
 	if (r->com_id > 0)
 	{
@@ -389,12 +395,16 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 
 #ifdef STARPU_SIMGRID
 	/* Wake potential worker which was waiting for it */
-	_starpu_wake_all_blocked_workers_on_node(dst_replicate->memory_node);
+	if (dst_replicate)
+		_starpu_wake_all_blocked_workers_on_node(dst_replicate->memory_node);
 #endif
 
 	/* Remove a reference on the destination replicate for the request */
-	STARPU_ASSERT(dst_replicate->refcnt > 0);
-	dst_replicate->refcnt--;
+	if (dst_replicate)
+	{
+		STARPU_ASSERT(dst_replicate->refcnt > 0);
+		dst_replicate->refcnt--;
+	}
 	STARPU_ASSERT(handle->busy_count > 0);
 	handle->busy_count--;
 
@@ -479,7 +489,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	/* the header of the data must be locked by the worker that submitted the request */
 
 
-	if (dst_replicate->state == STARPU_INVALID)
+	if (dst_replicate && dst_replicate->state == STARPU_INVALID)
 		r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
 						    dst_replicate, !(r_mode & STARPU_R), r, may_alloc, prefetch);
 	else

+ 7 - 0
src/datawizard/memalloc.c

@@ -209,6 +209,9 @@ static unsigned may_free_subtree(starpu_data_handle_t handle, unsigned node)
 
 	if (handle->current_mode == STARPU_W)
 	{
+		if (handle->write_invalidation_req)
+			/* Some request is invalidating it anyway */
+			return 0;
 		unsigned n;
 		for (n = 0; n < STARPU_MAXNODES; n++)
 			if (_starpu_get_data_refcnt(handle, n))
@@ -1010,6 +1013,10 @@ void starpu_memchunk_tidy(unsigned node)
 
 			if (handle->current_mode == STARPU_W)
 			{
+				if (handle->write_invalidation_req)
+					/* Some request is invalidating it anyway */
+					continue;
+
 				unsigned n;
 				for (n = 0; n < STARPU_MAXNODES; n++)
 					if (_starpu_get_data_refcnt(handle, n))

+ 17 - 23
src/datawizard/user_interactions.c

@@ -94,16 +94,12 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	if (wrapper->node >= 0)
-	{
-		struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
+	struct _starpu_data_replicate *replicate =
+		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
 
-		ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 1,
-				_starpu_data_acquire_fetch_data_callback, wrapper, 0, "_starpu_data_acquire_continuation_non_blocking");
-		STARPU_ASSERT(!ret);
-	}
-	else
-		_starpu_data_acquire_fetch_data_callback(wrapper);
+	ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 1,
+			_starpu_data_acquire_fetch_data_callback, wrapper, 0, "_starpu_data_acquire_continuation_non_blocking");
+	STARPU_ASSERT(!ret);
 }
 
 static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
@@ -213,14 +209,13 @@ static inline void _starpu_data_acquire_continuation(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	if (wrapper->node >= 0)
-	{
-		int ret;
-		struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
+	struct _starpu_data_replicate *replicate =
+		wrapper->node >= 0 ? &handle->per_node[wrapper->node] : NULL;
 
-		ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0, "_starpu_data_acquire_continuation");
-		STARPU_ASSERT(!ret);
-	}
+	int ret;
+
+	ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0, "_starpu_data_acquire_continuation");
+	STARPU_ASSERT(!ret);
 
 	/* continuation of starpu_data_acquire */
 	STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
@@ -302,13 +297,11 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
  	* available again, otherwise we fetch the data directly */
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
 	{
-		if (node >= 0)
-		{
-			/* no one has locked this data yet, so we proceed immediately */
-			struct _starpu_data_replicate *replicate = &handle->per_node[node];
-			int ret = _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
-			STARPU_ASSERT(!ret);
-		}
+		struct _starpu_data_replicate *replicate =
+			node >= 0 ? &handle->per_node[node] : NULL;
+		/* no one has locked this data yet, so we proceed immediately */
+		int ret = _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
+		STARPU_ASSERT(!ret);
 	}
 	else
 	{
@@ -351,6 +344,7 @@ void starpu_data_release_on_node(starpu_data_handle_t handle, int node)
 	else
 	{
 		_starpu_spin_lock(&handle->header_lock);
+		handle->busy_count--;
 		if (!_starpu_notify_data_dependencies(handle))
 			_starpu_spin_unlock(&handle->header_lock);
 	}

+ 1 - 0
tests/Makefile.am

@@ -253,6 +253,7 @@ myPROGRAMS +=				\
 	datawizard/specific_node		\
 	datawizard/task_with_multiple_time_the_same_handle	\
 	datawizard/test_arbiter			\
+	datawizard/invalidate_pending_requests	\
 	disk/disk_copy				\
 	disk/disk_compute			\
 	disk/disk_pack				\

+ 59 - 0
tests/datawizard/invalidate_pending_requests.c

@@ -0,0 +1,59 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2016  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+/*
+ * Try invalidating a variable that still has a request pending
+ */
+#define SIZE (100<<20)
+
+/*
+ * Test entry point: register a large variable, issue a fetch of it on
+ * node 1, then invalidate the handle while that request may still be on
+ * the fly, and finally unregister the handle.
+ *
+ * Returns 0 on success, STARPU_TEST_SKIPPED when starpu_init() reports
+ * -ENODEV or when there is neither a CUDA nor an OpenCL worker, and
+ * EXIT_FAILURE when the test buffer cannot be allocated.
+ */
+int main(int argc, char **argv)
+{
+	int ret;
+	char *var;
+	starpu_data_handle_t handle;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	if (starpu_worker_get_count_by_type(STARPU_CUDA_WORKER) == 0 &&
+		starpu_worker_get_count_by_type(STARPU_OPENCL_WORKER) == 0)
+		goto enodev;
+
+	/* Allocate only once we know the test will actually run, so that none
+	 * of the skip paths above owns a buffer that it would have to release */
+	var = malloc(SIZE);
+	if (!var)
+	{
+		/* Never hand a NULL buffer to the registration call below */
+		starpu_shutdown();
+		return EXIT_FAILURE;
+	}
+
+	starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)var, SIZE);
+
+	/* Let a request fly */
+	starpu_fxt_trace_user_event_string("requesting");
+	starpu_data_fetch_on_node(handle, 1, 1);
+	starpu_fxt_trace_user_event_string("requested");
+	/* But suddenly invalidate the data while it's on the fly! */
+	starpu_data_invalidate_submit(handle);
+	starpu_fxt_trace_user_event_string("invalidated");
+
+	starpu_data_unregister(handle);
+
+	/* The handle is unregistered, so the buffer it was registered with
+	 * can now be released instead of being leaked */
+	free(var);
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+}