
Add mpi/tests/burst_gemm

* bursts with several configurations (polling workers, paused workers,
and working workers)
* refactor mpi/tests/{burst,sendrecv_gemm_bench}
* add the -nreqs option to the burst* programs
* simplify bursts: since all requests are explicitly waited, there is no
need to call starpu_mpi_wait_for_all() anymore
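
For reference, the burst pattern that makes starpu_mpi_wait_for_all() redundant is: post all the irecvs, synchronise, post all the isends, then wait on every request explicitly. A minimal sketch (nreqs, other_rank and the handle/request arrays are assumed to be set up as in burst_helper.c below):

	for (int i = 0; i < nreqs; i++)
	{
		recv_reqs[i] = NULL;
		starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
	}

	starpu_mpi_barrier(MPI_COMM_WORLD);

	for (int i = 0; i < nreqs; i++)
	{
		send_reqs[i] = NULL;
		starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
	}

	/* Every request is waited on explicitly here, so no trailing
	 * starpu_mpi_wait_for_all() is needed. */
	for (int i = 0; i < nreqs; i++)
	{
		if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
		if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
	}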
Philippe SWARTVAGHER 5 years ago
parent
commit
eb3ee5c828
9 changed files with 799 additions and 491 deletions
  1. mpi/tests/Makefile.am (+17 -2)
  2. mpi/tests/burst.c (+26 -217)
  3. mpi/tests/burst_gemm.c (+163 -0)
  4. mpi/tests/burst_helper.c (+232 -0)
  5. mpi/tests/burst_helper.h (+29 -0)
  6. mpi/tests/gemm_helper.c (+282 -0)
  7. mpi/tests/gemm_helper.h (+33 -0)
  8. mpi/tests/helper.h (+1 -0)
  9. mpi/tests/sendrecv_gemm_bench.c (+16 -272)

+ 17 - 2
mpi/tests/Makefile.am

@@ -62,6 +62,8 @@ EXTRA_DIST = 				\
 	abstract_sendrecv_bench.h	\
 	bench_helper.h			\
 	helper.h			\
+	gemm_helper.h			\
+	burst_helper.h			\
 	user_defined_datatype_value.h
 
 examplebindir = $(libdir)/starpu/examples/mpi
@@ -149,7 +151,8 @@ endif
 
 if !NO_BLAS_LIB
 starpu_mpi_TESTS +=				\
-	sendrecv_gemm_bench
+	sendrecv_gemm_bench			\
+	burst_gemm
 endif
 
 if !STARPU_SIMGRID
@@ -246,7 +249,8 @@ noinst_PROGRAMS =				\
 
 if !NO_BLAS_LIB
 noinst_PROGRAMS +=				\
-	sendrecv_gemm_bench
+	sendrecv_gemm_bench			\
+	burst_gemm
 endif
 
 XFAIL_TESTS=					\
@@ -287,13 +291,24 @@ sendrecv_parallel_tasks_bench_SOURCES = sendrecv_parallel_tasks_bench.c
 sendrecv_parallel_tasks_bench_SOURCES += bench_helper.c
 sendrecv_parallel_tasks_bench_SOURCES += abstract_sendrecv_bench.c
 
+burst_SOURCES = burst.c
+burst_SOURCES += burst_helper.c
+
 if !NO_BLAS_LIB
 sendrecv_gemm_bench_SOURCES = sendrecv_gemm_bench.c
 sendrecv_gemm_bench_SOURCES += bench_helper.c
+sendrecv_gemm_bench_SOURCES += gemm_helper.c
 sendrecv_gemm_bench_SOURCES += abstract_sendrecv_bench.c
 sendrecv_gemm_bench_SOURCES += ../../examples/common/blas.c
 
 sendrecv_gemm_bench_LDADD = $(STARPU_BLAS_LDFLAGS)
+
+burst_gemm_SOURCES = burst_gemm.c
+burst_gemm_SOURCES += gemm_helper.c
+burst_gemm_SOURCES += burst_helper.c
+burst_gemm_SOURCES += ../../examples/common/blas.c
+
+burst_gemm_LDADD = $(STARPU_BLAS_LDFLAGS)
 endif
 
 endif

+ 26 - 217
mpi/tests/burst.c

@@ -17,252 +17,61 @@
 /*
 * This test simultaneously sends many communications, with various configurations.
  *
- * Global purpose is to watch the behaviour with traces.
+ * The overall purpose is to run with trace recording, to watch the behaviour of the communications.
  */
 
 #include <starpu_mpi.h>
 #include "helper.h"
+#include "burst_helper.h"
 
-#if defined(STARPU_SIMGRID) || defined(STARPU_QUICK_CHECK)
-#define NB_REQUESTS 10
-#else
-#define NB_REQUESTS 500
-#endif
-#define NX_ARRAY (320 * 320)
 
-static starpu_pthread_mutex_t mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
-static starpu_pthread_cond_t cond = STARPU_PTHREAD_COND_INITIALIZER;
-
-void recv_callback(void* arg)
+void parse_args(int argc, char **argv)
 {
-	int* received = arg;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	*received = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-}
-
-int main(int argc, char **argv)
-{
-	int ret, rank, size, mpi_init, other_rank;
-	starpu_data_handle_t recv_handles[NB_REQUESTS];
-	starpu_data_handle_t send_handles[NB_REQUESTS];
-	float* recv_buffers[NB_REQUESTS];
-	float* send_buffers[NB_REQUESTS];
-	starpu_mpi_req recv_reqs[NB_REQUESTS];
-	starpu_mpi_req send_reqs[NB_REQUESTS];
-
-	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
-
-	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
-	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
-
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
-
-	other_rank = (rank == 0) ? 1 : 0;
-
-	if (rank == 0 || rank == 1)
-	{
-		for (int i = 0; i < NB_REQUESTS; i++)
-		{
-			send_buffers[i] = malloc(NX_ARRAY * sizeof(float));
-			memset(send_buffers[i], 0, NX_ARRAY * sizeof(float));
-			starpu_vector_data_register(&send_handles[i], STARPU_MAIN_RAM, (uintptr_t) send_buffers[i], NX_ARRAY, sizeof(float));
-
-			recv_buffers[i] = malloc(NX_ARRAY * sizeof(float));
-			memset(recv_buffers[i], 0, NX_ARRAY * sizeof(float));
-			starpu_vector_data_register(&recv_handles[i], STARPU_MAIN_RAM, (uintptr_t) recv_buffers[i], NX_ARRAY, sizeof(float));
-		}
-	}
-
-	{
-		/* Burst simultaneous from both nodes: 0 and 1 post all the recvs, synchronise, and then post all the sends */
-		FPRINTF(stderr, "Simultaneous....start (rank %d)\n", rank);
-
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				recv_reqs[i] = NULL;
-				starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
-			}
-		}
-
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				send_reqs[i] = NULL;
-				starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
-			}
-		}
-
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
-				if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
-			}
-		}
-		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
-		FPRINTF(stderr, "Simultaneous....end (rank %d)\n", rank);
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-	}
-
+	int i;
+	for (i = 1; i < argc; i++)
 	{
-		/* Burst from 0 to 1 : rank 1 posts all the recvs, barrier, then rank 0 posts all the sends */
-		FPRINTF(stderr, "0 -> 1...start (rank %d)\n", rank);
-
-		if (rank == 1)
+		if (strcmp(argv[i], "-nreqs") == 0)
 		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				recv_reqs[i] = NULL;
-				starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
-			}
+			burst_nb_requests = atoi(argv[++i]);
 		}
 
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-
-		if (rank == 0)
+		else if (strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0)
 		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				send_reqs[i] = NULL;
-				starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
-			}
+			fprintf(stderr,"Usage: %s [-nreqs nreqs]\n", argv[0]);
+			fprintf(stderr,"Currently selected: %d requests in each burst\n", burst_nb_requests);
+			exit(EXIT_SUCCESS);
 		}
 
-		if (rank == 0 || rank == 1)
+		else
 		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				if (rank == 1 && recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
-				if (rank == 0 && send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
-			}
+			fprintf(stderr,"Unrecognized option %s\n", argv[i]);
+			exit(EXIT_FAILURE);
 		}
-		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
-		FPRINTF(stderr, "0 -> 1...done (rank %d)\n", rank);
-		starpu_mpi_barrier(MPI_COMM_WORLD);
 	}
+}
 
-	{
-		FPRINTF(stderr, "1 -> 0...start (rank %d)\n", rank);
-		/* Burst from 1 to 0 */
-		if (rank == 0)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				recv_reqs[i] = NULL;
-				starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
-			}
-		}
-
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-
-		if (rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				send_reqs[i] = NULL;
-				starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
-			}
-		}
-
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				if (rank == 0 && recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
-				if (rank == 1 && send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
-			}
-		}
-		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
-		FPRINTF(stderr, "1 -> 0...done (rank %d)\n", rank);
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-	}
 
-	{
-		/* Half burst from both nodes, second half burst is triggered after some requests finished. */
-		FPRINTF(stderr, "Half/half burst...start (rank %d)\n", rank);
+int main(int argc, char **argv)
+{
+	int ret, rank, mpi_init, other_rank;
 
-		int received = 0;
+	parse_args(argc, argv);
 
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				recv_reqs[i] = NULL;
-				if (i % 2)
-				{
-					starpu_mpi_irecv_detached(recv_handles[i], other_rank, i, MPI_COMM_WORLD, recv_callback, &received);
-				}
-				else
-				{
-					starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
-				}
-			}
-		}
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
 
-		starpu_mpi_barrier(MPI_COMM_WORLD);
+	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
 
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < (NB_REQUESTS / 2); i++)
-			{
-				send_reqs[i] = NULL;
-				starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
-			}
-		}
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 
-		if (rank == 0 || rank == 1)
-		{
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-			while (!received)
-				STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		}
+	burst_init_data(rank);
 
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = (NB_REQUESTS / 2); i < NB_REQUESTS; i++)
-			{
-				send_reqs[i] = NULL;
-				starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
-			}
-		}
 
-		if (rank == 0 || rank == 1)
-		{
-			for (int i = 0; i < NB_REQUESTS; i++)
-			{
-				if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
-				if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
-			}
-		}
+	burst_all(rank);
 
-		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
-		FPRINTF(stderr, "Half/half burst...done (rank %d)\n", rank);
-		starpu_mpi_barrier(MPI_COMM_WORLD);
-	}
 
 	/* Clear up */
-	if (rank == 0 || rank == 1)
-	{
-		for (int i = 0; i < NB_REQUESTS; i++)
-		{
-			starpu_data_unregister(send_handles[i]);
-			free(send_buffers[i]);
-
-			starpu_data_unregister(recv_handles[i]);
-			free(recv_buffers[i]);
-		}
-	}
+	burst_free_data(rank);
 
 	starpu_mpi_shutdown();
 	if (!mpi_init)

+ 163 - 0
mpi/tests/burst_gemm.c

@@ -0,0 +1,163 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * Program to be executed with trace recording to watch the impact of
+ * computations (or task polling) on communications.
+ */
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <starpu_mpi.h>
+#include <starpu_fxt.h>
+
+#include "helper.h"
+#include "gemm_helper.h"
+#include "burst_helper.h"
+
+
+void parse_args(int argc, char **argv)
+{
+	int i;
+	for (i = 1; i < argc; i++)
+	{
+		if (strcmp(argv[i], "-nblocks") == 0)
+		{
+			char *argptr;
+			nslices = strtol(argv[++i], &argptr, 10);
+			matrix_dim = 320 * nslices;
+		}
+
+		else if (strcmp(argv[i], "-size") == 0)
+		{
+			char *argptr;
+			unsigned matrix_dim_tmp = strtol(argv[++i], &argptr, 10);
+			if (matrix_dim_tmp % 320 != 0)
+			{
+				fprintf(stderr, "Matrix size has to be a multiple of 320\n");
+			}
+			else
+			{
+				matrix_dim = matrix_dim_tmp;
+				nslices = matrix_dim / 320;
+			}
+		}
+
+		else if (strcmp(argv[i], "-check") == 0)
+		{
+			check = 1;
+		}
+
+		else if (strcmp(argv[i], "-nreqs") == 0)
+		{
+			burst_nb_requests = atoi(argv[++i]);
+		}
+
+		else if (strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0)
+		{
+			fprintf(stderr,"Usage: %s [-nblocks n] [-size size] [-check] [-nreqs nreqs]\n", argv[0]);
+			fprintf(stderr,"Currently selected: matrix size: %u - %u blocks - %d requests in each burst\n", matrix_dim, nslices, burst_nb_requests);
+			exit(EXIT_SUCCESS);
+		}
+
+		else
+		{
+			fprintf(stderr,"Unrecognized option %s\n", argv[i]);
+			exit(EXIT_FAILURE);
+		}
+	}
+}
+
+
+int main(int argc, char **argv)
+{
+	int ret, mpi_init, worldsize, mpi_rank;
+
+	parse_args(argc, argv);
+
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
+	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &mpi_rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
+
+	if (worldsize < 2)
+	{
+		if (mpi_rank == 0)
+			FPRINTF(stderr, "We need 2 processes.\n");
+
+		starpu_mpi_shutdown();
+		if (!mpi_init)
+			MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+	gemm_alloc_data();
+	if(gemm_init_data() == -ENODEV)
+		goto enodev;
+
+	burst_init_data(mpi_rank);
+
+	/* Wait for everything and everybody: */
+	starpu_task_wait_for_all();
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+
+	FPRINTF(stderr, "** Burst warmup **\n");
+	burst_all(mpi_rank);
+
+	starpu_sleep(0.3); // sleep to easily distinguish different bursts in traces
+
+	FPRINTF(stderr, "** Burst while there is no task available, but workers are polling **\n");
+	burst_all(mpi_rank);
+
+	starpu_sleep(0.3); // sleep to easily distinguish different bursts in traces
+
+	FPRINTF(stderr, "** Burst while there is no task available, workers are paused **\n");
+	starpu_pause();
+	burst_all(mpi_rank);
+
+	starpu_sleep(0.3); // sleep to easily distinguish different bursts in traces
+
+	FPRINTF(stderr, "** Burst while workers are really working **\n");
+	if(gemm_submit_tasks() == -ENODEV)
+		goto enodev;
+	starpu_resume();
+
+	burst_all(mpi_rank);
+
+	FPRINTF(stderr, "Burst done, now waiting for computing tasks to finish\n");
+
+
+	/* Wait for everything and everybody: */
+	starpu_task_wait_for_all();
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+
+enodev:
+	gemm_release();
+	burst_free_data(mpi_rank);
+
+	starpu_mpi_shutdown();
+	if (!mpi_init)
+		MPI_Finalize();
+
+	return ret;
+}

+ 232 - 0
mpi/tests/burst_helper.c

@@ -0,0 +1,232 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+#include <starpu_mpi.h>
+
+#include "helper.h"
+#include "burst_helper.h"
+
+#if defined(STARPU_SIMGRID) || defined(STARPU_QUICK_CHECK)
+#define NB_REQUESTS 10
+#else
+#define NB_REQUESTS 50
+#endif
+#define NX_ARRAY (320 * 320)
+
+
+static starpu_data_handle_t* recv_handles;
+static starpu_data_handle_t* send_handles;
+static float** recv_buffers;
+static float** send_buffers;
+static starpu_mpi_req* recv_reqs;
+static starpu_mpi_req* send_reqs;
+
+int burst_nb_requests = NB_REQUESTS;
+
+void burst_init_data(int rank)
+{
+	if (rank == 0 || rank == 1)
+	{
+		recv_handles = malloc(burst_nb_requests * sizeof(starpu_data_handle_t));
+		send_handles = malloc(burst_nb_requests * sizeof(starpu_data_handle_t));
+		recv_buffers = malloc(burst_nb_requests * sizeof(float*));
+		send_buffers = malloc(burst_nb_requests * sizeof(float*));
+		recv_reqs = malloc(burst_nb_requests * sizeof(starpu_mpi_req));
+		send_reqs = malloc(burst_nb_requests * sizeof(starpu_mpi_req));
+
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			send_buffers[i] = malloc(NX_ARRAY * sizeof(float));
+			memset(send_buffers[i], 0, NX_ARRAY * sizeof(float));
+			starpu_vector_data_register(&send_handles[i], STARPU_MAIN_RAM, (uintptr_t) send_buffers[i], NX_ARRAY, sizeof(float));
+
+			recv_buffers[i] = malloc(NX_ARRAY * sizeof(float));
+			memset(recv_buffers[i], 0, NX_ARRAY * sizeof(float));
+			starpu_vector_data_register(&recv_handles[i], STARPU_MAIN_RAM, (uintptr_t) recv_buffers[i], NX_ARRAY, sizeof(float));
+		}
+	}
+}
+
+
+void burst_free_data(int rank)
+{
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			starpu_data_unregister(send_handles[i]);
+			free(send_buffers[i]);
+
+			starpu_data_unregister(recv_handles[i]);
+			free(recv_buffers[i]);
+		}
+
+		free(recv_handles);
+		free(send_handles);
+		free(recv_buffers);
+		free(send_buffers);
+		free(recv_reqs);
+		free(send_reqs);
+	}
+}
+
+
+/* Burst simultaneous from both nodes: 0 and 1 post all the recvs, synchronise, and then post all the sends */
+void burst_bidir(int rank)
+{
+	int other_rank = (rank == 0) ? 1 : 0;
+
+	FPRINTF(stderr, "Simultaneous....start (rank %d)\n", rank);
+
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			recv_reqs[i] = NULL;
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			send_reqs[i] = NULL;
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
+			if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
+		}
+	}
+
+	FPRINTF(stderr, "Simultaneous....end (rank %d)\n", rank);
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+}
+
+
+void burst_unidir(int sender, int rank)
+{
+	int other_rank = (rank == 0) ? 1 : 0;
+	int receiver = (sender == 0) ? 1 : 0;
+
+	FPRINTF(stderr, "%d -> %d... start (rank %d)\n", sender, receiver, rank);
+
+	if (rank != sender)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			recv_reqs[i] = NULL;
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	if (rank == sender)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			send_reqs[i] = NULL;
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+	}
+
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			if (rank != sender && recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
+			if (rank == sender && send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
+		}
+	}
+
+	FPRINTF(stderr, "%d -> %d... end (rank %d)\n", sender, receiver, rank);
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+}
+
+
+/* Half burst from both nodes, second half burst is triggered after some requests finished. */
+void burst_bidir_half_postponed(int rank)
+{
+	int other_rank = (rank == 0) ? 1 : 0;
+	int received = 0;
+
+	FPRINTF(stderr, "Half/half burst...start (rank %d)\n", rank);
+
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			recv_reqs[i] = NULL;
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	if (rank == 0 || rank == 1)
+	{
+		for (int i = 0; i < (burst_nb_requests / 2); i++)
+		{
+			send_reqs[i] = NULL;
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+
+		if (recv_reqs[burst_nb_requests / 4]) starpu_mpi_wait(&recv_reqs[burst_nb_requests / 4], MPI_STATUS_IGNORE);
+
+		for (int i = (burst_nb_requests / 2); i < burst_nb_requests; i++)
+		{
+			send_reqs[i] = NULL;
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+
+		for (int i = 0; i < burst_nb_requests; i++)
+		{
+			if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
+			if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
+		}
+	}
+
+	FPRINTF(stderr, "Half/half burst...done (rank %d)\n", rank);
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+}
+
+
+void burst_all(int rank)
+{
+	double start, end;
+	start = starpu_timing_now();
+
+	/* Burst simultaneous from both nodes: 0 and 1 post all the recvs, synchronise, and then post all the sends */
+	burst_bidir(rank);
+
+	/* Burst from 0 to 1 : rank 1 posts all the recvs, barrier, then rank 0 posts all the sends */
+	burst_unidir(0, rank);
+
+	/* Burst from 1 to 0 : rank 0 posts all the recvs, barrier, then rank 1 posts all the sends */
+	burst_unidir(1, rank);
+
+	/* Half burst from both nodes, second half burst is triggered after some requests finished. */
+	burst_bidir_half_postponed(rank);
+
+	end = starpu_timing_now();
+	FPRINTF(stderr, "All bursts took %.0f ms\n", (end - start) / 1000.0);
+}
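
Taken together, the helper reduces a burst program to three calls. A minimal caller, sketched after the refactored burst.c above (MPI_INIT_THREAD is the helper macro from helper.h):

#include <starpu_mpi.h>
#include "helper.h"
#include "burst_helper.h"

int main(int argc, char **argv)
{
	int ret, rank, mpi_init;

	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);

	burst_init_data(rank);  /* allocate and register the buffers (ranks 0 and 1 only) */
	burst_all(rank);        /* run the four burst scenarios */
	burst_free_data(rank);  /* unregister and free */

	starpu_mpi_shutdown();
	if (!mpi_init)
		MPI_Finalize();
	return 0;
}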

+ 29 - 0
mpi/tests/burst_helper.h

@@ -0,0 +1,29 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __MPI_TESTS_BURST_HELPER__
+#define __MPI_TESTS_BURST_HELPER__
+
+int burst_nb_requests;
+
+void burst_init_data(int rank);
+void burst_free_data(int rank);
+void burst_bidir(int rank);
+void burst_unidir(int sender, int rank);
+void burst_bidir_half_postponed(int rank);
+void burst_all(int rank);
+
+#endif /* __MPI_TESTS_BURST_HELPER__ */

+ 282 - 0
mpi/tests/gemm_helper.c

@@ -0,0 +1,282 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <common/blas.h>
+#include "../../examples/mult/simple.h"
+#include "helper.h"
+#include "gemm_helper.h"
+
+
+#define CHECK_TASK_SUBMIT(ret) do {				\
+	if (ret == -ENODEV)					\
+	{							\
+		return -ENODEV;					\
+	}							\
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");	\
+} while(0)
+
+
+unsigned nslices = 4;
+#if defined(STARPU_QUICK_CHECK) && !defined(STARPU_SIMGRID)
+unsigned matrix_dim = 256;
+#else
+unsigned matrix_dim = 320 * 4;
+#endif
+unsigned check = 0;
+int comm_thread_cpuid = -1;
+
+static TYPE *A, *B, *C;
+static starpu_data_handle_t A_handle, B_handle, C_handle;
+
+static void check_output(void)
+{
+	/* compute C = C - AB */
+	CPU_GEMM("N", "N", matrix_dim, matrix_dim, matrix_dim, (TYPE)-1.0f, A, matrix_dim, B, matrix_dim, (TYPE)1.0f, C, matrix_dim);
+
+	/* make sure C = 0 */
+	TYPE err;
+	err = CPU_ASUM(matrix_dim*matrix_dim, C, 1);
+
+	if (err < matrix_dim*matrix_dim*0.001)
+	{
+		FPRINTF(stderr, "Results are OK\n");
+	}
+	else
+	{
+		int max;
+		max = CPU_IAMAX(matrix_dim*matrix_dim, C, 1);
+
+		FPRINTF(stderr, "There were errors ... err = %f\n", err);
+		FPRINTF(stderr, "Max error : %e\n", C[max]);
+	}
+}
+
+
+static void partition_mult_data(void)
+{
+	starpu_matrix_data_register(&A_handle, STARPU_MAIN_RAM, (uintptr_t)A,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+	starpu_matrix_data_register(&B_handle, STARPU_MAIN_RAM, (uintptr_t)B,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+	starpu_matrix_data_register(&C_handle, STARPU_MAIN_RAM, (uintptr_t)C,
+		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
+
+	struct starpu_data_filter vert;
+	memset(&vert, 0, sizeof(vert));
+	vert.filter_func = starpu_matrix_filter_vertical_block;
+	vert.nchildren = nslices;
+
+	struct starpu_data_filter horiz;
+	memset(&horiz, 0, sizeof(horiz));
+	horiz.filter_func = starpu_matrix_filter_block;
+	horiz.nchildren = nslices;
+
+	starpu_data_partition(B_handle, &vert);
+	starpu_data_partition(A_handle, &horiz);
+
+	starpu_data_map_filters(C_handle, 2, &vert, &horiz);
+}
+
+
+static void cpu_init_matrix_random(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
+	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
+	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
+
+	for (unsigned i = 0; i < nx *ny; i++)
+	{
+		subA[i] = (TYPE) (starpu_drand48());
+		subB[i] = (TYPE) (starpu_drand48());
+	}
+}
+
+
+static void cpu_init_matrix_zero(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
+	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
+
+	for (unsigned i = 0; i < nx *ny; i++)
+	{
+		subA[i] = (TYPE) (0);
+	}
+}
+
+
+static void cpu_mult(void *descr[], void *arg)
+{
+	(void)arg;
+	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
+	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
+	TYPE *subC = (TYPE *)STARPU_MATRIX_GET_PTR(descr[2]);
+
+	unsigned nxC = STARPU_MATRIX_GET_NX(descr[2]);
+	unsigned nyC = STARPU_MATRIX_GET_NY(descr[2]);
+	unsigned nyA = STARPU_MATRIX_GET_NY(descr[0]);
+
+	unsigned ldA = STARPU_MATRIX_GET_LD(descr[0]);
+	unsigned ldB = STARPU_MATRIX_GET_LD(descr[1]);
+	unsigned ldC = STARPU_MATRIX_GET_LD(descr[2]);
+
+	int worker_size = starpu_combined_worker_get_size();
+
+	if (worker_size == 1)
+	{
+		/* Sequential CPU task */
+		CPU_GEMM("N", "N", nxC, nyC, nyA, (TYPE)1.0, subA, ldA, subB, ldB, (TYPE)0.0, subC, ldC);
+	}
+	else
+	{
+		/* Parallel CPU task */
+		unsigned rank = starpu_combined_worker_get_rank();
+
+		unsigned block_size = (nyC + worker_size - 1)/worker_size;
+		unsigned new_nyC = STARPU_MIN(nyC, block_size*(rank+1)) - block_size*rank;
+
+		STARPU_ASSERT(nyC == STARPU_MATRIX_GET_NY(descr[1]));
+
+		TYPE *new_subB = &subB[block_size*rank];
+		TYPE *new_subC = &subC[block_size*rank];
+
+		CPU_GEMM("N", "N", nxC, new_nyC, nyA, (TYPE)1.0, subA, ldA, new_subB, ldB, (TYPE)0.0, new_subC, ldC);
+	}
+}
+
+static struct starpu_perfmodel starpu_gemm_model =
+{
+	.type = STARPU_HISTORY_BASED,
+	.symbol = STARPU_GEMM_STR(gemm)
+};
+
+static struct starpu_codelet cl =
+{
+	.type = STARPU_SEQ, /* changed to STARPU_SPMD if -spmd is passed */
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_mult},
+	.cpu_funcs_name = {"cpu_mult"},
+	.nbuffers = 3,
+	.modes = {STARPU_R, STARPU_R, STARPU_RW},
+	.model = &starpu_gemm_model
+};
+
+static struct starpu_codelet cl_init_matrix_random =
+{
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_init_matrix_random},
+	.cpu_funcs_name = {"cpu_init_matrix_random"},
+	.nbuffers = 2,
+	.modes = {STARPU_W, STARPU_W},
+	.name = "init_matrix_random",
+	.color = 0xffa500 // orange
+};
+
+static struct starpu_codelet cl_init_matrix_zero =
+{
+	.max_parallelism = INT_MAX,
+	.cpu_funcs = {cpu_init_matrix_zero},
+	.cpu_funcs_name = {"cpu_init_matrix_zero"},
+	.nbuffers = 1,
+	.modes = {STARPU_W},
+	.name = "init_matrix_zero",
+	.color = 0x808000 // olive
+};
+
+void gemm_alloc_data()
+{
+	starpu_malloc_flags((void **)&A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_malloc_flags((void **)&C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	partition_mult_data();
+}
+
+
+int gemm_init_data()
+{
+#ifndef STARPU_SIMGRID
+	int ret;
+	unsigned x, y;
+
+	// Initialize matrices:
+	for (x = 0; x < nslices; x++)
+	{
+		struct starpu_task *task = starpu_task_create();
+		task->cl = &cl_init_matrix_random;
+		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, x);
+		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
+		ret = starpu_task_submit(task);
+		CHECK_TASK_SUBMIT(ret);
+
+		for (y = 0; y < nslices; y++)
+		{
+			task = starpu_task_create();
+			task->cl = &cl_init_matrix_zero;
+			task->handles[0] = starpu_data_get_sub_data(C_handle, 2, x, y);
+			ret = starpu_task_submit(task);
+			CHECK_TASK_SUBMIT(ret);
+		}
+	}
+#endif
+	return 0;
+}
+
+
+int gemm_submit_tasks()
+{
+	int ret;
+	unsigned x, y;
+
+	for (x = 0; x < nslices; x++)
+	for (y = 0; y < nslices; y++)
+	{
+		struct starpu_task *task = starpu_task_create();
+		task->cl = &cl;
+		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, y);
+		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
+		task->handles[2] = starpu_data_get_sub_data(C_handle, 2, x, y);
+		task->flops = 2ULL * (matrix_dim/nslices) * (matrix_dim/nslices) * matrix_dim;
+
+		ret = starpu_task_submit(task);
+		CHECK_TASK_SUBMIT(ret);
+		starpu_data_wont_use(starpu_data_get_sub_data(C_handle, 2, x, y));
+	}
+
+	return 0;
+}
+
+void gemm_release()
+{
+	starpu_data_unpartition(C_handle, STARPU_MAIN_RAM);
+	starpu_data_unpartition(B_handle, STARPU_MAIN_RAM);
+	starpu_data_unpartition(A_handle, STARPU_MAIN_RAM);
+
+	starpu_data_unregister(A_handle);
+	starpu_data_unregister(B_handle);
+	starpu_data_unregister(C_handle);
+
+	if (check)
+		check_output();
+
+	starpu_free_flags(A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_free_flags(B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	starpu_free_flags(C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+}
+
+
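
The four entry points are meant to be called in sequence; a sketch of a caller, modelled on how burst_gemm.c uses them (error handling abridged):

static int run_gemm(void)
{
	gemm_alloc_data();                   /* allocate A, B, C and partition the handles */
	if (gemm_init_data() == -ENODEV)     /* tasks filling A and B randomly, C with zeros */
		goto enodev;
	if (gemm_submit_tasks() == -ENODEV)  /* one GEMM task per block of C */
		goto enodev;
	starpu_task_wait_for_all();
enodev:
	gemm_release();                      /* unpartition, optionally check, then free */
	return 0;
}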

+ 33 - 0
mpi/tests/gemm_helper.h

@@ -0,0 +1,33 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __MPI_TESTS_GEMM_HELPER__
+#define __MPI_TESTS_GEMM_HELPER__
+
+#include <starpu_config.h>
+
+unsigned nslices;
+unsigned matrix_dim;
+unsigned check;
+int comm_thread_cpuid;
+
+
+void gemm_alloc_data();
+int gemm_init_data();
+int gemm_submit_tasks();
+void gemm_release();
+
+#endif /* __MPI_TESTS_GEMM_HELPER__ */

+ 1 - 0
mpi/tests/helper.h

@@ -19,6 +19,7 @@
 #include <starpu_config.h>
 #include "../../tests/helper.h"
 
+#define PRINTF(fmt, ...) do { if (!getenv("STARPU_SSILENT")) {printf(fmt, ## __VA_ARGS__); fflush(stdout); }} while(0)
 #define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
 #define FPRINTF_MPI(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) { \
 			int _disp_rank; starpu_mpi_comm_rank(MPI_COMM_WORLD, &_disp_rank); \

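The new PRINTF mirrors the existing FPRINTF but targets stdout and flushes it. A hypothetical call site (variable names are illustrative):

	PRINTF("%d reqs\t%.3f us\n", nreqs, t_us);  /* muted when STARPU_SSILENT is set */
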
+ 16 - 272
mpi/tests/sendrecv_gemm_bench.c

@@ -33,217 +33,36 @@
 #include <starpu_mpi.h>
 #include <starpu_fxt.h>
 
-#include <common/blas.h>
-
 #include "helper.h"
 #include "abstract_sendrecv_bench.h"
-#include "../../examples/mult/simple.h"
+#include "gemm_helper.h"
 
-#define CHECK_TASK_SUBMIT(ret) do {				\
-	if (ret == -ENODEV)					\
-	{							\
-		ret = 77;					\
-		goto enodev;					\
-	}							\
-	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");	\
-} while(0)
 
 static int mpi_rank;
-static int comm_thread_cpuid = -1;
-static unsigned nslices = 4;
-#if defined(STARPU_QUICK_CHECK) && !defined(STARPU_SIMGRID)
-static unsigned matrix_dim = 256;
-#else
-static unsigned matrix_dim = 320 * 4;
-#endif
-static unsigned check = 0;
-
-static TYPE *A, *B, *C;
-static starpu_data_handle_t A_handle, B_handle, C_handle;
-
 static starpu_pthread_barrier_t thread_barrier;
 
-#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
-#define PRINTF(fmt, ...) do { if (!getenv("STARPU_SSILENT")) {printf(fmt, ## __VA_ARGS__); fflush(stdout); }} while(0)
-
-static void check_output(void)
-{
-	/* compute C = C - AB */
-	CPU_GEMM("N", "N", matrix_dim, matrix_dim, matrix_dim, (TYPE)-1.0f, A, matrix_dim, B, matrix_dim, (TYPE)1.0f, C, matrix_dim);
-
-	/* make sure C = 0 */
-	TYPE err;
-	err = CPU_ASUM(matrix_dim*matrix_dim, C, 1);
-
-	if (err < matrix_dim*matrix_dim*0.001)
-	{
-		FPRINTF(stderr, "Results are OK\n");
-	}
-	else
-	{
-		int max;
-		max = CPU_IAMAX(matrix_dim*matrix_dim, C, 1);
-
-		FPRINTF(stderr, "There were errors ... err = %f\n", err);
-		FPRINTF(stderr, "Max error : %e\n", C[max]);
-	}
-}
 
-static void init_problem_data(void)
-{
-#ifndef STARPU_SIMGRID
-	unsigned i,j;
-#endif
-
-	starpu_malloc_flags((void **)&A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_malloc_flags((void **)&B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_malloc_flags((void **)&C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-
-#ifndef STARPU_SIMGRID
-	/* fill the matrices */
-	for (j=0; j < matrix_dim; j++)
-	{
-		for (i=0; i < matrix_dim; i++)
-		{
-			A[j+i*matrix_dim] = (TYPE)(starpu_drand48());
-			B[j+i*matrix_dim] = (TYPE)(starpu_drand48());
-			C[j+i*matrix_dim] = (TYPE)(0);
-		}
-	}
-#endif
-}
-
-static void partition_mult_data(void)
-{
-	starpu_matrix_data_register(&A_handle, STARPU_MAIN_RAM, (uintptr_t)A,
-		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
-	starpu_matrix_data_register(&B_handle, STARPU_MAIN_RAM, (uintptr_t)B,
-		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
-	starpu_matrix_data_register(&C_handle, STARPU_MAIN_RAM, (uintptr_t)C,
-		matrix_dim, matrix_dim, matrix_dim, sizeof(TYPE));
-
-	struct starpu_data_filter vert;
-	memset(&vert, 0, sizeof(vert));
-	vert.filter_func = starpu_matrix_filter_vertical_block;
-	vert.nchildren = nslices;
-
-	struct starpu_data_filter horiz;
-	memset(&horiz, 0, sizeof(horiz));
-	horiz.filter_func = starpu_matrix_filter_block;
-	horiz.nchildren = nslices;
-
-	starpu_data_partition(B_handle, &vert);
-	starpu_data_partition(A_handle, &horiz);
-
-	starpu_data_map_filters(C_handle, 2, &vert, &horiz);
-}
-
-
-void cpu_init_matrix_random(void *descr[], void *arg)
-{
-	(void)arg;
-	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
-	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
-	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
-	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
-
-	for (unsigned i = 0; i < nx *ny; i++)
-	{
-		subA[i] = (TYPE) (starpu_drand48());
-		subB[i] = (TYPE) (starpu_drand48());
-	}
-}
-
-
-void cpu_init_matrix_zero(void *descr[], void *arg)
+static void* comm_thread_func(void* arg)
 {
-	(void)arg;
-	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
-	unsigned nx = STARPU_MATRIX_GET_NX(descr[0]);
-	unsigned ny = STARPU_MATRIX_GET_NY(descr[0]);
-
-	for (unsigned i = 0; i < nx *ny; i++)
+	if (comm_thread_cpuid < 0)
 	{
-		subA[i] = (TYPE) (0);
+		comm_thread_cpuid = starpu_get_next_bindid(STARPU_THREAD_ACTIVE, NULL, 0);
 	}
-}
-
-
-void cpu_mult(void *descr[], void *arg)
-{
-	(void)arg;
-	TYPE *subA = (TYPE *)STARPU_MATRIX_GET_PTR(descr[0]);
-	TYPE *subB = (TYPE *)STARPU_MATRIX_GET_PTR(descr[1]);
-	TYPE *subC = (TYPE *)STARPU_MATRIX_GET_PTR(descr[2]);
 
-	unsigned nxC = STARPU_MATRIX_GET_NX(descr[2]);
-	unsigned nyC = STARPU_MATRIX_GET_NY(descr[2]);
-	unsigned nyA = STARPU_MATRIX_GET_NY(descr[0]);
-
-	unsigned ldA = STARPU_MATRIX_GET_LD(descr[0]);
-	unsigned ldB = STARPU_MATRIX_GET_LD(descr[1]);
-	unsigned ldC = STARPU_MATRIX_GET_LD(descr[2]);
-
-	int worker_size = starpu_combined_worker_get_size();
-
-	if (worker_size == 1)
+	if (starpu_bind_thread_on(comm_thread_cpuid, 0, "Comm") < 0)
 	{
-		/* Sequential CPU task */
-		CPU_GEMM("N", "N", nxC, nyC, nyA, (TYPE)1.0, subA, ldA, subB, ldB, (TYPE)0.0, subC, ldC);
+		char hostname[65];
+		gethostname(hostname, sizeof(hostname));
+		_STARPU_DISP("[%s] No core was available for the comm thread. You should increase STARPU_RESERVE_NCPU or decrease STARPU_NCPU\n", hostname);
 	}
-	else
-	{
-		/* Parallel CPU task */
-		unsigned rank = starpu_combined_worker_get_rank();
 
-		unsigned block_size = (nyC + worker_size - 1)/worker_size;
-		unsigned new_nyC = STARPU_MIN(nyC, block_size*(rank+1)) - block_size*rank;
-
-		STARPU_ASSERT(nyC == STARPU_MATRIX_GET_NY(descr[1]));
-
-		TYPE *new_subB = &subB[block_size*rank];
-		TYPE *new_subC = &subC[block_size*rank];
+	sendrecv_bench(mpi_rank, &thread_barrier);
 
-		CPU_GEMM("N", "N", nxC, new_nyC, nyA, (TYPE)1.0, subA, ldA, new_subB, ldB, (TYPE)0.0, new_subC, ldC);
-	}
+	return NULL;
 }
 
-static struct starpu_perfmodel starpu_gemm_model =
-{
-	.type = STARPU_HISTORY_BASED,
-	.symbol = STARPU_GEMM_STR(gemm)
-};
 
-static struct starpu_codelet cl =
-{
-	.type = STARPU_SEQ, /* changed to STARPU_SPMD if -spmd is passed */
-	.max_parallelism = INT_MAX,
-	.cpu_funcs = {cpu_mult},
-	.cpu_funcs_name = {"cpu_mult"},
-	.nbuffers = 3,
-	.modes = {STARPU_R, STARPU_R, STARPU_RW},
-	.model = &starpu_gemm_model
-};
-
-static struct starpu_codelet cl_init_matrix_random =
-{
-	.max_parallelism = INT_MAX,
-	.cpu_funcs = {cpu_init_matrix_random},
-	.cpu_funcs_name = {"cpu_init_matrix_random"},
-	.nbuffers = 2,
-	.modes = {STARPU_W, STARPU_W}
-};
-
-static struct starpu_codelet cl_init_matrix_zero =
-{
-	.max_parallelism = INT_MAX,
-	.cpu_funcs = {cpu_init_matrix_zero},
-	.cpu_funcs_name = {"cpu_init_matrix_zero"},
-	.nbuffers = 1,
-	.modes = {STARPU_W}
-};
-
-static void parse_args(int argc, char **argv)
+void parse_args(int argc, char **argv)
 {
 	int i;
 	for (i = 1; i < argc; i++)
@@ -275,11 +94,6 @@ static void parse_args(int argc, char **argv)
 			check = 1;
 		}
 
-		else if (strcmp(argv[i], "-spmd") == 0)
-		{
-			cl.type = STARPU_SPMD;
-		}
-
 		else if (strcmp(argv[i], "-comm-thread-cpuid") == 0)
 		{
 			comm_thread_cpuid = atoi(argv[++i]);
@@ -287,7 +101,7 @@ static void parse_args(int argc, char **argv)
 
 		else if (strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0)
 		{
-			fprintf(stderr,"Usage: %s [-nblocks n] [-size size] [-check] [-spmd] [-comm-thread-cpuid cpuid]\n", argv[0]);
+			fprintf(stderr,"Usage: %s [-nblocks n] [-size size] [-check] [-comm-thread-cpuid cpuid]\n", argv[0]);
 			fprintf(stderr,"Currently selected: matrix size: %u - %u blocks\n", matrix_dim, nslices);
 			fprintf(stderr, "Use -comm-thread-cpuid to specify where to bind the comm benchmarking thread\n");
 			exit(EXIT_SUCCESS);
@@ -301,26 +115,6 @@ static void parse_args(int argc, char **argv)
 	}
 }
 
-
-static void* comm_thread_func(void* arg)
-{
-	if (comm_thread_cpuid < 0)
-	{
-		comm_thread_cpuid = starpu_get_next_bindid(STARPU_THREAD_ACTIVE, NULL, 0);
-	}
-
-	if (starpu_bind_thread_on(comm_thread_cpuid, 0, "Comm") < 0)
-	{
-		char hostname[65];
-		gethostname(hostname, sizeof(hostname));
-		_STARPU_DISP("[%s] No core was available for the comm thread. You should increase STARPU_RESERVE_NCPU or decrease STARPU_NCPU\n", hostname);
-	}
-
-	sendrecv_bench(mpi_rank, &thread_barrier);
-
-	return NULL;
-}
-
 int main(int argc, char **argv)
 {
 	double start, end;
@@ -363,10 +157,7 @@ int main(int argc, char **argv)
 
 
 	// Main thread will submit GEMM tasks:
-	starpu_malloc_flags((void **)&A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_malloc_flags((void **)&B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_malloc_flags((void **)&C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	partition_mult_data();
+	gemm_alloc_data();
 
 
 	if (mpi_rank == 0)
@@ -376,43 +167,9 @@ int main(int argc, char **argv)
 
 	starpu_pause();
 
-	unsigned x, y;
-#ifndef STARPU_SIMGRID
-	// Initialize matrices:
-	for (x = 0; x < nslices; x++)
-	{
-		struct starpu_task *task = starpu_task_create();
-		task->cl = &cl_init_matrix_random;
-		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, x);
-		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
-		ret = starpu_task_submit(task);
-		CHECK_TASK_SUBMIT(ret);
-
-		for (y = 0; y < nslices; y++)
-		{
-			task = starpu_task_create();
-			task->cl = &cl_init_matrix_zero;
-			task->handles[0] = starpu_data_get_sub_data(C_handle, 2, x, y);
-			ret = starpu_task_submit(task);
-			CHECK_TASK_SUBMIT(ret);
-		}
-	}
-#endif
+	if(gemm_init_data() == -ENODEV || gemm_submit_tasks() == -ENODEV)
+		goto enodev;
 
-	for (x = 0; x < nslices; x++)
-	for (y = 0; y < nslices; y++)
-	{
-		struct starpu_task *task = starpu_task_create();
-		task->cl = &cl;
-		task->handles[0] = starpu_data_get_sub_data(A_handle, 1, y);
-		task->handles[1] = starpu_data_get_sub_data(B_handle, 1, x);
-		task->handles[2] = starpu_data_get_sub_data(C_handle, 2, x, y);
-		task->flops = 2ULL * (matrix_dim/nslices) * (matrix_dim/nslices) * matrix_dim;
-
-		ret = starpu_task_submit(task);
-		CHECK_TASK_SUBMIT(ret);
-		starpu_data_wont_use(starpu_data_get_sub_data(C_handle, 2, x, y));
-	}
 
 	starpu_mpi_barrier(MPI_COMM_WORLD);
 	starpu_fxt_start_profiling();
@@ -432,20 +189,7 @@ int main(int argc, char **argv)
 
 
 enodev:
-	starpu_data_unpartition(C_handle, STARPU_MAIN_RAM);
-	starpu_data_unpartition(B_handle, STARPU_MAIN_RAM);
-	starpu_data_unpartition(A_handle, STARPU_MAIN_RAM);
-
-	starpu_data_unregister(A_handle);
-	starpu_data_unregister(B_handle);
-	starpu_data_unregister(C_handle);
-
-	if (check)
-		check_output();
-
-	starpu_free_flags(A, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_free_flags(B, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
-	starpu_free_flags(C, matrix_dim*matrix_dim*sizeof(TYPE), STARPU_MALLOC_PINNED|STARPU_MALLOC_SIMULATION_FOLDED);
+	gemm_release();
 
 
 	// Wait comm thread:
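
The hunk ends just before the thread teardown. For orientation, a sketch of the main thread's overall flow, reconstructed from the visible context (the STARPU_PTHREAD_* wrappers are assumed from starpu_thread_util.h, and the exact ordering around the thread barrier is abridged):

	starpu_pthread_t comm_thread;

	STARPU_PTHREAD_BARRIER_INIT(&thread_barrier, NULL, 2);
	STARPU_PTHREAD_CREATE(&comm_thread, NULL, comm_thread_func, NULL); /* runs sendrecv_bench() */

	gemm_alloc_data();
	starpu_pause();
	if (gemm_init_data() == -ENODEV || gemm_submit_tasks() == -ENODEV)
		goto enodev;
	starpu_mpi_barrier(MPI_COMM_WORLD);
	starpu_resume();            /* GEMM tasks compute while the comm thread benchmarks */
	starpu_task_wait_for_all();
enodev:
	gemm_release();
	STARPU_PTHREAD_JOIN(comm_thread, NULL); /* wait for the comm thread to finish */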