Browse Source

Display task dependencies (i.e. not only tag dependencies) in the graph
generated by fxt_tool.

Cédric Augonnet 15 years ago
parent
commit
dec83d29d2

+ 47 - 6
src/common/fxt.h

@@ -52,6 +52,8 @@
 #define STARPU_FUT_CODELET_TAG		0x5111
 #define STARPU_FUT_CODELET_TAG_DEPS	0x5112
 
+#define STARPU_FUT_TASK_DEPS		0x5137
+
 #define STARPU_FUT_DATA_COPY		0x5113
 #define STARPU_FUT_WORK_STEALING	0x5114
 
@@ -67,6 +69,7 @@
 #define	STARPU_FUT_END_CALLBACK	0x5120
 
 #define	STARPU_FUT_TASK_DONE		0x5121
+#define	STARPU_FUT_TAG_DONE		0x5138
 
 #define	STARPU_FUT_START_ALLOC		0x5122
 #define	STARPU_FUT_END_ALLOC		0x5123
@@ -110,6 +113,23 @@ do {									\
 	sprintf((char *)args, "%s", str);				\
 } while (0);
 
+/* Record event CODE with 4 integer parameters followed by the string str (copied with its trailing \0). */
+#define STARPU_FUT_DO_PROBE4STR(CODE, P1, P2, P3, P4, str)		\
+do {									\
+	/* evaluate str only once; probe_ prefixes limit name capture */	\
+	const char *probe_str = (str);					\
+	size_t probe_len = strlen(probe_str) + 1;			\
+	unsigned probe_nbargs = 4 + (probe_len + sizeof(unsigned long) - 1)/(sizeof(unsigned long));\
+	size_t probe_total_len = FUT_SIZE(probe_nbargs);		\
+	unsigned long *probe_args =					\
+		fut_getstampedbuffer(FUT_CODE(CODE, probe_nbargs), probe_total_len);\
+	*(probe_args++) = (unsigned long)(P1);				\
+	*(probe_args++) = (unsigned long)(P2);				\
+	*(probe_args++) = (unsigned long)(P3);				\
+	*(probe_args++) = (unsigned long)(P4);				\
+	sprintf((char *)probe_args, "%s", probe_str);			\
+} while (0);
+
 /* workerkind = STARPU_FUT_CPU_KEY for instance */
 #define STARPU_TRACE_NEW_MEM_NODE(nodeid)			\
 	FUT_DO_PROBE2(STARPU_FUT_NEW_MEM_NODE, nodeid, syscall(SYS_gettid));
@@ -162,24 +182,43 @@ do {									\
 	FUT_DO_PROBE2(STARPU_FUT_END_PUSH_OUTPUT, job, syscall(SYS_gettid));
 
 #define STARPU_TRACE_CODELET_TAG(tag, job)	\
-	FUT_DO_PROBE2(STARPU_FUT_CODELET_TAG, tag, job)
+	FUT_DO_PROBE2(STARPU_FUT_CODELET_TAG, tag, (job)->job_id)
 
 #define STARPU_TRACE_CODELET_TAG_DEPS(tag_child, tag_father)	\
 	FUT_DO_PROBE2(STARPU_FUT_CODELET_TAG_DEPS, tag_child, tag_father)
 
-#define STARPU_TRACE_TASK_DONE(tag)							\
+#define STARPU_TRACE_TASK_DEPS(job_prev, job_succ)	\
+	FUT_DO_PROBE2(STARPU_FUT_TASK_DEPS, (job_prev)->job_id, (job_succ)->job_id)
+
+#define STARPU_TRACE_TASK_DONE(job)						\
+do {										\
+	struct starpu_task *task = (job)->task;					\
+	unsigned exclude_from_dag = (job)->exclude_from_dag;			\
+	if (task && task->cl 							\
+		&& task->cl->model						\
+		&& task->cl->model->symbol)					\
+	{									\
+		char *symbol = task->cl->model->symbol;				\
+		STARPU_FUT_DO_PROBE4STR(STARPU_FUT_TASK_DONE, (job)->job_id, syscall(SYS_gettid), (long unsigned)exclude_from_dag, 1, symbol);\
+	}									\
+	else {									\
+		FUT_DO_PROBE4(STARPU_FUT_TASK_DONE, (job)->job_id, syscall(SYS_gettid), (long unsigned)exclude_from_dag, 0);\
+	}									\
+} while(0);
+
+#define STARPU_TRACE_TAG_DONE(tag)						\
 do {										\
-	struct starpu_job_s *job = (tag)->job;						\
+	struct starpu_job_s *job = (tag)->job;					\
 	if (job && job->task 							\
 		&& job->task->cl						\
 		&& job->task->cl->model						\
 		&& job->task->cl->model->symbol)				\
 	{									\
 		char *symbol = job->task->cl->model->symbol;			\
-		STARPU_FUT_DO_PROBE3STR(STARPU_FUT_TASK_DONE, tag->id, syscall(SYS_gettid), 1, symbol);\
+		STARPU_FUT_DO_PROBE3STR(STARPU_FUT_TAG_DONE, (tag)->id, syscall(SYS_gettid), 1, symbol);\
 	}									\
 	else {									\
-		FUT_DO_PROBE3(STARPU_FUT_TASK_DONE, tag->id, syscall(SYS_gettid), 0);	\
+		FUT_DO_PROBE3(STARPU_FUT_TAG_DONE, (tag)->id, syscall(SYS_gettid), 0);\
 	}									\
 } while(0);
 
@@ -252,7 +291,9 @@ do {										\
 #define STARPU_TRACE_END_PUSH_OUTPUT(job)	do {} while(0);
 #define STARPU_TRACE_CODELET_TAG(tag, job)	do {} while(0);
 #define STARPU_TRACE_CODELET_TAG_DEPS(a, b)	do {} while(0);
-#define STARPU_TRACE_TASK_DONE(tag)		do {} while(0);
+#define STARPU_TRACE_TASK_DEPS(a, b)		do {} while(0);
+#define STARPU_TRACE_TASK_DONE(a)		do {} while(0);
+#define STARPU_TRACE_TAG_DONE(a)		do {} while(0);
 #define STARPU_TRACE_DATA_COPY(a, b, c)	do {} while(0);
 #define STARPU_TRACE_START_DRIVER_COPY(a,b,c,d)	do {} while(0);
 #define STARPU_TRACE_END_DRIVER_COPY(a,b,c,d)	do {} while(0);

+ 1 - 0
src/core/dependencies/implicit_data_deps.c

@@ -133,6 +133,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 
 /* This function is called when a task has been executed so that we don't
  * create dependencies to task that do not exist anymore. */
+#warning TODO in order to generate a useful DAG with FXT, we may have to do something here to save the deps that are just implicit
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle)
 {
 	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);

+ 1 - 1
src/core/dependencies/tags.c

@@ -177,7 +177,7 @@ void _starpu_notify_tag_dependencies(struct starpu_tag_s *tag)
 	_starpu_spin_lock(&tag->lock);
 
 	tag->state = STARPU_DONE;
-	STARPU_TRACE_TASK_DONE(tag);
+	STARPU_TRACE_TAG_DONE(tag);
 
 	_starpu_notify_cg_list(&tag->tag_successors);
 

+ 2 - 0
src/core/dependencies/task_deps.c

@@ -79,6 +79,8 @@ void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, st
 		starpu_job_t dep_job;
 		dep_job = _starpu_get_job_associated_to_task(dep_task);
 
+		STARPU_TRACE_TASK_DEPS(dep_job, job);
+
 		PTHREAD_MUTEX_LOCK(&dep_job->sync_mutex);
 		_starpu_task_add_succ(dep_job, cg);
 		PTHREAD_MUTEX_UNLOCK(&dep_job->sync_mutex);

+ 20 - 0
src/core/jobs.c

@@ -39,6 +39,18 @@ size_t _starpu_job_get_data_size(starpu_job_t j)
 	return size;
 }
 
+#ifdef STARPU_USE_FXT
+/* we need to identify each task to generate the DAG. */
+static unsigned long job_cnt = 0;
+
+/* Flag the job attached to task so that it is left out of the generated DAG. */
+void _starpu_exclude_task_from_dag(struct starpu_task *task)
+{
+	starpu_job_t excluded_job = _starpu_get_job_associated_to_task(task);
+	excluded_job->exclude_from_dag = 1;
+}
+#endif
+
 /* create an internal starpu_job_t structure to encapsulate the task */
 starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task)
 {
@@ -53,6 +65,12 @@ starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task
 	job->submitted = 0;
 	job->terminated = 0;
 
+#ifdef STARPU_USE_FXT
+	job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
+	/* display all tasks by default */
+	job->exclude_from_dag = 0;
+#endif
+
 	_starpu_cg_list_init(&job->job_successors);
 
 	PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
@@ -136,6 +154,8 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 		_starpu_set_local_worker_status(STATUS_UNKNOWN);
 	}
 
+	STARPU_TRACE_TASK_DONE(j);
+
 	/* NB: we do not save those values before the callback, in case the
 	 * application changes some parameters eventually (eg. a task may not
 	 * be generated if the application is terminated). */

+ 9 - 0
src/core/jobs.h

@@ -71,12 +71,21 @@ LIST_TYPE(starpu_job,
 
 	unsigned submitted;
 	unsigned terminated;
+
+#ifdef STARPU_USE_FXT
+	unsigned long job_id;
+	unsigned exclude_from_dag;
+#endif
 );
 
 starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task);
 void _starpu_job_destroy(starpu_job_t j);
 void _starpu_wait_job(starpu_job_t j);
 
+#ifdef STARPU_USE_FXT
+void _starpu_exclude_task_from_dag(struct starpu_task *task);
+#endif
+
 /* try to submit job j, enqueue it if it's not schedulable yet */
 unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_already_locked);
 unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j, unsigned job_is_already_locked);

+ 17 - 5
src/util/execute_on_all.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <core/jobs.h>
 
 struct wrapper_func_args {
 	void (*func)(void *);
@@ -28,11 +29,19 @@ static void wrapper_func(void *buffers[] __attribute__ ((unused)), void *_args)
 	args->func(args->arg);
 }
 
+static struct starpu_perfmodel_t wrapper_model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "_wrapper_model"
+};
+
+
 /* execute func(arg) on each worker that matches the "where" flag */
 void starpu_execute_on_each_worker(void (*func)(void *), void *arg, uint32_t where)
 {
+	int ret;
 	unsigned worker;
 	unsigned nworkers = starpu_worker_get_count();
+	struct starpu_task *tasks[STARPU_NMAXWORKERS];
 
 	/* create a wrapper codelet */
 	struct starpu_codelet_t wrapper_cl = {
@@ -42,11 +51,9 @@ void starpu_execute_on_each_worker(void (*func)(void *), void *arg, uint32_t whe
 		.opencl_func = wrapper_func,
 		/* XXX we do not handle Cell .. */
 		.nbuffers = 0,
-		.model = NULL
+		.model = &wrapper_model
 	};
 
-	struct starpu_task *tasks[STARPU_NMAXWORKERS];
-
 	struct wrapper_func_args args = {
 		.func = func,
 		.arg = arg
@@ -65,7 +72,11 @@ void starpu_execute_on_each_worker(void (*func)(void *), void *arg, uint32_t whe
 		tasks[worker]->detach = 0;
 		tasks[worker]->destroy = 0;
 
-		int ret = starpu_task_submit(tasks[worker]);
+#ifdef STARPU_USE_FXT
+		_starpu_exclude_task_from_dag(tasks[worker]);
+#endif
+
+		ret = starpu_task_submit(tasks[worker]);
 		if (ret == -ENODEV)
 		{
 			/* if the worker is not able to execute this tasks, we
@@ -80,7 +91,8 @@ void starpu_execute_on_each_worker(void (*func)(void *), void *arg, uint32_t whe
 	{
 		if (tasks[worker])
 		{
-			starpu_task_wait(tasks[worker]);
+			ret = starpu_task_wait(tasks[worker]);
+			STARPU_ASSERT(!ret);
 			starpu_task_destroy(tasks[worker]);
 		}
 	}

+ 30 - 8
src/util/malloc.c

@@ -57,15 +57,18 @@ static void malloc_pinned_cuda_codelet(void *buffers[] __attribute__((unused)),
 #endif
 
 #if defined(STARPU_USE_CUDA)// || defined(STARPU_USE_OPENCL)
+static struct starpu_perfmodel_t malloc_pinned_model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "malloc_pinned"
+};
+
 static starpu_codelet malloc_pinned_cl = {
-#ifdef STARPU_USE_CUDA
 	.cuda_func = malloc_pinned_cuda_codelet,
-#endif
 //#ifdef STARPU_USE_OPENCL
 //	.opencl_func = malloc_pinned_opencl_codelet,
 //#endif
-	.model = NULL,
-	.nbuffers = 0
+	.nbuffers = 0,
+	.model = &malloc_pinned_model
 };
 #endif
 
@@ -94,6 +97,10 @@ int starpu_data_malloc_pinned_if_possible(void **A, size_t dim)
 
 		task->synchronous = 1;
 
+#ifdef STARPU_USE_FXT
+		_starpu_exclude_task_from_dag(task);
+#endif
+
 		push_res = starpu_task_submit(task);
 		STARPU_ASSERT(push_res != -ENODEV);
 #endif
@@ -116,6 +123,10 @@ int starpu_data_malloc_pinned_if_possible(void **A, size_t dim)
 //
 //		task->synchronous = 1;
 //
+//#ifdef STARPU_USE_FXT
+//		_starpu_exclude_task_from_dag(task);
+//#endif
+//
 //		push_res = starpu_task_submit(task);
 //		STARPU_ASSERT(push_res != -ENODEV);
 //#endif
@@ -149,15 +160,18 @@ static void free_pinned_cuda_codelet(void *buffers[] __attribute__((unused)), vo
 //#endif
 
 #if defined(STARPU_USE_CUDA) // || defined(STARPU_USE_OPENCL)
+static struct starpu_perfmodel_t free_pinned_model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "free_pinned"
+};
+
 static starpu_codelet free_pinned_cl = {
-#ifdef STARPU_USE_CUDA
 	.cuda_func = free_pinned_cuda_codelet,
-#endif
 //#ifdef STARPU_USE_OPENCL
 //	.opencl_func = free_pinned_opencl_codelet,
 //#endif
-	.model = NULL,
-	.nbuffers = 0
+	.nbuffers = 0,
+	.model = &free_pinned_model
 };
 #endif
 
@@ -179,6 +193,10 @@ int starpu_data_free_pinned_if_possible(void *A)
 
 		task->synchronous = 1;
 
+#ifdef STARPU_USE_FXT
+		_starpu_exclude_task_from_dag(task);
+#endif
+
 		push_res = starpu_task_submit(task);
 		STARPU_ASSERT(push_res != -ENODEV);
 #endif
@@ -196,6 +214,10 @@ int starpu_data_free_pinned_if_possible(void *A)
 //
 //		task->synchronous = 1;
 //
+//#ifdef STARPU_USE_FXT
+//		_starpu_exclude_task_from_dag(task);
+//#endif
+//
 //		push_res = starpu_task_submit(task);
 //		STARPU_ASSERT(push_res != -ENODEV);
 //#endif

+ 13 - 3
tools/dag_dot.c

@@ -41,13 +41,23 @@ void terminate_dat_dot(void)
 
 void add_deps(uint64_t child, uint64_t father)
 {
-	fprintf(out_file, "\t \"%llx\"->\"%llx\"\n", 
+	fprintf(out_file, "\t \"tag_%llx\"->\"tag_%llx\"\n", 
 		(unsigned long long)father, (unsigned long long)child);
 }
 
-void dot_set_tag_done(uint64_t tag, char *color)
+void add_task_deps(unsigned long dep_prev, unsigned long dep_succ)
 {
+	fprintf(out_file, "\t \"task_%lx\"->\"task_%lx\"\n", dep_prev, dep_succ);
+} 
 
-	fprintf(out_file, "\t \"%llx\" \[ style=filled, label=\"\", color=\"%s\"]\n", 
+void dot_set_tag_done(uint64_t tag, const char *color)
+{
+	/* Emit the node for a finished tag: filled with color, empty label. */
+	fprintf(out_file, "\t \"tag_%llx\" [ style=filled, label=\"\", color=\"%s\"]\n",
 		(unsigned long long)tag, color);
 }
+
+void dot_set_task_done(unsigned long job_id, const char *label, const char *color)
+{	/* emit the node of a finished task: filled with color, captioned with label */
+	fprintf(out_file, "\t \"task_%lx\" [ style=filled, label=\"%s\", color=\"%s\"]\n", job_id, label, color);
+}

+ 48 - 1
tools/fxt_tool.c

@@ -484,8 +484,47 @@ static void handle_codelet_tag_deps(void)
 	add_deps(child, father);
 }
 
+/* Record a task dependency edge in the DAG: the job whose id is in
+ * ev.param[0] is a dependency of the job whose id is in ev.param[1]. */
+static void handle_task_deps(void)
+{
+	unsigned long pred_id = ev.param[0], succ_id = ev.param[1];
+
+	add_task_deps(pred_id, succ_id);
+}
+
 static void handle_task_done(void)
 {
+	/* Event layout: param[0]=job id, [1]=thread id, [2]=exclude flag,
+	 * [3]=has-name flag, [4..]=inlined symbol string when [3] is set. */
+	unsigned long job_id = ev.param[0];
+	unsigned long has_name = ev.param[3];
+	char *name = has_name?(char *)&ev.param[4]:"unknown";
+
+	int worker = find_worker_id(ev.param[1]);
+
+	const char *colour;
+	char buffer[32];
+	if (per_task_colour) {
+		/* zero-pad each channel so we always emit a valid 6-digit #rrggbb */
+		snprintf(buffer, sizeof(buffer), "#%02x%02x%02x",
+			get_colour_symbol_red(name)/4,
+			get_colour_symbol_green(name)/4,
+			get_colour_symbol_blue(name)/4);
+		colour = &buffer[0];
+	}
+	else {
+		colour= (worker < 0)?"#000000":get_worker_color(worker);
+	}
+
+	unsigned exclude_from_dag = ev.param[2];
+
+	if (!exclude_from_dag)
+		dot_set_task_done(job_id, name, colour);
+}
+
+static void handle_tag_done(void)
+{
 	uint64_t tag_id;
 	tag_id = ev.param[0];
 
@@ -495,7 +534,7 @@ static void handle_task_done(void)
         int worker;
         worker = find_worker_id(ev.param[1]);
 
-	char *colour;
+	const char *colour;
 	char buffer[32];
 	if (per_task_colour) {
 		snprintf(buffer, 32, "%.4f,%.4f,%.4f",
@@ -720,10 +759,18 @@ void parse_new_file(char *filename_in, char *file_prefix, uint64_t file_offset)
 				handle_codelet_tag_deps();
 				break;
 
+			case STARPU_FUT_TASK_DEPS:
+				handle_task_deps();
+				break;
+
 			case STARPU_FUT_TASK_DONE:
 				handle_task_done();
 				break;
 
+			case STARPU_FUT_TAG_DONE:
+				handle_tag_done();
+				break;
+
 			case STARPU_FUT_DATA_COPY:
 				if (!no_bus)
 				handle_data_copy();

+ 4 - 1
tools/fxt_tool.h

@@ -38,7 +38,10 @@
 extern void init_dag_dot(void);
 extern void terminate_dat_dot(void);
 extern void add_deps(uint64_t child, uint64_t father);
-extern void dot_set_tag_done(uint64_t tag, char *color);
+/* task-level counterpart of add_deps, called from fxt_tool.c; arguments are job ids */
+extern void add_task_deps(unsigned long dep_prev, unsigned long dep_succ);
+extern void dot_set_tag_done(uint64_t tag, const char *color);
+extern void dot_set_task_done(unsigned long job_id, const char *label, const char *color);
 
 void set_next_other_worker_color(int workerid);
 void set_next_cpu_worker_color(int workerid);