Samuel Thibault 11 lat temu
rodzic
commit
e5dc2fe953
50 zmienionych plików z 1548 dodań i 332 usunięć
  1. 15 0
      ChangeLog
  2. 3 2
      configure.ac
  3. 4 6
      doc/doxygen/chapters/05check_list_performance.doxy
  4. 5 0
      doc/doxygen/chapters/07data_management.doxy
  5. 32 14
      doc/doxygen/chapters/08scheduling.doxy
  6. 33 0
      doc/doxygen/chapters/16mpi_support.doxy
  7. 1 1
      doc/doxygen/chapters/40environment_variables.doxy
  8. 5 0
      examples/Makefile.am
  9. 85 1
      examples/common/blas.c
  10. 11 13
      examples/sched_ctx/parallel_code.c
  11. 2 2
      examples/sched_ctx/sched_ctx.c
  12. 164 0
      examples/sched_ctx/sched_ctx_without_sched_policy.c
  13. 3 1
      include/starpu.h
  14. 1 1
      include/starpu_thread.h
  15. 13 13
      mpi/src/starpu_mpi_task_insert.c
  16. 4 0
      mpi/tests/Makefile.am
  17. 116 0
      mpi/tests/callback.c
  18. 6 2
      src/Makefile.am
  19. 89 1
      src/common/fxt.h
  20. 5 4
      src/common/thread.c
  21. 3 1
      src/core/errorcheck.h
  22. 6 2
      src/core/perfmodel/perfmodel.c
  23. 1 1
      src/core/perfmodel/perfmodel_bus.c
  24. 229 148
      src/core/sched_ctx.c
  25. 32 1
      src/core/sched_ctx.h
  26. 30 14
      src/core/sched_policy.c
  27. 114 14
      src/core/simgrid.c
  28. 2 0
      src/core/simgrid.h
  29. 5 2
      src/core/task.c
  30. 7 6
      src/core/topology.c
  31. 10 9
      src/core/workers.c
  32. 1 14
      src/core/workers.h
  33. 21 11
      src/datawizard/coherency.c
  34. 2 3
      src/datawizard/coherency.h
  35. 6 4
      src/datawizard/copy_driver.c
  36. 1 0
      src/datawizard/filters.c
  37. 2 0
      src/datawizard/interfaces/data_interface.c
  38. 4 1
      src/datawizard/memalloc.c
  39. 31 0
      src/datawizard/memory_nodes.c
  40. 2 1
      src/datawizard/memory_nodes.h
  41. 14 0
      src/debug/traces/starpu_fxt.c
  42. 5 1
      src/debug/traces/starpu_paje.c
  43. 49 20
      src/drivers/cuda/driver_cuda.c
  44. 2 2
      src/drivers/cuda/driver_cuda.h
  45. 60 10
      src/drivers/driver_common/driver_common.c
  46. 2 2
      src/drivers/opencl/driver_opencl.c
  47. 47 0
      src/starpu_smpi.xslt
  48. 1 0
      tests/Makefile.am
  49. 214 0
      tests/datawizard/increment_init.c
  50. 48 4
      tests/main/codelet_null_callback.c

+ 15 - 0
ChangeLog

@@ -17,6 +17,18 @@
 StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 
+StarPU 1.1.2 (svn revision xxxx)
+==============================================
+The scheduling context release
+
+New features:
+  * The reduction init codelet is automatically used to initialize temporary
+    buffers.
+
+StarPU 1.1.1 (svn revision 12638)
+==============================================
+The scheduling context release
+
 New features:
   * Xeon Phi support
   * SCC support
@@ -53,6 +65,7 @@ New features:
   * Add paje traces statistics tools.
   * Add CUDA concurrent kernel execution support through
     the STARPU_NWORKER_PER_CUDA environment variable.
+  * Use streams for GPUA->GPUB and GPUB->GPUA transfers.
 
 Small features:
   * New functions starpu_data_acquire_cb_sequential_consistency() and
@@ -344,6 +357,8 @@ Small changes:
   * Use C99 variadic macro support, not GNU.
   * Fix performance regression: dmda queues were inadvertently made
     LIFOs in r9611.
+  * Use big fat abortions when one tries to make a task or callback
+    sleep, instead of just returning EDEADLCK which few people will test
   * By default, StarPU FFT examples are not compiled and checked, the
     configure option --enable-starpufft-examples needs to be specified
     to change this behaviour.

+ 3 - 2
configure.ac

@@ -865,7 +865,7 @@ AC_SUBST(STARPU_USE_OPENCL, $enable_opencl)
 AM_CONDITIONAL(STARPU_USE_OPENCL, test x$enable_opencl = xyes)
 if test x$enable_opencl = xyes ; then
 	AC_DEFINE(STARPU_USE_OPENCL, [1], [OpenCL support is activated])
-	STARPU_OPENCL_CPPFLAGS="${STARPU_OPENCL_CPPFLAGS} -DSTARPU_OPENCL_DATADIR=${datarootdir}/starpu/opencl"
+	STARPU_OPENCL_CPPFLAGS="${STARPU_OPENCL_CPPFLAGS} -DSTARPU_OPENCL_DATADIR=${datarootdir}/starpu/opencl -DCL_USE_DEPRECATED_OPENCL_1_1_APIS"
         AC_SUBST(STARPU_OPENCL_DATAdir, "$(eval echo ${datarootdir}/starpu/opencl/examples)")
         AC_SUBST(STARPU_OPENCL_CPPFLAGS)
         AC_SUBST(STARPU_OPENCL_LDFLAGS)
@@ -994,6 +994,7 @@ if test x$enable_simgrid = xyes ; then
 			AC_MSG_ERROR(Simgrid support needs simgrid installed)
 		]
 	)
+   	AC_CHECK_FUNCS([MSG_process_join])
 	AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
 		    		[[#include <msg/msg.h>]],
 				[[msg_host_t foo; ]]
@@ -2196,7 +2197,7 @@ AM_CONDITIONAL(ATLAS_BLAS_LIB, test x$blas_lib = xatlas)
 AM_CONDITIONAL(GOTO_BLAS_LIB, test x$blas_lib = xgoto)
 AM_CONDITIONAL(MKL_BLAS_LIB, test x$blas_lib = xmkl)
 AM_CONDITIONAL(SYSTEM_BLAS_LIB, test x$blas_lib = xsystem)
-AM_CONDITIONAL(NO_BLAS_LIB, test x$blas_lib = xnone)
+AM_CONDITIONAL(NO_BLAS_LIB, test x$blas_lib = xnone -a x$enable_simgrid = xno)
 
 AC_MSG_CHECKING(which BLAS lib should be used)
 AC_MSG_RESULT($blas_lib)

+ 4 - 6
doc/doxygen/chapters/05check_list_performance.doxy

@@ -100,8 +100,9 @@ any task for 10ms. In addition to that,
 
 <c>export STARPU_WATCHDOG_CRASH=1</c>
 
-triggers a crash in that condition, thus allowing to catch the situation in gdb
-etc.
+raises SIGABRT in that condition, thus allowing to catch the situation in gdb.
+It can also be useful to type "handle SIGABRT nopass" in gdb to be able to let
+the process continue, after inspecting the state of the process.
 
 \section HowToLimitMemoryPerNode How to limit memory per node
 
@@ -168,10 +169,7 @@ If a kernel source code was modified (e.g. performance improvement), the
 calibration information is stale and should be dropped, to re-calibrate from
 start. This can be done by using <c>export STARPU_CALIBRATE=2</c>.
 
-Note: due to CUDA limitations, to be able to measure kernel duration,
-calibration mode needs to disable asynchronous data transfers. Calibration thus
-disables data transfer / computation overlapping, and should thus not be used
-for eventual benchmarks. Note 2: history-based performance models get calibrated
+Note: history-based performance models get calibrated
 only if a performance-model-based scheduler is chosen.
 
 The history-based performance models can also be explicitly filled by the

+ 5 - 0
doc/doxygen/chapters/07data_management.doxy

@@ -325,6 +325,11 @@ starpu_task_insert(&summarize_data, STARPU_R, handle, STARPU_W, result_handle, 0
 starpu_data_unregister_submit(handle);
 \endcode
 
+The application may also want to see the temporary data initialized
+on the fly before being used by the task. This can be done by using
+starpu_data_set_reduction_methods() to set an initialization codelet (no redux
+codelet is needed).
+
 \subsection ScratchData Scratch Data
 
 Some kernels sometimes need temporary data to achieve the computations, i.e. a

+ 32 - 14
doc/doxygen/chapters/08scheduling.doxy

@@ -10,6 +10,19 @@
 
 \section TaskSchedulingPolicy Task Scheduling Policy
 
+The basics of the scheduling policy are that
+
+<ul>
+<li>The scheduler gets to schedule tasks (<c>push</c> operation) when they become
+ready to be executed, i.e. they are not waiting for some tags, data dependencies
+or task dependencies.</li>
+<li>Workers pull tasks (<c>pop</c> operation) one by one from the scheduler.
+</ul>
+
+This means scheduling policies usually contain at least one queue of tasks to
+store them between the time when they become available, and the time when a
+worker gets to grab them.
+
 By default, StarPU uses the simple greedy scheduler <c>eager</c>. This is
 because it provides correct load balance even if the application codelets do not
 have performance models. If your application codelets have performance models
@@ -17,38 +30,43 @@ have performance models. If your application codelets have performance models
 to the environment variable \ref STARPU_SCHED. For instance <c>export
 STARPU_SCHED=dmda</c> . Use <c>help</c> to get the list of available schedulers.
 
-The <b>eager</b> scheduler uses a central task queue, from which workers draw tasks
-to work on. This however does not permit to prefetch data since the scheduling
+The <b>eager</b> scheduler uses a central task queue, from which all workers draw tasks
+to work on concurrently. This however does not permit to prefetch data since the scheduling
 decision is taken late. If a task has a non-0 priority, it is put at the front of the queue.
 
 The <b>prio</b> scheduler also uses a central task queue, but sorts tasks by
 priority (between -5 and 5).
 
-The <b>random</b> scheduler distributes tasks randomly according to assumed worker
+The <b>random</b> scheduler uses a queue per worker, and distributes tasks randomly according to assumed worker
 overall performance.
 
-The <b>ws</b> (work stealing) scheduler schedules tasks on the local worker by
+The <b>ws</b> (work stealing) scheduler uses a queue per worker, and schedules
+a task on the worker which released it by
 default. When a worker becomes idle, it steals a task from the most loaded
 worker.
 
 The <b>dm</b> (deque model) scheduler uses task execution performance models into account to
-perform an HEFT-similar scheduling strategy: it schedules tasks where their
-termination time will be minimal.
+perform a HEFT-similar scheduling strategy: it schedules tasks where their
+termination time will be minimal. The difference with HEFT is that <b>dm</b>
+schedules tasks as soon as they become available, and thus in the order they
+become available, without taking priorities into account.
 
-The <b>dmda</b> (deque model data aware) scheduler is similar to dm, it also takes
+The <b>dmda</b> (deque model data aware) scheduler is similar to dm, but it also takes
 into account data transfer time.
 
 The <b>dmdar</b> (deque model data aware ready) scheduler is similar to dmda,
-it also sorts tasks on per-worker queues by number of already-available data
-buffers.
+but it also sorts tasks on per-worker queues by number of already-available data
+buffers on the target device.
 
-The <b>dmdas</b> (deque model data aware sorted) scheduler is similar to dmda, it
-also supports arbitrary priority values.
+The <b>dmdas</b> (deque model data aware sorted) scheduler is similar to dmdar,
+except that it sorts tasks by priority order, which allows to become even closer
+to HEFT by respecting priorities after having made the scheduling decision (but
+it still schedules tasks in the order they become available).
 
-The <b>heft</b> (heterogeneous earliest finish time) scheduler is deprecated. It
-is now just an alias for <b>dmda</b>.
+The <b>heft</b> (heterogeneous earliest finish time) scheduler is a deprecated
+alias for <b>dmda</b>.
 
-The <b>pheft</b> (parallel HEFT) scheduler is similar to heft, it also supports
+The <b>pheft</b> (parallel HEFT) scheduler is similar to dmda, it also supports
 parallel tasks (still experimental). Should not be used when several contexts using
 it are being executed simultaneously.
 

+ 33 - 0
doc/doxygen/chapters/16mpi_support.doxy

@@ -320,6 +320,39 @@ application can prune the task for loops according to the data distribution,
 so as to only submit tasks on nodes which have to care about them (either to
 execute them, or to send the required data).
 
+A way to do some of this quite easily can be to just add an <c>if</c> like this:
+
+\code{.c}
+    for(loop=0 ; loop<niter; loop++)
+        for (x = 1; x < X-1; x++)
+            for (y = 1; y < Y-1; y++)
+                if (my_distrib(x,y,size) == my_rank
+                 || my_distrib(x-1,y,size) == my_rank
+                 || my_distrib(x+1,y,size) == my_rank
+                 || my_distrib(x,y-1,size) == my_rank
+                 || my_distrib(x,y+1,size) == my_rank)
+                    starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
+                                           STARPU_RW, data_handles[x][y],
+                                           STARPU_R, data_handles[x-1][y],
+                                           STARPU_R, data_handles[x+1][y],
+                                           STARPU_R, data_handles[x][y-1],
+                                           STARPU_R, data_handles[x][y+1],
+                                           0);
+    starpu_task_wait_for_all();
+\endcode
+
+This permits to drop the cost of function call argument passing and parsing.
+
+If the <c>my_distrib</c> function can be inlined by the compiler, the latter can
+improve the test.
+
+If the <c>size</c> can be made a compile-time constant, the compiler can
+considerably improve the test further.
+
+If the distribution function is not too complex and the compiler is very good,
+the latter can even optimize the <c>for</c> loops, thus dramatically reducing
+the cost of task submission.
+
 A function starpu_mpi_task_build() is also provided with the aim to
 only construct the task structure. All MPI nodes need to call the
 function, only the node which is to execute the task will return a

+ 1 - 1
doc/doxygen/chapters/40environment_variables.doxy

@@ -587,7 +587,7 @@ end of the execution of an application (\ref DataStatistics).
 \anchor STARPU_WATCHDOG_TIMEOUT
 \addindex __env__STARPU_WATCHDOG_TIMEOUT
 When set to a value other than 0, allows to make StarPU print an error
-message whenever StarPU does not terminate any task for 10ms. Should
+message whenever StarPU does not terminate any task for the given time (in µs). Should
 be used in combination with \ref STARPU_WATCHDOG_CRASH (see \ref
 DetectionStuckConditions).
 </dd>

+ 5 - 0
examples/Makefile.am

@@ -189,6 +189,7 @@ examplebin_PROGRAMS +=				\
 	sched_ctx/parallel_code			\
 	sched_ctx/dummy_sched_with_ctx		\
 	sched_ctx/prio				\
+	sched_ctx/sched_ctx_without_sched_policy\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -268,6 +269,7 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/sched_ctx			\
 	sched_ctx/prio				\
 	sched_ctx/dummy_sched_with_ctx		\
+	sched_ctx/sched_ctx_without_sched_policy\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -920,6 +922,9 @@ openmp_vector_scal_omp_CFLAGS = \
 sched_ctx_parallel_code_CFLAGS = \
 	$(AM_CFLAGS) -fopenmp
 
+sched_ctx_sched_ctx_without_sched_policy_CFLAGS = \
+	$(AM_CFLAGS) -fopenmp
+
 endif
 
 showcheck:

+ 85 - 1
examples/common/blas.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2014  Université de Bordeaux 1
  * Copyright (C) 2010  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -415,6 +415,90 @@ void DSWAP(const int n, double *X, const int incX, double *Y, const int incY)
 }
 
 
+#elif defined(STARPU_SIMGRID)
+inline void SGEMM(char *transa, char *transb, int M, int N, int K, 
+			float alpha, const float *A, int lda, const float *B, int ldb, 
+			float beta, float *C, int ldc) { }
+
+inline void DGEMM(char *transa, char *transb, int M, int N, int K, 
+			double alpha, double *A, int lda, double *B, int ldb, 
+			double beta, double *C, int ldc) { }
+
+inline void SGEMV(char *transa, int M, int N, float alpha, float *A, int lda,
+		float *X, int incX, float beta, float *Y, int incY) { }
+
+inline void DGEMV(char *transa, int M, int N, double alpha, double *A, int lda,
+		double *X, int incX, double beta, double *Y, int incY) { }
+
+inline float SASUM(int N, float *X, int incX) { }
+
+inline double DASUM(int N, double *X, int incX) { }
+
+void SSCAL(int N, float alpha, float *X, int incX) { }
+
+void DSCAL(int N, double alpha, double *X, int incX) { }
+
+void STRSM (const char *side, const char *uplo, const char *transa,
+                   const char *diag, const int m, const int n,
+                   const float alpha, const float *A, const int lda,
+                   float *B, const int ldb) { }
+
+void DTRSM (const char *side, const char *uplo, const char *transa,
+                   const char *diag, const int m, const int n,
+                   const double alpha, const double *A, const int lda,
+                   double *B, const int ldb) { }
+
+void SSYR (const char *uplo, const int n, const float alpha,
+                  const float *x, const int incx, float *A, const int lda) { }
+
+void SSYRK (const char *uplo, const char *trans, const int n,
+                   const int k, const float alpha, const float *A,
+                   const int lda, const float beta, float *C,
+                   const int ldc) { }
+
+void SGER(const int m, const int n, const float alpha,
+                  const float *x, const int incx, const float *y,
+                  const int incy, float *A, const int lda) { }
+
+void DGER(const int m, const int n, const double alpha,
+                  const double *x, const int incx, const double *y,
+                  const int incy, double *A, const int lda) { }
+
+void STRSV (const char *uplo, const char *trans, const char *diag, 
+                   const int n, const float *A, const int lda, float *x, 
+                   const int incx) { }
+
+void STRMM(const char *side, const char *uplo, const char *transA,
+                 const char *diag, const int m, const int n,
+                 const float alpha, const float *A, const int lda,
+                 float *B, const int ldb) { }
+
+void DTRMM(const char *side, const char *uplo, const char *transA,
+                 const char *diag, const int m, const int n,
+                 const double alpha, const double *A, const int lda,
+                 double *B, const int ldb) { }
+
+void STRMV(const char *uplo, const char *transA, const char *diag,
+                 const int n, const float *A, const int lda, float *X,
+                 const int incX) { }
+
+void SAXPY(const int n, const float alpha, float *X, const int incX, float *Y, const int incY) { }
+
+void DAXPY(const int n, const double alpha, double *X, const int incX, double *Y, const int incY) { }
+
+int ISAMAX (const int n, float *X, const int incX) { }
+
+int IDAMAX (const int n, double *X, const int incX) { }
+
+float SDOT(const int n, const float *x, const int incx, const float *y, const int incy) { }
+
+double DDOT(const int n, const double *x, const int incx, const double *y, const int incy) { }
+
+void SSWAP(const int n, float *X, const int incX, float *Y, const int incY) { }
+
+void DSWAP(const int n, double *X, const int incX, double *Y, const int incY) { }
+
+
 #else
 #error "no BLAS lib available..."
 #endif

+ 11 - 13
examples/sched_ctx/parallel_code.c

@@ -44,22 +44,25 @@ int parallel_code(int sched_ctx)
 		for(i = 0; i < NTASKS; i++)
 			t++;
 	}
+
 	free(cpuids);
 	return t;
 }
 
 static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 {
+	int w = starpu_worker_get_id();
 	unsigned sched_ctx = (unsigned)arg;
-	tasks_executed[sched_ctx-1] = parallel_code(sched_ctx);
+	int n = parallel_code(sched_ctx);
+	printf("w %d executed %d it \n", w, n);
 }
 
 
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"
@@ -68,11 +71,13 @@ static struct starpu_codelet sched_ctx_codelet =
 void *th(void* p)
 {
 	unsigned sched_ctx = (unsigned)p;
-	tasks_executed[sched_ctx-1] = (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)sched_ctx, sched_ctx); 
+	tasks_executed[sched_ctx-1] += (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)sched_ctx, sched_ctx); 
 }
 
 int main(int argc, char **argv)
 {
+	tasks_executed[0] = 0;
+	tasks_executed[1] = 0;
 	int ntasks = NTASKS;
 	int ret, j, k;
 
@@ -104,13 +109,6 @@ int main(int argc, char **argv)
 	procs2[0] = 0;
 #endif
 
-	int p;
-	for(p = 0; p <nprocs1; p++)
-		printf("w %d in ctx 1 \n", procs1[p]);
-
-	for(p = 0; p <nprocs2; p++)
-		printf("w %d in ctx 2 \n", procs2[p]);
-
 	/*create contexts however you want*/
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
 	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
@@ -147,8 +145,8 @@ int main(int argc, char **argv)
 	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
 	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
 
-/* 	int master1 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs1, nprocs1); */
-/* 	int master2 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs2, nprocs2); */
+/* 	int master1 = starpu_sched_ctx_book_workers_for_task(procs1, nprocs1); */
+/* 	int master2 = starpu_sched_ctx_book_workers_for_task(procs2, nprocs2); */
 
 
 	int i;

+ 2 - 2
examples/sched_ctx/sched_ctx.c

@@ -36,8 +36,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STAR
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"

+ 164 - 0
examples/sched_ctx/sched_ctx_without_sched_policy.c

@@ -0,0 +1,164 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <omp.h>
+
+#ifdef STARPU_QUICK_CHECK
+#define NTASKS 64
+#else
+#define NTASKS 10
+#endif
+
+int tasks_executed[2];
+starpu_pthread_mutex_t mut;
+
+int parallel_code(int sched_ctx)
+{
+	int i;
+	int t = 0;
+	int *cpuids = NULL;
+	int ncpuids = 0;
+	starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+
+//	printf("execute task of %d threads \n", ncpuids);
+#pragma omp parallel num_threads(ncpuids)
+	{
+		starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+// 			printf("cpu = %d ctx%d nth = %d\n", sched_getcpu(), sched_ctx, omp_get_num_threads());
+#pragma omp for
+		for(i = 0; i < NTASKS; i++)
+			t++;
+	}
+
+	free(cpuids);
+	return t;
+}
+
+static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
+{
+	unsigned sched_ctx = (unsigned)arg;
+	tasks_executed[sched_ctx-1] += parallel_code(sched_ctx);
+}
+
+
+static struct starpu_codelet sched_ctx_codelet =
+{
+	.cpu_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = { NULL},
+	.opencl_funcs = {NULL},
+	.model = NULL,
+	.nbuffers = 0,
+	.name = "sched_ctx"
+};
+
+
+int main(int argc, char **argv)
+{
+	tasks_executed[0] = 0;
+	tasks_executed[1] = 0;
+	int ntasks = NTASKS;
+	int ret, j, k;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_pthread_mutex_init(&mut, NULL);
+	int nprocs1 = 1;
+	int nprocs2 = 1;
+	int *procs1, *procs2;
+
+#ifdef STARPU_USE_CPU
+	unsigned ncpus =  starpu_cpu_worker_get_count();
+	procs1 = (int*)malloc(ncpus*sizeof(int));
+	procs2 = (int*)malloc(ncpus*sizeof(int));
+	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
+
+	if(ncpus > 1)
+	{
+		nprocs1 = ncpus/2;
+		nprocs2 =  ncpus-nprocs1;
+		k = 0;
+		for(j = nprocs1; j < nprocs1+nprocs2; j++)
+			procs2[k++] = j;
+	}
+	else
+	{
+		procs1 = (int*)malloc(nprocs1*sizeof(int));
+		procs2 = (int*)malloc(nprocs2*sizeof(int));
+		procs1[0] = 0;
+		procs2[0] = 0;
+
+	}
+#else
+	procs1 = (int*)malloc(nprocs1*sizeof(int));
+	procs2 = (int*)malloc(nprocs2*sizeof(int));
+	procs1[0] = 0;
+	procs2[0] = 0;
+#endif
+
+	/*create contexts however you want*/
+	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", 0);
+	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", 0);
+
+	int i;
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx1;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx2;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+
+	/* tell starpu when you finished submitting tasks to this context
+	   in order to allow moving resources from this context to the inheritor one
+	   when its corresponding tasks finished executing */
+
+
+
+	/* wait for all tasks at the end*/
+	starpu_task_wait_for_all();
+
+	starpu_sched_ctx_delete(sched_ctx1);
+	starpu_sched_ctx_delete(sched_ctx2);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_executed[0], NTASKS*NTASKS);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx2, tasks_executed[1], NTASKS*NTASKS);
+	starpu_shutdown();
+
+	return 0;
+}

+ 3 - 1
include/starpu.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2014  Université de Bordeaux 1
  * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -73,8 +73,10 @@ extern "C"
 #endif
 
 #ifdef STARPU_SIMGRID
+#ifndef main
 #define main starpu_main
 #endif
+#endif
 
 struct starpu_conf
 {

+ 1 - 1
include/starpu_thread.h

@@ -39,7 +39,7 @@ extern "C"
 
 #ifdef STARPU_SIMGRID
 
-typedef int starpu_pthread_t;
+typedef msg_process_t starpu_pthread_t;
 typedef int starpu_pthread_attr_t;
 
 int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_pthread_attr_t *attr, void *(*start_routine) (void *), void *arg, int where);

+ 13 - 13
mpi/src/starpu_mpi_task_insert.c

@@ -232,7 +232,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		{
 			// the flag is decoded and set later when
 			// calling function _starpu_task_insert_create()
-			va_arg(varg_list_copy, int);
+			(void)va_arg(varg_list_copy, int);
 		}
 		else if (arg_type_nocommute==STARPU_R || arg_type_nocommute==STARPU_W || arg_type_nocommute==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
@@ -265,41 +265,41 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		}
 		else if (arg_type==STARPU_VALUE)
 		{
-			va_arg(varg_list_copy, void *);
-			va_arg(varg_list_copy, size_t);
+			(void)va_arg(varg_list_copy, void *);
+			(void)va_arg(varg_list_copy, size_t);
 		}
 		else if (arg_type==STARPU_CALLBACK)
 		{
-			va_arg(varg_list_copy, void (*)(void *));
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
 		}
 		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
 		{
-			va_arg(varg_list_copy, void (*)(void *));
-			va_arg(varg_list_copy, void *);
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
+			(void)va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_CALLBACK_ARG)
 		{
-			va_arg(varg_list_copy, void *);
+			(void)va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
                 {
-                        (void)va_arg(varg_list, _starpu_callback_func_t);
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
 		}
                 else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
                 {
-                        (void)va_arg(varg_list, void *);
+                        (void)va_arg(varg_list_copy, void *);
                 }
                 else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
                 {
-			(void)va_arg(varg_list, _starpu_callback_func_t);
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
                 }
                 else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
                 {
-                        (void)va_arg(varg_list, void *);
+                        (void)va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			va_arg(varg_list_copy, int);
+			(void)va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
@@ -502,7 +502,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 		*task = starpu_task_create();
 		(*task)->cl_arg_free = 1;
 
-		if (codelet->nbuffers > STARPU_NMAXBUFS)
+		if (codelet && codelet->nbuffers > STARPU_NMAXBUFS)
 		{
 			(*task)->dyn_handles = malloc(codelet->nbuffers * sizeof(starpu_data_handle_t));
 		}

+ 4 - 0
mpi/tests/Makefile.am

@@ -93,6 +93,7 @@ starpu_mpi_TESTS =				\
 	ring_async_implicit			\
 	block_interface				\
 	block_interface_pinned			\
+	callback				\
 	insert_task				\
 	insert_task_compute			\
 	insert_task_sent_cache			\
@@ -125,6 +126,7 @@ noinst_PROGRAMS =				\
 	ring_async_implicit			\
 	block_interface				\
 	block_interface_pinned			\
+	callback				\
 	insert_task				\
 	insert_task_compute			\
 	insert_task_sent_cache			\
@@ -172,6 +174,8 @@ block_interface_LDADD =				\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 block_interface_pinned_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+callback_LDADD =				\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 insert_task_LDADD =				\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 insert_task_compute_LDADD =				\

+ 116 - 0
mpi/tests/callback.c

@@ -0,0 +1,116 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013, 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+
+static
+int expected_x=40;
+static
+int expected_y=12;
+
+void my_func(STARPU_ATTRIBUTE_UNUSED void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	FPRINTF_MPI("i am here\n");
+}
+
+struct starpu_codelet my_codelet =
+{
+	.cpu_funcs = {my_func, NULL},
+	.cuda_funcs = {my_func, NULL},
+	.opencl_funcs = {my_func, NULL}
+};
+
+static
+void callback(void *ptr)
+{
+	int *x = (int *)ptr;
+	FPRINTF_MPI("x=%d\n", *x);
+	STARPU_ASSERT_MSG(*x == expected_x, "%d != %d\n", *x, expected_x);
+	(*x)++;
+}
+
+static
+void prologue_callback(void *ptr)
+{
+	int *y = (int *)ptr;
+	FPRINTF_MPI("y=%d\n", *y);
+	STARPU_ASSERT_MSG(*y == expected_y, "%d != %d\n", *y, expected_y);
+	(*y)++;
+}
+
+int main(int argc, char **argv)
+{
+	int ret;
+	int x=40;
+	int y=12;
+	int rank, size;
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	ret = starpu_mpi_init(&argc, &argv, 1);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	ret = starpu_mpi_task_insert(MPI_COMM_WORLD,
+				     NULL,
+				     STARPU_EXECUTE_ON_NODE, 0,
+				     STARPU_CALLBACK_WITH_ARG, callback, &x,
+				     0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
+
+	if (rank == 0) expected_x ++;
+	ret = starpu_mpi_task_insert(MPI_COMM_WORLD,
+				     NULL,
+				     STARPU_EXECUTE_ON_NODE, 0,
+				     STARPU_CALLBACK, callback,
+				     STARPU_CALLBACK_ARG, &x,
+				     0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+	if (rank == 0) expected_x ++;
+	STARPU_ASSERT_MSG(x == expected_x, "x should be equal to %d and not %d\n", expected_x, x);
+
+	ret = starpu_mpi_task_insert(MPI_COMM_WORLD,
+				     NULL,
+				     STARPU_EXECUTE_ON_NODE, 0,
+				     STARPU_PROLOGUE_CALLBACK, prologue_callback,
+				     STARPU_PROLOGUE_CALLBACK_ARG, &y,
+				     0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+	if (rank == 0) expected_y ++;
+	ret = starpu_mpi_task_insert(MPI_COMM_WORLD,
+				     &my_codelet,
+				     STARPU_EXECUTE_ON_NODE, 0,
+				     STARPU_PROLOGUE_CALLBACK_POP, prologue_callback,
+				     STARPU_PROLOGUE_CALLBACK_POP_ARG, &y,
+				     0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+	starpu_task_wait_for_all();
+	if (rank == 0) expected_y ++;
+	STARPU_ASSERT_MSG(y == expected_y, "y should be equal to %d and not %d\n", expected_y, y);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+}
+

+ 6 - 2
src/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2009-2013  Université de Bordeaux 1
+# Copyright (C) 2009-2014  Université de Bordeaux 1
 # Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
 # Copyright (C) 2011  INRIA
 #
@@ -49,7 +49,7 @@ endif STARPU_HAVE_WINDOWS
 
 lib_LTLIBRARIES = libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 
-libstarpu_@STARPU_EFFECTIVE_VERSION@_la_CPPFLAGS = -I$(top_srcdir)/include/ $(STARPU_RCCE_CPPFLAGS) -DBUILDING_STARPU
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_CPPFLAGS = -I$(top_srcdir)/include/ $(STARPU_RCCE_CPPFLAGS) -DBUILDING_STARPU -DSTARPU_DATADIR='"$(datadir)"'
 
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_CFLAGS = $(GLOBAL_AM_CFLAGS) $(HWLOC_CFLAGS) $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(STARPU_COI_CPPFLAGS) $(STARPU_SCIF_CPPFLAGS) $(STARPU_RCCE_CFLAGS) $(FXT_CFLAGS)
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = -lm $(HWLOC_LIBS) $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(STARPU_COI_LDFLAGS) $(STARPU_SCIF_LDFLAGS) $(STARPU_RCCE_LDFLAGS) $(FXT_LIBS) $(STARPU_GLPK_LDFLAGS) $(STARPU_LEVELDB_LDFLAGS)
@@ -335,5 +335,9 @@ endif
 
 #########################################
 
+if STARPU_SIMGRID
+dist_pkgdata_DATA = starpu_smpi.xslt
+endif
+
 showcheck:
 	-cat /dev/null

+ 89 - 1
src/common/fxt.h

@@ -107,6 +107,8 @@
 
 #define _STARPU_FUT_EVENT	0x513c
 
+#define _STARPU_FUT_WORKER_SCHEDULING_START	0x513e
+
 #define _STARPU_FUT_LOCKING_MUTEX	0x5140	
 #define _STARPU_FUT_MUTEX_LOCKED	0x5141	
 
@@ -185,6 +187,15 @@ void _starpu_stop_fxt_profiling(void);
  * the worker. */
 void _starpu_fxt_register_thread(unsigned);
 
+#ifdef FUT_NEEDS_COMMIT
+#define _STARPU_FUT_COMMIT(size) fut_commitstampedbuffer(size)
+#else
+#define _STARPU_FUT_COMMIT(size) do { } while (0)
+#endif
+
+#ifdef FUT_DO_PROBE2STR
+#define _STARPU_FUT_DO_PROBE2STR(CODE, P1, P2, str) FUT_DO_PROBE2STR(CODE, P1, P2, str)
+#else
 /* Sometimes we need something a little more specific than the wrappers from
  * FxT: these macro permit to put add an event with 3 (or 4) numbers followed
  * by a string. */
@@ -203,9 +214,14 @@ do {									\
 	*(futargs++) = (unsigned long)(P2);				\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
     }									\
 } while (0);
+#endif
 
+#ifdef FUT_DO_PROBE3STR
+#define _STARPU_FUT_DO_PROBE3STR(CODE, P1, P2, P3, str) FUT_DO_PROBE3STR(CODE, P1, P2, P3, str)
+#else
 #define _STARPU_FUT_DO_PROBE3STR(CODE, P1, P2, P3, str)			\
 do {									\
     if(fut_active) {							\
@@ -222,9 +238,14 @@ do {									\
 	*(futargs++) = (unsigned long)(P3);				\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
     }									\
 } while (0);
+#endif
 
+#ifdef FUT_DO_PROBE4STR
+#define _STARPU_FUT_DO_PROBE4STR(CODE, P1, P2, P3, P4, str) FUT_DO_PROBE4STR(CODE, P1, P2, P3, P4, str)
+#else
 #define _STARPU_FUT_DO_PROBE4STR(CODE, P1, P2, P3, P4, str)		\
 do {									\
     if(fut_active) {							\
@@ -242,9 +263,14 @@ do {									\
 	*(futargs++) = (unsigned long)(P4);				\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
     }									\
 } while (0);
+#endif
 
+#ifdef FUT_DO_PROBE5STR
+#define _STARPU_FUT_DO_PROBE5STR(CODE, P1, P2, P3, P4, P5, str) FUT_DO_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)
+#else
 #define _STARPU_FUT_DO_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)		\
 do {									\
     if(fut_active) {							\
@@ -263,8 +289,65 @@ do {									\
 	*(futargs++) = (unsigned long)(P5);				\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
+    }									\
+} while (0);
+#endif
+
+#ifdef FUT_DO_PROBE6STR
+#define _STARPU_FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str) FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)
+#else
+#define _STARPU_FUT_DO_PROBE5STR(CODE, P1, P2, P3, P4, P5, P6, str)	\
+do {									\
+    if(fut_active) {							\
+	/* No more than FXT_MAX_PARAMS args are allowed */		\
+	/* we add a \0 just in case ... */				\
+	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 6)*sizeof(unsigned long));\
+	unsigned nbargs_str = (len + sizeof(unsigned long) - 1)/(sizeof(unsigned long));\
+	unsigned nbargs = 6 + nbargs_str;				\
+	size_t total_len = FUT_SIZE(nbargs);				\
+	unsigned long *futargs =					\
+		fut_getstampedbuffer(FUT_CODE(CODE, nbargs), total_len);\
+	*(futargs++) = (unsigned long)(P1);				\
+	*(futargs++) = (unsigned long)(P2);				\
+	*(futargs++) = (unsigned long)(P3);				\
+	*(futargs++) = (unsigned long)(P4);				\
+	*(futargs++) = (unsigned long)(P5);				\
+	*(futargs++) = (unsigned long)(P6);				\
+	snprintf((char *)futargs, len, "%s", str);			\
+	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
     }									\
 } while (0);
+#endif
+
+#ifdef FUT_DO_PROBE7STR
+#define _STARPU_FUT_DO_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str) FUT_DO_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)
+#else
+#define _STARPU_FUT_DO_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)	\
+do {									\
+    if(fut_active) {							\
+	/* No more than FXT_MAX_PARAMS args are allowed */		\
+	/* we add a \0 just in case ... */				\
+	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 7)*sizeof(unsigned long));\
+	unsigned nbargs_str = (len + sizeof(unsigned long) - 1)/(sizeof(unsigned long));\
+	unsigned nbargs = 7 + nbargs_str;				\
+	size_t total_len = FUT_SIZE(nbargs);				\
+	unsigned long *futargs =					\
+		fut_getstampedbuffer(FUT_CODE(CODE, nbargs), total_len);\
+	*(futargs++) = (unsigned long)(P1);				\
+	*(futargs++) = (unsigned long)(P2);				\
+	*(futargs++) = (unsigned long)(P3);				\
+	*(futargs++) = (unsigned long)(P4);				\
+	*(futargs++) = (unsigned long)(P5);				\
+	*(futargs++) = (unsigned long)(P6);				\
+	*(futargs++) = (unsigned long)(P7);				\
+	snprintf((char *)futargs, len, "%s", str);			\
+	((char *)futargs)[len - 1] = '\0';				\
+	_STARPU_FUT_COMMIT(total_len);					\
+    }									\
+} while (0);
+#endif
 
 #ifndef FUT_RAW_PROBE7
 #define FUT_RAW_PROBE7(CODE,P1,P2,P3,P4,P5,P6,P7) do {		\
@@ -273,7 +356,8 @@ do {									\
 				fut_getstampedbuffer(CODE,		\
 						     FUT_SIZE(7)); \
 			*(__args++)=(unsigned long)(P1);*(__args++)=(unsigned long)(P2);*(__args++)=(unsigned long)(P3);*(__args++)=(unsigned long)(P4);*(__args++)=(unsigned long)(P5);*(__args++)=(unsigned long)(P6);*(__args++)=(unsigned long)(P7);				\
-				}					\
+			_STARPU_FUT_COMMIT(FUT_SIZE(7));		\
+		}							\
 	} while (0)
 #endif
 
@@ -403,6 +487,9 @@ do {										\
 #define _STARPU_TRACE_WORKER_DEINIT_END(workerkind)		\
 	FUT_DO_PROBE2(_STARPU_FUT_WORKER_DEINIT_END, workerkind, _starpu_gettid());
 
+#define _STARPU_TRACE_WORKER_SCHEDULING_START	\
+	FUT_DO_PROBE1(_STARPU_FUT_WORKER_SCHEDULING_START, _starpu_gettid());
+
 #define _STARPU_TRACE_WORKER_SLEEP_START	\
 	FUT_DO_PROBE1(_STARPU_FUT_WORKER_SLEEP_START, _starpu_gettid());
 
@@ -681,6 +768,7 @@ do {										\
 #define _STARPU_TRACE_WORK_STEALING(a, b)	do {} while(0)
 #define _STARPU_TRACE_WORKER_DEINIT_START	do {} while(0)
 #define _STARPU_TRACE_WORKER_DEINIT_END(a)	do {} while(0)
+#define _STARPU_TRACE_WORKER_SCHEDULING_START		do {} while(0)
 #define _STARPU_TRACE_WORKER_SLEEP_START		do {} while(0)
 #define _STARPU_TRACE_WORKER_SLEEP_END		do {} while(0)
 #define _STARPU_TRACE_USER_DEFINED_START		do {} while(0)

+ 5 - 4
src/common/thread.c

@@ -34,7 +34,7 @@ int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_
 	_args->f = start_routine;
 	_args->arg = arg;
 	_hosts = MSG_hosts_as_dynar();
-	MSG_process_create(name, _starpu_simgrid_thread_start, _args,
+	*thread = MSG_process_create(name, _starpu_simgrid_thread_start, _args,
 			   xbt_dynar_get_as(_hosts, (where), msg_host_t));
 	xbt_dynar_free(&_hosts);
 	return 0;
@@ -47,10 +47,11 @@ int starpu_pthread_create(starpu_pthread_t *thread, const starpu_pthread_attr_t
 
 int starpu_pthread_join(starpu_pthread_t thread, void **retval)
 {
-#ifdef STARPU_DEVEL
-#warning TODO: use a simgrid_join when it becomes available
-#endif
+#if 0 //def HAVE_MSG_PROCESS_JOIN
+	MSG_process_join(thread, 100);
+#else
 	MSG_process_sleep(1);
+#endif
 	return 0;
 }
 

+ 3 - 1
src/core/errorcheck.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -34,6 +34,8 @@ enum _starpu_worker_status
 	STATUS_EXECUTING,
 	/* during the execution of the callback */
 	STATUS_CALLBACK,
+	/* while executing the scheduler code */
+	STATUS_SCHEDULING,
 	/* while sleeping because there is nothing to do */
 	STATUS_SLEEPING,
 	/* while a sleeping worker is about to wake up (to avoid waking twice for the same worker) */

+ 6 - 2
src/core/perfmodel/perfmodel.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -293,7 +293,11 @@ double starpu_data_expected_transfer_time(starpu_data_handle_t handle, unsigned
 	if (size == 0)
 		return 0.0;
 
-	unsigned src_node = _starpu_select_src_node(handle, memory_node);
+	int src_node = _starpu_select_src_node(handle, memory_node);
+	if (src_node < 0)
+		/* Will just create it in place. Ideally we should take the
+		 * time to create it into account */
+		return 0.0;
 	return starpu_transfer_predict(src_node, memory_node, size);
 }
 

+ 1 - 1
src/core/perfmodel/perfmodel_bus.c

@@ -208,7 +208,7 @@ static void measure_bandwidth_between_host_and_dev_on_cpu_with_cuda(int dev, int
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; iter++)
 	{
-		cudaMemcpy(d_buffer, h_buffer, 1, cudaMemcpyHostToDevice);
+		cudaMemcpy(h_buffer, d_buffer, 1, cudaMemcpyDeviceToHost);
 		cudaThreadSynchronize();
 	}
 	gettimeofday(&end, NULL);

+ 229 - 148
src/core/sched_ctx.c

@@ -32,6 +32,8 @@ double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -45,6 +47,8 @@ static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_w
 		worker->nsched_ctxs++;
 	}
 	worker->removed_from_ctx[sched_ctx_id] = 0;
+	if(worker->tmp_sched_ctx == sched_ctx_id)
+		worker->tmp_sched_ctx = -1;
 	return;
 }
 
@@ -135,6 +139,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
 	int workers_to_add[nworkers_to_add];
 
+
 	int i = 0;
 	for(i = 0; i < nworkers_to_add; i++)
 	{
@@ -163,10 +168,22 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			int worker = (workerids == NULL ? i : workerids[i]);
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
+			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
+			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
+
 		}
 	}
 
-	if(sched_ctx->sched_policy->add_workers)
+	if(!sched_ctx->sched_policy)
+	{
+		if(sched_ctx->main_master == -1)
+			sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
+		else
+		{
+			_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
+		}
+	}
+	else if(sched_ctx->sched_policy->add_workers)
 	{
 		if(added_workers)
 		{
@@ -199,6 +216,9 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 		}
 	}
 
+	if(!sched_ctx->sched_policy)
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
+
 	return;
 }
 
@@ -269,7 +289,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 
 	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
 
-	sched_ctx->sched_policy = (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy));
+	sched_ctx->sched_policy = policy ? (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy)) : NULL;
 	sched_ctx->is_initial_sched = is_initial_sched;
 	sched_ctx->name = sched_ctx_name;
 	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
@@ -284,12 +304,33 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
 
 	sched_ctx->ready_flops = 0.0;
-	/*init the strategy structs and the worker_collection of the ressources of the context */
-	_starpu_init_sched_policy(config, sched_ctx, policy);
+	sched_ctx->main_master = -1;
+	
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
+		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
+
+		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
+		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
+		
+		sched_ctx->master[w] = -1;
+		sched_ctx->parallel_sect[w] = 0;
+		sched_ctx->sleeping[w] = 0;
+	}
 
-	/* construct the collection of workers(list/tree/etc.) */
+	
+        /*init the strategy structs and the worker_collection of the ressources of the context */
+	if(policy)
+		_starpu_init_sched_policy(config, sched_ctx, policy);
+	else
+		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
+	
+        /* construct the collection of workers(list/tree/etc.) */
 	sched_ctx->workers->init(sched_ctx->workers);
 
+
 	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
@@ -315,7 +356,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 			worker->nsched_ctxs++;
 		}
 	}
-
+	
 	return sched_ctx;
 }
 
@@ -555,9 +596,13 @@ void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counte
 static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 {
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
-	_starpu_deinit_sched_policy(sched_ctx);
-	free(sched_ctx->sched_policy);
-	sched_ctx->sched_policy = NULL;
+	if(sched_ctx->sched_policy)
+	{
+		_starpu_deinit_sched_policy(sched_ctx);
+		free(sched_ctx->sched_policy);
+		sched_ctx->sched_policy = NULL;
+	}
+	
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -609,6 +654,8 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
+		if(!sched_ctx->sched_policy)
+			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
@@ -616,7 +663,6 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 		_starpu_delete_sched_ctx(sched_ctx);
 
 	}
-
 	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list, don't forget to free it when
 	   you don't use it anymore */
 	free(workerids);
@@ -1483,41 +1529,15 @@ unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 	return 0;
 }
 
-static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
+void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBUTE_UNUSED)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
-#ifdef STARPU_HAVE_HWLOC	
-	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
-        if (support->cpubind->set_thisthread_cpubind)
-        {
-		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
-                int ret;
-		int current_worker_id = starpu_worker_get_id();
-                ret = hwloc_set_cpubind (config->topology.hwtopology, set,
-                                         HWLOC_CPUBIND_THREAD);
-		if (ret)
-                {
-                        perror("binding thread");
-			STARPU_ABORT();
-                }
-	}
-
-#else
-#warning no sched ctx CPU binding support
-#endif
-
-	return;
-}
-
-void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
-{
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	/* FIXME: why not factorize with _starpu_bind_thread_on_cpu? */
 
 #ifdef STARPU_SIMGRID
 	return;
-#endif
+#else
 	if (starpu_get_env_number("STARPU_WORKERS_NOBIND") > 0)
 		return;
 
@@ -1565,59 +1585,79 @@ void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
 #else
 #warning no CPU binding support
 #endif
+#endif
 
 }
 
+static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
+{
+	int s;
+	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+	{
+		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
+		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
+		{
+			if(sched_ctx->parallel_sect[workerid])
+				return 1;
+		}
+	}
+	return 0;
+
+}
 static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
 {
-	int current_worker_id = starpu_worker_get_id();
-	
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int current_worker_id = starpu_worker_get_id();
+	unsigned sleeping[nworkers];
 	int w;
-	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		worker->master = master;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-		worker->parallel_sect = 1;
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
+		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
+		sched_ctx->master[workerids[w]] = master;
+		sched_ctx->parallel_sect[workerids[w]] = 1;
+		if(current_worker_id == -1 || workerids[w] != current_worker_id)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
 	}
 
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 	int workerid;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
-		if(current_worker_id == -1 || workerid != current_worker_id)
-			sem_wait(&master_worker->parallel_code_sem);
+		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
+		{
+			sched_ctx->sleeping[workerids[w]] = 1;
+			sem_wait(&sched_ctx->fall_asleep_sem[master]);
+		}
 	}
 	return;
 }
 
-void _starpu_sched_ctx_signal_worker_blocked(int workerid)
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
-	struct _starpu_sched_ctx *sched_ctx = NULL;
-	struct _starpu_sched_ctx_list *l = NULL;
-	for (l = worker->sched_ctx_list; l; l = l->next)
-	{
-		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
-		if(sched_ctx->id != 0)
-			sem_post(&master_worker->parallel_code_sem);
-	}	
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->fall_asleep_sem[master]);
+
+	return;
+}
+
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int master = sched_ctx->master[workerid];
+	sem_post(&sched_ctx->wake_up_sem[master]);
+	sched_ctx->sleeping[workerid] = 0;
+	sched_ctx->master[workerid] = -1;
 	return;
 }
 
 static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 {
-	int current_worker_id = starpu_worker_get_id();
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int current_worker_id = starpu_worker_get_id();
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-	struct _starpu_worker *worker = NULL;
 
 	struct starpu_sched_ctx_iterator it;
 	if(workers->init_iterator)
@@ -1625,20 +1665,22 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 
 	while(workers->has_next(workers, &it))
 	{
-		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
-		if(worker->master == master)
+		int workerid = workers->get_next(workers, &it);
+		int curr_master = sched_ctx->master[workerid];
+		if(curr_master == master && sched_ctx->parallel_sect[workerid])
 		{
-			if(current_worker_id == -1 || worker->workerid != current_worker_id)
+			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-				STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				sem_wait(&sched_ctx->wake_up_sem[master]);
 			}
 			else
-				worker->parallel_sect = 0;
-			worker->master = -1;
+				sched_ctx->parallel_sect[workerid] = 0;
 		}
 	}
+
 	return;
 }
 
@@ -1648,9 +1690,6 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
 	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
 
-	/* bind current thread on all workers of the context */
-	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
-	
 	/* execute parallel code */
 	void* ret = func(param);
 
@@ -1678,88 +1717,126 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	while(workers->has_next(workers, &it))
 	{
 		workerid = workers->get_next(workers, &it);
-		worker = _starpu_get_worker_struct(workerid);
-		if(worker->master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
+		int master = sched_ctx->master[workerid];
+		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
+		{
 			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
+		}
 	}
 	*ncpuids = w;
 	return;
 }
 
-/* int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers) */
-/* { */
-/* 	int current_worker_id = starpu_worker_get_id(); */
-
-/* 	int final_workerids[nworkers]; */
-/* 	int nfinal_workerids = 0; */
-/* 	int w; */
-/* 	int master = -1; */
-/* 	for(w = 0; w < nworkers; w++) */
-/* 	{ */
-/* 		if(current_worker_id == -1) */
-/* 		{ */
-/* 			final_workerids[nfinal_workerids++] = workerids[w];                           */
-/* 			if(nfinal_workerids == nworkers - 1)                          */
-/* 			{ */
-/* 				master = workerids[nfinal_workerids];   */
-/* 				break;   */
-/* 			} */
-/* 		} */
-/* 		else */
-/* 		{ */
-/* 			if(workerids[w] != current_worker_id) */
-/* 				final_workerids[nfinal_workerids++] = workerids[w]; */
-/* 			else */
-/* 			{ */
-/* 				if(nfinal_workerids == nworkers - 1) */
-/* 				{ */
-/* 					master = workerids[nfinal_workerids]; */
-/* 					break; */
-/* 				} */
-/* 				else */
-/* 					master = current_worker_id; */
-/* 			}	 */
-/* 		} */
-/* 	} */
-/* 	if(master == -1 && nfinal_workerids > 0) */
-/* 	{ */
-/* 		nfinal_workerids--; */
-/* 		master = final_workerids[nfinal_workerids]; */
-/* 	} */
-/* 	/\* get starpu workers to sleep *\/ */
-/* 	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, final_workerids, nfinal_workerids, master); */
-
-/* 	/\* bind current thread on all workers of the context *\/ */
-/* //	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id); */
-/* 	return master; */
-/* } */
-
 static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
-	int current_worker_id = starpu_worker_get_id();
-	
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int current_worker_id = starpu_worker_get_id();
+
+	int masters[nworkers];
 	int w;
 	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		worker = _starpu_get_worker_struct(workerids[w]);
-		if(current_worker_id == -1 || worker->workerid != current_worker_id)
+		int workerid = workerids[w];
+		masters[w] = sched_ctx->master[workerid];
+		if(current_worker_id == -1 || workerid != current_worker_id)
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-			STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 		}
 		else
-			worker->parallel_sect = 0;
+			sched_ctx->parallel_sect[workerid] = 0;
+		sched_ctx->master[workerid] = -1;
+	}
+
+	int workerid;
+	for(w = 0; w < nworkers; w++)
+	{
+		workerid = workerids[w];
+		if(masters[w] != -1)
+		{
+			int master = sched_ctx->master[workerid];
+			if(current_worker_id == -1 || workerid != current_worker_id)
+				sem_wait(&sched_ctx->wake_up_sem[master]);
+		}
 	}
+
 	return;
 }
 
-
-int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
-{ 
+static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int new_master = workerids[nworkers-1];
+        int current_worker_id = starpu_worker_get_id();
+        int current_is_in_section = 0;
+        int npotential_masters = 0;
+        int nawake_workers = 0;
+        int ntrue_masters = 0;
+        int potential_masters[nworkers];
+        int awake_workers[nworkers];
+        int true_masters[nworkers];
+
+        int i,w;
+        for(w = 0 ; w < nworkers ; w++)
+        {
+                if (current_worker_id == workerids[w])
+                        current_is_in_section = 1;
+
+		int master = sched_ctx->master[workerids[w]];
+                if (master > -1)
+		{
+                        int already_seen = 0;
+                        //Could create a function for this. Basically searching an element in an array.                                                                                                             
+                        for (i = 0 ; i < npotential_masters; i++)
+                        {
+                                if (potential_masters[i] == master)
+				{
+                                        already_seen = 1;
+                                        break;
+				}
+                        }
+                        if (!already_seen)
+				potential_masters[npotential_masters++] = master;
+                }
+                else if (master == -1)
+                        awake_workers[nawake_workers++] = workerids[w];
+        }
+
+        for (i = 0 ; i < npotential_masters ; i++) {
+		int master_is_in_section = 0;
+		//Could create a function for this. Basically searching an element in an array.                                                                                                                     
+		for (w = 0 ; w < nworkers ; w++)
+		{
+			if (workerids[w] == potential_masters[i])
+			{
+				master_is_in_section = 1;
+				break;
+			}
+		}
+                if (master_is_in_section)
+			true_masters[ntrue_masters++] = potential_masters[i];
+        }
+
+        if (current_is_in_section)
+                new_master = current_worker_id;
+        else
+        {
+                if (ntrue_masters > 1)
+		{
+                        if (nawake_workers > 0)
+                                new_master = awake_workers[nawake_workers - 1];
+                        else
+                                new_master = true_masters[ntrue_masters - 1];
+		}
+	}
+	return new_master;
+}
+
+static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int w;
 	int nput_to_sleep = 0;
 	int nwake_up = 0;
@@ -1768,20 +1845,24 @@ int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids
 	
 	for(w = 0 ; w < nworkers ; w++)
 	{
-		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
-		if (worker->master == -1 && workerids[w] != new_master)
+		int master = sched_ctx->master[workerids[w]];
+		if (master == -1 && workerids[w] != new_master)
 			put_to_sleep[nput_to_sleep++] = workerids[w];
-		else if(worker->master != -1 && workerids[w] == new_master)
+		else if(master != -1 && workerids[w] == new_master)
 			wake_up[nwake_up++] = workerids[w];
-		
-		if (workerids[w] != new_master)
-			worker->master = new_master;
-		else
-			worker->master = -1;
 	}
-	_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
-	
+
+	if(nwake_up > 0)
+		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
+	if(nput_to_sleep > 0)
+		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
+
+}
+
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
+{ 
+	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);	
+	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
 	return new_master;
 }
 

+ 32 - 1
src/core/sched_ctx.h

@@ -119,6 +119,34 @@ struct _starpu_sched_ctx
 	
 	/* value placing the contexts in their hierarchy */
 	unsigned hierarchy_level;
+
+	/* if we execute non-StarPU code inside the context 
+	   we have a single master worker that stays awake, 
+	   if not master is -1 */
+	int main_master;
+
+	/* conditions variables used when parallel sections are executed in contexts */
+	starpu_pthread_cond_t parallel_sect_cond[STARPU_NMAXWORKERS];
+	starpu_pthread_mutex_t parallel_sect_mutex[STARPU_NMAXWORKERS];
+
+	/* boolean indicating that workers should block in order to allow
+	   parallel sections to be executed on their allocated resources */
+	unsigned parallel_sect[STARPU_NMAXWORKERS];
+
+	/* id of the master worker */
+	int master[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all blocked and ready to exec the parallel code */
+	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
+
+	/* semaphore that block appl thread until starpu threads are 
+	   all woke up and ready continue appl */
+	sem_t wake_up_sem[STARPU_NMAXWORKERS];
+       
+	/* bool indicating if the workers is sleeping in this ctx */
+	unsigned sleeping[STARPU_NMAXWORKERS];
+
 };
 
 struct _starpu_machine_config;
@@ -177,7 +205,10 @@ starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
 
 /* let the appl know that the worker blocked to execute parallel code */
-void _starpu_sched_ctx_signal_worker_blocked(int workerid);
+void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid);
+
+/* let the appl know that the worker woke up */
+void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid);
 
 /* If starpu_sched_ctx_set_context() has been called, returns the context
  * id set by its last call, or the id of the initial context */

+ 30 - 14
src/core/sched_policy.c

@@ -455,13 +455,20 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
 		}
 
-		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
-		/* check out if there are any workers in the context */
-		starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
-		nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
-		ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
+		if(!sched_ctx->sched_policy)
+		{
+			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
+		}
+		else
+		{
+			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
+			/* check out if there are any workers in the context */
+			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
+			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
+			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
+			ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
+			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
+		}
 
 		if(ret == -1)
 		{
@@ -799,6 +806,12 @@ pick:
 	if (task->mf_skip)
 		goto profiling;
 
+	/*
+	 * This worker may not be able to execute this task. In this case, we
+	 * should return the task anyway. It will be pushed back almost immediatly.
+	 * This way, we avoid computing and executing the conversions tasks.
+	 * Here, we do not care about what implementation is used.
+	 */
 	worker_id = starpu_worker_get_id();
 	if (!starpu_worker_can_execute_task(worker_id, task, 0))
 		return task;
@@ -858,18 +871,21 @@ profiling:
 
 struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
 {
-	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
-
-	/* TODO set profiling info */
-	if(sched_ctx->sched_policy->pop_every_task)
-		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
+	if(sched_ctx->sched_policy)
+	{
+		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
+		
+		/* TODO set profiling info */
+		if(sched_ctx->sched_policy->pop_every_task)
+			return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
+	}
 	return NULL;
 }
 
 void _starpu_sched_pre_exec_hook(struct starpu_task *task)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-	if (sched_ctx->sched_policy->pre_exec_hook)
+	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
 		sched_ctx->sched_policy->pre_exec_hook(task);
 }
 
@@ -877,7 +893,7 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 
-	if (sched_ctx->sched_policy->post_exec_hook)
+	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
 		sched_ctx->sched_policy->post_exec_hook(task);
 }
 

+ 114 - 14
src/core/simgrid.c

@@ -23,9 +23,18 @@
 
 #ifdef STARPU_SIMGRID
 #include <msg/msg.h>
+#include <smpi/smpif.h>
+
+#define STARPU_MPI_AS_PREFIX "StarPU-MPI"
 
 #pragma weak starpu_main
 extern int starpu_main(int argc, char *argv[]);
+#pragma weak smpi_main
+extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
+#pragma weak smpi_simulated_main_
+extern int smpi_simulated_main_(int argc, char *argv[]);
+
+#define _starpu_simgrid_running_smpi() (getenv("SMPI_GLOBAL_SIZE") != NULL)
 
 struct main_args
 {
@@ -39,13 +48,45 @@ int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBU
 	return starpu_main(args->argc, args->argv);
 }
 
+static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
+{
+	xbt_dict_t dict;
+	xbt_dict_cursor_t cursor;
+	const char *key;
+	msg_as_t as, ret;
+	dict = MSG_environment_as_get_routing_sons(root);
+	xbt_dict_foreach(dict, cursor, key, as) {
+		if (!strcmp(MSG_environment_as_get_name(as), name))
+			return as;
+		ret = __starpu_simgrid_get_as_by_name(as, name);
+		if (ret)
+			return ret;
+	}
+	return NULL;
+}
+
+static msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
+{
+	return __starpu_simgrid_get_as_by_name(MSG_environment_get_routing_root(), name);
+}
+
 int _starpu_simgrid_get_nbhosts(const char *prefix)
 {
 	int ret;
-	xbt_dynar_t hosts = MSG_hosts_as_dynar();
-	unsigned i, nb = xbt_dynar_length(hosts);
+	xbt_dynar_t hosts;
+	unsigned i, nb;
 	unsigned len = strlen(prefix);
 
+	if (_starpu_simgrid_running_smpi())
+	{
+		char name[16];
+		snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", smpi_current_rank);
+		hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
+	}
+	else
+		hosts = MSG_hosts_as_dynar();
+	nb = xbt_dynar_length(hosts);
+
 	ret = 0;
 	for (i = 0; i < nb; i++) {
 		const char *name;
@@ -65,7 +106,7 @@ unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devi
 
 	snprintf(name, sizeof(name), "%s%u", prefix, devid);
 
-	host = MSG_get_host_by_name(name);
+	host = _starpu_simgrid_get_host_by_name(name);
 	if (!host)
 		return 0;
 
@@ -79,22 +120,39 @@ unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devi
 	return atoll(memsize);
 }
 
+msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
+{
+	if (_starpu_simgrid_running_smpi())
+	{
+		char mpiname[16];
+		snprintf(mpiname, sizeof(mpiname), "%d-%s", smpi_current_rank, name);
+		return MSG_get_host_by_name(mpiname);
+	}
+	else
+		return MSG_get_host_by_name(name);
+}
+
 #ifdef STARPU_DEVEL
 #warning TODO: use another way to start main, when simgrid provides it, and then include the application-provided configuration for platform numbers
 #endif
 #undef main
 int main(int argc, char **argv)
 {
-	xbt_dynar_t hosts;
-	int i;
 	char path[256];
 
-	if (!starpu_main)
+	if (!starpu_main && !(smpi_main && smpi_simulated_main_))
 	{
 		_STARPU_ERROR("The main file of this application needs to be compiled with starpu.h included, to properly define starpu_main\n");
 		exit(EXIT_FAILURE);
 	}
 
+	if (_starpu_simgrid_running_smpi())
+	{
+		/* Oops, we are running SMPI, let it start Simgrid, and we'll
+		 * take back hand in _starpu_simgrid_init from starpu_init() */
+		return smpi_main(smpi_simulated_main_, argc, argv);
+	}
+
 	MSG_init(&argc, argv);
 #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
 	/* Versions earlier than 3.9 didn't support our communication tasks */
@@ -108,17 +166,59 @@ int main(int argc, char **argv)
 	_starpu_simgrid_get_platform_path(path, sizeof(path));
 	MSG_create_environment(path);
 
-	hosts = MSG_hosts_as_dynar();
+	struct main_args args = { .argc = argc, .argv = argv };
+	MSG_process_create("main", &do_starpu_main, &args, MSG_get_host_by_name("MAIN"));
+
+	MSG_main();
+	return 0;
+}
+
+void _starpu_simgrid_init()
+{
+	xbt_dynar_t hosts;
+	int i;
+
+	if (_starpu_simgrid_running_smpi())
+	{
+		/* Take back hand to create the local platform for this MPI
+		 * node */
+
+		char asname[16];
+		char path[256];
+		char cmdline[1024];
+		FILE *in;
+		int out;
+		char template[] = "/tmp/"STARPU_MPI_AS_PREFIX"-platform-XXXXXX.xml";
+		int ret;
+
+		snprintf(asname, sizeof(asname), STARPU_MPI_AS_PREFIX"%u", smpi_current_rank);
+
+		/* Get XML platform */
+		_starpu_simgrid_get_platform_path(path, sizeof(path));
+		in = fopen(path, "r");
+		STARPU_ASSERT_MSG(in, "Could not open platform file %s", path);
+		out = mkstemps(template, strlen(".xml"));
+
+		/* Generate modified XML platform */
+		STARPU_ASSERT_MSG(out >= 0, "Could not create temporary file like %s", template);
+		close(out);
+		snprintf(cmdline, sizeof(cmdline), "xsltproc --novalid --stringparam ASname %s -o %s "STARPU_DATADIR"/starpu/starpu_smpi.xslt %s", asname, template, path);
+		ret = system(cmdline);
+		STARPU_ASSERT_MSG(ret == 0, "running xsltproc to generate SMPI platforms %s from %s failed", template, path);
+
+		/* And create it */
+		MSG_create_environment(template);
+		unlink(template);
+		hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(asname));
+	}
+	else
+		hosts = MSG_hosts_as_dynar();
+
 	int nb = xbt_dynar_length(hosts);
 	for (i = 0; i < nb; i++)
 		MSG_host_set_data(xbt_dynar_get_as(hosts, i, msg_host_t), calloc(MAX_TSD, sizeof(void*)));
 
-	struct main_args args = { .argc = argc, .argv = argv };
-	MSG_process_create("main", &do_starpu_main, &args, xbt_dynar_get_as(hosts, 0, msg_host_t));
 	xbt_dynar_free(&hosts);
-
-	MSG_main();
-	return 0;
 }
 
 /* Task execution submitted by StarPU */
@@ -247,7 +347,7 @@ static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARP
 		if (!wake->nwait)
 		{
 			_STARPU_DEBUG("triggering transfer %p\n", wake);
-			MSG_process_create("transfer task", transfer_execute, wake, MSG_get_host_by_name("MAIN"));
+			MSG_process_create("transfer task", transfer_execute, wake, _starpu_simgrid_get_host_by_name("MAIN"));
 		}
 	}
 
@@ -288,7 +388,7 @@ static void transfer_submit(struct transfer *transfer)
 	if (!transfer->nwait)
 	{
 		_STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
-		MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
+		MSG_process_create("transfer task", transfer_execute, transfer, _starpu_simgrid_get_host_by_name("MAIN"));
 	}
 }
 

+ 2 - 0
src/core/simgrid.h

@@ -30,11 +30,13 @@ struct _starpu_pthread_args
 
 #define MAX_TSD 16
 
+void _starpu_simgrid_init(void);
 void _starpu_simgrid_execute_job(struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length);
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req);
 /* Return the number of hosts prefixed by PREFIX */
 int _starpu_simgrid_get_nbhosts(const char *prefix);
 unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid);
+msg_host_t _starpu_simgrid_get_host_by_name(const char *name);
 void _starpu_simgrid_get_platform_path(char *path, size_t maxlen);
 #endif
 

+ 5 - 2
src/core/task.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2014  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011  INRIA
  *
@@ -34,6 +34,7 @@
 #include <core/debug.h>
 #include <core/sched_ctx.h>
 #include <time.h>
+#include <signal.h>
 #ifdef STARPU_HAVE_WINDOWS
 #include <windows.h>
 #endif
@@ -1047,11 +1048,13 @@ static void *watchdog_func(void *foo STARPU_ATTRIBUTE_UNUSED)
 			if (getenv("STARPU_WATCHDOG_CRASH"))
 			{
 				fprintf(stderr,"Crashing the process\n");
-				assert(0);
+				raise(SIGABRT);
 			}
 			else
 				fprintf(stderr,"Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
 		}
+		/* Only shout again after another period */
+		config->watchdog_ok = 1;
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
 	return NULL;

+ 7 - 6
src/core/topology.c

@@ -1093,11 +1093,11 @@ _starpu_init_machine_config (struct _starpu_machine_config *config, int no_mp_co
 void
 _starpu_bind_thread_on_cpu (
 	struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED,
-	int cpuid)
+	int cpuid STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
 	return;
-#endif
+#else
 	if (starpu_get_env_number("STARPU_WORKERS_NOBIND") > 0)
 		return;
 	if (cpuid < 0)
@@ -1157,6 +1157,7 @@ _starpu_bind_thread_on_cpu (
 #else
 #warning no CPU binding support
 #endif
+#endif
 }
 
 
@@ -1217,7 +1218,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 
 #ifdef STARPU_SIMGRID
 	char name[16];
-	msg_host_t host = MSG_get_host_by_name("RAM");
+	msg_host_t host = _starpu_simgrid_get_host_by_name("RAM");
 	STARPU_ASSERT(host);
 	_starpu_simgrid_memory_node_set_host(STARPU_MAIN_RAM, host);
 #endif
@@ -1277,7 +1278,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					memory_node = numa_memory_nodes[numaid] = _starpu_memory_node_register(STARPU_CPU_RAM, numaid);
 #ifdef STARPU_SIMGRID
 					snprintf(name, sizeof(name), "RAM%d", numaid);
-					host = MSG_get_host_by_name(name);
+					host = _starpu_simgrid_get_host_by_name(name);
 					STARPU_ASSERT(host);
 					_starpu_simgrid_memory_node_set_host(memory_node, host);
 #endif
@@ -1315,7 +1316,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					_starpu_register_bus(memory_node, STARPU_MAIN_RAM);
 #ifdef STARPU_SIMGRID
 					snprintf(name, sizeof(name), "CUDA%d", devid);
-					host = MSG_get_host_by_name(name);
+					host = _starpu_simgrid_get_host_by_name(name);
 					STARPU_ASSERT(host);
 					_starpu_simgrid_memory_node_set_host(memory_node, host);
 #endif /* SIMGRID */
@@ -1365,7 +1366,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					_starpu_register_bus(memory_node, STARPU_MAIN_RAM);
 #ifdef STARPU_SIMGRID
 					snprintf(name, sizeof(name), "OpenCL%d", devid);
-					host = MSG_get_host_by_name(name);
+					host = _starpu_simgrid_get_host_by_name(name);
 					STARPU_ASSERT(host);
 					_starpu_simgrid_memory_node_set_host(memory_node, host);
 #endif /* SIMGRID */

+ 10 - 9
src/core/workers.c

@@ -39,6 +39,7 @@
 
 #ifdef STARPU_SIMGRID
 #include <msg/msg.h>
+#include <core/simgrid.h>
 #endif
 
 #ifdef __MINGW32__
@@ -269,7 +270,8 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	if(config.workers[workerid].parallel_sect) return 0;
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	if(sched_ctx->parallel_sect[workerid]) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
@@ -326,7 +328,7 @@ int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_tas
  * Runtime initialization methods
  */
 
-#ifdef STARPU_USE_CUDA
+#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 static struct _starpu_worker_set cuda_worker_set[STARPU_MAXCUDADEVS];
 #endif
 #ifdef STARPU_USE_MIC
@@ -411,7 +413,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
 	workerarg->set = NULL;
-	sem_init(&workerarg->parallel_code_sem, 0, 0);
 
 	/* if some codelet's termination cannot be handled directly :
 	 * for instance in the Gordon driver, Gordon tasks' callbacks
@@ -428,6 +429,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->run_by_starpu = 1;
 
 	workerarg->sched_ctx_list = NULL;
+	workerarg->tmp_sched_ctx = -1;
 	workerarg->nsched_ctxs = 0;
 	_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
@@ -439,10 +441,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 
 	workerarg->spinning_backoff = 1;
 
-	STARPU_PTHREAD_COND_INIT(&workerarg->parallel_sect_cond, NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&workerarg->parallel_sect_mutex, NULL);
-
-	workerarg->parallel_sect = 0;
 
 	for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
 	{
@@ -453,7 +451,6 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
-	workerarg->master = -1;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -943,7 +940,9 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 	int ret;
 
-#ifndef STARPU_SIMGRID
+#ifdef STARPU_SIMGRID
+	_starpu_simgrid_init();
+#else
 #ifdef __GNUC__
 #ifndef __OPTIMIZE__
 	_STARPU_DISP("Warning: StarPU was configured with --enable-debug (-O0), and is thus not optimized\n");
@@ -1316,7 +1315,9 @@ void starpu_shutdown(void)
 	_starpu_delete_all_sched_ctxs();
 
 	_starpu_disk_unregister();
+#ifdef STARPU_HAVE_HWLOC
 	starpu_tree_free(config.topology.tree);
+#endif
 	_starpu_destroy_topology(&config);
 #ifdef STARPU_USE_FXT
 	_starpu_stop_fxt_profiling();

+ 1 - 14
src/core/workers.h

@@ -84,6 +84,7 @@ LIST_TYPE(_starpu_worker,
 	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */
 
 	struct _starpu_sched_ctx_list *sched_ctx_list;
+	int tmp_sched_ctx;
 	unsigned nsched_ctxs; /* the no of contexts a worker belongs to*/
 	struct _starpu_barrier_counter tasks_barrier; /* wait for the tasks submitted */
 
@@ -93,13 +94,6 @@ LIST_TYPE(_starpu_worker,
 
 	unsigned spinning_backoff ; /* number of cycles to pause when spinning  */
 
-	/* conditions variables used when parallel sections are executed in contexts */
-	starpu_pthread_cond_t parallel_sect_cond;
-	starpu_pthread_mutex_t parallel_sect_mutex;
-
-	/* boolean indicating that workers should block in order to allow
-	   parallel sections to be executed on their allocated resources */
-	unsigned parallel_sect;
 
 	/* indicate whether the workers shares tasks lists with other workers*/
 	/* in this case when removing him from a context it disapears instantly */
@@ -118,13 +112,6 @@ LIST_TYPE(_starpu_worker,
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 
-	/* id of the master worker */
-	int master;
-
-	/* semaphore that block appl thread until threads are ready 
-	   to exec the parallel code */
-	sem_t parallel_code_sem;
-
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 21 - 11
src/datawizard/coherency.c

@@ -27,7 +27,7 @@
 #include <starpu_scheduler.h>
 
 static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
-unsigned _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
+int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
 {
 	int src_node = -1;
 	unsigned i;
@@ -50,6 +50,12 @@ unsigned _starpu_select_src_node(starpu_data_handle_t handle, unsigned destinati
 		}
 	}
 
+	if (src_node_mask == 0 && handle->init_cl)
+	{
+		/* No copy yet, but applicationg told us how to build it.  */
+		return -1;
+	}
+
 	/* we should have found at least one copy ! */
 	STARPU_ASSERT(src_node_mask != 0);
 
@@ -194,7 +200,7 @@ static int worker_supports_direct_access(unsigned node, unsigned handling_node)
 			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
 			/* GPUs not always allow direct remote access: if CUDA4
 			 * is enabled, we allow two CUDA devices to communicate. */
-			return kind == STARPU_CPU_RAM || kind == STARPU_CUDA_RAM;
+			return kind == STARPU_CUDA_RAM;
 		}
 #else
 			/* Direct GPU-GPU transfers are not allowed in general */
@@ -427,16 +433,23 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);
 
 	/* find someone who already has the data */
-	unsigned src_node = 0;
+	int src_node = 0;
 
 	if (mode & STARPU_R)
 	{
 		src_node = _starpu_select_src_node(handle, requesting_node);
-		STARPU_ASSERT(src_node != requesting_node);
+		STARPU_ASSERT(src_node != (int) requesting_node);
+		if (src_node < 0)
+		{
+			/* We will create it, no need to read an existing value */
+			mode &= ~STARPU_R;
+		}
 	}
 	else
 	{
-		/* if the data is in write only mode, there is no need for a source */
+		/* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
+		if (mode & STARPU_W)
+			dst_replicate->initialized = 1;
 		if (requesting_node == STARPU_MAIN_RAM) {
 			/* And this is the main RAM, really no need for a
 			 * request, just allocate */
@@ -754,12 +767,9 @@ int _starpu_fetch_task_input(struct _starpu_job *j)
 
 		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, index);
 
-		if (mode & STARPU_REDUX)
-		{
-			/* If the replicate was not initialized yet, we have to do it now */
-			if (!local_replicate->initialized)
-				_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
-		}
+		/* If the replicate was not initialized yet, we have to do it now */
+		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
+			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
 	}
 
 	if (profiling && task->profiling_info)

+ 2 - 3
src/datawizard/coherency.h

@@ -51,8 +51,7 @@ LIST_TYPE(_starpu_data_replicate,
 	 * filters. */
 	unsigned relaxed_coherency;
 
-	/* In the case of a SCRATCH access, we need to initialize the replicate
-	 * with a neutral element before using it. */
+	/* We may need to initialize the replicate with some value before using it. */
 	unsigned initialized;
 
 	/* describes the state of the local data in term of coherency */
@@ -255,7 +254,7 @@ int _starpu_fetch_task_input(struct _starpu_job *j);
 
 unsigned _starpu_is_data_present_or_requested(struct _starpu_data_state *state, unsigned node);
 
-unsigned _starpu_select_src_node(struct _starpu_data_state *state, unsigned destination);
+int _starpu_select_src_node(struct _starpu_data_state *state, unsigned destination);
 
 /* is_prefetch is whether the DSM may drop the request (when there is not enough memory for instance
  * async is whether the caller wants a reference on the last request, to be

+ 6 - 4
src/datawizard/copy_driver.c

@@ -163,7 +163,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			cures = cudaEventCreateWithFlags(&req->async_channel.event.cuda_event, cudaEventDisableTiming);
 			if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_out_transfer_stream(src_node);
+			stream = starpu_cuda_get_local_out_transfer_stream();
 			if (copy_methods->cuda_to_ram_async)
 				ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -199,7 +199,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			if (STARPU_UNLIKELY(cures != cudaSuccess))
 				STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_in_transfer_stream(dst_node);
+			stream = starpu_cuda_get_local_in_transfer_stream();
 			if (copy_methods->ram_to_cuda_async)
 				ret = copy_methods->ram_to_cuda_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -493,6 +493,8 @@ int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_d
 			req->com_id = com_id;
 #endif
 
+		dst_replicate->initialized = 1;
+
 		_STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id);
 		ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
 		if (!req)
@@ -531,7 +533,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_out_transfer_stream(src_node):NULL,
+				async_channel?starpu_cuda_get_local_out_transfer_stream():NULL,
 				cudaMemcpyDeviceToHost);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CUDA_RAM):
@@ -539,7 +541,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_in_transfer_stream(dst_node):NULL,
+				async_channel?starpu_cuda_get_local_in_transfer_stream():NULL,
 				cudaMemcpyHostToDevice);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CUDA_RAM,STARPU_CUDA_RAM):

+ 1 - 0
src/datawizard/filters.c

@@ -220,6 +220,7 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 			child_replicate->refcnt = 0;
 			child_replicate->memory_node = node;
 			child_replicate->relaxed_coherency = 0;
+			child_replicate->initialized = initial_replicate->initialized;
 
 			/* update the interface */
 			void *initial_interface = starpu_data_get_interface_on_node(initial_handle, node);

+ 2 - 0
src/datawizard/interfaces/data_interface.c

@@ -246,12 +246,14 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 			replicate->state = STARPU_OWNER;
 			replicate->allocated = 1;
 			replicate->automatically_allocated = 0;
+			replicate->initialized = 1;
 		}
 		else
 		{
 			/* the value is not available here yet */
 			replicate->state = STARPU_INVALID;
 			replicate->allocated = 0;
+			replicate->initialized = 0;
 		}
 	}
 

+ 4 - 1
src/datawizard/memalloc.c

@@ -697,8 +697,10 @@ size_t _starpu_memory_reclaim_generic(unsigned node, unsigned force, size_t recl
 	if (reclaim && !force)
 	{
 		static int warned;
+		char name[32];
+		_starpu_memory_node_get_name(node, name, sizeof(name));
 		if (!warned) {
-			_STARPU_DISP("Not enough memory left on node %u. Trying to purge %lu bytes out. This message will not be printed again for further purges\n", node, (unsigned long) reclaim);
+			_STARPU_DISP("Not enough memory left on node %s. Your application working set is probably simply just hard to fit in the devices, but StarPU will cope with it by trying to purge %lu bytes out. This message will not be printed again for further purges\n", name, (unsigned long) reclaim);
 			warned = 1;
 		}
 	}
@@ -789,6 +791,7 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 	mc->replicate = NULL;
 	replicate->allocated = 0;
 	replicate->automatically_allocated = 0;
+	replicate->initialized = 0;
 
 	_starpu_spin_lock(&mc_lock[node]);
 

+ 31 - 0
src/datawizard/memory_nodes.c

@@ -106,6 +106,37 @@ unsigned starpu_memory_nodes_get_count(void)
 	return descr.nnodes;
 }
 
+void _starpu_memory_node_get_name(unsigned node, char *name, int size)
+{
+	const char *prefix;
+	switch (descr.nodes[node]) {
+	case STARPU_CPU_RAM:
+		prefix = "RAM";
+		break;
+	case STARPU_CUDA_RAM:
+		prefix = "CUDA";
+		break;
+	case STARPU_OPENCL_RAM:
+		prefix = "OpenCL";
+		break;
+	case STARPU_DISK_RAM:
+		prefix = "Disk";
+		break;
+	case STARPU_MIC_RAM:
+		prefix = "MIC";
+		break;
+	case STARPU_SCC_RAM:
+		prefix = "SCC_RAM";
+		break;
+	case STARPU_SCC_SHM:
+		prefix = "SCC_shared";
+		break;
+	case STARPU_UNUSED:
+		STARPU_ASSERT(0);
+	}
+	snprintf(name, size, "%s %u\n", prefix, descr.devid[node]);
+}
+
 unsigned _starpu_memory_node_register(enum starpu_node_kind kind, int devid)
 {
 	unsigned node;

+ 2 - 1
src/datawizard/memory_nodes.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2012  Université de Bordeaux 1
+ * Copyright (C) 2009-2012, 2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -82,6 +82,7 @@ unsigned _starpu_memory_node_register(enum starpu_node_kind kind, int devid);
 void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex, unsigned memory_node);
 
 int _starpu_memory_node_get_devid(unsigned node);
+void _starpu_memory_node_get_name(unsigned node, char *name, int size);
 
 struct _starpu_memory_node_descr *_starpu_memory_node_get_description(void);
 

+ 14 - 0
src/debug/traces/starpu_fxt.c

@@ -780,6 +780,16 @@ static void handle_worker_status(struct fxt_ev_64 *ev, struct starpu_fxt_options
 
 static double last_sleep_start[STARPU_NMAXWORKERS];
 
+static void handle_start_scheduling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	int worker;
+	worker = find_worker_id(ev->param[0]);
+	if (worker < 0) return;
+
+	if (out_paje_file)
+		worker_set_state(get_event_time_stamp(ev, options), options->file_prefix, ev->param[0], "Sc");
+}
+
 static void handle_start_sleep(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int worker;
@@ -1492,6 +1502,10 @@ void starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *opt
 				handle_worker_status(&ev, options, "B");
 				break;
 
+			case _STARPU_FUT_WORKER_SCHEDULING_START:
+				handle_start_scheduling(&ev, options);
+				break;
+
 			case _STARPU_FUT_WORKER_SLEEP_START:
 				handle_start_sleep(&ev, options);
 				break;

+ 5 - 1
src/debug/traces/starpu_paje.c

@@ -164,6 +164,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Po", "S", "PushingOutput", "0.1 1.0 1.0");
 	poti_DefineEntityValue("C", "S", "Callback", ".0 .3 .8");
 	poti_DefineEntityValue("B", "S", "Overhead", ".5 .18 .0");
+	poti_DefineEntityValue("Sc", "S", "Scheduling", ".7 .36 .0");
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 	poti_DefineEntityValue("U", "S", "Unpartitioning", ".0 .0 1.0");
@@ -192,6 +193,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 		poti_DefineEntityValue("Po", ctx, "PushingOutput", "0.1 1.0 1.0");
 		poti_DefineEntityValue("C", ctx, "Callback", ".0 .3 .8");
 		poti_DefineEntityValue("B", ctx, "Overhead", ".5 .18 .0");
+		poti_DefineEntityValue("Sc", ctx, "Scheduling", ".7 .36 .0");
 		poti_DefineEntityValue("Sl", ctx, "Sleeping", ".9 .1 .0");
 		poti_DefineEntityValue("P", ctx, "Progressing", ".4 .1 .6");
 		poti_DefineEntityValue("U", ctx, "Unpartitioning", ".0 .0 1.0");
@@ -234,6 +236,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       S      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       S       Callback       \".0 .3 .8\"            \n\
 6       B       S       Overhead         \".5 .18 .0\"		\n\
+6       Sc       S      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n\
 6       U       S       Unpartitioning      \".0 .0 1.0\"		\n\
@@ -255,10 +258,11 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Po       Ctx%u      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       Ctx%u       Callback       \".0 .3 .8\"            \n\
 6       B       Ctx%u       Overhead         \".5 .18 .0\"		\n\
+6       Sc       Ctx%u      Scheduling         \".7 .36 .0\"		\n\
 6       Sl       Ctx%u      Sleeping         \".9 .1 .0\"		\n\
 6       P       Ctx%u       Progressing         \".4 .1 .6\"		\n\
 6       U       Ctx%u       Unpartitioning         \".0 .0 1.0\"		\n",
-		i, i, i, i, i, i, i, i, i);
+		i, i, i, i, i, i, i, i, i, i);
 	fprintf(file, "\
 6       A       MS      Allocating         \".4 .1 .0\"		\n\
 6       Ar       MS      AllocatingReuse       \".1 .1 .8\"		\n\

+ 49 - 20
src/drivers/cuda/driver_cuda.c

@@ -44,7 +44,10 @@ static size_t global_mem[STARPU_MAXCUDADEVS];
 static cudaStream_t streams[STARPU_NMAXWORKERS];
 static cudaStream_t out_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
-static cudaStream_t peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
+/* Note: streams are not thread-safe, so we define them for each CUDA worker
+ * emitting a GPU-GPU transfer */
+static cudaStream_t in_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
+static cudaStream_t out_peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
 static cudaEvent_t task_events[STARPU_NMAXWORKERS];
 #endif /* STARPU_USE_CUDA */
@@ -116,26 +119,44 @@ static void _starpu_cuda_limit_gpu_mem_if_needed(unsigned devid)
 }
 
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_in_transfer_stream()
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
+	cudaStream_t stream;
 
-	return in_transfer_streams[devid];
+	stream = in_transfer_streams[devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_out_transfer_stream()
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
+	cudaStream_t stream;
 
-	return out_transfer_streams[devid];
+	stream = out_transfer_streams[devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node)
 {
+	int worker = starpu_worker_get_id();
+	int devid = starpu_worker_get_devid(worker);
 	int src_devid = _starpu_memory_node_get_devid(src_node);
 	int dst_devid = _starpu_memory_node_get_devid(dst_node);
+	cudaStream_t stream;
 
-	return peer_transfer_streams[src_devid][dst_devid];
+	STARPU_ASSERT(devid == src_devid || devid == dst_devid);
+
+	if (devid == dst_devid)
+		stream = in_peer_transfer_streams[src_devid][dst_devid];
+	else
+		stream = out_peer_transfer_streams[src_devid][dst_devid];
+	STARPU_ASSERT(stream);
+	return stream;
 }
 
 cudaStream_t starpu_cuda_get_local_stream(void)
@@ -262,19 +283,22 @@ static void init_context(struct _starpu_worker_set *worker_set, unsigned devid)
 		cures = cudaStreamCreate(&streams[workerid]);
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
+	}
 
-		cures = cudaStreamCreate(&in_transfer_streams[devid]);
-		if (STARPU_UNLIKELY(cures))
-			STARPU_CUDA_REPORT_ERROR(cures);
+	cures = cudaStreamCreate(&in_transfer_streams[devid]);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
 
-		cures = cudaStreamCreate(&out_transfer_streams[devid]);
-		if (STARPU_UNLIKELY(cures))
-			STARPU_CUDA_REPORT_ERROR(cures);
-	}
+	cures = cudaStreamCreate(&out_transfer_streams[devid]);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
 
 	for (i = 0; i < ncudagpus; i++)
 	{
-		cures = cudaStreamCreate(&peer_transfer_streams[i][devid]);
+		cures = cudaStreamCreate(&in_peer_transfer_streams[i][devid]);
+		if (STARPU_UNLIKELY(cures))
+			STARPU_CUDA_REPORT_ERROR(cures);
+		cures = cudaStreamCreate(&out_peer_transfer_streams[devid][i]);
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 	}
@@ -284,7 +308,8 @@ static void deinit_context(struct _starpu_worker_set *worker_set)
 {
 	cudaError_t cures;
 	unsigned i;
-	int workerid, devid;
+	int workerid = worker_set->workers[0].workerid;
+	int devid = starpu_worker_get_devid(workerid);
 
 	for (i = 0; i < worker_set->nworkers; i++)
 	{
@@ -293,12 +318,16 @@ static void deinit_context(struct _starpu_worker_set *worker_set)
 
 		cudaEventDestroy(task_events[workerid]);
 		cudaStreamDestroy(streams[workerid]);
-		cudaStreamDestroy(in_transfer_streams[devid]);
-		cudaStreamDestroy(out_transfer_streams[devid]);
 	}
 
+	cudaStreamDestroy(in_transfer_streams[devid]);
+	cudaStreamDestroy(out_transfer_streams[devid]);
+
 	for (i = 0; i < ncudagpus; i++)
-		cudaStreamDestroy(peer_transfer_streams[i][devid]);
+	{
+		cudaStreamDestroy(in_peer_transfer_streams[i][devid]);
+		cudaStreamDestroy(out_peer_transfer_streams[devid][i]);
+	}
 
 	/* cleanup the runtime API internal stuffs (which CUBLAS is using) */
 	cures = cudaThreadExit();

+ 2 - 2
src/drivers/cuda/driver_cuda.h

@@ -48,8 +48,8 @@ void *_starpu_cuda_worker(void *);
 #  define _starpu_cuda_discover_devices(config) ((void) config)
 #endif
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node);
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node);
+cudaStream_t starpu_cuda_get_local_in_transfer_stream(void);
+cudaStream_t starpu_cuda_get_local_out_transfer_stream(void);
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node);
 
 struct _starpu_worker_set;

+ 60 - 10
src/drivers/driver_common/driver_common.c

@@ -155,6 +155,17 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 	}
 }
 
+static void _starpu_worker_set_status_scheduling(int workerid)
+{
+	if (_starpu_worker_get_status(workerid) != STATUS_SLEEPING
+		&& _starpu_worker_get_status(workerid) != STATUS_SCHEDULING)
+	{
+		_STARPU_TRACE_WORKER_SCHEDULING_START;
+		_starpu_worker_set_status(workerid, STATUS_SCHEDULING);
+	}
+
+}
+
 static void _starpu_worker_set_status_sleeping(int workerid)
 {
 	if ( _starpu_worker_get_status(workerid) == STATUS_WAKING_UP)
@@ -195,18 +206,57 @@ static void _starpu_exponential_backoff(struct _starpu_worker *args)
 /* Workers may block when there is no work to do at all. */
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode)
 {
-	struct starpu_task *task;
-
 	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
-	if(args->parallel_sect)
+	struct starpu_task *task;
+	unsigned needed = 1;
+	_starpu_worker_set_status_scheduling(workerid);
+	while(needed)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
-		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
-		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
-		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
-		args->parallel_sect = 0;
+		struct _starpu_sched_ctx *sched_ctx = NULL;
+		struct _starpu_sched_ctx_list *l = NULL;
+		for (l = args->sched_ctx_list; l; l = l->next)
+		{
+			sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			{
+				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(sched_ctx->parallel_sect[workerid])
+				{
+					/* don't let the worker sleep with the sched_mutex taken */
+					/* we need it until here bc of the list of ctxs of the workers
+					   that can change in another thread */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+					needed = 0;
+					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					sched_ctx->parallel_sect[workerid] = 0;
+					STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+				}
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			}
+			if(!needed)
+				break;
+		}
+		/* don't worry if the value is not correct (no lock) it will do it next time */
+		if(args->tmp_sched_ctx != -1)
+		{
+			sched_ctx = _starpu_get_sched_ctx_struct(args->tmp_sched_ctx);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			if(sched_ctx->parallel_sect[workerid])
+			{
+//				needed = 0;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+				_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+				STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+				sched_ctx->parallel_sect[workerid] = 0;
+				STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+			}
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+		}
+
+		needed = !needed;
 	}
 
 	task = _starpu_pop_task(args);

+ 2 - 2
src/drivers/opencl/driver_opencl.c

@@ -516,9 +516,9 @@ void _starpu_opencl_init(void)
 				else
 					_STARPU_DEBUG("Platform invalid\n");
 #endif
-				if (platform_valid)
+				if (platform_valid && nb_devices <= STARPU_MAXOPENCLDEVS)
 				{
-					err = clGetDeviceIDs(platform_id[j], device_type, STARPU_MAXOPENCLDEVS-nb_devices, &devices[nb_devices], &num);
+					err = clGetDeviceIDs(platform_id[j], device_type, STARPU_MAXOPENCLDEVS-nb_devices, STARPU_MAXOPENCLDEVS == nb_devices ? NULL : &devices[nb_devices], &num);
 					if (err == CL_DEVICE_NOT_FOUND)
 					{
 						_STARPU_DEBUG("  No devices detected on this platform\n");

+ 47 - 0
src/starpu_smpi.xslt

@@ -0,0 +1,47 @@
+<!--
+StarPU   Runtime system for heterogeneous multicore architectures.
+
+Copyright (C) 2014  Université de Bordeaux 1
+
+StarPU is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or (at
+your option) any later version.
+
+StarPU is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+See the GNU Lesser General Public License in COPYING.LGPL for more details.
+-->
+
+<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
+
+	<xsl:output doctype-system="http://simgrid.gforge.inria.fr/simgrid.dtd"/>
+
+     <!-- Add doctype 
+     <xsl:text>&lt;!DOCTYPE platform SYSTEM 'http://simgrid.gforge.inria.fr/simgrid.dtd'&gt;</xsl:text>
+
+-->
+    <!-- Copy everything by default but keep applying templates.  -->
+    <xsl:template match="platform|AS|host|link|@*">
+        <xsl:copy>
+            <xsl:apply-templates select="node()|@*"/>
+        </xsl:copy>
+    </xsl:template>
+
+    <!-- Replace AS name.  -->
+    <xsl:template match="platform/AS/@id">
+        <xsl:attribute name="id">
+            <xsl:value-of select="$ASname"/>
+        </xsl:attribute>
+    </xsl:template>
+
+    <!-- Prepend AS name to host names.  -->
+    <xsl:template match="platform/AS/host/@id">
+	    <xsl:attribute name="id"><xsl:value-of select="$ASname"/>-<xsl:value-of select="."/></xsl:attribute>
+    </xsl:template>
+
+</xsl:stylesheet>
+
+

+ 1 - 0
tests/Makefile.am

@@ -173,6 +173,7 @@ noinst_PROGRAMS =				\
 	datawizard/mpi_like			\
 	datawizard/mpi_like_async		\
 	datawizard/critical_section_with_void_interface\
+	datawizard/increment_init		\
 	datawizard/increment_redux		\
 	datawizard/increment_redux_v2		\
 	datawizard/increment_redux_lazy		\

+ 214 - 0
tests/datawizard/increment_init.c

@@ -0,0 +1,214 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <config.h>
+#include <starpu.h>
+#include "../helper.h"
+
+static starpu_data_handle_t handle;
+
+/*
+ *	Reduction methods
+ */
+
+#ifdef STARPU_USE_CUDA
+static void neutral_cuda_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *dst = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	/* This is a dummy technique of course */
+	unsigned host_dst = 0;
+	cudaMemcpyAsync(dst, &host_dst, sizeof(unsigned), cudaMemcpyHostToDevice, starpu_cuda_get_local_stream());
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
+}
+#endif
+
+#ifdef STARPU_USE_OPENCL
+static void neutral_opencl_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned h_dst = 0;
+	cl_mem d_dst = (cl_mem)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	cl_command_queue queue;
+	starpu_opencl_get_current_queue(&queue);
+
+	clEnqueueWriteBuffer(queue, d_dst, CL_TRUE, 0, sizeof(unsigned), (void *)&h_dst, 0, NULL, NULL);
+	clFinish(queue);
+}
+#endif
+
+
+
+static void neutral_cpu_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *dst = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	*dst = 0;
+}
+
+static struct starpu_codelet neutral_cl =
+{
+#ifdef STARPU_USE_CUDA
+	.cuda_funcs = {neutral_cuda_kernel, NULL},
+#endif
+#ifdef STARPU_USE_OPENCL
+	.opencl_funcs = {neutral_opencl_kernel, NULL},
+#endif
+	.cpu_funcs = {neutral_cpu_kernel, NULL},
+	.modes = {STARPU_W},
+	.nbuffers = 1
+};
+
+/*
+ *	Increment codelet
+ */
+
+#ifdef STARPU_USE_OPENCL
+/* dummy OpenCL implementation */
+static void increment_opencl_kernel(void *descr[], void *cl_arg STARPU_ATTRIBUTE_UNUSED)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	cl_mem d_token = (cl_mem)STARPU_VARIABLE_GET_PTR(descr[0]);
+	unsigned h_token;
+
+	cl_command_queue queue;
+	starpu_opencl_get_current_queue(&queue);
+
+	clEnqueueReadBuffer(queue, d_token, CL_TRUE, 0, sizeof(unsigned), (void *)&h_token, 0, NULL, NULL);
+	h_token++;
+	clEnqueueWriteBuffer(queue, d_token, CL_TRUE, 0, sizeof(unsigned), (void *)&h_token, 0, NULL, NULL);
+	clFinish(queue);
+}
+#endif
+
+
+#ifdef STARPU_USE_CUDA
+static void increment_cuda_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *tokenptr = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	unsigned host_token;
+
+	/* This is a dummy technique of course */
+	cudaMemcpyAsync(&host_token, tokenptr, sizeof(unsigned), cudaMemcpyDeviceToHost, starpu_cuda_get_local_stream());
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
+
+	host_token++;
+
+	cudaMemcpyAsync(tokenptr, &host_token, sizeof(unsigned), cudaMemcpyHostToDevice, starpu_cuda_get_local_stream());
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
+}
+#endif
+
+static void increment_cpu_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *tokenptr = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	*tokenptr = *tokenptr + 1;
+}
+
+static struct starpu_codelet increment_cl =
+{
+#ifdef STARPU_USE_CUDA
+	.cuda_funcs = {increment_cuda_kernel, NULL},
+#endif
+#ifdef STARPU_USE_OPENCL
+	.opencl_funcs = {increment_opencl_kernel, NULL},
+#endif
+	.cpu_funcs = {increment_cpu_kernel, NULL},
+	.nbuffers = 1,
+	.modes = {STARPU_RW}
+};
+
+int main(int argc, char **argv)
+{
+	unsigned *pvar;
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&handle, -1, 0, sizeof(unsigned));
+
+	starpu_data_set_reduction_methods(handle, NULL, &neutral_cl);
+
+#ifdef STARPU_QUICK_CHECK
+	unsigned ntasks = 32;
+	unsigned nloops = 4;
+#else
+	unsigned ntasks = 1024;
+	unsigned nloops = 16;
+#endif
+
+	unsigned loop;
+	unsigned t;
+
+	for (loop = 0; loop < nloops; loop++)
+	{
+		for (t = 0; t < ntasks; t++)
+		{
+			struct starpu_task *task = starpu_task_create();
+
+			task->cl = &increment_cl;
+			task->handles[0] = handle;
+
+			ret = starpu_task_submit(task);
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+
+		ret = starpu_data_acquire(handle, STARPU_R);
+		pvar = starpu_data_handle_to_pointer(handle, 0);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+		if (*pvar != ntasks)
+		{
+			FPRINTF(stderr, "[end of loop] Value %u != Expected value %u\n", *pvar, ntasks * (loop+1));
+			starpu_data_release(handle);
+			starpu_data_unregister(handle);
+			goto err;
+		}
+		starpu_data_release(handle);
+		starpu_data_invalidate(handle);
+	}
+
+	starpu_data_unregister(handle);
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enodev:
+	starpu_data_unregister(handle);
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+
+err:
+	starpu_shutdown();
+	STARPU_RETURN(EXIT_FAILURE);
+
+}

+ 48 - 4
tests/main/codelet_null_callback.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -18,17 +18,33 @@
 #include "../helper.h"
 
 static
+int expected_x=40;
+static
+int expected_y=12;
+
+static
 void callback(void *ptr)
 {
      int *x = (int *)ptr;
      FPRINTF(stderr, "x=%d\n", *x);
-     STARPU_ASSERT(*x == 42);
+     STARPU_ASSERT_MSG(*x == expected_x, "%d != %d\n", *x, expected_x);
+     (*x)++;
+}
+
+static
+void prologue_callback(void *ptr)
+{
+     int *y = (int *)ptr;
+     FPRINTF(stderr, "y=%d\n", *y);
+     STARPU_ASSERT_MSG(*y == expected_y, "%d != %d\n", *y, expected_y);
+     (*y)++;
 }
 
 int main(int argc, char **argv)
 {
 	int ret;
-	int x=42;
+	int x=40;
+	int y=12;
 
 	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
@@ -39,7 +55,35 @@ int main(int argc, char **argv)
 				 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
 
-	starpu_task_wait_for_all();
+	expected_x ++;
+	ret = starpu_task_insert(NULL,
+				 STARPU_CALLBACK, callback,
+				 STARPU_CALLBACK_ARG, &x,
+				 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+	expected_x ++;
+	STARPU_ASSERT_MSG(x == expected_x, "x should be equal to %d and not %d\n", expected_x, x);
+
+	ret = starpu_task_insert(NULL,
+				 STARPU_PROLOGUE_CALLBACK, prologue_callback,
+				 STARPU_PROLOGUE_CALLBACK_ARG, &y,
+				 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+#warning the following code should work
+#if 0
+	expected_y ++;
+	ret = starpu_task_insert(NULL,
+				 STARPU_PROLOGUE_CALLBACK_POP, prologue_callback,
+				 STARPU_PROLOGUE_CALLBACK_ARG, &y,
+				 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+#endif
+
+	expected_y ++;
+	STARPU_ASSERT_MSG(y == expected_y, "y should be equal to %d and not %d\n", expected_y, y);
+
 	starpu_shutdown();
 
 	return EXIT_SUCCESS;