Browse Source

Merge branch 'master' into fpga

Nathalie Furmento 5 years ago
parent
commit
9be83fdcdf

File diff suppressed because it is too large
+ 69 - 43
doc/doxygen/chapters/410_mpi_support.doxy


+ 1 - 0
examples/Makefile.am

@@ -226,6 +226,7 @@ STARPU_EXAMPLES +=				\
 	cpp/incrementer_cpp			\
 	cpp/add_vectors				\
 	cpp/add_vectors_interface		\
+	filters/fread				\
 	filters/fvector				\
 	filters/fblock				\
 	filters/fmatrix				\

+ 146 - 0
examples/filters/fread.c

@@ -0,0 +1,146 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#define NX    20
+#define PARTS 2
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+void display_func(void *buffers[], void *cl_arg)
+{
+        unsigned i;
+
+        /* length of the vector */
+        unsigned n = STARPU_VECTOR_GET_NX(buffers[0]);
+        /* local copy of the vector pointer */
+        int *val = (int *)STARPU_VECTOR_GET_PTR(buffers[0]);
+
+	FPRINTF(stderr, "vector with n=%u : ", n);
+        for (i = 0; i < n; i++)
+		FPRINTF(stderr, "%5d ", val[i]);
+	FPRINTF(stderr, "\n");
+}
+
+void cpu_func(void *buffers[], void *cl_arg)
+{
+        unsigned i;
+
+        /* length of the vector */
+        unsigned n = STARPU_VECTOR_GET_NX(buffers[0]);
+        /* local copy of the vector pointer */
+        int *val = (int *)STARPU_VECTOR_GET_PTR(buffers[0]);
+
+	FPRINTF(stderr, "computing on vector with n=%u\n", n);
+        for (i = 0; i < n; i++)
+                val[i] *= 2;
+}
+
+int main(void)
+{
+	int i;
+        int vector[NX];
+        starpu_data_handle_t handle;
+	starpu_data_handle_t subhandles[PARTS];
+	int ret;
+
+        struct starpu_codelet cl =
+	{
+                .cpu_funcs = {cpu_func},
+                .cpu_funcs_name = {"cpu_func"},
+                .nbuffers = 1,
+		.modes = {STARPU_RW},
+		.name = "vector_scal"
+        };
+        struct starpu_codelet print_cl =
+	{
+                .cpu_funcs = {display_func},
+                .cpu_funcs_name = {"display_func"},
+                .nbuffers = 1,
+		.modes = {STARPU_R},
+		.name = "vector_display"
+        };
+
+        for(i=0 ; i<NX ; i++) vector[i] = i;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		exit(77);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	/* Declare data to StarPU */
+	starpu_vector_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)vector, NX, sizeof(vector[0]));
+
+        /* Partition the vector in PARTS sub-vectors */
+	struct starpu_data_filter f =
+	{
+		.filter_func = starpu_vector_filter_block,
+		.nchildren = PARTS
+	};
+	starpu_data_partition_plan(handle, &f, subhandles);
+
+	ret = starpu_task_insert(&print_cl,
+				 STARPU_R, handle,
+				 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+        /* Submit a task on each sub-vector */
+	for (i=0; i<PARTS; i++)
+	{
+		ret = starpu_task_insert(&cl,
+					 STARPU_RW, subhandles[i],
+					 0);
+		if (ret == -ENODEV) goto enodev;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	/* Submit a read on the whole vector */
+	ret = starpu_task_insert(&print_cl,
+				 STARPU_R, handle,
+				 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+        /* Submit a read on each sub-vector */
+	for (i=0; i<PARTS; i++)
+	{
+		ret = starpu_task_insert(&print_cl,
+					 STARPU_R, subhandles[i],
+					 0);
+		if (ret == -ENODEV) goto enodev;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	/* Submit a read on the whole vector */
+	ret = starpu_task_insert(&print_cl,
+				 STARPU_R, handle,
+				 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	starpu_data_partition_clean(handle, PARTS, subhandles);
+        starpu_data_unregister(handle);
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	FPRINTF(stderr, "WARNING: No one can execute this task\n");
+	starpu_shutdown();
+	return 77;
+}

+ 7 - 1
include/starpu_data.h

@@ -161,7 +161,13 @@ void starpu_data_unregister_no_coherency(starpu_data_handle_t handle);
 
 /**
    Destroy the data \p handle once it is no longer needed by any
-   submitted task. No coherency is assumed.
+   submitted task. No coherency is provided.
+
+   This is not safe to call starpu_data_unregister_submit() on a handle that
+   comes from the registration of a non-NULL application home buffer, since the
+   moment when the unregistration will happen is unknown to the
+   application. Only calling starpu_shutdown() allows to be sure that the data
+   was really unregistered.
 */
 void starpu_data_unregister_submit(starpu_data_handle_t handle);
 

+ 7 - 1
mpi/examples/Makefile.am

@@ -336,7 +336,8 @@ starpu_mpi_EXAMPLES	+=			\
 
 examplebin_PROGRAMS +=				\
 	user_datatype/user_datatype		\
-	user_datatype/user_datatype2
+	user_datatype/user_datatype2		\
+	user_datatype/user_datatype_early
 
 user_datatype_user_datatype_SOURCES =		\
 	user_datatype/user_datatype.c		\
@@ -346,9 +347,14 @@ user_datatype_user_datatype2_SOURCES =		\
 	user_datatype/user_datatype2.c		\
 	user_datatype/my_interface.c
 
+user_datatype_user_datatype_early_SOURCES =	\
+	user_datatype/user_datatype_early.c	\
+	user_datatype/my_interface.c
+
 if !STARPU_SIMGRID
 starpu_mpi_EXAMPLES	+=			\
 	user_datatype/user_datatype2		\
+	user_datatype/user_datatype_early	\
 	user_datatype/user_datatype
 endif
 

+ 1 - 1
mpi/examples/matrix_decomposition/mpi_cholesky.c

@@ -68,7 +68,7 @@ int main(int argc, char **argv)
 	starpu_mpi_shutdown();
 
 #ifndef STARPU_SIMGRID
-	if (check)
+	if (check && rank == 0)
 		assert(correctness);
 #endif
 

+ 91 - 21
mpi/examples/matrix_decomposition/mpi_cholesky_codelets.c

@@ -115,7 +115,7 @@ static void run_cholesky(starpu_data_handle_t **data_handles, int rank, int node
 	}
 }
 
-/* TODO: generated from compiler polyhedral analysis of classical algorithm */
+/* TODO: generate from compiler polyhedral analysis of classical algorithm */
 static void run_cholesky_column(starpu_data_handle_t **data_handles, int rank, int nodes)
 {
 	unsigned k, m, n;
@@ -169,10 +169,10 @@ static void run_cholesky_column(starpu_data_handle_t **data_handles, int rank, i
 			starpu_data_wont_use(data_handles[m][n]);
 }
 
-/* TODO: generated from compiler polyhedral analysis of classical algorithm */
+/* TODO: generate from compiler polyhedral analysis of classical algorithm */
 static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int rank, int nodes)
 {
-	unsigned a, b, c;
+	unsigned a, c;
 	unsigned k, m, n;
 	unsigned unbound_prio = STARPU_MAX_PRIO == INT_MAX && STARPU_MIN_PRIO == INT_MIN;
 
@@ -185,24 +185,21 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 	{
 		starpu_iteration_push(a);
 
-		unsigned bfirst;
+		unsigned nfirst;
 		if (2*a < nblocks)
-			bfirst = 0;
+			nfirst = 0;
 		else
-			bfirst = 2*a - (nblocks-1);
+			nfirst = 2*a - (nblocks-1);
 
 		/* column within first antidiagonal for a */
-		for (b = bfirst; b <= a; b++)
+		for (n = nfirst; n <= a; n++)
 		{
-			/* column */
-			n = b;
 			/* row */
-			m = 2*a-b;
+			m = 2*a-n;
 
 			/* Accumulate updates from TRSMs */
-			for (c = 0; c < n; c++)
+			for (k = 0; k < n; k++)
 			{
-				k = c;
 				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl22,
 						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m - n) : ((n == k+1) && (m == k+1))?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
 						       STARPU_R, data_handles[n][k],
@@ -211,10 +208,10 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 						       0);
 			}
 
-			if (b < a)
+			/* k = n */
+			if (n < a)
 			{
 				/* non-diagonal block, solve */
-				k = n;
 				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl21,
 						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m) : (m == k+1)?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
 						       STARPU_R, data_handles[k][k],
@@ -224,7 +221,6 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 			else
 			{
 				/* diagonal block, factorize */
-				k = a;
 				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl11,
 						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k) : STARPU_MAX_PRIO,
 						       STARPU_RW, data_handles[k][k],
@@ -233,21 +229,18 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 		}
 
 		/* column within second antidiagonal for a */
-		for (b = bfirst; b <= a; b++)
+		for (n = nfirst; n <= a; n++)
 		{
-			/* column */
-			n = b;
 			/* row */
-			m = 2*a-b + 1;
+			m = 2*a-n + 1;
 
 			if (m >= nblocks)
 				/* Skip first item when even number of tiles */
 				continue;
 
 			/* Accumulate updates from TRSMs */
-			for (c = 0; c < n; c++)
+			for (k = 0; k < n; k++)
 			{
-				k = c;
 				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl22,
 						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m - n) : ((n == k+1) && (m == k+1))?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
 						       STARPU_R, data_handles[n][k],
@@ -274,6 +267,82 @@ static void run_cholesky_antidiagonal(starpu_data_handle_t **data_handles, int r
 			starpu_data_wont_use(data_handles[m][n]);
 }
 
+/* TODO: generate from compiler polyhedral analysis of classical algorithm */
+static void run_cholesky_prio(starpu_data_handle_t **data_handles, int rank, int nodes)
+{
+	unsigned a;
+	int k, m, n;
+	unsigned unbound_prio = STARPU_MAX_PRIO == INT_MAX && STARPU_MIN_PRIO == INT_MIN;
+
+	/*
+	 * This is basically similar to above, except that we shift k according to the priorities set in the algorithm, so that prio ~ 2*a or 2*a+1
+	 * double-antidiagonal number:
+	 * - a=0 contains (0,0) plus (1,0)
+	 * - a=1 contains (2,0), (1,1) plus (3,0), (2, 1)
+	 * - etc.
+	 */
+	for (a = 0; a < 4*nblocks; a++)
+	{
+		starpu_iteration_push(a);
+
+		for (k = 0; k < nblocks; k++)
+		{
+			n = k;
+			/* Should be m = a-k-n; for potrf and trsm to respect
+			   priorities, but needs to be this for dependencies */
+			m = a-2*k-n;
+
+			if (m < 0 || m >= nblocks)
+				continue;
+
+			if (m == n)
+			{
+				/* diagonal block, factorize */
+				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl11,
+						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k) : STARPU_MAX_PRIO,
+						       STARPU_RW, data_handles[k][k],
+						       0);
+			}
+			else
+			{
+				/* non-diagonal block, solve */
+				starpu_mpi_task_insert(MPI_COMM_WORLD, &cl21,
+						       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m) : (m == k+1)?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
+						       STARPU_R, data_handles[k][k],
+						       STARPU_RW, data_handles[m][k],
+						       0);
+			}
+
+			/* column within antidiagonal for a */
+			for (n = k + 1; n < nblocks; n++)
+			{
+				/* row */
+				m = a-2*k-n;
+
+				if (m >= n && m < nblocks)
+				{
+					/* Update */
+					starpu_mpi_task_insert(MPI_COMM_WORLD, &cl22,
+							       STARPU_PRIORITY, noprio ? STARPU_DEFAULT_PRIO : unbound_prio ? (int)(2*nblocks - 2*k - m - n) : ((n == k+1) && (m == k+1))?STARPU_MAX_PRIO:STARPU_DEFAULT_PRIO,
+							       STARPU_R, data_handles[n][k],
+							       STARPU_R, data_handles[m][k],
+							       STARPU_RW | STARPU_COMMUTE, data_handles[m][n],
+							       0);
+				}
+			}
+
+		}
+
+		starpu_iteration_pop();
+	}
+
+	/* Submit flushes, StarPU will fit them according to the progress */
+	starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);
+	for (m = 0; m < nblocks; m++)
+		for (n = 0; n < nblocks ; n++)
+			starpu_data_wont_use(data_handles[m][n]);
+}
+
 /*
  *	code to bootstrap the factorization
  *	and construct the DAG
@@ -328,6 +397,7 @@ void dw_cholesky(float ***matA, unsigned ld, int rank, int nodes, double *timing
 		case TRIANGLES:		run_cholesky(data_handles, rank, nodes); break;
 		case COLUMNS:		run_cholesky_column(data_handles, rank, nodes); break;
 		case ANTIDIAGONALS:	run_cholesky_antidiagonal(data_handles, rank, nodes); break;
+		case PRIOS:		run_cholesky_prio(data_handles, rank, nodes); break;
 		default: STARPU_ABORT();
 	}
 

+ 6 - 1
mpi/examples/matrix_decomposition/mpi_decomposition_params.c

@@ -90,6 +90,11 @@ void parse_args(int argc, char **argv, int nodes)
                         submission = ANTIDIAGONALS;
                 }
 
+                if (strcmp(argv[i], "-prios") == 0)
+                {
+                        submission = PRIOS;
+                }
+
                 if (strcmp(argv[i], "-no-prio") == 0)
                 {
                         noprio = 1;
@@ -107,7 +112,7 @@ void parse_args(int argc, char **argv, int nodes)
 
                 if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0)
                 {
-                        printf("usage : %s [-size size] [-nblocks nblocks] [-columns] [-antidiagonals] [-no-prio] [-display] [-check]\n", argv[0]);
+                        printf("usage : %s [-size size] [-nblocks nblocks] [-columns] [-antidiagonals] [-prios] [-no-prio] [-display] [-check]\n", argv[0]);
                 }
         }
 

+ 1 - 0
mpi/examples/matrix_decomposition/mpi_decomposition_params.h

@@ -33,6 +33,7 @@ enum submission
 	TRIANGLES,
 	COLUMNS,
 	ANTIDIAGONALS,
+	PRIOS,
 };
 extern enum submission submission;
 

+ 15 - 0
mpi/examples/user_datatype/my_interface.c

@@ -15,6 +15,7 @@
  */
 
 #include <starpu.h>
+#include <starpu_mpi.h>
 
 #include "my_interface.h"
 
@@ -314,6 +315,7 @@ void starpu_my_data_register(starpu_data_handle_t *handleptr, unsigned home_node
 	if (interface_data_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
 	{
 		interface_data_ops.interfaceid = starpu_data_interface_get_next_id();
+		starpu_mpi_interface_datatype_register(interface_data_ops.interfaceid, starpu_my_data_datatype_allocate, starpu_my_data_datatype_free);
 	}
 
 	struct starpu_my_data_interface data =
@@ -327,6 +329,12 @@ void starpu_my_data_register(starpu_data_handle_t *handleptr, unsigned home_node
 	starpu_data_register(handleptr, home_node, &data, &interface_data_ops);
 }
 
+void starpu_my_data_shutdown(void)
+{
+	starpu_mpi_interface_datatype_unregister(interface_data_ops.interfaceid);
+
+}
+
 static struct starpu_data_interface_ops interface_data2_ops =
 {
 	.register_data_handle = data_register_data_handle,
@@ -349,6 +357,7 @@ void starpu_my_data2_register(starpu_data_handle_t *handleptr, unsigned home_nod
 	if (interface_data2_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
 	{
 		interface_data2_ops.interfaceid = starpu_data_interface_get_next_id();
+		starpu_mpi_interface_datatype_register(interface_data2_ops.interfaceid, starpu_my_data2_datatype_allocate, starpu_my_data2_datatype_free);
 	}
 
 	struct starpu_my_data_interface data =
@@ -361,3 +370,9 @@ void starpu_my_data2_register(starpu_data_handle_t *handleptr, unsigned home_nod
 
 	starpu_data_register(handleptr, home_node, &data, &interface_data2_ops);
 }
+
+void starpu_my_data2_shutdown(void)
+{
+	starpu_mpi_interface_datatype_unregister(interface_data2_ops.interfaceid);
+
+}

+ 3 - 0
mpi/examples/user_datatype/my_interface.h

@@ -76,4 +76,7 @@ static struct starpu_codelet starpu_my_data_compare_codelet =
 	.name = "starpu_my_data_compare_codelet"
 };
 
+void starpu_my_data_shutdown(void);
+void starpu_my_data2_shutdown(void);
+
 #endif /* __MY_INTERFACE_H */

+ 1 - 4
mpi/examples/user_datatype/user_datatype.c

@@ -57,9 +57,6 @@ int main(int argc, char **argv)
 
 	starpu_my_data_register(&handle0, STARPU_MAIN_RAM, &my0);
 	starpu_my_data_register(&handle1, -1, &my1);
-	starpu_mpi_datatype_register(handle1, starpu_my_data_datatype_allocate, starpu_my_data_datatype_free);
-
-	starpu_mpi_barrier(MPI_COMM_WORLD);
 
 	// Send data directly with MPI
 	if (rank == 0)
@@ -123,10 +120,10 @@ int main(int argc, char **argv)
 	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
 	starpu_mpi_barrier(MPI_COMM_WORLD);
 
-	starpu_mpi_datatype_unregister(handle0);
 	starpu_data_unregister(handle0);
 	starpu_data_unregister(handle1);
 
+	starpu_my_data_shutdown();
 	starpu_mpi_shutdown();
 
 	if (rank == 0)

+ 1 - 1
mpi/examples/user_datatype/user_datatype2.c

@@ -57,7 +57,6 @@ int main(int argc, char **argv)
 
 	starpu_my_data2_register(&handle0, STARPU_MAIN_RAM, &my0);
 	starpu_my_data2_register(&handle1, -1, &my1);
-	starpu_mpi_datatype_register(handle1, starpu_my_data2_datatype_allocate, starpu_my_data2_datatype_free);
 
 	starpu_mpi_barrier(MPI_COMM_WORLD);
 
@@ -87,6 +86,7 @@ int main(int argc, char **argv)
 	starpu_data_unregister(handle0);
 	starpu_data_unregister(handle1);
 
+	starpu_my_data2_shutdown();
 	starpu_mpi_shutdown();
 
 	if (rank == 0)

+ 92 - 0
mpi/examples/user_datatype/user_datatype_early.c

@@ -0,0 +1,92 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "my_interface.h"
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+int main(int argc, char **argv)
+{
+	int rank, nodes;
+	int ret=0;
+
+	ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &nodes);
+
+	if (nodes < 2 || (starpu_cpu_worker_get_count() == 0))
+	{
+		if (rank == 0)
+		{
+			if (nodes < 2)
+				fprintf(stderr, "We need at least 2 processes.\n");
+			else
+				fprintf(stderr, "We need at least 1 CPU.\n");
+		}
+		starpu_mpi_shutdown();
+		return 77;
+	}
+
+	struct starpu_my_data my0 = {.d = 42 , .c = 'n'};
+	struct starpu_my_data my1 = {.d = 11 , .c = 'a'};
+
+	if (rank == 1)
+	{
+		my0.d *= 2;
+		my0.c += 1;
+		my1.d *= 2;
+		my1.c += 1;
+	}
+
+	starpu_data_handle_t handle0;
+	starpu_data_handle_t handle1;
+	starpu_my_data_register(&handle0, STARPU_MAIN_RAM, &my0);
+	starpu_my_data_register(&handle1, STARPU_MAIN_RAM, &my1);
+
+	if (rank == 0)
+	{
+		starpu_mpi_send(handle0, 1, 10, MPI_COMM_WORLD);
+		starpu_mpi_send(handle1, 1, 20, MPI_COMM_WORLD);
+	}
+	else if (rank == 1)
+	{
+		// We want handle0 to be received as early_data and as starpu_mpi_data_register() has not be called, it will be received as raw memory, and then unpacked with MPI_Unpack()
+		starpu_task_insert(&starpu_my_data_display_codelet, STARPU_VALUE, "node1 handle0 init value", strlen("node1 handle0 init value")+1, STARPU_R, handle0, 0);
+		starpu_task_insert(&starpu_my_data_display_codelet, STARPU_VALUE, "node1 handle1 init value", strlen("node1 handle1 init value")+1, STARPU_R, handle1, 0);
+		starpu_mpi_recv(handle1, 0, 20, MPI_COMM_WORLD, NULL);
+		starpu_mpi_recv(handle0, 0, 10, MPI_COMM_WORLD, NULL);
+		starpu_task_insert(&starpu_my_data_display_codelet, STARPU_VALUE, "node1 handle0 received value", strlen("node1 handle0 received value")+1, STARPU_R, handle0, 0);
+		starpu_task_insert(&starpu_my_data_display_codelet, STARPU_VALUE, "node1 handle1 received value", strlen("node1 handle1 received value")+1, STARPU_R, handle1, 0);
+	}
+
+	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	starpu_data_unregister(handle0);
+	starpu_data_unregister(handle1);
+
+	if (rank == 1)
+	{
+		STARPU_ASSERT_MSG(my0.d == 42 && my0.c == 'n' && my1.d == 11 && my1.c == 'a', "Incorrect received values");
+	}
+
+	starpu_my_data_shutdown();
+	starpu_mpi_shutdown();
+
+	return 0;
+}

+ 1 - 0
mpi/include/starpu_mpi.h

@@ -349,6 +349,7 @@ typedef void (*starpu_mpi_datatype_free_func_t)(MPI_Datatype *);
 /**
    Register functions to create and free a MPI datatype for the given
    handle.
+   Similar to starpu_mpi_interface_datatype_register().
    It is important that the function is called before any
    communication can take place for a data with the given handle. See
    \ref ExchangingUserDefinedDataInterface for an example.

+ 0 - 1
mpi/src/mpi/starpu_mpi_early_data.c

@@ -86,7 +86,6 @@ struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _star
 	_STARPU_MPI_CALLOC(early_data_handle, 1, sizeof(struct _starpu_mpi_early_data_handle));
 	STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
-	early_data_handle->env = envelope;
 	early_data_handle->node_tag.node.comm = comm;
 	early_data_handle->node_tag.node.rank = source;
 	early_data_handle->node_tag.data_tag = envelope->data_tag;

+ 3 - 1
mpi/src/mpi/starpu_mpi_early_data.h

@@ -34,9 +34,9 @@ extern "C"
 
 LIST_TYPE(_starpu_mpi_early_data_handle,
 	  starpu_data_handle_t handle;
-	  struct _starpu_mpi_envelope *env;
 	  struct _starpu_mpi_req *req;
 	  void *buffer;
+	  size_t size;
 	  int req_ready;
 	  struct _starpu_mpi_node_tag node_tag;
 	  starpu_pthread_mutex_t req_mutex;
@@ -50,6 +50,8 @@ struct _starpu_mpi_early_data_handle_tag_hashlist
 	starpu_mpi_tag_t data_tag;
 };
 
+struct _starpu_mpi_envelope;
+
 void _starpu_mpi_early_data_init(void);
 void _starpu_mpi_early_data_check_termination(void);
 void _starpu_mpi_early_data_shutdown(void);

+ 26 - 6
mpi/src/mpi/starpu_mpi_mpi.c

@@ -132,6 +132,7 @@ struct _starpu_mpi_early_data_cb_args
 	starpu_data_handle_t early_handle;
 	struct _starpu_mpi_req *req;
 	void *buffer;
+	size_t size;
 };
 
 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
@@ -248,6 +249,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				cb_args->data_handle = req->data_handle;
 				cb_args->early_handle = early_data_handle->handle;
 				cb_args->buffer = early_data_handle->buffer;
+				cb_args->size = early_data_handle->size;
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
@@ -915,9 +917,23 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		/* Data has been received as a raw memory, it has to be unpacked */
 		struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
 		struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
-		STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
-		itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
-		args->buffer = NULL;
+		MPI_Datatype datatype = _starpu_mpi_datatype_get_user_defined_datatype(args->data_handle);
+
+		if (datatype)
+		{
+			int position=0;
+			void *ptr = starpu_data_get_local_ptr(args->data_handle);
+			MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
+			starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) args->buffer, args->size, 0);
+			args->buffer = NULL;
+			_starpu_mpi_datatype_free(args->data_handle, &datatype);
+		}
+		else
+		{
+			STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
+			itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
+			args->buffer = NULL;
+		}
 	}
 	else
 	{
@@ -941,6 +957,9 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	starpu_data_release(args->early_handle);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
+	/* XXX: note that we have already freed the registered buffer above. In
+	 * principle that's unsafe. As of now it is fine because StarPU has no
+	 reason to access it. */
 	starpu_data_unregister_submit(args->early_handle);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
@@ -1130,9 +1149,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 		 * we are going to receive the data as a raw memory, and give it
 		 * to the application when it post a receive for this tag
 		 */
-		_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
-		early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_data_handle->env->size, 0);
-		starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size);
+		_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
+		early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, envelope->size, 0);
+		early_data_handle->size = envelope->size;
+		starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, envelope->size);
 		//_starpu_mpi_early_data_add(early_data_handle);
 	}
 

+ 21 - 0
mpi/src/starpu_mpi_datatype.c

@@ -208,6 +208,27 @@ static starpu_mpi_datatype_allocate_func_t handle_to_datatype_funcs[STARPU_MAX_I
 	[STARPU_MULTIFORMAT_INTERFACE_ID] = NULL,
 };
 
+MPI_Datatype _starpu_mpi_datatype_get_user_defined_datatype(starpu_data_handle_t data_handle)
+{
+	enum starpu_data_interface_id id = starpu_data_get_interface_id(data_handle);
+	if (id < STARPU_MAX_INTERFACE_ID) return 0;
+
+	struct _starpu_mpi_datatype_funcs *table;
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_datatype_funcs_table_mutex);
+	HASH_FIND_INT(_starpu_mpi_datatype_funcs_table, &id, table);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_datatype_funcs_table_mutex);
+	if (table && table->allocate_datatype_func)
+	{
+		MPI_Datatype datatype;
+		int ret = table->allocate_datatype_func(data_handle, &datatype);
+		if (ret == 0)
+			return datatype;
+		else
+			return 0;
+	}
+	return 0;
+}
+
 void _starpu_mpi_datatype_allocate(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req)
 {
 	enum starpu_data_interface_id id = starpu_data_get_interface_id(data_handle);

+ 2 - 0
mpi/src/starpu_mpi_datatype.h

@@ -31,6 +31,8 @@ void _starpu_mpi_datatype_shutdown(void);
 void _starpu_mpi_datatype_allocate(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req);
 void _starpu_mpi_datatype_free(starpu_data_handle_t data_handle, MPI_Datatype *datatype);
 
+MPI_Datatype _starpu_mpi_datatype_get_user_defined_datatype(starpu_data_handle_t data_handle);
+
 #ifdef __cplusplus
 }
 #endif

+ 2 - 2
mpi/tests/Makefile.am

@@ -137,13 +137,13 @@ starpu_mpi_TESTS +=				\
 	ring_sync_detached			\
 	temporary				\
 	user_defined_datatype			\
-	early_stuff				\
-	display_bindings
+	early_stuff
 
 if !STARPU_SIMGRID
 # missing support in simgrid
 starpu_mpi_TESTS +=				\
 	attr					\
+	display_bindings			\
 	mpi_earlyrecv2				\
 	mpi_earlyrecv2_sync			\
 	block_interface				\

+ 1 - 1
src/common/utils.c

@@ -528,7 +528,7 @@ void _starpu_gethostname(char *hostname, size_t size)
 
 	if (force_mpi_hostnames && force_mpi_hostnames[0])
 	{
-		char *host, *srv_hosts;
+		char *host=NULL, *srv_hosts;
 		srv_hosts = strdup(force_mpi_hostnames);
 		int rank;
 		if (starpu_mpi_world_rank)

+ 4 - 0
src/core/perfmodel/perfmodel_bus.c

@@ -329,9 +329,11 @@ static void measure_bandwidth_between_dev_and_dev_cuda(int src, int dst)
 	if (starpu_get_env_number("STARPU_ENABLE_CUDA_GPU_GPU_DIRECT") != 0)
 	{
 		cures = cudaDeviceCanAccessPeer(&can, src, dst);
+		(void) cudaGetLastError();
 		if (!cures && can)
 		{
 			cures = cudaDeviceEnablePeerAccess(dst, 0);
+			(void) cudaGetLastError();
 			if (!cures)
 			{
 				_STARPU_DISP("GPU-Direct %d -> %d\n", dst, src);
@@ -355,9 +357,11 @@ static void measure_bandwidth_between_dev_and_dev_cuda(int src, int dst)
 	if (starpu_get_env_number("STARPU_ENABLE_CUDA_GPU_GPU_DIRECT") != 0)
 	{
 		cures = cudaDeviceCanAccessPeer(&can, dst, src);
+		(void) cudaGetLastError();
 		if (!cures && can)
 		{
 			cures = cudaDeviceEnablePeerAccess(src, 0);
+			(void) cudaGetLastError();
 			if (!cures)
 			{
 				_STARPU_DISP("GPU-Direct %d -> %d\n", src, dst);

+ 9 - 0
src/drivers/cuda/driver_cuda.c

@@ -336,9 +336,13 @@ static void init_device_context(unsigned devid, unsigned memnode)
 			{
 				int can;
 				cures = cudaDeviceCanAccessPeer(&can, devid, worker->devid);
+				(void) cudaGetLastError();
+
 				if (!cures && can)
 				{
 					cures = cudaDeviceEnablePeerAccess(worker->devid, 0);
+					(void) cudaGetLastError();
+
 					if (!cures)
 					{
 						_STARPU_DEBUG("Enabled GPU-Direct %d -> %d\n", worker->devid, devid);
@@ -1170,6 +1174,7 @@ starpu_cuda_copy_async_sync(void *src_ptr, unsigned src_node,
 		{
 			cures = cudaMemcpyAsync((char *)dst_ptr, (char *)src_ptr, ssize, kind, stream);
 		}
+		(void) cudaGetLastError();
 		starpu_interface_end_driver_copy_async(src_node, dst_node, start);
 	}
 
@@ -1189,6 +1194,7 @@ starpu_cuda_copy_async_sync(void *src_ptr, unsigned src_node,
 		{
 			cures = cudaMemcpy((char *)dst_ptr, (char *)src_ptr, ssize, kind);
 		}
+		(void) cudaGetLastError();
 
 		if (!cures)
 			cures = cudaDeviceSynchronize();
@@ -1246,12 +1252,14 @@ starpu_cuda_copy2d_async_sync(void *src_ptr, unsigned src_node,
 			double start;
 			starpu_interface_start_driver_copy_async(src_node, dst_node, &start);
 			cures = cudaMemcpy3DPeerAsync(&p, stream);
+			(void) cudaGetLastError();
 		}
 
 		/* Test if the asynchronous copy has failed or if the caller only asked for a synchronous copy */
 		if (stream == NULL || cures)
 		{
 			cures = cudaMemcpy3DPeer(&p);
+			(void) cudaGetLastError();
 
 			if (!cures)
 				cures = cudaDeviceSynchronize();
@@ -1343,6 +1351,7 @@ starpu_cuda_copy3d_async_sync(void *src_ptr, unsigned src_node,
 		if (stream == NULL || cures)
 		{
 			cures = cudaMemcpy3DPeer(&p);
+			(void) cudaGetLastError();
 
 			if (!cures)
 				cures = cudaDeviceSynchronize();

+ 4 - 1
tests/Makefile.am

@@ -148,7 +148,6 @@ myPROGRAMS =
 myPROGRAMS +=					\
 	main/bind				\
 	main/mkdtemp				\
-	main/display_binding			\
 	main/execute_schedule			\
 	main/insert_task_pack			\
 	main/insert_task_nullcodelet		\
@@ -225,6 +224,7 @@ myPROGRAMS +=				\
 	main/driver_api/init_run_deinit         \
 	main/driver_api/run_driver              \
 	main/deploop                            \
+	main/display_binding			\
 	main/execute_on_a_specific_worker	\
 	main/insert_task			\
 	main/insert_task_value			\
@@ -415,6 +415,9 @@ endif
 endif
 
 examplebin_PROGRAMS = \
+	microbenchs/async_tasks_overhead	\
+	microbenchs/sync_tasks_overhead		\
+	microbenchs/tasks_overhead		\
 	microbenchs/tasks_size_overhead		\
 	microbenchs/local_pingpong
 examplebin_SCRIPTS = \

+ 2 - 0
tests/datawizard/lazy_unregister.c

@@ -67,6 +67,8 @@ int main(void)
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
 
 	starpu_data_unregister_submit(handle);
+	/* Note: we have no way to know when this will happen. We have to wait
+	 * for starpu_shutdown before being able to free the registered buffer */
 
 	ret = starpu_task_submit(t1);
 	if (ret == -ENODEV)