浏览代码

trace asynchronous source to sink transfers

Corentin Salingue 8 年之前
父节点
当前提交
475bd9b66d
共有 3 个文件被更改,包括 66 次插入和 29 次删除
  1. 25 14
      src/common/fxt.h
  2. 35 14
      src/debug/traces/starpu_fxt.c
  3. 6 1
      src/drivers/mp_common/source_common.c

+ 25 - 14
src/common/fxt.h

@@ -54,10 +54,10 @@
 
 #define _STARPU_FUT_UPDATE_TASK_CNT	0x5106
 
-#define _STARPU_FUT_START_FETCH_INPUT	0x5107
-#define _STARPU_FUT_END_FETCH_INPUT	0x5108
-#define _STARPU_FUT_START_PUSH_OUTPUT	0x5109
-#define _STARPU_FUT_END_PUSH_OUTPUT	0x5110
+#define _STARPU_FUT_START_FETCH_INPUT_ON_TID	0x5107
+#define _STARPU_FUT_END_FETCH_INPUT_ON_TID	0x5108
+#define _STARPU_FUT_START_PUSH_OUTPUT_ON_TID	0x5109
+#define _STARPU_FUT_END_PUSH_OUTPUT_ON_TID	0x5110
 
 #define _STARPU_FUT_TAG		0x5111
 #define _STARPU_FUT_TAG_DEPS	0x5112
@@ -104,8 +104,8 @@
 #define	_STARPU_FUT_START_DRIVER_COPY_ASYNC	0x5135
 #define	_STARPU_FUT_END_DRIVER_COPY_ASYNC	0x5136
 
-#define	_STARPU_FUT_START_PROGRESS	0x5137
-#define	_STARPU_FUT_END_PROGRESS		0x5138
+#define	_STARPU_FUT_START_PROGRESS_ON_TID	0x5137
+#define	_STARPU_FUT_END_PROGRESS_ON_TID		0x5138
 
 #define _STARPU_FUT_USER_EVENT		0x5139
 
@@ -151,8 +151,8 @@
 
 #define _STARPU_FUT_DATA_LOAD 0x5153
 
-#define _STARPU_FUT_START_UNPARTITION 0x5154
-#define _STARPU_FUT_END_UNPARTITION 0x5155
+#define _STARPU_FUT_START_UNPARTITION_ON_TID 0x5154
+#define _STARPU_FUT_END_UNPARTITION_ON_TID 0x5155
 
 #define	_STARPU_FUT_START_FREE		0x5156
 #define	_STARPU_FUT_END_FREE		0x5157
@@ -209,6 +209,9 @@
 #define _STARPU_FUT_HANDLE_DATA_REGISTER 0x517c
 #define _STARPU_FUT_DATA_INVALIDATE 0x517d
 
+#define _STARPU_FUT_START_FETCH_INPUT	0x517e	/* fetch-input start event whose second probe argument is an id supplied by the caller, not _starpu_gettid() */
+#define _STARPU_FUT_END_FETCH_INPUT	0x517f	/* fetch-input end event whose second probe argument is an id supplied by the caller, not _starpu_gettid() */
+
 #ifdef STARPU_USE_FXT
 #include <fxt/fxt.h>
 #include <fxt/fut.h>
@@ -525,16 +528,22 @@ do {									\
 	FUT_DO_PROBE2(_STARPU_FUT_UPDATE_TASK_CNT, counter, _starpu_gettid())
 
 #define _STARPU_TRACE_START_FETCH_INPUT(job)	\
-	FUT_DO_PROBE2(_STARPU_FUT_START_FETCH_INPUT, job, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_START_FETCH_INPUT_ON_TID, job, _starpu_gettid());
 
 #define _STARPU_TRACE_END_FETCH_INPUT(job)	\
-	FUT_DO_PROBE2(_STARPU_FUT_END_FETCH_INPUT, job, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_END_FETCH_INPUT_ON_TID, job, _starpu_gettid());
 
 #define _STARPU_TRACE_START_PUSH_OUTPUT(job)	\
-	FUT_DO_PROBE2(_STARPU_FUT_START_PUSH_OUTPUT, job, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_START_PUSH_OUTPUT_ON_TID, job, _starpu_gettid());
 
 #define _STARPU_TRACE_END_PUSH_OUTPUT(job)	\
-	FUT_DO_PROBE2(_STARPU_FUT_END_PUSH_OUTPUT, job, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_END_PUSH_OUTPUT_ON_TID, job, _starpu_gettid());
+
+#define _STARPU_TRACE_WORKER_END_FETCH_INPUT(job, id)	\
+	FUT_DO_PROBE2(_STARPU_FUT_END_FETCH_INPUT, job, id); /* records (job, id) with the caller-supplied id; NOTE(review): the expansion already ends in ';', so "MACRO(...);" at a call site adds an empty statement */
+
+#define _STARPU_TRACE_WORKER_START_FETCH_INPUT(job, id)	\
+	FUT_DO_PROBE2(_STARPU_FUT_START_FETCH_INPUT, job, id); /* records (job, id) with the caller-supplied id; NOTE(review): the expansion already ends in ';', so "MACRO(...);" at a call site adds an empty statement */
 
 #define _STARPU_TRACE_TAG(tag, job)	\
 	FUT_DO_PROBE2(_STARPU_FUT_TAG, tag, (job)->job_id)
@@ -900,10 +909,10 @@ do {										\
 	FUT_DO_PROBE2(_STARPU_FUT_DATA_LOAD, workerid, size);
 
 #define _STARPU_TRACE_START_UNPARTITION(handle, memnode)		\
-	FUT_DO_PROBE3(_STARPU_FUT_START_UNPARTITION, memnode, _starpu_gettid(), handle);
+	FUT_DO_PROBE3(_STARPU_FUT_START_UNPARTITION_ON_TID, memnode, _starpu_gettid(), handle);
 	
 #define _STARPU_TRACE_END_UNPARTITION(handle, memnode)		\
-	FUT_DO_PROBE3(_STARPU_FUT_END_UNPARTITION, memnode, _starpu_gettid(), handle);
+	FUT_DO_PROBE3(_STARPU_FUT_END_UNPARTITION_ON_TID, memnode, _starpu_gettid(), handle);
 
 #define _STARPU_TRACE_SCHED_COMPONENT_PUSH_PRIO(workerid, ntasks, exp_len)		\
 	FUT_DO_PROBE4(_STARPU_FUT_SCHED_COMPONENT_PUSH_PRIO, _starpu_gettid(), workerid, ntasks, exp_len);
@@ -1042,6 +1051,8 @@ do {										\
 #define _STARPU_TRACE_SCHED_COMPONENT_PULL(from, to, task)	do {} while (0)
 #define _STARPU_TRACE_HANDLE_DATA_REGISTER(handle)	do {} while (0)
 #define _STARPU_TRACE_DATA_INVALIDATE(handle, node)	do {} while (0)
+#define _STARPU_TRACE_WORKER_START_FETCH_INPUT(job, id)	do {} while(0)	/* stub: expands to an empty statement; job and id are not evaluated */
+#define _STARPU_TRACE_WORKER_END_FETCH_INPUT(job, id)	do {} while(0)	/* stub: expands to an empty statement; job and id are not evaluated */
 
 #endif // STARPU_USE_FXT
 

+ 35 - 14
src/debug/traces/starpu_fxt.c

@@ -1516,7 +1516,7 @@ static void handle_hypervisor_end(struct fxt_ev_64 *ev, struct starpu_fxt_option
 	}
 }
 
-static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus)
+static void handle_worker_status_on_tid(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus)
 {
 	int worker;
 	worker = find_worker_id(ev->param[1]);
@@ -1529,6 +1529,19 @@ static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], newstatus, "Runtime");
 }
 
+static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus) /* Record state newstatus, at the event's timestamp, for the id carried in ev->param[1]. */
+{
+	int worker;
+	worker = ev->param[1]; /* param[1] is taken as-is; handle_worker_status_on_tid() instead maps it through find_worker_id() */
+	if (worker < 0) /* negative id: drop the event */
+		return;
+
+	if (out_paje_file) /* written only when out_paje_file is set */
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], newstatus); /* NOTE(review): the emitters in source_common.c pass a worker id here, yet this is the same setter the tid-based handler feeds with a thread id -- confirm this is intended */
+	if (trace_file) /* written only when trace_file is set */
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], newstatus, "Runtime");
+}
+
 static double last_sleep_start[STARPU_NMAXWORKERS];
 
 static void handle_worker_scheduling_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -2591,25 +2604,33 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 				break;
 
 			/* check the memory transfer overhead */
-			case _STARPU_FUT_START_FETCH_INPUT:
-				handle_worker_status(&ev, options, "Fi");
+			case _STARPU_FUT_START_FETCH_INPUT_ON_TID:
+				handle_worker_status_on_tid(&ev, options, "Fi");
 				break;
-			case _STARPU_FUT_START_PUSH_OUTPUT:
-				handle_worker_status(&ev, options, "Po");
+			case _STARPU_FUT_START_PUSH_OUTPUT_ON_TID:
+				handle_worker_status_on_tid(&ev, options, "Po");
 				break;
-			case _STARPU_FUT_START_PROGRESS:
-				handle_worker_status(&ev, options, "P");
+			case _STARPU_FUT_START_PROGRESS_ON_TID:
+				handle_worker_status_on_tid(&ev, options, "P");
 				break;
-			case _STARPU_FUT_START_UNPARTITION:
-				handle_worker_status(&ev, options, "U");
+			case _STARPU_FUT_START_UNPARTITION_ON_TID:
+				handle_worker_status_on_tid(&ev, options, "U");
 				break;
-			case _STARPU_FUT_END_FETCH_INPUT:
-			case _STARPU_FUT_END_PROGRESS:
-			case _STARPU_FUT_END_PUSH_OUTPUT:
-			case _STARPU_FUT_END_UNPARTITION:
-				handle_worker_status(&ev, options, "B");
+			case _STARPU_FUT_END_FETCH_INPUT_ON_TID:
+			case _STARPU_FUT_END_PROGRESS_ON_TID:
+			case _STARPU_FUT_END_PUSH_OUTPUT_ON_TID:
+			case _STARPU_FUT_END_UNPARTITION_ON_TID:
+				handle_worker_status_on_tid(&ev, options, "B");
 				break;
 
+            case _STARPU_FUT_START_FETCH_INPUT:
+                handle_worker_status(&ev, options, "Fi");
+                break;
+
+            case _STARPU_FUT_END_FETCH_INPUT:
+                handle_worker_status(&ev, options, "B");
+                break;
+
 			case _STARPU_FUT_WORKER_SCHEDULING_START:
 				handle_worker_scheduling_start(&ev, options);
 				break;

+ 6 - 1
src/drivers/mp_common/source_common.c

@@ -964,7 +964,11 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
         /* We send all buffers to execute the task */
         if (worker_set->workers[i].task_sending != NULL && worker_set->workers[i].nb_buffers_sent == STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending))
         {
+            int workerid = worker_set->workers[i].workerid;
+
             STARPU_RMB();
+	        _STARPU_TRACE_WORKER_END_FETCH_INPUT(NULL, workerid);
+
             unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending);
             unsigned buf;
             for (buf = 0; buf < nbuffers; buf++)
@@ -1041,13 +1045,14 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
         {
             if(tasks[i] != NULL)
             {
+                int workerid = worker_set->workers[i].workerid;
+                _STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
                 unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(tasks[i]);
 
                 for (buf = 0; buf < nbuffers; buf++)
                 {
                     starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(tasks[i], buf);
                     enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(tasks[i], buf);
-                    int workerid = starpu_worker_get_id_check();
                     struct _starpu_data_replicate *local_replicate = get_replicate(handle, mode, workerid, memnode);
 
                     int ret = _starpu_fetch_data_on_node(handle, memnode, local_replicate, mode, 0, 0, 1,