Przeglądaj źródła

Move unregistering tasks from implicit dependencies lists from
_starpu_push_task_output to _starpu_handle_job_termination. This will be needed
for some experimental work on dividable tasks.

This requires keeping a reference in the meanwhile, counted in busy_count, to make sure the data does not disappear.

Along the way, make lazy unregistration wait for busy_count being 0, not refcnt, and thus make _starpu_data_check_not_busy unregister data lazily as needed.

Make _starpu_notify_data_dependencies and _starpu_data_check_not_busy tell when they have unregistered the data, so that caller can know whether they can use it again or not.

Samuel Thibault 13 lat temu
rodzic
commit
9badf4f117

+ 12 - 16
src/core/dependencies/data_concurrency.c

@@ -260,8 +260,11 @@ static unsigned unlock_one_requester(struct _starpu_data_requester *r)
 		return 0;
 }
 
-/* The header lock must already be taken by the caller */
-void _starpu_notify_data_dependencies(starpu_data_handle_t handle)
+/* The header lock must already be taken by the caller.
+ * This may free the handle if it was lazily unregistered (1 is returned in
+ * that case). The handle pointer thus becomes invalid for the caller.
+ */
+int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 {
 	_starpu_spin_checklocked(&handle->header_lock);
 	/* A data access has finished so we remove a reference. */
@@ -269,19 +272,9 @@ void _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 	handle->refcnt--;
 	STARPU_ASSERT(handle->busy_count > 0);
 	handle->busy_count--;
-	_starpu_data_check_not_busy(handle);
-
-	/* The handle has been destroyed in between (eg. this was a temporary
-	 * handle created for a reduction.) */
-	if (handle->lazy_unregister && handle->refcnt == 0)
-	{
-		_starpu_spin_unlock(&handle->header_lock);
-		starpu_data_unregister_no_coherency(handle);
-		/* Warning: in case we unregister the handle, we must be sure
-		 * that the caller will not try to unlock the header after
-		 * !*/
-		return;
-	}
+	if (_starpu_data_check_not_busy(handle))
+		/* Handle was destroyed, nothing left to do.  */
+		return 1;
 
 	/* In case there is a pending reduction, and that this is the last
 	 * requester, we may go back to a "normal" coherency model. */
@@ -358,7 +351,10 @@ void _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 			_starpu_spin_lock(&handle->header_lock);
 			STARPU_ASSERT(handle->busy_count > 0);
 			handle->busy_count--;
-			_starpu_data_check_not_busy(handle);
+			if (_starpu_data_check_not_busy(handle))
+				return 1;
 		}
 	}
+
+	return 0;
 }

+ 2 - 2
src/core/dependencies/data_concurrency.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -22,7 +22,7 @@
 
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
 
-void _starpu_notify_data_dependencies(starpu_data_handle_t handle);
+int _starpu_notify_data_dependencies(starpu_data_handle_t handle);
 
 unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t handle,
 							  enum starpu_access_mode mode,

+ 35 - 0
src/core/dependencies/implicit_data_deps.c

@@ -405,6 +405,41 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 }
 
+/* This is the same as _starpu_release_data_enforce_sequential_consistency, but
+ * for all data of a task */
+void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
+{
+	struct starpu_task *task = j->task;
+        struct starpu_buffer_descr *descrs = j->ordered_buffers;
+
+	if (!task->cl)
+		return;
+
+        unsigned nbuffers = task->cl->nbuffers;
+
+	unsigned index;
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = descrs[index].handle;
+
+		if (index && descrs[index-1].handle == descrs[index].handle)
+			/* We have already released this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles */
+			continue;
+
+		_starpu_release_data_enforce_sequential_consistency(task, handle);
+		/* Release the reference acquired in _starpu_push_task_output */
+		_starpu_spin_lock(&handle->header_lock);
+		STARPU_ASSERT(handle->busy_count > 0);
+		handle->busy_count--;
+		if (!_starpu_data_check_not_busy(handle))
+			_starpu_spin_unlock(&handle->header_lock);
+
+	}
+}
+
+
 void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle_t handle)
 {
         _STARPU_LOG_IN();

+ 1 - 0
src/core/dependencies/implicit_data_deps.h

@@ -25,6 +25,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 						   starpu_data_handle_t handle, enum starpu_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle);
+void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j);
 
 void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle_t handle);
 void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle);

+ 3 - 0
src/core/jobs.c

@@ -162,6 +162,9 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
+	/* Tell other tasks that we don't exist any more, thus no need for
+	 * implicit dependencies any more.  */
+	_starpu_release_task_enforce_sequential_consistency(j);
 	/* Task does not have a cl, but has explicit data dependencies, we need
 	 * to tell them that we will not exist any more before notifying the
 	 * tasks waiting for us */

+ 6 - 16
src/datawizard/coherency.c

@@ -559,16 +559,8 @@ void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_
 
 	STARPU_ASSERT(handle->busy_count > 0);
 	handle->busy_count--;
-	_starpu_data_check_not_busy(handle);
 
-	/* In case there was a temporary handle (eg. used for reduction), this
-	 * handle may have requested to be destroyed when the data is released
-	 * */
-	unsigned handle_was_destroyed = handle->lazy_unregister;
-
-	_starpu_notify_data_dependencies(handle);
-
-	if (!handle_was_destroyed)
+	if (!_starpu_notify_data_dependencies(handle))
 		_starpu_spin_unlock(&handle->header_lock);
 }
 
@@ -723,15 +715,13 @@ void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask)
 
 		local_replicate = get_replicate(handle, mode, workerid, local_memory_node);
 
-		/* In case there was a temporary handle (eg. used for
-		 * reduction), this handle may have requested to be destroyed
-		 * when the data is released
-		 * */
-		unsigned handle_was_destroyed = handle->lazy_unregister;
+		/* Keep a reference for future
+		 * _starpu_release_task_enforce_sequential_consistency call */
+		_starpu_spin_lock(&handle->header_lock);
+		handle->busy_count++;
+		_starpu_spin_unlock(&handle->header_lock);
 
 		_starpu_release_data_on_node(handle, mask, local_replicate);
-		if (!handle_was_destroyed)
-			_starpu_release_data_enforce_sequential_consistency(task, handle);
 	}
 
 	if (profiling && task->profiling_info)

+ 1 - 1
src/datawizard/coherency.h

@@ -110,7 +110,7 @@ struct _starpu_data_state
 	struct _starpu_spinlock header_lock;
 
 	/* Condition to make application wait for all transfers before freeing handle */
-	/* busy_count is the number of handle->refcnt, handle->per_node[*]->refcnt, and number of starpu_data_requesters */
+	/* busy_count is the number of handle->refcnt, handle->per_node[*]->refcnt, number of starpu_data_requesters, and number of tasks that have released it but are still registered on the implicit data dependency lists. */
 	/* Core code which releases busy_count has to call
 	 * _starpu_data_check_not_busy to let starpu_data_unregister proceed */
 	unsigned busy_count;

+ 3 - 2
src/datawizard/data_request.c

@@ -296,7 +296,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 		handle->busy_count--;
 	}
 
-	_starpu_data_check_not_busy(handle);
+	unsigned destroyed = _starpu_data_check_not_busy(handle);
 
 	r->refcnt--;
 
@@ -314,7 +314,8 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 	if (do_delete)
 		starpu_data_request_destroy(r);
 
-	_starpu_spin_unlock(&handle->header_lock);
+	if (!destroyed)
+		_starpu_spin_unlock(&handle->header_lock);
 
 	/* We do the callback once the lock is released so that they can do
 	 * blocking operations with the handle (eg. release it) */

+ 26 - 7
src/datawizard/interfaces/data_interface.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2011-2012  Université de Bordeaux 1
+ * Copyright (C) 2009-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -405,8 +405,11 @@ struct _starpu_unregister_callback_arg
 
 /* Check whether we should tell starpu_data_unregister that the data handle is
  * not busy any more.
- * The header is supposed to be locked */
-void _starpu_data_check_not_busy(starpu_data_handle_t handle)
+ * The header is supposed to be locked.
+ * This may free the handle, if it was lazily unregistered (1 is returned in
+ * that case).  The handle pointer thus becomes invalid for the caller.
+ */
+int _starpu_data_check_not_busy(starpu_data_handle_t handle)
 {
 	if (!handle->busy_count && handle->busy_waiting)
 	{
@@ -414,6 +417,20 @@ void _starpu_data_check_not_busy(starpu_data_handle_t handle)
 		_STARPU_PTHREAD_COND_BROADCAST(&handle->busy_cond);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->busy_mutex);
 	}
+
+	/* The handle has been destroyed in between (eg. this was a temporary
+	 * handle created for a reduction.) */
+	if (handle->lazy_unregister && handle->busy_count == 0)
+	{
+		_starpu_spin_unlock(&handle->header_lock);
+		starpu_data_unregister_no_coherency(handle);
+		/* Warning: in case we unregister the handle, we must be sure
+		 * that the caller will not try to unlock the header after
+		 * !*/
+		return 1;
+	}
+
+	return 0;
 }
 
 static void _starpu_data_unregister_fetch_data_callback(void *_arg)
@@ -519,14 +536,16 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 			func(buffers, NULL);
 		}
 	}
-	else
-	{
+
+	_starpu_spin_lock(&handle->header_lock);
+	if (!coherent) {
 		/* Should we postpone the unregister operation ? */
-		if ((handle->refcnt > 0) && handle->lazy_unregister)
+		if ((handle->busy_count > 0) && handle->lazy_unregister) {
+			_starpu_spin_unlock(&handle->header_lock);
 			return;
+		}
 	}
 
-	_starpu_spin_lock(&handle->header_lock);
 	/* Tell holders of references that we're starting waiting */
 	handle->busy_waiting = 1;
 	_starpu_spin_unlock(&handle->header_lock);

+ 2 - 2
src/datawizard/interfaces/data_interface.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2011  Université de Bordeaux 1
+ * Copyright (C) 2009-2012  Université de Bordeaux 1
  * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -27,7 +27,7 @@ void _starpu_data_free_interfaces(starpu_data_handle_t handle)
 	STARPU_ATTRIBUTE_INTERNAL;
 
 extern void _starpu_data_interface_init(void) STARPU_ATTRIBUTE_INTERNAL;
-extern void _starpu_data_check_not_busy(starpu_data_handle_t handle) STARPU_ATTRIBUTE_INTERNAL;
+extern int _starpu_data_check_not_busy(starpu_data_handle_t handle) STARPU_ATTRIBUTE_INTERNAL;
 extern void _starpu_data_interface_shutdown(void) STARPU_ATTRIBUTE_INTERNAL;
 
 extern void _starpu_data_register_ram_pointer(starpu_data_handle_t handle,

+ 2 - 0
src/datawizard/reduction.c

@@ -342,7 +342,9 @@ void _starpu_data_end_reduction_mode_terminate(starpu_data_handle_t handle)
 		if (handle->reduction_tmp_handles[worker])
 		{
 //			fprintf(stderr, "unregister handle %p\n", handle);
+			_starpu_spin_lock(&handle->reduction_tmp_handles[worker]->header_lock);
 			handle->reduction_tmp_handles[worker]->lazy_unregister = 1;
+			_starpu_spin_unlock(&handle->reduction_tmp_handles[worker]->header_lock);
 			starpu_data_unregister_no_coherency(handle->reduction_tmp_handles[worker]);
 			handle->per_worker[worker].refcnt--;
 			/* TODO put in cache */

+ 3 - 8
src/datawizard/user_interactions.c

@@ -334,8 +334,8 @@ static void _prefetch_data_on_node(void *arg)
 	}
 
 	_starpu_spin_lock(&handle->header_lock);
-	_starpu_notify_data_dependencies(handle);
-	_starpu_spin_unlock(&handle->header_lock);
+	if (!_starpu_notify_data_dependencies(handle))
+		_starpu_spin_unlock(&handle->header_lock);
 }
 
 static
@@ -376,17 +376,12 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 			STARPU_ASSERT(replicate->refcnt >= 0);
 			STARPU_ASSERT(handle->busy_count > 0);
 			handle->busy_count--;
-			_starpu_data_check_not_busy(handle);
 		}
 
 		/* In case there was a temporary handle (eg. used for reduction), this
 		 * handle may have requested to be destroyed when the data is released
 		 * */
-		unsigned handle_was_destroyed = handle->lazy_unregister;
-
-		_starpu_notify_data_dependencies(handle);
-
-		if (!handle_was_destroyed)
+		if (!_starpu_notify_data_dependencies(handle))
 			_starpu_spin_unlock(&handle->header_lock);
 	}
 	else if (!async)

+ 2 - 2
src/datawizard/write_back.c

@@ -24,8 +24,8 @@ static void wt_callback(void *arg)
 	starpu_data_handle_t handle = (starpu_data_handle_t) arg;
 
 	_starpu_spin_lock(&handle->header_lock);
-	_starpu_notify_data_dependencies(handle);
-	_starpu_spin_unlock(&handle->header_lock);
+	if (!_starpu_notify_data_dependencies(handle))
+		_starpu_spin_unlock(&handle->header_lock);
 }
 
 void _starpu_write_through_data(starpu_data_handle_t handle, uint32_t requesting_node,