Browse Source

Make starpu_data_unregister wait for all pending requests. This should fix issues with WT, notably

Samuel Thibault 13 years ago
parent
commit
94040d2af4

+ 5 - 1
src/core/dependencies/data_concurrency.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010-2011  Université de Bordeaux 1
  * Copyright (C) 2010  Centre National de la Recherche Scientifique
  * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -249,8 +249,12 @@ static unsigned unlock_one_requester(starpu_data_requester_t r)
 void _starpu_notify_data_dependencies(starpu_data_handle handle)
 void _starpu_notify_data_dependencies(starpu_data_handle handle)
 {
 {
 	/* A data access has finished so we remove a reference. */
 	/* A data access has finished so we remove a reference. */
+	PTHREAD_MUTEX_LOCK(&handle->refcnt_mutex);
 	STARPU_ASSERT(handle->refcnt > 0);
 	STARPU_ASSERT(handle->refcnt > 0);
 	handle->refcnt--;
 	handle->refcnt--;
+	if (!handle->refcnt)
+		PTHREAD_COND_BROADCAST(&handle->refcnt_cond);
+	PTHREAD_MUTEX_UNLOCK(&handle->refcnt_mutex);
 
 
 	/* The handle has been destroyed in between (eg. this was a temporary
 	/* The handle has been destroyed in between (eg. this was a temporary
 	 * handle created for a reduction.) */
 	 * handle created for a reduction.) */

+ 5 - 0
src/datawizard/coherency.h

@@ -99,6 +99,11 @@ struct starpu_data_state_t {
 	/* the number of requests currently in the scheduling engine
 	/* the number of requests currently in the scheduling engine
 	 * (not in the req_list anymore) */
 	 * (not in the req_list anymore) */
 	unsigned refcnt;
 	unsigned refcnt;
+	/* Condition to make application wait for all transfers before freeing handle */
+	/* TODO: rather free the handle asynchronously? */
+	pthread_mutex_t refcnt_mutex;
+	pthread_cond_t refcnt_cond;
+
 	starpu_access_mode current_mode;
 	starpu_access_mode current_mode;
 	/* protect meta data */
 	/* protect meta data */
 	starpu_spinlock_t header_lock;
 	starpu_spinlock_t header_lock;

+ 2 - 0
src/datawizard/filters.c

@@ -155,6 +155,8 @@ void starpu_data_partition(starpu_data_handle initial_handle, struct starpu_data
 		child->req_list = starpu_data_requester_list_new();
 		child->req_list = starpu_data_requester_list_new();
 		child->reduction_req_list = starpu_data_requester_list_new();
 		child->reduction_req_list = starpu_data_requester_list_new();
 		child->refcnt = 0;
 		child->refcnt = 0;
+		PTHREAD_MUTEX_INIT(&child->refcnt_mutex, NULL);
+		PTHREAD_COND_INIT(&child->refcnt_cond, NULL);
 		child->reduction_refcnt = 0;
 		child->reduction_refcnt = 0;
 		_starpu_spin_init(&child->header_lock);
 		_starpu_spin_init(&child->header_lock);
 
 

+ 12 - 0
src/datawizard/interfaces/data_interface.c

@@ -103,6 +103,8 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	/* initialize the new lock */
 	/* initialize the new lock */
 	handle->req_list = starpu_data_requester_list_new();
 	handle->req_list = starpu_data_requester_list_new();
 	handle->refcnt = 0;
 	handle->refcnt = 0;
+	PTHREAD_MUTEX_INIT(&handle->refcnt_mutex, NULL);
+	PTHREAD_COND_INIT(&handle->refcnt_cond, NULL);
 	_starpu_spin_init(&handle->header_lock);
 	_starpu_spin_init(&handle->header_lock);
 
 
 	/* first take care to properly lock the data */
 	/* first take care to properly lock the data */
@@ -423,6 +425,11 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 					PTHREAD_COND_WAIT(&arg.cond, &arg.mutex);
 					PTHREAD_COND_WAIT(&arg.cond, &arg.mutex);
 				PTHREAD_MUTEX_UNLOCK(&arg.mutex);
 				PTHREAD_MUTEX_UNLOCK(&arg.mutex);
 			}
 			}
+			_starpu_spin_lock(&handle->header_lock);
+			STARPU_ASSERT(handle->refcnt > 0);
+			/* Drop the reference count we've acquired by submitting an R data request */
+			handle->refcnt--;
+			_starpu_spin_unlock(&handle->header_lock);
 		}
 		}
 	}
 	}
 	else {
 	else {
@@ -431,6 +438,11 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 			return;
 			return;
 	}
 	}
 
 
+	/* Wait for all requests to finish (notably WT requests) */
+	PTHREAD_MUTEX_LOCK(&handle->refcnt_mutex);
+	while (handle->refcnt)
+		PTHREAD_COND_WAIT(&handle->refcnt_cond, &handle->refcnt_mutex);
+
 	_starpu_data_free_interfaces(handle);
 	_starpu_data_free_interfaces(handle);
 
 
 	/* Destroy the data now */
 	/* Destroy the data now */