Olivier Aumage 11 gadi atpakaļ
vecāks
revīzija
fb637c5fc5

+ 4 - 0
ChangeLog

@@ -71,6 +71,8 @@ Small features:
   * Add codelet size, footprint and tag id in the paje trace.
   * Add STARPU_TAG_ONLY, to specify a tag for traces without making StarPU
     manage the tag.
+  * On Linux x86, spinlocks now block after a hundred tries. This avoids
+    typical 10ms pauses when the application thread tries to submit tasks.
 
 Changes:
   * Data interfaces (variable, vector, matrix and block) now define
@@ -96,6 +98,8 @@ New features:
     scheduler.
   * Add STARPU_CALIBRATE_MINIMUM environment variable to specify the minimum
     number of calibration measurements.
+  * Add STARPU_TRACE_BUFFER_SIZE environment variable to specify the size of
+    the trace buffer.
 
 StarPU 1.1.1 (svn revision 12638)
 ==============================================

+ 5 - 2
configure.ac

@@ -2130,7 +2130,8 @@ AC_ARG_ENABLE(blas-lib,
  [  --enable-blas-lib[=blaslibname]:
                       none [default]: no BLAS lib is used
                       atlas: use ATLAS library
-                      goto: use GotoBLAS library],
+                      goto: use GotoBLAS library
+		      mkl: use MKL library (you may need to set specific CFLAGS and LDFLAGS with --with-mkl-cflags and --with-mkl-ldflags)],
  [
      if   test "x$enableval" = "xatlas" ; then
         blas_lib=atlas
@@ -2138,6 +2139,8 @@ AC_ARG_ENABLE(blas-lib,
         blas_lib=goto
      elif test "x$enableval" = "xnone" ; then
         blas_lib=none
+     elif test "x$enableval" = "xmkl" ; then
+        blas_lib=mkl
      elif test x$enableval = xno; then
 	blas_lib=none
      else
@@ -2195,7 +2198,7 @@ if test x$blas_lib = xmaybe -o x$blas_lib = xatlas; then
     fi
 fi
 
-if test x$blas_lib = xmaybe; then
+if test x$blas_lib = xmaybe -o x$blas_lib = xmkl; then
     # Should we use MKL ?
     AC_ARG_WITH(mkl-cflags, [AS_HELP_STRING([--with-mkl-cflags], [specify MKL compilation flags])],
 	[

+ 9 - 0
doc/doxygen/chapters/40environment_variables.doxy

@@ -548,6 +548,15 @@ intended to be used for experimental purposes as it emulates devices
 that have a limited amount of memory.
 </dd>
 
+<dt>STARPU_TRACE_BUFFER_SIZE</dt>
+<dd>
+\anchor STARPU_TRACE_BUFFER_SIZE
+\addindex __env__STARPU_TRACE_BUFFER_SIZE
+This sets the buffer size for recording trace events in MiB. Setting it to a big
+size allows to avoid pauses in the trace while it is recorded on the disk. This
+however also consumes memory, of course. The default value is 64.
+</dd>
+
 <dt>STARPU_GENERATE_TRACE</dt>
 <dd>
 \anchor STARPU_GENERATE_TRACE

+ 2 - 0
include/starpu_config.h.in

@@ -58,6 +58,7 @@
 #undef STARPU_HAVE_MALLOC_H
 
 #undef STARPU_HAVE_SYNC_BOOL_COMPARE_AND_SWAP
+#undef STARPU_HAVE_SYNC_VAL_COMPARE_AND_SWAP
 #undef STARPU_HAVE_SYNC_FETCH_AND_ADD
 #undef STARPU_HAVE_SYNC_FETCH_AND_OR
 #undef STARPU_HAVE_SYNC_LOCK_TEST_AND_SET
@@ -90,6 +91,7 @@
 #undef STARPU_HAVE_LIBNUMA
 
 #undef STARPU_HAVE_WINDOWS
+#undef STARPU_LINUX_SYS
 #undef STARPU_HAVE_UNSETENV
 
 #ifdef _MSC_VER

+ 3 - 1
include/starpu_thread.h

@@ -236,12 +236,14 @@ int starpu_pthread_barrier_wait(starpu_pthread_barrier_t *barrier);
  * Encapsulation of the pthread_spin_* functions.
  */
 
-#if defined(STARPU_SIMGRID) || !defined(STARPU_HAVE_PTHREAD_SPIN_LOCK)
+#if defined(STARPU_SIMGRID) || (defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)) || !defined(STARPU_HAVE_PTHREAD_SPIN_LOCK)
 
 typedef struct
 {
 #ifdef STARPU_SIMGRID
 	int taken;
+#elif defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+	unsigned taken STARPU_ATTRIBUTE_ALIGNED(16);
 #else /* we only have a trivial implementation yet ! */
 	uint32_t taken STARPU_ATTRIBUTE_ALIGNED(16);
 #endif

+ 6 - 0
include/starpu_util.h

@@ -176,6 +176,12 @@ STARPU_ATOMIC_SOMETHING(or, old | value)
 #define STARPU_BOOL_COMPARE_AND_SWAP(ptr, old, value) (starpu_cmpxchg((ptr), (old), (value)) == (old))
 #endif
 
+#ifdef STARPU_HAVE_SYNC_VAL_COMPARE_AND_SWAP
+#define STARPU_VAL_COMPARE_AND_SWAP(ptr, old, value)  (__sync_val_compare_and_swap ((ptr), (old), (value)))
+#elif defined(STARPU_HAVE_XCHG)
+#define STARPU_VAL_COMPARE_AND_SWAP(ptr, old, value) (starpu_cmpxchg((ptr), (old), (value)))
+#endif
+
 #ifdef STARPU_HAVE_SYNC_LOCK_TEST_AND_SET
 #define STARPU_TEST_AND_SET(ptr, value) (__sync_lock_test_and_set ((ptr), (value)))
 #define STARPU_RELEASE(ptr) (__sync_lock_release ((ptr)))

+ 13 - 0
m4/acinclude.m4

@@ -42,6 +42,19 @@ AC_DEFUN([STARPU_CHECK_SYNC_BOOL_COMPARE_AND_SWAP], [
 	      [Define to 1 if the target supports __sync_bool_compare_and_swap])
   fi])
 
+dnl Check whether the target supports __sync_val_compare_and_swap.
+AC_DEFUN([STARPU_CHECK_SYNC_VAL_COMPARE_AND_SWAP], [
+  AC_CACHE_CHECK([whether the target supports __sync_val_compare_and_swap],
+		 ac_cv_have_sync_val_compare_and_swap, [
+  AC_LINK_IFELSE([AC_LANG_PROGRAM([int foo, bar;],
+			[bar = __sync_val_compare_and_swap(&foo, 0, 1);])],
+			[ac_cv_have_sync_val_compare_and_swap=yes],
+			[ac_cv_have_sync_val_compare_and_swap=no])])
+  if test $ac_cv_have_sync_val_compare_and_swap = yes; then
+    AC_DEFINE(STARPU_HAVE_SYNC_VAL_COMPARE_AND_SWAP, 1,
+	      [Define to 1 if the target supports __sync_val_compare_and_swap])
+  fi])
+
 dnl Check whether the target supports __sync_fetch_and_add.
 AC_DEFUN([STARPU_CHECK_SYNC_FETCH_AND_ADD], [
   AC_CACHE_CHECK([whether the target supports __sync_fetch_and_add],

+ 19 - 8
src/common/fxt.h

@@ -162,6 +162,9 @@
 #define _STARPU_FUT_WORKER_SCHEDULING_PUSH	0x5166
 #define _STARPU_FUT_WORKER_SCHEDULING_POP	0x5167
 
+#define	_STARPU_FUT_START_EXECUTING	0x5168
+#define	_STARPU_FUT_END_EXECUTING	0x5169
+
 #ifdef STARPU_USE_FXT
 #include <fxt/fxt.h>
 #include <fxt/fut.h>
@@ -406,31 +409,37 @@ do {									\
 #define _STARPU_TRACE_WORKER_INIT_END(workerid)				\
 	FUT_DO_PROBE2(_STARPU_FUT_WORKER_INIT_END, _starpu_gettid(), (workerid));
 
-#define _STARPU_TRACE_START_CODELET_BODY(job, nimpl, archtype)				\
+#define _STARPU_TRACE_START_CODELET_BODY(job, nimpl, archtype, workerid)				\
 do {									\
         const char *model_name = _starpu_job_get_model_name((job));         \
 	if (model_name)                                                 \
 	{								\
 		/* we include the symbol name */			\
-		_STARPU_FUT_DO_PROBE4STR(_STARPU_FUT_START_CODELET_BODY, (job), ((job)->task)->sched_ctx, _starpu_gettid(), 1, model_name); \
+		_STARPU_FUT_DO_PROBE4STR(_STARPU_FUT_START_CODELET_BODY, (job), ((job)->task)->sched_ctx, workerid, 1, model_name); \
 	}								\
 	else {                                                          \
-		FUT_DO_PROBE4(_STARPU_FUT_START_CODELET_BODY, (job), ((job)->task)->sched_ctx, _starpu_gettid(), 0); \
+		FUT_DO_PROBE4(_STARPU_FUT_START_CODELET_BODY, (job), ((job)->task)->sched_ctx, workerid, 0); \
 	}								\
 	{								\
 		const size_t __job_size = _starpu_job_get_data_size((job)->task->cl?(job)->task->cl->model:NULL, archtype, nimpl, (job));	\
 		const uint32_t __job_hash = _starpu_compute_buffers_footprint((job)->task->cl?(job)->task->cl->model:NULL, archtype, nimpl, (job));\
-		FUT_DO_PROBE6(_STARPU_FUT_CODELET_DETAILS, (job), ((job)->task)->sched_ctx, __job_size, __job_hash, (job)->task->tag_id, _starpu_gettid());	\
+		FUT_DO_PROBE6(_STARPU_FUT_CODELET_DETAILS, (job), ((job)->task)->sched_ctx, __job_size, __job_hash, (job)->task->tag_id, workerid);	\
 	}								\
 } while(0);
 
-#define _STARPU_TRACE_END_CODELET_BODY(job, nimpl, archtype)			\
+#define _STARPU_TRACE_END_CODELET_BODY(job, nimpl, archtype, workerid)			\
 do {									\
 	const size_t job_size = _starpu_job_get_data_size((job)->task->cl?(job)->task->cl->model:NULL, archtype, nimpl, (job));	\
 	const uint32_t job_hash = _starpu_compute_buffers_footprint((job)->task->cl?(job)->task->cl->model:NULL, archtype, nimpl, (job));\
-	FUT_DO_PROBE7(_STARPU_FUT_END_CODELET_BODY, (job), (job_size), (job_hash), (archtype)->type, (archtype)->devid, (archtype)->ncore, _starpu_gettid());	\
+	FUT_DO_PROBE7(_STARPU_FUT_END_CODELET_BODY, (job), (job_size), (job_hash), (archtype)->type, (archtype)->devid, (archtype)->ncore, workerid);	\
 } while(0);
 
+#define _STARPU_TRACE_START_EXECUTING()				\
+	FUT_DO_PROBE1(_STARPU_FUT_START_EXECUTING, _starpu_gettid());
+
+#define _STARPU_TRACE_END_EXECUTING()				\
+	FUT_DO_PROBE1(_STARPU_FUT_END_EXECUTING, _starpu_gettid());
+
 #define _STARPU_TRACE_START_CALLBACK(job)	\
 	FUT_DO_PROBE2(_STARPU_FUT_START_CALLBACK, job, _starpu_gettid());
 
@@ -782,8 +791,10 @@ do {										\
 #define _STARPU_TRACE_NEW_MEM_NODE(nodeid)	do {} while(0)
 #define _STARPU_TRACE_WORKER_INIT_START(a,b,c)	do {} while(0)
 #define _STARPU_TRACE_WORKER_INIT_END(workerid)	do {} while(0)
-#define _STARPU_TRACE_START_CODELET_BODY(job, nimpl, archtype)	do {} while(0)
-#define _STARPU_TRACE_END_CODELET_BODY(job, nimpl, a)	do {} while(0)
+#define _STARPU_TRACE_START_CODELET_BODY(job, nimpl, archtype, workerid)	do {} while(0)
+#define _STARPU_TRACE_END_CODELET_BODY(job, nimpl, a, workerid)	do {} while(0)
+#define _STARPU_TRACE_START_EXECUTING()	do {} while(0)
+#define _STARPU_TRACE_END_EXECUTING()	do {} while(0)
 #define _STARPU_TRACE_START_CALLBACK(job)	do {} while(0)
 #define _STARPU_TRACE_END_CALLBACK(job)		do {} while(0)
 #define _STARPU_TRACE_JOB_PUSH(task, prio)	do {} while(0)

+ 92 - 8
src/common/thread.c

@@ -23,6 +23,21 @@
 #include <xbt/synchro_core.h>
 #endif
 
+#if defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+#include <linux/futex.h>
+#include <sys/syscall.h>
+
+/* Private futexes are not so old, cope with old kernels.  */
+#ifdef FUTEX_WAIT_PRIVATE
+static int _starpu_futex_wait = FUTEX_WAIT_PRIVATE;
+static int _starpu_futex_wake = FUTEX_WAKE_PRIVATE;
+#else
+static int _starpu_futex_wait = FUTEX_WAIT;
+static int _starpu_futex_wake = FUTEX_WAKE;
+#endif
+
+#endif
+
 #ifdef STARPU_SIMGRID
 
 extern int _starpu_simgrid_thread_start(int argc, char *argv[]);
@@ -490,15 +505,15 @@ int starpu_pthread_barrier_wait(starpu_pthread_barrier_t *barrier)
 }
 #endif /* STARPU_SIMGRID, _MSC_VER, STARPU_HAVE_PTHREAD_BARRIER */
 
-#if defined(STARPU_SIMGRID) || !defined(HAVE_PTHREAD_SPIN_LOCK)
+#if defined(STARPU_SIMGRID) || (defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)) || !defined(HAVE_PTHREAD_SPIN_LOCK)
 
-int starpu_pthread_spin_init(starpu_pthread_spinlock_t *lock, int pshared)
+int starpu_pthread_spin_init(starpu_pthread_spinlock_t *lock, int pshared STARPU_ATTRIBUTE_UNUSED)
 {
 	lock->taken = 0;
 	return 0;
 }
 
-int starpu_pthread_spin_destroy(starpu_pthread_spinlock_t *lock)
+int starpu_pthread_spin_destroy(starpu_pthread_spinlock_t *lock STARPU_ATTRIBUTE_UNUSED)
 {
 	/* we don't do anything */
 	return 0;
@@ -519,7 +534,53 @@ int starpu_pthread_spin_lock(starpu_pthread_spinlock_t *lock)
 		MSG_process_sleep(0.000001);
 		STARPU_UYIELD();
 	}
-#else
+#elif defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+	if (STARPU_VAL_COMPARE_AND_SWAP(&lock->taken, 0, 1) == 0)
+		/* Got it on first try! */
+		return 0;
+
+	/* Busy, spin a bit.  */
+	unsigned i;
+	for (i = 0; i < 128; i++)
+	{
+		/* Pause a bit before retrying */
+		STARPU_UYIELD();
+		/* And synchronize with other threads */
+		STARPU_SYNCHRONIZE();
+		if (!lock->taken)
+			/* Holder released it, try again */
+			if (STARPU_VAL_COMPARE_AND_SWAP(&lock->taken, 0, 1) == 0)
+				/* Got it! */
+				return 0;
+	}
+
+	/* We have spent enough time with spinning, let's block */
+	while (1)
+	{
+		/* Tell releaser to wake us */
+		unsigned prev = starpu_xchg(&lock->taken, 2);
+		if (prev == 0)
+			/* Ah, it just got released and we actually acquired
+			 * it!
+			 * Note: the sad thing is that we have just written 2,
+			 * so will spuriously try to wake a thread on unlock,
+			 * but we can not avoid it since we do not know whether
+			 * there are other threads sleeping or not.
+			 */
+			return 0;
+
+		/* Now start sleeping (unless it was released in between)
+		 * We are sure to get woken because either
+		 * - some thread has not released the lock yet, and lock->taken
+		 *   is 2, so it will wake us.
+		 * - some other thread started blocking, and will set
+		 *   lock->taken back to 2
+		 */
+		if (syscall(SYS_futex, &lock->taken, _starpu_futex_wait, 2, NULL, NULL, 0))
+			if (errno == ENOSYS)
+				_starpu_futex_wait = FUTEX_WAIT;
+	}
+#else /* !SIMGRID && !LINUX */
 	uint32_t prev;
 	do
 	{
@@ -539,7 +600,11 @@ int starpu_pthread_spin_trylock(starpu_pthread_spinlock_t *lock)
 		return EBUSY;
 	lock->taken = 1;
 	return 0;
-#else
+#elif defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+	unsigned prev;
+	prev = STARPU_VAL_COMPARE_AND_SWAP(&lock->taken, 0, 1);
+	return (prev == 0)?0:EBUSY;
+#else /* !SIMGRID && !LINUX */
 	uint32_t prev;
 	prev = STARPU_TEST_AND_SET(&lock->taken, 1);
 	return (prev == 0)?0:EBUSY;
@@ -550,11 +615,27 @@ int starpu_pthread_spin_unlock(starpu_pthread_spinlock_t *lock)
 {
 #ifdef STARPU_SIMGRID
 	lock->taken = 0;
-	return 0;
-#else
+#elif defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+	STARPU_ASSERT(lock->taken != 0);
+	unsigned next = STARPU_ATOMIC_ADD(&lock->taken, -1);
+	if (next == 0)
+		/* Nobody to wake, we are done */
+		return 0;
+
+	/*
+	 * Somebody to wake. Clear 'taken' and wake him.
+	 * Note that he may not be sleeping yet, but if he is not, we won't
+	 * since the value of 'taken' will have changed.
+	 */
+	lock->taken = 0;
+	STARPU_SYNCHRONIZE();
+	if (syscall(SYS_futex, &lock->taken, _starpu_futex_wake, 1, NULL, NULL, 0))
+		if (errno == ENOSYS)
+			_starpu_futex_wake = FUTEX_WAKE;
+#else /* !SIMGRID && !LINUX */
 	STARPU_RELEASE(&lock->taken);
-	return 0;
 #endif
+	return 0;
 }
 
 #endif /* defined(STARPU_SIMGRID) || !defined(HAVE_PTHREAD_SPIN_LOCK) */
@@ -564,6 +645,9 @@ int _starpu_pthread_spin_checklocked(starpu_pthread_spinlock_t *lock)
 #ifdef STARPU_SIMGRID
 	STARPU_ASSERT(lock->taken);
 	return !lock->taken;
+#elif defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
+	STARPU_ASSERT(lock->taken == 1 || lock->taken == 2);
+	return lock->taken == 0;
 #elif defined(HAVE_PTHREAD_SPIN_LOCK)
 	int ret = pthread_spin_trylock((pthread_spinlock_t *)lock);
 	STARPU_ASSERT(ret != 0);

+ 11 - 6
src/core/jobs.h

@@ -93,19 +93,19 @@ LIST_TYPE(_starpu_job,
 	 * last_writer/readers */
 	starpu_data_handle_t implicit_dep_handle;
 
-	/* The value of the footprint that identifies the job may be stored in
-	 * this structure. */
-	uint32_t footprint;
-	unsigned footprint_is_computed:1;
-
 	/* Indicates whether the task associated to that job has already been
 	 * submitted to StarPU (1) or not (0) (using starpu_task_submit).
 	 * Becomes and stays 2 when the task is submitted several times.
+	 *
+	 * Protected by j->sync_mutex.
 	 */
 	unsigned submitted:2;
 
 	/* Indicates whether the task associated to this job is terminated or
-	 * not. */
+	 * not.
+	 *
+	 * Protected by j->sync_mutex.
+	 */
 	unsigned terminated:2;
 
 #ifdef STARPU_OPENMP
@@ -138,6 +138,11 @@ LIST_TYPE(_starpu_job,
 	double cumulated_power_consumed;
 #endif
 
+	/* The value of the footprint that identifies the job may be stored in
+	 * this structure. */
+	uint32_t footprint;
+	unsigned footprint_is_computed:1;
+
 	/* Should that task appear in the debug tools ? (eg. the DAG generated
 	 * with dot) */
 	unsigned exclude_from_dag:1;

+ 6 - 4
src/core/perfmodel/perfmodel_history.c

@@ -1302,17 +1302,19 @@ void _starpu_update_perfmodel_history(struct _starpu_job *j, struct starpu_perfm
 				STARPU_HG_DISABLE_CHECKING(entry->nsample);
 				STARPU_HG_DISABLE_CHECKING(entry->mean);
 
-				entry->mean = measured;
-				entry->sum = measured;
+				/* Do not take the first measurement into account, it is very often quite bogus */
+				/* TODO: it'd be good to use a better estimation heuristic, like the median, or latest n values, etc. */
+				entry->mean = 0;
+				entry->sum = 0;
 
 				entry->deviation = 0.0;
-				entry->sum2 = measured*measured;
+				entry->sum2 = 0;
 
 				entry->size = _starpu_job_get_data_size(model, arch, nimpl, j);
 				entry->flops = j->task->flops;
 
 				entry->footprint = key;
-				entry->nsample = 1;
+				entry->nsample = 0;
 				entry->nerror = 0;
 
 				insert_history_entry(entry, list, &per_arch_model->history);

+ 1 - 1
src/core/workers.c

@@ -863,7 +863,7 @@ int starpu_conf_init(struct starpu_conf *conf)
 #endif
 
 	/* 64MiB by default */
-	conf->trace_buffer_size = 64<<20;
+	conf->trace_buffer_size = starpu_get_env_number_default("STARPU_TRACE_BUFFER_SIZE", 64) << 20;
 	return 0;
 }
 

+ 1 - 1
src/datawizard/coherency.c

@@ -58,7 +58,7 @@ int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
 	}
 
 	/* we should have found at least one copy ! */
-	STARPU_ASSERT(src_node_mask != 0);
+	STARPU_ASSERT_MSG(src_node_mask != 0, "The data for this handle is requested, but this handle does not have a valid value. Perhaps some initialization task is missing?");
 
 	/* Without knowing the size, we won't know the cost */
 	if (!size)

+ 12 - 6
src/datawizard/data_request.c

@@ -424,10 +424,10 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 
 	*pushed = 0;
 
-	/* Here helgrind would should that this is an un protected access.
-	 * We however don't care about missing an entry, we will get called
-	 * again sooner or later. */
-	if (_starpu_data_request_list_empty(data_requests[src_node]))
+	/* This is racy, but not posing problems actually, since we know we
+	 * will come back here to probe again regularly anyway.
+	 * Thus, do not expose this optimization to helgrind */
+	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(data_requests[src_node]))
 		return 0;
 
 	empty_list = _starpu_data_request_list_new();
@@ -513,7 +513,10 @@ void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc
 
 	*pushed = 0;
 
-	if (_starpu_data_request_list_empty(prefetch_requests[src_node]))
+	/* This is racy, but not posing problems actually, since we know we
+	 * will come back here to probe again regularly anyway.
+	 * Thus, do not expose this optimization to valgrind */
+	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(prefetch_requests[src_node]))
 		return;
 
 	empty_list = _starpu_data_request_list_new();
@@ -606,7 +609,10 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 	struct _starpu_data_request_list *empty_list;
 	unsigned taken, kept;
 
-	if (_starpu_data_request_list_empty(data_requests_pending[src_node]))
+	/* Here helgrind would should that this is an un protected access.
+	 * We however don't care about missing an entry, we will get called
+	 * again sooner or later. */
+	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(data_requests_pending[src_node]))
 		return 0;
 
 	empty_list = _starpu_data_request_list_new();

+ 97 - 50
src/debug/traces/starpu_fxt.c

@@ -46,21 +46,29 @@ static unsigned other_index = 0;
 
 static void set_next_other_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = other_worker_colors[other_index++];
 }
 
 static void set_next_cpu_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = cpus_worker_colors[cpus_index++];
 }
 
 static void set_next_cuda_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = cuda_worker_colors[cuda_index++];
 }
 
 static void set_next_opencl_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = opencl_worker_colors[opencl_index++];
 }
 
@@ -76,6 +84,8 @@ static void set_next_scc_worker_color(int workerid)
 
 static const char *get_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		workerid = STARPU_NMAXWORKERS - 1;
 	return worker_colors[workerid];
 }
 
@@ -154,7 +164,7 @@ struct worker_entry
 	int workerid;
 } *worker_ids;
 
-static void register_worker_id(unsigned long tid, int workerid)
+static int register_worker_id(unsigned long tid, int workerid)
 {
 	nworkers++;
 	struct worker_entry *entry;
@@ -164,15 +174,15 @@ static void register_worker_id(unsigned long tid, int workerid)
 	STARPU_ASSERT_MSG(workerid < STARPU_NMAXWORKERS, "Too many workers in this trace, please increase in ./configure invocation the maximum number of CPUs and GPUs to the same value as was used for execution");
 
 	/* only register a thread once */
-	//STARPU_ASSERT(entry == NULL);
 	if (entry)
-		return;
+		return 0;
 
 	entry = malloc(sizeof(*entry));
 	entry->tid = tid;
 	entry->workerid = workerid;
 
 	HASH_ADD(hh, worker_ids, tid, sizeof(tid), entry);
+	return 1;
 }
 
 static int find_worker_id(unsigned long tid)
@@ -264,48 +274,59 @@ static void memnode_set_state(double time, const char *prefix, unsigned int memn
 #endif
 }
 
-static void worker_set_state(double time, const char *prefix, long unsigned int workerid, const char *name)
+static void thread_set_state(double time, const char *prefix, long unsigned int threadid, const char *name)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
-	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
 	poti_SetState(time, container, "S", name);
 #else
-	fprintf(out_paje_file, "10	%.9f	%st%lu	S	%s\n", time, prefix, workerid, name);
+	fprintf(out_paje_file, "10	%.9f	%st%lu	S	%s\n", time, prefix, threadid, name);
 #endif
 }
 
-static void worker_set_detailed_state(double time, const char *prefix, long unsigned int workerid, const char *name, unsigned long size, unsigned long footprint, unsigned long long tag)
+static void worker_set_state(double time, const char *prefix, long unsigned int workerid, const char *name)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
-	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
-	/* TODO: set detailed state */
-	poti_SetState(time, container, "S", name);
+	worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	poti_SetState(time, container, "WS", name);
 #else
-	fprintf(out_paje_file, "20	%.9f	%st%lu	S	%s	%lu	%08lx	%016llx\n", time, prefix, workerid, name, size, footprint, tag);
+	fprintf(out_paje_file, "10	%.9f	%sw%lu	WS	%s\n", time, prefix, workerid, name);
 #endif
 }
 
-static void worker_push_state(double time, const char *prefix, long unsigned int workerid, const char *name)
+static void thread_push_state(double time, const char *prefix, long unsigned int threadid, const char *name)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
-	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
 	poti_PushState(time, container, "S", name);
 #else
-	fprintf(out_paje_file, "11	%.9f	%st%lu	S	%s\n", time, prefix, workerid, name);
+	fprintf(out_paje_file, "11	%.9f	%st%lu	S	%s\n", time, prefix, threadid, name);
 #endif
 }
 
-static void worker_pop_state(double time, const char *prefix, long unsigned int workerid)
+static void thread_pop_state(double time, const char *prefix, long unsigned int threadid)
 {
 #ifdef STARPU_HAVE_POTI
 	char container[STARPU_POTI_STR_LEN];
-	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, threadid);
 	poti_PopState(time, container, "S");
 #else
-	fprintf(out_paje_file, "12	%.9f	%st%lu	S\n", time, prefix, workerid);
+	fprintf(out_paje_file, "12	%.9f	%st%lu	S\n", time, prefix, threadid);
+#endif
+}
+
+static void worker_set_detailed_state(double time, const char *prefix, long unsigned int workerid, const char *name, unsigned long size, unsigned long footprint, unsigned long long tag)
+{
+#ifdef STARPU_HAVE_POTI
+	char container[STARPU_POTI_STR_LEN];
+	worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, workerid);
+	/* TODO: set detailed state */
+	poti_SetState(time, container, "WS", name);
+#else
+	fprintf(out_paje_file, "20	%.9f	%sw%lu	WS	%s	%lu	%08lx	%016llx\n", time, prefix, workerid, name, size, footprint, tag);
 #endif
 }
 
@@ -371,8 +392,9 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 	int workerid = ev->param[1];
 	int nodeid = ev->param[3];
 	int threadid = ev->param[4];
+	int new_thread;
 
-	register_worker_id(threadid, workerid);
+	new_thread = register_worker_id(threadid, workerid);
 
 	char *kindstr = "";
 	struct starpu_perfmodel_arch arch;
@@ -435,11 +457,13 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 		snprintf(new_thread_container_name, STARPU_POTI_STR_LEN, "%s%d", prefix, threadid);
 		char new_worker_container_name[STARPU_POTI_STR_LEN];
 		snprintf(new_worker_container_name, STARPU_POTI_STR_LEN, "%s%s%d", prefix, kindstr, devid);
-		poti_CreateContainer(get_event_time_stamp(ev, options), new_thread_container_alias, "T", memnode_container, new_thread_container_name);
+		if (new_thread)
+			poti_CreateContainer(get_event_time_stamp(ev, options), new_thread_container_alias, "T", memnode_container, new_thread_container_name);
 		poti_CreateContainer(get_event_time_stamp(ev, options), new_worker_container_alias, "W", new_thread_container_alias, new_worker_container_name);
 #else
-		fprintf(out_paje_file, "7	%.9f	%st%d	T	%smn%d	%s%d\n",
-			get_event_time_stamp(ev, options), prefix, threadid, prefix, nodeid, prefix, threadid);
+		if (new_thread)
+			fprintf(out_paje_file, "7	%.9f	%st%d	T	%smn%d	%s%d\n",
+				get_event_time_stamp(ev, options), prefix, threadid, prefix, nodeid, prefix, threadid);
 		fprintf(out_paje_file, "7	%.9f	%sw%d	W	%st%d	%s%s%d\n",
 			get_event_time_stamp(ev, options), prefix, workerid, prefix, threadid, prefix, kindstr, devid);
 #endif
@@ -447,7 +471,7 @@ static void handle_worker_init_start(struct fxt_ev_64 *ev, struct starpu_fxt_opt
 
 	/* start initialization */
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), prefix, threadid, "I");
+		thread_set_state(get_event_time_stamp(ev, options), prefix, threadid, "I");
 
 	if (activity_file)
 	fprintf(activity_file, "name\t%d\t%s %d\n", workerid, kindstr, devid);
@@ -462,13 +486,16 @@ static void handle_worker_init_end(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 	int worker;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "B");
+		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "B");
 
 	if (ev->nb_params < 2)
 		worker = find_worker_id(ev->param[0]);
 	else
 		worker = ev->param[1];
 
+	if (out_paje_file)
+		worker_set_state(get_event_time_stamp(ev, options), prefix, worker, "I");
+
 	/* Initilize the accumulated time counters */
 	last_activity_flush_timestamp[worker] = get_event_time_stamp(ev, options);
 	accumulated_sleep_time[worker] = 0.0;
@@ -480,7 +507,7 @@ static void handle_worker_deinit_start(struct fxt_ev_64 *ev, struct starpu_fxt_o
 	char *prefix = options->file_prefix;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "D");
+		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "D");
 }
 
 static void handle_worker_deinit_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -556,7 +583,7 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 	if (out_paje_file)
 	{
 #ifdef STARPU_HAVE_POTI
-		create_paje_state_color(name, "S", red, green, blue);
+		create_paje_state_color(name, "WS", red, green, blue);
 		int i;
 		for(i = 1; i < STARPU_NMAX_SCHED_CTXS; i++)
 		{
@@ -595,7 +622,7 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 /* 		create_paje_state_color(name, "Ctx9", .0, .0, 1.0); */
 /* 		create_paje_state_color(name, "Ctx10", 154.0, 205.0, 50.0); */
 #else
-		fprintf(out_paje_file, "6	%s	S	%s	\"%f %f %f\" \n", name, name, red, green, blue);
+		fprintf(out_paje_file, "6	%s	WS	%s	\"%f %f %f\" \n", name, name, red, green, blue);
 		int i;
 		for(i = 1; i < STARPU_NMAX_SCHED_CTXS; i++)
 		{
@@ -640,8 +667,7 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 
 static void handle_start_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
-	int worker;
-	worker = find_worker_id(ev->param[2]);
+	int worker = ev->param[2];
 
 	if (worker < 0) return;
 
@@ -668,10 +694,10 @@ static void handle_start_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_op
 			char container[STARPU_POTI_STR_LEN];
 			char ctx[6];
 			snprintf(ctx, sizeof(ctx), "Ctx%d", sched_ctx);
-			thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, ev->param[2]);
+			worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, ev->param[2]);
 			poti_SetState(start_codelet_time, container, ctx, name);
 #else
-			fprintf(out_paje_file, "10	%.9f	%st%"PRIu64"	Ctx%d	%s\n", start_codelet_time, prefix, ev->param[2], sched_ctx, name);
+			fprintf(out_paje_file, "10	%.9f	%sw%"PRIu64"	Ctx%d	%s\n", start_codelet_time, prefix, ev->param[2], sched_ctx, name);
 #endif
 		}
 	}
@@ -682,8 +708,7 @@ static void handle_start_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_op
 static void handle_codelet_details(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 #ifdef STARPU_ENABLE_PAJE_CODELET_DETAILS
-	int worker;
-	worker = find_worker_id(ev->param[5]);
+	int worker = ev->param[5];
 
 	unsigned sched_ctx = ev->param[1];
 	if (worker < 0) return;
@@ -699,10 +724,10 @@ static void handle_codelet_details(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 			char container[STARPU_POTI_STR_LEN];
 			char ctx[6];
 			snprintf(ctx, sizeof(ctx), "Ctx%d", sched_ctx);
-			thread_container_alias(container, STARPU_POTI_STR_LEN, prefix, ev->param[5]);
+			worker_container_alias(container, STARPU_POTI_STR_LEN, prefix, ev->param[5]);
 			poti_SetState(last_codelet_start[worker], container, ctx, last_codelet_symbol[worker]);
 #else
-			fprintf(out_paje_file, "20	%.9f	%st%"PRIu64"	Ctx%d	%s	%08lx	%lu	%016llx\n", last_codelet_start[worker], prefix, ev->param[2], sched_ctx, last_codelet_symbol[worker], (unsigned long) ev->param[2], (unsigned long) ev->param[3], (unsigned long long) ev->param[4]);
+			fprintf(out_paje_file, "20	%.9f	%sw%"PRIu64"	Ctx%d	%s	%08lx	%lu	%016llx\n", last_codelet_start[worker], prefix, ev->param[2], sched_ctx, last_codelet_symbol[worker], (unsigned long) ev->param[2], (unsigned long) ev->param[3], (unsigned long long) ev->param[4]);
 #endif
 		}
 	}
@@ -714,8 +739,7 @@ static struct starpu_fxt_codelet_event *dumped_codelets;
 
 static void handle_end_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
-	int worker;
-	worker = find_worker_id(ev->param[6]);
+	int worker = ev->param[6];
 	if (worker < 0) return;
 
 	char *prefix = options->file_prefix;
@@ -726,7 +750,7 @@ static void handle_end_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 	uint32_t codelet_hash = ev->param[2];
 
 	if (out_paje_file)
-		worker_set_state(end_codelet_time, prefix, ev->param[6], "B");
+		worker_set_state(end_codelet_time, prefix, ev->param[6], "I");
 
 	double codelet_length = (end_codelet_time - last_codelet_start[worker]);
 
@@ -753,6 +777,22 @@ static void handle_end_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 	}
 }
 
+static void handle_start_thread_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "E");
+}
+
+static void handle_end_thread_executing(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	char *prefix = options->file_prefix;
+
+	if (out_paje_file)
+		thread_set_state(get_event_time_stamp(ev, options), prefix, ev->param[0], "B");
+}
+
 static void handle_user_event(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
@@ -797,7 +837,7 @@ static void handle_start_callback(struct fxt_ev_64 *ev, struct starpu_fxt_option
 		return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "C");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "C");
 }
 
 static void handle_end_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -808,7 +848,7 @@ static void handle_end_callback(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "B");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], "B");
 }
 
 static void handle_hyp_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -819,7 +859,7 @@ static void handle_hyp_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 		return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "H");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "H");
 }
 
 static void handle_hyp_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -830,7 +870,7 @@ static void handle_hyp_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *opti
 		return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
 }
 
 static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options *options, const char *newstatus)
@@ -841,7 +881,7 @@ static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], newstatus);
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[1], newstatus);
 }
 
 static double last_sleep_start[STARPU_NMAXWORKERS];
@@ -853,7 +893,7 @@ static void handle_start_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_opti
 	if (worker < 0) return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 }
 
 static void handle_end_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -863,7 +903,7 @@ static void handle_end_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_option
 	if (worker < 0) return;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "B");
 }
 
 static void handle_push_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -873,7 +913,7 @@ static void handle_push_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_optio
 	if (worker < 0) return;
 
 	if (out_paje_file)
-		worker_push_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
+		thread_push_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
 }
 
 static void handle_pop_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -883,7 +923,7 @@ static void handle_pop_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_option
 	if (worker < 0) return;
 
 	if (out_paje_file)
-		worker_pop_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0]);
+		thread_pop_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0]);
 }
 
 static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -896,7 +936,7 @@ static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *
 	last_sleep_start[worker] = start_sleep_time;
 
 	if (out_paje_file)
-		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sl");
+		thread_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sl");
 }
 
 static void handle_end_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -908,7 +948,7 @@ static void handle_end_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *op
 	double end_sleep_timestamp = get_event_time_stamp(ev, options);
 
 	if (out_paje_file)
-		worker_set_state(end_sleep_timestamp, options->file_prefix, ev->param[0], "B");
+		thread_set_state(end_sleep_timestamp, options->file_prefix, ev->param[0], "B");
 
 	double sleep_length = end_sleep_timestamp - last_sleep_start[worker];
 
@@ -1609,6 +1649,13 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				handle_end_codelet_body(&ev, options);
 				break;
 
+			case _STARPU_FUT_START_EXECUTING:
+				handle_start_thread_executing(&ev, options);
+				break;
+			case _STARPU_FUT_END_EXECUTING:
+				handle_end_thread_executing(&ev, options);
+				break;
+
 			case _STARPU_FUT_START_CALLBACK:
 				handle_start_callback(&ev, options);
 				break;
@@ -2348,7 +2395,7 @@ void starpu_fxt_write_data_trace(char *filename_in)
 			break;
 
 		case _STARPU_FUT_START_CODELET_BODY:
-			workerid = find_worker_id(ev.param[2]);
+			workerid = ev.param[2];
 			tasks[workerid].exec_time = ev.time;
 			has_name = ev.param[3];
 			tasks[workerid].codelet_name = strdup(has_name ? (char *) &ev.param[4] : "unknown");
@@ -2356,7 +2403,7 @@ void starpu_fxt_write_data_trace(char *filename_in)
 			break;
 
 		case _STARPU_FUT_END_CODELET_BODY:
-			workerid = find_worker_id(ev.param[6]);
+			workerid = ev.param[6];
 			assert(workerid != -1);
 			tasks[workerid].exec_time = ev.time - tasks[workerid].exec_time;
 			write_task(tasks[workerid]);

+ 8 - 0
src/debug/traces/starpu_paje.c

@@ -176,10 +176,13 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Po", "S", "PushingOutput", "0.1 1.0 1.0");
 	poti_DefineEntityValue("C", "S", "Callback", ".0 .3 .8");
 	poti_DefineEntityValue("B", "S", "Overhead", ".5 .18 .0");
+	poti_DefineEntityValue("E", "S", "Executing", ".0 .6 .5");
 	poti_DefineEntityValue("Sc", "S", "Scheduling", ".7 .36 .0");
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 	poti_DefineEntityValue("U", "S", "Unpartitioning", ".0 .0 1.0");
+	poti_DefineStateType("WS", "W", "Worker State");
+	poti_DefineEntityValue("I", "WS", "Idle", ".9 .1 .0");
 
 	/* Types for the MPI Communication Thread of the Memory Node */
 	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
@@ -205,6 +208,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 		poti_DefineEntityValue("Po", ctx, "PushingOutput", "0.1 1.0 1.0");
 		poti_DefineEntityValue("C", ctx, "Callback", ".0 .3 .8");
 		poti_DefineEntityValue("B", ctx, "Overhead", ".5 .18 .0");
+		poti_DefineEntityValue("E", ctx, "Executing", ".0 .6 .5");
 		poti_DefineEntityValue("Sc", ctx, "Scheduling", ".7 .36 .0");
 		poti_DefineEntityValue("Sl", ctx, "Sleeping", ".9 .1 .0");
 		poti_DefineEntityValue("P", ctx, "Progressing", ".4 .1 .6");
@@ -249,10 +253,13 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       S      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       S       Callback       \".0 .3 .8\"            \n\
 6       B       S       Overhead         \".5 .18 .0\"		\n\
+6       E       S       Executing         \".0 .6 .5\"		\n\
 6       Sc       S      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n\
 6       U       S       Unpartitioning      \".0 .0 1.0\"		\n\
+3       WS       W       \"Worker State\"                        \n\
+6       I       WS       Idle         \".9 .1 .0\"		\n\
 6       H       S       Hypervisor      \".5 .18 .0\"		\n");
 	fprintf(file, "\
 6       P       CtS       Processing         \"0 0 0\"		\n\
@@ -271,6 +278,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       Ctx%u      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       Ctx%u       Callback       \".0 .3 .8\"            \n\
 6       B       Ctx%u       Overhead         \".5 .18 .0\"		\n\
+6       E       Ctx%u       Executing         \".0 .6 .5\"		\n\
 6       Sc       Ctx%u      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       Ctx%u      Sleeping         \".9 .1 .0\"		\n\
 6       P       Ctx%u       Progressing         \".4 .1 .6\"		\n\

+ 2 - 0
src/drivers/cpu/driver_cpu.c

@@ -102,11 +102,13 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 		STARPU_ASSERT_MSG(func, "when STARPU_CPU is defined in 'where', cpu_func or cpu_funcs has to be defined");
 		if (starpu_get_env_number("STARPU_DISABLE_KERNELS") <= 0)
 		{
+			_STARPU_TRACE_START_EXECUTING();
 #ifdef STARPU_SIMGRID
 			_starpu_simgrid_execute_job(j, perf_arch, NAN);
 #else
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
+			_STARPU_TRACE_END_EXECUTING();
 		}
 		if (is_parallel_task && cl->type == STARPU_FORKJOIN)
 			/* rebind to single CPU */

+ 20 - 0
src/drivers/cuda/driver_cuda.c

@@ -408,11 +408,13 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *args)
 
 	if (starpu_get_env_number("STARPU_DISABLE_KERNELS") <= 0)
 	{
+		_STARPU_TRACE_START_EXECUTING();
 #ifdef STARPU_SIMGRID
 		_starpu_simgrid_execute_job(j, &args->perf_arch, NAN);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
+		_STARPU_TRACE_END_EXECUTING();
 	}
 
 	return 0;
@@ -550,6 +552,15 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			_starpu_set_local_worker_key(args);
 			finish_job_on_cuda(_starpu_get_job_associated_to_task(task), args);
 			idle++;
+#ifdef STARPU_USE_FXT
+			int k;
+			for (k = 0; k < (int) worker_set->nworkers; k++)
+				if (worker_set->workers[k].current_task)
+					break;
+			if (k == (int) worker_set->nworkers)
+				/* Everybody busy */
+				_STARPU_TRACE_END_EXECUTING()
+#endif
 		}
 	}
 
@@ -612,6 +623,15 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 		{
 			/* Record event to synchronize with task termination later */
 			cudaEventRecord(task_events[workerid], starpu_cuda_get_local_stream());
+#ifdef STARPU_USE_FXT
+			int k;
+			for (k = 0; k < (int) worker_set->nworkers; k++)
+				if (worker_set->workers[k].current_task)
+					break;
+			if (k < (int) worker_set->nworkers)
+				/* Everybody busy */
+				_STARPU_TRACE_START_EXECUTING()
+#endif
 		}
 		else
 #else

+ 10 - 9
src/drivers/driver_common/driver_common.c

@@ -74,7 +74,7 @@ void _starpu_driver_start_job(struct _starpu_worker *args, struct _starpu_job *j
 	if (starpu_top)
 		_starpu_top_task_started(task,workerid,codelet_start);
 
-	_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch);
+	_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 }
 
 void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch STARPU_ATTRIBUTE_UNUSED, struct timespec *codelet_end, int rank, int profiling)
@@ -86,7 +86,7 @@ void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j,
 	int workerid = args->workerid;
 	unsigned calibrate_model = 0;
 
-	_STARPU_TRACE_END_CODELET_BODY(j, j->nimpl, perf_arch);
+	_STARPU_TRACE_END_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 
 	if (cl && cl->model && cl->model->benchmarking)
 		calibrate_model = 1;
@@ -361,13 +361,13 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 		return NULL;
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
-
 	_starpu_worker_set_status_scheduling_done(workerid);
 
 	_starpu_worker_set_status_wakeup(workerid);
 	args->spinning_backoff = BACKOFF_MIN;
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+
 
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event)
@@ -398,13 +398,16 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		/*else try to pop a task*/
 		else
 		{
-			_starpu_worker_set_status_scheduling(workers[i].workerid);
 			STARPU_PTHREAD_MUTEX_LOCK(&workers[i].sched_mutex);
+			_starpu_worker_set_status_scheduling(workers[i].workerid);
 			_starpu_set_local_worker_key(&workers[i]);
 			tasks[i] = _starpu_pop_task(&workers[i]);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
 			if(tasks[i] != NULL)
 			{
+				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
+				_starpu_worker_set_status_wakeup(workers[i].workerid);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
+
 				count ++;
 				j = _starpu_get_job_associated_to_task(tasks[i]);
 				is_parallel_task = (j->task_size > 1);
@@ -427,13 +430,11 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 					workers[i].worker_size = 1;
 					workers[i].current_rank = 0;
 				}
-
-				_starpu_worker_set_status_scheduling_done(workers[i].workerid);
-				_starpu_worker_set_status_wakeup(workers[i].workerid);
 			}
 			else
 			{
 				_starpu_worker_set_status_sleeping(workers[i].workerid);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
 			}
 		}
 	}

+ 5 - 0
src/drivers/opencl/driver_opencl.c

@@ -635,12 +635,14 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *args)
 
 		if (status != CL_COMPLETE)
 		{
+			_STARPU_TRACE_START_EXECUTING();
 			/* Not ready yet, no better thing to do than waiting */
 			__starpu_datawizard_progress(memnode, 1, 0);
 			return 0;
 		}
 
 		/* Asynchronous task completed! */
+		_STARPU_TRACE_END_EXECUTING();
 		_starpu_opencl_stop_job(_starpu_get_job_associated_to_task(task), args);
 	}
 #endif /* STARPU_SIMGRID */
@@ -698,6 +700,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *args)
 		 */
 		err = clEnqueueMarker(queue, &task_events[args->devid]);
 		if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
+		_STARPU_TRACE_START_EXECUTING();
 	}
 	else
 #else
@@ -832,6 +835,7 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 
 	if (starpu_get_env_number("STARPU_DISABLE_KERNELS") <= 0)
 	{
+		_STARPU_TRACE_START_EXECUTING();
 #ifdef STARPU_SIMGRID
 		double length = NAN;
 	  #ifdef STARPU_OPENCL_SIMULATOR
@@ -851,6 +855,7 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif
+		_STARPU_TRACE_END_EXECUTING();
 	}
 	return 0;
 }

+ 1 - 1
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -1019,7 +1019,7 @@ struct starpu_sched_policy _starpu_sched_dm_policy =
 	.remove_workers = dmda_remove_workers,
 	.push_task = dm_push_task,
 	.pop_task = dmda_pop_task,
-	.pre_exec_hook = NULL,
+	.pre_exec_hook = dmda_pre_exec_hook,
 	.post_exec_hook = dmda_post_exec_hook,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dm",