Pārlūkot izejas kodu

Make starpu_data_unregister also wait for the replicate refcnts and for queued requests; this should fix yet more odd issues

Samuel Thibault 13 gadi atpakaļ
vecāks
revīzija
3c0a0e89cc

+ 15 - 5
src/core/dependencies/data_concurrency.c

@@ -141,6 +141,7 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		/* there cannot be multiple writers or a new writer
 		 * while the data is in read mode */
 		
+		handle->busy_count++;
 		/* enqueue the request */
 		starpu_data_requester_t r = starpu_data_requester_new();
 			r->mode = mode;
@@ -161,6 +162,7 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 	}
 	else {
 		handle->refcnt++;
+		handle->busy_count++;
 
 		handle->current_mode = mode;
 
@@ -249,20 +251,22 @@ static unsigned unlock_one_requester(starpu_data_requester_t r)
 void _starpu_notify_data_dependencies(starpu_data_handle handle)
 {
 	/* A data access has finished so we remove a reference. */
-	PTHREAD_MUTEX_LOCK(&handle->refcnt_mutex);
 	STARPU_ASSERT(handle->refcnt > 0);
 	handle->refcnt--;
-	if (!handle->refcnt)
-		PTHREAD_COND_BROADCAST(&handle->refcnt_cond);
-	PTHREAD_MUTEX_UNLOCK(&handle->refcnt_mutex);
+	PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+	STARPU_ASSERT(handle->busy_count > 0);
+	if (!--handle->busy_count)
+		PTHREAD_COND_BROADCAST(&handle->busy_cond);
+	PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 
 	/* The handle has been destroyed in between (eg. this was a temporary
 	 * handle created for a reduction.) */
 	if (handle->lazy_unregister && handle->refcnt == 0)
 	{
+		_starpu_spin_unlock(&handle->header_lock);
 		starpu_data_unregister_no_coherency(handle);
 		/* Warning: in case we unregister the handle, we must be sure
-		 * that the application will not try to unlock the header after
+		 * that the caller will not try to unlock the header after
 		 * !*/
 		return;
 	}
@@ -309,6 +313,7 @@ void _starpu_notify_data_dependencies(starpu_data_handle handle)
 			/* The data is now attributed to that request so we put a
 			 * reference on it. */
 			handle->refcnt++;
+			handle->busy_count++;
 		
 			starpu_access_mode previous_mode = handle->current_mode;
 			handle->current_mode = r_mode;
@@ -338,6 +343,11 @@ void _starpu_notify_data_dependencies(starpu_data_handle handle)
 			starpu_data_requester_delete(r);
 			
 			_starpu_spin_lock(&handle->header_lock);
+			PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+			STARPU_ASSERT(handle->busy_count > 0);
+			if (!--handle->busy_count)
+				PTHREAD_COND_BROADCAST(&handle->busy_cond);
+			PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 		}
 	}
 }

+ 12 - 3
src/datawizard/coherency.c

@@ -283,8 +283,10 @@ static starpu_data_request_t _starpu_search_existing_data_request(struct starpu_
 			/* in case the exisiting request did not imply a memory
 			 * transfer yet, we have to increment the refcnt now
 			 * (so that the source remains valid) */
-			if (!(r->mode & STARPU_R))
+			if (!(r->mode & STARPU_R)) {
 				replicate->refcnt++;
+				replicate->handle->busy_count++;
+			}
 
 			r->mode |= STARPU_R;
 		}
@@ -459,8 +461,10 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_rep
 	while (_starpu_spin_trylock(&handle->header_lock))
 		_starpu_datawizard_progress(local_node, 1);
 
-	if (!is_prefetch)
+	if (!is_prefetch) {
 		dst_replicate->refcnt++;
+		dst_replicate->handle->busy_count++;
+	}
 
 	starpu_data_request_t r;
 	r = create_request_to_fetch_data(handle, dst_replicate, mode,
@@ -527,9 +531,14 @@ void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt
 		_starpu_datawizard_progress(local_node, 1);
 
 	replicate->refcnt--;
-
 	STARPU_ASSERT(replicate->refcnt >= 0);
 
+	PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+	STARPU_ASSERT(handle->busy_count > 0);
+	if (!--handle->busy_count)
+		PTHREAD_COND_BROADCAST(&handle->busy_cond);
+	PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
+
 	/* In case there was a temporary handle (eg. used for reduction), this
 	 * handle may have requested to be destroyed when the data is released
 	 * */

+ 11 - 7
src/datawizard/coherency.h

@@ -96,18 +96,22 @@ struct starpu_task_wrapper_list {
 
 struct starpu_data_state_t {
 	struct starpu_data_requester_list_s *req_list;
-	/* the number of requests currently in the scheduling engine
-	 * (not in the req_list anymore) */
+	/* the number of requests currently in the scheduling engine (not in
+	 * the req_list anymore), i.e. the number of holders of the
+	 * current_mode rwlock */
 	unsigned refcnt;
-	/* Condition to make application wait for all transfers before freeing handle */
-	/* TODO: rather free the handle asynchronously? */
-	pthread_mutex_t refcnt_mutex;
-	pthread_cond_t refcnt_cond;
-
 	starpu_access_mode current_mode;
 	/* protect meta data */
 	starpu_spinlock_t header_lock;
 
+	/* Condition to make application wait for all transfers before freeing handle */
+	/* busy_count is the number of handle->refcnt, handle->per_node[*]->refcnt, and number of starpu_data_requesters */
+	/* Core code which releases busy_count has to broadcast busy_cond to
+	 * let starpu_data_unregister proceed */
+	unsigned busy_count;
+	pthread_mutex_t busy_mutex;
+	pthread_cond_t busy_cond;
+
 	/* In case we user filters, the handle may describe a sub-data */
 	struct starpu_data_state_t *root_handle; /* root of the tree */
 	struct starpu_data_state_t *father_handle; /* father of the node, NULL if the current node is the root */

+ 15 - 1
src/datawizard/data_request.c

@@ -110,12 +110,14 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 	_starpu_spin_lock(&r->lock);
 
 	dst_replicate->refcnt++;
+	handle->busy_count++;
 
 	if (mode & STARPU_R)
 	{
 		unsigned src_node = src_replicate->memory_node;
 		dst_replicate->request[src_node] = r;
 		src_replicate->refcnt++;
+		handle->busy_count++;
 	}
 	else {
 		unsigned dst_node = dst_replicate->memory_node;
@@ -284,6 +286,17 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 		src_replicate->refcnt--;
 	}
 
+	PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+	STARPU_ASSERT(handle->busy_count > 0);
+	handle->busy_count--;
+	if (mode & STARPU_R) {
+		STARPU_ASSERT(handle->busy_count > 0);
+		handle->busy_count--;
+	}
+	if (!handle->busy_count)
+		PTHREAD_COND_BROADCAST(&handle->busy_cond);
+	PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
+
 	r->refcnt--;
 
 	/* if nobody is waiting on that request, we can get rid of it */
@@ -573,7 +586,8 @@ void _starpu_update_prefetch_status(starpu_data_request_t r){
 	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
 	{
 		struct starpu_data_request_s *next_req = r->next_req[chained_req];
-		_starpu_update_prefetch_status(next_req);		
+		if (next_req->prefetch)
+			_starpu_update_prefetch_status(next_req);
 	}
 
 	PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);

+ 3 - 2
src/datawizard/filters.c

@@ -155,8 +155,9 @@ void starpu_data_partition(starpu_data_handle initial_handle, struct starpu_data
 		child->req_list = starpu_data_requester_list_new();
 		child->reduction_req_list = starpu_data_requester_list_new();
 		child->refcnt = 0;
-		PTHREAD_MUTEX_INIT(&child->refcnt_mutex, NULL);
-		PTHREAD_COND_INIT(&child->refcnt_cond, NULL);
+		child->busy_count = 0;
+		PTHREAD_MUTEX_INIT(&child->busy_mutex, NULL);
+		PTHREAD_COND_INIT(&child->busy_cond, NULL);
 		child->reduction_refcnt = 0;
 		_starpu_spin_init(&child->header_lock);
 

+ 9 - 10
src/datawizard/interfaces/data_interface.c

@@ -103,8 +103,9 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	/* initialize the new lock */
 	handle->req_list = starpu_data_requester_list_new();
 	handle->refcnt = 0;
-	PTHREAD_MUTEX_INIT(&handle->refcnt_mutex, NULL);
-	PTHREAD_COND_INIT(&handle->refcnt_cond, NULL);
+	handle->busy_count = 0;
+	PTHREAD_MUTEX_INIT(&handle->busy_mutex, NULL);
+	PTHREAD_COND_INIT(&handle->busy_cond, NULL);
 	_starpu_spin_init(&handle->header_lock);
 
 	/* first take care to properly lock the data */
@@ -425,11 +426,7 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 					PTHREAD_COND_WAIT(&arg.cond, &arg.mutex);
 				PTHREAD_MUTEX_UNLOCK(&arg.mutex);
 			}
-			_starpu_spin_lock(&handle->header_lock);
-			STARPU_ASSERT(handle->refcnt > 0);
-			/* Drop the reference count we've acquired by submitting an R data request */
-			handle->refcnt--;
-			_starpu_spin_unlock(&handle->header_lock);
+			_starpu_release_data_on_node(handle, 0, &handle->per_node[home_node]);
 		}
 	}
 	else {
@@ -439,10 +436,12 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 	}
 
 	/* Wait for all requests to finish (notably WT requests) */
-	PTHREAD_MUTEX_LOCK(&handle->refcnt_mutex);
-	while (handle->refcnt)
-		PTHREAD_COND_WAIT(&handle->refcnt_cond, &handle->refcnt_mutex);
+	PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+	while (handle->busy_count)
+		PTHREAD_COND_WAIT(&handle->busy_cond, &handle->busy_mutex);
 
+	/* Wait for finished requests to release the handle */
+	_starpu_spin_lock(&handle->header_lock);
 	_starpu_data_free_interfaces(handle);
 
 	/* Destroy the data now */

+ 13 - 0
src/datawizard/memalloc.c

@@ -156,12 +156,18 @@ static void transfer_subtree_to_node(starpu_data_handle handle, unsigned src_nod
 			/* TODO use request !! */
 			src_replicate->refcnt++;
 			dst_replicate->refcnt++;
+			handle->busy_count+=2;
 
 			ret = _starpu_driver_copy_data_1_to_1(handle, src_replicate, dst_replicate, 0, NULL, 1);
 			STARPU_ASSERT(ret == 0);
 
 			src_replicate->refcnt--;
 			dst_replicate->refcnt--;
+			PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+			STARPU_ASSERT(handle->busy_count >= 2);
+			if (!(handle->busy_count -= 2))
+				PTHREAD_COND_BROADCAST(&handle->busy_cond);
+			PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 
 			break;
 		case STARPU_SHARED:
@@ -782,6 +788,7 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle handle, struct star
 				reclaim = starpu_memstrategy_data_size_coefficient*handle->data_size;
 
 			replicate->refcnt++;
+			handle->busy_count++;
 			_starpu_spin_unlock(&handle->header_lock);
 
 			STARPU_TRACE_START_MEMRECLAIM(dst_node);
@@ -795,6 +802,12 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle handle, struct star
 		                _starpu_datawizard_progress(_starpu_get_local_memory_node(), 0);
 
 			replicate->refcnt--;
+			STARPU_ASSERT(replicate->refcnt >= 0);
+			PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+			STARPU_ASSERT(handle->busy_count > 0);
+			if (!--handle->busy_count)
+				PTHREAD_COND_BROADCAST(&handle->busy_cond);
+			PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 		}
 
 	} while((allocated_memory == -ENOMEM) && attempts++ < 2);

+ 8 - 0
src/datawizard/user_interactions.c

@@ -130,6 +130,7 @@ int starpu_data_acquire_cb(starpu_data_handle handle,
 #endif
 	_starpu_spin_lock(&handle->header_lock);
 	handle->per_node[0].refcnt++;
+	handle->busy_count++;
 	_starpu_spin_unlock(&handle->header_lock);
 
 	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -334,10 +335,17 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 		/* remove the "lock"/reference */
 
 		_starpu_spin_lock(&handle->header_lock);
+
 		if (!async) {
 			replicate->refcnt--;
 			STARPU_ASSERT(replicate->refcnt >= 0);
+			PTHREAD_MUTEX_LOCK(&handle->busy_mutex);
+			STARPU_ASSERT(handle->busy_count > 0);
+			if (!--handle->busy_count)
+				PTHREAD_COND_BROADCAST(&handle->busy_cond);
+			PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 		}
+
 		_starpu_notify_data_dependencies(handle);
 		_starpu_spin_unlock(&handle->header_lock);
 

+ 1 - 0
src/datawizard/write_back.c

@@ -50,6 +50,7 @@ void _starpu_write_through_data(starpu_data_handle handle, uint32_t requesting_n
 				STARPU_ASSERT(handle->current_mode != STARPU_REDUX);
 				STARPU_ASSERT(handle->current_mode != STARPU_SCRATCH);
 				handle->refcnt++;
+				handle->busy_count++;
 				handle->current_mode = STARPU_R;
 
 				starpu_data_request_t r;

+ 3 - 2
tools/gdbinit

@@ -150,7 +150,8 @@ define starpu-print-data
   set $data = (starpu_data_handle) $arg0
   printf "Data handle %p\n", $data
   printf "Home node %d\n", $data->home_node
-  printf "Requests %d\n", $data->refcnt
+  printf "RWlock refs %d\n", $data->refcnt
+  printf "Busy count %d\n", $data->busy_count
   printf "Current mode "
   starpu-print-mode $data->current_mode
   printf "\n"
@@ -178,7 +179,7 @@ define starpu-print-data
     set $n = 0
     while $n < descr.nnodes
       set $replicate = &$data->per_node[$n]
-      printf "Node %2d:", $n
+      printf "Node %2d (%2d):", $n, $replicate->refcnt
       if $replicate->state == 0
         printf " OWNER"
       end