ソースを参照

Fix reference count for starpu_data_acquire_cb when the target GPU is already full: we do want the request to succeed, even if it is asynchronous.

Samuel Thibault 13 年 前
コミット
07a56c8386

+ 13 - 8
src/datawizard/coherency.c

@@ -291,7 +291,9 @@ static struct _starpu_data_request *_starpu_search_existing_data_request(struct
 		if (mode & STARPU_R)
 		{
 			/* in case the existing request did not imply a memory
-			 * transfer yet, we have to increment the refcnt now
+			 * transfer yet, we have to take a second refcnt now
+			 * for the source, in addition to the refcnt for the
+			 * destination
 			 * (so that the source remains valid) */
 			if (!(r->mode & STARPU_R))
 			{
@@ -335,6 +337,7 @@ static struct _starpu_data_request *_starpu_search_existing_data_request(struct
 struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
 								  struct _starpu_data_replicate *dst_replicate,
 								  enum starpu_access_mode mode, unsigned is_prefetch,
+								  unsigned async,
 								  void (*callback_func)(void *), void *callback_arg)
 {
 	unsigned requesting_node = dst_replicate->memory_node;
@@ -451,7 +454,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			_starpu_spin_unlock(&r->lock);
 	}
 
-	if (!is_prefetch)
+	if (!async)
 		requests[nhops - 1]->refcnt++;
 
 
@@ -464,7 +467,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 }
 
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *dst_replicate,
-			       enum starpu_access_mode mode, unsigned is_prefetch,
+			       enum starpu_access_mode mode, unsigned detached, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg)
 {
 	uint32_t local_node = _starpu_get_local_memory_node();
@@ -473,15 +476,16 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 	while (_starpu_spin_trylock(&handle->header_lock))
 		_starpu_datawizard_progress(local_node, 1);
 
-	if (!is_prefetch)
+	if (!detached)
 	{
+		/* Take a reference which will be released by _starpu_release_data_on_node */
 		dst_replicate->refcnt++;
 		dst_replicate->handle->busy_count++;
 	}
 
 	struct _starpu_data_request *r;
 	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
-						 is_prefetch, callback_func, callback_arg);
+						 detached, async, callback_func, callback_arg);
 
 	/* If no request was created, the handle was already up-to-date on the
 	 * node. In this case, _starpu_create_request_to_fetch_data has already
@@ -491,19 +495,19 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 
 	_starpu_spin_unlock(&handle->header_lock);
 
-	int ret = is_prefetch?0:_starpu_wait_data_request_completion(r, 1);
+	int ret = async?0:_starpu_wait_data_request_completion(r, 1);
         _STARPU_LOG_OUT();
         return ret;
 }
 
 static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_access_mode mode)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, NULL, NULL);
 }
 
 static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_access_mode mode)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, NULL, NULL);
 }
 
 uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, uint32_t node)
@@ -543,6 +547,7 @@ void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_
 	while (_starpu_spin_trylock(&handle->header_lock))
 		_starpu_datawizard_progress(local_node, 1);
 
+	/* Release refcnt taken by fetch_data_on_node */
 	replicate->refcnt--;
 	STARPU_ASSERT(replicate->refcnt >= 0);
 

+ 11 - 2
src/datawizard/coherency.h

@@ -225,9 +225,13 @@ struct _starpu_data_state
 void _starpu_display_msi_stats(void);
 
 /* This does not take a reference on the handle, the caller has to do it,
- * e.g. through _starpu_attempt_to_submit_data_request_from_apps() */
+ * e.g. through _starpu_attempt_to_submit_data_request_from_apps()
+ * detached means that the core is allowed to drop the request. The caller
+ * should thus *not* take a reference since it can not know whether the request will complete
+ * async means that _starpu_fetch_data_on_node will *not* wait for completion of the request
+ */
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate,
-			       enum starpu_access_mode mode, unsigned is_prefetch,
+			       enum starpu_access_mode mode, unsigned detached, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg);
 /* This releases a reference on the handle */
 void _starpu_release_data_on_node(struct _starpu_data_state *state, uint32_t default_wt_mask,
@@ -254,9 +258,14 @@ unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle_t handle, uint
 
 uint32_t _starpu_select_src_node(struct _starpu_data_state *state, unsigned destination);
 
+/* is_prefetch is whether the DSM may drop the request (when there is not enough memory for instance).
+ * async is whether the caller does *not* want a reference on the last request; when async is not set,
+ * a reference is taken so that the caller is able to wait for it (which will release that reference).
+ */
 struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
 								  struct _starpu_data_replicate *dst_replicate,
 								  enum starpu_access_mode mode, unsigned is_prefetch,
+								  unsigned async,
 								  void (*callback_func)(void *), void *callback_arg);
 
 void _starpu_redux_init_data_replicate(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, int workerid);

+ 4 - 2
src/datawizard/data_request.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2011  Université de Bordeaux 1
+ * Copyright (C) 2009-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -110,6 +110,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 
 	_starpu_spin_lock(&r->lock);
 
+	/* Take a reference on the target for the request to be able to write it */
 	dst_replicate->refcnt++;
 	handle->busy_count++;
 
@@ -117,6 +118,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	{
 		unsigned src_node = src_replicate->memory_node;
 		dst_replicate->request[src_node] = r;
+		/* Take a reference on the source for the request to be able to read it */
 		src_replicate->refcnt++;
 		handle->busy_count++;
 	}
@@ -279,7 +281,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 
 	r->completed = 1;
 
-	/* Remove a reference on the destination replicate  */
+	/* Remove a reference on the destination replicate for the request */
 	STARPU_ASSERT(dst_replicate->refcnt > 0);
 	dst_replicate->refcnt--;
 	STARPU_ASSERT(handle->busy_count > 0);

+ 1 - 1
src/datawizard/filters.c

@@ -307,7 +307,7 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, uint32_t gatherin
 		}
 
 		int ret;
-		ret = _starpu_fetch_data_on_node(child_handle, &child_handle->per_node[gathering_node], STARPU_R, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(child_handle, &child_handle->per_node[gathering_node], STARPU_R, 0, 0, NULL, NULL);
 		/* for now we pretend that the RAM is almost unlimited and that gathering
 		 * data should be possible from the node that does the unpartitioning ... we
 		 * don't want to have the programming deal with memory shortage at that time,

+ 3 - 3
src/datawizard/interfaces/data_interface.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2011  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2011-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -406,7 +406,7 @@ static void _starpu_data_unregister_fetch_data_callback(void *_arg)
 
 	struct _starpu_data_replicate *replicate = &handle->per_node[arg->memory_node];
 
-	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, NULL, NULL);
+	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, 0, NULL, NULL);
 	STARPU_ASSERT(!ret);
 
 	/* unlock the caller */
@@ -445,7 +445,7 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 			{
 				/* no one has locked this data yet, so we proceed immediately */
 				struct _starpu_data_replicate *home_replicate = &handle->per_node[home_node];
-				int ret = _starpu_fetch_data_on_node(handle, home_replicate, STARPU_R, 0, NULL, NULL);
+				int ret = _starpu_fetch_data_on_node(handle, home_replicate, STARPU_R, 0, 0, NULL, NULL);
 				STARPU_ASSERT(!ret);
 			}
 			else

+ 2 - 0
src/datawizard/memalloc.c

@@ -157,6 +157,7 @@ static void transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_n
 #warning we should use requests during memory reclaim
 #endif
 			/* TODO use request !! */
+			/* Take temporary references on the replicates */
 			src_replicate->refcnt++;
 			dst_replicate->refcnt++;
 			handle->busy_count+=2;
@@ -797,6 +798,7 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, struct _s
 			if (starpu_memstrategy_data_size_coefficient*handle->data_size > reclaim)
 				reclaim = starpu_memstrategy_data_size_coefficient*handle->data_size;
 
+			/* Take temporary reference on the replicate */
 			replicate->refcnt++;
 			handle->busy_count++;
 			_starpu_spin_unlock(&handle->header_lock);

+ 14 - 14
src/datawizard/user_interactions.c

@@ -89,7 +89,7 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 
 	struct _starpu_data_replicate *ram_replicate = &handle->per_node[0];
 
-	ret = _starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 1,
+	ret = _starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 0, 1,
 					 _starpu_data_acquire_fetch_data_callback, wrapper);
 	STARPU_ASSERT(!ret);
 }
@@ -128,14 +128,6 @@ int starpu_data_acquire_cb(starpu_data_handle_t handle,
 	_STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
 	wrapper->finished = 0;
 
-#ifdef STARPU_DEVEL
-#warning TODO instead of having the is_prefetch argument, _starpu_fetch_data shoud consider two flags: async and detached
-#endif
-	_starpu_spin_lock(&handle->header_lock);
-	handle->per_node[0].refcnt++;
-	handle->busy_count++;
-	_starpu_spin_unlock(&handle->header_lock);
-
 	_STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 	int sequential_consistency = handle->sequential_consistency;
 	if (sequential_consistency)
@@ -192,7 +184,7 @@ static inline void _starpu_data_acquire_continuation(void *arg)
 
 	struct _starpu_data_replicate *ram_replicate = &handle->per_node[0];
 
-	_starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 0, NULL, NULL);
+	_starpu_fetch_data_on_node(handle, ram_replicate, wrapper->mode, 0, 0, NULL, NULL);
 
 	/* continuation of starpu_data_acquire */
 	_STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
@@ -282,7 +274,7 @@ int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_access_mode mod
 	{
 		/* no one has locked this data yet, so we proceed immediately */
 		struct _starpu_data_replicate *ram_replicate = &handle->per_node[0];
-		int ret = _starpu_fetch_data_on_node(handle, ram_replicate, mode, 0, NULL, NULL);
+		int ret = _starpu_fetch_data_on_node(handle, ram_replicate, mode, 0, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 	}
 	else
@@ -323,7 +315,7 @@ static void _prefetch_data_on_node(void *arg)
         int ret;
 
 	struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
-	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, NULL, NULL);
+	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, wrapper->async, NULL, NULL);
         STARPU_ASSERT(!ret);
 
 	if (!wrapper->async)
@@ -362,7 +354,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	{
 		/* we can immediately proceed */
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		_starpu_fetch_data_on_node(handle, replicate, mode, async, NULL, NULL);
+		_starpu_fetch_data_on_node(handle, replicate, mode, async, async, NULL, NULL);
 
 		/* remove the "lock"/reference */
 
@@ -370,6 +362,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 
 		if (!async)
 		{
+			/* Release our refcnt, like _starpu_release_data_on_node would do */
 			replicate->refcnt--;
 			STARPU_ASSERT(replicate->refcnt >= 0);
 			STARPU_ASSERT(handle->busy_count > 0);
@@ -377,8 +370,15 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 			_starpu_data_check_not_busy(handle);
 		}
 
+		/* In case there was a temporary handle (eg. used for reduction), this
+		 * handle may have requested to be destroyed when the data is released
+		 * */
+		unsigned handle_was_destroyed = handle->lazy_unregister;
+
 		_starpu_notify_data_dependencies(handle);
-		_starpu_spin_unlock(&handle->header_lock);
+
+		if (!handle_was_destroyed)
+			_starpu_spin_unlock(&handle->header_lock);
 
 	}
 	else if (!async)

+ 2 - 2
src/datawizard/write_back.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2011  Université de Bordeaux 1
+ * Copyright (C) 2009-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -58,7 +58,7 @@ void _starpu_write_through_data(starpu_data_handle_t handle, uint32_t requesting
 
 				struct _starpu_data_request *r;
 				r = _starpu_create_request_to_fetch_data(handle, &handle->per_node[node],
-									 STARPU_R, 1, wt_callback, handle);
+									 STARPU_R, 1, 0, wt_callback, handle);
 
 			        /* If no request was created, the handle was already up-to-date on the
 			         * node */

+ 3 - 3
src/debug/latency.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2011  Université de Bordeaux 1
+ * Copyright (C) 2010-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -35,7 +35,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_0 = &handle->per_node[node0];
-		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, replicate_0);
 
@@ -45,7 +45,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_1 = &handle->per_node[node1];
-		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, replicate_1);
 	}