Преглед изворни кода

new merge after the merge of heft+dmda

Andra Hugo пре 12 година
родитељ
комит
b9f1897f10

+ 2 - 0
ChangeLog

@@ -86,6 +86,8 @@ Changes:
     - It is no longer possible to enable the cell support via the
       gordon driver
   * Fix data transfer arrows in paje traces
+  * The "heft" scheduler no longer exists. Users should now pick "dmda"
+    instead.
 
 Small changes:
   * STARPU_NCPU should now be used instead of STARPU_NCPUS. STARPU_NCPUS is

+ 5 - 1
configure.ac

@@ -1267,7 +1267,11 @@ else
   build_starpu_top=no
 fi
 
-AM_CONDITIONAL(BUILD_STARPU_TOP, test x$build_starpu_top = xyes)
+AC_SUBST(STARPU_USE_TOP, test "x$build_starpu_top" = "xyes")
+AM_CONDITIONAL(BUILD_STARPU_TOP, test "x$build_starpu_top" = "xyes")
+if test "x$build_starpu_top" = "xyes"; then
+	AC_DEFINE(STARPU_USE_TOP, [1], [StarPU-Top is activated])
+fi
 
 ###############################################################################
 #                                                                             #

+ 5 - 5
doc/chapters/advanced-examples.texi

@@ -63,8 +63,8 @@ struct starpu_codelet cl = @{
 @end smallexample
 @end cartouche
 
-Schedulers which are multi-implementation aware (only @code{dmda}, @code{heft}
-and @code{pheft} for now) will use the performance models of all the
+Schedulers which are multi-implementation aware (only @code{dmda} and
+@code{pheft} for now) will use the performance models of all the
 implementations it was given, and pick the one that seems to be the fastest.
 
 @node Enabling implementation according to capabilities
@@ -334,7 +334,7 @@ the @code{starpu_perfmodel_plot} (@pxref{Performance model calibration}).  The
 models are indexed by machine name. To
 share the models between machines (e.g. for a homogeneous cluster), use
 @code{export STARPU_HOSTNAME=some_global_name}. Measurements are only done
-when using a task scheduler which makes use of it, such as @code{heft} or
+when using a task scheduler which makes use of it, such as 
 @code{dmda}. Measurements can also be provided explicitly by the application, by
 using the @code{starpu_perfmodel_update_history} function.
 
@@ -389,7 +389,7 @@ inputs. The application can also provide the measurements explictly by using
 tools can be used to observe how much the performance model is calibrated (@pxref{Performance model calibration}); when
 their output look good, @code{STARPU_CALIBRATE} can be reset to @code{0} to let
 StarPU use the resulting performance model without recording new measures, and
-@code{STARPU_SCHED} can be set to @code{heft} to benefit from the performance models. If
+@code{STARPU_SCHED} can be set to @code{dmda} to benefit from the performance models. If
 the data input sizes vary a lot, it is really important to set
 @code{STARPU_CALIBRATE} to @code{0}, otherwise StarPU will continue adding the
 measures, and result with a very big performance model, which will take time a
@@ -979,7 +979,7 @@ It may be interesting to represent the same piece of data using two different
 data structures: one that would only be used on CPUs, and one that would only
 be used on GPUs. This can be done by using the multiformat interface. StarPU
 will be able to convert data from one data structure to the other when needed.
-Note that the heft scheduler is the only one optimized for this interface. The
+Note that the dmda scheduler is the only one optimized for this interface. The
 user must provide StarPU with conversion codelets:
 
 @cartouche

+ 1 - 1
doc/chapters/benchmarks.texi

@@ -14,7 +14,7 @@
 
 Some interesting benchmarks are installed among examples in
 /usr/lib/starpu/examples . Make sure to try various schedulers, for instance
-STARPU_SCHED=heft
+STARPU_SCHED=dmda
 
 @node Task size overhead
 @section Task size overhead

+ 1 - 1
doc/chapters/configuration.texi

@@ -459,7 +459,7 @@ the execution. If it is set to 2, the previous values are dropped to restart
 calibration from scratch. Setting this variable to 0 disable calibration, this
 is the default behaviour.
 
-Note: this currently only applies to @code{dm}, @code{dmda} and @code{heft} scheduling policies.
+Note: this currently only applies to @code{dm} and @code{dmda} scheduling policies.
 @end defvr
 
 @defvr {Environment variable} STARPU_BUS_CALIBRATE

+ 2 - 2
doc/chapters/perf-feedback.texi

@@ -221,13 +221,13 @@ start the application itself (possibly on a remote machine). The SSH checkbox
 should be checked, and a command line provided, e.g.:
 
 @example
-ssh myserver STARPU_SCHED=heft ./application
+ssh myserver STARPU_SCHED=dmda ./application
 @end example
 
 If port 2011 of the remote machine can not be accessed directly, an ssh port bridge should be added:
 
 @example
-ssh -L 2011:localhost:2011 myserver STARPU_SCHED=heft ./application
+ssh -L 2011:localhost:2011 myserver STARPU_SCHED=dmda ./application
 @end example
 
 and "localhost" should be used as IP Address to connect to.

+ 2 - 1
doc/chapters/perf-optimization.texi

@@ -191,7 +191,8 @@ buffers.
 The @b{dmdas} (deque model data aware sorted) scheduler is similar to dmda, it
 also supports arbitrary priority values.
 
-The @b{heft} (heterogeneous earliest finish time) scheduler is similar to dmda, it also supports task bundles.
+The @b{heft} (heterogeneous earliest finish time) scheduler is deprecated. It
+is now just an alias for @b{dmda}.
 
 The @b{pheft} (parallel HEFT) scheduler is similar to heft, it also supports
 parallel tasks (still experimental).

+ 1 - 1
gcc-plugin/examples/cholesky/cholesky.c

@@ -107,7 +107,7 @@ int main(int argc, char **argv)
 #endif
 //	struct starpu_conf conf;
 //	starpu_conf_init(&conf);
-//	conf.sched_policy_name = "heft";
+//	conf.sched_policy_name = "dmda";
 //	conf.calibrate = 1;
 #pragma starpu initialize
 

+ 1 - 4
include/pthread_win32/pthread.h

@@ -54,9 +54,6 @@ extern "C" {
 #define winPthreadAssertPthread(expr) do { int ret = (expr); if (ret) return ret; } while (0)
 #define winPthreadAssert(expr) do { if (!(expr)) return EIO; } while (0)
 #endif
-#if 0
-#else
-#endif
 
 /***********
  * threads *
@@ -137,7 +134,7 @@ static inline int pthread_cancel (pthread_t thread) {
 }
 
 static inline void pthread_exit (void *res) {
-  ExitThread((DWORD_PTR) (DWORD) res);
+  ExitThread((DWORD) (DWORD_PTR) res);
 }
 
 static inline int pthread_join (pthread_t thread, void **res) {

+ 1 - 0
include/starpu_config.h.in

@@ -97,5 +97,6 @@ typedef ssize_t starpu_ssize_t;
 #undef STARPU_USE_ERAND48_R
 #undef STARPU_HAVE_NEARBYINTF
 #undef STARPU_HAVE_RINTF
+#undef STARPU_USE_TOP
 
 #endif

+ 0 - 1
src/Makefile.am

@@ -161,7 +161,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/eager_central_priority_policy.c		\
 	sched_policies/work_stealing_policy.c			\
 	sched_policies/deque_modeling_policy_data_aware.c	\
-	sched_policies/heft.c					\
 	sched_policies/random_policy.c				\
 	sched_policies/stack_queues.c				\
 	sched_policies/deque_queues.c				\

+ 6 - 7
src/core/dependencies/implicit_data_deps.c

@@ -83,8 +83,8 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 #endif
 		) && handle->last_submitted_ghost_writer_id_is_valid)
 	{
-		struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
-		_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
+		_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id,
+			_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
 		_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
 		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 	}
@@ -134,13 +134,12 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 #endif
 	{
 		/* Declare all dependencies with ghost readers */
-		struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
-
 		struct _starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
 		while (ghost_readers_id)
 		{
 			unsigned long id = ghost_readers_id->id;
-			_STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id);
+			_STARPU_TRACE_GHOST_TASK_DEPS(id,
+				_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
 			_starpu_add_ghost_dependency(handle, id, pre_sync_task);
 			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
 
@@ -184,8 +183,8 @@ static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct
 	{
 		if (handle->last_submitted_ghost_writer_id_is_valid)
 		{
-			struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
-			_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
+			_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, 
+				_starpu_get_job_associated_to_task(pre_sync_task)->job_id);
 			_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
 			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 			handle->last_submitted_ghost_writer_id_is_valid = 0;

+ 6 - 1
src/core/sched_policy.c

@@ -39,7 +39,6 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
-	&_starpu_sched_heft_policy,
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_random_policy,
@@ -81,6 +80,12 @@ static struct starpu_sched_policy *find_sched_policy_from_name(const char *polic
 	if (!policy_name)
 		return NULL;
 
+	if (strncmp(policy_name, "heft", 5) == 0)
+	{
+		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
+		return &_starpu_sched_dmda_policy;
+	}
+
 	unsigned i;
 	for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
 	{

+ 0 - 1
src/core/sched_policy.h

@@ -58,7 +58,6 @@ extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
-extern struct starpu_sched_policy _starpu_sched_heft_policy;
 
 
 #endif // __SCHED_POLICY_H__

+ 202 - 36
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -3,7 +3,7 @@
  * Copyright (C) 2010-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
- * Copyright (C) 2011  INRIA
+ * Copyright (C) 2011-2012  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -19,6 +19,7 @@
 
 /* Distributed queues using performance modeling to assign tasks */
 
+#include <starpu_config.h>
 #include <limits.h>
 
 #include <core/perfmodel/perfmodel.h>
@@ -28,6 +29,9 @@
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 #include <core/debug.h>
+#ifdef STARPU_USE_TOP
+#include <top/starpu_top_core.h>
+#endif /* !STARPU_USE_TOP */
 
 #ifndef DBL_MIN
 #define DBL_MIN __DBL_MIN__
@@ -49,6 +53,16 @@ typedef struct {
 	long int ready_task_cnt;
 } dmda_data;
 
+#ifdef STARPU_USE_TOP
+static const float alpha_minimum=0;
+static const float alpha_maximum=10.0;
+static const float beta_minimum=0;
+static const float beta_maximum=10.0;
+static const float gamma_minimum=0;
+static const float gamma_maximum=10000.0;
+static const float idle_power_minimum=0;
+static const float idle_power_maximum=10000.0;
+#endif /* !STARPU_USE_TOP */
 
 static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
 {
@@ -72,6 +86,17 @@ static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
 	return cnt;
 }
 
+#ifdef STARPU_USE_TOP
+static void param_modified(struct starpu_top_param* d)
+{
+	/* Just to show parameter modification. */
+	fprintf(stderr,
+		"%s has been modified : "
+		"alpha=%f|beta=%f|gamma=%f|idle_power=%f !\n",
+		d->name, alpha,beta,_gamma, idle_power);
+}
+#endif /* !STARPU_USE_TOP */
+
 static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node)
 {
 	struct starpu_task *task = NULL, *current;
@@ -84,6 +109,8 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 		fifo_queue->ntasks--;
 
 		task = starpu_task_list_back(&fifo_queue->taskq);
+		if (STARPU_UNLIKELY(!task))
+			return NULL;
 
 		int first_task_priority = task->priority;
 
@@ -220,22 +247,61 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 
 
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
+				    double predicted, double predicted_transfer,
+				    int prio, unsigned sched_ctx_id)
 {
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
-	struct _starpu_fifo_taskq *fifo;
-	fifo = dt->queue_array[best_workerid];
+	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
+
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
+
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+
+/* Sometimes workers didn't take the tasks as early as we expected */
+	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 	fifo->exp_end += predicted;
 	fifo->exp_len += predicted;
 
-	task->predicted = predicted;
+	if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
+	{
+		/* We may hope that the transfer will be finished by
+		 * the start of the task. */
+		predicted_transfer = 0;
+	}
+	else
+	{
+		/* The transfer will not be finished by then, take the
+		 * remainder into account */
+		predicted_transfer += starpu_timing_now();
+		predicted_transfer -= fifo->exp_end;
+	}
 
-	/* TODO predicted_transfer */
+	fifo->exp_end += predicted_transfer;
+	fifo->exp_len += predicted_transfer;
 
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+	task->predicted = predicted;
+	task->predicted_transfer = predicted_transfer;
+
+#ifdef STARPU_USE_TOP
+	if (_starpu_top_status_get())
+		_starpu_top_task_prevision(task, best_workerid,
+			(unsigned long long)(fifo->exp_end-predicted)/1000,
+			(unsigned long long)fifo->exp_end/1000);
+#endif /* !STARPU_USE_TOP */
 
 	if (starpu_get_prefetch_flag())
 	{
@@ -249,14 +315,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
 	}
 #endif
-	pthread_mutex_t *sched_mutex;
-	pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
-
-#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-        starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
-#endif //STARPU_USE_SCHED_CTX_HYPERVISOR 
-
 	if (prio)
 		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
 			sched_mutex, sched_cond, task);
@@ -276,6 +334,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	double best_exp_end = 0.0;
 	double model_best = 0.0;
+	double transfer_model_best = 0.0;
 
 	int ntasks_best = -1;
 	double ntasks_best_end = 0.0;
@@ -293,17 +352,13 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	while(workers->has_next(workers))
         {
-                worker = workers->get_next(workers);
+	         worker = workers->get_next(workers);
+			struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
+			unsigned memory_node = starpu_worker_get_memory_node(worker);
+			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+	
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			double exp_end;
-		
-			fifo = dt->queue_array[worker];
-
-			/* Sometimes workers didn't take the tasks as early as we expected */
-			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
@@ -311,8 +366,20 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 				continue;
 			}
 
-			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+			double exp_end;
+			pthread_mutex_t *sched_mutex;
+			pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+
+			/* Sometimes workers didn't take the tasks as early as we expected */
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+			fifo->exp_end = fifo->exp_start + fifo->exp_len;
+			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+
 			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
+			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
@@ -350,6 +417,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 				best_exp_end = exp_end;
 				best = worker;
 				model_best = local_length;
+				transfer_model_best = local_penalty;
 				best_impl = nimpl;
 			}
 		}
@@ -360,6 +428,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	{
 		best = ntasks_best;
 		model_best = 0.0;
+		transfer_model_best = 0.0;
 	}
 
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
@@ -370,7 +439,8 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
+	return push_task_on_best_worker(task, best,
+					model_best, transfer_model_best, prio, sched_ctx_id);
 }
 
 static void compute_all_performance_predictions(struct starpu_task *task,
@@ -392,9 +462,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	unsigned worker, worker_ctx = 0;
-	
+
 	unsigned nimpl;
-	
+
 	starpu_task_bundle_t bundle = task->bundle;
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
@@ -424,9 +494,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len;
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 				max_exp_end = exp_end[worker_ctx][nimpl];
-			
-			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-			unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
 
@@ -505,6 +572,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	int best = -1, best_in_ctx = -1;
 	int selected_impl = 0;
 	double model_best = 0.0;
+	double transfer_model_best = 0.0;
 
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
@@ -558,10 +626,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 					+ dt->_gamma*(local_power[worker_ctx][nimpl]);
 				
 				if (exp_end[worker_ctx][nimpl] > max_exp_end)
+				{
 					/* This placement will make the computation
 					 * longer, take into account the idle
 					 * consumption of other cpus */
 					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
+				}
 				
 				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
 				{
@@ -586,17 +656,19 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		 * so we force this measurement */
 		best = forced_best;
 		model_best = 0.0;
-		//penality_best = 0.0;
+		transfer_model_best = 0.0;
 	}
 	else if (task->bundle)
 	{
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best_in_ctx);
+		unsigned memory_node = starpu_worker_get_memory_node(best);
 		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
+		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
 	}
 	else
 	{
 		model_best = local_task_length[best_in_ctx][selected_impl];
-		//penality_best = local_data_penalty[best_in_ctx][best_impl];
+		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
 	}
 	
 	if (task->bundle)
@@ -608,7 +680,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
+	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
 }
 
 static int dmda_push_sorted_task(struct starpu_task *task)
@@ -731,6 +803,16 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 	if (strval_idle_power)
 		dt->idle_power = atof(strval_idle_power);
 
+#ifdef STARPU_USE_TOP
+	starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
+		alpha_minimum, alpha_maximum, param_modified);
+	starpu_top_register_parameter_float("DMDA_BETA", &beta,
+		beta_minimum, beta_maximum, param_modified);
+	starpu_top_register_parameter_float("DMDA_GAMMA", &_gamma,
+		gamma_minimum, gamma_maximum, param_modified);
+	starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
+		idle_power_minimum, idle_power_maximum, param_modified);
+#endif /* !STARPU_USE_TOP */
 }
 
 static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
@@ -752,6 +834,87 @@ static void deinitialize_dmda_policy(unsigned sched_ctx_id)
 	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);
 }
 
+/* dmda_pre_exec_hook is called right after the data transfer is done and right
+ * before the computation to begin, it is useful to update more precisely the
+ * value of the expected start, end, length, etc... */
+static void dmda_pre_exec_hook(struct starpu_task *task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	int workerid = starpu_worker_get_id();
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
+
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+	/* Once the task is executing, we can update the predicted amount
+	 * of work. */
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	fifo->exp_len-= transfer_model;
+	fifo->exp_start = starpu_timing_now() + model;
+	fifo->exp_end= fifo->exp_start + fifo->exp_len;
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+}
+
+static void dmda_push_task_notify(struct starpu_task *task, int workerid)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	dmda_data *dt = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	/* Compute the expected penality */
+	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+	unsigned memory_node = starpu_worker_get_memory_node(workerid);
+
+	double predicted = starpu_task_expected_length(task, perf_arch,
+			_starpu_get_job_associated_to_task(task)->nimpl);
+
+	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+
+
+	/* Update the predictions */
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	/* Sometimes workers didn't take the tasks as early as we expected */
+	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
+
+	/* If there is no prediction available, we consider the task has a null length */
+	if (!isnan(predicted))
+	{
+		task->predicted = predicted;
+		fifo->exp_end += predicted;
+		fifo->exp_len += predicted;
+	}
+
+	/* If there is no prediction available, we consider the task has a null length */
+	if (!isnan(predicted_transfer))
+	{
+		if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
+		{
+			/* We may hope that the transfer will be finished by
+			 * the start of the task. */
+			predicted_transfer = 0;
+		}
+		else
+		{
+			/* The transfer will not be finished by then, take the
+			 * remainder into account */
+			predicted_transfer = (starpu_timing_now() + predicted_transfer) - fifo->exp_end;
+		}
+		task->predicted_transfer = predicted_transfer;
+		fifo->exp_end += predicted_transfer;
+		fifo->exp_len += predicted_transfer;
+	}
+
+	fifo->ntasks++;
+
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+}
+
 /* TODO: use post_exec_hook to fix the expected start */
 struct starpu_sched_policy _starpu_sched_dm_policy =
 {
@@ -775,8 +938,9 @@ struct starpu_sched_policy _starpu_sched_dmda_policy =
 	.add_workers = dmda_add_workers ,
         .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_task,
+	.push_task_notify = dmda_push_task_notify,
 	.pop_task = dmda_pop_task,
-	.pre_exec_hook = NULL,
+	.pre_exec_hook = dmda_pre_exec_hook,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmda",
@@ -790,8 +954,9 @@ struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
 	.add_workers = dmda_add_workers ,
         .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_sorted_task,
+	.push_task_notify = dmda_push_task_notify,
 	.pop_task = dmda_pop_ready_task,
-	.pre_exec_hook = NULL,
+	.pre_exec_hook = dmda_pre_exec_hook,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdas",
@@ -805,8 +970,9 @@ struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
 	.add_workers = dmda_add_workers ,
         .remove_workers = dmda_remove_workers,
 	.push_task = dmda_push_task,
+	.push_task_notify = dmda_push_task_notify,
 	.pop_task = dmda_pop_ready_task,
-	.pre_exec_hook = NULL,
+	.pre_exec_hook = dmda_pre_exec_hook,
 	.post_exec_hook = NULL,
 	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmdar",

+ 1 - 2
src/sched_policies/eager_central_policy.c

@@ -59,8 +59,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 
 	eager_center_policy_data *data = (eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
 
-	if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: you are running the default eager scheduler, which is not very smart. Make sure to read the StarPU documentation about adding performance models in order to be able to use the heft or dmda schedulers instead.\n");
-
+	_STARPU_DISP("Warning: you are running the default eager scheduler, which is not very smart. Make sure to read the StarPU documentation about adding performance models in order to be able to use the dmda scheduler instead.\n");
 
 	/* there is only a single queue in that trivial design */
 	data->fifo =  _starpu_create_fifo();

+ 1 - 3
tests/sched_policies/data_locality.c

@@ -9,7 +9,7 @@
  * test makes sure the scheduler will take account of the data locality
  * when scheduling tasks.
  *
- * Applies to : dmda, heft, pheft.
+ * Applies to : dmda, pheft.
  */
 
 static void
@@ -155,7 +155,6 @@ extern struct starpu_sched_policy _starpu_sched_dmda_policy;
 //extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 //extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
-extern struct starpu_sched_policy _starpu_sched_heft_policy;
 
 static struct starpu_sched_policy *policies[] =
 {
@@ -163,7 +162,6 @@ static struct starpu_sched_policy *policies[] =
 	//&_starpu_sched_prio_policy,
 	//&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
-	&_starpu_sched_heft_policy,
 	//&_starpu_sched_dmda_ready_policy,
 	//&_starpu_sched_dmda_sorted_policy,
 	//&_starpu_sched_random_policy,

+ 0 - 2
tests/sched_policies/execute_all_tasks.c

@@ -35,7 +35,6 @@ extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
-extern struct starpu_sched_policy _starpu_sched_heft_policy;
 
 static struct starpu_sched_policy *policies[] =
 {
@@ -43,7 +42,6 @@ static struct starpu_sched_policy *policies[] =
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
-	&_starpu_sched_heft_policy,
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_random_policy,

+ 3 - 5
tests/sched_policies/simple_cpu_gpu_sched.c

@@ -23,7 +23,7 @@
  * 	- cpu_task is cheduled on a CPU.
  * 	- gpu_task is scheduled on a GPU.
  *
- * Applies to : heft, XXX : and to what other schedulers ?
+ * Applies to : dmda and to what other schedulers ?
  */
 
 
@@ -189,14 +189,13 @@ extern struct starpu_sched_policy _starpu_sched_ws_policy;
 extern struct starpu_sched_policy _starpu_sched_prio_policy;
 extern struct starpu_sched_policy _starpu_sched_random_policy;
 extern struct starpu_sched_policy _starpu_sched_dm_policy;
-extern struct starpu_sched_policy _starpu_sched_dmda_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
 extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
 */
-extern struct starpu_sched_policy _starpu_sched_heft_policy;
+extern struct starpu_sched_policy _starpu_sched_dmda_policy;
 
 /* XXX: what policies are we interested in ? */
 static struct starpu_sched_policy *policies[] =
@@ -204,8 +203,7 @@ static struct starpu_sched_policy *policies[] =
 	//&_starpu_sched_ws_policy,
 	//&_starpu_sched_prio_policy,
 	//&_starpu_sched_dm_policy,
-	//&_starpu_sched_dmda_policy,
-	&_starpu_sched_heft_policy,
+	&_starpu_sched_dmda_policy,
 	//&_starpu_sched_dmda_ready_policy,
 	//&_starpu_sched_dmda_sorted_policy,
 	//&_starpu_sched_random_policy,

+ 0 - 2
tests/sched_policies/simple_deps.c

@@ -93,7 +93,6 @@ extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
 extern struct starpu_sched_policy _starpu_sched_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
-extern struct starpu_sched_policy _starpu_sched_heft_policy;
 
 static struct starpu_sched_policy *policies[] =
 {
@@ -101,7 +100,6 @@ static struct starpu_sched_policy *policies[] =
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_dm_policy,
 	&_starpu_sched_dmda_policy,
-	&_starpu_sched_heft_policy,
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_random_policy,