
Add MPI bench to measure impact of computing workers on communications

Philippe SWARTVAGHER, commit 1bfbe4ee09

mpi/tests/Makefile.am (+18, -2)

@@ -137,7 +137,8 @@ starpu_mpi_TESTS +=				\
 	temporary				\
 	user_defined_datatype			\
 	early_stuff				\
-	sendrecv_bench
+	sendrecv_bench				\
+	sendrecv_gemm_bench
 
 if !STARPU_SIMGRID
 # missing support in simgrid
@@ -226,7 +227,8 @@ noinst_PROGRAMS =				\
 	starpu_redefine				\
 	load_balancer				\
 	driver					\
-	sendrecv_bench
+	sendrecv_bench				\
+	sendrecv_gemm_bench
 
 XFAIL_TESTS=					\
 	policy_register_toomany			\
@@ -256,4 +258,18 @@ mpi_earlyrecv2_SOURCES = mpi_earlyrecv2.c
 mpi_earlyrecv2_SOURCES += ../../examples/interface/complex_interface.c
 mpi_earlyrecv2_sync_SOURCES = mpi_earlyrecv2_sync.c
 mpi_earlyrecv2_sync_SOURCES += ../../examples/interface/complex_interface.c
+
+sendrecv_bench_SOURCES = sendrecv_bench.c
+sendrecv_bench_SOURCES += bench_helper.c
+sendrecv_bench_SOURCES += abstract_sendrecv_bench.c
+
+if !NO_BLAS_LIB
+sendrecv_gemm_bench_SOURCES = sendrecv_gemm_bench.c
+sendrecv_gemm_bench_SOURCES += bench_helper.c
+sendrecv_gemm_bench_SOURCES += abstract_sendrecv_bench.c
+sendrecv_gemm_bench_SOURCES += ../../examples/common/blas.c
+
+sendrecv_gemm_bench_LDADD = $(STARPU_BLAS_LDFLAGS)
+endif
+
 endif

mpi/tests/abstract_sendrecv_bench.c (+136, -0)

@@ -0,0 +1,136 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "bench_helper.h"
+#include "abstract_sendrecv_bench.h"
+
+
+
+void sendrecv_bench(int mpi_rank, starpu_pthread_barrier_t* thread_barrier)
+{
+	uint64_t iterations = LOOPS_DEFAULT;
+
+	if (mpi_rank >= 2)
+	{
+		for (uint64_t s = NX_MIN; s <= NX_MAX; s = bench_next_size(s))
+		{
+			iterations = bench_nb_iterations(iterations, s);
+
+			starpu_mpi_barrier(MPI_COMM_WORLD);
+
+			for (uint64_t j = 0; j < iterations; j++)
+			{
+				starpu_mpi_barrier(MPI_COMM_WORLD);
+			}
+		}
+
+		return;
+	}
+
+	if (mpi_rank == 0)
+	{
+		printf("Times in us\n");
+		printf("# size  (Bytes)\t|  latency \t| 10^6 B/s \t| MB/s   \t| d1    \t|median  \t| avg    \t| d9    \t| max\n");
+	}
+
+	int array_size = 0;
+	starpu_data_handle_t handle_send, handle_recv;
+	float* vector_send = NULL;
+	float* vector_recv = NULL;
+	double t1, t2, global_tstart, global_tend;
+	double* lats = malloc(sizeof(double) * LOOPS_DEFAULT);
+
+	if (thread_barrier != NULL)
+	{
+		STARPU_PTHREAD_BARRIER_WAIT(thread_barrier);
+	}
+
+	global_tstart = starpu_timing_now();
+	for (uint64_t s = NX_MIN; s <= NX_MAX; s = bench_next_size(s))
+	{
+		vector_send = malloc(s);
+		vector_recv = malloc(s);
+		memset(vector_send, 0, s);
+		memset(vector_recv, 0, s);
+
+		starpu_vector_data_register(&handle_send, STARPU_MAIN_RAM, (uintptr_t) vector_send, s, 1);
+		starpu_vector_data_register(&handle_recv, STARPU_MAIN_RAM, (uintptr_t) vector_recv, s, 1);
+
+		iterations = bench_nb_iterations(iterations, s);
+
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+
+		for (uint64_t j = 0; j < iterations; j++)
+		{
+			if (mpi_rank == 0)
+			{
+				t1 = starpu_timing_now();
+				starpu_mpi_send(handle_send, 1, 0, MPI_COMM_WORLD);
+				starpu_mpi_recv(handle_recv, 1, 1, MPI_COMM_WORLD, NULL);
+				t2 = starpu_timing_now();
+
+				const double t = (t2 - t1) / 2;
+
+				lats[j] = t;
+			}
+			else
+			{
+				starpu_mpi_recv(handle_recv, 0, 0, MPI_COMM_WORLD, NULL);
+				starpu_mpi_send(handle_send, 0, 1, MPI_COMM_WORLD);
+			}
+
+			starpu_mpi_barrier(MPI_COMM_WORLD);
+		}
+
+		if (mpi_rank == 0)
+		{
+			qsort(lats, iterations, sizeof(double), &comp_double);
+
+			const double min_lat = lats[0];
+			const double max_lat = lats[iterations - 1];
+			const double med_lat = lats[(iterations - 1) / 2];
+			const double d1_lat = lats[(iterations - 1) / 10];
+			const double d9_lat = lats[9 * (iterations - 1) / 10];
+			double avg_lat = 0.0;
+
+			for(uint64_t k = 0; k < iterations; k++)
+			{
+				avg_lat += lats[k];
+			}
+
+			avg_lat /= iterations;
+			const double bw_million_byte = s / min_lat;
+			const double bw_mbyte        = bw_million_byte / 1.048576;
+
+			printf("%9lld\t%9.3lf\t%9.3f\t%9.3f\t%9.3lf\t%9.3lf\t%9.3lf\t%9.3lf\t%9.3lf\n",
+				(long long)s, min_lat, bw_million_byte, bw_mbyte, d1_lat, med_lat, avg_lat, d9_lat, max_lat);
+			fflush(stdout);
+		}
+		starpu_data_unregister(handle_recv);
+		starpu_data_unregister(handle_send);
+
+		free(vector_send);
+		free(vector_recv);
+	}
+	global_tend = starpu_timing_now();
+
+	if (mpi_rank == 0)
+	{
+		printf("Comm bench took %9.3lf ms\n", (global_tend - global_tstart) / 1000);
+	}
+
+	free(lats);
+}
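
A note on the statistics printed by sendrecv_bench(): each sample is half of a measured round-trip, in microseconds, so with the message size s in bytes, s / min_lat is directly bytes per microsecond, i.e. 10^6 B/s, and dividing by 1.048576 gives the "MB/s" column (which is therefore in units of 1048576 bytes per second, computed from the best latency). Below is a minimal standalone sketch of this post-processing on hand-made sample values; it is an editorial illustration, not part of the commit.

/* Illustrative sketch: reproduces the statistics computed by
 * sendrecv_bench() on hypothetical half-round-trip latencies. */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

static int comp_double(const void *_a, const void *_b)
{
	const double *a = _a, *b = _b;
	return (*a > *b) - (*a < *b);
}

int main(void)
{
	/* Hypothetical half-round-trip times (us) for a 1 MiB message */
	double lats[] = { 102.0, 98.5, 97.2, 110.4, 99.1, 101.7, 98.9, 105.3 };
	const uint64_t iterations = sizeof(lats) / sizeof(lats[0]);
	const uint64_t s = 1024 * 1024; /* message size in bytes */

	qsort(lats, iterations, sizeof(double), comp_double);

	double avg_lat = 0.0;
	for (uint64_t k = 0; k < iterations; k++)
		avg_lat += lats[k];
	avg_lat /= iterations;

	const double min_lat = lats[0];                          /* best latency */
	const double d1_lat  = lats[(iterations - 1) / 10];      /* 1st decile */
	const double med_lat = lats[(iterations - 1) / 2];       /* median */
	const double d9_lat  = lats[9 * (iterations - 1) / 10];  /* 9th decile */
	const double max_lat = lats[iterations - 1];

	/* bytes per microsecond == 10^6 bytes per second */
	const double bw_million_byte = s / min_lat;
	/* 1 MiB/s == 1.048576 * 10^6 B/s */
	const double bw_mbyte = bw_million_byte / 1.048576;

	printf("lat %.3f us | d1 %.3f | med %.3f | avg %.3f | d9 %.3f | max %.3f\n",
	       min_lat, d1_lat, med_lat, avg_lat, d9_lat, max_lat);
	printf("%.3f * 10^6 B/s, %.3f MB/s\n", bw_million_byte, bw_mbyte);
	return 0;
}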

mpi/tests/abstract_sendrecv_bench.h (+21, -0)

@@ -0,0 +1,21 @@
+
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+
+void sendrecv_bench(int mpi_rank, starpu_pthread_barrier_t* thread_barrier);

mpi/tests/bench_helper.c (+62, -0)

@@ -0,0 +1,62 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "bench_helper.h"
+
+
+int comp_double(const void*_a, const void*_b)
+{
+	const double* a = _a;
+	const double* b = _b;
+
+	if(*a < *b)
+		return -1;
+	else if(*a > *b)
+		return 1;
+	else
+		return 0;
+}
+
+
+uint64_t bench_next_size(uint64_t len)
+{
+	uint64_t next = len * MULT_DEFAULT + INCR_DEFAULT;
+
+	if(next <= len)
+		next++;
+
+	return next;
+}
+
+
+uint64_t bench_nb_iterations(int iterations, uint64_t len)
+{
+	const uint64_t max_data = NX_MAX;
+
+	if(len <= 0)
+		len = 1;
+
+	uint64_t data_size = ((uint64_t)iterations * (uint64_t)len);
+
+	if(data_size  > max_data)
+	{
+		iterations = (max_data / (uint64_t)len);
+		if(iterations < 2)
+			iterations = 2;
+	}
+
+	return iterations;
+}
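
The two helpers above drive the message-size sweep of the benchmarks: sizes start at NX_MIN and roughly double at each step (the next++ guard only matters to get from the initial 0-byte message to 1 byte), and the iteration count is lowered as soon as iterations * size would exceed NX_MAX, so no single size transfers much more than 512 MiB in total. The following standalone sketch reproduces the two helpers with the default (non STARPU_QUICK_CHECK) parameters and prints the resulting progression; it is an editorial illustration, not part of the commit.

/* Illustrative sketch of the size/iteration progression produced by
 * bench_next_size() and bench_nb_iterations(). */
#include <stdio.h>
#include <stdint.h>

#define NX_MAX (512 * 1024 * 1024)	/* upper bound on message size, in bytes */
#define NX_MIN 0
#define MULT_DEFAULT 2
#define INCR_DEFAULT 0
#define LOOPS_DEFAULT 100000

static uint64_t bench_next_size(uint64_t len)
{
	uint64_t next = len * MULT_DEFAULT + INCR_DEFAULT;
	if (next <= len)	/* ensures progress for the initial 0-byte size */
		next++;
	return next;
}

static uint64_t bench_nb_iterations(int iterations, uint64_t len)
{
	const uint64_t max_data = NX_MAX;
	if (len == 0)	/* written "len <= 0" in bench_helper.c */
		len = 1;
	if ((uint64_t)iterations * len > max_data)
	{
		iterations = max_data / len;	/* cap the total data volume */
		if (iterations < 2)
			iterations = 2;
	}
	return iterations;
}

int main(void)
{
	uint64_t iterations = LOOPS_DEFAULT;

	/* Sizes: 0, 1, 2, 4, ..., 512 MiB; the iteration count shrinks once
	 * iterations * size would exceed NX_MAX, and is never increased back. */
	for (uint64_t s = NX_MIN; s <= NX_MAX; s = bench_next_size(s))
	{
		iterations = bench_nb_iterations(iterations, s);
		printf("size %10llu B -> %6llu iterations\n",
		       (unsigned long long)s, (unsigned long long)iterations);
	}
	return 0;
}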

mpi/tests/bench_helper.h (+38, -0)

@@ -0,0 +1,38 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <math.h>
+#include <starpu_mpi.h>
+#include "helper.h"
+
+#define NX_MAX (512 * 1024 * 1024) // bytes
+#define NX_MIN 0
+#ifdef STARPU_QUICK_CHECK
+#define MULT_DEFAULT 4
+#else
+#define MULT_DEFAULT 2
+#endif
+#define INCR_DEFAULT 0
+#define NX_STEP 1.4 // multiplication
+#ifdef STARPU_QUICK_CHECK
+#define LOOPS_DEFAULT 100
+#else
+#define LOOPS_DEFAULT 100000
+#endif
+
+int comp_double(const void*_a, const void*_b);
+uint64_t bench_next_size(uint64_t len);
+uint64_t bench_nb_iterations(int iterations, uint64_t len);

mpi/tests/sendrecv_bench.c (+6, -168)

@@ -18,84 +18,15 @@
  * Inspired a lot from NewMadeleine examples/benchmarks/nm_bench_sendrecv.c
  */
 
-#include <math.h>
 #include <starpu_mpi.h>
 #include "helper.h"
+#include "abstract_sendrecv_bench.h"
 
-#define NX_MAX (512 * 1024 * 1024) // kB
-#define NX_MIN 0
-#ifdef STARPU_QUICK_CHECK
-#define MULT_DEFAULT 4
-#else
-#define MULT_DEFAULT 2
-#endif
-#define INCR_DEFAULT 0
-#define NX_STEP 1.4 // multiplication
-#ifdef STARPU_QUICK_CHECK
-#define LOOPS_DEFAULT 100
-#else
-#define LOOPS_DEFAULT 10000
-#endif
-
-int times_nb_nodes;
-int times_size;
-int worldsize;
-
-static int comp_double(const void*_a, const void*_b)
-{
-	const double* a = _a;
-	const double* b = _b;
-
-	if(*a < *b)
-		return -1;
-	else if(*a > *b)
-		return 1;
-	else
-		return 0;
-}
-
-static inline uint64_t _next(uint64_t len, double multiplier, uint64_t increment)
-{
-	uint64_t next = len * multiplier + increment;
-
-	if(next <= len)
-		next++;
-
-	return next;
-}
-
-
-static inline uint64_t _iterations(int iterations, uint64_t len)
-{
-	const uint64_t max_data = 512 * 1024 * 1024;
-
-	if(len <= 0)
-		len = 1;
-
-	uint64_t data_size = ((uint64_t)iterations * (uint64_t)len);
-
-	if(data_size  > max_data)
-	{
-		iterations = (max_data / (uint64_t)len);
-		if(iterations < 2)
-			iterations = 2;
-	}
-
-	return iterations;
-}
 
 int main(int argc, char **argv)
 {
-	int ret, rank;
-	starpu_data_handle_t handle_send, handle_recv;
+	int ret, rank, worldsize;
 	int mpi_init;
-	float* vector_send = NULL;
-	float* vector_recv = NULL;
-	double t1, t2;
-	double* lats = malloc(sizeof(double) * LOOPS_DEFAULT);
-	uint64_t iterations = LOOPS_DEFAULT;
-	double multiplier = MULT_DEFAULT;
-	uint64_t increment = INCR_DEFAULT;
 
 	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
 	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
@@ -115,108 +46,15 @@ int main(int argc, char **argv)
 		return STARPU_TEST_SKIPPED;
 	}
 
-	if (rank >= 2)
-	{
-		starpu_pause();
-		for (uint64_t s = NX_MIN; s <= NX_MAX; s = _next(s, multiplier, increment))
-		{
-			iterations = _iterations(iterations, s);
-
-			starpu_mpi_barrier(MPI_COMM_WORLD);
-
-			for (uint64_t j = 0; j < iterations; j++)
-			{
-				starpu_mpi_barrier(MPI_COMM_WORLD);
-			}
-		}
-		starpu_resume();
-
-		starpu_mpi_shutdown();
-		if (!mpi_init)
-			MPI_Finalize();
-		return 0;
-	}
-
-	if (rank == 0)
-	{
-		printf("Times in us\n");
-		printf("# size  (Bytes)\t|  latency \t| 10^6 B/s \t| MB/s   \t| d1    \t|median  \t| avg    \t| d9    \t| max\n");
-	}
-
-	int array_size = 0;
-
-	for (uint64_t s = NX_MIN; s <= NX_MAX; s = _next(s, multiplier, increment))
-	{
-		vector_send = malloc(s);
-		vector_recv = malloc(s);
-		memset(vector_send, 0, s);
-		memset(vector_recv, 0, s);
-
-		starpu_vector_data_register(&handle_send, STARPU_MAIN_RAM, (uintptr_t) vector_send, s, 1);
-		starpu_vector_data_register(&handle_recv, STARPU_MAIN_RAM, (uintptr_t) vector_recv, s, 1);
-
-		iterations = _iterations(iterations, s);
+	/* Pause workers for this bench: having all workers poll for tasks has a strong impact on communication performance */
+	starpu_pause();
 
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-
-		for (uint64_t j = 0; j < iterations; j++)
-		{
-			if (rank == 0)
-			{
-				t1 = starpu_timing_now();
-				starpu_mpi_send(handle_send, 1, 0, MPI_COMM_WORLD);
-				starpu_mpi_recv(handle_recv, 1, 1, MPI_COMM_WORLD, NULL);
-				t2 = starpu_timing_now();
-
-				const double delay = t2 - t1;
-				const double t = delay / 2;
-
-				lats[j] = t;
-			}
-			else
-			{
-				starpu_mpi_recv(handle_recv, 0, 0, MPI_COMM_WORLD, NULL);
-				starpu_mpi_send(handle_send, 0, 1, MPI_COMM_WORLD);
-			}
-
-			starpu_mpi_barrier(MPI_COMM_WORLD);
-		}
-
-		if (rank == 0)
-		{
-			qsort(lats, iterations, sizeof(double), &comp_double);
-
-			const double min_lat = lats[0];
-			const double max_lat = lats[iterations - 1];
-			const double med_lat = lats[(iterations - 1) / 2];
-			const double d1_lat = lats[(iterations - 1) / 10];
-			const double d9_lat = lats[9 * (iterations - 1) / 10];
-			double avg_lat = 0.0;
-
-			for(uint64_t k = 0; k < iterations; k++)
-			{
-				avg_lat += lats[k];
-			}
-
-			avg_lat /= iterations;
-			const double bw_million_byte = s / min_lat;
-			const double bw_mbyte        = bw_million_byte / 1.048576;
-
-			printf("%9lld\t%9.3lf\t%9.3f\t%9.3f\t%9.3lf\t%9.3lf\t%9.3lf\t%9.3lf\t%9.3lf\n",
-				(long long)s, min_lat, bw_million_byte, bw_mbyte, d1_lat, med_lat, avg_lat, d9_lat, max_lat);
-			fflush(stdout);
-		}
-		starpu_data_unregister(handle_recv);
-		starpu_data_unregister(handle_send);
-
-		free(vector_send);
-		free(vector_recv);
-	}
+	sendrecv_bench(rank, NULL);
 
+	starpu_resume();
 	starpu_mpi_shutdown();
 	if (!mpi_init)
 		MPI_Finalize();
 
-	free(lats);
 	return 0;
 }

mpi/tests/sendrecv_gemm_bench.c (+462, -0)

@@ -0,0 +1,462 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * Simple *non-distributed* parallel GEMM implementation and sendrecv bench running at the same time.
+ *
+ * This bench is a merge of mpi/tests/sendrecv_bench and examples/mult/sgemm
+ *
+ * A *non-distributed* GEMM is computed on each node, while a sendrecv bench is running,
+ * completely independently. The goal is to measure the impact of worker computations on
+ * communications.
+ *
+ * Use the -nblocks parameter to define the matrix size (matrix size = nblocks * 320), so that
+ * the GEMM finishes after the sendrecv bench.
+ */
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <starpu_mpi.h>
+#include <starpu_fxt.h>
+
+#include <common/blas.h>
+
+#include "helper.h"
+#include "abstract_sendrecv_bench.h"
+#include "../../examples/mult/simple.h"
+
+#define CHECK_TASK_SUBMIT(ret) do {				\
+	if (ret == -ENODEV)					\
+	{							\
+		ret = 77;					\
+		goto enodev;					\
+	}							\
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");	\
+} while(0)
+
+static int mpi_rank;
+static int comm_thread_cpuid = -1;
+static unsigned nslices = 4;
+#if defined(STARPU_QUICK_CHECK) && !defined(STARPU_SIMGRID)
+static unsigned matrix_dim = 256;
+#else
+static unsigned matrix_dim = 320 * 4;
+#endif
+static unsigned check = 0;
+
+static TYPE *A, *B, *C;
+static starpu_data_handle_t A_handle, B_handle, C_handle;
+
+static starpu_pthread_barrier_t thread_barrier;
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+#define PRINTF(fmt, ...) do { if (!getenv("STARPU_SSILENT")) {printf(fmt, ## __VA_ARGS__); fflush(stdout); }} while(0)
+
+static void check_output(void)
+{
+	/* compute C = C - AB */
+	CPU_GEMM("N", "N", matrix_dim, matrix_dim, matrix_dim, (TYPE)-1.0f, A, matrix_dim, B, matrix_dim, (TYPE)1.0f, C, matrix_dim);
+
+	/* make sure C = 0 */
+	TYPE err;
+	err = CPU_ASUM(matrix_dim*matrix_dim, C, 1);
+
+	if (err < matrix_dim*matrix_dim*0.001)
+	{
+		FPRINTF(stderr, "Results are OK\n");
+	}
+	else
+	{
+		int max;
+		max = CPU_IAMAX(matrix_dim*matrix_dim, C, 1);
+
+		FPRINTF(stderr, "There were errors ... err = %f\n", err);
+		FPRINTF(stderr, "Max error : %e\n", C[max]);
+	}
+}
+
+static void init_problem_data(void)
+{
+#ifndef STARPU_SIMGRID
+	unsigned i,j;
+#endif
+
+	starpu_malloc_flags((void **)&A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+
+#ifndef STARPU_SIMGRID
+	/* fill the matrices */
+	for (j=0; j < matrix_dim; j++)
+	{
+		for (i=0; i < matrix_dim; i++)
+		{
+			A[j+i*matrix_dim] = (TYPE)(starpu_drand48());
+			B[j+i*matrix_dim] = (TYPE)(starpu_drand48());
+			C[j+i*matrix_dim] = (TYPE)(0);
+		}
+	}
+#endif
+}
+
+static void partition_mult_data(void)
+{
+	starpu_matrix_data_register(&A_handle, STARPU_MAIN_RAM, (uintptr_t)A,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+	starpu_matrix_data_register(&B_handle, STARPU_MAIN_RAM, (uintptr_t)B,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+	starpu_matrix_data_register(&C_handle, STARPU_MAIN_RAM, (uintptr_t)C,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+
+	struct starpu_data_filter vert;
+	memset(&vert, 0, sizeof(vert));
+	vert.filter_func = starpu_matrix_filter_vertical_block;
+	vert.nchildren = nslices;
+
+	struct starpu_data_filter horiz;
+	memset(&horiz, 0, sizeof(horiz));
+	horiz.filter_func = starpu_matrix_filter_block;
+	horiz.nchildren = nslices;
+
+	starpu_data_partition(B_handle, &vert);
+	starpu_data_partition(A_handle, &horiz);
+
+	starpu_data_map_filters(C_handle, 2, &vert, &horiz);
+}
+
+
+void cpu_init_matrix_random(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
+	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
+	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
+
+	for (unsigned i = 0; i < nx *ny; i++)
+	{
+		subA[i] = (TYPE) (starpu_drand48());
+		subB[i] = (TYPE) (starpu_drand48());
+	}
+}
+
+
+void cpu_init_matrix_zero(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
+	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
+
+	for (unsigned i = 0; i < nx *ny; i++)
+	{
+		subA[i] = (TYPE) (0);
+	}
+}
+
+
+void cpu_mult(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
+	TYPE *subC = (TYPE *)STARPU_MATRIX_GET_PTR(descr[2]);
+
+	unsigned nxC = STARPU_MATRIX_GET_NX(descr[2]);
+	unsigned nyC = STARPU_MATRIX_GET_NY(descr[2]);
+	unsigned nyA = STARPU_MATRIX_GET_NY(descr[0]);
+
+	unsigned ldA = STARPU_MATRIX_GET_LD(descr[0]);
+	unsigned ldB = STARPU_MATRIX_GET_LD(descr[1]);
+	unsigned ldC = STARPU_MATRIX_GET_LD(descr[2]);
+
+	int worker_size = starpu_combined_worker_get_size();
+
+	if (worker_size == 1)
+	{
+		/* Sequential CPU task */
+		CPU_GEMM("N", "N", nxC, nyC, nyA, (TYPE)1.0, subA, ldA, subB, ldB, (TYPE)0.0, subC, ldC);
+	}
+	else
+	{
+		/* Parallel CPU task */
+		unsigned rank = starpu_combined_worker_get_rank();
+
+		unsigned block_size = (nyC + worker_size - 1)/worker_size;
+		unsigned new_nyC = STARPU_MIN(nyC, block_size*(rank+1)) - block_size*rank;
+
+		STARPU_ASSERT(nyC == STARPU_MATRIX_GET_NY(descr[1]));
+
+		TYPE *new_subB = &subB[block_size*rank];
+		TYPE *new_subC = &subC[block_size*rank];
+
+		CPU_GEMM("N", "N", nxC, new_nyC, nyA, (TYPE)1.0, subA, ldA, new_subB, ldB, (TYPE)0.0, new_subC, ldC);
+	}
+}
+
+static struct starpu_perfmodel starpu_gemm_model =
+{
+	.type = STARPU_HISTORY_BASED,
+	.symbol = STARPU_GEMM_STR(gemm)
+};
+
+static struct starpu_codelet cl =
+{
+	.type = STARPU_SEQ, /* changed to STARPU_SPMD if -spmd is passed */
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_mult},
+	.cpu_funcs_name = {"cpu_mult"},
+	.nbuffers = 3,
+	.modes = {STARPU_R, STARPU_R, STARPU_RW},
+	.model = &starpu_gemm_model
+};
+
+static struct starpu_codelet cl_init_matrix_random =
+{
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_init_matrix_random},
+	.cpu_funcs_name = {"cpu_init_matrix_random"},
+	.nbuffers = 2,
+	.modes = {STARPU_W, STARPU_W}
+};
+
+static struct starpu_codelet cl_init_matrix_zero =
+{
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_init_matrix_zero},
+	.cpu_funcs_name = {"cpu_init_matrix_zero"},
+	.nbuffers = 1,
+	.modes = {STARPU_W}
+};
+
+static void parse_args(int argc, char **argv)
+{
+	int i;
+	for (i = 1; i < argc; i++)
+	{
+		if (strcmp(argv[i], "-nblocks") == 0)
+		{
+			char *argptr;
+			nslices = strtol(argv[++i], &argptr, 10);
+			matrix_dim = 320 * nslices;
+		}
+
+		else if (strcmp(argv[i], "-size") == 0)
+		{
+			char *argptr;
+			unsigned matrix_dim_tmp = strtol(argv[++i], &argptr, 10);
+			if (matrix_dim_tmp % 320 != 0)
+			{
+				fprintf(stderr, "Matrix size has to be a multiple of 320\n");
+			}
+			else
+			{
+				matrix_dim = matrix_dim_tmp;
+				nslices = matrix_dim / 320;
+			}
+		}
+
+		else if (strcmp(argv[i], "-check") == 0)
+		{
+			check = 1;
+		}
+
+		else if (strcmp(argv[i], "-spmd") == 0)
+		{
+			cl.type = STARPU_SPMD;
+		}
+
+		else if (strcmp(argv[i], "-comm-thread-cpuid") == 0)
+		{
+			comm_thread_cpuid = atoi(argv[++i]);
+		}
+
+		else if (strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0)
+		{
+			fprintf(stderr,"Usage: %s [-nblocks n] [-size size] [-check] [-spmd] [-comm-thread-cpuid cpuid]\n", argv[0]);
+			fprintf(stderr,"Currently selected: matrix size: %u - %u blocks\n", matrix_dim, nslices);
+			fprintf(stderr, "Use -comm-thread-cpuid to specify where to bind the comm benchmarking thread\n");
+			exit(EXIT_SUCCESS);
+		}
+		else
+		{
+			fprintf(stderr,"Unrecognized option %s", argv[i]);
+			exit(EXIT_FAILURE);
+		}
+	}
+}
+
+
+static void* comm_thread_func(void* arg)
+{
+	if (comm_thread_cpuid < 0)
+	{
+		comm_thread_cpuid = starpu_get_next_bindid(STARPU_THREAD_ACTIVE, NULL, 0);
+	}
+
+	if (starpu_bind_thread_on(comm_thread_cpuid, 0, "Comm") < 0)
+	{
+		char hostname[65];
+		gethostname(hostname, sizeof(hostname));
+		_STARPU_DISP("[%s] No core was available for the comm thread. You should increase STARPU_RESERVE_NCPU or decrease STARPU_NCPU\n", hostname);
+	}
+
+	sendrecv_bench(mpi_rank, &thread_barrier);
+
+	return NULL;
+}
+
+
+int main(int argc, char **argv)
+{
+	double start, end;
+	int ret, mpi_init, worldsize;
+	starpu_pthread_t comm_thread;
+
+	char hostname[255];
+	gethostname(hostname, 255);
+
+	parse_args(argc, argv);
+
+	starpu_fxt_autostart_profiling(0);
+
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
+	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &mpi_rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
+
+	if (worldsize < 2)
+	{
+		if (mpi_rank == 0)
+			FPRINTF(stderr, "We need 2 processes.\n");
+
+		starpu_mpi_shutdown();
+		if (!mpi_init)
+			MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+
+	STARPU_PTHREAD_BARRIER_INIT(&thread_barrier, NULL, 2);
+
+
+	// Start comm thread, benchmarking sendrecv:
+	STARPU_PTHREAD_CREATE(&comm_thread, NULL, comm_thread_func, NULL);
+
+
+	// Main thread will submit GEMM tasks:
+	starpu_malloc_flags((void **)&A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	partition_mult_data();
+
+
+	if (mpi_rank == 0)
+	{
+		PRINTF("# node\tx\ty\tz\tms\tGFlops\n");
+	}
+
+	starpu_pause();
+	unsigned x, y;
+#ifndef STARPU_SIMGRID
+	// Initialize matrices:
+	for (x = 0; x < nslices; x++)
+	{
+		struct starpu_task *task = starpu_task_create();
+		task->cl = &cl_init_matrix_random;
+		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, x);
+		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
+		ret = starpu_task_submit(task);
+		CHECK_TASK_SUBMIT(ret);
+
+		for (y = 0; y < nslices; y++)
+		{
+			task = starpu_task_create();
+			task->cl = &cl_init_matrix_zero;
+			task->handles[0] = starpu_data_get_sub_data(C_handle, 2, x, y);
+			ret = starpu_task_submit(task);
+			CHECK_TASK_SUBMIT(ret);
+		}
+	}
+#endif
+
+	for (x = 0; x < nslices; x++)
+	for (y = 0; y < nslices; y++)
+	{
+		struct starpu_task *task = starpu_task_create();
+		task->cl = &cl;
+		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, y);
+		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
+		task->handles[2] = starpu_data_get_sub_data(C_handle, 2, x, y);
+		task->flops = 2ULL * (matrix_dim/nslices) * (matrix_dim/nslices) * matrix_dim;
+
+		ret = starpu_task_submit(task);
+		CHECK_TASK_SUBMIT(ret);
+		starpu_data_wont_use(starpu_data_get_sub_data(C_handle, 2, x, y));
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+	starpu_fxt_start_profiling();
+
+	STARPU_PTHREAD_BARRIER_WAIT(&thread_barrier);
+
+	start = starpu_timing_now();
+	starpu_resume();
+	starpu_task_wait_for_all();
+	end = starpu_timing_now();
+	starpu_pause(); // Pause again so as not to disturb the comm thread if it is not done yet
+
+	double timing = end - start;
+	double flops = 2.0*((unsigned long long)matrix_dim) * ((unsigned long long)matrix_dim)*((unsigned long long)matrix_dim);
+
+	PRINTF("%s\t%u\t%u\t%u\t%.0f\t%.1f\n", hostname, matrix_dim, matrix_dim, matrix_dim, timing/1000.0, flops/timing/1000.0);
+
+
+enodev:
+	starpu_data_unpartition(C_handle, STARPU_MAIN_RAM);
+	starpu_data_unpartition(B_handle, STARPU_MAIN_RAM);
+	starpu_data_unpartition(A_handle, STARPU_MAIN_RAM);
+
+	starpu_data_unregister(A_handle);
+	starpu_data_unregister(B_handle);
+	starpu_data_unregister(C_handle);
+
+	if (check)
+		check_output();
+
+	starpu_free_flags(A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_free_flags(B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_free_flags(C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+
+
+	// Wait comm thread:
+	STARPU_PTHREAD_JOIN(comm_thread, NULL);
+	STARPU_PTHREAD_BARRIER_DESTROY(&thread_barrier);
+
+	starpu_fxt_stop_profiling();
+
+	starpu_resume();
+	starpu_mpi_shutdown();
+	if (!mpi_init)
+		MPI_Finalize();
+
+	return ret;
+}
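
As the header comment of sendrecv_gemm_bench.c states, the GEMM is computed locally on every node while the comm thread replays the sendrecv bench, so -nblocks should be large enough for the GEMM to outlive the communication sweep. The sketch below estimates the GEMM duration for a given -nblocks value, using the same flops accounting as the benchmark (matrix_dim = 320 * nblocks, 2 * (matrix_dim/nslices)^2 * matrix_dim flops per task, 2 * matrix_dim^3 flops overall); the sustained GFlop/s rate is a hypothetical, machine-dependent input, not something measured by this commit.

/* Illustrative sketch: rough sizing of -nblocks for sendrecv_gemm_bench.
 * The sustained GEMM rate below is an assumed, machine-dependent value. */
#include <stdio.h>

int main(void)
{
	const unsigned nblocks = 16;                    /* value passed to -nblocks */
	const unsigned long long dim = 320ULL * nblocks; /* matrix_dim */
	const double gflops_sustained = 500.0;          /* assumed CPU GEMM rate */

	const double task_flops  = 2.0 * (dim / nblocks) * (dim / nblocks) * dim;
	const double total_flops = 2.0 * dim * dim * dim;
	const double seconds     = total_flops / (gflops_sustained * 1e9);

	printf("matrix %llux%llu, %u x %u GEMM tasks of %.2e flops each\n",
	       dim, dim, nblocks, nblocks, task_flops);
	printf("~%.2e flops in total, ~%.1f s at %.0f GFlop/s\n",
	       total_flops, seconds, gflops_sustained);
	return 0;
}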