Browse Source

Factorize numbering jobs and submitorder, and add DATA_WONT_USE events for tasks.rec

Samuel Thibault 7 years ago
parent
commit
aa2f42befe

+ 0 - 1
include/starpu_task.h

@@ -165,7 +165,6 @@ struct starpu_task
 	void *prologue_callback_pop_arg;
 
 	starpu_tag_t tag_id;
-	unsigned long submit_order;
 
 	unsigned cl_arg_free:1;
 	unsigned callback_arg_free:1;

+ 5 - 0
src/common/fxt.c

@@ -40,6 +40,11 @@ int _starpu_fxt_started = 0;
 starpu_pthread_mutex_t _starpu_fxt_started_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 starpu_pthread_cond_t _starpu_fxt_started_cond = STARPU_PTHREAD_COND_INITIALIZER;
 
+/* we need to identify each task to generate the DAG. */
+unsigned long _starpu_job_cnt = 0;
+/* and their submission order. */
+unsigned long _starpu_submit_order = 0;
+
 static int _starpu_written = 0;
 
 static int _starpu_id;

+ 25 - 1
src/common/fxt.h

@@ -103,6 +103,8 @@
 
 #define _STARPU_FUT_TASK_NAME	0x512b
 
+#define _STARPU_FUT_DATA_WONT_USE	0x512c
+
 #define	_STARPU_FUT_START_MEMRECLAIM	0x5131
 #define	_STARPU_FUT_END_MEMRECLAIM	0x5132
 
@@ -251,6 +253,24 @@ static inline void _starpu_fxt_wait_initialisation()
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
 }
 
+extern unsigned long _starpu_submit_order;
+
+static inline unsigned long _starpu_fxt_get_submit_order(void)
+{
+	unsigned long ret = STARPU_ATOMIC_ADDL(&_starpu_submit_order, 1);
+	STARPU_ASSERT_MSG(_starpu_submit_order != 0, "Oops, submit_order wrapped!");
+	return ret;
+}
+
+extern unsigned long _starpu_job_cnt;
+
+static inline unsigned long _starpu_fxt_get_job_id(void)
+{
+	unsigned long ret = STARPU_ATOMIC_ADDL(&_starpu_job_cnt, 1);
+	STARPU_ASSERT_MSG(_starpu_job_cnt != 0, "Oops, job_id wrapped!");
+	return ret;
+}
+
 long _starpu_gettid(void);
 
 /* Initialize the FxT library. */
@@ -703,6 +723,9 @@ do {										\
 #define _STARPU_TRACE_DATA_COPY(src_node, dst_node, size)	\
 	FUT_DO_PROBE3(_STARPU_FUT_DATA_COPY, src_node, dst_node, size)
 
+#define _STARPU_TRACE_DATA_WONT_USE(handle)						\
+	FUT_DO_PROBE4(_STARPU_FUT_DATA_WONT_USE, handle, _starpu_fxt_get_submit_order(), _starpu_fxt_get_job_id(), _starpu_gettid())
+
 #define _STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch, handle) \
 	FUT_DO_PROBE6(_STARPU_FUT_START_DRIVER_COPY, src_node, dst_node, size, com_id, prefetch, handle)
 
@@ -743,7 +766,7 @@ do {										\
 	FUT_DO_PROBE1(_STARPU_FUT_WORKER_SLEEP_END, _starpu_gettid());
 
 #define _STARPU_TRACE_TASK_SUBMIT(job, iter, subiter)	\
-	FUT_DO_PROBE6(_STARPU_FUT_TASK_SUBMIT, (job)->job_id, iter, subiter, (job)->task->submit_order, (job)->task->priority, _starpu_gettid());
+	FUT_DO_PROBE6(_STARPU_FUT_TASK_SUBMIT, (job)->job_id, iter, subiter, _starpu_fxt_get_submit_order(), (job)->task->priority, _starpu_gettid());
 
 #define _STARPU_TRACE_TASK_SUBMIT_START()	\
 	FUT_DO_PROBE1(_STARPU_FUT_TASK_SUBMIT_START, _starpu_gettid());
@@ -1106,6 +1129,7 @@ do {										\
 #define _STARPU_TRACE_DATA_NAME(a, b)		do {(void)(a); (void)(b);} while(0)
 #define _STARPU_TRACE_DATA_COORDINATES(a, b, c)	do {(void)(a); (void)(b); (void)(c);} while(0)
 #define _STARPU_TRACE_DATA_COPY(a, b, c)		do {(void)(a); (void)(b); (void)(c);} while(0)
+#define _STARPU_TRACE_DATA_WONT_USE(a)		do {(void)(a);} while(0)
 #define _STARPU_TRACE_START_DRIVER_COPY(a,b,c,d,e,f)	do {(void)(a); (void)(b); (void)(c); (void)(d); (void)(e); (void)(f);} while(0)
 #define _STARPU_TRACE_END_DRIVER_COPY(a,b,c,d,e)	do {(void)(a); (void)(b); (void)(c); (void)(d); (void)(e);} while(0)
 #define _STARPU_TRACE_START_DRIVER_COPY_ASYNC(a,b)	do {(void)(a); (void)(b);} while(0)

+ 1 - 3
src/core/jobs.c

@@ -32,8 +32,6 @@
 #include <core/debug.h>
 #include <limits.h>
 
-/* we need to identify each task to generate the DAG. */
-static unsigned long job_cnt = 0;
 static int max_memory_use;
 static int njobs, maxnjobs;
 
@@ -93,7 +91,7 @@ struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_tas
 		|| STARPU_AYU_EVENT)
 #endif
 	{
-		job->job_id = STARPU_ATOMIC_ADDL(&job_cnt, 1);
+		job->job_id = _starpu_fxt_get_job_id();
 		STARPU_AYU_ADDTASK(job->job_id, task);
 		STARPU_ASSERT(job->job_id != ULONG_MAX);
 	}

+ 0 - 10
src/core/task.c

@@ -665,16 +665,6 @@ int starpu_task_submit(struct starpu_task *task)
 
 	if (!continuation)
 	{
-#ifdef STARPU_USE_FXT
-		{
-			static unsigned long submit_order = 0;
-			if (task->submit_order == 0)
-			{
-				task->submit_order = STARPU_ATOMIC_ADDL(&submit_order, 1);
-				STARPU_ASSERT(task->submit_order != ULONG_MAX);
-			}
-		}
-#endif
 		_STARPU_TRACE_TASK_SUBMIT(j,
 			_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[0],
 			_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[1]);

+ 1 - 0
src/datawizard/user_interactions.c

@@ -589,6 +589,7 @@ static void _starpu_data_wont_use(void *data)
 
 void starpu_data_wont_use(starpu_data_handle_t handle)
 {
+	_STARPU_TRACE_DATA_WONT_USE(handle);
 	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_R, _starpu_data_wont_use, handle);
 }
 

+ 19 - 0
src/debug/traces/starpu_fxt.c

@@ -2106,6 +2106,21 @@ static void handle_data_coordinates(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 		data->dims[i] = ev->param[i+2];
 }
 
+static void handle_data_wont_use(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	unsigned long handle = ev->param[0];
+	unsigned long submit_order = ev->param[1];
+	unsigned long job_id = ev->param[2];
+
+	fprintf(tasks_file, "Control: WontUse\n");
+	fprintf(tasks_file, "JobId: %lu\n", job_id);
+	fprintf(tasks_file, "SubmitOrder: %lu\n", submit_order);
+	fprintf(tasks_file, "SubmitTime: %f\n", get_event_time_stamp(ev, options));
+	fprintf(tasks_file, "Handles: %lx\n", handle);
+	fprintf(tasks_file, "MPIRank: %d\n", options->file_rank);
+	fprintf(tasks_file, "\n");
+}
+
 static void handle_mpi_data_set_rank(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	unsigned long handle = ev->param[0];
@@ -3380,6 +3395,10 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 				handle_data_coordinates(&ev, options);
 				break;
 
+			case _STARPU_FUT_DATA_WONT_USE:
+				handle_data_wont_use(&ev, options);
+				break;
+
 			case _STARPU_FUT_START_DRIVER_COPY:
 				if (!options->no_bus)
 					handle_start_driver_copy(&ev, options);