Bläddra i källkod

Distributed CG

Here is a distributed implementation of the Conjugate Gradient. It's based on the basic example already in StarPU examples/cg/cg.c, but distributes data among MPI processes and submits tasks taking into account MPI.
Philippe SWARTVAGHER 4 år sedan
förälder
incheckning
6a9688f1ef
6 ändrade filer med 688 tillägg och 236 borttagningar
  1. 0 1
      examples/Makefile.am
  2. 27 173
      examples/cg/cg.c
  3. 0 25
      examples/cg/cg.h
  4. 212 35
      examples/cg/cg_kernels.c
  5. 20 2
      mpi/examples/Makefile.am
  6. 429 0
      mpi/examples/cg/cg.c

+ 0 - 1
examples/Makefile.am

@@ -869,7 +869,6 @@ if !STARPU_NO_BLAS_LIB
 
 cg_cg_SOURCES =					\
 	cg/cg.c					\
-	cg/cg_kernels.c				\
 	common/blas.c
 
 cg_cg_LDADD =					\

+ 27 - 173
examples/cg/cg.c

@@ -19,11 +19,6 @@
 #include <starpu.h>
 #include <common/blas.h>
 
-#ifdef STARPU_USE_CUDA
-#include <cuda.h>
-#endif
-
-#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
 
 /*
  *	Conjugate Gradient
@@ -68,33 +63,34 @@
 
 #include "cg.h"
 
-static int long long n = 4096;
-static int nblocks = 8;
-static int use_reduction = 1;
-static int display_result = 0;
+static int copy_handle(starpu_data_handle_t dst, starpu_data_handle_t src, unsigned nblocks);
 
-static starpu_data_handle_t A_handle, b_handle, x_handle;
-static TYPE *A, *b, *x;
+#define HANDLE_TYPE_VECTOR starpu_data_handle_t
+#define HANDLE_TYPE_MATRIX starpu_data_handle_t
+#define TASK_INSERT(cl, ...) starpu_task_insert(cl, ##__VA_ARGS__)
+#define GET_VECTOR_BLOCK(v, i) starpu_data_get_sub_data(v, 1, i)
+#define GET_MATRIX_BLOCK(m, i, j) starpu_data_get_sub_data(m, 2, i, j)
+#define BARRIER()
+#define GET_DATA_HANDLE(handle)
+#define FPRINTF_SERVER FPRINTF
+
+#include "cg_kernels.c"
 
-#ifdef STARPU_QUICK_CHECK
-static int i_max = 5;
-#elif !defined(STARPU_LONG_CHECK)
-static int i_max = 100;
-#else
-static int i_max = 1000;
-#endif
-static double eps = (10e-14);
 
-static starpu_data_handle_t r_handle, d_handle, q_handle;
+
+static TYPE *A, *b, *x;
 static TYPE *r, *d, *q;
 
-static starpu_data_handle_t dtq_handle, rtr_handle;
-static TYPE dtq, rtr;
 
-extern struct starpu_codelet accumulate_variable_cl;
-extern struct starpu_codelet accumulate_vector_cl;
-extern struct starpu_codelet bzero_variable_cl;
-extern struct starpu_codelet bzero_vector_cl;
+/* Single-node implementation of "dst <- src" on a partitioned vector:
+ * submits one asynchronous starpu_data_cpy per sub-block (no callback).
+ * Always returns 0; the copies complete later under StarPU dependency order. */
+static int copy_handle(starpu_data_handle_t dst, starpu_data_handle_t src, unsigned nblocks)
+{
+	unsigned b;
+
+	for (b = 0; b < nblocks; b++)
+		starpu_data_cpy(starpu_data_get_sub_data(dst, 1, b), starpu_data_get_sub_data(src, 1, b), 1, NULL, NULL);
+	return 0;
+}
+
 
 /*
  *	Generate Input data
@@ -286,165 +282,23 @@ static void display_x_result(void)
 	}
 }
 
-/*
- *	Main loop
- */
-
-static int cg(void)
-{
-	double delta_new, delta_0;
-
-	int i = 0;
-	int ret;
-
-	/* r <- b */
-	ret = copy_handle(r_handle, b_handle, nblocks);
-	if (ret == -ENODEV) return ret;
-
-	/* r <- r - A x */
-	ret = gemv_kernel(r_handle, A_handle, x_handle, 1.0, -1.0, nblocks, use_reduction);
-	if (ret == -ENODEV) return ret;
-
-	/* d <- r */
-	ret = copy_handle(d_handle, r_handle, nblocks);
-	if (ret == -ENODEV) return ret;
-
-	/* delta_new = dot(r,r) */
-	ret = dot_kernel(r_handle, r_handle, rtr_handle, nblocks, use_reduction);
-	if (ret == -ENODEV) return ret;
-
-	starpu_data_acquire(rtr_handle, STARPU_R);
-	delta_new = rtr;
-	delta_0 = delta_new;
-	starpu_data_release(rtr_handle);
-
-	FPRINTF(stderr, "Delta limit: %e\n", (double) (eps*eps*delta_0));
-
-	FPRINTF(stderr, "**************** INITIAL ****************\n");
-	FPRINTF(stderr, "Delta 0: %e\n", delta_new);
-
-	double start;
-	double end;
-	start = starpu_timing_now();
-
-	while ((i < i_max) && ((double)delta_new > (double)(eps*eps*delta_0)))
-	{
-		double delta_old;
-		double alpha, beta;
-
-		starpu_iteration_push(i);
-
-		/* q <- A d */
-		gemv_kernel(q_handle, A_handle, d_handle, 0.0, 1.0, nblocks, use_reduction);
-
-		/* dtq <- dot(d,q) */
-		dot_kernel(d_handle, q_handle, dtq_handle, nblocks, use_reduction);
-
-		/* alpha = delta_new / dtq */
-		starpu_data_acquire(dtq_handle, STARPU_R);
-		alpha = delta_new/dtq;
-		starpu_data_release(dtq_handle);
-
-		/* x <- x + alpha d */
-		axpy_kernel(x_handle, d_handle, alpha, nblocks);
-
-		if ((i % 50) == 0)
-		{
-			/* r <- b */
-			copy_handle(r_handle, b_handle, nblocks);
-
-			/* r <- r - A x */
-			gemv_kernel(r_handle, A_handle, x_handle, 1.0, -1.0, nblocks, use_reduction);
-		}
-		else
-		{
-			/* r <- r - alpha q */
-			axpy_kernel(r_handle, q_handle, -alpha, nblocks);
-		}
-
-		/* delta_new = dot(r,r) */
-		dot_kernel(r_handle, r_handle, rtr_handle, nblocks, use_reduction);
-
-		starpu_data_acquire(rtr_handle, STARPU_R);
-		delta_old = delta_new;
-		delta_new = rtr;
-		beta = delta_new / delta_old;
-		starpu_data_release(rtr_handle);
-
-		/* d <- beta d + r */
-		scal_axpy_kernel(d_handle, beta, r_handle, 1.0, nblocks);
-
-		if ((i % 10) == 0)
-		{
-			/* We here take the error as ||r||_2 / (n||b||_2) */
-			double error = sqrt(delta_new/delta_0)/(1.0*n);
-			FPRINTF(stderr, "*****************************************\n");
-			FPRINTF(stderr, "iter %d DELTA %e - %e\n", i, delta_new, error);
-		}
-
-		starpu_iteration_pop();
-		i++;
-	}
-
-	end = starpu_timing_now();
-	double timing = end - start;
-
-	FPRINTF(stderr, "Total timing : %2.2f seconds\n", timing/10e6);
-	FPRINTF(stderr, "Seconds per iteration : %2.2e seconds\n", timing/10e6/i);
-	FPRINTF(stderr, "Number of iterations per second : %2.2e it/s\n", i/(timing/10e6));
-
-	return 0;
-}
 
 static void parse_args(int argc, char **argv)
 {
 	int i;
 	for (i = 1; i < argc; i++)
 	{
-	        if (strcmp(argv[i], "-n") == 0)
-		{
-			n = (int long long)atoi(argv[++i]);
-			continue;
-		}
-
-		if (strcmp(argv[i], "-display-result") == 0)
-		{
-			display_result = 1;
-			continue;
-		}
-
-
-	        if (strcmp(argv[i], "-maxiter") == 0)
-		{
-			i_max = atoi(argv[++i]);
-			if (i_max <= 0)
-			{
-				FPRINTF(stderr, "the number of iterations must be positive, not %d\n", i_max);
-				exit(EXIT_FAILURE);
-			}
-			continue;
-		}
-
-	        if (strcmp(argv[i], "-nblocks") == 0)
-		{
-			nblocks = atoi(argv[++i]);
-			continue;
-		}
-
-	        if (strcmp(argv[i], "-no-reduction") == 0)
-		{
-			use_reduction = 0;
-			continue;
-		}
-
 		if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-help") == 0)
 		{
-			FPRINTF(stderr, "usage: %s [-h] [-nblocks #blocks] [-display-result] [-n problem_size] [-no-reduction] [-maxiter i]\n", argv[0]);
+			FPRINTF_SERVER(stderr, "usage: %s [-h] [-nblocks #blocks] [-display-result] [-n problem_size] [-no-reduction] [-maxiter i]\n", argv[0]);
 			exit(-1);
 		}
-        }
+	}
+
+	parse_common_args(argc, argv);
 }
 
+
 int main(int argc, char **argv)
 {
 	int ret;

+ 0 - 25
examples/cg/cg.h

@@ -54,29 +54,4 @@
 #define cublasscal	cublasSscal
 #endif
 
-int dot_kernel(starpu_data_handle_t v1,
-	       starpu_data_handle_t v2,
-	       starpu_data_handle_t s,
-	       unsigned nblocks,
-	       int use_reduction);
-
-int gemv_kernel(starpu_data_handle_t v1,
-                starpu_data_handle_t matrix, 
-                starpu_data_handle_t v2,
-                TYPE p1, TYPE p2,
-		unsigned nblocks,
-		int use_reduction);
-
-int axpy_kernel(starpu_data_handle_t v1,
-		starpu_data_handle_t v2, TYPE p1,
-		unsigned nblocks);
-
-int scal_axpy_kernel(starpu_data_handle_t v1, TYPE p1,
-		     starpu_data_handle_t v2, TYPE p2,
-		     unsigned nblocks);
-
-int copy_handle(starpu_data_handle_t dst,
-		starpu_data_handle_t src,
-		unsigned nblocks);
-
 #endif /* __STARPU_EXAMPLE_CG_H__ */

+ 212 - 35
examples/cg/cg_kernels.c

@@ -23,11 +23,41 @@
 #include <limits.h>
 
 #ifdef STARPU_USE_CUDA
+#include <cuda.h>
 #include <starpu_cublas_v2.h>
 static const TYPE gp1 = 1.0;
 static const TYPE gm1 = -1.0;
 #endif
 
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+static int long long n = 4096;
+static int nblocks = 8;
+
+#ifdef STARPU_QUICK_CHECK
+static int i_max = 5;
+#elif !defined(STARPU_LONG_CHECK)
+static int i_max = 100;
+#else
+static int i_max = 1000;
+#endif
+static double eps = (10e-14);
+
+int use_reduction = 1;
+int display_result = 0;
+
+HANDLE_TYPE_MATRIX A_handle;
+HANDLE_TYPE_VECTOR b_handle;
+HANDLE_TYPE_VECTOR x_handle;
+
+HANDLE_TYPE_VECTOR r_handle;
+HANDLE_TYPE_VECTOR d_handle;
+HANDLE_TYPE_VECTOR q_handle;
+
+starpu_data_handle_t dtq_handle;
+starpu_data_handle_t rtr_handle;
+TYPE dtq, rtr;
+
 #if 0
 static void print_vector_from_descr(unsigned nx, TYPE *v)
 {
@@ -314,8 +344,8 @@ static struct starpu_codelet dot_kernel_cl =
 	.model = &dot_kernel_model
 };
 
-int dot_kernel(starpu_data_handle_t v1,
-	       starpu_data_handle_t v2,
+int dot_kernel(HANDLE_TYPE_VECTOR v1,
+	       HANDLE_TYPE_VECTOR v2,
 	       starpu_data_handle_t s,
 	       unsigned nblocks,
 	       int use_reduction)
@@ -327,21 +357,21 @@ int dot_kernel(starpu_data_handle_t v1,
 		starpu_data_invalidate_submit(s);
 	else
 	{
-		ret = starpu_task_insert(&bzero_variable_cl, STARPU_W, s, 0);
+		ret = TASK_INSERT(&bzero_variable_cl, STARPU_W, s, 0);
 		if (ret == -ENODEV) return ret;
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+		STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 	}
 
 	unsigned b;
 	for (b = 0; b < nblocks; b++)
 	{
-		ret = starpu_task_insert(&dot_kernel_cl,
+		ret = TASK_INSERT(&dot_kernel_cl,
 					 use_reduction?STARPU_REDUX:STARPU_RW, s,
-					 STARPU_R, starpu_data_get_sub_data(v1, 1, b),
-					 STARPU_R, starpu_data_get_sub_data(v2, 1, b),
+					 STARPU_R, GET_VECTOR_BLOCK(v1, b),
+					 STARPU_R, GET_VECTOR_BLOCK(v2, b),
 					 STARPU_TAG_ONLY, (starpu_tag_t) b,
 					 0);
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+		STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 	}
 	return 0;
 }
@@ -477,9 +507,9 @@ static struct starpu_codelet gemv_kernel_cl =
 	.model = &gemv_kernel_model
 };
 
-int gemv_kernel(starpu_data_handle_t v1,
-		starpu_data_handle_t matrix,
-		starpu_data_handle_t v2,
+int gemv_kernel(HANDLE_TYPE_VECTOR v1,
+		HANDLE_TYPE_MATRIX matrix,
+		HANDLE_TYPE_VECTOR v2,
 		TYPE p1, TYPE p2,
 		unsigned nblocks,
 		int use_reduction)
@@ -489,13 +519,13 @@ int gemv_kernel(starpu_data_handle_t v1,
 
 	for (b2 = 0; b2 < nblocks; b2++)
 	{
-		ret = starpu_task_insert(&scal_kernel_cl,
-					 STARPU_RW, starpu_data_get_sub_data(v1, 1, b2),
+		ret = TASK_INSERT(&scal_kernel_cl,
+					 STARPU_RW, GET_VECTOR_BLOCK(v1, b2),
 					 STARPU_VALUE, &p1, sizeof(p1),
 					 STARPU_TAG_ONLY, (starpu_tag_t) b2,
 					 0);
 		if (ret == -ENODEV) return ret;
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+		STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 	}
 
 	for (b2 = 0; b2 < nblocks; b2++)
@@ -503,15 +533,15 @@ int gemv_kernel(starpu_data_handle_t v1,
 		for (b1 = 0; b1 < nblocks; b1++)
 		{
 			TYPE one = 1.0;
-			ret = starpu_task_insert(&gemv_kernel_cl,
-						 use_reduction?STARPU_REDUX:STARPU_RW,	starpu_data_get_sub_data(v1, 1, b2),
-						 STARPU_R,	starpu_data_get_sub_data(matrix, 2, b2, b1),
-						 STARPU_R,	starpu_data_get_sub_data(v2, 1, b1),
+			ret = TASK_INSERT(&gemv_kernel_cl,
+						 use_reduction?STARPU_REDUX:STARPU_RW,	GET_VECTOR_BLOCK(v1, b2),
+						 STARPU_R,	GET_MATRIX_BLOCK(matrix, b2, b1),
+						 STARPU_R,	GET_VECTOR_BLOCK(v2, b1),
 						 STARPU_VALUE,	&one,	sizeof(one),
 						 STARPU_VALUE,	&p2,	sizeof(p2),
 						 STARPU_TAG_ONLY, ((starpu_tag_t)b2) * nblocks + b1,
 						 0);
-			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+			STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 		}
 	}
 	return 0;
@@ -582,23 +612,23 @@ static struct starpu_codelet scal_axpy_kernel_cl =
 	.model = &scal_axpy_kernel_model
 };
 
-int scal_axpy_kernel(starpu_data_handle_t v1, TYPE p1,
-		     starpu_data_handle_t v2, TYPE p2,
+int scal_axpy_kernel(HANDLE_TYPE_VECTOR v1, TYPE p1,
+		     HANDLE_TYPE_VECTOR v2, TYPE p2,
 		     unsigned nblocks)
 {
 	unsigned b;
 	for (b = 0; b < nblocks; b++)
 	{
 		int ret;
-		ret = starpu_task_insert(&scal_axpy_kernel_cl,
-					 STARPU_RW, starpu_data_get_sub_data(v1, 1, b),
-					 STARPU_R,  starpu_data_get_sub_data(v2, 1, b),
+		ret = TASK_INSERT(&scal_axpy_kernel_cl,
+					 STARPU_RW, GET_VECTOR_BLOCK(v1, b),
+					 STARPU_R,  GET_VECTOR_BLOCK(v2, b),
 					 STARPU_VALUE, &p1, sizeof(p1),
 					 STARPU_VALUE, &p2, sizeof(p2),
 					 STARPU_TAG_ONLY, (starpu_tag_t) b,
 					 0);
 		if (ret == -ENODEV) return ret;
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+		STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 	}
 	return 0;
 }
@@ -661,30 +691,177 @@ static struct starpu_codelet axpy_kernel_cl =
 	.model = &axpy_kernel_model
 };
 
-int axpy_kernel(starpu_data_handle_t v1,
-		starpu_data_handle_t v2, TYPE p1,
+int axpy_kernel(HANDLE_TYPE_VECTOR v1,
+		HANDLE_TYPE_VECTOR v2, TYPE p1,
 		unsigned nblocks)
 {
 	unsigned b;
 	for (b = 0; b < nblocks; b++)
 	{
 		int ret;
-		ret = starpu_task_insert(&axpy_kernel_cl,
-					 STARPU_RW, starpu_data_get_sub_data(v1, 1, b),
-					 STARPU_R,  starpu_data_get_sub_data(v2, 1, b),
+		ret = TASK_INSERT(&axpy_kernel_cl,
+					 STARPU_RW, GET_VECTOR_BLOCK(v1, b),
+					 STARPU_R,  GET_VECTOR_BLOCK(v2, b),
 					 STARPU_VALUE, &p1, sizeof(p1),
 					 STARPU_TAG_ONLY, (starpu_tag_t) b,
 					 0);
 		if (ret == -ENODEV) return ret;
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+		STARPU_CHECK_RETURN_VALUE(ret, "TASK_INSERT");
 	}
 	return 0;
 }
 
-int copy_handle(starpu_data_handle_t dst, starpu_data_handle_t src, unsigned nblocks)
+
+/*
+ *	Main loop: plain (unpreconditioned) Conjugate Gradient solving A x = b.
+ *	Iterates until ||r||^2 <= eps^2 * ||r_0||^2 or i_max iterations.
+ *	Returns 0 on completion, -ENODEV if no device can run the tasks.
+ *	Written against the HANDLE_TYPE_VECTOR / TASK_INSERT / BARRIER /
+ *	GET_DATA_HANDLE / FPRINTF_SERVER macros so the exact same code is
+ *	compiled both for the single-node build and for the MPI build
+ *	(where GET_DATA_HANDLE broadcasts the scalar to all nodes and
+ *	FPRINTF_SERVER prints on rank 0 only).
+ */
+int cg(void)
+{
+	TYPE delta_new, delta_0, error, delta_old, alpha, beta;
+	double start, end, timing;
+	int i = 0, ret;
+
+	/* r <- b */
+	ret = copy_handle(r_handle, b_handle, nblocks);
+	if (ret == -ENODEV) return ret;
+
+	/* r <- r - A x */
+	ret = gemv_kernel(r_handle, A_handle, x_handle, 1.0, -1.0, nblocks, use_reduction);
+	if (ret == -ENODEV) return ret;
+
+	/* d <- r */
+	ret = copy_handle(d_handle, r_handle, nblocks);
+	if (ret == -ENODEV) return ret;
+
+	/* delta_new = dot(r,r) */
+	ret = dot_kernel(r_handle, r_handle, rtr_handle, nblocks, use_reduction);
+	if (ret == -ENODEV) return ret;
+
+	/* Make the reduced scalar available locally before reading it. */
+	GET_DATA_HANDLE(rtr_handle);
+	starpu_data_acquire(rtr_handle, STARPU_R);
+	delta_new = rtr;
+	delta_0 = delta_new;
+	starpu_data_release(rtr_handle);
+
+	FPRINTF_SERVER(stderr, "Delta limit: %e\n", (double) (eps*eps*delta_0));
+
+	FPRINTF_SERVER(stderr, "**************** INITIAL ****************\n");
+	FPRINTF_SERVER(stderr, "Delta 0: %e\n", delta_new);
+
+	/* Synchronize all nodes (MPI build) so the timing starts together. */
+	BARRIER();
+	start = starpu_timing_now();
+
+	while ((i < i_max) && ((double)delta_new > (double)(eps*eps*delta_0)))
+	{
+		starpu_iteration_push(i);
+
+		/* q <- A d */
+		gemv_kernel(q_handle, A_handle, d_handle, 0.0, 1.0, nblocks, use_reduction);
+
+		/* dtq <- dot(d,q) */
+		dot_kernel(d_handle, q_handle, dtq_handle, nblocks, use_reduction);
+
+		/* alpha = delta_new / dtq */
+		GET_DATA_HANDLE(dtq_handle);
+		starpu_data_acquire(dtq_handle, STARPU_R);
+		alpha = delta_new / dtq;
+		starpu_data_release(dtq_handle);
+
+		/* x <- x + alpha d */
+		axpy_kernel(x_handle, d_handle, alpha, nblocks);
+
+		if ((i % 50) == 0)
+		{
+			/* Every 50 iterations, recompute the residual from scratch
+			 * to avoid drift from the incremental update. */
+			/* r <- b */
+			copy_handle(r_handle, b_handle, nblocks);
+
+			/* r <- r - A x */
+			gemv_kernel(r_handle, A_handle, x_handle, 1.0, -1.0, nblocks, use_reduction);
+		}
+		else
+		{
+			/* r <- r - alpha q */
+			axpy_kernel(r_handle, q_handle, -alpha, nblocks);
+		}
+
+		/* delta_new = dot(r,r) */
+		dot_kernel(r_handle, r_handle, rtr_handle, nblocks, use_reduction);
+
+		GET_DATA_HANDLE(rtr_handle);
+		starpu_data_acquire(rtr_handle, STARPU_R);
+		delta_old = delta_new;
+		delta_new = rtr;
+		beta = delta_new / delta_old;
+		starpu_data_release(rtr_handle);
+
+		/* d <- beta d + r */
+		scal_axpy_kernel(d_handle, beta, r_handle, 1.0, nblocks);
+
+		if ((i % 10) == 0)
+		{
+			/* We here take the error as ||r||_2 / (n||b||_2) */
+			error = sqrt(delta_new/delta_0)/(1.0*n);
+			FPRINTF_SERVER(stderr, "*****************************************\n");
+			FPRINTF_SERVER(stderr, "iter %d DELTA %e - %e\n", i, delta_new, error);
+		}
+
+		starpu_iteration_pop();
+		i++;
+	}
+
+	BARRIER();
+	end = starpu_timing_now();
+	timing = end - start;
+
+	/* NOTE(review): starpu_timing_now() is in microseconds; dividing by
+	 * 10e6 (= 1e7) rather than 1e6 looks off by a factor of 10 — the same
+	 * divisor is used in the original example, so confirm intent. */
+	error = sqrt(delta_new/delta_0)/(1.0*n);
+	FPRINTF_SERVER(stderr, "*****************************************\n");
+	FPRINTF_SERVER(stderr, "iter %d DELTA %e - %e\n", i, delta_new, error);
+	FPRINTF_SERVER(stderr, "Total timing : %2.2f seconds\n", timing/10e6);
+	FPRINTF_SERVER(stderr, "Seconds per iteration : %2.2e seconds\n", timing/10e6/i);
+	FPRINTF_SERVER(stderr, "Number of iterations per second : %2.2e it/s\n", i/(timing/10e6));
+
+	return 0;
+}
+
+
+/* Parses the options shared by the single-node and MPI drivers:
+ * -n size, -display-result, -maxiter i, -nblocks b, -no-reduction.
+ * Unknown options are silently ignored (the callers handle -h/-p first). */
+void parse_common_args(int argc, char **argv)
+{
+	int i;
+	for (i = 1; i < argc; i++)
+	{
+		if (strcmp(argv[i], "-n") == 0)
+		{
+			/* Use strtoll: atoi would truncate to int range before the
+			 * widening cast, silently corrupting large problem sizes. */
+			n = strtoll(argv[++i], NULL, 10);
+			continue;
+		}
+
+		if (strcmp(argv[i], "-display-result") == 0)
+		{
+			display_result = 1;
+			continue;
+		}
+
+		if (strcmp(argv[i], "-maxiter") == 0)
+		{
+			i_max = atoi(argv[++i]);
+			if (i_max <= 0)
+			{
+				FPRINTF_SERVER(stderr, "the number of iterations must be positive, not %d\n", i_max);
+				exit(EXIT_FAILURE);
+			}
+			continue;
+		}
+
+		if (strcmp(argv[i], "-nblocks") == 0)
+		{
+			nblocks = atoi(argv[++i]);
+			continue;
+		}
+
+		if (strcmp(argv[i], "-no-reduction") == 0)
+		{
+			use_reduction = 0;
+			continue;
+		}
+	}
+}

+ 20 - 2
mpi/examples/Makefile.am

@@ -272,9 +272,25 @@ starpu_mpi_EXAMPLES +=				\
 	matrix_decomposition/mpi_cholesky_distributed
 endif
 
-########################
+##############
+# CG example #
+##############
+
+if !STARPU_NO_BLAS_LIB
+examplebin_PROGRAMS += cg/cg
+starpu_mpi_EXAMPLES += cg/cg
+
+cg_cg_SOURCES =					\
+	cg/cg.c						\
+	../../examples/common/blas.c
+
+cg_cg_LDADD =					\
+	$(STARPU_BLAS_LDFLAGS)
+endif
+
+###########################
 # MPI Matrix mult example #
-########################
+###########################
 
 examplebin_PROGRAMS +=		\
 	matrix_mult/mm
@@ -307,6 +323,7 @@ if !STARPU_SIMGRID
 starpu_mpi_EXAMPLES +=				\
 	mpi_redux/mpi_redux
 endif
+
 ##########################################
 # Native Fortran MPI Matrix mult example #
 ##########################################
@@ -352,6 +369,7 @@ starpu_mpi_EXAMPLES +=				\
 endif
 endif
 endif
+
 ########################################
 # Native Fortran MPI STARPU_REDUX test #
 ########################################

+ 429 - 0
mpi/examples/cg/cg.c

@@ -0,0 +1,429 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <math.h>
+#include <assert.h>
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include <common/blas.h>
+
+
+/*
+ * Distributed version of Conjugate Gradient implemented in examples/cg/cg.c
+ *
+ * Use -display-result option and compare with the non-distributed version: the
+ * x vector should be the same.
+ */
+
+#include "../../../examples/cg/cg.h"
+
+static int copy_handle(starpu_data_handle_t* dst, starpu_data_handle_t* src, unsigned nblocks);
+
+#define HANDLE_TYPE_VECTOR starpu_data_handle_t*
+#define HANDLE_TYPE_MATRIX starpu_data_handle_t**
+#define TASK_INSERT(cl, ...) starpu_mpi_task_insert(MPI_COMM_WORLD, cl, ##__VA_ARGS__)
+#define GET_VECTOR_BLOCK(v, i) v[i]
+#define GET_MATRIX_BLOCK(m, i, j) m[i][j]
+#define BARRIER() starpu_mpi_barrier(MPI_COMM_WORLD);
+#define GET_DATA_HANDLE(handle) starpu_mpi_get_data_on_all_nodes_detached(MPI_COMM_WORLD, handle)
+
+static int block_size;
+
+static int rank;
+static int nodes_p = 2;
+static int nodes_q;
+
+static TYPE ***A;
+static TYPE **x;
+static TYPE **b;
+
+static TYPE **r;
+static TYPE **d;
+static TYPE **q;
+
+#define FPRINTF_SERVER(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT") && rank == 0) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+#include "../../../examples/cg/cg_kernels.c"
+
+
+/* Maps block coordinates (y, x) to the owning MPI rank: 2D block-cyclic
+ * distribution over a nodes_q (rows) x nodes_p (columns) process grid. */
+static int my_distrib(const int y, const int x)
+{
+	return (y%nodes_q)*nodes_p + (x%nodes_p);
+}
+
+
+/* MPI implementation of "dst <- src" on per-block handle arrays:
+ * only the rank that owns block b (first column of the grid) submits the
+ * asynchronous copy; other ranks do nothing for that block.
+ * Always returns 0 (copies complete under StarPU dependency order). */
+static int copy_handle(starpu_data_handle_t* dst, starpu_data_handle_t* src, unsigned nblocks)
+{
+	unsigned b;
+
+	for (b = 0; b < nblocks; b++)
+	{
+		if (rank == my_distrib(b, 0))
+		{
+			starpu_data_cpy(dst[b], src[b], /* asynchronous */ 1, /* without callback */ NULL, NULL);
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+ *	Generate Input data
+ */
+
+/* Allocates and initializes the distributed problem: x = 0, b = 1, and
+ * A the Hilbert matrix. Each rank only allocates the blocks it owns per
+ * my_distrib(); x blocks are additionally allocated on every rank when
+ * display_result is set, so rank 0 can later print the whole vector. */
+static void generate_random_problem(void)
+{
+	unsigned nn, mm, m, n, mpi_rank;
+
+	A = malloc(nblocks * sizeof(TYPE **));
+	x = malloc(nblocks * sizeof(TYPE *));
+	b = malloc(nblocks * sizeof(TYPE *));
+
+	r = malloc(nblocks * sizeof(TYPE *));
+	d = malloc(nblocks * sizeof(TYPE *));
+	q = malloc(nblocks * sizeof(TYPE *));
+
+	for (m = 0; m < nblocks; m++)
+	{
+		A[m] = malloc(nblocks * sizeof(TYPE*));
+
+		/* Vector blocks are distributed along the first grid column. */
+		mpi_rank = my_distrib(m, 0);
+
+		if (mpi_rank == rank || display_result)
+		{
+			starpu_malloc((void**) &x[m], block_size*sizeof(TYPE));
+		}
+
+		if (mpi_rank == rank)
+		{
+			starpu_malloc((void**) &b[m], block_size*sizeof(TYPE));
+			starpu_malloc((void**) &r[m], block_size*sizeof(TYPE));
+			starpu_malloc((void**) &d[m], block_size*sizeof(TYPE));
+			starpu_malloc((void**) &q[m], block_size*sizeof(TYPE));
+
+			for (mm = 0; mm < block_size; mm++)
+			{
+				x[m][mm] = (TYPE) 0.0;
+				b[m][mm] = (TYPE) 1.0;
+				r[m][mm] = (TYPE) 0.0;
+				d[m][mm] = (TYPE) 0.0;
+				q[m][mm] = (TYPE) 0.0;
+			}
+		}
+
+		for (n = 0; n < nblocks; n++)
+		{
+			mpi_rank = my_distrib(m, n);
+			if (mpi_rank == rank)
+			{
+				starpu_malloc((void**) &A[m][n], block_size*block_size*sizeof(TYPE));
+
+				for (nn = 0; nn < block_size; nn++)
+				{
+					for (mm = 0; mm < block_size; mm++)
+					{
+						/* We take the Hilbert matrix, which is not well conditioned but positive definite: H(i,j) = 1/(1+i+j) */
+						A[m][n][mm + nn*block_size] = (TYPE) (1.0/(1.0+(nn+(m*block_size)+mm+(n*block_size))));
+					}
+				}
+			}
+		}
+	}
+}
+
+/* Releases all problem buffers, mirroring generate_random_problem():
+ * each rank frees only the blocks it allocated (x also when
+ * display_result forced allocation on every rank). */
+static void free_data(void)
+{
+	unsigned m, n;
+	/* int, not unsigned: my_distrib() returns int and is compared to rank. */
+	int mpi_rank;
+
+	for (m = 0; m < nblocks; m++)
+	{
+		mpi_rank = my_distrib(m, 0);
+
+		if (mpi_rank == rank || display_result)
+		{
+			starpu_free((void*) x[m]);
+		}
+
+		if (mpi_rank == rank)
+		{
+			starpu_free((void*) b[m]);
+			starpu_free((void*) r[m]);
+			starpu_free((void*) d[m]);
+			starpu_free((void*) q[m]);
+		}
+
+		for (n = 0; n < nblocks; n++)
+		{
+			mpi_rank = my_distrib(m, n);
+			if (mpi_rank == rank)
+			{
+				starpu_free((void*) A[m][n]);
+			}
+		}
+
+		free(A[m]);
+	}
+
+	free(A);
+	free(x);
+	free(b);
+	free(r);
+	free(d);
+	free(q);
+}
+
+/* Registers every block with StarPU and StarPU-MPI. The owning rank
+ * registers its buffer in main RAM; other ranks register a placeholder
+ * (home node -1, NULL pointer) so StarPU-MPI can cache received data.
+ * Each handle gets a unique, deterministic MPI tag (same ++mpi_tag order
+ * on every rank) plus 1D/2D coordinates for tracing tools. */
+static void register_data(void)
+{
+	unsigned m, n;
+	int mpi_rank;
+	starpu_mpi_tag_t mpi_tag = 0;
+
+	A_handle = malloc(nblocks*sizeof(starpu_data_handle_t*));
+	x_handle = malloc(nblocks*sizeof(starpu_data_handle_t));
+	b_handle = malloc(nblocks*sizeof(starpu_data_handle_t));
+	r_handle = malloc(nblocks*sizeof(starpu_data_handle_t));
+	d_handle = malloc(nblocks*sizeof(starpu_data_handle_t));
+	q_handle = malloc(nblocks*sizeof(starpu_data_handle_t));
+
+	for (m = 0; m < nblocks; m++)
+	{
+		mpi_rank = my_distrib(m, 0);
+		A_handle[m] = malloc(nblocks*sizeof(starpu_data_handle_t));
+
+		/* x is registered with a real buffer on every rank when
+		 * display_result is set, so rank 0 can gather it at the end. */
+		if (mpi_rank == rank || display_result)
+		{
+			starpu_vector_data_register(&x_handle[m], STARPU_MAIN_RAM, (uintptr_t) x[m], block_size, sizeof(TYPE));
+		}
+		/* NOTE(review): "!display_result" is always true here (the first
+		 * branch already covered display_result), so this is plain else. */
+		else if (!display_result)
+		{
+			assert(mpi_rank != rank);
+			starpu_vector_data_register(&x_handle[m], -1, (uintptr_t) NULL, block_size, sizeof(TYPE));
+		}
+
+		if (mpi_rank == rank)
+		{
+			starpu_vector_data_register(&b_handle[m], STARPU_MAIN_RAM, (uintptr_t) b[m], block_size, sizeof(TYPE));
+			starpu_vector_data_register(&r_handle[m], STARPU_MAIN_RAM, (uintptr_t) r[m], block_size, sizeof(TYPE));
+			starpu_vector_data_register(&d_handle[m], STARPU_MAIN_RAM, (uintptr_t) d[m], block_size, sizeof(TYPE));
+			starpu_vector_data_register(&q_handle[m], STARPU_MAIN_RAM, (uintptr_t) q[m], block_size, sizeof(TYPE));
+		}
+		else
+		{
+			starpu_vector_data_register(&b_handle[m], -1, (uintptr_t) NULL, block_size, sizeof(TYPE));
+			starpu_vector_data_register(&r_handle[m], -1, (uintptr_t) NULL, block_size, sizeof(TYPE));
+			starpu_vector_data_register(&d_handle[m], -1, (uintptr_t) NULL, block_size, sizeof(TYPE));
+			starpu_vector_data_register(&q_handle[m], -1, (uintptr_t) NULL, block_size, sizeof(TYPE));
+		}
+
+		starpu_data_set_coordinates(x_handle[m], 1, m);
+		starpu_mpi_data_register(x_handle[m], ++mpi_tag, mpi_rank);
+		starpu_data_set_coordinates(b_handle[m], 1, m);
+		starpu_mpi_data_register(b_handle[m], ++mpi_tag, mpi_rank);
+		starpu_data_set_coordinates(r_handle[m], 1, m);
+		starpu_mpi_data_register(r_handle[m], ++mpi_tag, mpi_rank);
+		starpu_data_set_coordinates(d_handle[m], 1, m);
+		starpu_mpi_data_register(d_handle[m], ++mpi_tag, mpi_rank);
+		starpu_data_set_coordinates(q_handle[m], 1, m);
+		starpu_mpi_data_register(q_handle[m], ++mpi_tag, mpi_rank);
+
+		if (use_reduction)
+		{
+			starpu_data_set_reduction_methods(q_handle[m], &accumulate_vector_cl, &bzero_vector_cl);
+			starpu_data_set_reduction_methods(r_handle[m], &accumulate_vector_cl, &bzero_vector_cl);
+		}
+
+		for (n = 0; n < nblocks; n++)
+		{
+			mpi_rank = my_distrib(m, n);
+
+			if (mpi_rank == rank)
+			{
+				starpu_matrix_data_register(&A_handle[m][n], STARPU_MAIN_RAM, (uintptr_t) A[m][n], block_size, block_size, block_size, sizeof(TYPE));
+			}
+			else
+			{
+				starpu_matrix_data_register(&A_handle[m][n], -1, (uintptr_t) NULL, block_size, block_size, block_size, sizeof(TYPE));
+			}
+
+			starpu_data_set_coordinates(A_handle[m][n], 2, n, m);
+			starpu_mpi_data_register(A_handle[m][n], ++mpi_tag, mpi_rank);
+		}
+	}
+
+	/* The two reduction scalars live on rank 0. */
+	starpu_variable_data_register(&dtq_handle, STARPU_MAIN_RAM, (uintptr_t)&dtq, sizeof(TYPE));
+	starpu_variable_data_register(&rtr_handle, STARPU_MAIN_RAM, (uintptr_t)&rtr, sizeof(TYPE));
+	starpu_mpi_data_register(rtr_handle, ++mpi_tag, 0);
+	starpu_mpi_data_register(dtq_handle, ++mpi_tag, 0);
+
+	if (use_reduction)
+	{
+		starpu_data_set_reduction_methods(dtq_handle, &accumulate_variable_cl, &bzero_variable_cl);
+		starpu_data_set_reduction_methods(rtr_handle, &accumulate_variable_cl, &bzero_variable_cl);
+	}
+}
+
+/* Unregisters every handle (all ranks registered all handles, owner or
+ * placeholder, so all ranks unregister all of them) and frees the
+ * handle arrays allocated by register_data(). */
+static void unregister_data(void)
+{
+	unsigned m, n;
+
+	for (m = 0; m < nblocks; m++)
+	{
+		starpu_data_unregister(x_handle[m]);
+		starpu_data_unregister(b_handle[m]);
+		starpu_data_unregister(r_handle[m]);
+		starpu_data_unregister(d_handle[m]);
+		starpu_data_unregister(q_handle[m]);
+
+		for (n = 0; n < nblocks; n++)
+		{
+			starpu_data_unregister(A_handle[m][n]);
+		}
+
+		free(A_handle[m]);
+	}
+
+	starpu_data_unregister(dtq_handle);
+	starpu_data_unregister(rtr_handle);
+
+	free(A_handle);
+	free(x_handle);
+	free(b_handle);
+	free(r_handle);
+	free(d_handle);
+	free(q_handle);
+}
+
+/* Gathers every x block onto rank 0 and prints the solution vector there.
+ * All ranks must call this (the gather is collective); only rank 0 prints. */
+static void display_x_result(void)
+{
+	int j, i;
+
+	for (j = 0; j < nblocks; j++)
+	{
+		starpu_mpi_get_data_on_node(MPI_COMM_WORLD, x_handle[j], 0);
+	}
+
+	if (rank == 0)
+	{
+		FPRINTF_SERVER(stderr, "Computed X vector:\n");
+		for (j = 0; j < nblocks; j++)
+		{
+			starpu_data_acquire(x_handle[j], STARPU_R);
+			for (i = 0; i < block_size; i++)
+			{
+				FPRINTF(stderr, "% 02.2e\n", x[j][i]);
+			}
+			starpu_data_release(x_handle[j]);
+		}
+	}
+}
+
+
+/* Parses MPI-specific options (-p grid width, -h/--help), then defers the
+ * shared options (-n, -nblocks, -maxiter, ...) to parse_common_args(). */
+static void parse_args(int argc, char **argv)
+{
+	int i;
+	for (i = 1; i < argc; i++)
+	{
+		if (strcmp(argv[i], "-p") == 0)
+		{
+			nodes_p = atoi(argv[++i]);
+			continue;
+		}
+
+		if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-help") == 0)
+		{
+			FPRINTF_SERVER(stderr, "usage: %s [-h] [-nblocks #blocks] [-display-result] [-p node_grid_width] [-n problem_size] [-no-reduction] [-maxiter i]\n", argv[0]);
+			exit(-1);
+		}
+	}
+
+	parse_common_args(argc, argv);
+}
+
+
+/* Entry point of the distributed CG example: initializes StarPU-MPI,
+ * validates the process grid and block count, builds and registers the
+ * distributed problem, runs cg(), optionally prints x, and tears down.
+ * Returns 0 on success, 77 (the automake "skipped test" code) when the
+ * configuration is unsupported, 1 on bad parameters. */
+int main(int argc, char **argv)
+{
+	int worldsize, ret;
+	double start, end;
+
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return 77;
+
+	ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
+
+	parse_args(argc, argv);
+
+	if (worldsize % nodes_p != 0)
+	{
+		FPRINTF_SERVER(stderr, "Node grid width must divide the number of nodes.\n");
+		starpu_mpi_shutdown();
+		return 1;
+	}
+	nodes_q = worldsize / nodes_p;
+
+	if (n % nblocks != 0)
+	{
+		FPRINTF_SERVER(stderr, "The number of blocks must divide the matrix size.\n");
+		starpu_mpi_shutdown();
+		return 1;
+	}
+	block_size = n / nblocks;
+
+	starpu_cublas_init();
+
+	FPRINTF_SERVER(stderr, "************** PARAMETERS ***************\n");
+	FPRINTF_SERVER(stderr, "%d nodes (%dx%d)\n", worldsize, nodes_p, nodes_q);
+	FPRINTF_SERVER(stderr, "Problem size (-n): %lld\n", n);
+	FPRINTF_SERVER(stderr, "Maximum number of iterations (-maxiter): %d\n", i_max);
+	FPRINTF_SERVER(stderr, "Number of blocks (-nblocks): %d\n", nblocks);
+	FPRINTF_SERVER(stderr, "Reduction (-no-reduction): %s\n", use_reduction ? "enabled" : "disabled");
+
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+	start = starpu_timing_now();
+	generate_random_problem();
+	register_data();
+	starpu_mpi_barrier(MPI_COMM_WORLD);
+	end = starpu_timing_now();
+
+	FPRINTF_SERVER(stderr, "Problem initialization timing : %2.2f seconds\n", (end-start)/10e6);
+
+	ret = cg();
+	if (ret == -ENODEV)
+	{
+		ret = 77;
+		goto enodev;
+	}
+
+	/* NOTE(review): in an MPI program starpu_mpi_wait_for_all() may be the
+	 * more appropriate drain call than starpu_task_wait_for_all — confirm. */
+	starpu_task_wait_for_all();
+
+	if (display_result)
+	{
+		display_x_result();
+	}
+
+enodev:
+	unregister_data();
+	free_data();
+	starpu_cublas_shutdown();
+	starpu_mpi_shutdown();
+	return ret;
+}