Sfoglia il codice sorgente

Merge from trunk @ 8872:8927

Marc Sergent 12 anni fa
parent
commit
202d612ae3
47 ha cambiato i file con 644 aggiunte e 289 eliminazioni
  1. 7 0
      ChangeLog
  2. 7 2
      doc/chapters/advanced-api.texi
  3. 45 22
      doc/chapters/basic-api.texi
  4. 7 0
      doc/chapters/configuration.texi
  5. 2 1
      doc/chapters/mpi-support.texi
  6. 13 1
      doc/chapters/tips-tricks.texi
  7. 3 2
      examples/basic_examples/block.c
  8. 3 2
      examples/basic_examples/variable.c
  9. 5 5
      examples/interface/complex.c
  10. 7 3
      examples/interface/complex_codelet.h
  11. 7 4
      examples/interface/complex_interface.c
  12. 3 3
      examples/sched_ctx_utils/sched_ctx_utils.c
  13. 3 3
      examples/spmd/vector_scal_spmd.c
  14. 2 1
      examples/tag_example/tag_example.c
  15. 2 1
      examples/tag_example/tag_example2.c
  16. 2 1
      examples/tag_example/tag_example3.c
  17. 2 1
      examples/tag_example/tag_restartable.c
  18. 6 2
      include/starpu_data_interfaces.h
  19. 2 4
      include/starpu_stdlib.h
  20. 24 45
      mpi/examples/complex/mpi_complex.c
  21. 45 24
      mpi/src/starpu_mpi.c
  22. 14 11
      mpi/src/starpu_mpi_insert_task.c
  23. 5 2
      mpi/src/starpu_mpi_private.h
  24. 3 1
      mpi/tests/insert_task_owner2.c
  25. 2 0
      mpi/tests/mpi_detached_tag.c
  26. 2 0
      mpi/tests/mpi_irecv_detached.c
  27. 2 0
      mpi/tests/mpi_isend_detached.c
  28. 1 3
      mpi/tests/multiple_send.c
  29. 100 47
      mpi/tests/user_defined_datatype.c
  30. 1 1
      socl/examples/basic/basic.c
  31. 1 1
      socl/examples/mansched/mansched.c
  32. 1 1
      socl/examples/testmap/testmap.c
  33. 2 0
      src/common/starpu_spinlock.c
  34. 16 0
      src/common/starpu_spinlock.h
  35. 2 2
      src/core/dependencies/cg.h
  36. 1 1
      src/datawizard/data_request.c
  37. 7 2
      src/datawizard/filters.c
  38. 7 3
      src/datawizard/interfaces/data_interface.c
  39. 48 20
      src/datawizard/malloc.c
  40. 88 57
      src/datawizard/memalloc.c
  41. 3 3
      src/datawizard/memalloc.h
  42. 21 3
      src/drivers/cpu/driver_cpu.c
  43. 1 0
      tests/Makefile.am
  44. 87 0
      tests/datawizard/allocate.c
  45. 4 2
      tests/loader.c
  46. 2 2
      tools/gdbinit
  47. 26 0
      tools/valgrind/openmpi.suppr

+ 7 - 0
ChangeLog

@@ -92,6 +92,13 @@ New features:
     instead STARPU_LIMIT_CUDA_MEM and STARPU_LIMIT_OPENCL_MEM
   * Introduce new variables STARPU_LIMIT_CUDA_devid_MEM and
     STARPU_LIMIT_OPENCL_devid_MEM to limit memory per specific device
+  * Introduce new variable STARPU_LIMIT_CPU_MEM to limit memory for
+    the CPU devices
+  * Define new functions starpu_malloc_count and starpu_free_count to
+    be used for allocating memory up to the limits defined by the
+    environment variables STARPU_LIMIT_xxx (see above). When no memory
+    is left, starpu_malloc_count tries to reclaim memory from StarPU
+    and returns -ENOMEM on failure.
 
 Small features:
   * Add starpu_worker_get_by_type and starpu_worker_get_by_devid

+ 7 - 2
doc/chapters/advanced-api.texi

@@ -332,8 +332,13 @@ todo
 @item @code{struct starpu_multiformat_data_interface_ops* (*get_mf_ops)(void *data_interface)}
 todo
 
-@item @code{int (*pack_data)(starpu_data_handle_t handle, unsigned node, void **ptr, size_t *count)}
-Pack the data handle into a contiguous buffer at the address @code{ptr} and set the size of the newly created buffer in @code{count}
+@item @code{int (*pack_data)(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)}
+Pack the data handle into a contiguous buffer at the address
+@code{ptr} and set the size of the newly created buffer in
+@code{count}. If @var{ptr} is @code{NULL}, the function should not copy the data in the
+buffer but just set @var{count} to the size of the buffer which
+would have been allocated. The special value @code{-1} indicates the
+size is yet unknown.
 
 @item @code{int (*unpack_data)(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)}
 Unpack the data handle from the contiguous buffer at the address @code{ptr} of size @var{count}

+ 45 - 22
doc/chapters/basic-api.texi

@@ -9,6 +9,7 @@
 @menu
 * Versioning::
 * Initialization and Termination::
+* Standard memory library::
 * Workers' Properties::
 * Data Management::
 * Data Interfaces::
@@ -235,6 +236,43 @@ Return 1 if asynchronous data transfers between CPU and OpenCL accelerators
 are disabled.
 @end deftypefun
 
+@node Standard memory library
+@section Standard memory library
+
+@deftypefun int starpu_malloc (void **@var{A}, size_t @var{dim})
+This function allocates data of the given size in main memory. It will also try to pin it in
+CUDA or OpenCL, so that data transfers from this buffer can be asynchronous, and
+thus permit data transfer and computation overlapping. The allocated buffer must
+be freed thanks to the @code{starpu_free} function.
+@end deftypefun
+
+@deftypefun void starpu_malloc_set_align (size_t @var{align})
+This functions sets an alignment constraints for @code{starpu_malloc}
+allocations. @var{align} must be a power of two. This is for instance called
+automatically by the OpenCL driver to specify its own alignment constraints.
+@end deftypefun
+
+@deftypefun int starpu_free (void *@var{A})
+This function frees memory which has previously allocated with
+@code{starpu_malloc}.
+@end deftypefun
+
+@deftypefun int starpu_malloc_count (void **@var{A}, size_t @var{dim})
+This function is similar to @code{starpu_malloc}. It only allocates
+memory up to the limit defined by the environment variables
+@code{STARPU_LIMIT_CUDA_devid_MEM}, @code{STARPU_LIMIT_CUDA_MEM},
+@code{STARPU_LIMIT_OPENCL_devid_MEM}, @code{STARPU_LIMIT_OPENCL_MEM}
+and @code{STARPU_LIMIT_CPU_MEM} (@pxref{Limit memory}). If no memory
+is available, it tries to reclaim memory from StarPU.
+Memory allocated through this function needs to be freed thanks to the
+@code{starpu_free_count} function.
+@end deftypefun
+
+@deftypefun int starpu_free_count (void *@var{A}, size_t @var{dim})
+This function frees memory which has previously allocated with
+@code{starpu_malloc_count}.
+@end deftypefun
+
 @node Workers' Properties
 @section Workers' Properties
 
@@ -400,24 +438,6 @@ data).
 @node Basic Data Management API
 @subsection Basic Data Management API
 
-@deftypefun int starpu_malloc (void **@var{A}, size_t @var{dim})
-This function allocates data of the given size in main memory. It will also try to pin it in
-CUDA or OpenCL, so that data transfers from this buffer can be asynchronous, and
-thus permit data transfer and computation overlapping. The allocated buffer must
-be freed thanks to the @code{starpu_free} function.
-@end deftypefun
-
-@deftypefun void starpu_malloc_set_align (size_t @var{align})
-This functions sets an alignment constraints for @code{starpu_malloc}
-allocations. @var{align} must be a power of two. This is for instance called
-automatically by the OpenCL driver to specify its own alignment constraints.
-@end deftypefun
-
-@deftypefun int starpu_free (void *@var{A})
-This function frees memory which has previously allocated with
-@code{starpu_malloc}.
-@end deftypefun
-
 @deftp {Data Type} {enum starpu_access_mode}
 This datatype describes a data access mode. The different available modes are:
 @table @asis
@@ -782,14 +802,17 @@ Return the unique identifier of the interface associated with the given @var{han
 Return the size of the data associated with @var{handle}
 @end deftypefun
 
-@deftypefun int starpu_handle_pack_data (starpu_data_handle_t @var{handle}, {void **}@var{ptr}, {size_t *}@var{count})
+@deftypefun int starpu_handle_pack_data (starpu_data_handle_t @var{handle}, {void **}@var{ptr}, {ssize_t *}@var{count})
 Execute the packing operation of the interface of the data registered
 at @var{handle} (@pxref{struct starpu_data_interface_ops}). This
 packing operation must allocate a buffer large enough at @var{ptr} and
 copy into the newly allocated buffer the data associated to
-@var{handle}.
-The function also sets @var{count} to the size of the data handle by calling
-@code{starpu_handle_get_size()}.
+@var{handle}. @var{count} will be set to the size of the allocated
+buffer.
+If @var{ptr} is @code{NULL}, the function should not copy the data in the
+buffer but just set @var{count} to the size of the buffer which
+would have been allocated. The special value @code{-1} indicates the
+size is yet unknown.
 @end deftypefun
 
 @deftypefun int starpu_handle_unpack_data (starpu_data_handle_t @var{handle}, {void *}@var{ptr}, size_t @var{count})

+ 7 - 0
doc/chapters/configuration.texi

@@ -627,6 +627,13 @@ intended to be used for experimental purposes as it emulates devices
 that have a limited amount of memory.
 @end defvr
 
+@defvr {Environment variable} STARPU_LIMIT_CPU_MEM
+This variable specifies the maximum number of megabytes that should be
+available to the application on each CPU device. This variable is
+intended to be used for experimental purposes as it emulates devices
+that have a limited amount of memory.
+@end defvr
+
 @defvr {Environment variable} STARPU_GENERATE_TRACE
 When set to @code{1}, this variable indicates that StarPU should automatically
 generate a Paje trace when @code{starpu_shutdown()} is called.

+ 2 - 1
doc/chapters/mpi-support.texi

@@ -179,7 +179,7 @@ handle.
 
 @cartouche
 @smallexample
-static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, size_t *count)
+static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
 @{
   STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
 
@@ -309,6 +309,7 @@ The algorithm also includes a communication cache mechanism that
 allows not to send data twice to the same MPI node, unless the data
 has been modified. The cache can be disabled
 (@pxref{STARPU_MPI_CACHE}).
+@c todo parler plus du cache
 
 @end deftypefun
 

+ 13 - 1
doc/chapters/tips-tricks.texi

@@ -2,12 +2,13 @@
 
 @c This file is part of the StarPU Handbook.
 @c Copyright (C) 2009--2011  Universit@'e de Bordeaux 1
-@c Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+@c Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
 @c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
 @c See the file starpu.texi for copying conditions.
 
 @menu
 * Per-worker library initialization::  How to initialize a computation library once for each worker?
+* Limit memory::
 @end menu
 
 @node Per-worker library initialization
@@ -77,3 +78,14 @@ void starpu_cublas_init(void)
 @}
 @end smallexample
 @end cartouche
+
+@node Limit memory
+@section How to limit memory per node
+
+TODO
+
+Talk about
+@code{STARPU_LIMIT_CUDA_devid_MEM}, @code{STARPU_LIMIT_CUDA_MEM},
+@code{STARPU_LIMIT_OPENCL_devid_MEM}, @code{STARPU_LIMIT_OPENCL_MEM}
+and @code{STARPU_LIMIT_CPU_MEM}
+

+ 3 - 2
examples/basic_examples/block.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2011  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -34,12 +34,13 @@ typedef void (*device_func)(void **, void *);
 
 int execute_on(uint32_t where, device_func func, float *block, int pnx, int pny, int pnz, float multiplier)
 {
-	struct starpu_codelet cl = {};
+	struct starpu_codelet cl;
 	starpu_data_handle_t block_handle;
         int i;
 
 	starpu_block_data_register(&block_handle, 0, (uintptr_t)block, pnx, pnx*pny, pnx, pny, pnz, sizeof(float));
 
+	starpu_codelet_init(&cl);
 	cl.where = where;
         cl.cuda_funcs[0] = func;
         cl.cpu_funcs[0] = func;

+ 3 - 2
examples/basic_examples/variable.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2011  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -38,7 +38,7 @@ int main(int argc, char **argv)
 	unsigned i;
         float foo;
 	starpu_data_handle_t float_array_handle;
-	struct starpu_codelet cl = {};
+	struct starpu_codelet cl;
 	int ret;
 
 	ret = starpu_init(NULL);
@@ -59,6 +59,7 @@ int main(int argc, char **argv)
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_opencl_load_opencl_from_file");
 #endif
 
+	starpu_codelet_init(&cl);
         cl.cpu_funcs[0] = cpu_codelet;
 #ifdef STARPU_USE_CUDA
         cl.cuda_funcs[0] = cuda_codelet;

+ 5 - 5
examples/interface/complex.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -95,11 +95,11 @@ int main(int argc, char **argv)
 	starpu_complex_data_register(&handle1, 0, &real, &imaginary, 1);
 	starpu_complex_data_register(&handle2, 0, &copy_real, &copy_imaginary, 1);
 
-	ret = starpu_insert_task(&cl_display, STARPU_R, handle1, 0);
+	ret = starpu_insert_task(&cl_display, STARPU_VALUE, "handle1", strlen("handle1"), STARPU_R, handle1, 0);
 	if (ret == -ENODEV) goto end;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
 
-	ret = starpu_insert_task(&cl_display, STARPU_R, handle2, 0);
+	ret = starpu_insert_task(&cl_display, STARPU_VALUE, "handle2", strlen("handle2"), STARPU_R, handle2, 0);
 	if (ret == -ENODEV) goto end;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
 
@@ -124,11 +124,11 @@ int main(int argc, char **argv)
 	if (ret == -ENODEV) goto end;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
 
-	ret = starpu_insert_task(&cl_display, STARPU_R, handle1, 0);
+	ret = starpu_insert_task(&cl_display, STARPU_VALUE, "handle1", strlen("handle1"), STARPU_R, handle1, 0);
 	if (ret == -ENODEV) goto end;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
 
-	ret = starpu_insert_task(&cl_display, STARPU_R, handle2, 0);
+	ret = starpu_insert_task(&cl_display, STARPU_VALUE, "handle2", strlen("handle2"), STARPU_R, handle2, 0);
 	if (ret == -ENODEV) goto end;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
 

+ 7 - 3
examples/interface/complex_codelet.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -56,16 +56,20 @@ struct starpu_codelet cl_compare =
 	.name = "cl_compare"
 };
 
-void display_complex_codelet(void *descr[], __attribute__ ((unused)) void *_args)
+void display_complex_codelet(void *descr[], void *_args)
 {
 	int nx = STARPU_COMPLEX_GET_NX(descr[0]);
 	double *real = STARPU_COMPLEX_GET_REAL(descr[0]);
 	double *imaginary = STARPU_COMPLEX_GET_IMAGINARY(descr[0]);
 	int i;
+	char msg[100];
+
+	if (_args)
+		starpu_codelet_unpack_args(_args, &msg);
 
 	for(i=0 ; i<nx ; i++)
 	{
-		fprintf(stderr, "Complex[%d] = %3.2f + %3.2f i\n", i, real[i], imaginary[i]);
+		fprintf(stderr, "[%s] Complex[%d] = %3.2f + %3.2f i\n", _args?msg:NULL, i, real[i], imaginary[i]);
 	}
 }
 

+ 7 - 4
examples/interface/complex_interface.c

@@ -118,7 +118,7 @@ static void *complex_handle_to_pointer(starpu_data_handle_t handle, unsigned nod
 	return (void*) complex_interface->real;
 }
 
-static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, size_t *count)
+static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
 {
 	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
 
@@ -126,9 +126,12 @@ static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **
 		starpu_data_get_interface_on_node(handle, node);
 
 	*count = complex_get_size(handle);
-	*ptr = malloc(*count);
-	memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
-	memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary, complex_interface->nx*sizeof(double));
+	if (ptr != NULL)
+	{
+		*ptr = malloc(*count);
+		memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
+		memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary, complex_interface->nx*sizeof(double));
+	}
 
 	return 0;
 }

+ 3 - 3
examples/sched_ctx_utils/sched_ctx_utils.c

@@ -204,13 +204,13 @@ void start_2ndbench(void (*bench)(unsigned, unsigned))
 
 void construct_contexts(void (*bench)(unsigned, unsigned))
 {
-	int nprocs1 = cpu1 + gpu + gpu1;
-	int nprocs2 = cpu2 + gpu + gpu2;
+	unsigned nprocs1 = cpu1 + gpu + gpu1;
+	unsigned nprocs2 = cpu2 + gpu + gpu2;
 	unsigned n_all_gpus = gpu + gpu1 + gpu2;
 
 
 	int procs[nprocs1];
-	int i;
+	unsigned i;
 	int k = 0;
 
 	for(i = 0; i < gpu; i++)

+ 3 - 3
examples/spmd/vector_scal_spmd.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2010-2013  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -43,10 +43,10 @@ static int get_first_element_rank(int nel, int rank, int nb_workers)
 
 void scal_cpu_func(void *buffers[], void *_args)
 {
-	unsigned i;
+	int i;
 	float *factor = _args, f = *factor;
 	struct starpu_vector_interface *vector = buffers[0];
-	unsigned n = STARPU_VECTOR_GET_NX(vector);
+	int n = STARPU_VECTOR_GET_NX(vector);
 	float *val = (float *)STARPU_VECTOR_GET_PTR(vector);
 
 	int nb_workers = starpu_combined_worker_get_size();

+ 2 - 1
examples/tag_example/tag_example.c

@@ -37,7 +37,7 @@
 #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
 #define TAG(i, j, iter)	((starpu_tag_t) ( ((uint64_t)(iter)<<48) |  ((uint64_t)(j)<<24) | (i)) )
 
-struct starpu_codelet cl = {};
+struct starpu_codelet cl;
 
 #ifdef STARPU_QUICK_CHECK
 #define Ni	32
@@ -219,6 +219,7 @@ int main(int argc __attribute__((unused)) , char **argv __attribute__((unused)))
 
 	FPRINTF(stderr, "ITER: %u\n", nk);
 
+	starpu_codelet_init(&cl);
 	cl.cpu_funcs[0] = cpu_codelet;
 	cl.cuda_funcs[0] = cpu_codelet;
 	cl.opencl_funcs[0] = cpu_codelet;

+ 2 - 1
examples/tag_example/tag_example2.c

@@ -33,7 +33,7 @@
 #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
 #define TAG(i, iter)	((starpu_tag_t)  (((uint64_t)iter)<<32 | (i)) )
 
-struct starpu_codelet cl = {};
+struct starpu_codelet cl;
 
 #define Ni	64
 #define Nk	256
@@ -123,6 +123,7 @@ int main(int argc __attribute__((unused)) , char **argv __attribute__((unused)))
 
 	parse_args(argc, argv);
 
+	starpu_codelet_init(&cl);
 	cl.cpu_funcs[0] = cpu_codelet;
 	cl.cuda_funcs[0] = cpu_codelet;
 	cl.opencl_funcs[0] = cpu_codelet;

+ 2 - 1
examples/tag_example/tag_example3.c

@@ -35,7 +35,7 @@
 #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
 #define TAG(i, iter)	((starpu_tag_t)  (((uint64_t)iter)<<32 | (i)) )
 
-struct starpu_codelet cl = {};
+struct starpu_codelet cl;
 
 #define Ni	64
 #define Nk	256
@@ -125,6 +125,7 @@ int main(int argc __attribute__((unused)) , char **argv __attribute__((unused)))
 
 	parse_args(argc, argv);
 
+	starpu_codelet_init(&cl);
 	cl.cpu_funcs[0] = cpu_codelet;
 	cl.cuda_funcs[0] = cpu_codelet;
 	cl.opencl_funcs[0] = cpu_codelet;

+ 2 - 1
examples/tag_example/tag_restartable.c

@@ -31,7 +31,7 @@
 #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
 #define TAG(i, iter)	((starpu_tag_t)  (((uint64_t)((iter)%Nrolls))<<32 | (i)) )
 
-struct starpu_codelet cl = {};
+struct starpu_codelet cl;
 
 #define Ni	64
 #define Nk	256
@@ -132,6 +132,7 @@ int main(int argc __attribute__((unused)) , char **argv __attribute__((unused)))
 
 	parse_args(argc, argv);
 
+	starpu_codelet_init(&cl);
 	cl.cpu_funcs[0] = cpu_codelet;
 	cl.cuda_funcs[0] = cpu_codelet;
 	cl.opencl_funcs[0] = cpu_codelet;

+ 6 - 2
include/starpu_data_interfaces.h

@@ -78,6 +78,10 @@ struct starpu_data_copy_methods
 };
 
 int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data);
+/* Allocate SIZE bytes on node NODE */
+uintptr_t starpu_malloc_on_node(unsigned dst_node, size_t size);
+/* Free ADDR on node NODE */
+void starpu_free_on_node(unsigned dst_node, uintptr_t addr, size_t size);
 
 enum starpu_data_interface_id
 {
@@ -124,7 +128,7 @@ struct starpu_data_interface_ops
 	struct starpu_multiformat_data_interface_ops* (*get_mf_ops)(void *data_interface);
 
 	/* Pack the data handle into a contiguous buffer at the address ptr and store the size of the buffer in count */
-	int (*pack_data)(starpu_data_handle_t handle, unsigned node, void **ptr, size_t *count);
+	int (*pack_data)(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count);
 	/* Unpack the data handle from the contiguous buffer at the address ptr */
 	int (*unpack_data)(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count);
 };
@@ -422,7 +426,7 @@ void starpu_multiformat_data_register(starpu_data_handle_t *handle, unsigned hom
 
 enum starpu_data_interface_id starpu_handle_get_interface_id(starpu_data_handle_t handle);
 
-int starpu_handle_pack_data(starpu_data_handle_t handle, void **ptr, size_t *count);
+int starpu_handle_pack_data(starpu_data_handle_t handle, void **ptr, ssize_t *count);
 int starpu_handle_unpack_data(starpu_data_handle_t handle, void *ptr, size_t count);
 size_t starpu_handle_get_size(starpu_data_handle_t handle);
 

+ 2 - 4
include/starpu_stdlib.h

@@ -29,10 +29,8 @@ void starpu_malloc_set_align(size_t align);
 int starpu_malloc(void **A, size_t dim);
 int starpu_free(void *A);
 
-/* Allocate SIZE bytes on node NODE */
-uintptr_t starpu_malloc_on_node(unsigned dst_node, size_t size);
-/* Free ADDR on node NODE */
-void starpu_free_on_node(unsigned dst_node, uintptr_t addr, size_t size);
+int starpu_malloc_count(void **A, size_t dim);
+int starpu_free_count(void *A, size_t dim);
 
 #ifdef __cplusplus
 }

+ 24 - 45
mpi/examples/complex/mpi_complex.c

@@ -37,10 +37,6 @@ int main(int argc, char **argv)
 	int ret;
 	int compare;
 
-	starpu_data_handle_t handle;
-	starpu_data_handle_t handle2;
-	starpu_data_handle_t foo_handle;
-
 	ret = starpu_init(NULL);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 	ret = starpu_mpi_init(&argc, &argv, 1);
@@ -55,65 +51,48 @@ int main(int argc, char **argv)
 	}
 	else
 	{
-		if (rank == 0)
+		starpu_data_handle_t handle;
+		starpu_data_handle_t handle2;
+
+		double real[2] = {4.0, 2.0};
+		double imaginary[2] = {7.0, 9.0};
+
+		double real2[2] = {14.0, 12.0};
+		double imaginary2[2] = {17.0, 19.0};
+
+		if (rank == 1)
 		{
-			double real[2] = {4.0, 2.0};
-			double imaginary[2] = {7.0, 9.0};
+			real[0] = 0.0;
+			real[1] = 0.0;
+			imaginary[0] = 0.0;
+			imaginary[1] = 0.0;
+		}
 
-			double real2[2] = {14.0, 12.0};
-			double imaginary2[2] = {17.0, 19.0};
+		starpu_complex_data_register(&handle, 0, real, imaginary, 2);
+		starpu_complex_data_register(&handle2, -1, real2, imaginary2, 2);
 
+		if (rank == 0)
+		{
 			int *compare_ptr = &compare;
 
-			starpu_complex_data_register(&handle, 0, real, imaginary, 2);
-			starpu_complex_data_register(&handle2, -1, real2, imaginary2, 2);
-
-			starpu_insert_task(&cl_display, STARPU_R, handle, 0);
+			starpu_insert_task(&cl_display, STARPU_VALUE, "node0 initial value", strlen("node0 initial value"), STARPU_R, handle, 0);
 			starpu_mpi_isend_detached(handle, 1, 10, MPI_COMM_WORLD, NULL, NULL);
 			starpu_mpi_irecv_detached(handle2, 1, 20, MPI_COMM_WORLD, NULL, NULL);
 
-			starpu_insert_task(&cl_display, STARPU_R, handle2, 0);
+			starpu_insert_task(&cl_display, STARPU_VALUE, "node0 received value", strlen("node0 received value"), STARPU_R, handle2, 0);
 			starpu_insert_task(&cl_compare, STARPU_R, handle, STARPU_R, handle2, STARPU_VALUE, &compare_ptr, sizeof(compare_ptr), 0);
-
-			{
-				// We send a dummy variable only to check communication with predefined datatypes
-				int foo=12;
-				starpu_variable_data_register(&foo_handle, 0, (uintptr_t)&foo, sizeof(foo));
-				starpu_mpi_isend_detached(foo_handle, 1, 40, MPI_COMM_WORLD, NULL, NULL);
-				starpu_insert_task(&foo_display, STARPU_R, foo_handle, 0);
-			}
 		}
 		else if (rank == 1)
 		{
-			double real[2] = {0.0, 0.0};
-			double imaginary[2] = {0.0, 0.0};
-
-			starpu_complex_data_register(&handle, 0, real, imaginary, 2);
 			starpu_mpi_irecv_detached(handle, 0, 10, MPI_COMM_WORLD, NULL, NULL);
-			starpu_insert_task(&cl_display, STARPU_R, handle, 0);
+			starpu_insert_task(&cl_display, STARPU_VALUE, "node1 received value", strlen("node1 received value"), STARPU_R, handle, 0);
 			starpu_mpi_isend_detached(handle, 0, 20, MPI_COMM_WORLD, NULL, NULL);
-
-			{
-				// We send a dummy variable only to check communication with predefined datatypes
-				int foo=12;
-				starpu_variable_data_register(&foo_handle, -1, (uintptr_t)NULL, sizeof(foo));
-				starpu_mpi_irecv_detached(foo_handle, 0, 40, MPI_COMM_WORLD, NULL, NULL);
-				starpu_insert_task(&foo_display, STARPU_R, foo_handle, 0);
-			}
-
 		}
-	}
 
-	starpu_task_wait_for_all();
+		starpu_task_wait_for_all();
 
-	if (rank == 0)
-	{
-		starpu_data_unregister(handle2);
-	}
-	if (rank == 0 || rank == 1)
-	{
 		starpu_data_unregister(handle);
-		starpu_data_unregister(foo_handle);
+		starpu_data_unregister(handle2);
 	}
 
 	starpu_mpi_shutdown();

+ 45 - 24
mpi/src/starpu_mpi.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010-2012  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2009, 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -88,7 +88,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	_STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
 
 	req->request_type = request_type;
-
+	req->user_datatype = -1;
+	req->count = -1;
 	req->data_handle = data_handle;
 	req->srcdst = srcdst;
 	req->mpi_tag = mpi_tag;
@@ -121,7 +122,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	STARPU_ASSERT(req->ptr);
 
-	_STARPU_MPI_DEBUG("post MPI isend tag %d dst %d ptr %p datatype %p count %d req %p\n", req->mpi_tag, req->srcdst, req->ptr, req->datatype, (int)req->count, &req->request);
+	_STARPU_MPI_DEBUG("post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
@@ -143,12 +144,6 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_isend_size_callback(void *arg)
-{
-	struct _starpu_mpi_req *req = (struct _starpu_mpi_req *) arg;
-	_starpu_mpi_isend_data_func(req);
-}
-
 static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
@@ -156,17 +151,37 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	{
 		req->count = 1;
 		req->ptr = starpu_handle_get_local_ptr(req->data_handle);
-		_starpu_mpi_isend_data_func(req);
 	}
 	else
 	{
-		starpu_data_handle_t count_handle;
+		ssize_t psize;
+
+		// Do not pack the data, just try to find out the size
+		starpu_handle_pack_data(req->data_handle, NULL, &psize);
 
+		if (psize != -1)
+		{
+			// We already know the size of the data, let's send it to overlap with the packing of the data
+			MPI_Isend(&psize, sizeof(psize), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			req->count = psize;
+		}
+
+		// Pack the data
 		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
-		starpu_variable_data_register(&count_handle, 0, (uintptr_t)&req->count, sizeof(req->count));
-		_starpu_mpi_isend_common(count_handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_isend_size_callback, req);
-		starpu_data_unregister_submit(count_handle);
+		if (psize == -1)
+		{
+			// We know the size now, let's send it
+			MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+		}
+		else
+		{
+			// We check the size returned with the 2 calls to pack is the same
+			STARPU_ASSERT(req->count == psize);
+		}
+
+		// We can send the data now
 	}
+	_starpu_mpi_isend_data_func(req);
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
@@ -228,7 +243,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	STARPU_ASSERT(req->ptr);
 
-	_STARPU_MPI_DEBUG("post MPI irecv tag %d src %d data %p ptr %p datatype %p count %d req %p \n", req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, &req->request);
+	_STARPU_MPI_DEBUG("post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
@@ -260,9 +275,6 @@ static void _starpu_mpi_irecv_size_callback(void *arg)
 
 	starpu_data_unregister(callback->handle);
 	callback->req->ptr = malloc(callback->req->count);
-#ifdef STARPU_DEVEL
-#warning TODO: in some cases, callback->req->count is incorrect, we need to fix that
-#endif
 	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld\n", callback->req->count);
 	_starpu_mpi_irecv_data_func(callback->req);
 	free(callback);
@@ -286,6 +298,7 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
 		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
 	}
+
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
@@ -340,7 +353,7 @@ static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
 	req->count = 1;
 	req->ptr = starpu_handle_get_local_ptr(req->data_handle);
 
-	_STARPU_MPI_DEBUG("MPI probe tag %d dst %d ptr %p datatype %p count %d req %p\n", req->mpi_tag, req->srcdst, req->ptr, req->datatype, (int)req->count, req);
+	_STARPU_MPI_DEBUG("MPI probe request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -440,7 +453,7 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	/* Which is the mpi request we are testing for ? */
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
-	_STARPU_MPI_DEBUG("Test request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+	_STARPU_MPI_DEBUG("Test request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 
 	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
 
@@ -621,7 +634,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_STARPU_MPI_DEBUG("complete MPI req %p (%s) srcdst %d - data %p - tag %d - user datatype %d\n", req, _starpu_mpi_request_type(req->request_type), req->srcdst, req->data_handle, req->mpi_tag, req->user_datatype);
+	_STARPU_MPI_DEBUG("complete MPI request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 	if (req->request_type == PROBE_REQ)
 	{
 #ifdef STARPU_DEVEL
@@ -636,6 +649,14 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	{
 		if (req->user_datatype == 1)
 		{
+			if (req->request_type == SEND_REQ)
+			{
+				// We already know the request to send the size is completed, we just call MPI_Test to make sure that the request object is deallocated
+				MPI_Status status;
+				int flag;
+				MPI_Test(&req->size_req, &flag, &status);
+				STARPU_ASSERT(flag);
+			}
 			if (req->request_type == RECV_REQ)
 				// req->ptr is freed by starpu_handle_unpack_data
 				starpu_handle_unpack_data(req->data_handle, req->ptr, req->count);
@@ -672,7 +693,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	_starpu_mpi_req_list_push_front(new_requests, req);
 	newer_requests = 1;
-	_STARPU_MPI_DEBUG("Pushing new request type %p (%s)\n", req, _starpu_mpi_request_type(req->request_type));
+	_STARPU_MPI_DEBUG("Pushing new request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 	_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	_STARPU_MPI_LOG_OUT();
@@ -785,7 +806,7 @@ static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
 	STARPU_ASSERT(req);
 
 	/* submit the request to MPI */
-	_STARPU_MPI_DEBUG("Handling new request type %p (%s)\n", req, _starpu_mpi_request_type(req->request_type));
+	_STARPU_MPI_DEBUG("Handling new request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
 	req->func(req);
 
 	_STARPU_MPI_LOG_OUT();

+ 14 - 11
mpi/src/starpu_mpi_insert_task.c

@@ -70,7 +70,7 @@ void _starpu_mpi_cache_empty_tables(int world_size)
 
 	if (_cache_enabled == 0) return;
 
-	_STARPU_MPI_DEBUG("Clearing htable for cache\n");
+	_STARPU_DEBUG("Clearing htable for cache\n");
 
 	for(i=0 ; i<world_size ; i++)
 	{
@@ -121,12 +121,14 @@ void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
 		if (avail)
 		{
 			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			free(avail);
 			HASH_DEL(_cache_sent_data[i], avail);
 		}
 		HASH_FIND_PTR(_cache_received_data[i], &data_handle, avail);
 		if (avail)
 		{
 			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			free(avail);
 			HASH_DEL(_cache_received_data[i], avail);
 		}
 	}
@@ -377,16 +379,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
 
-	/* Get the number of buffers and the size of the arguments */
-	va_start(varg_list, codelet);
-	arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);
-
-	if (arg_buffer_size)
-	{
-		va_start(varg_list, codelet);
-		_starpu_codelet_pack_args(arg_buffer_size, &arg_buffer, varg_list);
-	}
-
 	/* Find out whether we are to execute the data because we own the data to be written to. */
 	inconsistent_execute = 0;
 	do_execute = -1;
@@ -589,6 +581,17 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	if (do_execute)
 	{
+		/* Get the number of buffers and the size of the arguments */
+		va_start(varg_list, codelet);
+		arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);
+
+		/* Pack arguments if needed */
+		if (arg_buffer_size)
+		{
+			va_start(varg_list, codelet);
+			_starpu_codelet_pack_args(arg_buffer_size, &arg_buffer, varg_list);
+		}
+
 		_STARPU_MPI_DEBUG("Execution of the codelet %p (%s)\n", codelet, codelet->name);
 		va_start(varg_list, codelet);
 		struct starpu_task *task = starpu_task_create();

+ 5 - 2
mpi/src/starpu_mpi_private.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2012  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -77,7 +77,7 @@ LIST_TYPE(_starpu_mpi_req,
 	/* description of the data to be sent/received */
 	MPI_Datatype datatype;
 	void *ptr;
-	size_t count;
+	ssize_t count;
 	int user_datatype;
 
 	/* who are we talking to ? */
@@ -108,6 +108,9 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned detached;
 	void *callback_arg;
 	void (*callback)(void *);
+
+        /* in the case of user-defined datatypes, we need to send the size of the data */
+	MPI_Request size_req;
 );
 
 #ifdef __cplusplus

+ 3 - 1
mpi/tests/insert_task_owner2.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -109,7 +109,9 @@ int main(int argc, char **argv)
 		{
 			starpu_data_acquire(data_handles[i], STARPU_R);
 			values[i] = *((int *)starpu_handle_get_local_ptr(data_handles[i]));
+			starpu_data_release(data_handles[i]);
 		}
+		starpu_data_unregister(data_handles[i]);
 	}
         FPRINTF(stderr, "[%d][local ptr] VALUES: %d %d %d %d\n", rank, values[0], values[1], values[2], values[3]);
         FPRINTF(stderr, "[%d][end] VALUES: %d %d %d %d\n", rank, x[0], x[1], x[2], y);

+ 2 - 0
mpi/tests/mpi_detached_tag.c

@@ -75,6 +75,8 @@ int main(int argc, char **argv)
 	}
 
 	starpu_data_unregister(tab_handle);
+	free(tab);
+
 	starpu_mpi_shutdown();
 	starpu_shutdown();
 

+ 2 - 0
mpi/tests/mpi_irecv_detached.c

@@ -92,6 +92,8 @@ int main(int argc, char **argv)
 	}
 
 	starpu_data_unregister(tab_handle);
+	free(tab);
+
 	starpu_mpi_shutdown();
 	starpu_shutdown();
 

+ 2 - 0
mpi/tests/mpi_isend_detached.c

@@ -97,6 +97,8 @@ int main(int argc, char **argv)
 	}
 
 	starpu_data_unregister(tab_handle);
+	free(tab);
+
 	starpu_mpi_shutdown();
 	starpu_shutdown();
 

+ 1 - 3
mpi/tests/multiple_send.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -17,8 +17,6 @@
 #include <starpu_mpi.h>
 #include "helper.h"
 
-#define NITER	2048
-
 int main(int argc, char **argv)
 {
 	int ret, rank, size;

+ 100 - 47
mpi/tests/user_defined_datatype.c

@@ -18,49 +18,42 @@
 #include <interface/complex_interface.h>
 #include <interface/complex_codelet.h>
 
-void display_double_codelet(void *descr[], __attribute__ ((unused)) void *_args)
-{
-	double *foo = (double *)STARPU_VARIABLE_GET_PTR(descr[0]);
-	fprintf(stderr, "foo = %f\n", *foo);
-}
+#ifdef STARPU_QUICK_CHECK
+#  define ELEMENTS 10
+#else
+#  define ELEMENTS 1000
+#endif
 
-struct starpu_codelet double_display =
-{
-	.cpu_funcs = {display_double_codelet, NULL},
-	.nbuffers = 1,
-	.modes = {STARPU_R}
-};
+typedef void (*test_func)(starpu_data_handle_t *, int, int);
 
-typedef void (*test_func)(starpu_data_handle_t, struct starpu_codelet *, int, int);
-
-void test_handle_irecv_isend_detached(starpu_data_handle_t handle, struct starpu_codelet *codelet, int rank, int tag)
+void test_handle_irecv_isend_detached(starpu_data_handle_t *handles, int nb_handles, int rank)
 {
-	starpu_data_set_rank(handle, 1);
-	starpu_data_set_tag(handle, tag);
+	int i;
 
-	if (rank == 0)
-	{
-		starpu_insert_task(codelet, STARPU_R, handle, 0);
-	}
-	starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handle, 0, NULL, NULL);
-	if (rank == 0)
+	for(i=0 ; i<nb_handles ; i++)
 	{
-		starpu_insert_task(codelet, STARPU_R, handle, 0);
+		starpu_data_set_rank(handles[i], 1);
+		starpu_data_set_tag(handles[i], i+100);
 	}
+
+	for(i=0 ; i<nb_handles ; i++)
+		starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, handles[i], 0, NULL, NULL);
 }
 
-void test_handle_recv_send(starpu_data_handle_t handle, struct starpu_codelet *codelet, int rank, int tag)
+void test_handle_recv_send(starpu_data_handle_t *handles, int nb_handles, int rank)
 {
+	int i;
+
 	if (rank == 1)
 	{
-		starpu_mpi_send(handle, 0, tag, MPI_COMM_WORLD);
+		for(i=0 ; i<nb_handles ; i++)
+			starpu_mpi_send(handles[i], 0, i+100, MPI_COMM_WORLD);
 	}
 	else if (rank == 0)
 	{
-		MPI_Status status;
-		starpu_insert_task(codelet, STARPU_R, handle, 0);
-		starpu_mpi_recv(handle, 1, tag, MPI_COMM_WORLD, &status);
-		starpu_insert_task(codelet, STARPU_R, handle, 0);
+		MPI_Status statuses[nb_handles];
+		for(i=0 ; i<nb_handles ; i++)
+			starpu_mpi_recv(handles[i], 1, i+100, MPI_COMM_WORLD, &statuses[i]);
 	}
 }
 
@@ -69,6 +62,7 @@ int main(int argc, char **argv)
 {
 	int rank, nodes;
 	int ret;
+	int compare = 0;
 
 	ret = starpu_init(NULL);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
@@ -89,36 +83,95 @@ int main(int argc, char **argv)
 		for(func=funcs ; *func!=NULL ; func++)
 		{
 			test_func f = *func;
-			double real[2] = {0.0, 0.0};
-			double imaginary[2] = {0.0, 0.0};
-			double foo=8;
-			starpu_data_handle_t handle_complex;
-			starpu_data_handle_t handle_var;
+			int i;
+
+			starpu_data_handle_t handle_complex[ELEMENTS];
+			starpu_data_handle_t handle_vars[ELEMENTS];
+
+			double real[ELEMENTS][2];
+			double imaginary[ELEMENTS][2];
+			double foo[ELEMENTS];
+
+			double foo_compare=42;
+			double real_compare[2] = {12.0, 45.0};
+			double imaginary_compare[2] = {7.0, 42.0};
 
 			fprintf(stderr, "\nTesting with function %p\n", f);
 
+			if (rank == 0)
+			{
+				for(i=0 ; i<ELEMENTS; i++)
+				{
+					foo[i] = 8;
+					real[i][0] = 0.0;
+					real[i][1] = 0.0;
+					imaginary[i][0] = 0.0;
+					imaginary[i][1] = 0.0;
+				}
+			}
 			if (rank == 1)
 			{
-				foo = 42;
-				real[0] = 12.0;
-				real[1] = 45.0;
-				imaginary[0] = 7.0;
-				imaginary[1] = 42.0;
+				for(i=0 ; i<ELEMENTS; i++)
+				{
+					foo[i] = 42;
+					real[i][0] = 12.0;
+					real[i][1] = 45.0;
+					imaginary[i][0] = 7.0;
+					imaginary[i][1] = 42.0;
+				}
+			}
+			for(i=0 ; i<ELEMENTS ; i++)
+			{
+				starpu_complex_data_register(&handle_complex[i], 0, real[i], imaginary[i], 2);
+				starpu_variable_data_register(&handle_vars[i], 0, (uintptr_t)&foo[i], sizeof(double));
 			}
-			starpu_complex_data_register(&handle_complex, 0, real, imaginary, 2);
-			starpu_variable_data_register(&handle_var, 0, (uintptr_t)&foo, sizeof(double));
 
-			f(handle_var, &double_display, rank, 11);
-			f(handle_complex, &cl_display, rank, 22);
+			f(handle_vars, ELEMENTS, rank);
+			f(handle_complex, ELEMENTS, rank);
 
-			starpu_data_unregister(handle_complex);
-			starpu_data_unregister(handle_var);
+			for(i=0 ; i<ELEMENTS ; i++)
+			{
+				starpu_data_unregister(handle_complex[i]);
+				starpu_data_unregister(handle_vars[i]);
+			}
 			starpu_task_wait_for_all();
-			fflush(stderr);
+
+			if (rank == 0)
+			{
+				for(i=0 ; i<ELEMENTS ; i++)
+				{
+					int j;
+					compare = (foo[i] == foo_compare);
+					if (compare == 0)
+					{
+						fprintf(stderr, "ERROR. foo[%d] == %f != %f\n", i, foo[i], foo_compare);
+						goto end;
+					}
+					for(j=0 ; j<2 ; j++)
+					{
+						compare = (real[i][j] == real_compare[j]);
+						if (compare == 0)
+						{
+							fprintf(stderr, "ERROR. real[%d][%d] == %f != %f\n", i, j, real[i][j], real_compare[j]);
+							goto end;
+						}
+					}
+					for(j=0 ; j<2 ; j++)
+					{
+						compare = (imaginary[i][j] == imaginary_compare[j]);
+						if (compare == 0)
+						{
+							fprintf(stderr, "ERROR. imaginary[%d][%d] == %f != %f\n", i, j, imaginary[i][j], imaginary_compare[j]);
+							goto end;
+						}
+					}
+				}
+			}
 		}
 	}
+end:
 	starpu_mpi_shutdown();
 	starpu_shutdown();
 
-	return 0;
+	if (rank == 0) return !compare; else return ret;
 }

+ 1 - 1
socl/examples/basic/basic.c

@@ -90,7 +90,7 @@ int main(int UNUSED(argc), char** UNUSED(argv)) {
    err = clGetPlatformIDs(sizeof(platforms)/sizeof(cl_platform_id), platforms, NULL);
    check(err, "clGetPlatformIDs");
 
-   unsigned int platform_idx = -1;
+   int platform_idx = -1;
    for (i=0; i<num_platforms;i++) {
     char vendor[256];
     clGetPlatformInfo(platforms[i], CL_PLATFORM_VENDOR, sizeof(vendor), vendor, NULL);

+ 1 - 1
socl/examples/mansched/mansched.c

@@ -56,7 +56,7 @@ int main(int UNUSED(argc), char** UNUSED(argv)) {
    cl_kernel kernel;
    cl_mem s1m, s2m, dm;
    cl_command_queue cq;
-   int d;
+   unsigned int d;
    cl_int err;
 
    TYPE s1[SIZE],s2[SIZE],dst[SIZE];

+ 1 - 1
socl/examples/testmap/testmap.c

@@ -82,7 +82,7 @@ int main(int UNUSED(argc), char** UNUSED(argv)) {
    err = clGetPlatformIDs(sizeof(platforms)/sizeof(cl_platform_id), platforms, NULL);
    check(err, "clGetPlatformIDs");
 
-   unsigned int platform_idx = -1;
+   int platform_idx = -1;
    for (i=0; i<num_platforms;i++) {
     char vendor[256];
     clGetPlatformInfo(platforms[i], CL_PLATFORM_VENDOR, sizeof(vendor), vendor, NULL);

+ 2 - 0
src/common/starpu_spinlock.c

@@ -68,6 +68,7 @@ int _starpu_spin_destroy(struct _starpu_spinlock *lock STARPU_ATTRIBUTE_UNUSED)
 #endif
 }
 
+#undef _starpu_spin_lock
 int _starpu_spin_lock(struct _starpu_spinlock *lock)
 {
 #ifdef STARPU_SIMGRID
@@ -120,6 +121,7 @@ int _starpu_spin_checklocked(struct _starpu_spinlock *lock)
 #endif
 }
 
+#undef _starpu_spin_trylock
 int _starpu_spin_trylock(struct _starpu_spinlock *lock)
 {
 #ifdef STARPU_SIMGRID

+ 16 - 0
src/common/starpu_spinlock.h

@@ -30,6 +30,7 @@ struct _starpu_spinlock
 #elif defined(STARPU_SPINLOCK_CHECK)
 	pthread_mutexattr_t errcheck_attr;
 	_starpu_pthread_mutex_t errcheck_lock;
+	const char *last_taker;
 #elif defined(HAVE_PTHREAD_SPIN_LOCK)
 	_starpu_pthread_spinlock_t lock;
 #else
@@ -42,7 +43,22 @@ int _starpu_spin_init(struct _starpu_spinlock *lock);
 int _starpu_spin_destroy(struct _starpu_spinlock *lock);
 
 int _starpu_spin_lock(struct _starpu_spinlock *lock);
+#if defined(STARPU_SPINLOCK_CHECK)
+#define _starpu_spin_lock(lock) ({ \
+	_starpu_spin_lock(lock); \
+	(lock)->last_taker = __func__; \
+	0; \
+})
+#endif
 int _starpu_spin_trylock(struct _starpu_spinlock *lock);
+#if defined(STARPU_SPINLOCK_CHECK)
+#define _starpu_spin_trylock(lock) ({ \
+	int err = _starpu_spin_trylock(lock); \
+	if (!err) \
+		(lock)->last_taker = __func__; \
+	err; \
+})
+#endif
 int _starpu_spin_checklocked(struct _starpu_spinlock *lock);
 int _starpu_spin_unlock(struct _starpu_spinlock *lock);
 

+ 2 - 2
src/core/dependencies/cg.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012-2013  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -53,7 +53,7 @@ struct _starpu_cg_list
 	/* List of successors */
 	unsigned nsuccs; /* how many successors ? */
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
-	unsigned succ_list_size;
+	unsigned succ_list_size; /* How many allocated items in succ */
 	struct _starpu_cg **succ;
 #else
 	struct _starpu_cg *succ[STARPU_NMAXDEPS];

+ 1 - 1
src/datawizard/data_request.c

@@ -223,7 +223,7 @@ void _starpu_data_request_append_callback(struct _starpu_data_request *r, void (
 	}
 }
 
-/* This method is called with handle's header_lock taken */
+/* This method is called with handle's header_lock taken, and unlocks it */
 static void starpu_handle_data_request_completion(struct _starpu_data_request *r)
 {
 	unsigned do_delete = 0;

+ 7 - 2
src/datawizard/filters.c

@@ -273,6 +273,7 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 {
 	unsigned child;
 	unsigned node;
+	unsigned sizes[root_handle->nchildren];
 
 	_starpu_spin_lock(&root_handle->header_lock);
 
@@ -287,6 +288,8 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 		if (child_handle->nchildren > 0)
 			starpu_data_unpartition(child_handle, gathering_node);
 
+		sizes[child] = _starpu_data_get_size(child_handle);
+
 		/* If this is a multiformat handle, we must convert the data now */
 #ifdef STARPU_DEVEL
 #warning TODO: _starpu_fetch_data_on_node should be doing it
@@ -357,11 +360,13 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 			}
 
 			if (local->allocated && local->automatically_allocated)
+			{
 				/* free the child data copy in a lazy fashion */
 #ifdef STARPU_DEVEL
 #warning FIXME!! this needs access to the child interface, which was freed above!
 #endif
-				_starpu_request_mem_chunk_removal(child_handle, node, 1);
+				_starpu_request_mem_chunk_removal(child_handle, node, sizes[child]);
+			}
 		}
 
 		if (!root_handle->per_node[node].allocated)
@@ -371,7 +376,7 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 
 		if (!isvalid && root_handle->per_node[node].allocated && root_handle->per_node[node].automatically_allocated)
 			/* free the data copy in a lazy fashion */
-			_starpu_request_mem_chunk_removal(root_handle, node, 1);
+			_starpu_request_mem_chunk_removal(root_handle, node, _starpu_data_get_size(root_handle));
 
 		/* if there was no invalid copy, the node still has a valid copy */
 		still_valid[node] = isvalid;

+ 7 - 3
src/datawizard/interfaces/data_interface.c

@@ -555,10 +555,11 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 
 	/* Destroy the data now */
 	unsigned node;
+	size_t size = _starpu_data_get_size(handle);
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{
 		/* free the data copy in a lazy fashion */
-		_starpu_request_mem_chunk_removal(handle, node, 1);
+		_starpu_request_mem_chunk_removal(handle, node, size);
 	}
 
 	_starpu_data_free_interfaces(handle);
@@ -595,6 +596,7 @@ void starpu_data_unregister_submit(starpu_data_handle_t handle)
 static void _starpu_data_invalidate(void *data)
 {
 	starpu_data_handle_t handle = data;
+	size_t size = _starpu_data_get_size(handle);
 	_starpu_spin_lock(&handle->header_lock);
 
 	unsigned node;
@@ -605,7 +607,9 @@ static void _starpu_data_invalidate(void *data)
 		if (local->allocated && local->automatically_allocated)
 		{
 			/* free the data copy in a lazy fashion */
-			_starpu_request_mem_chunk_removal(handle, node, 0);
+			_starpu_request_mem_chunk_removal(handle, node, size);
+			local->allocated = 0;
+			local->automatically_allocated = 0;
 		}
 
 		local->state = STARPU_INVALID;
@@ -648,7 +652,7 @@ int starpu_data_interface_get_next_id(void)
 	return _data_interface_number-1;
 }
 
-int starpu_handle_pack_data(starpu_data_handle_t handle, void **ptr, size_t *count)
+int starpu_handle_pack_data(starpu_data_handle_t handle, void **ptr, ssize_t *count)
 {
 	STARPU_ASSERT(handle->ops->pack_data);
 	return handle->ops->pack_data(handle, _starpu_memory_node_get_local_key(), ptr, count);

+ 48 - 20
src/datawizard/malloc.c

@@ -83,10 +83,7 @@ static struct starpu_codelet malloc_pinned_cl =
 
 int starpu_malloc(void **A, size_t dim)
 {
-#ifdef STARPU_DEVEL
-#warning TODO: we need to request _starpu_memory_manager_can_allocate_size()
-#warning TODO: if it fails, we should reclaim memory
-#endif
+	int ret=0;
 
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
 		return -EDEADLK;
@@ -113,9 +110,9 @@ int starpu_malloc(void **A, size_t dim)
 
                 malloc_pinned_cl.where = STARPU_CUDA;
 		struct starpu_task *task = starpu_task_create();
-			task->callback_func = NULL;
-			task->cl = &malloc_pinned_cl;
-			task->cl_arg = &s;
+		task->callback_func = NULL;
+		task->cl = &malloc_pinned_cl;
+		task->cl_arg = &s;
 
 		task->synchronous = 1;
 
@@ -123,8 +120,8 @@ int starpu_malloc(void **A, size_t dim)
 
 		push_res = _starpu_task_submit_internally(task);
 		STARPU_ASSERT(push_res != -ENODEV);
-#endif
-#endif
+#endif /* HAVE_CUDA_MEMCPY_PEER */
+#endif /* STARPU_USE_CUDA */
 	}
 //	else if (_starpu_can_submit_opencl_task())
 //	{
@@ -149,16 +146,19 @@ int starpu_malloc(void **A, size_t dim)
 //
 //		push_res = _starpu_task_submit_internally(task);
 //		STARPU_ASSERT(push_res != -ENODEV);
-//#endif
+//#endif /* STARPU_USE_OPENCL */
 //        }
         else
-#endif
+#endif /* STARPU_SIMGRID */
 	{
 #ifdef STARPU_HAVE_POSIX_MEMALIGN
 		if (malloc_align != sizeof(void*))
 		{
 			if (posix_memalign(A, malloc_align, dim))
+			{
+				ret = -ENOMEM;
 				*A = NULL;
+			}
 		}
 		else
 #elif defined(STARPU_HAVE_MEMALIGN)
@@ -167,15 +167,18 @@ int starpu_malloc(void **A, size_t dim)
 			*A = memalign(malloc_align, dim);
 		}
 		else
-#endif
+#endif /* STARPU_HAVE_POSIX_MEMALIGN */
 		{
 			*A = malloc(dim);
 		}
 	}
 
-	STARPU_ASSERT(*A);
+	if (ret == 0)
+	{
+		STARPU_ASSERT(*A);
+	}
 
-	return 0;
+	return ret;
 }
 
 #if defined(STARPU_USE_CUDA) && !defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
@@ -242,9 +245,9 @@ int starpu_free(void *A)
 
                 free_pinned_cl.where = STARPU_CUDA;
 		struct starpu_task *task = starpu_task_create();
-			task->callback_func = NULL;
-			task->cl = &free_pinned_cl;
-			task->cl_arg = A;
+		task->callback_func = NULL;
+		task->cl = &free_pinned_cl;
+		task->cl_arg = A;
 
 		task->synchronous = 1;
 
@@ -283,6 +286,34 @@ int starpu_free(void *A)
 	return 0;
 }
 
+
+int starpu_malloc_count(void **A, size_t dim)
+{
+	if (_starpu_memory_manager_can_allocate_size(dim, 0) == 0)
+	{
+		size_t freed;
+		size_t reclaim = 2 * dim;
+		_STARPU_DEBUG("There is not enough memory left, we are going to reclaim %ld\n", reclaim);
+		_STARPU_TRACE_START_MEMRECLAIM(0);
+		freed = _starpu_memory_reclaim_generic(0, 0, reclaim);
+		_STARPU_TRACE_END_MEMRECLAIM(0);
+		if (freed < dim)
+		{
+			// We could not reclaim enough memory
+			*A = NULL;
+			return -ENOMEM;
+		}
+	}
+
+	return starpu_malloc(A, dim);
+}
+
+int starpu_free_count(void *A, size_t dim)
+{
+	_starpu_memory_manager_deallocate_size(dim, 0);
+	starpu_free(A);
+}
+
 #ifdef STARPU_SIMGRID
 static _starpu_pthread_mutex_t cuda_alloc_mutex = _STARPU_PTHREAD_MUTEX_INITIALIZER;
 static _starpu_pthread_mutex_t opencl_alloc_mutex = _STARPU_PTHREAD_MUTEX_INITIALIZER;
@@ -300,9 +331,6 @@ starpu_malloc_on_node(unsigned dst_node, size_t size)
 	if (_starpu_memory_manager_can_allocate_size(size, dst_node) == 0)
 		return 0;
 
-#ifdef STARPU_DEVEL
-#warning TODO: we need to use starpu_malloc which should itself inquire from the memory manager is there is enough available memory
-#endif
 	switch(starpu_node_get_kind(dst_node))
 	{
 		case STARPU_CPU_RAM:

+ 88 - 57
src/datawizard/memalloc.c

@@ -21,6 +21,7 @@
 #include <starpu.h>
 
 /* This per-node RW-locks protect mc_list and memchunk_cache entries */
+/* Note: handle header lock is always taken before this */
 static _starpu_pthread_rwlock_t mc_rwlock[STARPU_MAXNODES];
 
 /* This per-node spinlock protect lru_list */
@@ -173,7 +174,8 @@ static void transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_n
 			dst_replicate->refcnt--;
 			STARPU_ASSERT(handle->busy_count >= 2);
 			handle->busy_count -= 2;
-			_starpu_data_check_not_busy(handle);
+			ret = _starpu_data_check_not_busy(handle);
+			STARPU_ASSERT(ret == 0);
 
 			break;
 		case STARPU_SHARED:
@@ -225,24 +227,15 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 
 	starpu_data_handle_t handle = mc->data;
 
-	/* Does this memory chunk refers to a handle that does not exist
-	 * anymore ? */
-	unsigned data_was_deleted = mc->data_was_deleted;
-
 	struct _starpu_data_replicate *replicate = mc->replicate;
 
-//	while (_starpu_spin_trylock(&handle->header_lock))
-//		_starpu_datawizard_progress(_starpu_memory_node_get_local_key());
-
-#ifdef STARPU_DEVEL
-#warning can we block here ?
-#endif
-//	_starpu_spin_lock(&handle->header_lock);
+	if (handle)
+		_starpu_spin_checklocked(&handle->header_lock);
 
 	if (mc->automatically_allocated &&
-		(!handle || data_was_deleted || replicate->refcnt == 0))
+		(!handle || replicate->refcnt == 0))
 	{
-		if (handle && !data_was_deleted)
+		if (handle)
 			STARPU_ASSERT(replicate->allocated);
 
 #if defined(STARPU_USE_CUDA) && defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
@@ -258,7 +251,7 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 
 		mc->ops->free_data_on_node(mc->chunk_interface, node);
 
-		if (handle && !data_was_deleted)
+		if (handle)
 		{
 			replicate->allocated = 0;
 
@@ -268,12 +261,10 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 
 		freed = mc->size;
 
-		if (handle && !data_was_deleted)
+		if (handle)
 			STARPU_ASSERT(replicate->refcnt == 0);
 	}
 
-//	_starpu_spin_unlock(&handle->header_lock);
-
 	return freed;
 }
 
@@ -282,6 +273,10 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 {
 	size_t size;
+	starpu_data_handle_t handle = mc->data;
+
+	if (handle)
+		_starpu_spin_checklocked(&handle->header_lock);
 
 	mc->replicate->mc=NULL;
 
@@ -401,7 +396,6 @@ static void reuse_mem_chunk(unsigned node, struct _starpu_data_replicate *new_re
 	memcpy(new_replicate->data_interface, mc->chunk_interface, old_replicate->handle->ops->interface_size);
 
 	mc->data = new_replicate->handle;
-	mc->data_was_deleted = 0;
 	/* mc->ops, mc->footprint and mc->interface should be
  	 * unchanged ! */
 
@@ -528,26 +522,32 @@ static unsigned try_to_find_reusable_mem_chunk(unsigned node, starpu_data_handle
  */
 static size_t flush_memchunk_cache(unsigned node, size_t reclaim)
 {
-	struct _starpu_mem_chunk *mc, *next_mc;
+	struct _starpu_mem_chunk *mc;
 
 	size_t freed = 0;
 
-	for (mc = _starpu_mem_chunk_list_begin(memchunk_cache[node]);
-	     mc != _starpu_mem_chunk_list_end(memchunk_cache[node]);
-	     mc = next_mc)
-	{
-		next_mc = _starpu_mem_chunk_list_next(mc);
+	_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
+	while (!_starpu_mem_chunk_list_empty(memchunk_cache[node])) {
+		mc = _starpu_mem_chunk_list_pop_front(memchunk_cache[node]);
+		_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 
-		freed += free_memory_on_node(mc, node);
+		starpu_data_handle_t handle = mc->data;
 
-		_starpu_mem_chunk_list_erase(memchunk_cache[node], mc);
+		if (handle)
+			while (_starpu_spin_trylock(&handle->header_lock))
+				_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+		freed += free_memory_on_node(mc, node);
+		if (handle)
+			_starpu_spin_unlock(&handle->header_lock);
 
 		free(mc->chunk_interface);
 		_starpu_mem_chunk_delete(mc);
+
+		_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
 		if (reclaim && freed>reclaim)
 			break;
 	}
-
+	_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 	return freed;
 }
 
@@ -561,42 +561,69 @@ static size_t free_potentially_in_use_mc(unsigned node, unsigned force, size_t r
 {
 	size_t freed = 0;
 
-	struct _starpu_mem_chunk *mc, *next_mc;
+	struct _starpu_mem_chunk *mc, *next_mc = NULL;
 
-	for (mc = _starpu_mem_chunk_list_begin(mc_list[node]);
-	     mc != _starpu_mem_chunk_list_end(mc_list[node]);
-	     mc = next_mc)
+	/*
+	 * We have to unlock mc_rwlock before locking header_lock, so we have
+	 * to be careful with the list.  We try to do just one pass, by
+	 * remembering the next mc to be tried. If it gets dropped, we restart
+	 * from zero. So we continue until we go through the whole list without
+	 * finding anything to free.
+	 */
+
+	while (1)
 	{
-		/* there is a risk that the memory chunk is freed
-		   before next iteration starts: so we compute the next
-		   element of the list now */
+		_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
+		/* A priori, start from the beginning */
+		mc = _starpu_mem_chunk_list_begin(mc_list[node]);
+		if (next_mc)
+			/* Unless we might restart from where we were */
+			for (mc = _starpu_mem_chunk_list_begin(mc_list[node]);
+			     mc != _starpu_mem_chunk_list_end(mc_list[node]);
+			     mc = _starpu_mem_chunk_list_next(mc))
+				if (mc == next_mc)
+					/* Yes, restart from there.  */
+					break;
+
+		if (mc == _starpu_mem_chunk_list_end(mc_list[node]))
+		{
+			/* But it was the last one of the list :/ */
+			_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
+			break;
+		}
+		/* Remember where to try next */
 		next_mc = _starpu_mem_chunk_list_next(mc);
+		_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 
 		if (!force)
 		{
 			freed += try_to_free_mem_chunk(mc, node);
-			#if 1
+
 			if (reclaim && freed > reclaim)
 				break;
-			#endif
 		}
 		else
 		{
-			/* We must free the memory now: note that data
-			 * coherency is not maintained in that case ! */
+			starpu_data_handle_t handle = mc->data;
+
+			_starpu_spin_lock(&handle->header_lock);
+
+			/* We must free the memory now, because we are
+			 * terminating the drivers: note that data coherency is
+			 * not maintained in that case ! */
 			freed += do_free_mem_chunk(mc, node);
+
+			_starpu_spin_unlock(&handle->header_lock);
 		}
 	}
 
 	return freed;
 }
 
-static size_t reclaim_memory_generic(unsigned node, unsigned force, size_t reclaim)
+size_t _starpu_memory_reclaim_generic(unsigned node, unsigned force, size_t reclaim)
 {
 	size_t freed = 0;
 
-	_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
-
 	starpu_lru(node);
 
 	/* remove all buffers for which there was a removal request */
@@ -606,8 +633,6 @@ static size_t reclaim_memory_generic(unsigned node, unsigned force, size_t recla
 	if (reclaim && freed<reclaim)
 		freed += free_potentially_in_use_mc(node, force, reclaim);
 
-	_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
-
 	return freed;
 
 }
@@ -619,7 +644,7 @@ static size_t reclaim_memory_generic(unsigned node, unsigned force, size_t recla
  */
 size_t _starpu_free_all_automatically_allocated_buffers(unsigned node)
 {
-	return reclaim_memory_generic(node, 1, 0);
+	return _starpu_memory_reclaim_generic(node, 1, 0);
 }
 
 static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_replicate *replicate, size_t interface_size, unsigned automatically_allocated)
@@ -633,7 +658,6 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	mc->data = handle;
 	mc->footprint = _starpu_compute_data_footprint(handle);
 	mc->ops = handle->ops;
-	mc->data_was_deleted = 0;
 	mc->automatically_allocated = automatically_allocated;
 	mc->relaxed_coherency = replicate->relaxed_coherency;
 	mc->replicate = replicate;
@@ -669,16 +693,14 @@ static void register_mem_chunk(struct _starpu_data_replicate *replicate, unsigne
 /* This function is called when the handle is destroyed (eg. when calling
  * unregister or unpartition). It puts all the memchunks that refer to the
  * specified handle into the cache.
- * handle_deleted specifies whether the handle is deleted or not (and thus we
- * need to update it)
  */
-void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, unsigned node, int handle_deleted)
+void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, unsigned node, size_t size)
 {
-	_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
+	_starpu_spin_checklocked(&handle->header_lock);
 
-	size_t size = _starpu_data_get_size(handle);
+	_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
 
-	/* TODO: expensive, handle should its own list of chunks? */
+	/* TODO: expensive, handle should have its own list of chunks? */
 	/* iterate over the list of memory chunks and remove the entry */
 	struct _starpu_mem_chunk *mc, *next_mc;
 	for (mc = _starpu_mem_chunk_list_begin(mc_list[node]);
@@ -690,8 +712,13 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, unsigned nod
 		if (mc->data == handle)
 		{
 			/* we found the data */
+
+			/* Record the allocated size, so that later in memory
+			 * reclaiming we can estimate how much memory we free
+			 * by freeing this.  */
 			mc->size = size;
-			mc->data_was_deleted = handle_deleted;
+			/* This memchunk doesn't have to do with the data any more. */
+			mc->data = NULL;
 
 			/* remove it from the main list */
 			_starpu_mem_chunk_list_erase(mc_list[node], mc);
@@ -738,6 +765,7 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, struct _s
 {
 	unsigned attempts = 0;
 	ssize_t allocated_memory;
+	int ret;
 
 	_starpu_spin_checklocked(&handle->header_lock);
 
@@ -799,12 +827,10 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, struct _s
 			_STARPU_TRACE_START_MEMRECLAIM(dst_node);
 			if (is_prefetch)
 			{
-				_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[dst_node]);
 				flush_memchunk_cache(dst_node, reclaim);
-				_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[dst_node]);
 			}
 			else
-				reclaim_memory_generic(dst_node, 0, reclaim);
+				_starpu_memory_reclaim_generic(dst_node, 0, reclaim);
 			_STARPU_TRACE_END_MEMRECLAIM(dst_node);
 
 		        while (_starpu_spin_trylock(&handle->header_lock))
@@ -814,7 +840,8 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, struct _s
 			STARPU_ASSERT(replicate->refcnt >= 0);
 			STARPU_ASSERT(handle->busy_count > 0);
 			handle->busy_count--;
-			_starpu_data_check_not_busy(handle);
+			ret = _starpu_data_check_not_busy(handle);
+			STARPU_ASSERT(ret == 0);
 		}
 
 	}
@@ -894,6 +921,8 @@ static void _starpu_memchunk_recently_used_move(struct _starpu_mem_chunk *mc, un
 
 static void starpu_lru(unsigned node)
 {
+	_STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
+
 	_starpu_spin_lock(&lru_rwlock[node]);
 	while (!_starpu_mem_chunk_lru_list_empty(starpu_lru_list[node]))
 	{
@@ -903,6 +932,8 @@ static void starpu_lru(unsigned node)
 		_starpu_mem_chunk_lru_delete(mc_lru);
 	}
 	_starpu_spin_unlock(&lru_rwlock[node]);
+
+	_STARPU_PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 }
 
 #ifdef STARPU_MEMORY_STATS

+ 3 - 3
src/datawizard/memalloc.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2012-2013  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -42,7 +42,6 @@ LIST_TYPE(_starpu_mem_chunk,
 	struct starpu_data_interface_ops *ops;
 	void *chunk_interface;
 	unsigned automatically_allocated;
-	unsigned data_was_deleted;
 
 	/* the size is only set when calling _starpu_request_mem_chunk_removal(),
          * it is needed by free_memory_on_node() which is called when
@@ -63,11 +62,12 @@ LIST_TYPE(_starpu_mem_chunk_lru,
 
 void _starpu_init_mem_chunk_lists(void);
 void _starpu_deinit_mem_chunk_lists(void);
-void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, unsigned node, int handle_deleted);
+void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, unsigned node, size_t size);
 int _starpu_allocate_memory_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, unsigned is_prefetch);
 size_t _starpu_free_all_automatically_allocated_buffers(unsigned node);
 void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node);
 
 void _starpu_display_memory_stats_by_node(int node);
+size_t _starpu_memory_reclaim_generic(unsigned node, unsigned force, size_t reclaim);
 
 #endif

+ 21 - 3
src/drivers/cpu/driver_cpu.c

@@ -191,22 +191,40 @@ _starpu_get_worker_from_driver(struct starpu_driver *d)
 
 static size_t _starpu_cpu_get_global_mem_size(int devid, struct _starpu_machine_config *config)
 {
+	ssize_t global_mem;
+	ssize_t limit;
+
+	limit = starpu_get_env_number("STARPU_LIMIT_CPU_MEM");
+#ifdef STARPU_DEVEL
+#  warning TODO: take into account NUMA node and check STARPU_LIMIT_CPU_numanode_MEM
+#endif
+
 #if defined(STARPU_HAVE_HWLOC)
         int depth_node;
 	struct starpu_machine_topology *topology = &config->topology;
         depth_node = hwloc_get_type_depth(topology->hwtopology, HWLOC_OBJ_NODE);
 
 	if (depth_node == HWLOC_TYPE_DEPTH_UNKNOWN)
-	     return hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
+	     global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
 	else
-	     return hwloc_get_obj_by_depth(topology->hwtopology, depth_node, devid)->memory.local_memory;
+	     global_mem = hwloc_get_obj_by_depth(topology->hwtopology, depth_node, devid)->memory.local_memory;
 
 #else /* STARPU_HAVE_HWLOC */
 #ifdef STARPU_DEVEL
 #  warning use sysinfo when available to get global size
 #endif
-	return 0;
+	global_mem = 0;
 #endif
+
+	if (limit == -1)
+		// No limit is defined, we return the global memory size
+		return global_mem;
+	else if (limit*1024*1024 > global_mem)
+		// The requested limit is higher than what is available, we return the global memory size
+		return global_mem;
+	else
+		// We limit the memory
+		return limit*1024*1024;
 }
 
 int _starpu_cpu_driver_init(struct starpu_driver *d)

+ 1 - 0
tests/Makefile.am

@@ -134,6 +134,7 @@ noinst_PROGRAMS =				\
 	main/starpu_init			\
 	main/starpu_worker_exists		\
 	main/submit				\
+	datawizard/allocate			\
 	datawizard/acquire_cb			\
 	datawizard/acquire_cb_insert		\
 	datawizard/acquire_release		\

+ 87 - 0
tests/datawizard/allocate.c

@@ -0,0 +1,87 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+#include <stdlib.h>
+#include <datawizard/memory_manager.h>
+
+#if !defined(STARPU_HAVE_SETENV)
+#warning setenv is not defined. Skipping test
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+
+int main(int argc, char **argv)
+{
+	int ret;
+	float *buffer;
+	float *buffer2;
+	float *buffer3;
+	size_t global_size;
+
+	setenv("STARPU_LIMIT_CUDA_MEM", "1", 1);
+	setenv("STARPU_LIMIT_OPENCL_MEM", "1", 1);
+	setenv("STARPU_LIMIT_CPU_MEM", "1", 1);
+
+        ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	global_size = _starpu_memory_manager_get_global_memory_size(0);
+	if (global_size == 0)
+	{
+		FPRINTF(stderr, "Global memory size unavailable, skip the test\n");
+		starpu_shutdown();
+		return STARPU_TEST_SKIPPED;
+	}
+	STARPU_CHECK_RETURN_VALUE_IS((int)global_size, 1*1024*1024, "get_global_memory_size");
+	FPRINTF(stderr, "Available memory size on node 0: %ld\n", global_size);
+
+	ret = starpu_malloc_count((void **)&buffer, 1);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_malloc_count");
+	FPRINTF(stderr, "Allocation succesfull for 1 b\n");
+
+	ret = starpu_malloc_count((void **)&buffer2, 1*1024*512);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_malloc_count");
+	FPRINTF(stderr, "Allocation succesfull for %d b\n", 1*1024*512);
+
+	ret = starpu_malloc_count((void **)&buffer3, 1*1024*512);
+	STARPU_CHECK_RETURN_VALUE_IS(ret, -ENOMEM, "starpu_malloc_count");
+	FPRINTF(stderr, "Allocation failed for %d b\n", 1*1024*512);
+
+	ret = starpu_malloc((void **)&buffer3, 1*1024*512);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_malloc");
+	FPRINTF(stderr, "Allocation successful for %d b\n", 1*1024*512);
+	starpu_free(buffer3);
+
+	starpu_free_count(buffer2, 1*1024*512);
+	FPRINTF(stderr, "Freeing %d b\n", 1*1024*512);
+
+	ret = starpu_malloc_count((void **)&buffer3, 1*1024*512);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_malloc_count");
+	FPRINTF(stderr, "Allocation succesfull for %d b\n", 1*1024*512);
+
+	starpu_free_count(buffer3, 1*1024*512);
+	starpu_free_count(buffer, 1);
+
+	starpu_shutdown();
+	return 0;
+}
+
+ #endif

+ 4 - 2
tests/loader.c

@@ -110,13 +110,16 @@ static void launch_gdb(const char *exe)
 #endif	/* STARPU_GDB_PATH */
 }
 
+static char *test_name;
+
 static void test_cleaner(int sig)
 {
 	pid_t child_gid;
 
 	// send signal to all loader family members
-	fprintf(stderr, "[error] test has been blocked for %d seconds. Mark it as failed\n", timeout);
+	fprintf(stderr, "[error] test %s has been blocked for %d seconds. Mark it as failed\n", test_name, timeout);
 	child_gid = getpgid(child_pid);
+	launch_gdb(test_name);
 	kill(-child_gid, SIGKILL);
 	exit(EXIT_FAILURE);
 }
@@ -149,7 +152,6 @@ static void decode(char **src, char *motif, const char *value)
 int main(int argc, char *argv[])
 {
 	int   child_exit_status;
-	char *test_name;
 	char *test_args;
 	int   status;
 	char *launcher;

+ 2 - 2
tools/gdbinit

@@ -1,7 +1,7 @@
 
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2010-2012  Université de Bordeaux 1
+# Copyright (C) 2010-2013  Université de Bordeaux 1
 # Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -92,7 +92,7 @@ end
 define starpu-tasks-on-worker
   set language c
   set $worker=config->workers[$arg0]
-  set $task=$worker->local_tasks->head
+  set $task=$worker.local_tasks.head
   while $task != 0x0
     starpu-print-task $task
     set $task=$task->next

+ 26 - 0
tools/valgrind/openmpi.suppr

@@ -114,3 +114,29 @@
    fun:ompi_request_default_test
    fun:PMPI_Test
 }
+
+{
+   suppr16
+   Memcheck:Leak
+   fun:malloc
+   fun:ompi_ddt_set_args
+   fun:PMPI_Type_vector
+}
+
+{
+   suppr17
+   Memcheck:Leak
+   fun:malloc
+   fun:ompi_ddt_optimize_short.constprop.0
+   fun:ompi_ddt_commit
+   fun:PMPI_Type_commit
+}
+
+{
+   suppr18
+   Memcheck:Leak
+   fun:calloc
+   fun:ompi_ddt_create
+   fun:ompi_ddt_create_vector
+   fun:PMPI_Type_vector
+}