Browse Source

It is now possible to maintain the sequential coherency of a data handle. In
case the "sequential consistency flag" (set with
starpu_data_set_sequential_consistency_flag) is set (it is by default), the
access to the handle will be done in a sequential fashion thanks to implicit
task dependencies.
For instance, if the application submits a task T1 that modifies A and then T2
and T3 that read A, T2 and T3 will implicitly depend on T1. If a T4 task that
modifies A is submitted after T2 and T3, T4 will implicitly depend on T2 and
T3.
NB: Implicit data dependencies introduced by
starpu_sync_data_with_mem(_non_blocking), starpu_(un)partition_data and
starpu_map_filters are NOT detected yet. It remains the responsibility of the
programmer to add those dependencies explicitly. This should be done
automatically in the (near) future.

Cédric Augonnet 15 years ago
parent
commit
2e02972ba3

+ 2 - 0
include/starpu_data.h

@@ -67,6 +67,8 @@ unsigned starpu_get_worker_memory_node(unsigned workerid);
  * commit their changes in main memory (node 0). */
  * commit their changes in main memory (node 0). */
 void starpu_data_set_wb_mask(starpu_data_handle state, uint32_t wb_mask);
 void starpu_data_set_wb_mask(starpu_data_handle state, uint32_t wb_mask);
 
 
+void starpu_data_set_sequential_consistency_flag(starpu_data_handle handle, unsigned flag);
+
 unsigned starpu_test_if_data_is_allocated_on_node(starpu_data_handle handle, uint32_t memory_node);
 unsigned starpu_test_if_data_is_allocated_on_node(starpu_data_handle handle, uint32_t memory_node);
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus

+ 2 - 0
src/Makefile.am

@@ -31,6 +31,7 @@ noinst_HEADERS = 						\
 	core/dependencies/cg.h					\
 	core/dependencies/cg.h					\
 	core/dependencies/tags.h				\
 	core/dependencies/tags.h				\
 	core/dependencies/htable.h				\
 	core/dependencies/htable.h				\
+	core/dependencies/implicit_data_deps.h			\
 	core/policies/eager_central_priority_policy.h		\
 	core/policies/eager_central_priority_policy.h		\
 	core/policies/sched_policy.h				\
 	core/policies/sched_policy.h				\
 	core/policies/random_policy.h				\
 	core/policies/random_policy.h				\
@@ -94,6 +95,7 @@ libstarpu_la_SOURCES = 						\
 	core/progress_hook.c					\
 	core/progress_hook.c					\
 	core/dependencies/cg.c					\
 	core/dependencies/cg.c					\
 	core/dependencies/dependencies.c			\
 	core/dependencies/dependencies.c			\
+	core/dependencies/implicit_data_deps.c			\
 	core/dependencies/tags.c				\
 	core/dependencies/tags.c				\
 	core/dependencies/task_deps.c				\
 	core/dependencies/task_deps.c				\
 	core/dependencies/htable.c				\
 	core/dependencies/htable.c				\

+ 119 - 0
src/core/dependencies/implicit_data_deps.c

@@ -0,0 +1,119 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/config.h>
+#include <datawizard/datawizard.h>
+
+/*
+ * Register the access of `task` to `handle` in mode `mode` and declare the
+ * task dependencies needed to keep the accesses to that handle sequential.
+ * Nothing is recorded when the handle's sequential_consistency flag is
+ * cleared.
+ *
+ *  - A task that is not read-only depends on the last submitted writer or,
+ *    when the previously submitted access was read-only, on every reader
+ *    still recorded in the list; it then becomes the last submitted writer.
+ *  - A read-only task is pushed on the list of readers and depends on the
+ *    last submitted writer, if any.
+ *
+ * The whole update is done with handle->sequential_consistency_mutex held.
+ */
+static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *task, starpu_data_handle handle, starpu_access_mode mode)
+{
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+
+	if (handle->sequential_consistency)
+	{
+		starpu_access_mode previous_mode = handle->last_submitted_mode;
+
+		if (mode != STARPU_R)
+		{
+			if (previous_mode != STARPU_R)
+			{
+				/* (Read) Write */
+				/* This task depends on the previous writer */
+				if (handle->last_submitted_writer)
+				{
+					struct starpu_task *task_array[1] = {handle->last_submitted_writer};
+					starpu_task_declare_deps_array(task, 1, task_array);
+				}
+
+				handle->last_submitted_writer = task;
+			}
+			else {
+				/* The tasks submitted previously were in read-only
+				 * mode: this task must depend on all those read-only
+				 * tasks and we get rid of the list of readers */
+
+				/* Count the readers */
+				unsigned nreaders = 0;
+				struct starpu_task_list *l;
+				for (l = handle->last_submitted_readers; l; l = l->next)
+					nreaders++;
+
+				/* The list may be empty (e.g. a first write while
+				 * the mode still holds its initial STARPU_R value
+				 * and no reader was recorded). A variable-length
+				 * array must have a strictly positive size, so the
+				 * array is only built when there is at least one
+				 * reader to put in it. */
+				if (nreaders > 0)
+				{
+					struct starpu_task *task_array[nreaders];
+					unsigned i = 0;
+					l = handle->last_submitted_readers;
+					while (l)
+					{
+						STARPU_ASSERT(l->task);
+						task_array[i++] = l->task;
+
+						/* Fetch the successor before releasing the node */
+						struct starpu_task_list *prev = l;
+						l = l->next;
+						free(prev);
+					}
+
+					starpu_task_declare_deps_array(task, nreaders, task_array);
+				}
+
+				handle->last_submitted_readers = NULL;
+				handle->last_submitted_writer = task;
+			}
+
+		}
+		else {
+			/* Add a reader */
+			STARPU_ASSERT(task);
+
+			/* Add this task to the list of readers */
+			struct starpu_task_list *link = malloc(sizeof(*link));
+			/* Do not dereference a failed allocation */
+			STARPU_ASSERT(link);
+			link->task = task;
+			link->next = handle->last_submitted_readers;
+			handle->last_submitted_readers = link;
+
+
+			/* This task depends on the previous writer if any */
+			if (handle->last_submitted_writer)
+			{
+				struct starpu_task *task_array[1] = {handle->last_submitted_writer};
+				starpu_task_declare_deps_array(task, 1, task_array);
+			}
+		}
+
+		/* Remember the kind of access for the next submission */
+		handle->last_submitted_mode = mode;
+	}
+
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+}
+
+/* Walk over the buffers accessed by a newly submitted task and register, for
+ * each (handle, access mode) pair, the implicit dependencies it induces.
+ * A task that has no codelet is left untouched. */
+void _starpu_detect_implicit_data_deps(struct starpu_task *task)
+{
+	unsigned i;
+
+	if (task->cl)
+	{
+		/* The buffer count is read once, before iterating */
+		const unsigned count = task->cl->nbuffers;
+
+		for (i = 0; i < count; i++)
+			_starpu_detect_implicit_data_deps_with_handle(task,
+					task->buffers[i].handle,
+					task->buffers[i].mode);
+	}
+}

+ 1 - 0
src/core/task.c

@@ -190,6 +190,7 @@ int starpu_submit_task(struct starpu_task *task)
 			return -ENODEV;
 			return -ENODEV;
 	}
 	}
 
 
+	_starpu_detect_implicit_data_deps(task);
 
 
 	/* internally, StarPU manipulates a starpu_job_t which is a wrapper around a
 	/* internally, StarPU manipulates a starpu_job_t which is a wrapper around a
 	* task structure, it is possible that this job structure was already
 	* task structure, it is possible that this job structure was already

+ 45 - 1
src/datawizard/coherency.c

@@ -401,7 +401,51 @@ void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
 	unsigned index;
 	unsigned index;
 	for (index = 0; index < nbuffers; index++)
 	for (index = 0; index < nbuffers; index++)
 	{
 	{
-		_starpu_release_data_on_node(descrs[index].handle, mask, local_node);
+		starpu_data_handle handle = descrs[index].handle;
+
+		_starpu_release_data_on_node(handle, mask, local_node);
+
+		PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+
+		if (handle->sequential_consistency)
+		{
+			/* If this is the last writer, there is no point in adding
+			 * extra deps to that tasks that does not exists anymore */
+			if (task == handle->last_submitted_writer)
+				handle->last_submitted_writer = NULL;
+
+			/* Same if this is one of the readers: we go through the list
+			 * of readers and remove the task if it is found. */
+			struct starpu_task_list *l;
+			l = handle->last_submitted_readers;
+			struct starpu_task_list *prev = NULL;
+			while (l)
+			{
+				struct starpu_task_list *next = l->next;
+
+				if (l->task == task)
+				{
+					/* If we found the task in the reader list */
+					free(l);
+
+					if (prev)
+					{
+						prev->next = next;
+					}
+					else {
+						/* This is the first element of the list */
+						handle->last_submitted_readers = next;
+					}
+				}
+				else {
+					prev = l;
+				}
+
+				l = next;
+			}
+		}
+
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 	}
 	}
 
 
 	STARPU_TRACE_END_PUSH_OUTPUT(NULL);
 	STARPU_TRACE_END_PUSH_OUTPUT(NULL);

+ 31 - 0
src/datawizard/coherency.h

@@ -64,6 +64,11 @@ typedef struct starpu_local_data_state_t {
 
 
 struct starpu_data_requester_list_s;
 struct starpu_data_requester_list_s;
 
 
+struct starpu_task_list { /* node of a singly linked list of tasks */
+	struct starpu_task *task; /* task recorded by this node */
+	struct starpu_task_list *next; /* following node; the code walking these lists stops at NULL */
+};
+
 struct starpu_data_state_t {
 struct starpu_data_state_t {
 	struct starpu_data_requester_list_s *req_list;
 	struct starpu_data_requester_list_s *req_list;
 	/* the number of requests currently in the scheduling engine
 	/* the number of requests currently in the scheduling engine
@@ -104,6 +109,32 @@ struct starpu_data_state_t {
 	/* in some case, the application may explicitly tell StarPU that a
 	/* in some case, the application may explicitly tell StarPU that a
  	 * piece of data is not likely to be used soon again */
  	 * piece of data is not likely to be used soon again */
 	unsigned is_not_important;
 	unsigned is_not_important;
+
+	/* Does StarPU have to enforce some implicit data-dependencies ? */
+	unsigned sequential_consistency;
+
+	/* This lock should protect any operation to enforce
+	 * sequential_consistency */
+	pthread_mutex_t sequential_consistency_mutex;
+	
+	/* The last submitted task (or application data request) that declared
+	 * it would modify the piece of data. Any task accessing the data in a
+	 * read-only mode should depend on that task implicitly if the
+	 * sequential_consistency flag is enabled. */
+	starpu_access_mode last_submitted_mode;
+	struct starpu_task *last_submitted_writer;
+	
+	struct starpu_task_list *last_submitted_readers;
+	
+	/* to synchronize with the latest for sync_data_with_mem* call. When
+	 * releasing a piece of data, we notify this cg, which unlocks
+	 * last_submitted_sync_task_apps */
+	struct starpu_cg_s *last_submitted_cg_apps; 
+	struct starpu_cg_s *current_cg_apps;
+
+	/* To synchronize with the last call(s) to sync_data_with_mem*,
+	 * synchronize with that (empty) task. */
+	struct starpu_task *last_submitted_sync_task_apps;
 };
 };
 
 
 void _starpu_display_msi_stats(void);
 void _starpu_display_msi_stats(void);

+ 2 - 0
src/datawizard/datawizard.h

@@ -30,6 +30,8 @@
 #include <datawizard/data_request.h>
 #include <datawizard/data_request.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <datawizard/interfaces/data_interface.h>
 
 
+#include <core/dependencies/implicit_data_deps.h>
+
 void _starpu_datawizard_progress(uint32_t memory_node, unsigned may_alloc);
 void _starpu_datawizard_progress(uint32_t memory_node, unsigned may_alloc);
 
 
 #endif // __DATAWIZARD_H__
 #endif // __DATAWIZARD_H__

+ 10 - 0
src/datawizard/interfaces/data_interface.c

@@ -45,6 +45,16 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 
 
 	handle->is_not_important = 0;
 	handle->is_not_important = 0;
 
 
+	handle->sequential_consistency = 1; /* enabled by default */
+
+	PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
+	handle->last_submitted_mode = STARPU_R;
+	handle->last_submitted_writer = NULL;
+	handle->last_submitted_readers = NULL;
+	handle->last_submitted_cg_apps = NULL;
+	handle->current_cg_apps = NULL;
+	handle->last_submitted_sync_task_apps = NULL;
+
 	handle->wb_mask = wb_mask;
 	handle->wb_mask = wb_mask;
 
 
 	/* that new data is invalid from all nodes perpective except for the
 	/* that new data is invalid from all nodes perpective except for the

+ 21 - 2
src/datawizard/user_interactions.c

@@ -247,8 +247,9 @@ void starpu_advise_if_data_is_important(starpu_data_handle handle, unsigned is_i
 	for (child = 0; child < handle->nchildren; child++)
 	for (child = 0; child < handle->nchildren; child++)
 	{
 	{
 		/* make sure the intermediate children is advised as well */
 		/* make sure the intermediate children is advised as well */
-		if (handle->children[child].nchildren > 0)
-			starpu_advise_if_data_is_important(&handle->children[child], is_important);
+		struct starpu_data_state_t *child_handle = &handle->children[child];
+		if (child_handle->nchildren > 0)
+			starpu_advise_if_data_is_important(child_handle, is_important);
 	}
 	}
 
 
 	handle->is_not_important = !is_important;
 	handle->is_not_important = !is_important;
@@ -258,4 +259,22 @@ void starpu_advise_if_data_is_important(starpu_data_handle handle, unsigned is_i
 
 
 }
 }
 
 
+/*
+ * Enable (flag != 0) or disable (flag == 0) the sequential consistency of
+ * `handle` and of all the handles found below it in its children arrays.
+ *
+ * The handle's header_lock is held for the whole call; the flag itself is
+ * written with sequential_consistency_mutex held, which is the mutex taken by
+ * the code that reads the flag when detecting implicit dependencies.
+ */
+void starpu_data_set_sequential_consistency_flag(starpu_data_handle handle, unsigned flag)
+{
+	_starpu_spin_lock(&handle->header_lock);
+
+	unsigned child;
+	for (child = 0; child < handle->nchildren; child++)
+	{
+		/* Apply the flag to every child. The recursion must not be
+		 * restricted to children that have children of their own:
+		 * a leaf would then keep its previous flag, and the callee
+		 * already copes with nchildren == 0 (its loop does not run). */
+		struct starpu_data_state_t *child_handle = &handle->children[child];
+		starpu_data_set_sequential_consistency_flag(child_handle, flag);
+	}
+
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	handle->sequential_consistency = flag;
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);


+	_starpu_spin_unlock(&handle->header_lock);
+}