浏览代码

Make sure that we keep track of "ghost dependencies" when generating the DAG
with FxT. For instance, if we have the following sequence: f(Aw) g(Aw), and
that f is finished by the time g is submitted, we must display the f->g
dependency in the generated DAG.

Cédric Augonnet 15 年之前
父节点
当前提交
a88ab34ceb

+ 3 - 0
src/common/fxt.h

@@ -201,6 +201,9 @@ do {									\
 #define STARPU_TRACE_TASK_DEPS(job_prev, job_succ)	\
 	FUT_DO_PROBE2(STARPU_FUT_TASK_DEPS, (job_prev)->job_id, (job_succ)->job_id)
 
+#define STARPU_TRACE_GHOST_TASK_DEPS(ghost_prev_id, job_succ_id)		\
+	FUT_DO_PROBE2(STARPU_FUT_TASK_DEPS, (ghost_prev_id), (job_succ_id))
+
 #define STARPU_TRACE_TASK_DONE(job)						\
 do {										\
 	struct starpu_task *task = (job)->task;					\

+ 97 - 10
src/core/dependencies/implicit_data_deps.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <core/task.h>
 #include <datawizard/datawizard.h>
 
 /* This function adds the implicit task dependencies introduced by data
@@ -33,6 +34,19 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 
 	if (handle->sequential_consistency)
 	{
+
+#ifdef STARPU_USE_FXT
+		/* In case we are generating the DAG, we add an implicit
+		 * dependency between the pre and the post sync tasks in case
+		 * they are not the same. */
+		if (pre_sync_task != post_sync_task)
+		{
+			starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
+			starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
+			STARPU_TRACE_GHOST_TASK_DEPS(pre_sync_job->job_id, post_sync_job->job_id);
+		}
+#endif
+
 		starpu_access_mode previous_mode = handle->last_submitted_mode;
 	
 		if (mode & STARPU_W)
@@ -46,6 +60,18 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 					struct starpu_task *task_array[1] = {handle->last_submitted_writer};
 					starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
 				}
+
+#ifdef STARPU_USE_FXT
+				/* If there is a ghost writer instead, we
+				 * should declare a ghost dependency here, and
+				 * invalidate the ghost value. */
+				if (handle->last_submitted_ghost_writer_id_is_valid)
+				{
+					starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
+					STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, post_sync_job->job_id);
+					handle->last_submitted_ghost_writer_id_is_valid = 0;
+				}
+#endif
 	
 				handle->last_submitted_writer = post_sync_task;
 			}
@@ -57,14 +83,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 				/* Count the readers */
 				unsigned nreaders = 0;
 				struct starpu_task_list *l;
-				l = handle->last_submitted_readers;
-				while (l)
-				{
-					nreaders++;
-					l = l->next;
-				}
-	
-				struct starpu_task *task_array[nreaders];
+				struct starpu_task *task_array[handle->last_submitted_readers_count];
 				unsigned i = 0;
 				l = handle->last_submitted_readers;
 				while (l)
@@ -76,7 +95,26 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 					l = l->next;
 					free(prev);
 				}
-	
+
+				handle->last_submitted_readers_count = 0;
+
+#ifdef STARPU_USE_FXT
+				/* Declare all dependencies with ghost readers */
+				starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
+
+				struct starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
+				while (ghost_readers_id)
+				{
+					unsigned long id = ghost_readers_id->id;
+					STARPU_TRACE_GHOST_TASK_DEPS(id, post_sync_job->job_id);
+
+					struct starpu_jobid_list *prev = ghost_readers_id;
+					ghost_readers_id = ghost_readers_id->next;
+					free(prev);
+				}
+				handle->last_submitted_ghost_readers_id = NULL;
+#endif
+
 				handle->last_submitted_readers = NULL;
 				handle->last_submitted_writer = post_sync_task;
 	
@@ -95,6 +133,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 			link->next = handle->last_submitted_readers;
 			handle->last_submitted_readers = link;
 
+			handle->last_submitted_readers_count++;
 
 			/* This task depends on the previous writer if any */
 			if (handle->last_submitted_writer)
@@ -102,6 +141,17 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 				struct starpu_task *task_array[1] = {handle->last_submitted_writer};
 				starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
 			}
+
+#ifdef STARPU_USE_FXT
+			/* There was perhaps no last submitted writer but a
+			 * ghost one, we should report that here, and keep the
+			 * ghost writer valid */
+			if (handle->last_submitted_ghost_writer_id_is_valid)
+			{
+				starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
+				STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, post_sync_job->job_id);
+			}
+#endif
 		}
 	
 		handle->last_submitted_mode = mode;
@@ -133,17 +183,38 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 
 /* This function is called when a task has been executed so that we don't
  * create dependencies to task that do not exist anymore. */
-#warning TODO in order to generate a useful DAG with FXT, we may have to do something here to save the deps that are just implicit
+/* NB: We maintain a list of "ghost deps" in case FXT is enabled. Ghost
+ * dependencies are the dependencies that are implicitely enforced by StarPU
+ * even if they do not imply a real dependency. For instance in the following
+ * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
+ * if h is submitted after the termination of f or g, StarPU will not create a
+ * dependency as this is not needed anymore. */
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle)
 {
 	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 
 	if (handle->sequential_consistency)
 	{
+
 		/* If this is the last writer, there is no point in adding
 		 * extra deps to that tasks that does not exists anymore */
 		if (task == handle->last_submitted_writer)
+		{
 			handle->last_submitted_writer = NULL;
+			
+#ifdef STARPU_USE_FXT
+			/* Save the previous writer as the ghost last writer */
+			handle->last_submitted_ghost_writer_id_is_valid = 1;
+			starpu_job_t ghost_job = _starpu_get_job_associated_to_task(task);
+			handle->last_submitted_ghost_writer_id = ghost_job->job_id;
+#endif
+			
+		}
+		
+		/* XXX can a task be both the last writer associated to a data
+		 * and be in its list of readers ? If not, we should not go
+		 * through the entire list once we have detected it was the
+		 * last writer. */
 
 		/* Same if this is one of the readers: we go through the list
 		 * of readers and remove the task if it is found. */
@@ -159,6 +230,16 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 				/* If we found the task in the reader list */
 				free(l);
 
+#ifdef STARPU_USE_FXT
+				/* Save the job id of the reader task in the ghost reader linked list list */
+				starpu_job_t ghost_reader_job = _starpu_get_job_associated_to_task(task);
+				struct starpu_jobid_list *link = malloc(sizeof(struct starpu_jobid_list));
+				STARPU_ASSERT(link);
+				link->next = handle->last_submitted_ghost_readers_id;
+				link->id = ghost_reader_job->job_id; 
+				handle->last_submitted_ghost_readers_id = link;
+#endif
+
 				if (prev)
 				{
 					prev->next = next;
@@ -167,6 +248,12 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 					/* This is the first element of the list */
 					handle->last_submitted_readers = next;
 				}
+
+				/* XXX can we really find the same task again
+				 * once we have found it ? Otherwise, we should
+				 * avoid going through the entire list and stop
+				 * as soon as we find the task. TODO: check how
+				 * duplicate dependencies are treated. */
 			}
 			else {
 				prev = l;

+ 1 - 0
src/core/task.h

@@ -19,6 +19,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <core/jobs.h>
 
 /* In order to implement starpu_task_wait_for_all, we keep track of the number of
  * task currently submitted */

+ 18 - 0
src/datawizard/coherency.h

@@ -69,6 +69,11 @@ struct starpu_task_list {
 	struct starpu_task_list *next;
 };
 
+struct starpu_jobid_list {
+	unsigned long id;
+	struct starpu_jobid_list *next;
+};
+
 struct starpu_data_state_t {
 	struct starpu_data_requester_list_s *req_list;
 	/* the number of requests currently in the scheduling engine
@@ -127,7 +132,20 @@ struct starpu_data_state_t {
 	 * sequential_consistency flag is enabled. */
 	starpu_access_mode last_submitted_mode;
 	struct starpu_task *last_submitted_writer;
+	unsigned last_submitted_readers_count;
 	struct starpu_task_list *last_submitted_readers;
+
+#ifdef STARPU_USE_FXT
+	/* If FxT is enabled, we keep track of "ghost dependencies": that is to
+	 * say the dependencies that are not needed anymore, but that should
+	 * appear in the post-mortem DAG. For instance if we have the sequence
+	 * f(Aw) g(Aw), and that g is submitted after the termination of f, we
+	 * want to have f->g appear in the DAG even if StarPU does not need to
+	 * enforce this dependency anymore.*/
+	unsigned last_submitted_ghost_writer_id_is_valid;
+	unsigned long last_submitted_ghost_writer_id;
+	struct starpu_jobid_list *last_submitted_ghost_readers_id;
+#endif
 	
 	struct starpu_task_list *post_sync_tasks;
 	unsigned post_sync_tasks_cnt;

+ 7 - 0
src/datawizard/interfaces/data_interface.c

@@ -49,10 +49,17 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
 	handle->last_submitted_mode = STARPU_R;
 	handle->last_submitted_writer = NULL;
+	handle->last_submitted_readers_count = 0;
 	handle->last_submitted_readers = NULL;
 	handle->post_sync_tasks = NULL;
 	handle->post_sync_tasks_cnt = 0;
 
+#ifdef STARPU_USE_FXT
+	handle->last_submitted_ghost_writer_id_is_valid = 0;
+	handle->last_submitted_ghost_writer_id = 0;
+	handle->last_submitted_ghost_readers_id = NULL;
+#endif
+
 	handle->wb_mask = wb_mask;
 
 	/* Store some values directly in the handle not to recompute them all

+ 1 - 0
src/datawizard/user_interactions.c

@@ -16,6 +16,7 @@
 
 #include <common/config.h>
 #include <common/utils.h>
+#include <core/task.h>
 #include <datawizard/coherency.h>
 #include <datawizard/copy_driver.h>
 #include <datawizard/write_back.h>