浏览代码

Generalize implicit dependency handling to properly make commutative writers wait for all the previous readers, vice-versa, etc.

Samuel Thibault 12 年之前
父节点
当前提交
55897da289

+ 100 - 132
src/core/dependencies/implicit_data_deps.c

@@ -46,23 +46,22 @@ static void _starpu_add_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_
 	_starpu_add_ghost_dependency(handle, _starpu_get_job_associated_to_task(previous)->job_id, next);
 }
 
-/* Read after Write (RAW) or Read after Read (RAR) */
-static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
+/* Add pre_sync_task as new accessor among the existing ones, making it depend on the last synchronization task if any.  */
+static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
 {
 	/* Add this task to the list of readers */
 	struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
 	link->task = post_sync_task;
-	link->next = handle->last_submitted_readers;
-	handle->last_submitted_readers = link;
+	link->next = handle->last_submitted_accessors;
+	handle->last_submitted_accessors = link;
 
-	/* This task depends on the previous writer if any */
-	if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
+	/* This task depends on the previous synchronization task if any */
+	if (handle->last_sync_task && handle->last_sync_task != post_sync_task)
 	{
-		_STARPU_DEP_DEBUG("RAW %p\n", handle);
-		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
+		struct starpu_task *task_array[1] = {handle->last_sync_task};
 		_starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
-		_starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
-		_STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
+		_starpu_add_dependency(handle, handle->last_sync_task, pre_sync_task);
+		_STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_sync_task, pre_sync_task);
 	}
         else
         {
@@ -82,12 +81,12 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 #ifdef HAVE_AYUDAME_H
 		|| AYU_event
 #endif
-		) && handle->last_submitted_ghost_writer_id_is_valid)
+		) && handle->last_submitted_ghost_sync_id_is_valid)
 	{
-		_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id,
+		_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_sync_id,
 			_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
-		_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
-		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
+		_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_sync_id, pre_sync_task);
+		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_sync_id, pre_sync_task);
 	}
 
 	if (!pre_sync_task->cl) {
@@ -99,27 +98,27 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 	}
 }
 
-/* Write after Read (WAR) */
-static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
+/* This adds a new synchronization task which depends on all the previous accessors */
+static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
 {
-	/* Count the readers */
-	unsigned nreaders = 0;
+	/* Count the existing accessors */
+	unsigned naccessors = 0;
 	struct _starpu_task_wrapper_list *l;
-	l = handle->last_submitted_readers;
+	l = handle->last_submitted_accessors;
 	while (l)
 	{
 		if (l->task != post_sync_task)
-			nreaders++;
+			naccessors++;
 		l = l->next;
 	}
-	_STARPU_DEP_DEBUG("%d readers\n", nreaders);
+	_STARPU_DEP_DEBUG("%d accessors\n", naccessors);
 
-	if (nreaders > 0)
+	if (naccessors > 0)
 	{
 		/* Put all tasks in the list into task_array */
-		struct starpu_task *task_array[nreaders];
+		struct starpu_task *task_array[naccessors];
 		unsigned i = 0;
-		l = handle->last_submitted_readers;
+		l = handle->last_submitted_accessors;
 		while (l)
 		{
 			STARPU_ASSERT(l->task);
@@ -134,80 +133,31 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 			l = l->next;
 			free(prev);
 		}
-		_starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array, 0);
+		_starpu_task_declare_deps_array(pre_sync_task, naccessors, task_array, 0);
 	}
 #ifndef STARPU_USE_FXT
 	if (_starpu_bound_recording)
 #endif
 	{
-		/* Declare all dependencies with ghost readers */
-		struct _starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
-		while (ghost_readers_id)
+		/* Declare all dependencies with ghost accessors */
+		struct _starpu_jobid_list *ghost_accessors_id = handle->last_submitted_ghost_accessors_id;
+		while (ghost_accessors_id)
 		{
-			unsigned long id = ghost_readers_id->id;
+			unsigned long id = ghost_accessors_id->id;
 			_STARPU_TRACE_GHOST_TASK_DEPS(id,
 				_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
 			_starpu_add_ghost_dependency(handle, id, pre_sync_task);
 			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
 
-			struct _starpu_jobid_list *prev = ghost_readers_id;
-			ghost_readers_id = ghost_readers_id->next;
+			struct _starpu_jobid_list *prev = ghost_accessors_id;
+			ghost_accessors_id = ghost_accessors_id->next;
 			free(prev);
 		}
-		handle->last_submitted_ghost_readers_id = NULL;
+		handle->last_submitted_ghost_accessors_id = NULL;
 	}
 
-	handle->last_submitted_readers = NULL;
-	handle->last_submitted_writer = post_sync_task;
-
-	if (!post_sync_task->cl) {
-		/* Add a reference to be released in _starpu_handle_job_termination */
-		_starpu_spin_lock(&handle->header_lock);
-		handle->busy_count++;
-		_starpu_spin_unlock(&handle->header_lock);
-		_starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
-	}
-}
-
-/* Write after Write (WAW) */
-static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
-{
-	/* (Read) Write */
-	/* This task depends on the previous writer */
-	if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
-	{
-		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
-		_starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
-		_starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
-		_STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
-	}
-        else
-        {
-		_STARPU_DEP_DEBUG("No dep\n");
-        }
-
-	/* If there is a ghost writer instead, we
-	 * should declare a ghost dependency here, and
-	 * invalidate the ghost value. */
-#ifndef STARPU_USE_FXT
-	if (_starpu_bound_recording)
-#endif
-	{
-		if (handle->last_submitted_ghost_writer_id_is_valid)
-		{
-			_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, 
-				_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
-			_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
-			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
-			handle->last_submitted_ghost_writer_id_is_valid = 0;
-		}
-                else
-                {
-			_STARPU_DEP_DEBUG("No dep ID\n");
-                }
-	}
-
-	handle->last_submitted_writer = post_sync_task;
+	handle->last_submitted_accessors = NULL;
+	handle->last_sync_task = post_sync_task;
 
 	if (!post_sync_task->cl) {
 		/* Add a reference to be released in _starpu_handle_job_termination */
@@ -245,7 +195,6 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 		if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
 			return NULL;
 
-		_STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
 		/* In case we are generating the DAG, we add an implicit
 		 * dependency between the pre and the post sync tasks in case
 		 * they are not the same. */
@@ -261,56 +210,75 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 
 		enum starpu_data_access_mode previous_mode = handle->last_submitted_mode;
 
-		if (mode & STARPU_W)
+		_STARPU_DEP_DEBUG("Handle %p Tasks %p %p %x->%x\n", handle, pre_sync_task, post_sync_task, previous_mode, mode);
+
+		/*
+		 * Tasks can access the data concurrently only if they have the
+		 * same access mode, which can only be either:
+		 * - write with STARPU_COMMUTE
+		 * - read
+		 * - redux
+		 *
+		 * In other cases, the tasks have to depend on each other.
+		 */
+
+		if ((mode & STARPU_W && mode & STARPU_COMMUTE && previous_mode & STARPU_W && previous_mode && STARPU_COMMUTE)
+		  || (mode == STARPU_R && previous_mode == STARPU_R)
+		  || (mode == STARPU_REDUX && previous_mode == STARPU_REDUX))
 		{
-			_STARPU_DEP_DEBUG("W %p\n", handle);
-			if (previous_mode & STARPU_W)
-			{
-				_STARPU_DEP_DEBUG("WAW %p\n", handle);
-				/* Add WAW dependency if any of the two writers refuse to commute */
-				if (! (mode & STARPU_COMMUTE && previous_mode & STARPU_COMMUTE))
-					_starpu_add_writer_after_writer(handle, pre_sync_task, post_sync_task);
-			}
-			else
-			{
-				/* The task submitted previously were in read-only
-				 * mode: this task must depend on all those read-only
-				 * tasks and we get rid of the list of readers */
-				_STARPU_DEP_DEBUG("WAR %p\n", handle);
-				_starpu_add_writer_after_readers(handle, pre_sync_task, post_sync_task);
-			}
+			_STARPU_DEP_DEBUG("concurrently\n");
+			/* Can access concurrently with current tasks */
+			_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
 		}
 		else
 		{
-			_STARPU_DEP_DEBUG("R %p %d -> %d\n", handle, previous_mode, mode);
-			/* Add a reader, after a writer or a reader. */
-			STARPU_ASSERT(pre_sync_task);
-			STARPU_ASSERT(post_sync_task);
+			/* Can not access concurrently, have to wait for existing accessors */
+			struct _starpu_task_wrapper_list *l = handle->last_submitted_accessors;
+			_STARPU_DEP_DEBUG("dependency\n");
 
-			STARPU_ASSERT(mode & (STARPU_R|STARPU_REDUX));
-
-			if (!(previous_mode & STARPU_W) && (mode != previous_mode))
+			if (l && l->next)
 			{
-				/* Read after Redux or Redux after Read: we
-				 * insert a dummy synchronization task so that
-				 * we don't need to have a gigantic number of
-				 * dependencies between all readers and all
-				 * redux tasks. */
-
-				/* Create an empty task */
-				struct starpu_task *new_sync_task;
-				new_sync_task = starpu_task_create();
-				STARPU_ASSERT(new_sync_task);
-				new_sync_task->cl = NULL;
+				/* Several previous accessors */
+
+				if (mode == STARPU_W)
+				{
+					/* Optimization: this task can not
+					 * combine with others anyway, use it
+					 * as synchronization task by making it
+					 * wait for the previous ones. */
+					_starpu_add_sync_task(handle, pre_sync_task, post_sync_task);
+				} else {
+					_STARPU_DEP_DEBUG("several predecessors, adding sync task\n");
+					/* insert an empty synchronization task
+					 * which waits for the whole set,
+					 * instead of creating a quadratic
+					 * number of dependencies. */
+					struct starpu_task *sync_task = starpu_task_create();
+					STARPU_ASSERT(sync_task);
+					sync_task->cl = NULL;
 #ifdef STARPU_USE_FXT
-				_starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
+					_starpu_get_job_associated_to_task(sync_task)->model_name = "sync_task_redux";
 #endif
+					/* Make this task wait for the previous ones */
+					_starpu_add_sync_task(handle, sync_task, sync_task);
+					/* And the requested task wait for this one */
+					_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
 
-				_starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
-
-				task = new_sync_task;
+					task = sync_task;
+				}
+			}
+			else
+			{
+				if (l)
+				{
+					/* One previous accessor, make it the sync
+					 * task, and start depending on it. */
+					handle->last_sync_task = l->task;
+					handle->last_submitted_accessors = NULL;
+					free(l);
+				}
+				_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
 			}
-			_starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task);
 		}
 		handle->last_submitted_mode = mode;
 	}
@@ -376,18 +344,18 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 
 		/* If this is the last writer, there is no point in adding
 		 * extra deps to that tasks that does not exists anymore */
-		if (task == handle->last_submitted_writer)
+		if (task == handle->last_sync_task)
 		{
-			handle->last_submitted_writer = NULL;
+			handle->last_sync_task = NULL;
 
 #ifndef STARPU_USE_FXT
 			if (_starpu_bound_recording)
 #endif
 			{
 				/* Save the previous writer as the ghost last writer */
-				handle->last_submitted_ghost_writer_id_is_valid = 1;
+				handle->last_submitted_ghost_sync_id_is_valid = 1;
 				struct _starpu_job *ghost_job = _starpu_get_job_associated_to_task(task);
-				handle->last_submitted_ghost_writer_id = ghost_job->job_id;
+				handle->last_submitted_ghost_sync_id = ghost_job->job_id;
 			}
 		}
 
@@ -399,7 +367,7 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 		/* Same if this is one of the readers: we go through the list
 		 * of readers and remove the task if it is found. */
 		struct _starpu_task_wrapper_list *l;
-		l = handle->last_submitted_readers;
+		l = handle->last_submitted_accessors;
 		struct _starpu_task_wrapper_list *prev = NULL;
 #ifdef STARPU_DEVEL
 #warning TODO: use double-linked list to make finding ourself fast
@@ -421,9 +389,9 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 					struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
 					struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
 					STARPU_ASSERT(link);
-					link->next = handle->last_submitted_ghost_readers_id;
+					link->next = handle->last_submitted_ghost_accessors_id;
 					link->id = ghost_reader_job->job_id;
-					handle->last_submitted_ghost_readers_id = link;
+					handle->last_submitted_ghost_accessors_id = link;
 				}
 
 				if (prev)
@@ -433,7 +401,7 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 				else
 				{
 					/* This is the first element of the list */
-					handle->last_submitted_readers = next;
+					handle->last_submitted_accessors = next;
 				}
 
 				/* XXX can we really find the same task again

+ 5 - 5
src/datawizard/coherency.h

@@ -166,8 +166,8 @@ struct _starpu_data_state
 	 * read-only mode should depend on that task implicitely if the
 	 * sequential_consistency flag is enabled. */
 	enum starpu_data_access_mode last_submitted_mode;
-	struct starpu_task *last_submitted_writer;
-	struct _starpu_task_wrapper_list *last_submitted_readers;
+	struct starpu_task *last_sync_task;
+	struct _starpu_task_wrapper_list *last_submitted_accessors;
 
 	/* If FxT is enabled, we keep track of "ghost dependencies": that is to
 	 * say the dependencies that are not needed anymore, but that should
@@ -175,9 +175,9 @@ struct _starpu_data_state
 	 * f(Aw) g(Aw), and that g is submitted after the termination of f, we
 	 * want to have f->g appear in the DAG even if StarPU does not need to
 	 * enforce this dependency anymore.*/
-	unsigned last_submitted_ghost_writer_id_is_valid;
-	unsigned long last_submitted_ghost_writer_id;
-	struct _starpu_jobid_list *last_submitted_ghost_readers_id;
+	unsigned last_submitted_ghost_sync_id_is_valid;
+	unsigned long last_submitted_ghost_sync_id;
+	struct _starpu_jobid_list *last_submitted_ghost_accessors_id;
 
 	struct _starpu_task_wrapper_list *post_sync_tasks;
 	unsigned post_sync_tasks_cnt;

+ 5 - 5
src/datawizard/filters.c

@@ -184,8 +184,8 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 
 		STARPU_PTHREAD_MUTEX_INIT(&child->sequential_consistency_mutex, NULL);
 		child->last_submitted_mode = STARPU_R;
-		child->last_submitted_writer = NULL;
-		child->last_submitted_readers = NULL;
+		child->last_sync_task = NULL;
+		child->last_submitted_accessors = NULL;
 		child->post_sync_tasks = NULL;
 		child->post_sync_tasks_cnt = 0;
 
@@ -195,9 +195,9 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 		child->init_cl = initial_handle->init_cl;
 
 #ifdef STARPU_USE_FXT
-		child->last_submitted_ghost_writer_id_is_valid = 0;
-		child->last_submitted_ghost_writer_id = 0;
-		child->last_submitted_ghost_readers_id = NULL;
+		child->last_submitted_ghost_sync_id_is_valid = 0;
+		child->last_submitted_ghost_sync_id = 0;
+		child->last_submitted_ghost_accessors_id = NULL;
 #endif
 
 		for (node = 0; node < STARPU_MAXNODES; node++)

+ 5 - 5
src/datawizard/interfaces/data_interface.c

@@ -197,8 +197,8 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 
 	STARPU_PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
 	handle->last_submitted_mode = STARPU_R;
-	handle->last_submitted_writer = NULL;
-	handle->last_submitted_readers = NULL;
+	handle->last_sync_task = NULL;
+	handle->last_submitted_accessors = NULL;
 	handle->post_sync_tasks = NULL;
 	handle->post_sync_tasks_cnt = 0;
 
@@ -210,9 +210,9 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	handle->reduction_req_list = _starpu_data_requester_list_new();
 
 #ifdef STARPU_USE_FXT
-	handle->last_submitted_ghost_writer_id_is_valid = 0;
-	handle->last_submitted_ghost_writer_id = 0;
-	handle->last_submitted_ghost_readers_id = NULL;
+	handle->last_submitted_ghost_sync_id_is_valid = 0;
+	handle->last_submitted_ghost_sync_id = 0;
+	handle->last_submitted_ghost_accessors_id = NULL;
 #endif
 
 	handle->wt_mask = wt_mask;