Просмотр исходного кода

Fix complexity of implicit task/data dependency, from quadratic to linear

Samuel Thibault лет назад: 11
Родитель
Коммит
f43bf86292

+ 1 - 0
ChangeLog

@@ -87,6 +87,7 @@ Changes:
   * StarPU-MPI: Fix for being able to receive data with the same tag
     from several nodes (see mpi/tests/gather.c)
   * Remove the long-deprecated cost_model fields and task->buffers field.
+  * Fix complexity of implicit task/data dependency, from quadratic to linear.
 
 Small changes:
   * Rename function starpu_trace_user_event() as

+ 64 - 77
src/core/dependencies/implicit_data_deps.c

@@ -47,13 +47,14 @@ static void _starpu_add_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_
 }
 
 /* Add pre_sync_task as new accessor among the existing ones, making it depend on the last synchronization task if any.  */
-static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
+static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot)
 {
 	/* Add this task to the list of readers */
-	struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
-	link->task = post_sync_task;
-	link->next = handle->last_submitted_accessors;
-	handle->last_submitted_accessors = link;
+	post_sync_task_dependency_slot->task = post_sync_task;
+	post_sync_task_dependency_slot->next = handle->last_submitted_accessors.next;
+	post_sync_task_dependency_slot->prev = &handle->last_submitted_accessors;
+	post_sync_task_dependency_slot->next->prev = post_sync_task_dependency_slot;
+	handle->last_submitted_accessors.next = post_sync_task_dependency_slot;
 
 	/* This task depends on the previous synchronization task if any */
 	if (handle->last_sync_task && handle->last_sync_task != post_sync_task)
@@ -103,9 +104,9 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
 {
 	/* Count the existing accessors */
 	unsigned naccessors = 0;
-	struct _starpu_task_wrapper_list *l;
-	l = handle->last_submitted_accessors;
-	while (l)
+	struct _starpu_task_wrapper_dlist *l;
+	l = handle->last_submitted_accessors.next;
+	while (l != &handle->last_submitted_accessors)
 	{
 		if (l->task != post_sync_task)
 			naccessors++;
@@ -118,8 +119,8 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
 		/* Put all tasks in the list into task_array */
 		struct starpu_task *task_array[naccessors];
 		unsigned i = 0;
-		l = handle->last_submitted_accessors;
-		while (l)
+		l = handle->last_submitted_accessors.next;
+		while (l != &handle->last_submitted_accessors)
 		{
 			STARPU_ASSERT(l->task);
 			if (l->task != post_sync_task)
@@ -129,9 +130,10 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
 				_STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
 			}
 
-			struct _starpu_task_wrapper_list *prev = l;
+			struct _starpu_task_wrapper_dlist *prev = l;
 			l = l->next;
-			free(prev);
+			prev->next = NULL;
+			prev->prev = NULL;
 		}
 		_starpu_task_declare_deps_array(pre_sync_task, naccessors, task_array, 0);
 	}
@@ -156,7 +158,8 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
 		handle->last_submitted_ghost_accessors_id = NULL;
 	}
 
-	handle->last_submitted_accessors = NULL;
+	handle->last_submitted_accessors.next = &handle->last_submitted_accessors;
+	handle->last_submitted_accessors.prev = &handle->last_submitted_accessors;
 	handle->last_sync_task = post_sync_task;
 
 	if (!post_sync_task->cl) {
@@ -177,7 +180,7 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
  * */
 /* NB : handle->sequential_consistency_mutex must be held by the caller;
  * returns a task, to be submitted after releasing that mutex. */
-struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 						   starpu_data_handle_t handle, enum starpu_data_access_mode mode)
 {
 	struct starpu_task *task = NULL;
@@ -228,15 +231,16 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 		{
 			_STARPU_DEP_DEBUG("concurrently\n");
 			/* Can access concurrently with current tasks */
-			_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
+			_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
 		}
 		else
 		{
 			/* Can not access concurrently, have to wait for existing accessors */
-			struct _starpu_task_wrapper_list *l = handle->last_submitted_accessors;
+			struct _starpu_task_wrapper_dlist *l = handle->last_submitted_accessors.next;
 			_STARPU_DEP_DEBUG("dependency\n");
 
-			if ((l && l->next) || (handle->last_submitted_ghost_accessors_id && handle->last_submitted_ghost_accessors_id->next))
+			if ((l != &handle->last_submitted_accessors && l->next != &handle->last_submitted_accessors)
+					|| (handle->last_submitted_ghost_accessors_id && handle->last_submitted_ghost_accessors_id->next))
 			{
 				/* Several previous accessors */
 
@@ -261,7 +265,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 					/* Make this task wait for the previous ones */
 					_starpu_add_sync_task(handle, sync_task, sync_task);
 					/* And the requested task wait for this one */
-					_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
+					_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
 
 					task = sync_task;
 				}
@@ -270,11 +274,13 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 			{
 				/* One previous accessor, make it the sync
 				 * task, and start depending on it. */
-				if (l)
+				if (l != &handle->last_submitted_accessors)
 				{
 					handle->last_sync_task = l->task;
-					handle->last_submitted_accessors = NULL;
-					free(l);
+					l->next = NULL;
+					l->prev = NULL;
+					handle->last_submitted_accessors.next = &handle->last_submitted_accessors;
+					handle->last_submitted_accessors.prev = &handle->last_submitted_accessors;
 				}
 				else if (handle->last_submitted_ghost_accessors_id)
 				{
@@ -283,7 +289,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 					free(handle->last_submitted_ghost_accessors_id);
 					handle->last_submitted_ghost_accessors_id = NULL;
 				}
-				_starpu_add_accessor(handle, pre_sync_task, post_sync_task);
+				_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
 			}
 		}
 		handle->last_submitted_mode = mode;
@@ -308,6 +314,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 		return;
 
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	struct _starpu_task_wrapper_dlist *dep_slots = _STARPU_JOB_GET_DEP_SLOTS(j);
 
 	unsigned buffer;
 	for (buffer = 0; buffer < nbuffers; buffer++)
@@ -321,7 +328,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 			continue;
 
 		STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, &dep_slots[buffer], handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		if (new_task)
 		{
@@ -341,7 +348,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
  * if h is submitted after the termination of f or g, StarPU will not create a
  * dependency as this is not needed anymore. */
 /* NB: this function takes the sequential_consistency_mutex of the handle itself; the caller must not hold it on entry */
-void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle)
+void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, struct _starpu_task_wrapper_dlist *task_dependency_slot, starpu_data_handle_t handle)
 {
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 
@@ -365,63 +372,35 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 			}
 		}
 
-		/* XXX can a task be both the last writer associated to a data
-		 * and be in its list of readers ? If not, we should not go
-		 * through the entire list once we have detected it was the
-		 * last writer. */
-
 		/* Same if this is one of the readers: if the task's dependency
 		 * slot is still linked, unlink it from the list of accessors. */
-		struct _starpu_task_wrapper_list *l;
-		l = handle->last_submitted_accessors;
-		struct _starpu_task_wrapper_list *prev = NULL;
-#ifdef STARPU_DEVEL
-#warning TODO: use double-linked list to make finding ourself fast
-#endif
-		while (l)
+		if (task_dependency_slot && task_dependency_slot->next)
 		{
-			struct _starpu_task_wrapper_list *next = l->next;
-
-			if (l->task == task)
-			{
-				/* If we found the task in the reader list */
-				free(l);
+#ifdef STARPU_DEBUG
+			/* Make sure we are removing ourself from the proper handle */
+			struct _starpu_task_wrapper_dlist *l;
+			for (l = task_dependency_slot->prev; l->task; l = l->prev)
+				;
+			STARPU_ASSERT(l == &handle->last_submitted_accessors);
+			for (l = task_dependency_slot->next; l->task; l = l->next)
+				;
+			STARPU_ASSERT(l == &handle->last_submitted_accessors);
+#endif
 
+			task_dependency_slot->next->prev = task_dependency_slot->prev;
+			task_dependency_slot->prev->next = task_dependency_slot->next;
 #ifndef STARPU_USE_FXT
-				if (_starpu_bound_recording)
+			if (_starpu_bound_recording)
 #endif
-				{
-					/* Save the job id of the reader task in the ghost reader linked list list */
-					struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
-					struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
-					STARPU_ASSERT(link);
-					link->next = handle->last_submitted_ghost_accessors_id;
-					link->id = ghost_reader_job->job_id;
-					handle->last_submitted_ghost_accessors_id = link;
-				}
-
-				if (prev)
-				{
-					prev->next = next;
-				}
-				else
-				{
-					/* This is the first element of the list */
-					handle->last_submitted_accessors = next;
-				}
-
-				/* XXX can we really find the same task again
-				 * once we have found it ? Otherwise, we should
-				 * avoid going through the entire list and stop
-				 * as soon as we find the task. TODO: check how
-				 * duplicate dependencies are treated. */
-			}
-			else
 			{
-				prev = l;
+				/* Save the job id of the reader task in the ghost reader linked list */
+				struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
+				struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
+				STARPU_ASSERT(link);
+				link->next = handle->last_submitted_ghost_accessors_id;
+				link->id = ghost_reader_job->job_id;
+				handle->last_submitted_ghost_accessors_id = link;
 			}
-
-			l = next;
 		}
 	}
 
@@ -434,13 +413,22 @@ void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
 {
 	struct starpu_task *task = j->task;
         struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
+	struct _starpu_task_wrapper_dlist *slots = _STARPU_JOB_GET_DEP_SLOTS(j);
 
 	if (!task->cl)
 		return;
 
         unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-
 	unsigned index;
+
+	/* Release all implicit dependencies */
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
+
+		_starpu_release_data_enforce_sequential_consistency(task, &slots[index], handle);
+	}
+
 	for (index = 0; index < nbuffers; index++)
 	{
 		starpu_data_handle_t handle = descrs[index].handle;
@@ -451,7 +439,6 @@ void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
 			 * _starpu_compar_handles */
 			continue;
 
-		_starpu_release_data_enforce_sequential_consistency(task, handle);
 		/* Release the reference acquired in _starpu_push_task_output */
 		_starpu_spin_lock(&handle->header_lock);
 		STARPU_ASSERT(handle->busy_count > 0);
@@ -512,7 +499,7 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle)
 		while (link)
 		{
 			/* There is no need to depend on that task now, since it was already unlocked */
-			_starpu_release_data_enforce_sequential_consistency(link->task, handle);
+			_starpu_release_data_enforce_sequential_consistency(link->task, &_starpu_get_job_associated_to_task(link->task)->implicit_dep_slot, handle);
 
 			int ret = _starpu_task_submit_internally(link->task);
 			STARPU_ASSERT(!ret);
@@ -540,7 +527,7 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_d
 
 		/* It is not really a RW access, but we want to make sure that
 		 * all previous accesses are done */
-		new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, &_starpu_get_job_associated_to_task(sync_task)->implicit_dep_slot, handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 		if (new_task)

+ 2 - 2
src/core/dependencies/implicit_data_deps.h

@@ -21,10 +21,10 @@
 #include <starpu.h>
 #include <common/config.h>
 
-struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 						   starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
-void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle);
+void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, struct _starpu_task_wrapper_dlist *task_dependency_slot, starpu_data_handle_t handle);
 void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j);
 
 void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle_t handle);

+ 11 - 3
src/core/jobs.c

@@ -53,7 +53,10 @@ struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_tas
 	memset(job, 0, sizeof(*job));
 
 	if (task->dyn_handles)
+	{
 	     job->dyn_ordered_buffers = malloc(STARPU_TASK_GET_NBUFFERS(task) * sizeof(job->dyn_ordered_buffers[0]));
+	     job->dyn_dep_slots = malloc(STARPU_TASK_GET_NBUFFERS(task) * sizeof(job->dyn_dep_slots[0]));
+	}
 
 	job->task = task;
 
@@ -109,8 +112,13 @@ void _starpu_job_destroy(struct _starpu_job *j)
 	_starpu_cg_list_deinit(&j->job_successors);
 	if (j->dyn_ordered_buffers)
 	{
-	     free(j->dyn_ordered_buffers);
-	     j->dyn_ordered_buffers = NULL;
+		free(j->dyn_ordered_buffers);
+		j->dyn_ordered_buffers = NULL;
+	}
+	if (j->dyn_dep_slots)
+	{
+		free(j->dyn_dep_slots);
+		j->dyn_dep_slots = NULL;
 	}
 
 	_starpu_job_delete(j);
@@ -202,7 +210,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	 * tasks waiting for us */
 	if (j->implicit_dep_handle) {
 		starpu_data_handle_t handle = j->implicit_dep_handle;
-		_starpu_release_data_enforce_sequential_consistency(j->task, handle);
+		_starpu_release_data_enforce_sequential_consistency(j->task, &j->implicit_dep_slot, handle);
 		/* Release reference taken while setting implicit_dep_handle */
 		_starpu_spin_lock(&handle->header_lock);
 		handle->busy_count--;

+ 5 - 0
src/core/jobs.h

@@ -77,7 +77,9 @@ LIST_TYPE(_starpu_job,
 	 * the task so that we always grab the rw-lock associated to the
 	 * handles in the same order. */
 	struct _starpu_data_descr ordered_buffers[STARPU_NMAXBUFS];
+	struct _starpu_task_wrapper_dlist dep_slots[STARPU_NMAXBUFS];
 	struct _starpu_data_descr *dyn_ordered_buffers;
+	struct _starpu_task_wrapper_dlist *dyn_dep_slots;
 
 	/* If a tag is associated to the job, this points to the internal data
 	 * structure that describes the tag status. */
@@ -91,6 +93,7 @@ LIST_TYPE(_starpu_job,
 	 * the handle for this dependency, so as to remove the task from the
 	 * last_writer/readers */
 	starpu_data_handle_t implicit_dep_handle;
+	struct _starpu_task_wrapper_dlist implicit_dep_slot;
 
 	/* Indicates whether the task associated to that job has already been
 	 * submitted to StarPU (1) or not (0) (using starpu_task_submit).
@@ -198,4 +201,6 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 #define _STARPU_JOB_SET_ORDERED_BUFFER(job, buffer, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i] = buffer; else job->ordered_buffers[i] = buffer;} while(0)
 #define _STARPU_JOB_GET_ORDERED_BUFFERS(job) (job->dyn_ordered_buffers) ? job->dyn_ordered_buffers : job->ordered_buffers
 
+#define _STARPU_JOB_GET_DEP_SLOTS(job) (((job)->dyn_dep_slots) ? (job)->dyn_dep_slots : (job)->dep_slots)
+
 #endif // __JOBS_H__

+ 8 - 1
src/datawizard/coherency.h

@@ -97,6 +97,13 @@ struct _starpu_task_wrapper_list
 	struct _starpu_task_wrapper_list *next;
 };
 
+/* Node of a circular doubly-linked list of tasks; the list head is a node of this same type embedded in the data handle, linked to itself when the list is empty */
+struct _starpu_task_wrapper_dlist {
+	struct starpu_task *task; /* task recorded in this slot; NULL for the list head embedded in the handle */
+	struct _starpu_task_wrapper_dlist *next; /* following node; NULL when this slot is not linked in any list */
+	struct _starpu_task_wrapper_dlist *prev; /* preceding node; NULL when this slot is not linked in any list */
+};
+
 extern int _starpu_has_not_important_data;
 
 typedef void (*_starpu_data_handle_unregister_hook)(starpu_data_handle_t);
@@ -169,7 +176,7 @@ struct _starpu_data_state
 	 * sequential_consistency flag is enabled. */
 	enum starpu_data_access_mode last_submitted_mode;
 	struct starpu_task *last_sync_task;
-	struct _starpu_task_wrapper_list *last_submitted_accessors;
+	struct _starpu_task_wrapper_dlist last_submitted_accessors;
 
 	/* If FxT is enabled, we keep track of "ghost dependencies": that is to
 	 * say the dependencies that are not needed anymore, but that should

+ 3 - 1
src/datawizard/filters.c

@@ -190,7 +190,9 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 		STARPU_PTHREAD_MUTEX_INIT(&child->sequential_consistency_mutex, NULL);
 		child->last_submitted_mode = STARPU_R;
 		child->last_sync_task = NULL;
-		child->last_submitted_accessors = NULL;
+		child->last_submitted_accessors.task = NULL;
+		child->last_submitted_accessors.next = &child->last_submitted_accessors;
+		child->last_submitted_accessors.prev = &child->last_submitted_accessors;
 		child->post_sync_tasks = NULL;
 		/* Tell helgrind that the race in _starpu_unlock_post_sync_tasks is fine */
 		STARPU_HG_DISABLE_CHECKING(child->post_sync_tasks_cnt);

+ 3 - 1
src/datawizard/interfaces/data_interface.c

@@ -205,7 +205,9 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	STARPU_PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
 	handle->last_submitted_mode = STARPU_R;
 	handle->last_sync_task = NULL;
-	handle->last_submitted_accessors = NULL;
+	handle->last_submitted_accessors.task = NULL;
+	handle->last_submitted_accessors.next = &handle->last_submitted_accessors;
+	handle->last_submitted_accessors.prev = &handle->last_submitted_accessors;
 	handle->post_sync_tasks = NULL;
 
 	/* Tell helgrind that the race in _starpu_unlock_post_sync_tasks is fine */

+ 2 - 2
src/datawizard/user_interactions.c

@@ -157,7 +157,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 		wrapper->post_sync_task->name = "acquire_cb_post";
 		wrapper->post_sync_task->detach = 1;
 
-		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 		if (new_task)
@@ -277,7 +277,7 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 		wrapper.post_sync_task->name = "acquire_post";
 		wrapper.post_sync_task->detach = 1;
 
-		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, &_starpu_get_job_associated_to_task(wrapper.post_sync_task)->implicit_dep_slot, handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		if (new_task)
 		{