Browse Source

Fix starpu_data_unregister_submit and starpu_data_unregister_no_coherency, to make them properly wait for pending tasks

Samuel Thibault 11 years ago
parent
commit
b59aff6ad5
1 changed files with 50 additions and 17 deletions
  1. 50 17
      src/datawizard/interfaces/data_interface.c

+ 50 - 17
src/datawizard/interfaces/data_interface.c

@@ -50,6 +50,8 @@ struct handle_tag_entry
 static struct handle_tag_entry *registered_tag_handles;
 static struct handle_tag_entry *registered_tag_handles;
 static struct _starpu_spinlock    registered_tag_handles_lock;
 static struct _starpu_spinlock    registered_tag_handles_lock;
 
 
+static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned coherent, unsigned nowait);
+
 void _starpu_data_interface_init(void)
 void _starpu_data_interface_init(void)
 {
 {
 	_starpu_spin_init(&registered_handles_lock);
 	_starpu_spin_init(&registered_handles_lock);
@@ -545,7 +547,7 @@ int _starpu_data_check_not_busy(starpu_data_handle_t handle)
 	if (handle->lazy_unregister && handle->busy_count == 0)
 	if (handle->lazy_unregister && handle->busy_count == 0)
 	{
 	{
 		_starpu_spin_unlock(&handle->header_lock);
 		_starpu_spin_unlock(&handle->header_lock);
-		starpu_data_unregister_no_coherency(handle);
+		_starpu_data_unregister(handle, 0, 1);
 		/* Warning: in case we unregister the handle, we must be sure
 		/* Warning: in case we unregister the handle, we must be sure
 		 * that the caller will not try to unlock the header after
 		 * that the caller will not try to unlock the header after
 		 * !*/
 		 * !*/
@@ -577,18 +579,27 @@ static void _starpu_data_unregister_fetch_data_callback(void *_arg)
 }
 }
 
 
 /* Unregister the data handle, perhaps we don't need to update the home_node
 /* Unregister the data handle, perhaps we don't need to update the home_node
- * (in that case coherent is set to 0) */
-static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned coherent)
+ * (in that case coherent is set to 0)
+ * nowait is for internal use when we already know for sure that we won't have to wait.
+ */
+static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned coherent, unsigned nowait)
 {
 {
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "data %p needs to be unpartitioned before unregistration", handle);
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "data %p needs to be unpartitioned before unregistration", handle);
+	STARPU_ASSERT(!(nowait && handle->busy_count != 0));
 
 
-	if (coherent)
+	int sequential_consistency = handle->sequential_consistency;
+	if (sequential_consistency && !nowait)
 	{
 	{
 		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_data_unregister must not be called from a task or callback, perhaps you can use starpu_data_unregister_submit instead");
 		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_data_unregister must not be called from a task or callback, perhaps you can use starpu_data_unregister_submit instead");
 
 
 		/* If sequential consistency is enabled, wait until data is available */
 		/* If sequential consistency is enabled, wait until data is available */
 		_starpu_data_wait_until_available(handle, STARPU_RW);
 		_starpu_data_wait_until_available(handle, STARPU_RW);
+	}
+
+	if (coherent && !nowait)
+	{
+		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_data_unregister must not be called from a task or callback, perhaps you can use starpu_data_unregister_submit instead");
 
 
 		/* Fetch data in the home of the data to ensure we have a valid copy
 		/* Fetch data in the home of the data to ensure we have a valid copy
 		 * where we registered it */
 		 * where we registered it */
@@ -717,13 +728,6 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 
 
 	size_t size = _starpu_data_get_size(handle);
 	size_t size = _starpu_data_get_size(handle);
 
 
-	if (handle->unregister_hook)
-	{
-		_starpu_spin_unlock(&handle->header_lock);
-		handle->unregister_hook(handle);
-		_starpu_spin_lock(&handle->header_lock);
-	}
-
 	_starpu_data_unregister_ram_pointer(handle);
 	_starpu_data_unregister_ram_pointer(handle);
 	_starpu_data_free_interfaces(handle);
 	_starpu_data_free_interfaces(handle);
 
 
@@ -765,21 +769,50 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 void starpu_data_unregister(starpu_data_handle_t handle)
 void starpu_data_unregister(starpu_data_handle_t handle)
 {
 {
 	STARPU_ASSERT_MSG(!handle->lazy_unregister, "data %p can not be unregistered twice", handle);
 	STARPU_ASSERT_MSG(!handle->lazy_unregister, "data %p can not be unregistered twice", handle);
-	_starpu_data_unregister(handle, 1);
+
+	if (handle->unregister_hook)
+	{
+		handle->unregister_hook(handle);
+	}
+
+	_starpu_data_unregister(handle, 1, 0);
 }
 }
 
 
 void starpu_data_unregister_no_coherency(starpu_data_handle_t handle)
 void starpu_data_unregister_no_coherency(starpu_data_handle_t handle)
 {
 {
-	_starpu_data_unregister(handle, 0);
+	if (handle->unregister_hook)
+	{
+		handle->unregister_hook(handle);
+	}
+
+	_starpu_data_unregister(handle, 0, 0);
+}
+
+static void _starpu_data_unregister_submit_cb(void *arg)
+{
+	starpu_data_handle_t handle = arg;
+
+	_starpu_spin_lock(&handle->header_lock);
+	handle->lazy_unregister = 1;
+	/* The handle should be busy since we are working on it.
+         * when data_acquire releases the handle, it will be destroyed by _starpu_data_check_not_busy */
+	STARPU_ASSERT(handle->busy_count);
+        _starpu_spin_unlock(&handle->header_lock);
+
+	starpu_data_release_on_node(handle, -1);
 }
 }
 
 
 void starpu_data_unregister_submit(starpu_data_handle_t handle)
 void starpu_data_unregister_submit(starpu_data_handle_t handle)
 {
 {
-	_starpu_spin_lock(&handle->header_lock);
 	STARPU_ASSERT_MSG(!handle->lazy_unregister, "data %p can not be unregistered twice", handle);
 	STARPU_ASSERT_MSG(!handle->lazy_unregister, "data %p can not be unregistered twice", handle);
-	handle->lazy_unregister = 1;
-	_starpu_spin_unlock(&handle->header_lock);
-	_starpu_data_unregister(handle, 0);
+
+	if (handle->unregister_hook)
+	{
+		handle->unregister_hook(handle);
+	}
+
+	/* Wait for all task dependencies on this handle before putting it for free */
+	starpu_data_acquire_on_node_cb(handle, -1, STARPU_RW, _starpu_data_unregister_submit_cb, handle);
 }
 }
 
 
 static void _starpu_data_invalidate(void *data)
 static void _starpu_data_invalidate(void *data)