Przeglądaj źródła

consider the nodeid for hash indexing when storing alias for mpi, worker and thread containers

details:
- this fix starpu_fxt_tool when converting large-scale traces (with more than 4 nodes)
- avoid conflicts when storing info for threads, workers and MPI processes
Lucas Schnorr 8 lat temu
rodzic
commit
0a2947df09
1 zmienionych plików z 97 dodań i 66 usunięć
  1. 97 66
      src/debug/traces/starpu_fxt.c

+ 97 - 66
src/debug/traces/starpu_fxt.c

@@ -565,10 +565,12 @@ struct worker_entry
 	int sync; /* Set only for workers which are part of the same set, i.e. on thread drivers several workers */
 } *worker_ids;
 
-static int register_thread(unsigned long tid, int workerid, int sync)
+static int register_thread(unsigned long nodeid, unsigned long tid, int workerid, int sync)
 {
 	struct worker_entry *entry = NULL;
 
+	tid = nodeid*tid+tid;
+
 	HASH_FIND(hh, worker_ids, &tid, sizeof(tid), entry);
 
 	/* only register a thread once */
@@ -584,18 +586,27 @@ static int register_thread(unsigned long tid, int workerid, int sync)
 	return 1;
 }
 
-static int register_worker_id(unsigned long tid, int workerid, int sync)
+static int register_worker_id(unsigned long nodeid, unsigned long tid, int workerid, int sync)
 {
 	nworkers++;
 	STARPU_ASSERT_MSG(workerid < STARPU_NMAXWORKERS, "Too many workers in this trace, please increase in ./configure invocation the maximum number of CPUs and GPUs to the same value as was used for execution");
 
-	return register_thread(tid, workerid, sync);
+	return register_thread(nodeid, tid, workerid, sync);
+}
+static int prefixTOnodeid (const char *prefix)
+{
+	char *str = malloc(sizeof(char)*strlen(prefix));
+	strncpy (str, prefix, strlen(prefix));
+	str[strlen(prefix)-1] = '\0';
+	unsigned long nodeid = atoi(str);
+	free(str);
+	return nodeid;
 }
 
 /* Register user threads if not done already */
 static void register_user_thread(double timestamp, unsigned long tid, const char *prefix)
 {
-	if (register_thread(tid, -1, 0) && out_paje_file)
+	if (register_thread(prefixTOnodeid(prefix), tid, -1, 0) && out_paje_file)
 	{
 #ifdef STARPU_HAVE_POTI
 		char program_container[STARPU_POTI_STR_LEN];
@@ -612,16 +623,18 @@ static void register_user_thread(double timestamp, unsigned long tid, const char
 	}
 }
 
-static void register_mpi_thread(unsigned long tid)
+static void register_mpi_thread(unsigned long nodeid, unsigned long tid)
 {
-	int ret = register_thread(tid, -2, 0);
+	int ret = register_thread(nodeid, tid, -2, 0);
 	STARPU_ASSERT(ret == 1);
 }
 
-static int find_worker_id(unsigned long tid)
+static int find_worker_id(unsigned long nodeid, unsigned long tid)
 {
 	struct worker_entry *entry;
 
+	tid = nodeid*tid+tid;
+
 	HASH_FIND(hh, worker_ids, &tid, sizeof(tid), entry);
 	if (!entry)
 		return -1;
@@ -629,10 +642,12 @@ static int find_worker_id(unsigned long tid)
 	return entry->workerid;
 }
 
-static int find_sync(unsigned long tid)
+static int find_sync(unsigned long nodeid, unsigned long tid)
 {
 	struct worker_entry *entry;
 
+	tid = nodeid*tid+tid;
+
 	HASH_FIND(hh, worker_ids, &tid, sizeof(tid), entry);
 	if (!entry)
 		return 0;
@@ -734,9 +749,9 @@ static void worker_pop_state(double time, const char *prefix, long unsigned int
 
 static void thread_set_state(double time, const char *prefix, long unsigned int threadid, const char *name)
 {
-	if (find_sync(threadid))
+	if (find_sync(prefixTOnodeid(prefix), threadid))
 		/* Unless using worker sets, collapse thread and worker */
-		return worker_set_state(time, prefix, find_worker_id(threadid), name);
+		return worker_set_state(time, prefix, find_worker_id(prefixTOnodeid(prefix), threadid), name);
 
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
@@ -788,9 +803,9 @@ static void user_thread_pop_state(double time, const char *prefix, long unsigned
 
 static void thread_push_state(double time, const char *prefix, long unsigned int threadid, const char *name)
 {
-	if (find_sync(threadid))
+	if (find_sync(prefixTOnodeid(prefix), threadid))
 		/* Unless using worker sets, collapse thread and worker */
-		return worker_push_state(time, prefix, find_worker_id(threadid), name);
+		return worker_push_state(time, prefix, find_worker_id(prefixTOnodeid(prefix), threadid), name);
 
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
@@ -803,9 +818,9 @@ static void thread_push_state(double time, const char *prefix, long unsigned int
 
 static void thread_pop_state(double time, const char *prefix, long unsigned int threadid)
 {
-	if (find_sync(threadid))
+	if (find_sync(prefixTOnodeid(prefix), threadid))
 		/* Unless using worker sets, collapse thread and worker */
-		return worker_pop_state(time, prefix, find_worker_id(threadid));
+		return worker_pop_state(time, prefix, find_worker_id(prefixTOnodeid(prefix), threadid));
 
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
@@ -934,7 +949,7 @@ static void recfmt_worker_set_state(double time, int workerid, const char *name,
 	recfmt_set_state(time, workerid, -1, state_name, type);
 }
 
-static void recfmt_thread_set_state(double time, long unsigned int threadid, const char *name, const char *type)
+static void recfmt_thread_set_state(double time, unsigned long nodeid, long unsigned int threadid, const char *name, const char *type)
 {
 	const char *state_name;
 
@@ -944,18 +959,18 @@ static void recfmt_thread_set_state(double time, long unsigned int threadid, con
 	else
 		state_name = get_state_name(name, THREAD_STATE);
 
-	recfmt_set_state(time, find_worker_id(threadid), threadid, state_name, type);
+	recfmt_set_state(time, find_worker_id(nodeid, threadid), threadid, state_name, type);
 }
 
-static void recfmt_thread_push_state(double time, long unsigned int threadid, const char *name, const char *type)
+static void recfmt_thread_push_state(double time, unsigned long nodeid, long unsigned int threadid, const char *name, const char *type)
 {
 	const char *state_name = get_state_name(name, THREAD_STATE);
-	recfmt_push_state(time, find_worker_id(threadid), threadid, state_name, type);
+	recfmt_push_state(time, find_worker_id(nodeid, threadid), threadid, state_name, type);
 }
 
-static void recfmt_thread_pop_state(double time, long unsigned int threadid)
+static void recfmt_thread_pop_state(double time, unsigned long nodeid, long unsigned int threadid)
 {
-	recfmt_pop_state(time, find_worker_id(threadid), threadid);
+	recfmt_pop_state(time, find_worker_id(nodeid, threadid), threadid);
 }
 
 static void recfmt_mpicommthread_set_state(double time, const char *name)
@@ -1059,7 +1074,7 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 	long unsigned int threadid = ev->param[6];
 	int new_thread;
 
-	new_thread = register_worker_id(threadid, workerid, set);
+	new_thread = register_worker_id(prefixTOnodeid(prefix), threadid, workerid, set);
 
 	char *kindstr = "";
 	struct starpu_perfmodel_arch arch;
@@ -1161,7 +1176,7 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 	if (out_paje_file)
 		thread_set_state(now, prefix, threadid, "In");
 	if (trace_file)
-		recfmt_thread_set_state(now, threadid, "In", "Runtime");
+		recfmt_thread_set_state(now, prefixTOnodeid(prefix), threadid, "In", "Runtime");
 
 	if (activity_file)
 		fprintf(activity_file, "name\t%d\t%s %d\n", workerid, kindstr, devid);
@@ -1178,7 +1193,7 @@ static void handle_worker_init_end(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 
 	if (ev->nb_params < 2)
 	{
-		worker = find_worker_id(ev->param[0]);
+		worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 		STARPU_ASSERT(worker >= 0);
 	}
 	else
@@ -1187,7 +1202,7 @@ static void handle_worker_init_end(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "B");
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "B", "Runtime");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "B", "Runtime");
 
 	if (out_paje_file)
 		worker_set_state(get_event_time_stamp(ev, options), prefix, worker, "I");
@@ -1208,7 +1223,7 @@ static void handle_worker_deinit_start(struct fxt_ev_64 *ev, struct starpu_fxt_o
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), prefix, threadid, "D");
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), threadid, "D", "Runtime");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), threadid, "D", "Runtime");
 }
 
 static void handle_worker_deinit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -1227,7 +1242,7 @@ static void handle_worker_deinit_end(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 #endif
 	}
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], "End", NULL);
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[1], "End", NULL);
 }
 
 #ifdef STARPU_HAVE_POTI
@@ -1657,10 +1672,10 @@ static void handle_start_executing(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 	char *prefix = options->file_prefix;
 	long unsigned int threadid = ev->param[0];
 
-	if (out_paje_file && !find_sync(threadid))
+	if (out_paje_file && !find_sync(prefixTOnodeid(prefix), threadid))
 		thread_set_state(get_event_time_stamp(ev, options), prefix, threadid, "E");
-	if (trace_file && !find_sync(threadid))
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), threadid, "E", "Runtime");
+	if (trace_file && !find_sync(prefixTOnodeid(prefix), threadid))
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), threadid, "E", "Runtime");
 }
 
 static void handle_end_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -1668,10 +1683,10 @@ static void handle_end_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options
 	char *prefix = options->file_prefix;
 	long unsigned int threadid = ev->param[0];
 
-	if (out_paje_file && !find_sync(threadid))
+	if (out_paje_file && !find_sync(prefixTOnodeid(prefix), threadid))
 		thread_set_state(get_event_time_stamp(ev, options), prefix, threadid, "B");
-	if (trace_file && !find_sync(threadid))
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), threadid, "B", "Runtime");
+	if (trace_file && !find_sync(prefixTOnodeid(prefix), threadid))
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), threadid, "B", "Runtime");
 }
 
 static void handle_user_event(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -1686,7 +1701,7 @@ static void handle_user_event(struct fxt_ev_64 *ev, struct starpu_fxt_options *o
 	char *prefix = options->file_prefix;
 	double now = get_event_time_stamp(ev, options);
 
-	worker = find_worker_id(ev->param[1]);
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 	if (worker < 0)
 	{
 		if (out_paje_file)
@@ -1714,13 +1729,14 @@ static void handle_user_event(struct fxt_ev_64 *ev, struct starpu_fxt_options *o
 static void handle_start_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[1]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 	if (worker >= 0)
 	{
 		if (out_paje_file)
 			thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "C");
 		if (trace_file)
-			recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], "C", "Runtime");
+			recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[1], "C", "Runtime");
 	}
 	else if (worker == -2)
 	{
@@ -1738,14 +1754,14 @@ static void handle_start_callback(struct fxt_ev_64 *ev, struct starpu_fxt_option
 static void handle_end_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[1]);
-
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 	if (worker >= 0)
 	{
 		if (out_paje_file)
 			thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "B");
 		if (trace_file)
-			recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], "B", "Runtime");
+			recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[1], "B", "Runtime");
 	}
 	else if (worker == -2)
 	{
@@ -1763,13 +1779,14 @@ static void handle_end_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options
 static void handle_hypervisor_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker >= 0)
 	{
 		if (out_paje_file)
 			thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "H");
 		if (trace_file)
-			recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "H", "Runtime");
+			recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "H", "Runtime");
 	}
 	else if (worker == -2)
 	{
@@ -1787,13 +1804,14 @@ static void handle_hypervisor_begin(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 static void handle_hypervisor_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker >= 0)
 	{
 		if (out_paje_file)
 			thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
 		if (trace_file)
-			recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "B", "Runtime");
+			recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "B", "Runtime");
 	}
 	else if (worker == -2)
 	{
@@ -1811,14 +1829,15 @@ static void handle_hypervisor_end(struct fxt_ev_64 *ev, struct starpu_fxt_option
 static void handle_worker_status_on_tid(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus)
 {
 	int worker;
-	worker = find_worker_id(ev->param[1]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 	if (worker < 0)
 		return;
 
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], newstatus);
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[1], newstatus, "Runtime");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[1], newstatus, "Runtime");
 }
 
 static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus)
@@ -1839,55 +1858,60 @@ static double last_sleep_start[STARPU_NMAXWORKERS];
 static void handle_worker_scheduling_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "Sc", "Runtime");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "Sc", "Runtime");
 }
 
 static void handle_worker_scheduling_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "B", "Runtime");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "B", "Runtime");
 }
 
 static void handle_worker_scheduling_push(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	if (out_paje_file)
 		thread_push_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 	if (trace_file)
-		recfmt_thread_push_state(get_event_time_stamp(ev, options), ev->param[0], "Sc", "Runtime");
+		recfmt_thread_push_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "Sc", "Runtime");
 }
 
 static void handle_worker_scheduling_pop(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	if (out_paje_file)
 		thread_pop_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0]);
 	if (trace_file)
-		recfmt_thread_pop_state(get_event_time_stamp(ev, options), ev->param[0]);
+		recfmt_thread_pop_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0]);
 }
 
 static void handle_worker_sleep_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	double start_sleep_time = get_event_time_stamp(ev, options);
@@ -1896,13 +1920,14 @@ static void handle_worker_sleep_start(struct fxt_ev_64 *ev, struct starpu_fxt_op
 	if (out_paje_file)
 		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sl");
 	if (trace_file)
-		recfmt_thread_set_state(get_event_time_stamp(ev, options), ev->param[0], "Sl", "Other");
+		recfmt_thread_set_state(get_event_time_stamp(ev, options), prefixTOnodeid(prefix), ev->param[0], "Sl", "Other");
 }
 
 static void handle_worker_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
-	worker = find_worker_id(ev->param[0]);
+	char *prefix = options->file_prefix;
+	worker = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	if (worker < 0) return;
 
 	double end_sleep_timestamp = get_event_time_stamp(ev, options);
@@ -1910,7 +1935,7 @@ static void handle_worker_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 	if (out_paje_file)
 		thread_set_state(end_sleep_timestamp, options->file_prefix, ev->param[0], "B");
 	if (trace_file)
-		recfmt_thread_set_state(end_sleep_timestamp, ev->param[0], "B", "Runtime");
+		recfmt_thread_set_state(end_sleep_timestamp, prefixTOnodeid(prefix), ev->param[0], "B", "Runtime");
 
 	double sleep_length = end_sleep_timestamp - last_sleep_start[worker];
 
@@ -2208,9 +2233,9 @@ static void handle_used_mem(struct fxt_ev_64 *ev, struct starpu_fxt_options *opt
 
 static void handle_task_submit_event(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, unsigned long tid, const char *eventstr)
 {
-	int workerid = find_worker_id(tid);
-	double timestamp = get_event_time_stamp(ev, options);
 	char *prefix = options->file_prefix;
+	int workerid = find_worker_id(prefixTOnodeid(prefix), tid);
+	double timestamp = get_event_time_stamp(ev, options);
 
 	if (workerid >= 0)
 	{
@@ -2218,12 +2243,12 @@ static void handle_task_submit_event(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 		if (eventstr)
 		{
 			thread_push_state(timestamp, prefix, tid, eventstr);
-			recfmt_thread_push_state(timestamp, tid, eventstr, "Runtime");
+			recfmt_thread_push_state(timestamp, prefixTOnodeid(prefix), tid, eventstr, "Runtime");
 		}
 		else
 		{
 			thread_pop_state(timestamp, prefix, tid);
-			recfmt_thread_pop_state(timestamp, tid);
+			recfmt_thread_pop_state(timestamp, prefixTOnodeid(prefix), tid);
 		}
 	}
 	else if (workerid == -2)
@@ -2325,15 +2350,17 @@ static void handle_component_connect(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 
 static void handle_component_push(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
+	char *prefix = options->file_prefix;
 	double current_timestamp = get_event_time_stamp(ev, options);
-	int workerid = find_worker_id(ev->param[0]);
+	int workerid = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	_starpu_fxt_component_push(anim_file, options, current_timestamp, workerid, ev->param[1], ev->param[2], ev->param[3], ev->param[4]);
 }
 
 static void handle_component_pull(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
+	char *prefix = options->file_prefix;
 	double current_timestamp = get_event_time_stamp(ev, options);
-	int workerid = find_worker_id(ev->param[0]);
+	int workerid = find_worker_id(prefixTOnodeid(prefix), ev->param[0]);
 	_starpu_fxt_component_pull(anim_file, options, current_timestamp, workerid, ev->param[1], ev->param[2], ev->param[3], ev->param[4]);
 }
 
@@ -2426,6 +2453,8 @@ static void handle_task_submit(struct fxt_ev_64 *ev, struct starpu_fxt_options *
 
 static void handle_task_done(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
+	char *prefix = options->file_prefix;
+
 	unsigned long job_id;
 	job_id = ev->param[0];
 
@@ -2433,7 +2462,7 @@ static void handle_task_done(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 	char *name = has_name?get_fxt_string(ev,4):"unknown";
 
         int worker;
-        worker = find_worker_id(ev->param[1]);
+        worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 
 	const char *colour;
 	char buffer[32];
@@ -2462,6 +2491,8 @@ static void handle_task_done(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 
 static void handle_tag_done(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
+	char *prefix = options->file_prefix;
+
 	uint64_t tag_id;
 	tag_id = ev->param[0];
 
@@ -2469,7 +2500,7 @@ static void handle_tag_done(struct fxt_ev_64 *ev, struct starpu_fxt_options *opt
 	char *name = has_name?get_fxt_string(ev,3):"unknown";
 
         int worker;
-        worker = find_worker_id(ev->param[1]);
+        worker = find_worker_id(prefixTOnodeid(prefix), ev->param[1]);
 
 	const char *colour;
 	char buffer[32];
@@ -2515,7 +2546,7 @@ static void handle_mpi_start(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 
 	char *prefix = options->file_prefix;
 
-	register_mpi_thread(ev->param[2]);
+	register_mpi_thread(prefixTOnodeid(prefix), ev->param[2]);
 
 	if (out_paje_file)
 	{
@@ -4015,7 +4046,7 @@ void starpu_fxt_write_data_trace(char *filename_in)
 		switch (ev.code)
 		{
 		case _STARPU_FUT_WORKER_INIT_START:
-			register_worker_id(ev.param[6], ev.param[1], ev.param[5]);
+			register_worker_id(0 /* TODO: Add nodeid here instead */, ev.param[6], ev.param[1], ev.param[5]);
 			break;
 
 		case _STARPU_FUT_START_CODELET_BODY: