Browse Source

mpi: add burst test

Philippe SWARTVAGHER 5 years ago
parent
commit
963ddee450
2 changed files with 230 additions and 2 deletions
  1. 4 2
      mpi/tests/Makefile.am
  2. 226 0
      mpi/tests/burst.c

+ 4 - 2
mpi/tests/Makefile.am

@@ -139,7 +139,8 @@ starpu_mpi_TESTS +=				\
 	temporary				\
 	user_defined_datatype			\
 	early_stuff				\
-	sendrecv_bench
+	sendrecv_bench				\
+	burst
 
 if !STARPU_USE_MPI_MPI
 starpu_mpi_TESTS +=				\
@@ -239,7 +240,8 @@ noinst_PROGRAMS =				\
 	load_balancer				\
 	driver					\
 	sendrecv_bench				\
-	sendrecv_parallel_tasks_bench
+	sendrecv_parallel_tasks_bench		\
+	burst
 
 if !NO_BLAS_LIB
 noinst_PROGRAMS +=				\

+ 226 - 0
mpi/tests/burst.c

@@ -0,0 +1,226 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This test sends simultaneously many communications, with various configurations.
+ *
+ * Global purpose is to watch the behaviour with traces.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+
+#define NB_REQUESTS 500
+#define NX_ARRAY (320 * 320)
+
+static starpu_pthread_mutex_t mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
+static starpu_pthread_cond_t cond = STARPU_PTHREAD_COND_INITIALIZER;
+
+void recv_callback(void* arg)
+{
+	int* received = arg;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	*received = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+}
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size, mpi_init, other_rank;
+	starpu_data_handle_t recv_handles[NB_REQUESTS];
+	starpu_data_handle_t send_handles[NB_REQUESTS];
+	float* recv_buffers[NB_REQUESTS];
+	float* send_buffers[NB_REQUESTS];
+	starpu_mpi_req recv_reqs[NB_REQUESTS];
+	starpu_mpi_req send_reqs[NB_REQUESTS];
+	MPI_Status status;
+
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
+
+	ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	if (rank > 1)
+	{
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+		starpu_mpi_shutdown();
+		if (!mpi_init)
+			MPI_Finalize();
+
+		return 0;
+	}
+
+	other_rank = (rank == 0) ? 1 : 0;
+
+
+	/* Burst simultaneous from both nodes */
+	if (rank == 0)
+	{
+		printf("Simultaneous....\n");
+	}
+
+	for (int i = 0; i < NB_REQUESTS; i++)
+	{
+		send_buffers[i] = malloc(NX_ARRAY * sizeof(float));
+		memset(send_buffers[i], 0, NX_ARRAY * sizeof(float));
+		starpu_vector_data_register(&send_handles[i], STARPU_MAIN_RAM, (uintptr_t) send_buffers[i], NX_ARRAY, sizeof(float));
+
+		recv_buffers[i] = malloc(NX_ARRAY * sizeof(float));
+		memset(recv_buffers[i], 0, NX_ARRAY * sizeof(float));
+		starpu_vector_data_register(&recv_handles[i], STARPU_MAIN_RAM, (uintptr_t) recv_buffers[i], NX_ARRAY, sizeof(float));
+
+		starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	for (int i = 0; i < NB_REQUESTS; i++)
+	{
+		starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+	}
+
+	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+
+	/* Burst from 0 to 1 */
+	if (rank == 0)
+	{
+		printf("Done.\n");
+		printf("0 -> 1...\n");
+	}
+	else
+	{
+		for (int i = 0; i < NB_REQUESTS; i++)
+		{
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	if (rank == 0)
+	{
+		for (int i = 0; i < NB_REQUESTS; i++)
+		{
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+
+	/* Burst from 1 to 0 */
+	if (rank == 0)
+	{
+		printf("Done.\n");
+		printf("1 -> 0...\n");
+
+		for (int i = 0; i < NB_REQUESTS; i++)
+		{
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	if (rank == 1)
+	{
+		for (int i = 0; i < NB_REQUESTS; i++)
+		{
+			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+
+	/* Half burst from both nodes, second half burst is triggered after some requests finished. */
+	if (rank == 0)
+	{
+		printf("Done.\n");
+		printf("Half/half burst...\n");
+	}
+
+	int received = 0;
+
+	for (int i = 0; i < NB_REQUESTS; i++)
+	{
+		if (i == (NB_REQUESTS / 4))
+		{
+			starpu_mpi_irecv_detached(recv_handles[i], other_rank, i, MPI_COMM_WORLD, recv_callback, &received);
+		}
+		else
+		{
+			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+
+	for (int i = 0; i < (NB_REQUESTS / 2); i++)
+	{
+		starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+	}
+
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	while (!received)
+		STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+
+	for (int i = (NB_REQUESTS / 2); i < NB_REQUESTS; i++)
+	{
+		starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
+	}
+
+	starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+
+	if (rank == 0)
+	{
+		printf("Done.\n");
+	}
+
+
+	for (int i = 0; i < NB_REQUESTS; i++)
+	{
+		starpu_data_unregister(send_handles[i]);
+		free(send_buffers[i]);
+
+		starpu_data_unregister(recv_handles[i]);
+		free(recv_buffers[i]);
+	}
+
+	starpu_mpi_shutdown();
+	if (!mpi_init)
+		MPI_Finalize();
+
+	return 0;
+}