
mpi/tests: add example which posts several early requests with the same tag and source.

Nathalie Furmento, 10 years ago
parent
commit
b1eb29bb86
2 files changed with 223 additions and 2 deletions
  1. + 6 - 2
      mpi/tests/Makefile.am
  2. + 217 - 0
      mpi/tests/early_request.c
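
The scenario exercised by this commit is an application that posts several receive requests for the same source and tag before any matching message has arrived; StarPU-MPI must queue these "early" requests and match the incoming messages in submission order. The standalone sketch below is not part of the commit and only illustrates that pattern in its simplest form; the NREQ, NX and TAG constants and the two-rank setup are illustrative assumptions.

#include <stdint.h>
#include <mpi.h>
#include <starpu.h>
#include <starpu_mpi.h>

#define NREQ 4   /* number of early requests posted per peer (illustrative) */
#define NX   16  /* vector length (illustrative) */
#define TAG  42  /* single tag shared by all requests (illustrative) */

int main(int argc, char *argv[])
{
	starpu_data_handle_t handles[NREQ];
	int buffers[NREQ][NX];
	int rank, size, i, j;

	MPI_Init(&argc, &argv);
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	MPI_Comm_size(MPI_COMM_WORLD, &size);

	starpu_init(NULL);
	starpu_mpi_init(NULL, NULL, 0); /* MPI already initialised by the application */

	/* Register one vector handle per request, backed by user buffers. */
	for (i = 0; i < NREQ; i++)
	{
		for (j = 0; j < NX; j++)
			buffers[i][j] = i * NX + j;
		starpu_vector_data_register(&handles[i], 0, (uintptr_t)buffers[i], NX, sizeof(int));
	}

	if (size >= 2 && rank == 0)
	{
		/* Post all the receives at once: same source, same tag.  They
		 * stay pending as "early" requests until the matching sends
		 * arrive and must be satisfied in submission order. */
		for (i = 0; i < NREQ; i++)
			starpu_mpi_irecv_detached(handles[i], 1, TAG, MPI_COMM_WORLD, NULL, NULL);
	}
	else if (rank == 1)
	{
		for (i = 0; i < NREQ; i++)
			starpu_mpi_isend_detached(handles[i], 0, TAG, MPI_COMM_WORLD, NULL, NULL);
	}

	/* Unregistering waits for the pending detached communications on each handle. */
	for (i = 0; i < NREQ; i++)
		starpu_data_unregister(handles[i]);

	starpu_mpi_shutdown();
	starpu_shutdown();
	MPI_Finalize();
	return 0;
}

The new test below drives the same mechanism much harder: each element re-posts communications on its own tag over several loops, so many requests for the same (source, tag) pair can be pending at once.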

+ 6 - 2
mpi/tests/Makefile.am

@@ -124,7 +124,8 @@ starpu_mpi_TESTS =				\
 	policy_register				\
 	policy_register_many			\
 	policy_register_toomany			\
-	policy_unregister
+	policy_unregister			\
+	early_request
 
 noinst_PROGRAMS =				\
 	datatypes				\
@@ -173,7 +174,8 @@ noinst_PROGRAMS =				\
 	policy_register				\
 	policy_register_many			\
 	policy_register_toomany			\
-	policy_unregister
+	policy_unregister			\
+	early_request
 
 XFAIL_TESTS=					\
 	policy_register_toomany			\
@@ -273,6 +275,8 @@ policy_register_toomany_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 policy_unregister_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+early_request_LDADD =					\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 
 ring_SOURCES = ring.c
 ring_sync_SOURCES = ring_sync.c

+ 217 - 0
mpi/tests/early_request.c

@@ -0,0 +1,217 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  CNRS
+ * Copyright (C) 2015  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include "helper.h"
+
+#define NUM_EL 5
+#define NUM_LOOPS 10
+
+/*
+ * This test case, written by J-M Couteyen, checks that several
+ * early requests for a given source and tag can be posted to StarPU
+ * by the application before the data arrive.
+ *
+ * In this test case, multiple processes (called "domains") exchange
+ * information between multiple "elements" several times, with
+ * different sizes (in order to catch errors more easily).
+ * The communications are independent between the elements (each one
+ * has its own tag), but must occur in the submitted order for any
+ * given element taken independently.
+ */
+
+struct element
+{
+	int tag;
+	int foreign_domain;
+
+	int array_send[100];
+	int array_recv[100];
+
+	starpu_data_handle_t ensure_submitted_order_send;
+	starpu_data_handle_t ensure_submitted_order_recv;
+	starpu_data_handle_t send;
+	starpu_data_handle_t recv;
+};
+
+/* functions/codelet to fill the buffers */
+void fill_tmp_buffer(void *buffers[], void *cl_arg)
+{
+	int *tmp = (int *) STARPU_VECTOR_GET_PTR(buffers[0]);
+	int nx = STARPU_VECTOR_GET_NX(buffers[0]);
+	int i;
+
+	for (i=0; i<nx; i++)
+		tmp[i]=nx+i;
+}
+
+static struct starpu_codelet fill_tmp_buffer_cl =
+{
+	.where = STARPU_CPU,
+	.cpu_funcs = {fill_tmp_buffer, NULL},
+	.nbuffers = 1,
+	.modes = {STARPU_W},
+	.name = "fill_tmp_buffer"
+};
+
+void read_ghost(void *buffers[], void *cl_arg)
+{
+	int *tmp = (int *) STARPU_VECTOR_GET_PTR(buffers[0]);
+	int nx=STARPU_VECTOR_GET_NX(buffers[0]);
+	int i;
+	for(i=0; i<nx;i++)
+	{
+		assert(tmp[i]==nx+i);
+	}
+}
+
+static struct starpu_codelet read_ghost_value_cl =
+{
+	.where = STARPU_CPU,
+	.cpu_funcs = {read_ghost, NULL},
+	.nbuffers = 1,
+	.modes = {STARPU_R},
+	.name = "read_ghost_value"
+};
+
+/*codelet to ensure submitted order for a given element*/
+void noop(void *buffers[], void *cl_arg)
+{
+}
+
+void submitted_order_fun(void *buffers[], void *cl_arg)
+{
+}
+
+static struct starpu_codelet submitted_order =
+{
+	.where = STARPU_CPU,
+	.cpu_funcs = {submitted_order_fun, NULL},
+	.nbuffers = 2,
+	.modes = {STARPU_RW, STARPU_W},
+	.name = "submitted_order_enforcer"
+};
+
+void init_element(struct element *el, int size, int foreign_domain)
+{
+	el->tag=size;
+	el->foreign_domain=foreign_domain;
+
+	int mpi_rank, mpi_size;
+	MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+
+	starpu_vector_data_register(&el->recv, 0, (uintptr_t)el->array_recv, size, sizeof(int));
+	starpu_vector_data_register(&el->send, 0, (uintptr_t)el->array_send, size, sizeof(int));
+	starpu_void_data_register(&el->ensure_submitted_order_send);
+	starpu_void_data_register(&el->ensure_submitted_order_recv);
+}
+
+void free_element(struct element *el)
+{
+	starpu_data_unregister(el->recv);
+	starpu_data_unregister(el->send);
+	starpu_data_unregister(el->ensure_submitted_order_send);
+	starpu_data_unregister(el->ensure_submitted_order_recv);
+}
+
+void insert_work_for_one_element(struct element *el)
+{
+	starpu_data_handle_t tmp_recv;
+	starpu_data_handle_t tmp_send;
+
+	starpu_vector_data_register(&tmp_recv, -1, 0, el->tag, sizeof(int));
+	starpu_vector_data_register(&tmp_send, -1, 0, el->tag, sizeof(int));
+
+	//Emulate the work to fill the send buffer
+	starpu_insert_task(&fill_tmp_buffer_cl,
+			   STARPU_W,tmp_send,
+			   0);
+	//Send operation
+	starpu_insert_task(&submitted_order,
+			   STARPU_RW,el->ensure_submitted_order_send,
+			   STARPU_W,tmp_send,
+			   0);
+	starpu_mpi_isend_detached(tmp_send,el->foreign_domain,el->tag, MPI_COMM_WORLD, NULL, NULL);
+
+	//Recv operation for current element
+	starpu_insert_task(&submitted_order,
+			   STARPU_RW,el->ensure_submitted_order_recv,
+			   STARPU_W,tmp_recv,
+			   0);
+	starpu_mpi_irecv_detached(tmp_recv,el->foreign_domain,el->tag, MPI_COMM_WORLD, NULL, NULL);
+	//Emulate the "reading" of the recv value.
+	starpu_insert_task(&read_ghost_value_cl,
+			   STARPU_R,tmp_recv,
+			   0);
+
+	starpu_data_unregister_submit(tmp_send);
+	starpu_data_unregister_submit(tmp_recv);
+}
+
+/*main program*/
+int main(int argc, char * argv[])
+{
+	/* Init */
+	int ret;
+	int mpi_rank, mpi_size;
+	MPI_Init(&argc, &argv);
+	MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	/*element initialization : domains are connected as a ring for this test*/
+	int num_elements=NUM_EL;
+	struct element * el_left=malloc(num_elements*sizeof(el_left[0]));
+	struct element * el_right=malloc(num_elements*sizeof(el_right[0]));
+	int i;
+	for(i=0;i<num_elements;i++)
+	{
+		init_element(el_left+i,i+1,((mpi_rank-1)+mpi_size)%mpi_size);
+		init_element(el_right+i,i+1,(mpi_rank+1)%mpi_size);
+	}
+
+	/* Communication loop */
+	for (i=0; i<NUM_LOOPS; i++) //number of "computations loops"
+	{
+		int e;
+		for (e=0;e<num_elements;e++) //Do something for each elements
+		{
+			insert_work_for_one_element(el_right+e);
+			insert_work_for_one_element(el_left+e);
+		}
+	}
+	/* End */
+	starpu_task_wait_for_all();
+
+	for(i=0;i<num_elements;i++)
+	{
+		free_element(el_left+i);
+		free_element(el_right+i);
+	}
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+	FPRINTF(stderr, "No assert until end\n");
+	return 0;
+}
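
Since early_request is added to both starpu_mpi_TESTS and noinst_PROGRAMS, the program is built but not installed, and make check in mpi/tests runs it under the MPI launcher selected at configure time. It can presumably also be started by hand with something like mpirun -np 2 ./early_request, the exact launcher and process count depending on the local MPI installation.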