瀏覽代碼

starpu_data_sync_with_mem now enforces sequential consistency if required.

Cédric Augonnet 15 年之前
父節點
當前提交
2d974ba10e

+ 75 - 15
src/core/dependencies/implicit_data_deps.c

@@ -18,12 +18,19 @@
 #include <common/config.h>
 #include <datawizard/datawizard.h>
 
-static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *task, starpu_data_handle handle, starpu_access_mode mode)
+/* This function adds the implicit task dependencies introduced by data
+ * sequential consistency. Two tasks are provided: pre_sync and post_sync which
+ * respectively indicates which task is going to depend on the previous deps
+ * and on which task future deps should wait. In the case of a dependency
+ * introduced by a task submission, both tasks are just the submitted task, but
+ * in the case of user interactions with the DSM, these may be different tasks.
+ * */
+/* NB : handle->sequential_consistency_mutex must be hold by the caller */
+void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+						starpu_data_handle handle, starpu_access_mode mode)
 {
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 
-	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-
 	if (handle->sequential_consistency)
 	{
 		starpu_access_mode previous_mode = handle->last_submitted_mode;
@@ -37,10 +44,10 @@ static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *ta
 				if (handle->last_submitted_writer)
 				{
 					struct starpu_task *task_array[1] = {handle->last_submitted_writer};
-					starpu_task_declare_deps_array(task, 1, task_array);
+					starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
 				}
 	
-				handle->last_submitted_writer = task;
+				handle->last_submitted_writer = post_sync_task;
 			}
 			else {
 				/* The task submitted previously were in read-only
@@ -71,19 +78,20 @@ static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *ta
 				}
 	
 				handle->last_submitted_readers = NULL;
-				handle->last_submitted_writer = task;
+				handle->last_submitted_writer = post_sync_task;
 	
-				starpu_task_declare_deps_array(task, nreaders, task_array);
+				starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array);
 			}
 	
 		}
 		else {
 			/* Add a reader */
-			STARPU_ASSERT(task);
+			STARPU_ASSERT(pre_sync_task);
+			STARPU_ASSERT(post_sync_task);
 	
 			/* Add this task to the list of readers */
 			struct starpu_task_list *link = malloc(sizeof(struct starpu_task_list));
-			link->task = task;
+			link->task = post_sync_task;
 			link->next = handle->last_submitted_readers;
 			handle->last_submitted_readers = link;
 
@@ -92,21 +100,18 @@ static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *ta
 			if (handle->last_submitted_writer)
 			{
 				struct starpu_task *task_array[1] = {handle->last_submitted_writer};
-				starpu_task_declare_deps_array(task, 1, task_array);
+				starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
 			}
 		}
 	
 		handle->last_submitted_mode = mode;
 	}
-
-	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 }
 
 /* Create the implicit dependencies for a newly submitted task */
 void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 {
-	if (!task->cl)
-		return;
+	STARPU_ASSERT(task->cl);
 
 	unsigned nbuffers = task->cl->nbuffers;
 
@@ -120,7 +125,9 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 		if (mode & STARPU_SCRATCH)
 			continue;
 
-		_starpu_detect_implicit_data_deps_with_handle(task, handle, mode);
+		PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+		_starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 	}
 }
 
@@ -171,4 +178,57 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 }
 
+void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle handle)
+{
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+
+	if (handle->sequential_consistency)
+	{
+		handle->post_sync_tasks_cnt++;
+
+		struct starpu_task_list *link = malloc(sizeof(struct starpu_task_list));
+		link->task = post_sync_task;
+		link->next = handle->post_sync_tasks;
+		handle->post_sync_tasks = link;		
+	}
+
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+}
+
+void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
+{
+	struct starpu_task_list *post_sync_tasks = NULL;
+	unsigned do_submit_tasks = 0;
+
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+
+	if (handle->sequential_consistency)
+	{
+		STARPU_ASSERT(handle->post_sync_tasks_cnt > 0);
+
+		if (--handle->post_sync_tasks_cnt == 0)
+		{
+			/* unlock all tasks : we need not hold the lock while unlocking all these tasks */
+			do_submit_tasks = 1;
+			post_sync_tasks = handle->post_sync_tasks;
+			handle->post_sync_tasks = NULL;
+		}
+
+	}
 
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+
+	if (do_submit_tasks)
+	{
+		struct starpu_task_list *link = post_sync_tasks;
+
+		while (link) {
+			/* There is no need to depend on that task now, since it was already unlocked */
+			_starpu_release_data_enforce_sequential_consistency(link->task, handle);
+
+			int ret = starpu_task_submit(link->task);
+			STARPU_ASSERT(!ret);
+			link = link->next;
+		}
+	}
+}

+ 5 - 0
src/core/dependencies/implicit_data_deps.h

@@ -20,8 +20,13 @@
 #include <starpu.h>
 #include <common/config.h>
 
+void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+						starpu_data_handle handle, starpu_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle);
 
+void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle handle);
+void _starpu_unlock_post_sync_tasks(starpu_data_handle handle);
+
 #endif // __IMPLICIT_DATA_DEPS_H__
 

+ 2 - 2
src/core/task.c

@@ -187,9 +187,9 @@ int starpu_task_submit(struct starpu_task *task)
 		if (task->execute_on_a_specific_worker 
 			&& !_starpu_worker_may_execute_task(task->workerid, where))
 			return -ENODEV;
-	}
 
-	_starpu_detect_implicit_data_deps(task);
+		_starpu_detect_implicit_data_deps(task);
+	}
 
 	/* internally, StarPU manipulates a starpu_job_t which is a wrapper around a
 	* task structure, it is possible that this job structure was already

+ 2 - 10
src/datawizard/coherency.h

@@ -121,18 +121,10 @@ struct starpu_data_state_t {
 	 * sequential_consistency flag is enabled. */
 	starpu_access_mode last_submitted_mode;
 	struct starpu_task *last_submitted_writer;
-	
 	struct starpu_task_list *last_submitted_readers;
 	
-	/* to synchronize with the latest for sync_data_with_mem* call. When
-	 * releasing a piece of data, we notify this cg, which unlocks
-	 * last_submitted_sync_task_apps */
-	struct starpu_cg_s *last_submitted_cg_apps; 
-	struct starpu_cg_s *current_cg_apps;
-
-	/* To synchronize with the last call(s) to sync_data_with_mem*,
-	 * synchronize with that (empty) task. */
-	struct starpu_task *last_submitted_sync_task_apps;
+	struct starpu_task_list *post_sync_tasks;
+	unsigned post_sync_tasks_cnt;
 };
 
 void _starpu_display_msi_stats(void);

+ 2 - 3
src/datawizard/interfaces/data_interface.c

@@ -48,9 +48,8 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	handle->last_submitted_mode = STARPU_R;
 	handle->last_submitted_writer = NULL;
 	handle->last_submitted_readers = NULL;
-	handle->last_submitted_cg_apps = NULL;
-	handle->current_cg_apps = NULL;
-	handle->last_submitted_sync_task_apps = NULL;
+	handle->post_sync_tasks = NULL;
+	handle->post_sync_tasks_cnt = 0;
 
 	handle->wb_mask = wb_mask;
 

+ 50 - 2
src/datawizard/user_interactions.c

@@ -49,6 +49,8 @@ struct state_and_node {
 	unsigned async;
 	void (*callback)(void *);
 	void *callback_arg;
+	struct starpu_task *pre_sync_task;
+	struct starpu_task *post_sync_task;
 };
 
 /*
@@ -105,6 +107,16 @@ int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
 		_starpu_sync_data_with_mem_continuation_non_blocking(statenode);
 	}
 
+#warning TODO fix sequential consistency !
+	/* XXX this is a temporary hack to have the starpu_sync_data_with_mem
+	 * function working properly. It should be fixed later on. */
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	if (handle->sequential_consistency)
+	{
+		handle->post_sync_tasks_cnt++;
+	}
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+
 	return 0;
 }
 
@@ -133,7 +145,6 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 	PTHREAD_MUTEX_UNLOCK(&statenode->lock);
 }
 
-
 /* The data must be released by calling starpu_data_release_from_mem later on */
 int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode)
 {
@@ -153,6 +164,30 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
 		.finished = 0
 	};
 
+//	fprintf(stderr, "TAKE sequential_consistency_mutex starpu_data_sync_with_mem\n");
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	int sequential_consistency = handle->sequential_consistency;
+	if (sequential_consistency)
+	{
+		statenode.pre_sync_task = starpu_task_create();
+		statenode.pre_sync_task->detach = 0;
+
+		statenode.post_sync_task = starpu_task_create();
+		statenode.post_sync_task->detach = 1;
+
+		_starpu_detect_implicit_data_deps_with_handle(statenode.pre_sync_task, statenode.post_sync_task, handle, mode);
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+
+		/* TODO detect if this is superflous */
+		statenode.pre_sync_task->synchronous = 1;
+		int ret = starpu_task_submit(statenode.pre_sync_task);
+		STARPU_ASSERT(!ret);
+		//starpu_task_wait(statenode.pre_sync_task);
+	}
+	else {
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+	}
+
 	/* we try to get the data, if we do not succeed immediately, we set a
  	* callback function that will be executed automatically when the data is
  	* available again, otherwise we fetch the data directly */
@@ -160,7 +195,11 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
 			_starpu_sync_data_with_mem_continuation, &statenode))
 	{
 		/* no one has locked this data yet, so we proceed immediately */
-		_starpu_sync_data_with_mem_continuation(&statenode);
+		unsigned r = (mode & STARPU_R);
+		unsigned w = (mode & STARPU_W);
+
+		int ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
+		STARPU_ASSERT(!ret);
 	}
 	else {
 		PTHREAD_MUTEX_LOCK(&statenode.lock);
@@ -169,6 +208,12 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
 		PTHREAD_MUTEX_UNLOCK(&statenode.lock);
 	}
 
+	/* At that moment, the caller holds a reference to the piece of data.
+	 * We enqueue the "post" sync task in the list associated to the handle
+	 * so that it is submitted by the starpu_data_release_from_mem
+	 * function. */
+	_starpu_add_post_sync_tasks(statenode.post_sync_task, handle);
+
 	return 0;
 }
 
@@ -180,6 +225,9 @@ void starpu_data_release_from_mem(starpu_data_handle handle)
 
 	/* The application can now release the rw-lock */
 	_starpu_release_data_on_node(handle, 0, 0);
+
+	/* In case there are some implicit dependencies, unlock the "post sync" tasks */
+	_starpu_unlock_post_sync_tasks(handle);
 }
 
 static void _prefetch_data_on_node(void *arg)