Переглянути джерело

GPU partitioning using contexts, and fixing dmda to consider the pipeline len

Andra Hugo 8 роки тому
батько
коміт
15103b9723

+ 30 - 0
doc/doxygen/chapters/330_scheduling_contexts.doxy

@@ -96,6 +96,36 @@ int id_ctx = starpu_sched_ctx_create(workerids, 3, "my_ctx", STARPU_SCHED_CTX_PO
 /* .... */
 \endcode
 
+\section CreatingAContext Creating A Context To Partition a GPU
+
+The contexts can also be used to group set of SMs of an NVIDIA GPU in order to isolate
+the parallel kernels and allow them to coexecution on a specified partiton of the GPU.
+
+Each context will be mapped to a stream and the user can indicate the number of SMs.
+The context can be added to a larger context already grouping CPU cores. 
+This larger context can use a scheduling policy that assigns tasks to both CPUs and contexts (partitions of the GPU)
+based on performance models adjusted to the number of SMs.
+
+The GPU implementation of the task has to be modified accordingly and receive as a parameter the number of SMs.
+
+\code{.c}
+/* get the available streams (suppose we have nstreams = 2 by specifying them with STARPU_NWORKER_PER_CUDA=2  */
+int nstreams = starpu_worker_get_stream_workerids(gpu_devid, stream_workerids, STARPU_CUDA_WORKER);
+
+int sched_ctx[nstreams];
+sched_ctx[0] = starpu_sched_ctx_create(&stream_workerids[0], 1, "subctx",  STARPU_SCHED_CTX_CUDA_NSMS, 6, 0);
+sched_ctx[1] = starpu_sched_ctx_create(&stream_workerids[1], 1, "subctx",  STARPU_SCHED_CTX_CUDA_NSMS, 7, 0);
+
+int ncpus = 4;
+int workers[ncpus+nstreams];
+workers[ncpus+0] = stream_workerids[0];
+workers[ncpus+1] = stream_workerids[1];
+
+big_sched_ctx = starpu_sched_ctx_create(workers, ncpus+nstreams, "ctx1", STARPU_SCHED_CTX_SUB_CTXS, sched_ctxs, nstreams, STARPU_SCHED_CTX_POLICY_NAME, "dmdas", 0); 
+
+starpu_task_submit_to_ctx(task, big_sched_ctx);
+
+\endcode
 
 \section ModifyingAContext Modifying A Context
 

+ 9 - 0
doc/doxygen/chapters/501_environment_variables.doxy

@@ -51,6 +51,15 @@ Specify the number of workers per CUDA device, and thus the number of kernels
 which will be concurrently running on the devices. The default value is 1.
 </dd>
 
+<dt>STARPU_NWORKER_PER_CUDA</dt>
+<dd>
+\anchor STARPU_ONE_THREAD_PER_STREAM
+\addindex __env__STARPU_ONE_THREAD_PER_STREAM
+Specify if the cuda driver should provide a thread per stream or a single thread 
+dealing with all the streams. 0 if one thread per stream, 1 otherwise. The default 
+value is 1.
+</dd>
+
 <dt>STARPU_CUDA_PIPELINE</dt>
 <dd>
 \anchor STARPU_CUDA_PIPELINE

+ 11 - 0
doc/doxygen/chapters/api/scheduling_contexts.doxy

@@ -106,6 +106,17 @@ function pointer allowing to initialize the scheduling policy.
 This macro is used when calling starpu_sched_ctx_create() to specify a
 pointer to some user data related to the context being created.
 
+\def STARPU_SCHED_CTX_SUB_CTXS
+\ingroup API_Scheduling_Contexts
+This macro is used when calling starpu_sched_ctx_create() to specify 
+a list of sub contextes of the current context.
+
+\def STARPU_SCHED_CTX_CUDA_NSMS
+\ingroup API_Scheduling_Contexts
+This macro is used when calling starpu_sched_ctx_create() in order
+to create a context on the NVIDIA GPU to specify the number of SMs
+the context should have
+
 \fn unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_ctx_name, int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus, unsigned allow_overlap)
 \ingroup API_Scheduling_Contexts
 Create a context indicating an approximate interval of resources

+ 17 - 4
examples/Makefile.am

@@ -73,7 +73,10 @@ EXTRA_DIST = 					\
 	reductions/dot_product_opencl_kernels.cl	\
 	scheduler/schedulers.sh				\
 	scheduler/schedulers_context.sh			\
-	fortran/Makefile
+	fortran/Makefile				\
+	sched_ctx/axpy_partition_gpu.h				\
+	sched_ctx/axpy_partition_gpu.cu		
+
 
 CLEANFILES = *.gcno *.gcda *.linkinfo *.mod starpu_idle_microsec.log
 
@@ -138,7 +141,8 @@ noinst_HEADERS = 				\
 	pi/SobolQRNG/sobol_gpu.h		\
 	pi/SobolQRNG/sobol_primitives.h         \
 	reductions/dot_product.h                \
-	basic_examples/vector_scal_cpu_template.h
+	basic_examples/vector_scal_cpu_template.h \
+	sched_ctx/axpy_partition_gpu.h				
 
 #####################################
 # What to install and what to check #
@@ -229,7 +233,8 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/dummy_sched_with_ctx		\
 	worker_collections/worker_tree_example  \
 	reductions/dot_product			\
-	reductions/minmax_reduction
+	reductions/minmax_reduction		\
+	sched_ctx/gpu_partition
 
 endif
 
@@ -337,6 +342,14 @@ endif
 
 endif !STARPU_SIMGRID
 
+sched_ctx_gpu_partition_SOURCES =		\
+	sched_ctx/gpu_partition.c
+
+if STARPU_USE_CUDA
+sched_ctx_gpu_partition_SOURCES +=		\
+	sched_ctx/axpy_partition_gpu.cu
+endif
+
 ##################
 # Basic examples #
 ##################
@@ -851,7 +864,7 @@ endif
 
 cpp_add_vectors_SOURCES	=	\
 	cpp/add_vectors.cpp
-	
+
 if STARPU_HAVE_CXX11
 cpp_add_vectors_cpp11_SOURCES	=	\
 	cpp/add_vectors_cpp11.cpp

+ 7 - 1
include/starpu_sched_ctx.h

@@ -33,6 +33,8 @@ extern "C"
 #define STARPU_SCHED_CTX_AWAKE_WORKERS           (7<<16)
 #define STARPU_SCHED_CTX_POLICY_INIT             (8<<16)
 #define STARPU_SCHED_CTX_USER_DATA               (9<<16)
+#define STARPU_SCHED_CTX_CUDA_NSMS               (10<<16)
+#define STARPU_SCHED_CTX_SUB_CTXS                (11<<16)
 
 unsigned starpu_sched_ctx_create(int *workerids_ctx, int nworkers_ctx, const char *sched_ctx_name, ...);
 
@@ -157,7 +159,7 @@ unsigned starpu_sched_ctx_master_get_context(int masterid);
 
 void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double flops);
 
-void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex);
+void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex, unsigned with_repush);
 
 int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id);
 
@@ -168,6 +170,10 @@ unsigned starpu_sched_ctx_has_starpu_scheduler(unsigned sched_ctx_id, unsigned *
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif /* STARPU_USE_SC_HYPERVISOR */
 
+int starpu_sched_ctx_get_stream_worker(unsigned sub_ctx);
+int starpu_sched_ctx_get_nsms(unsigned sched_ctx);
+void starpu_sched_ctx_get_sms_interval(int stream_workerid, int *start, int *end);
+
 #ifdef __cplusplus
 }
 #endif

+ 2 - 2
include/starpu_scheduler.h

@@ -39,8 +39,8 @@ struct starpu_sched_policy
 	struct starpu_task *(*pop_every_task)(unsigned sched_ctx_id);
 
 	void (*submit_hook)(struct starpu_task *task);
-	void (*pre_exec_hook)(struct starpu_task *);
-	void (*post_exec_hook)(struct starpu_task *);
+	void (*pre_exec_hook)(struct starpu_task *, unsigned sched_ctx_id);
+	void (*post_exec_hook)(struct starpu_task *, unsigned sched_ctx_id);
 
 	void (*do_schedule)(unsigned sched_ctx_id);
 

+ 1 - 0
include/starpu_task.h

@@ -196,6 +196,7 @@ struct starpu_task
 	double flops;
 	double predicted;
 	double predicted_transfer;
+	double predicted_start;
 
 	struct starpu_task *prev;
 	struct starpu_task *next;

+ 5 - 0
include/starpu_worker.h

@@ -129,6 +129,11 @@ char *starpu_worker_get_type_as_string(enum starpu_worker_archtype type);
 
 int starpu_bindid_get_workerids(int bindid, int **workerids);
 
+int starpu_worker_get_devids(enum starpu_worker_archtype type, int *devids, int num);
+
+int starpu_worker_get_stream_workerids(int devid, int *workerids, enum starpu_worker_archtype type);
+
+unsigned starpu_worker_get_sched_ctx_id_stream(int stream_workerid);
 #ifdef __cplusplus
 }
 #endif

+ 3 - 0
src/core/perfmodel/perfmodel.c

@@ -59,6 +59,9 @@ struct starpu_perfmodel_arch* starpu_worker_get_perf_archtype(int workerid, unsi
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			return _starpu_sched_ctx_get_perf_archtype(child_sched_ctx);
+		struct _starpu_sched_ctx *stream_ctx = _starpu_worker_get_ctx_stream(workerid);
+		if(stream_ctx != NULL)
+			return _starpu_sched_ctx_get_perf_archtype(stream_ctx->id); 
 	}
 
 	struct _starpu_machine_config *config = _starpu_get_machine_config();

+ 112 - 8
src/core/sched_ctx.c

@@ -33,6 +33,7 @@ static size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 static double hyp_actual_start_sample[STARPU_NMAX_SCHED_CTXS];
 static double window_size;
 static int nobind;
+static int occupied_sms = 0;
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
@@ -297,7 +298,10 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			{
 				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].type = devices[dev1].type;
 				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].devid = devices[dev1].devid;
-				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = devices[dev1].ncores;
+				if (sched_ctx->stream_worker != -1)
+					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = sched_ctx->nsms;
+				else
+					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = devices[dev1].ncores;
 				sched_ctx->perf_arch.ndevices++;
 			}
 			else
@@ -472,7 +476,8 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 						   int max_prio_set, int max_prio,
 						   unsigned awake_workers,
 						   void (*sched_policy_init)(unsigned),
-						   void * user_data)
+						   void * user_data,
+						   int nsub_ctxs, int *sub_ctxs, int nsms)
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 
@@ -526,6 +531,23 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	sched_ctx->perf_arch.ndevices = 0;
 	sched_ctx->init_sched = sched_policy_init;
 	sched_ctx->user_data = user_data;
+	sched_ctx->sms_start_idx = 0;
+	sched_ctx->sms_end_idx = STARPU_NMAXSMS;
+	sched_ctx->nsms = nsms;
+	sched_ctx->stream_worker = -1;
+	if(nsms > 0)
+	{
+		sched_ctx->sms_start_idx = occupied_sms;
+		sched_ctx->sms_end_idx = occupied_sms+nsms;
+		occupied_sms += nsms;
+		printf("ctx %d: stream worker %d nsms %d ocupied sms %d\n", sched_ctx->id, workerids[0], nsms, occupied_sms);
+		STARPU_ASSERT_MSG(occupied_sms <= STARPU_NMAXSMS , "STARPU:requested more sms than available");
+		_starpu_worker_set_stream_ctx(workerids[0], sched_ctx);
+		sched_ctx->stream_worker = workerids[0];
+	}
+
+	sched_ctx->nsub_ctxs = 0;
+
 	int w;
 	for(w = 0; w < nworkers; w++)
 	{
@@ -565,6 +587,15 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		  }
 	}
 
+        /*add sub_ctxs before add workers, in order to be able to associate them if necessary */
+	if(nsub_ctxs != 0)
+	{
+		int i;
+		for(i = 0; i < nsub_ctxs; i++)
+			sched_ctx->sub_ctxs[i] = sub_ctxs[i];
+		sched_ctx->nsub_ctxs = nsub_ctxs;
+	}
+	
 	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
@@ -724,7 +755,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	for(i = 0; i < nw; i++)
 		printf("%d ", workers[i]);
 	printf("\n");
-	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL, NULL);
+	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL, NULL,0, NULL, 0);
 	sched_ctx->min_ncpus = min_ncpus;
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
@@ -742,6 +773,45 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 
 }
 
+int starpu_sched_ctx_get_nsms(unsigned sched_ctx)
+{
+	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
+	return sc->nsms;
+}
+
+void starpu_sched_ctx_get_sms_interval(int stream_workerid, int *start, int *end)
+{
+	struct _starpu_sched_ctx *sc = _starpu_worker_get_ctx_stream(stream_workerid);
+	*start = sc->sms_start_idx;
+	*end = sc->sms_end_idx;
+}
+
+int starpu_sched_ctx_get_sub_ctxs(unsigned sched_ctx, int *ctxs)
+{
+	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
+	int i;
+	for(i = 0; i < sc->nsub_ctxs; i++)
+		    ctxs[i] = sc->sub_ctxs[i];
+	return sc->nsub_ctxs;
+}
+
+int starpu_sched_ctx_get_stream_worker(unsigned sub_ctx)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sub_ctx);
+	struct starpu_worker_collection *workers = sched_ctx->workers;
+
+	struct starpu_sched_ctx_iterator it;
+	int worker = -1;
+	
+	workers->init_iterator(workers, &it);
+	if(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+	}
+
+	return worker;
+}
+
 unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
 {
 	va_list varg_list;
@@ -750,6 +820,9 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	int max_prio_set = 0;
 	int min_prio = 0;
 	int max_prio = 0;
+	int nsms = 0;
+        int *sub_ctxs = NULL;
+        int nsub_ctxs = 0;
 	void *user_data = NULL;
 	struct starpu_sched_policy *sched_policy = NULL;
 	unsigned hierarchy_level = 0;
@@ -800,6 +873,15 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 		{
 			user_data = va_arg(varg_list, void *);
 		}
+		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
+		{
+			sub_ctxs = va_arg(varg_list, int*);
+			nsub_ctxs = va_arg(varg_list, int);
+		}
+		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
+		{
+			nsms = va_arg(varg_list, int);
+		}
 		else
 		{
 			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
@@ -824,7 +906,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	}
 
 	struct _starpu_sched_ctx *sched_ctx = NULL;
-	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data);
+	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
 	sched_ctx->hierarchy_level = hierarchy_level;
 	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
 
@@ -848,6 +930,9 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 	int max_prio_set = 0;
 	int min_prio = 0;
 	int max_prio = 0;
+	int nsms = 0;
+        int *sub_ctxs = NULL;
+        int nsub_ctxs = 0;
 	void *user_data = NULL;
 	struct starpu_sched_policy *sched_policy = NULL;
 	unsigned hierarchy_level = 0;
@@ -910,6 +995,19 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 			arg_i++;
 			user_data = arglist[arg_i];
 		}
+		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
+		{
+			arg_i++;
+			sub_ctxs = (int*)arglist[arg_i]; 
+			arg_i++;
+			nsub_ctxs = *(int*)arglist[arg_i]; 
+		}
+		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
+		{
+			arg_i++;
+			nsms = *(int*)arglist[arg_i]; 
+		}
+
 		else
 		{
 			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
@@ -933,7 +1031,7 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 	}
 
 	struct _starpu_sched_ctx *sched_ctx = NULL;
-	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data);
+	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
 	sched_ctx->hierarchy_level = hierarchy_level;
 	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
 
@@ -1015,7 +1113,8 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 #ifdef STARPU_USE_SC_HYPERVISOR
-	if (sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
+	if(sched_ctx != NULL && sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
+	   && sched_ctx->perf_counters != NULL)
 	{
 		_STARPU_TRACE_HYPERVISOR_BEGIN();
 		sched_ctx->perf_counters->notify_delete_context(sched_ctx_id);
@@ -1062,6 +1161,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	   you don't use it anymore */
 	free(workerids);
 	_starpu_relock_mutex_if_prev_locked();
+	occupied_sms -= sched_ctx->nsms;
 	return;
 }
 
@@ -2092,7 +2192,8 @@ void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_f
         _starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, ready_flops);
 }
 
-void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex)
+void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex, 
+				       unsigned with_repush)
 {
 	/* TODO: make something cleaner which differentiates between calls
 	   from push or pop (have mutex or not) and from another worker or not */
@@ -2111,7 +2212,10 @@ void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_
 
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 
-	_starpu_repush_task(j);
+	if(with_repush)
+		_starpu_repush_task(j);
+	else
+		_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
 
 	if(workerid != -1 && manage_mutex)
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);

+ 13 - 2
src/core/sched_ctx.h

@@ -36,7 +36,7 @@
 #define DO_RESIZE 1
 
 #define STARPU_GLOBAL_SCHED_CTX 0
-
+#define STARPU_NMAXSMS 13
 struct _starpu_sched_ctx
 {
 	/* id of the context used in user mode*/
@@ -174,6 +174,16 @@ struct _starpu_sched_ctx
 
 	/* function called when initializing the scheduler */
 	void (*init_sched)(unsigned);
+
+	int sub_ctxs[STARPU_NMAXWORKERS];
+	int nsub_ctxs;
+
+	/* nr of SMs assigned to this ctx if we partition gpus*/
+	int nsms;
+	int sms_start_idx;
+	int sms_end_idx;
+
+	int stream_worker;
 };
 
 struct _starpu_machine_config;
@@ -184,7 +194,8 @@ void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config);
 /* allocate all structures belonging to a context */
 struct _starpu_sched_ctx*  _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name,
 						    int min_prio_set, int min_prio,
-						    int max_prio_set, int max_prio, unsigned awake_workers, void (*sched_policy_init)(unsigned), void *user_data);
+						    int max_prio_set, int max_prio, unsigned awake_workers, void (*sched_policy_init)(unsigned), void *user_data,
+							int nsub_ctxs, int *sub_ctxs, int nsms);
 
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();

+ 50 - 2
src/core/sched_policy.c

@@ -1009,9 +1009,34 @@ void _starpu_sched_pre_exec_hook(struct starpu_task *task)
 	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
 	{
 		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
-		sched_ctx->sched_policy->pre_exec_hook(task);
+		sched_ctx->sched_policy->pre_exec_hook(task, sched_ctx_id);
 		_STARPU_TRACE_WORKER_SCHEDULING_POP;
 	}
+
+	if(!sched_ctx->sched_policy)
+	{
+		int workerid = starpu_worker_get_id();
+		struct _starpu_worker *worker =  _starpu_get_worker_struct(workerid);
+		struct _starpu_sched_ctx *other_sched_ctx;
+		struct _starpu_sched_ctx_elt *e = NULL;
+		struct _starpu_sched_ctx_list_iterator list_it;
+		
+		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
+		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
+		{
+			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
+			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
+			if (other_sched_ctx != sched_ctx && 
+			    other_sched_ctx->sched_policy != NULL && 
+			    other_sched_ctx->sched_policy->pre_exec_hook)
+			{
+				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
+				other_sched_ctx->sched_policy->pre_exec_hook(task, other_sched_ctx->id);
+				_STARPU_TRACE_WORKER_SCHEDULING_POP;
+			}
+		}
+	}
+
 }
 
 void _starpu_sched_post_exec_hook(struct starpu_task *task)
@@ -1021,9 +1046,32 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
 	{
 		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
-		sched_ctx->sched_policy->post_exec_hook(task);
+		sched_ctx->sched_policy->post_exec_hook(task, sched_ctx_id);
 		_STARPU_TRACE_WORKER_SCHEDULING_POP;
 	}
+	if(!sched_ctx->sched_policy)
+	{
+		int workerid = starpu_worker_get_id();
+		struct _starpu_worker *worker =  _starpu_get_worker_struct(workerid);
+		struct _starpu_sched_ctx *other_sched_ctx;
+		struct _starpu_sched_ctx_elt *e = NULL;
+		struct _starpu_sched_ctx_list_iterator list_it;
+		
+		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
+		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
+		{
+			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
+			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
+			if (other_sched_ctx != sched_ctx && 
+			    other_sched_ctx->sched_policy != NULL && 
+			    other_sched_ctx->sched_policy->post_exec_hook)
+			{
+				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
+				other_sched_ctx->sched_policy->post_exec_hook(task, other_sched_ctx->id);
+				_STARPU_TRACE_WORKER_SCHEDULING_POP;
+			}
+		}
+	}
 }
 
 void _starpu_wait_on_sched_event(void)

+ 1 - 0
src/core/task.c

@@ -94,6 +94,7 @@ void starpu_task_init(struct starpu_task *task)
 
 	task->predicted = NAN;
 	task->predicted_transfer = NAN;
+	task->predicted_start = NAN;
 
 	task->magic = 42;
 	task->sched_ctx = STARPU_NMAX_SCHED_CTXS;

+ 28 - 4
src/core/topology.c

@@ -89,7 +89,12 @@ _starpu_get_worker_from_driver(struct starpu_driver *d)
 
 #ifdef STARPU_USE_CUDA
 	if (d->type == STARPU_CUDA_WORKER)
-		return &cuda_worker_set[d->id.cuda_id];
+	{
+		unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 1);
+		if(th_per_stream == 0)
+			return &cuda_worker_set[d->id.cuda_id];
+
+	}
 #endif
 
 	for (workerid = 0; workerid < nworkers; workerid++)
@@ -116,6 +121,16 @@ _starpu_get_worker_from_driver(struct starpu_driver *d)
 				break;
 			}
 #endif
+#ifdef STARPU_USE_CUDA
+			case STARPU_CUDA_WORKER:
+			{
+				if (worker->devid == d->id.cuda_id)
+					return &worker->set;
+				break;
+
+			}
+#endif
+
 			default:
 				_STARPU_DEBUG("Invalid device type\n");
 				return NULL;
@@ -1038,17 +1053,27 @@ _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_con
 
 	_starpu_initialize_workers_cuda_gpuid(config);
 
+	/* allow having one worker per stream */
+	unsigned th_per_stream = starpu_get_env_number_default("STARPU_WORKER_PER_STREAM", 1);
+
 	unsigned cudagpu;
 	for (cudagpu = 0; cudagpu < topology->ncudagpus; cudagpu++)
 	{
 		int devid = _starpu_get_next_cuda_gpuid(config);
 		int worker_idx0 = topology->nworkers + cudagpu * nworker_per_cuda;
 		cuda_worker_set[devid].workers = &config->workers[worker_idx0];
+
 		for (i = 0; i < nworker_per_cuda; i++)
 		{
 			int worker_idx = worker_idx0 + i;
+			if(th_per_stream)
+			{
+				config->workers[worker_idx].set = (struct _starpu_worker_set *)malloc(sizeof(struct _starpu_worker_set));
+				config->workers[worker_idx].set->workers = &config->workers[worker_idx];
+			}
+			else
+				config->workers[worker_idx].set = &cuda_worker_set[devid];
 
-			config->workers[worker_idx].set = &cuda_worker_set[devid];
 			config->workers[worker_idx].arch = STARPU_CUDA_WORKER;
 			_STARPU_MALLOC(config->workers[worker_idx].perf_arch.devices, sizeof(struct starpu_perfmodel_device));
 			config->workers[worker_idx].perf_arch.ndevices = 1;
@@ -1554,7 +1579,6 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 		int *preferred_binding = NULL;
 		int npreferred = 0;
 #endif
-
 		/* select the memory node that contains worker's memory */
 		switch (workerarg->arch)
 		{
@@ -1601,7 +1625,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				{
 					memory_node = cuda_memory_nodes[devid];
 #ifndef STARPU_SIMGRID
-					workerarg->bindid = cuda_bindid[devid];
+					workerarg->bindid = _starpu_get_next_bindid(config, preferred_binding, npreferred);//cuda_bindid[devid];
 #endif /* SIMGRID */
 				}
 				else

+ 129 - 32
src/core/workers.c

@@ -371,7 +371,7 @@ int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_t
 	{
 		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
 			if (_starpu_can_use_nth_implementation(arch, cl, i)
-			 && task->cl->can_execute(workerid, task, i))
+			 && (!task->cl->can_execute || task->cl->can_execute(workerid, task, i)))
 			{
 				if (nimpl)
 					*nimpl = i;
@@ -676,12 +676,16 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 			case STARPU_CUDA_WORKER:
 				driver.id.cuda_id = devid;
-
-				/* We spawn only one thread per CUDA driver,
-				 * which will control all CUDA workers of this
-				 * driver. (by using a worker set). */
-				if (worker_set->workers != workerarg)
-					break;
+				/* allow having one worker per stream */
+				unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 1);
+				if(th_per_stream == 0)
+				{
+					/* We spawn only one thread per CUDA driver,
+					 * which will control all CUDA workers of this
+					 * driver. (by using a worker set). */
+					if (worker_set->workers != workerarg)
+						break;
+				}
 
 				worker_set->nworkers = starpu_get_env_number_default("STARPU_NWORKER_PER_CUDA", 1);
 
@@ -701,19 +705,53 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 					break;
 				}
 
-				STARPU_PTHREAD_CREATE_ON(
-					workerarg->name,
-					&worker_set->worker_thread,
-					NULL,
-					_starpu_cuda_worker,
-					worker_set,
-					_starpu_simgrid_get_host_by_worker(workerarg));
+
+				if(th_per_stream == 0)
+				{
+					STARPU_PTHREAD_CREATE_ON(
+						workerarg->name,
+						&worker_set->worker_thread,
+						NULL,
+						_starpu_cuda_worker,
+						worker_set,
+						_starpu_simgrid_get_host_by_worker(workerarg));
+				}
+				else
+				{
+					worker_set->nworkers = 1;
+					STARPU_PTHREAD_CREATE_ON(
+						workerarg->name,
+						&workerarg->worker_thread,
+						NULL,
+						_starpu_cuda_worker,
+//						workerarg,
+						worker_set,
+						_starpu_simgrid_get_host_by_worker(workerarg));
+				}
 #ifdef STARPU_USE_FXT
 				STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
 				while (!workerarg->worker_is_running)
 					STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
 				STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
 #endif
+
+				if(th_per_stream == 0)
+				{
+
+					STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+					while (!worker_set->set_is_initialized)
+						STARPU_PTHREAD_COND_WAIT(&worker_set->ready_cond,
+									 &worker_set->mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+				}
+				else
+				{
+					STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
+					while (!workerarg->worker_is_initialized)
+						STARPU_PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
+				}
+				worker_set->started = 1;
 				break;
 #endif
 #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
@@ -809,9 +847,6 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 		struct starpu_driver driver;
 		unsigned devid = workerarg->devid;
 		driver.type = workerarg->arch;
-#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
-		struct _starpu_worker_set *worker_set = workerarg->set;
-#endif
 
 		switch (workerarg->arch)
 		{
@@ -827,19 +862,7 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 				break;
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 			case STARPU_CUDA_WORKER:
-#ifndef STARPU_SIMGRID
-				driver.id.cuda_id = devid;
-				if (!_starpu_may_launch_driver(&pconfig->conf, &driver))
-					break;
-#endif
-				_STARPU_DEBUG("waiting for worker %u initialization\n", worker);
-				STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
-				while (!worker_set->set_is_initialized)
-					STARPU_PTHREAD_COND_WAIT(&worker_set->ready_cond,
-								 &worker_set->mutex);
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
-				worker_set->started = 1;
-
+				/* Already waited above */
 				break;
 #endif
 #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
@@ -1248,7 +1271,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	if (!is_a_sink)
 	{
 		struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(&_starpu_config, _starpu_config.conf.sched_policy_name);
-		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", (_starpu_config.conf.global_sched_ctx_min_priority != -1), _starpu_config.conf.global_sched_ctx_min_priority, (_starpu_config.conf.global_sched_ctx_min_priority != -1), _starpu_config.conf.global_sched_ctx_max_priority, 1, _starpu_config.conf.sched_policy_init, NULL);
+		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", (_starpu_config.conf.global_sched_ctx_min_priority != -1), _starpu_config.conf.global_sched_ctx_min_priority, (_starpu_config.conf.global_sched_ctx_min_priority != -1), _starpu_config.conf.global_sched_ctx_max_priority, 1, _starpu_config.conf.sched_policy_init, NULL,  0, NULL, 0);
 	}
 
 	_starpu_initialize_registered_performance_models();
@@ -1310,7 +1333,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 
 		/* in case StarPU termination code is called from a callback,
  		 * we have to check if pthread_self() is the worker itself */
-		if (set)
+		if (set && set->nworkers > 1)
 		{
 			if (set->started)
 			{
@@ -1842,6 +1865,47 @@ int starpu_worker_get_by_devid(enum starpu_worker_archtype type, int devid)
 	return -1;
 }
 
+int starpu_worker_get_devids(enum starpu_worker_archtype type, int *devids, int num)
+{
+	int cnt = 0;
+	unsigned nworkers = starpu_worker_get_count();
+	int *workerids = (int *)malloc(nworkers*sizeof(int));
+
+	int ndevice_workers = starpu_worker_get_ids_by_type(type, workerids, nworkers);
+
+	int ndevids = 0;
+
+	if(ndevice_workers > 0)
+	{
+		unsigned id, devid;
+		int curr_devid = -1;
+		unsigned found = 0;
+		for(id = 0; id < ndevice_workers; id++)
+		{
+			curr_devid = _starpu_config.workers[workerids[id]].devid;
+			for(devid = 0; devid < ndevids; devid++)
+			{
+				if(curr_devid == devids[devid])
+				{
+					found = 1;
+					break;
+				}
+			}
+			if(!found)
+			{
+				devids[ndevids++] = curr_devid;
+				cnt++;
+			}
+			else
+				found = 0;
+
+			if(cnt == num)
+				break;
+		}
+	}
+	return ndevids;
+}
+
 void starpu_worker_get_name(int id, char *dst, size_t maxlen)
 {
 	char *name = _starpu_config.workers[id].name;
@@ -1862,6 +1926,19 @@ int starpu_bindid_get_workerids(int bindid, int **workerids)
 	return _starpu_config.bindid_workers[bindid].nworkers;
 }
 
+int starpu_worker_get_stream_workerids(int devid, int *workerids, enum starpu_worker_archtype type)
+{
+	unsigned nworkers = starpu_worker_get_count();
+	int nw = 0;
+	unsigned id;
+	for (id = 0; id < nworkers; id++)
+	{
+		if (_starpu_config.workers[id].devid == devid && _starpu_config.workers[id].arch == type)
+			workerids[nw++] = id;
+	}
+	return nw;
+}
+
 void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sched_mutex, starpu_pthread_cond_t **sched_cond)
 {
 	*sched_cond = &_starpu_config.workers[workerid].sched_cond;
@@ -2142,3 +2219,23 @@ char *starpu_worker_get_type_as_string(enum starpu_worker_archtype type)
 	if (type == STARPU_ANY_WORKER) return "STARPU_ANY_WORKER";
 	return "STARPU_unknown_WORKER";
 }
+
+void _starpu_worker_set_stream_ctx(int workerid, struct _starpu_sched_ctx *sched_ctx)
+{
+        struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
+        w->stream_ctx = sched_ctx;
+}
+
+struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(int stream_workerid)
+{
+        struct _starpu_worker *w = _starpu_get_worker_struct(stream_workerid);
+        return w->stream_ctx;
+}
+
+unsigned starpu_worker_get_sched_ctx_id_stream(int stream_workerid)
+{
+        struct _starpu_worker *w = _starpu_get_worker_struct(stream_workerid);
+	return w->stream_ctx != NULL ? w->stream_ctx->id : STARPU_NMAX_SCHED_CTXS;
+}
+
+

+ 6 - 0
src/core/workers.h

@@ -133,6 +133,8 @@ LIST_TYPE(_starpu_worker,
 	/* bool to indicate if the worker is slave in a ctx */
 	unsigned is_slave_somewhere;
 
+	struct _starpu_sched_ctx *stream_ctx;
+
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */
@@ -576,4 +578,8 @@ static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
 }
 #define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
 
+void _starpu_worker_set_stream_ctx(int workerid, struct _starpu_sched_ctx *sched_ctx);
+
+struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(int stream_workerid);
+
 #endif // __WORKERS_H__

+ 30 - 13
src/drivers/cuda/driver_cuda.c

@@ -70,6 +70,9 @@ static starpu_pthread_mutex_t task_mutex[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE
 static starpu_pthread_cond_t task_cond[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
+static unsigned cuda_memnode_deinit[STARPU_MAXCUDADEVS];
+static starpu_pthread_mutex_t cuda_deinit_mutex[STARPU_MAXCUDADEVS];
+
 void
 _starpu_cuda_discover_devices (struct _starpu_machine_config *config)
 {
@@ -676,11 +679,16 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	STARPU_PTHREAD_COND_SIGNAL(&worker0->ready_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&worker0->mutex);
 
-	/* tell the main thread that this one is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
-	worker_set->set_is_initialized = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+	unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 1);
+
+	if(th_per_stream == 0)
+	{
+		/* tell the main thread that this one is ready */
+		STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+		worker_set->set_is_initialized = 1;
+		STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+	}
 
 	return 0;
 }
@@ -852,18 +860,27 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 			continue;
 		lastdevid = devid;
 
-		_starpu_handle_all_pending_node_data_requests(memnode);
-
-		/* In case there remains some memory that was automatically
-		 * allocated by StarPU, we release it now. Note that data
-		 * coherency is not maintained anymore at that point ! */
-		_starpu_free_all_automatically_allocated_buffers(memnode);
+		STARPU_PTHREAD_MUTEX_LOCK(&cuda_deinit_mutex[memnode]);
+		if(!cuda_memnode_deinit[devid])
+                {
 
-		_starpu_malloc_shutdown(memnode);
+			_starpu_handle_all_pending_node_data_requests(memnode);
+			
+			/* In case there remains some memory that was automatically
+			 * allocated by StarPU, we release it now. Note that data
+			 * coherency is not maintained anymore at that point ! */
+			_starpu_free_all_automatically_allocated_buffers(memnode);
+			
+			_starpu_malloc_shutdown(memnode);
+			cuda_memnode_deinit[devid] = 1;
 
 #ifndef STARPU_SIMGRID
-		deinit_device_context(devid);
+			deinit_device_context(devid);
 #endif /* !STARPU_SIMGRID */
+                }
+
+                STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_deinit_mutex[memnode]);
+
 	}
 
 	for (i = 0; i < worker_set->nworkers; i++)

+ 91 - 26
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -196,6 +196,26 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
 	if (task)
 	{
+		/* We now start the transfer, get rid of it in the completion
+		 * prediction */
+		double transfer_model = task->predicted_transfer;
+		if(!isnan(transfer_model)) 
+		{
+			fifo->exp_len -= transfer_model;
+			fifo->exp_start = starpu_timing_now() + transfer_model;
+			fifo->exp_end = fifo->exp_start + fifo->exp_len;
+			if(dt->num_priorities != -1)
+			{
+				int i;
+				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+				for(i = 0; i <= task_prio; i++)
+					fifo->exp_len_per_priority[i] -= transfer_model;
+			}
+
+			fifo->pipeline_len += task->predicted + transfer_model;
+			fifo->pipelined_tasks++;
+		}
+
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 
 #ifdef STARPU_VERBOSE
@@ -230,8 +250,30 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	task = _starpu_fifo_pop_local_task(fifo);
 	if (task)
 	{
-		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
+		double transfer_model = task->predicted_transfer;
+		/* We now start the transfer, get rid of it in the completion
+		 * prediction */
+
+		if(!isnan(transfer_model)) 
+		{
+			double model = task->predicted;
+			fifo->exp_len -= transfer_model;
+			fifo->exp_start = starpu_timing_now() + transfer_model+model;
+			fifo->exp_end = fifo->exp_start + fifo->exp_len;
+			if(dt->num_priorities != -1)
+			{
+				int i;
+				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+				for(i = 0; i <= task_prio; i++)
+					fifo->exp_len_per_priority[i] -= transfer_model;
+			}
 
+			fifo->pipeline_len += task->predicted + transfer_model;
+			fifo->pipelined_tasks++;
+
+		}
+		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
+		  
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
@@ -268,6 +310,28 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
+	while (new_list)
+	{
+		double transfer_model = new_list->predicted_transfer;
+		/* We now start the transfer, get rid of it in the completion
+		 * prediction */
+		if(!isnan(transfer_model)) 
+		{
+			fifo->exp_len -= transfer_model;
+			fifo->exp_start = starpu_timing_now() + transfer_model;
+			fifo->exp_end = fifo->exp_start + fifo->exp_len;
+			if(dt->num_priorities != -1)
+			{
+				int i;
+				for(i = 0; i < new_list->priority; i++)
+					fifo->exp_len_per_priority[i] -= transfer_model;
+			}
+		
+		}
+
+		new_list = new_list->next;
+	}
+
 	return new_list;
 }
 
@@ -282,7 +346,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
         if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
         {
-                starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 0);
+                starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 0, 1);
 		starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
                 return 0;
         }
@@ -362,6 +426,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 
 	STARPU_AYU_ADDTOTASKQUEUE(_starpu_get_job_associated_to_task(task)->job_id, best_workerid);
+	unsigned stream_ctx_id = starpu_worker_get_sched_ctx_id_stream(best_workerid);
+	if(stream_ctx_id != STARPU_NMAX_SCHED_CTXS)
+	{
+		starpu_sched_ctx_move_task_to_ctx(task, stream_ctx_id, 0, 0);
+		starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+	}
+
 	int ret = 0;
 	if (prio)
 	{
@@ -584,6 +655,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
+		exp_start += fifo->pipeline_len;
+
 		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
 			continue;
 
@@ -852,8 +925,6 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 	}
 	else
 	{
-//		double max_len = (max_exp_end - starpu_timing_now());
-		/* printf("%d: dmda max_exp_end %lf best_exp_end %lf max_len %lf \n", sched_ctx_id, max_exp_end/1000000.0, best_exp_end/1000000.0, max_len/1000000.0);	 */
 		return exp_end[best_in_ctx][selected_impl] ;
 	}
 }
@@ -1022,9 +1093,8 @@ static void deinitialize_dmda_policy(unsigned sched_ctx_id)
 /* dmda_pre_exec_hook is called right after the data transfer is done and right
  * before the computation to begin, it is useful to update more precisely the
  * value of the expected start, end, length, etc... */
-static void dmda_pre_exec_hook(struct starpu_task *task)
+static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
@@ -1039,30 +1109,27 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 	 * of work. */
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 
-	/* Take the opportunity to update start time */
-	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
-
-	if(!isnan(transfer_model))
+	if(fifo->pipelined_tasks > 0)
 	{
-		/* The transfer is over, get rid of it in the completion
-		 * prediction */
-		fifo->exp_len -= transfer_model;
-		if(dt->num_priorities != -1)
-		{
-			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
-			for(i = 0; i <= task_prio; i++)
-				fifo->exp_len_per_priority[i] -= transfer_model;
-		}
-
+		/* decrement here bc we add the predicted exec time of the task to exp_start
+		   we don't want to add it twice */
+		if (!isnan(task->predicted))
+			fifo->pipeline_len -= task->predicted;
+		if(!isnan(task->predicted_transfer))
+			fifo->pipeline_len -= task->predicted_transfer;
+		fifo->pipelined_tasks--;
 	}
 
+	/* Take the opportunity to update start time */
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
+
 	if(!isnan(model))
 	{
 		/* We now start the computation, get rid of it in the completion
 		 * prediction */
-		fifo->exp_len -= model;
-		fifo->exp_start += model;
+		fifo->exp_len-= model;
+                fifo->exp_start = starpu_timing_now() + model;
+                fifo->exp_end= fifo->exp_start + fifo->exp_len;
 		if(dt->num_priorities != -1)
 		{
 			int i;
@@ -1072,7 +1139,6 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 		}
 	}
 
-	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 }
 
@@ -1155,9 +1221,8 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 }
 
-static void dmda_post_exec_hook(struct starpu_task * task)
+static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
 {
-	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

+ 1 - 1
src/sched_policies/eager_central_policy.c

@@ -182,7 +182,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1);
+			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
 			return NULL;
 		}

+ 1 - 1
src/sched_policies/eager_central_priority_policy.c

@@ -289,7 +289,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
                 unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1);
+			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
 			return NULL;
 		}

+ 2 - 0
src/sched_policies/fifo_queues.c

@@ -56,6 +56,8 @@ struct _starpu_fifo_taskq *_starpu_create_fifo(void)
 	fifo->exp_len = 0.0;
 	fifo->exp_end = fifo->exp_start;
 	fifo->exp_len_per_priority = NULL;
+	fifo->pipeline_len = 0.0;
+	fifo->pipelined_tasks = 0;
 
 	return fifo;
 }

+ 2 - 0
src/sched_policies/fifo_queues.h

@@ -42,6 +42,8 @@ struct _starpu_fifo_taskq
 	double exp_end; /* Expected end date of last task in the queue */
 	double exp_len; /* Expected duration of the set of tasks in the queue */
 	double *exp_len_per_priority; /* Expected duration of the set of tasks in the queue corresponding to each priority */
+	double pipeline_len; /* the expected the length of the pipelined tasks */
+	int pipelined_tasks; /* the expected no of pipelined tasks */
 };
 
 struct _starpu_fifo_taskq*_starpu_create_fifo(void) STARPU_ATTRIBUTE_MALLOC;

+ 1 - 1
src/sched_policies/heteroprio.c

@@ -609,7 +609,7 @@ done:		;
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1);
+			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
 			return NULL;
 		}