Explorar o código

Collapse thread and worker for CPU workers, since we are always synchronous there

Samuel Thibault hai 10 anos
pai
achega
ebf7530d52

+ 4 - 4
src/common/fxt.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2014  Université de Bordeaux
+ * Copyright (C) 2009-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -408,8 +408,8 @@ do {									\
 #define _STARPU_TRACE_NEW_MEM_NODE(nodeid)			\
 	FUT_DO_PROBE2(_STARPU_FUT_NEW_MEM_NODE, nodeid, _starpu_gettid());
 
-#define _STARPU_TRACE_WORKER_INIT_START(workerkind, workerid, devid, memnode)	\
-	FUT_DO_PROBE5(_STARPU_FUT_WORKER_INIT_START, workerkind, workerid, devid, memnode, _starpu_gettid());
+#define _STARPU_TRACE_WORKER_INIT_START(workerkind, workerid, devid, memnode, sync)	\
+	FUT_DO_PROBE6(_STARPU_FUT_WORKER_INIT_START, workerkind, workerid, devid, memnode, sync, _starpu_gettid());
 
 #define _STARPU_TRACE_WORKER_INIT_END(__workerid)				\
 	FUT_DO_PROBE2(_STARPU_FUT_WORKER_INIT_END, _starpu_gettid(), (__workerid));
@@ -818,7 +818,7 @@ do {										\
 
 /* Dummy macros in case FxT is disabled */
 #define _STARPU_TRACE_NEW_MEM_NODE(nodeid)	do {} while(0)
-#define _STARPU_TRACE_WORKER_INIT_START(a,b,c)	do {} while(0)
+#define _STARPU_TRACE_WORKER_INIT_START(a,b,c,d,e)	do {} while(0)
 #define _STARPU_TRACE_WORKER_INIT_END(workerid)	do {} while(0)
 #define _STARPU_TRACE_START_CODELET_BODY(job, nimpl, perf_arch, workerid)	do {} while(0)
 #define _STARPU_TRACE_END_CODELET_BODY(job, nimpl, perf_arch, workerid)	do {} while(0)

+ 11 - 4
src/core/workers.c

@@ -557,7 +557,16 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
 
-void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key)
+#ifdef STARPU_USE_FXT
+void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync)
+{
+	unsigned devid = worker->devid;
+	unsigned memnode = worker->memory_node;
+	_STARPU_TRACE_WORKER_INIT_START(fut_key, worker->workerid, devid, memnode, sync);
+}
+#endif
+
+void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync)
 {
 	(void) fut_key;
 	int devid = worker->devid;
@@ -569,9 +578,7 @@ void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key)
 
 #ifdef STARPU_USE_FXT
 	_starpu_fxt_register_thread(worker->bindid);
-
-	unsigned memnode = worker->memory_node;
-	_STARPU_TRACE_WORKER_INIT_START(fut_key, worker->workerid, devid, memnode);
+	_starpu_worker_start(worker, fut_key, sync);
 #endif
 
 	_starpu_bind_thread_on_cpu(worker->config, worker->bindid);

+ 4 - 2
src/core/workers.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2014  Université de Bordeaux
+ * Copyright (C) 2009-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  INRIA
  *
@@ -384,8 +384,10 @@ unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worke
  * */
 void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 
+/* This function initializes the current driver for the given worker */
+void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
 /* This function initializes the current thread for the given worker */
-void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key);
+void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
 
 /* The _starpu_worker structure describes all the state of a StarPU worker.
  * This function sets the pthread key which stores a pointer to this structure.

+ 59 - 11
src/debug/traces/starpu_fxt.c

@@ -184,9 +184,10 @@ struct worker_entry
 	UT_hash_handle hh;
 	unsigned long tid;
 	int workerid;
+	int sync;
 } *worker_ids;
 
-static int register_worker_id(unsigned long tid, int workerid)
+static int register_worker_id(unsigned long tid, int workerid, int sync)
 {
 	nworkers++;
 	struct worker_entry *entry;
@@ -202,6 +203,7 @@ static int register_worker_id(unsigned long tid, int workerid)
 	entry = malloc(sizeof(*entry));
 	entry->tid = tid;
 	entry->workerid = workerid;
+	entry->sync = sync;
 
 	HASH_ADD(hh, worker_ids, tid, sizeof(tid), entry);
 	return 1;
@@ -218,6 +220,17 @@ static int find_worker_id(unsigned long tid)
 	return entry->workerid;
 }
 
+static int find_sync(unsigned long tid)
+{
+	struct worker_entry *entry;
+
+	HASH_FIND(hh, worker_ids, &tid, sizeof(tid), entry);
+	if (!entry)
+		return 0;
+
+	return entry->sync;
+}
+
 static void update_accumulated_time(int worker, double sleep_time, double exec_time, double current_timestamp, int forceflush)
 {
 	accumulated_sleep_time[worker] += sleep_time;
@@ -296,30 +309,60 @@ static void memnode_set_state(double time, const char *prefix, unsigned int memn
 #endif
 }
 
-static void thread_set_state(double time, const char *prefix, long unsigned int threadid, const char *name)
+static void worker_set_state(double time, const char *prefix, long unsigned int workerid, const char *name)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
-	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
-	poti_SetState(time, container, "S", name);
+	worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	poti_SetState(time, container, "WS", name);
 #else
-	fprintf(out_paje_file, "10	%.9f	%st%lu	S	%s\n", time, prefix, threadid, name);
+	fprintf(out_paje_file, "10	%.9f	%sw%lu	WS	%s\n", time, prefix, workerid, name);
 #endif
 }
 
-static void worker_set_state(double time, const char *prefix, long unsigned int workerid, const char *name)
+static void worker_push_state(double time, const char *prefix, long unsigned int workerid, const char *name)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
 	worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
-	poti_SetState(time, container, "WS", name);
+	poti_PushState(time, container, "WS", name);
 #else
-	fprintf(out_paje_file, "10	%.9f	%sw%lu	WS	%s\n", time, prefix, workerid, name);
+	fprintf(out_paje_file, "11	%.9f	%sw%lu	WS	%s\n", time, prefix, workerid, name);
+#endif
+}
+
+static void worker_pop_state(double time, const char *prefix, long unsigned int workerid)
+{
+#ifdef STARPU_HAVE_POTI
+	char container[STARPU_POTI_STR_LEN];
+	worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	poti_PopState(time, container, "WS");
+#else
+	fprintf(out_paje_file, "12	%.9f	%sw%lu	WS\n", time, prefix, workerid);
+#endif
+}
+
+static void thread_set_state(double time, const char *prefix, long unsigned int threadid, const char *name)
+{
+	if (!find_sync(threadid))
+		/* Unless using worker sets, collapse thread and worker */
+		return worker_set_state(time, prefix, find_worker_id(threadid), name);
+
+#ifdef STARPU_HAVE_POTI
+	char container[STARPU_POTI_STR_LEN];
+	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
+	poti_SetState(time, container, "S", name);
+#else
+	fprintf(out_paje_file, "10	%.9f	%st%lu	S	%s\n", time, prefix, threadid, name);
 #endif
 }
 
 static void thread_push_state(double time, const char *prefix, long unsigned int threadid, const char *name)
 {
+	if (!find_sync(threadid))
+		/* Unless using worker sets, collapse thread and worker */
+		return worker_push_state(time, prefix, find_worker_id(threadid), name);
+
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
 	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
@@ -331,6 +374,10 @@ static void thread_push_state(double time, const char *prefix, long unsigned int
 
 static void thread_pop_state(double time, const char *prefix, long unsigned int threadid)
 {
+	if (!find_sync(threadid))
+		/* Unless using worker sets, collapse thread and worker */
+		return worker_pop_state(time, prefix, find_worker_id(threadid));
+
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
 	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
@@ -416,10 +463,11 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 	int devid = ev->param[2];
 	int workerid = ev->param[1];
 	int nodeid = ev->param[3];
-	int threadid = ev->param[4];
+	int set = ev->param[4];
+	int threadid = ev->param[5];
 	int new_thread;
 
-	new_thread = register_worker_id(threadid, workerid);
+	new_thread = register_worker_id(threadid, workerid, set);
 
 	char *kindstr = "";
 	struct starpu_perfmodel_arch arch;
@@ -2470,7 +2518,7 @@ void starpu_fxt_write_data_trace(char *filename_in)
 		switch (ev.code)
 		{
 		case _STARPU_FUT_WORKER_INIT_START:
-			register_worker_id(ev.param[4], ev.param[1]);
+			register_worker_id(ev.param[5], ev.param[1], ev.param[4]);
 			break;
 
 		case _STARPU_FUT_START_CODELET_BODY:

+ 26 - 1
src/debug/traces/starpu_paje.c

@@ -183,8 +183,21 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 	poti_DefineEntityValue("U", "S", "Unpartitioning", ".0 .0 1.0");
+	poti_DefineEntityValue("H", "S", "Hypervisor", ".5 .18 .0");
 	poti_DefineStateType("WS", "W", "Worker State");
 	poti_DefineEntityValue("I", "WS", "Idle", ".9 .1 .0");
+	poti_DefineEntityValue("I", "WS", "Initializing", "0.0 .7 1.0");
+	poti_DefineEntityValue("D", "WS", "Deinitializing", "0.0 .1 .7");
+	poti_DefineEntityValue("Fi", "WS", "FetchingInput", "1.0 .1 1.0");
+	poti_DefineEntityValue("Po", "WS", "PushingOutput", "0.1 1.0 1.0");
+	poti_DefineEntityValue("C", "WS", "Callback", ".0 .3 .8");
+	poti_DefineEntityValue("B", "WS", "Overhead", ".5 .18 .0");
+	poti_DefineEntityValue("E", "WS", "Executing", ".0 .6 .5");
+	poti_DefineEntityValue("Sc", "WS", "Scheduling", ".7 .36 .0");
+	poti_DefineEntityValue("Sl", "WS", "Sleeping", ".9 .1 .0");
+	poti_DefineEntityValue("P", "WS", "Progressing", ".4 .1 .6");
+	poti_DefineEntityValue("U", "WS", "Unpartitioning", ".0 .0 1.0");
+	poti_DefineEntityValue("H", "WS", "Hypervisor", ".5 .18 .0");
 
 	/* Types for the MPI Communication Thread of the Memory Node */
 	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
@@ -262,9 +275,21 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n\
 6       U       S       Unpartitioning      \".0 .0 1.0\"		\n\
+6       H       S       Hypervisor      \".5 .18 .0\"		\n\
 3       WS       W       \"Worker State\"                        \n\
 6       I       WS       Idle         \".9 .1 .0\"		\n\
-6       H       S       Hypervisor      \".5 .18 .0\"		\n");
+6       I       WS      Initializing       \"0.0 .7 1.0\"            \n\
+6       D       WS      Deinitializing       \"0.0 .1 .7\"            \n\
+6       Fi       WS      FetchingInput       \"1.0 .1 1.0\"            \n\
+6       Po       WS      PushingOutput       \"0.1 1.0 1.0\"            \n\
+6       C       WS       Callback       \".0 .3 .8\"            \n\
+6       B       WS       Overhead         \".5 .18 .0\"		\n\
+6       E       WS       Executing         \".0 .6 .5\"		\n\
+6       Sc       WS      Scheduling         \".7 .36 .0\"		\n\
+6       Sl       WS      Sleeping         \".9 .1 .0\"		\n\
+6       P       WS       Progressing         \".4 .1 .6\"		\n\
+6       U       WS       Unpartitioning      \".0 .0 1.0\"		\n\
+6       H       WS       Hypervisor      \".5 .18 .0\"		\n");
 	fprintf(file, "\
 6       P       CtS       Processing         \"0 0 0\"		\n\
 6       Sl       CtS      Sleeping         \".9 .1 .0\"		\n\

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014  Université de Bordeaux
+ * Copyright (C) 2010-2015  Université de Bordeaux
  * Copyright (C) 2010  Mehdi Juhoor <mjuhoor@gmail.com>
  * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
@@ -182,7 +182,7 @@ int _starpu_cpu_driver_init(struct _starpu_worker *cpu_worker)
 {
 	int devid = cpu_worker->devid;
 
-	_starpu_worker_start(cpu_worker, _STARPU_FUT_CPU_KEY);
+	_starpu_driver_start(cpu_worker, _STARPU_FUT_CPU_KEY, 1);
 	/* FIXME: when we have NUMA support, properly turn node number into NUMA node number */
 	_starpu_memory_manager_set_global_memory_size(cpu_worker->memory_node, _starpu_cpu_get_global_mem_size(cpu_worker->memory_node, cpu_worker->config));
 

+ 2 - 7
src/drivers/cuda/driver_cuda.c

@@ -562,17 +562,12 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	int lastdevid = -1;
 	unsigned i;
 
-	_starpu_worker_start(worker0, _STARPU_FUT_CUDA_KEY);
+	_starpu_driver_start(worker0, _STARPU_FUT_CUDA_KEY, 0);
 	_starpu_set_local_worker_set_key(worker_set);
 
 #ifdef STARPU_USE_FXT
 	for (i = 1; i < worker_set->nworkers; i++)
-	{
-		struct _starpu_worker *worker = &worker_set->workers[i];
-		unsigned devid = worker->devid;
-		unsigned memnode = worker->memory_node;
-		_STARPU_TRACE_WORKER_INIT_START(_STARPU_FUT_CUDA_KEY, worker->workerid, devid, memnode);
-	}
+		_starpu_worker_start(&worker_set->workers[i], _STARPU_FUT_CUDA_KEY, 0);
 #endif
 
 	for (i = 0; i < worker_set->nworkers; i++)

+ 1 - 1
src/drivers/mic/driver_mic_source.c

@@ -517,7 +517,7 @@ void *_starpu_mic_src_worker(void *arg)
 
 	/* unsigned memnode = baseworker->memory_node; */
 
-	_starpu_worker_start(baseworker, _STARPU_FUT_MIC_KEY);
+	_starpu_driver_start(baseworker, _STARPU_FUT_MIC_KEY, 0);
 	for (i = 1; i < worker_set->nworkers; i++)
 	{
 		struct _starpu_worker *worker = &worker_set->workers[i];

+ 1 - 1
src/drivers/opencl/driver_opencl.c

@@ -606,7 +606,7 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 {
 	int devid = worker->devid;
 
-	_starpu_worker_start(worker, _STARPU_FUT_OPENCL_KEY);
+	_starpu_driver_start(worker, _STARPU_FUT_OPENCL_KEY, 0);
 
 	_starpu_opencl_init_context(devid);
 

+ 1 - 1
src/drivers/scc/driver_scc_source.c

@@ -291,7 +291,7 @@ void *_starpu_scc_src_worker(void *arg)
 	unsigned subworkerid = args->subworkerid;
 	unsigned i;
 
-	_starpu_worker_start(args, _STARPU_FUT_SCC_KEY);
+	_starpu_driver_start(args, _STARPU_FUT_SCC_KEY, 0);
 
 	_starpu_scc_src_init_context(subworkerid);