Browse Source

- merge trunk

Olivier Aumage 11 years ago
parent
commit
cc86ba5270

+ 1 - 1
configure.ac

@@ -2216,7 +2216,7 @@ AM_CONDITIONAL(ATLAS_BLAS_LIB, test x$blas_lib = xatlas)
 AM_CONDITIONAL(GOTO_BLAS_LIB, test x$blas_lib = xgoto)
 AM_CONDITIONAL(MKL_BLAS_LIB, test x$blas_lib = xmkl)
 AM_CONDITIONAL(SYSTEM_BLAS_LIB, test x$blas_lib = xsystem)
-AM_CONDITIONAL(NO_BLAS_LIB, test x$blas_lib = xnone)
+AM_CONDITIONAL(NO_BLAS_LIB, test x$blas_lib = xnone -a x$enable_simgrid = xno)
 
 AC_MSG_CHECKING(which BLAS lib should be used)
 AC_MSG_RESULT($blas_lib)

+ 85 - 1
examples/common/blas.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2014  Université de Bordeaux 1
  * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -415,6 +415,90 @@ void DSWAP(const int n, double *X, const int incX, double *Y, const int incY)
 }
 
 
+#elif defined(STARPU_SIMGRID)
+inline void SGEMM(char *transa, char *transb, int M, int N, int K, 
+			float alpha, const float *A, int lda, const float *B, int ldb, 
+			float beta, float *C, int ldc) { }
+
+inline void DGEMM(char *transa, char *transb, int M, int N, int K, 
+			double alpha, double *A, int lda, double *B, int ldb, 
+			double beta, double *C, int ldc) { }
+
+inline void SGEMV(char *transa, int M, int N, float alpha, float *A, int lda,
+		float *X, int incX, float beta, float *Y, int incY) { }
+
+inline void DGEMV(char *transa, int M, int N, double alpha, double *A, int lda,
+		double *X, int incX, double beta, double *Y, int incY) { }
+
+inline float SASUM(int N, float *X, int incX) { return 0.0f; }
+
+inline double DASUM(int N, double *X, int incX) { return 0.0; }
+
+void SSCAL(int N, float alpha, float *X, int incX) { }
+
+void DSCAL(int N, double alpha, double *X, int incX) { }
+
+void STRSM (const char *side, const char *uplo, const char *transa,
+                   const char *diag, const int m, const int n,
+                   const float alpha, const float *A, const int lda,
+                   float *B, const int ldb) { }
+
+void DTRSM (const char *side, const char *uplo, const char *transa,
+                   const char *diag, const int m, const int n,
+                   const double alpha, const double *A, const int lda,
+                   double *B, const int ldb) { }
+
+void SSYR (const char *uplo, const int n, const float alpha,
+                  const float *x, const int incx, float *A, const int lda) { }
+
+void SSYRK (const char *uplo, const char *trans, const int n,
+                   const int k, const float alpha, const float *A,
+                   const int lda, const float beta, float *C,
+                   const int ldc) { }
+
+void SGER(const int m, const int n, const float alpha,
+                  const float *x, const int incx, const float *y,
+                  const int incy, float *A, const int lda) { }
+
+void DGER(const int m, const int n, const double alpha,
+                  const double *x, const int incx, const double *y,
+                  const int incy, double *A, const int lda) { }
+
+void STRSV (const char *uplo, const char *trans, const char *diag, 
+                   const int n, const float *A, const int lda, float *x, 
+                   const int incx) { }
+
+void STRMM(const char *side, const char *uplo, const char *transA,
+                 const char *diag, const int m, const int n,
+                 const float alpha, const float *A, const int lda,
+                 float *B, const int ldb) { }
+
+void DTRMM(const char *side, const char *uplo, const char *transA,
+                 const char *diag, const int m, const int n,
+                 const double alpha, const double *A, const int lda,
+                 double *B, const int ldb) { }
+
+void STRMV(const char *uplo, const char *transA, const char *diag,
+                 const int n, const float *A, const int lda, float *X,
+                 const int incX) { }
+
+void SAXPY(const int n, const float alpha, float *X, const int incX, float *Y, const int incY) { }
+
+void DAXPY(const int n, const double alpha, double *X, const int incX, double *Y, const int incY) { }
+
+int ISAMAX (const int n, float *X, const int incX) { return 0; }
+
+int IDAMAX (const int n, double *X, const int incX) { return 0; }
+
+float SDOT(const int n, const float *x, const int incx, const float *y, const int incy) { return 0.0f; }
+
+double DDOT(const int n, const double *x, const int incx, const double *y, const int incy) { return 0.0; }
+
+void SSWAP(const int n, float *X, const int incX, float *Y, const int incY) { }
+
+void DSWAP(const int n, double *X, const int incX, double *Y, const int incY) { }
+
+
 #else
 #error "no BLAS lib available..."
 #endif

+ 6 - 6
examples/sched_ctx/parallel_code.c

@@ -61,8 +61,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"
@@ -139,11 +139,11 @@ int main(int argc, char **argv)
 	for(j = nprocs5; j < nprocs5+nprocs6; j++)
 		procs6[k++] = procs2[j];
 
-	int master3 = starpu_sched_ctx_book_workers_for_task(procs3, nprocs3);
-	int master4 = starpu_sched_ctx_book_workers_for_task(procs4, nprocs4);
+	int master3 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs3, nprocs3);
+	int master4 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs4, nprocs4);
 
-	int master5 = starpu_sched_ctx_book_workers_for_task(procs5, nprocs5);
-	int master6 = starpu_sched_ctx_book_workers_for_task(procs6, nprocs6);
+	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
+	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
 
 /* 	int master1 = starpu_sched_ctx_book_workers_for_task(procs1, nprocs1); */
 /* 	int master2 = starpu_sched_ctx_book_workers_for_task(procs2, nprocs2); */

+ 2 - 2
examples/sched_ctx/sched_ctx.c

@@ -36,8 +36,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STAR
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"

+ 18 - 7
examples/sched_ctx/sched_ctx_without_sched_policy.c

@@ -59,8 +59,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = { NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"
@@ -90,11 +90,22 @@ int main(int argc, char **argv)
 	procs2 = (int*)malloc(ncpus*sizeof(int));
 	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
 
-	nprocs1 = ncpus/2;
-	nprocs2 =  nprocs1;
-	k = 0;
-	for(j = nprocs1; j < nprocs1+nprocs2; j++)
-		procs2[k++] = j;
+	if(ncpus > 1)
+	{
+		nprocs1 = ncpus/2;
+		nprocs2 =  ncpus-nprocs1;
+		k = 0;
+		for(j = nprocs1; j < nprocs1+nprocs2; j++)
+			procs2[k++] = j;
+	}
+	else
+	{
+		nprocs1 = 1;
+		nprocs2 = 1;
+		procs1[0] = 0;
+		procs2[0] = 0;
+
+	}
 #else
 	procs1 = (int*)malloc(nprocs1*sizeof(int));
 	procs2 = (int*)malloc(nprocs2*sizeof(int));

+ 3 - 1
examples/worker_collections/worker_list_example.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Université de Bordeaux 1
  * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -86,4 +86,6 @@ int main()
 	FPRINTF(stderr, "timing init = %lf \n", timing);
 	co->deinit(co);
 	starpu_shutdown();
+
+	return 0;
 }

+ 3 - 1
examples/worker_collections/worker_tree_example.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Université de Bordeaux 1
  * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -95,5 +95,7 @@ int main()
 
 	co->deinit(co);
 	starpu_shutdown();
+
+	return 0;
 }
 #endif

+ 1 - 1
include/starpu_sched_ctx.h

@@ -121,7 +121,7 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid);
 
-int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers);
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers);
 
 void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master);
 

+ 61 - 0
src/common/fxt.h

@@ -107,6 +107,8 @@
 
 #define _STARPU_FUT_EVENT	0x513c
 
+#define _STARPU_FUT_WORKER_SCHEDULING_START	0x513e
+
 #define _STARPU_FUT_LOCKING_MUTEX	0x5140	
 #define _STARPU_FUT_MUTEX_LOCKED	0x5141	
 
@@ -289,6 +291,61 @@ do {									\
 } while (0);
 #endif
 
+#ifdef FUT_DO_PROBE6STR
+#define _STARPU_FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str) FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)
+#else
+#define _STARPU_FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)	\
+do {									\
+    if(fut_active) {							\
+	/* No more than FXT_MAX_PARAMS args are allowed */		\
+	/* we add a \0 just in case ... */				\
+	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 6)*sizeof(unsigned long));\
+	unsigned nbargs_str = (len + sizeof(unsigned long) - 1)/(sizeof(unsigned long));\
+	unsigned nbargs = 6 + nbargs_str;				\
+	size_t total_len = FUT_SIZE(nbargs);				\
+	unsigned long *futargs =					\
+		fut_getstampedbuffer(FUT_CODE(CODE, nbargs), total_len);\
+	*(futargs++) = (unsigned long)(P1);				\
+	*(futargs++) = (unsigned long)(P2);				\
+	*(futargs++) = (unsigned long)(P3);				\
+	*(futargs++) = (unsigned long)(P4);				\
+	*(futargs++) = (unsigned long)(P5);				\
+	*(futargs++) = (unsigned long)(P6);				\
+	snprintf((char *)futargs, len, "%s", str);			\
+	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
+    }									\
+} while (0);
+#endif
+
+#ifdef FUT_DO_PROBE7STR
+#define _STARPU_FUT_DO_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str) FUT_DO_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)
+#else
+#define _STARPU_FUT_DO_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)	\
+do {									\
+    if(fut_active) {							\
+	/* No more than FXT_MAX_PARAMS args are allowed */		\
+	/* we add a \0 just in case ... */				\
+	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 7)*sizeof(unsigned long));\
+	unsigned nbargs_str = (len + sizeof(unsigned long) - 1)/(sizeof(unsigned long));\
+	unsigned nbargs = 7 + nbargs_str;				\
+	size_t total_len = FUT_SIZE(nbargs);				\
+	unsigned long *futargs =					\
+		fut_getstampedbuffer(FUT_CODE(CODE, nbargs), total_len);\
+	*(futargs++) = (unsigned long)(P1);				\
+	*(futargs++) = (unsigned long)(P2);				\
+	*(futargs++) = (unsigned long)(P3);				\
+	*(futargs++) = (unsigned long)(P4);				\
+	*(futargs++) = (unsigned long)(P5);				\
+	*(futargs++) = (unsigned long)(P6);				\
+	*(futargs++) = (unsigned long)(P7);				\
+	snprintf((char *)futargs, len, "%s", str);			\
+	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
+    }									\
+} while (0);
+#endif
+
 #ifndef FUT_RAW_PROBE7
 #define FUT_RAW_PROBE7(CODE,P1,P2,P3,P4,P5,P6,P7) do {		\
 		if(fut_active) {					\
@@ -427,6 +484,9 @@ do {										\
 #define _STARPU_TRACE_WORKER_DEINIT_END(workerkind)		\
 	FUT_DO_PROBE2(_STARPU_FUT_WORKER_DEINIT_END, workerkind, _starpu_gettid());
 
+#define _STARPU_TRACE_WORKER_SCHEDULING_START	\
+	FUT_DO_PROBE1(_STARPU_FUT_WORKER_SCHEDULING_START, _starpu_gettid());
+
 #define _STARPU_TRACE_WORKER_SLEEP_START	\
 	FUT_DO_PROBE1(_STARPU_FUT_WORKER_SLEEP_START, _starpu_gettid());
 
@@ -699,6 +759,7 @@ do {										\
 #define _STARPU_TRACE_WORK_STEALING(a, b)	do {} while(0)
 #define _STARPU_TRACE_WORKER_DEINIT_START	do {} while(0)
 #define _STARPU_TRACE_WORKER_DEINIT_END(a)	do {} while(0)
+#define _STARPU_TRACE_WORKER_SCHEDULING_START		do {} while(0)
 #define _STARPU_TRACE_WORKER_SLEEP_START		do {} while(0)
 #define _STARPU_TRACE_WORKER_SLEEP_END		do {} while(0)
 #define _STARPU_TRACE_USER_DEFINED_START		do {} while(0)

+ 3 - 1
src/core/errorcheck.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -34,6 +34,8 @@ enum _starpu_worker_status
 	STATUS_EXECUTING,
 	/* during the execution of the callback */
 	STATUS_CALLBACK,
+	/* while executing the scheduler code */
+	STATUS_SCHEDULING,
 	/* while sleeping because there is nothing to do */
 	STATUS_SLEEPING,
 	/* while a sleeping worker is about to wake up (to avoid waking twice for the same worker) */

+ 114 - 110
src/core/sched_ctx.c

@@ -32,8 +32,8 @@ double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
-static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master);
-static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers);
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -47,6 +47,8 @@ static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_w
 		worker->nsched_ctxs++;
 	}
 	worker->removed_from_ctx[sched_ctx_id] = 0;
+	if(worker->tmp_sched_ctx == sched_ctx_id)
+		worker->tmp_sched_ctx = -1;
 	return;
 }
 
@@ -137,6 +139,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
 	int workers_to_add[nworkers_to_add];
 
+
 	int i = 0;
 	for(i = 0; i < nworkers_to_add; i++)
 	{
@@ -165,16 +168,19 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			int worker = (workerids == NULL ? i : workerids[i]);
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
+			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
+			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
+
 		}
 	}
 
 	if(!sched_ctx->sched_policy)
 	{
-		if(sched_ctx->master == -1)
-			sched_ctx->master = starpu_sched_ctx_book_workers_for_task(workerids, nworkers);
+		if(sched_ctx->main_master == -1)
+			sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
 		else
 		{
-			_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, sched_ctx->master);
+			_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
 		}
 	}
 	else if(sched_ctx->sched_policy->add_workers)
@@ -211,7 +217,7 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 	}
 
 	if(!sched_ctx->sched_policy)
-		_starpu_sched_ctx_wake_these_workers_up(removed_workers, *n_removed_workers);
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
 
 	return;
 }
@@ -298,7 +304,22 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
 
 	sched_ctx->ready_flops = 0.0;
-	sched_ctx->master = -1;
+	sched_ctx->main_master = -1;
+	
+	int w;
+	for(w = 0; w < STARPU_NMAXWORKERS; w++)
+	{
+		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
+		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
+
+		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
+		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
+		
+		sched_ctx->master[w] = -1;
+		sched_ctx->parallel_sect[w] = 0;
+		sched_ctx->sleeping[w] = 0;
+	}
+
 	
         /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)
@@ -581,8 +602,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 		free(sched_ctx->sched_policy);
 		sched_ctx->sched_policy = NULL;
 	}
-	else
-		starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->master);
+	
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -634,6 +654,8 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
+		if(!sched_ctx->sched_policy)
+			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
@@ -1507,33 +1529,6 @@ unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 	return 0;
 }
 
-static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
-
-#ifdef STARPU_HAVE_HWLOC	
-	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
-        if (support->cpubind->set_thisthread_cpubind)
-        {
-		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
-                int ret;
-                ret = hwloc_set_cpubind (config->topology.hwtopology, set,
-                                         HWLOC_CPUBIND_THREAD);
-		if (ret)
-                {
-                        perror("binding thread");
-			STARPU_ABORT();
-                }
-	}
-
-#else
-#warning no sched ctx CPU binding support
-#endif
-
-	return;
-}
-
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
@@ -1594,68 +1589,75 @@ void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBU
 
 }
 
-static void _starpu_sched_ctx_get_workers_to_sleep(int *workerids, int nworkers, int master)
+static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
 {
+	int s;
+	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+	{
+		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
+		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
+		{
+			if(sched_ctx->parallel_sect[workerid])
+				return 1;
+		}
+	}
+	return 0;
+
+}
+static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
-	
+	unsigned sleeping[nworkers];
 	int w;
-	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-
-		worker->master = master;
-		worker->parallel_sect = 1;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
+		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
+		sched_ctx->master[workerids[w]] = master;
+		sched_ctx->parallel_sect[workerids[w]] = 1;
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
 	}
 
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 	int workerid;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
-		if(current_worker_id == -1 || workerid != current_worker_id)
+		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
 		{
-			sem_wait(&master_worker->fall_asleep_sem);
+			sched_ctx->sleeping[workerids[w]] = 1;
+			sem_wait(&sched_ctx->fall_asleep_sem[master]);
 		}
 	}
 	return;
 }
 
-void _starpu_sched_ctx_signal_worker_blocked(int workerid)
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
-	struct _starpu_sched_ctx *sched_ctx = NULL;
-	struct _starpu_sched_ctx_list *l = NULL;
-	sem_post(&master_worker->fall_asleep_sem);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->fall_asleep_sem[master]);
 
 	return;
 }
 
-void _starpu_sched_ctx_signal_worker_woke_up(int workerid)
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
-	struct _starpu_sched_ctx *sched_ctx = NULL;
-	struct _starpu_sched_ctx_list *l = NULL;
-	
-	sem_post(&master_worker->wake_up_sem);
-
-	worker->master = -1;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->wake_up_sem[master]);
+	sched_ctx->sleeping[workerid] = 0;
+	sched_ctx->master[workerid] = -1;
 	return;
 }
 
 static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 {
-	int current_worker_id = starpu_worker_get_id();
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int current_worker_id = starpu_worker_get_id();
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-	struct _starpu_worker *worker = NULL;
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 
 	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
@@ -1663,18 +1665,19 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 
 	while(workers->has_next(workers, &it))
 	{
-		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
-		if(worker->master == master)
+		int workerid = workers->get_next(workers, &it);
+		int curr_master = sched_ctx->master[workerid];
+		if(curr_master == master && sched_ctx->parallel_sect[workerid])
 		{
-			if(current_worker_id == -1 || worker->workerid != current_worker_id)
+			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-				STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
-				sem_wait(&master_worker->wake_up_sem);
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				sem_wait(&sched_ctx->wake_up_sem[master]);
 			}
 			else
-				worker->parallel_sect = 0;
+				sched_ctx->parallel_sect[workerid] = 0;
 		}
 	}
 
@@ -1685,11 +1688,8 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 {
 	int *workerids;
 	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
-	_starpu_sched_ctx_get_workers_to_sleep(workerids, nworkers, workerids[nworkers-1]);
+	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
 
-	/* bind current thread on all workers of the context */
-//	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
-	
 	/* execute parallel code */
 	void* ret = func(param);
 
@@ -1717,8 +1717,8 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	while(workers->has_next(workers, &it))
 	{
 		workerid = workers->get_next(workers, &it);
-		worker = _starpu_get_worker_struct(workerid);
-		if(worker->master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
+		int master = sched_ctx->master[workerid];
+		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
 		{
 			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
 		}
@@ -1727,8 +1727,9 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	return;
 }
 
-static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers)
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
 
 	int masters[nworkers];
@@ -1736,37 +1737,37 @@ static void _starpu_sched_ctx_wake_these_workers_up(int *workerids, int nworkers
 	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		masters[w] = worker->master;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
+		int workerid = workerids[w];
+		masters[w] = sched_ctx->master[workerid];
+		if(current_worker_id == -1 || workerid != current_worker_id)
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-			STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 		}
 		else
-			worker->parallel_sect = 0;
-		worker->master = -1;
+			sched_ctx->parallel_sect[workerid] = 0;
+		sched_ctx->master[workerid] = -1;
 	}
 
 	int workerid;
-	struct _starpu_worker *master_worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
 		if(masters[w] != -1)
 		{
-			master_worker = _starpu_get_worker_struct(masters[w]);
+			int master = masters[w];
 			if(current_worker_id == -1 || workerid != current_worker_id)
-				sem_wait(&master_worker->wake_up_sem);
+				sem_wait(&sched_ctx->wake_up_sem[master]);
 		}
 	}
 
 	return;
 }
 
-static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
+static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int new_master = workerids[nworkers-1];
         int current_worker_id = starpu_worker_get_id();
         int current_is_in_section = 0;
@@ -1783,23 +1784,23 @@ static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
                 if (current_worker_id == workerids[w])
                         current_is_in_section = 1;
 
-                struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
-                if (worker->master > -1)
+		int master = sched_ctx->master[workerids[w]];
+                if (master > -1)
 		{
                         int already_seen = 0;
                         //Could create a function for this. Basically searching an element in an array.                                                                                                             
-                        for (i=0 ; i<npotential_masters ; i++)
+                        for (i = 0 ; i < npotential_masters; i++)
                         {
-                                if (potential_masters[i] == worker->master)
+                                if (potential_masters[i] == master)
 				{
                                         already_seen = 1;
                                         break;
 				}
                         }
                         if (!already_seen)
-				potential_masters[npotential_masters++] = worker->master;
+				potential_masters[npotential_masters++] = master;
                 }
-                else if (worker->master == -1)
+                else if (master == -1)
                         awake_workers[nawake_workers++] = workerids[w];
         }
 
@@ -1833,8 +1834,9 @@ static int _starpu_sched_ctx_find_master(int *workerids, int nworkers)
 	return new_master;
 }
 
-static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers, int new_master)
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
 {
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int w;
 	int nput_to_sleep = 0;
 	int nwake_up = 0;
@@ -1843,22 +1845,24 @@ static void _starpu_sched_ctx_add_workers_to_master(int *workerids, int nworkers
 	
 	for(w = 0 ; w < nworkers ; w++)
 	{
-		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
-		if (worker->master == -1 && workerids[w] != new_master)
+		int master = sched_ctx->master[workerids[w]];
+		if (master == -1 && workerids[w] != new_master)
 			put_to_sleep[nput_to_sleep++] = workerids[w];
-		else if(worker->master != -1 && workerids[w] == new_master)
+		else if(master != -1 && workerids[w] == new_master)
 			wake_up[nwake_up++] = workerids[w];
 	}
 
-	_starpu_sched_ctx_wake_these_workers_up(wake_up, nwake_up);
-	_starpu_sched_ctx_get_workers_to_sleep(put_to_sleep, nput_to_sleep, new_master);
+	if(nwake_up > 0)
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
+	if(nput_to_sleep > 0)
+		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
 
 }
 
-int starpu_sched_ctx_book_workers_for_task(int *workerids, int nworkers)
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
 { 
-	int new_master = _starpu_sched_ctx_find_master(workerids, nworkers);	
-	_starpu_sched_ctx_add_workers_to_master(workerids, nworkers, new_master);
+	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);	
+	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
 	return new_master;
 }
 

+ 26 - 3
src/core/sched_ctx.h

@@ -123,7 +123,30 @@ struct _starpu_sched_ctx
 	/* if we execute non-StarPU code inside the context 
 	   we have a single master worker that stays awake, 
 	   if not master is -1 */
-	int master;
+	int main_master;
+
+	/* conditions variables used when parallel sections are executed in contexts */
+	starpu_pthread_cond_t parallel_sect_cond[STARPU_NMAXWORKERS];
+	starpu_pthread_mutex_t parallel_sect_mutex[STARPU_NMAXWORKERS];
+
+	/* boolean indicating that workers should block in order to allow
+	   parallel sections to be executed on their allocated resources */
+	unsigned parallel_sect[STARPU_NMAXWORKERS];
+
+	/* id of the master worker */
+	int master[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all blocked and ready to exec the parallel code */
+	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all woke up and ready continue appl */
+	sem_t wake_up_sem[STARPU_NMAXWORKERS];
+       
+	/* bool indicating if the workers is sleeping in this ctx */
+	unsigned sleeping[STARPU_NMAXWORKERS];
+
 };
 
 struct _starpu_machine_config;
@@ -182,10 +205,10 @@ starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 
 /* let the appl know that the worker blocked to execute parallel code */
-void _starpu_sched_ctx_signal_worker_blocked(int workerid);
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid);
 
 /* let the appl know that the worker woke up */
-void _starpu_sched_ctx_signal_worker_woke_up(int workerid);
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid);
 
 /* If starpu_sched_ctx_set_context() has been called, returns the context
  * id set by its last call, or the id of the initial context */

+ 1 - 1
src/core/sched_policy.c

@@ -446,7 +446,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 		if(!sched_ctx->sched_policy)
 		{
-			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->master);
+			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
 		}
 		else
 		{

+ 3 - 8
src/core/workers.c

@@ -270,7 +270,8 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	if(config.workers[workerid].parallel_sect) return 0;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	if(sched_ctx->parallel_sect[workerid]) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
@@ -412,8 +413,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
 	workerarg->set = NULL;
-	sem_init(&workerarg->fall_asleep_sem, 0, 0);
-	sem_init(&workerarg->wake_up_sem, 0, 0);
 
 	/* if some codelet's termination cannot be handled directly :
 	 * for instance in the Gordon driver, Gordon tasks' callbacks
@@ -430,6 +429,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->run_by_starpu = 1;
 
 	workerarg->sched_ctx_list = NULL;
+	workerarg->tmp_sched_ctx = -1;
 	workerarg->nsched_ctxs = 0;
 	_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
@@ -441,10 +441,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 
 	workerarg->spinning_backoff = 1;
 
-	STARPU_PTHREAD_COND_INIT(&workerarg->parallel_sect_cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&workerarg->parallel_sect_mutex, NULL);
-
-	workerarg->parallel_sect = 0;
 
 	for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
 	{
@@ -455,7 +451,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
-	workerarg->master = -1;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }

+ 1 - 18
src/core/workers.h

@@ -84,6 +84,7 @@ LIST_TYPE(_starpu_worker,
 	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */
 
 	struct _starpu_sched_ctx_list *sched_ctx_list;
+	int tmp_sched_ctx;
 	unsigned nsched_ctxs; /* the no of contexts a worker belongs to*/
 	struct _starpu_barrier_counter tasks_barrier; /* wait for the tasks submitted */
 
@@ -93,13 +94,6 @@ LIST_TYPE(_starpu_worker,
 
 	unsigned spinning_backoff ; /* number of cycles to pause when spinning  */
 
-	/* conditions variables used when parallel sections are executed in contexts */
-	starpu_pthread_cond_t parallel_sect_cond;
-	starpu_pthread_mutex_t parallel_sect_mutex;
-
-	/* boolean indicating that workers should block in order to allow
-	   parallel sections to be executed on their allocated resources */
-	unsigned parallel_sect;
 
 	/* indicate whether the workers shares tasks lists with other workers*/
 	/* in this case when removing him from a context it disapears instantly */
@@ -118,17 +112,6 @@ LIST_TYPE(_starpu_worker,
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 
-	/* id of the master worker */
-	int master;
-
-	/* semaphore that block appl thread until starpu threads are 
-	   all blocked and ready to exec the parallel code */
-	sem_t fall_asleep_sem;
-
-	/* semaphore that block appl thread until starpu threads are 
-	   all woke up and ready continue appl */
-	sem_t wake_up_sem;
-
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 1 - 1
src/datawizard/coherency.c

@@ -201,7 +201,7 @@ static int worker_supports_direct_access(unsigned node, unsigned handling_node)
 			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
 			/* GPUs not always allow direct remote access: if CUDA4
 			 * is enabled, we allow two CUDA devices to communicate. */
-			return kind == STARPU_CPU_RAM || kind == STARPU_CUDA_RAM;
+			return kind == STARPU_CUDA_RAM;
 		}
 #else
 			/* Direct GPU-GPU transfers are not allowed in general */

+ 4 - 4
src/datawizard/copy_driver.c

@@ -163,7 +163,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			cures = cudaEventCreateWithFlags(&req->async_channel.event.cuda_event, cudaEventDisableTiming);
 			if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_out_transfer_stream(src_node);
+			stream = starpu_cuda_get_local_out_transfer_stream();
 			if (copy_methods->cuda_to_ram_async)
 				ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -199,7 +199,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			if (STARPU_UNLIKELY(cures != cudaSuccess))
 				STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_in_transfer_stream(dst_node);
+			stream = starpu_cuda_get_local_in_transfer_stream();
 			if (copy_methods->ram_to_cuda_async)
 				ret = copy_methods->ram_to_cuda_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -533,7 +533,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_out_transfer_stream(src_node):NULL,
+				async_channel?starpu_cuda_get_local_out_transfer_stream():NULL,
 				cudaMemcpyDeviceToHost);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CUDA_RAM):
@@ -541,7 +541,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_in_transfer_stream(dst_node):NULL,
+				async_channel?starpu_cuda_get_local_in_transfer_stream():NULL,
 				cudaMemcpyHostToDevice);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CUDA_RAM,STARPU_CUDA_RAM):

+ 14 - 0
src/debug/traces/starpu_fxt.c

@@ -780,6 +780,16 @@ static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options
 
 static double last_sleep_start[STARPU_NMAXWORKERS];
 
+static void handle_start_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	int worker;
+	worker = find_worker_id(ev->param[0]);
+	if (worker < 0) return;
+
+	if (out_paje_file)
+		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
+}
+
 static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
@@ -1492,6 +1502,10 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				handle_worker_status(&ev, options, "B");
 				break;
 
+			case _STARPU_FUT_WORKER_SCHEDULING_START:
+				handle_start_scheduling(&ev, options);
+				break;
+
 			case _STARPU_FUT_WORKER_SLEEP_START:
 				handle_start_sleep(&ev, options);
 				break;

+ 5 - 1
src/debug/traces/starpu_paje.c

@@ -164,6 +164,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Po", "S", "PushingOutput", "0.1 1.0 1.0");
 	poti_DefineEntityValue("C", "S", "Callback", ".0 .3 .8");
 	poti_DefineEntityValue("B", "S", "Overhead", ".5 .18 .0");
+	poti_DefineEntityValue("Sc", "S", "Scheduling", ".7 .36 .0");
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 	poti_DefineEntityValue("U", "S", "Unpartitioning", ".0 .0 1.0");
@@ -192,6 +193,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 		poti_DefineEntityValue("Po", ctx, "PushingOutput", "0.1 1.0 1.0");
 		poti_DefineEntityValue("C", ctx, "Callback", ".0 .3 .8");
 		poti_DefineEntityValue("B", ctx, "Overhead", ".5 .18 .0");
+		poti_DefineEntityValue("Sc", ctx, "Scheduling", ".7 .36 .0");
 		poti_DefineEntityValue("Sl", ctx, "Sleeping", ".9 .1 .0");
 		poti_DefineEntityValue("P", ctx, "Progressing", ".4 .1 .6");
 		poti_DefineEntityValue("U", ctx, "Unpartitioning", ".0 .0 1.0");
@@ -234,6 +236,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       S      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       S       Callback       \".0 .3 .8\"            \n\
 6       B       S       Overhead         \".5 .18 .0\"		\n\
+6       Sc       S      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n\
 6       U       S       Unpartitioning      \".0 .0 1.0\"		\n\
@@ -255,10 +258,11 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       Ctx%u      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       Ctx%u       Callback       \".0 .3 .8\"            \n\
 6       B       Ctx%u       Overhead         \".5 .18 .0\"		\n\
+6       Sc       Ctx%u      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       Ctx%u      Sleeping         \".9 .1 .0\"		\n\
 6       P       Ctx%u       Progressing         \".4 .1 .6\"		\n\
 6       U       Ctx%u       Unpartitioning         \".0 .0 1.0\"		\n",
-		i, i, i, i, i, i, i, i, i);
+		i, i, i, i, i, i, i, i, i, i);
 	fprintf(file, "\
 6       A       MS      Allocating         \".4 .1 .0\"		\n\
 6       Ar       MS      AllocatingReuse       \".1 .1 .8\"		\n\

+ 37 - 10
src/drivers/cuda/driver_cuda.c

@@ -44,7 +44,10 @@ static size_t global_mem[STARPU_MAXCUDADEVS];
 static cudaStream_t streams[STARPU_NMAXWORKERS];
 static cudaStream_t out_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
-static cudaStream_t peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
+/* Note: streams are not thread-safe, so we define them for each CUDA worker
+ * emitting a GPU-GPU transfer */
+static cudaStream_t in_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
+static cudaStream_t out_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
 static cudaEvent_t task_events[STARPU_NMAXWORKERS];
 #endif /* STARPU_USE_CUDA */
@@ -116,26 +119,44 @@ static void _starpu_cuda_limit_gpu_mem_if_needed(unsigned devid)
 }
 
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_in_transfer_stream()
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
+	cudaStream_t stream;
 
-	return in_transfer_streams[devid];
+	stream = in_transfer_streams[devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_out_transfer_stream()
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
+	cudaStream_t stream;
 
-	return out_transfer_streams[devid];
+	stream = out_transfer_streams[devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node)
 {
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
 	int src_devid = _starpu_memory_node_get_devid(src_node);
 	int dst_devid = _starpu_memory_node_get_devid(dst_node);
+	cudaStream_t stream;
 
-	return peer_transfer_streams[src_devid][dst_devid];
+	STARPU_ASSERT(devid == src_devid || devid == dst_devid);
+
+	if (devid == dst_devid)
+		stream = in_peer_transfer_streams[src_devid][dst_devid];
+	else
+		stream = out_peer_transfer_streams[src_devid][dst_devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
 cudaStream_t starpu_cuda_get_local_stream(void)
@@ -274,7 +295,10 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 
 	for (i = 0; i < ncudagpus; i++)
 	{
-		cures = cudaStreamCreate(&peer_transfer_streams[i][devid]);
+		cures = cudaStreamCreate(&in_peer_transfer_streams[i][devid]);
+		if (STARPU_UNLIKELY(cures))
+			STARPU_CUDA_REPORT_ERROR(cures);
+		cures = cudaStreamCreate(&out_peer_transfer_streams[devid][i]);
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 	}
@@ -300,7 +324,10 @@ static void deinit_context(struct _starpu_worker_set *worker_set)
 	cudaStreamDestroy(out_transfer_streams[devid]);
 
 	for (i = 0; i < ncudagpus; i++)
-		cudaStreamDestroy(peer_transfer_streams[i][devid]);
+	{
+		cudaStreamDestroy(in_peer_transfer_streams[i][devid]);
+		cudaStreamDestroy(out_peer_transfer_streams[devid][i]);
+	}
 
 	/* cleanup the runtime API internal stuffs (which CUBLAS is using) */
 	cures = cudaThreadExit();

+ 2 - 2
src/drivers/cuda/driver_cuda.h

@@ -48,8 +48,8 @@ void *_starpu_cuda_worker(void *);
 #  define _starpu_cuda_discover_devices(config) ((void) config)
 #endif
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node);
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node);
+cudaStream_t starpu_cuda_get_local_in_transfer_stream(void);
+cudaStream_t starpu_cuda_get_local_out_transfer_stream(void);
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node);
 
 struct _starpu_worker_set;

+ 60 - 10
src/drivers/driver_common/driver_common.c

@@ -212,6 +212,17 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 	}
 }
 
+static void _starpu_worker_set_status_scheduling(int workerid)
+{
+	if (_starpu_worker_get_status(workerid) != STATUS_SLEEPING
+		&& _starpu_worker_get_status(workerid) != STATUS_SCHEDULING)
+	{
+		_STARPU_TRACE_WORKER_SCHEDULING_START;
+		_starpu_worker_set_status(workerid, STATUS_SCHEDULING);
+	}
+
+}
+
 static void _starpu_worker_set_status_sleeping(int workerid)
 {
 	if ( _starpu_worker_get_status(workerid) == STATUS_WAKING_UP)
@@ -252,20 +263,59 @@ static void _starpu_exponential_backoff(struct _starpu_worker *args)
 /* Workers may block when there is no work to do at all. */
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode)
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	struct starpu_task *task;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
-	if(args->parallel_sect)
+	unsigned needed = 1;
+	_starpu_worker_set_status_scheduling(workerid);
+	while(needed)
 	{
-		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
-		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
-		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);
-		_starpu_sched_ctx_signal_worker_woke_up(workerid);
-		args->parallel_sect = 0;
+		struct _starpu_sched_ctx *sched_ctx = NULL;
+		struct _starpu_sched_ctx_list *l = NULL;
+		for (l = args->sched_ctx_list; l; l = l->next)
+		{
+			sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(sched_ctx->parallel_sect[workerid])
+				{
+					/* don't let the worker sleep with the sched_mutex taken */
+					/* we need it until here because the list of ctxs of the
+					   worker can change in another thread */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+					needed = 0;
+					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					sched_ctx->parallel_sect[workerid] = 0;
+					STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+				}
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			}
+			if(!needed)
+				break;
+		}
+		/* don't worry if the value is not correct (read without the lock): it will be handled next time */
+		if(args->tmp_sched_ctx != -1)
+		{
+			sched_ctx = _starpu_get_sched_ctx_struct(args->tmp_sched_ctx);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			if(sched_ctx->parallel_sect[workerid])
+			{
+//				needed = 0;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+				_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+				STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+				sched_ctx->parallel_sect[workerid] = 0;
+				STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+			}
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+		}
+
+		needed = !needed;
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	task = _starpu_pop_task(args);
 
 	if (task == NULL)