Sfoglia il codice sorgente

Avoid submitting a pre_sync_task when there is nothing to wait for

Samuel Thibault 5 anni fa
parent
commit
0dbef44443

+ 44 - 16
src/core/dependencies/implicit_data_deps.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011,2012,2016                           Inria
- * Copyright (C) 2010-2019                                Université de Bordeaux
+ * Copyright (C) 2010-2020                                Université de Bordeaux
  * Copyright (C) 2010-2013,2015-2018                      CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -49,8 +49,9 @@ static void _starpu_add_dependency(starpu_data_handle_t handle, struct starpu_ta
 	_starpu_add_ghost_dependency(handle, _starpu_get_job_associated_to_task(previous)->job_id, next);
 }
 
-/* Add pre_sync_task as new accessor among the existing ones, making it depend on the last synchronization task if any.  */
-static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot)
+/* Add post_sync_task as new accessor among the existing ones, making pre_sync_task depend on the last synchronization task if any.  */
+/* This returns 1 if pre_sync_task really needs to wait for anything, and 0 otherwise (which means it does not actually need to be submitted) */
+static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, int submit_pre_sync, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot)
 {
 	/* Add this task to the list of readers */
 	STARPU_ASSERT(!post_sync_task_dependency_slot->prev);
@@ -93,7 +94,7 @@ static void _starpu_add_accessor(starpu_data_handle_t handle, struct starpu_task
 		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_sync_id, pre_sync_task);
 	}
 
-	if (!pre_sync_task->cl)
+	if (submit_pre_sync && !pre_sync_task->cl)
 	{
 		/* Add a reference to be released in _starpu_handle_job_termination */
 		_starpu_spin_lock(&handle->header_lock);
@@ -202,7 +203,14 @@ static void _starpu_add_sync_task(starpu_data_handle_t handle, struct starpu_tas
  * */
 /* NB : handle->sequential_consistency_mutex must be hold by the caller;
  * returns a task, to be submitted after releasing that mutex. */
-struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
+/* *submit_pre_sync is whether the pre_sync_task will be submitted or not. The
+ * caller should set it to 1 if it intends to submit it anyway, or to 0
+ * if it may not submit it (because it has no other use for the task than
+ * synchronization). In the latter case,
+ * _starpu_detect_implicit_data_deps_with_handle will set it to 1 in case the
+ * task really needs to be submitted, or leave it to 0 if there is nothing to be
+ * waited for anyway. */
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, int *submit_pre_sync, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 								  starpu_data_handle_t handle, enum starpu_data_access_mode mode, unsigned task_handle_sequential_consistency)
 {
 	struct starpu_task *task = NULL;
@@ -228,8 +236,14 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 
 		/* Skip tasks that are associated to a reduction phase so that
 		 * they do not interfere with the application. */
-		if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
+		if (pre_sync_job->reduction_task) {
+			*submit_pre_sync = 1;
 			return NULL;
+		}
+		if (post_sync_job->reduction_task) {
+			*submit_pre_sync = 0;
+			return NULL;
+		}
 
 		/* In case we are generating the DAG, we add an implicit
 		 * dependency between the pre and the post sync tasks in case
@@ -264,7 +278,9 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 		{
 			_STARPU_DEP_DEBUG("concurrently\n");
 			/* Can access concurrently with current tasks */
-			_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
+			if (handle->last_sync_task != NULL)
+				*submit_pre_sync = 1;
+			_starpu_add_accessor(handle, pre_sync_task, *submit_pre_sync, post_sync_task, post_sync_task_dependency_slot);
 		}
 		else
 		{
@@ -277,6 +293,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 					|| (l != &handle->last_submitted_accessors && handle->last_submitted_ghost_accessors_id))
 			{
 				/* Several previous accessors */
+				*submit_pre_sync = 1;
 
 				if (mode == STARPU_W)
 				{
@@ -308,7 +325,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 					/* Make this task wait for the previous ones */
 					_starpu_add_sync_task(handle, sync_task, sync_task, post_sync_task);
 					/* And the requested task wait for this one */
-					_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
+					_starpu_add_accessor(handle, pre_sync_task, *submit_pre_sync, post_sync_task, post_sync_task_dependency_slot);
 
 					task = sync_task;
 				}
@@ -321,6 +338,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 				{
 					/* One accessor, make it the sync task,
 					 * and start depending on it. */
+					*submit_pre_sync = 1;
 					_STARPU_DEP_DEBUG("One previous accessor, depending on it\n");
 					handle->last_sync_task = l->task;
 					l->next = NULL;
@@ -343,10 +361,12 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 				{
 					_STARPU_DEP_DEBUG("No previous accessor, no dependency\n");
 				}
-				_starpu_add_accessor(handle, pre_sync_task, post_sync_task, post_sync_task_dependency_slot);
+				_starpu_add_accessor(handle, pre_sync_task, *submit_pre_sync, post_sync_task, post_sync_task_dependency_slot);
 			}
 		}
 		handle->last_submitted_mode = mode;
+	} else {
+		*submit_pre_sync = 0;
 	}
         _STARPU_LOG_OUT();
 	return task;
@@ -423,9 +443,10 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 		STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 		unsigned index = descrs[buffer].index;
 		unsigned task_handle_sequential_consistency = task->handles_sequential_consistency ? task->handles_sequential_consistency[index] : handle->sequential_consistency;
+		int submit_pre_sync = 1;
 		if (!task_handle_sequential_consistency)
 			j->sequential_consistency = 0;
-		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, &dep_slots[buffer], handle, mode, task_handle_sequential_consistency);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(task, &submit_pre_sync, task, &dep_slots[buffer], handle, mode, task_handle_sequential_consistency);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		if (new_task)
 		{
@@ -631,6 +652,7 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_d
 	if (sequential_consistency)
 	{
 		struct starpu_task *sync_task, *new_task;
+		int submit_pre_sync = 0;
 		sync_task = starpu_task_create();
 		sync_task->name = sync_name;
 		sync_task->detach = 0;
@@ -639,7 +661,7 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_d
 
 		/* It is not really a RW access, but we want to make sure that
 		 * all previous accesses are done */
-		new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, &_starpu_get_job_associated_to_task(sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, &submit_pre_sync, sync_task, &_starpu_get_job_associated_to_task(sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 		if (new_task)
@@ -648,11 +670,17 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_d
 			STARPU_ASSERT(!ret);
 		}
 
-		/* TODO detect if this is superflous */
-		int ret = _starpu_task_submit_internally(sync_task);
-		STARPU_ASSERT(!ret);
-		ret = starpu_task_wait(sync_task);
-		STARPU_ASSERT(ret == 0);
+		if (submit_pre_sync)
+		{
+			int ret = _starpu_task_submit_internally(sync_task);
+			STARPU_ASSERT(!ret);
+			ret = starpu_task_wait(sync_task);
+			STARPU_ASSERT(ret == 0);
+		}
+		else
+		{
+			starpu_task_destroy(sync_task);
+		}
 	}
 	else
 	{

+ 2 - 2
src/core/dependencies/implicit_data_deps.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2012,2014,2015,2017,2018            Université de Bordeaux
+ * Copyright (C) 2010-2012,2014,2015,2017,2018,2020       Université de Bordeaux
  * Copyright (C) 2010,2011,2013,2015,2017,2018            CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -21,7 +21,7 @@
 #include <starpu.h>
 #include <common/config.h>
 
-struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, int *submit_pre_sync, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 								  starpu_data_handle_t handle, enum starpu_data_access_mode mode, unsigned task_handle_sequential_consistency);
 int _starpu_test_implicit_data_deps_with_handle(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);

+ 28 - 10
src/datawizard/user_interactions.c

@@ -216,6 +216,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	{
 		struct starpu_task *new_task;
 		struct _starpu_job *pre_sync_job, *post_sync_job;
+		int submit_pre_sync = 0;
 		wrapper->pre_sync_task = starpu_task_create();
 		wrapper->pre_sync_task->name = "_starpu_data_acquire_cb_pre";
 		wrapper->pre_sync_task->detach = 1;
@@ -237,7 +238,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 		if (quick)
 			pre_sync_job->quick_next = post_sync_job;
 
-		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, &submit_pre_sync, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 		if (STARPU_UNLIKELY(new_task))
@@ -246,9 +247,17 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 			STARPU_ASSERT(!ret);
 		}
 
-		/* TODO detect if this is superflous */
-		int ret = _starpu_task_submit_internally(wrapper->pre_sync_task);
-		STARPU_ASSERT(!ret);
+		if (submit_pre_sync)
+		{
+			int ret = _starpu_task_submit_internally(wrapper->pre_sync_task);
+			STARPU_ASSERT(!ret);
+		}
+		else
+		{
+			wrapper->pre_sync_task->detach = 0;
+			starpu_task_destroy(wrapper->pre_sync_task);
+			starpu_data_acquire_cb_pre_sync_callback(wrapper);
+		}
 	}
 	else
 	{
@@ -360,6 +369,7 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 	if (sequential_consistency)
 	{
 		struct starpu_task *new_task;
+		int submit_pre_sync = 0;
 		wrapper.pre_sync_task = starpu_task_create();
 		wrapper.pre_sync_task->name = "_starpu_data_acquire_pre";
 		wrapper.pre_sync_task->detach = 0;
@@ -370,18 +380,26 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 		wrapper.post_sync_task->detach = 1;
 		wrapper.post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
 
-		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, &_starpu_get_job_associated_to_task(wrapper.post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, &submit_pre_sync, wrapper.post_sync_task, &_starpu_get_job_associated_to_task(wrapper.post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
-		if (new_task)
+
+		if (STARPU_UNLIKELY(new_task))
 		{
 			int ret = _starpu_task_submit_internally(new_task);
 			STARPU_ASSERT(!ret);
 		}
 
-		/* TODO detect if this is superflous */
-		wrapper.pre_sync_task->synchronous = 1;
-		int ret = _starpu_task_submit_internally(wrapper.pre_sync_task);
-		STARPU_ASSERT(!ret);
+		if (submit_pre_sync)
+		{
+			wrapper.pre_sync_task->synchronous = 1;
+			int ret = _starpu_task_submit_internally(wrapper.pre_sync_task);
+			STARPU_ASSERT(!ret);
+		}
+		else
+		{
+			wrapper.pre_sync_task->detach = 0;
+			starpu_task_destroy(wrapper.pre_sync_task);
+		}
 	}
 	else
 	{