Pārlūkot izejas kodu

revert fortran task insert ; remove init after redux_data ; example now uses 2 contributing nodes over 1 handle instead of 2 over 2

Antoine Jego 4 gadi atpakaļ
vecāks
revīzija
ae6818ee93

+ 1 - 0
AUTHORS

@@ -17,6 +17,7 @@ Guilbaud Adrien, Inria, <adrien.guilbaud@inria.fr>
 He Kun, Inria, <kun.he@inria.fr>
 Henry Sylvain, Université de Bordeaux, <sylvain.henry@inria.fr>
 Hugo Andra, Université de Bordeaux/Inria, <andra.hugo@inria.fr>
+Jego Antoine, Enseeiht, <antoine.jego@etu.enseeiht.fr>
 Juhoor Mehdi, Université de Bordeaux, <mjuhoor@gmail.com>
 Juven Alexis, Inria, <alexis.juven@inria.fr>
 Keryell-Even Maël, Inria, <mael.keryell@inria.fr>

+ 1 - 0
ChangeLog

@@ -51,6 +51,7 @@ New features:
     starpu_mpi_interface_datatype_node_register which will be needed for
     MPI/NUMA/GPUDirect.
   * Add peek_data interface method.
+  * Add STARPU_MPI_REDUX
 
 Small changes:
   * Add a synthetic energy efficiency testcase.

+ 31 - 0
doc/doxygen/chapters/410_mpi_support.doxy

@@ -744,6 +744,37 @@ starpu_mpi_data_set_rank(data, STARPU_MPI_PER_NODE);
 
 The data can then be used just like pernode above.
 
+\section MPIMpiRedux Inter-node reduction
+
+One might want to leverage a reduction pattern across several nodes.
+Using \c STARPU_REDUX, one can obtain reduction patterns across several nodes,
+however each core across the contributing nodes will spawn their own
+contribution to work with. In the case that these allocations or the
+required reductions are too expensive to execute for each contribution,
+the access mode \c STARPU_MPI_REDUX tells StarPU to spawn only one contribution 
+on node executing tasks partaking in the reduction.
+
+Tasks producing a result in the inter-node reduction should be registered as
+accessing the contribution through \c STARPU_RW|STARPU_COMMUTE mode.
+
+\code{.c}
+static struct starpu_codelet contrib_cl =
+{
+	.cpu_funcs = {cpu_contrib}, /* cpu implementation(s) of the routine */
+	.nbuffers = 1, /* number of data handles referenced by this routine */
+	.modes = {STARPU_RW | STARPU_COMMUTE} /* access modes for the contribution */
+	.name = "contribution"
+};
+\endcode
+
+When inserting these tasks, the access mode handed out to the StarPU-MPI layer
+should be \c STARPU_MPI_REDUX. Assuming \c data is owned by node 0 and we want node
+1 to compute the contribution, we could do the following.
+
+\code{.c}
+starpu_mpi_task_insert(MPI_COMM_WORLD, &contrib_cl, STARPU_MPI_REDUX, data, EXECUTE_ON_NODE, 1); /* Node 1 computes it */
+\endcode
+
 \section MPIPriorities Priorities
 
 All send functions have a <c>_prio</c> variant which takes an additional

+ 2 - 0
include/fstarpu_mod.f90

@@ -25,6 +25,7 @@ module fstarpu_mod
         type(c_ptr), bind(C) :: FSTARPU_RW
         type(c_ptr), bind(C) :: FSTARPU_SCRATCH
         type(c_ptr), bind(C) :: FSTARPU_REDUX
+        type(c_ptr), bind(C) :: FSTARPU_MPI_REDUX
         type(c_ptr), bind(C) :: FSTARPU_COMMUTE
         type(c_ptr), bind(C) :: FSTARPU_SSEND
         type(c_ptr), bind(C) :: FSTARPU_LOCALITY
@@ -2395,6 +2396,7 @@ module fstarpu_mod
                         FSTARPU_RW      = fstarpu_get_constant(C_CHAR_"FSTARPU_RW"//C_NULL_CHAR)
                         FSTARPU_SCRATCH = fstarpu_get_constant(C_CHAR_"FSTARPU_SCRATCH"//C_NULL_CHAR)
                         FSTARPU_REDUX   = fstarpu_get_constant(C_CHAR_"FSTARPU_REDUX"//C_NULL_CHAR)
+                        FSTARPU_MPI_REDUX   = fstarpu_get_constant(C_CHAR_"FSTARPU_MPI_REDUX"//C_NULL_CHAR)
                         FSTARPU_COMMUTE   = fstarpu_get_constant(C_CHAR_"FSTARPU_COMMUTE"//C_NULL_CHAR)
                         FSTARPU_SSEND   = fstarpu_get_constant(C_CHAR_"FSTARPU_SSEND"//C_NULL_CHAR)
                         FSTARPU_LOCALITY   = fstarpu_get_constant(C_CHAR_"FSTARPU_LOCALITY"//C_NULL_CHAR)

+ 9 - 1
include/starpu_data.h

@@ -110,7 +110,15 @@ enum starpu_data_access_mode
 				   src/sched_policies/work_stealing_policy.c
 				   source code.
 				*/
-	STARPU_ACCESS_MODE_MAX=(1<<7) /**< todo */
+	STARPU_MPI_REDUX=(1<<7), /** Inter-node reduction only. Codelets 
+				    contributing to these reductions should
+				    be registered with STARPU_RW | STARPU_COMMUTE 
+				    access modes.
+			            When inserting these tasks through the
+				    MPI layer however, the access mode needs
+				    to be STARPU_MPI_REDUX. */
+	STARPU_ACCESS_MODE_MAX=(1<<8) /** The purpose of ACCESS_MODE_MAX is to
+					be the maximum of this enum. */
 };
 
 struct starpu_data_interface_ops;

+ 1 - 1
julia/src/StarPU.jl

@@ -65,7 +65,7 @@ export STARPU_HISTORY_BASED, STARPU_REGRESSION_BASED
 export STARPU_NL_REGRESSION_BASED, STARPU_MULTIPLE_REGRESSION_BASED
 export starpu_tag_t
 export STARPU_NONE,STARPU_R,STARPU_W,STARPU_RW, STARPU_SCRATCH
-export STARPU_REDUX,STARPU_COMMUTE, STARPU_SSEND, STARPU_LOCALITY
+export STARPU_MPI_REDUX, STARPU_REDUX,STARPU_COMMUTE, STARPU_SSEND, STARPU_LOCALITY
 export STARPU_ACCESS_MODE_MAX
 
 # BLAS

+ 40 - 0
mpi/examples/Makefile.am

@@ -290,6 +290,23 @@ starpu_mpi_EXAMPLES +=				\
 	matrix_mult/mm
 endif
 
+########################
+# MPI STARPU_MPI_REDUX #
+########################
+
+examplebin_PROGRAMS +=		\
+	mpi_redux/mpi_redux
+
+matrix_mult_mm_SOURCES	=		\
+	mpi_redux/mpi_redux.c
+
+matrix_mult_mm_LDADD =			\
+	-lm
+
+if !STARPU_SIMGRID
+starpu_mpi_EXAMPLES +=				\
+	mpi_redux/mpi_redux
+endif
 ##########################################
 # Native Fortran MPI Matrix mult example #
 ##########################################
@@ -335,7 +352,30 @@ starpu_mpi_EXAMPLES +=				\
 endif
 endif
 endif
+########################################
+# Native Fortran MPI STARPU_REDUX test #
+########################################
 
+if STARPU_HAVE_MPIFORT
+if !STARPU_SANITIZE
+examplebin_PROGRAMS +=		\
+	native_fortran/nf_mpi_redux
+
+native_fortran_nf_mpi_redux_SOURCES	=			\
+	native_fortran/fstarpu_mpi_mod.f90	\
+	native_fortran/fstarpu_mod.f90		\
+	native_fortran/nf_mpi_redux.f90	
+
+native_fortran_nf_mpi_redux_LDADD =					\
+	-lm
+
+if !STARPU_SIMGRID
+starpu_mpi_EXAMPLES +=				\
+	native_fortran/nf_mpi_redux
+endif
+endif
+endif
+ 						
 ###################
 # complex example #
 ###################

+ 205 - 0
mpi/examples/mpi_redux/mpi_redux.c

@@ -0,0 +1,205 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016-2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+ 
+/*
+ * This example illustrates how to use the STARPU_MPI_REDUX mode
+ * and compare it with the standard STARPU_REDUX.
+ *
+ * In order to make this comparison salliant, the init codelet is not
+ * a task that set the handle to a neutral element but rather depends
+ * on the working node.
+ * This is not a proper way to use a reduction pattern however it 
+ * can be analogous to the cost/weight of each contribution.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <assert.h>
+#include <math.h>
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include "helper.h"
+#include <unistd.h>
+
+
+static void cl_cpu_work(void *handles[], void*arg)
+{
+	(void)arg;
+	double *a = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
+	double *b = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);
+	sleep(2);
+	printf("work_cl (rank:%d,worker:%d) %f =>",starpu_mpi_world_rank(), starpu_worker_get_id(), *a);
+	*a = 3.0 + *a + *b;
+	printf("%f\n",*a);
+}
+
+static struct starpu_codelet work_cl = 
+{
+	.cpu_funcs = { cl_cpu_work },
+	.nbuffers = 2,
+	.modes = { STARPU_REDUX, STARPU_R },
+	.name = "task_init"
+};
+
+static struct starpu_codelet mpi_work_cl = 
+{
+	.cpu_funcs = { cl_cpu_work },
+	.nbuffers = 2,
+	.modes = { STARPU_RW | STARPU_COMMUTE, STARPU_R },
+	.name = "task_init-mpi"
+};
+
+static void cl_cpu_task_init(void *handles[], void*arg)
+{
+	(void) arg;
+	double *a = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
+	sleep(1);
+	printf("init_cl (rank:%d,worker:%d) %d (was %f)\n", starpu_mpi_world_rank(), starpu_worker_get_id(), starpu_mpi_world_rank(), *a);
+	*a = starpu_mpi_world_rank();
+}
+
+static struct starpu_codelet task_init_cl = 
+{
+	.cpu_funcs = { cl_cpu_task_init },
+	.nbuffers = 1,
+	.modes = { STARPU_W },
+	.name = "task_init"
+};
+
+static void cl_cpu_task_red(void *handles[], void*arg)
+{
+	(void) arg;
+	double *ad = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
+	double *as = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);
+	sleep(2);
+	printf("red_cl (rank:%d,worker:%d) %f ; %f --> %f\n", starpu_mpi_world_rank(), starpu_worker_get_id(), *as, *ad, *as+*ad);
+	*ad = *ad + *as;
+}
+
+static struct starpu_codelet task_red_cl = 
+{
+	.cpu_funcs = { cl_cpu_task_red },
+	.nbuffers = 2,
+	.modes = { STARPU_RW, STARPU_R },
+	.name = "task_red"
+};
+
+int main(int argc, char *argv[])
+{
+	int comm_rank, comm_size;
+	/* Initializes STarPU and the StarPU-MPI layer */
+	starpu_fxt_autostart_profiling(0);
+	int ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_ini_conft");
+
+	int nworkers = starpu_cpu_worker_get_count();
+	if (nworkers < 2)
+	{
+        	FPRINTF(stderr, "We need at least 2 CPU worker per node.\n");
+        	starpu_mpi_shutdown();
+       		return STARPU_TEST_SKIPPED;
+	}
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
+	if (comm_size< 2)
+	{
+        	FPRINTF(stderr, "We need at least 2 nodes.\n");
+        	starpu_mpi_shutdown();
+       		return STARPU_TEST_SKIPPED;
+	}
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);
+
+	double a, b[comm_size];
+	starpu_data_handle_t a_h, b_h[comm_size];
+	double work_coef = 2;
+	enum starpu_data_access_mode codelet_mode;
+	enum starpu_data_access_mode task_mode;
+	int i,j,work_node;
+    	starpu_mpi_tag_t tag = 0;
+	for (i = 0 ; i < 2 ; i++)
+	{
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		if (i==0)
+			task_mode = STARPU_MPI_REDUX;
+		else
+			task_mode = STARPU_REDUX;
+		if (comm_rank == 0)
+		{
+			a = 1.0;
+			printf("init a = %f\n", a);
+			starpu_variable_data_register(&a_h, STARPU_MAIN_RAM, (uintptr_t)&a, sizeof(double));
+			for (j=0;j<comm_size;j++)
+				starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
+		}
+		else
+		{
+			b[comm_rank] = 1.0 / (comm_rank + 1.0);
+			printf("init b_%d = %f\n", comm_rank, b[comm_rank]);
+			starpu_variable_data_register(&a_h, -1, 0, sizeof(double));
+			for (j=0;j<comm_size;j++)
+			{
+				if (j == comm_rank)
+					starpu_variable_data_register(&b_h[j], STARPU_MAIN_RAM, (uintptr_t)&b[j], sizeof(double));
+				else
+					starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
+			}
+		}
+		starpu_mpi_data_register(a_h, tag++, 0);
+		for (j=0;j<comm_size;j++)
+			starpu_mpi_data_register(b_h[j], tag++, j);
+		
+		starpu_data_set_reduction_methods(a_h, &task_red_cl, &task_init_cl);
+		starpu_fxt_start_profiling();
+		for (work_node=1; work_node < comm_size;work_node++)
+		{
+			for (j=1;j<=work_coef*nworkers;j++)
+			{
+				if (i == 0)
+				    starpu_mpi_task_insert(MPI_COMM_WORLD,
+					&mpi_work_cl,
+					task_mode, a_h,
+					STARPU_R, b_h[work_node],
+					STARPU_EXECUTE_ON_NODE, work_node,
+					0);
+				else
+				    starpu_mpi_task_insert(MPI_COMM_WORLD,
+					&work_cl,
+					task_mode, a_h,
+					STARPU_R, b_h[work_node],
+					STARPU_EXECUTE_ON_NODE, work_node,
+					0);
+			}
+		}
+		starpu_mpi_redux_data(MPI_COMM_WORLD, a_h);
+		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+		if (comm_rank == 0) 
+		{
+			double tmp = 0.0;
+			for (work_node = 1; work_node < comm_size ; work_node++)
+				tmp += 1.0 / (work_node + 1.0);
+			printf("computed result ---> %f expected %f\n", a, 1.0 + (comm_size - 1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1)*3.0 + tmp));
+		}
+		starpu_data_unregister(a_h);
+		for (work_node=1; work_node < comm_size-1;work_node++)
+			starpu_data_unregister(b_h[work_node]);
+		starpu_mpi_barrier(MPI_COMM_WORLD);
+	}
+	starpu_mpi_shutdown();
+	return 0;
+}
+
+
+

+ 240 - 0
mpi/examples/native_fortran/nf_mpi_redux.f90

@@ -0,0 +1,240 @@
+program nf_mpi_redux
+  use iso_c_binding
+  use fstarpu_mod
+  use fstarpu_mpi_mod
+
+  implicit none
+  
+  integer, target                         :: ret, np, i, j, trial
+  type(c_ptr)                             :: work_cl, task_rw_cl,task_red_cl, task_ini_cl
+  character(kind=c_char,len=*), parameter :: name=C_CHAR_"task"//C_NULL_CHAR
+  character(kind=c_char,len=*), parameter :: namered=C_CHAR_"task_red"//C_NULL_CHAR
+  character(kind=c_char,len=*), parameter :: nameini=C_CHAR_"task_ini"//C_NULL_CHAR
+  real(kind(1.d0)), target                :: a,tmp
+  real(kind(1.d0)), target, allocatable   :: b(:)
+  integer(kind=8)                         :: tag, err
+  type(c_ptr)                             :: ahdl
+  type(c_ptr), target, allocatable        :: bhdl(:)
+  type(c_ptr)                             :: task_mode, codelet_mode
+  integer, target                         :: comm_world,comm_w_rank, comm_size
+  integer(c_int), target                  :: w_node, nworkers, work_coef
+  
+  call fstarpu_fxt_autostart_profiling(0)
+  ret = fstarpu_init(c_null_ptr)
+  ret = fstarpu_mpi_init(1)
+
+  comm_world = fstarpu_mpi_world_comm()
+  comm_w_rank  = fstarpu_mpi_world_rank()
+  comm_size  = fstarpu_mpi_world_size()
+  if (comm_size.lt.2) then
+    write(*,'(" ")')
+    write(*,'("This application is meant to run with at least two nodes.")')
+    stop 2
+  end if
+  allocate(b(comm_size-1), bhdl(comm_size-1))
+  nworkers = fstarpu_worker_get_count()
+  if (nworkers.lt.1) then
+    write(*,'(" ")')
+    write(*,'("This application is meant to run with at least one worker per node.")')
+    stop 2
+  end if
+
+  ! allocate and reduction codelets
+  task_red_cl = fstarpu_codelet_allocate()
+  call fstarpu_codelet_set_name(task_red_cl, namered)
+  call fstarpu_codelet_add_cpu_func(task_red_cl,C_FUNLOC(cl_cpu_task_red))
+  call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW)
+  call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_R)
+
+  task_ini_cl = fstarpu_codelet_allocate()
+  call fstarpu_codelet_set_name(task_ini_cl, nameini)
+  call fstarpu_codelet_add_cpu_func(task_ini_cl,C_FUNLOC(cl_cpu_task_ini))
+  call fstarpu_codelet_add_buffer(task_ini_cl, FSTARPU_W)
+
+  work_coef=2
+
+  do trial=1,2
+
+  if (trial.eq.1) then
+        write(*,*) "Using STARPU_MPI_REDUX"
+        codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
+        task_mode = FSTARPU_MPI_REDUX
+  else if (trial.eq.2) then
+        write(*,*) "Using STARPU_REDUX"
+        codelet_mode = FSTARPU_REDUX
+        task_mode = FSTARPU_REDUX
+  end if
+  ! allocate and fill codelet structs
+  work_cl = fstarpu_codelet_allocate()
+  call fstarpu_codelet_set_name(work_cl, name)
+  call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
+  call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
+  call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)
+  err = fstarpu_mpi_barrier(comm_world)
+
+  if(comm_w_rank.eq.0) then
+    write(*,'(" ")')
+    a = 1.0
+    write(*,*) "init a = ", a
+  else 
+    b(comm_w_rank) = 1.0 / (comm_w_rank + 1.0)
+    write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank), " AT ", &
+c_loc(bhdl(comm_w_rank)) ! This is not really meaningful
+  end if
+
+  err = fstarpu_mpi_barrier(comm_world)
+
+  tag = 0
+  if(comm_w_rank.eq.0) then
+    call fstarpu_variable_data_register(ahdl, 0, c_loc(a),c_sizeof(a))
+    do i=1,comm_size-1
+        call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
+    end do
+  else 
+    call fstarpu_variable_data_register(ahdl, -1, c_null_ptr,c_sizeof(a))
+    do i=1,comm_size-1
+      if (i.eq.comm_w_rank) then
+        call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)),c_sizeof(b(i)))
+      else
+        call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
+      end if
+    end do
+  end if
+  call fstarpu_mpi_data_register(ahdl,  tag,  0)
+  do i=1,comm_size-1
+     call fstarpu_mpi_data_register(bhdl(i), tag+i,i)
+  end do
+
+  tag = tag + comm_size
+
+  call fstarpu_data_set_reduction_methods(ahdl,task_red_cl,task_ini_cl)
+
+  err = fstarpu_mpi_barrier(comm_world)
+  
+  
+  call fstarpu_fxt_start_profiling()
+  do w_node=1,comm_size-1
+    do i=1,work_coef*nworkers
+      call fstarpu_mpi_task_insert( (/ c_loc(comm_world),   &
+             work_cl,                                         &
+             task_mode, ahdl,                            &
+             FSTARPU_R, bhdl(w_node),                      &
+             FSTARPU_EXECUTE_ON_NODE, c_loc(w_node),          &
+             C_NULL_PTR /))
+    end do
+  end do
+  call fstarpu_mpi_redux_data(comm_world, ahdl)
+  err = fstarpu_mpi_wait_for_all(comm_world)
+  
+  if(comm_w_rank.eq.0) then
+    tmp = 0
+    do w_node=1,comm_size-1
+      tmp = tmp + 1.0 / (w_node+1.0)
+    end do
+    write(*,*) 'computed result ---> ',a, "expected =",&
+      1.0 + (comm_size-1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1.0)*3.0 + tmp)
+  end if 
+  err = fstarpu_mpi_barrier(comm_world)
+  call fstarpu_data_unregister(ahdl)
+  do w_node=1,comm_size-1
+    call fstarpu_data_unregister(bhdl(w_node))
+  end do
+  call fstarpu_codelet_free(work_cl)
+  
+  end do
+  
+  call fstarpu_fxt_stop_profiling()
+  call fstarpu_codelet_free(task_red_cl)
+  call fstarpu_codelet_free(task_ini_cl)
+
+  
+  err = fstarpu_mpi_shutdown()
+  call fstarpu_shutdown()
+  deallocate(b, bhdl)  
+  stop
+
+contains
+
+  recursive subroutine cl_cpu_task (buffers, cl_args) bind(C)
+    use iso_c_binding       ! C interfacing module
+    use fstarpu_mod         ! StarPU interfacing module
+    implicit none
+    
+    type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
+    integer(c_int) :: ret, worker_id
+    integer        :: comm_rank
+    integer, target :: i
+    real(kind(1.d0)), pointer :: a, b
+    real(kind(1.d0))          :: old_a
+
+    worker_id = fstarpu_worker_get_id()
+    comm_rank  = fstarpu_mpi_world_rank()
+
+    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)
+    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), b)
+    call sleep(1.d0)
+    old_a = a
+    a = old_a + 3.0 + b
+    write(*,*) "task   (c_w_rank:",comm_rank," worker_id:",worker_id,") from ",old_a,"to",a
+    
+    return
+  end subroutine cl_cpu_task
+
+
+  recursive subroutine cl_cpu_task_red (buffers, cl_args) bind(C)
+    use iso_c_binding       ! C interfacing module
+    use fstarpu_mod         ! StarPU interfacing module
+    implicit none
+    
+    type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
+    integer(c_int) :: ret, worker_id
+    integer, target                         :: comm_rank
+    real(kind(1.d0)), pointer :: as, ad
+    real(kind(1.d0))           :: old_ad
+    worker_id = fstarpu_worker_get_id()
+    comm_rank  = fstarpu_mpi_world_rank()
+    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), ad)
+    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), as)
+    old_ad = ad
+    ad = ad + as
+    call sleep(1.d0)
+    write(*,*) "red_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,")",as, old_ad, ' ---> ',ad
+    
+    return
+  end subroutine cl_cpu_task_red
+
+  recursive subroutine cl_cpu_task_ini (buffers, cl_args) bind(C)
+    use iso_c_binding       ! C interfacing module
+    use fstarpu_mod         ! StarPU interfacing module
+    implicit none
+    
+    type(c_ptr), value, intent(in) :: buffers, cl_args 
+        ! cl_args is unused
+    integer(c_int) :: ret, worker_id
+    integer, target                         :: comm_rank
+    real(kind(1.d0)), pointer :: a
+    worker_id = fstarpu_worker_get_id()
+    comm_rank  = fstarpu_mpi_world_rank()
+    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)
+    call sleep(0.5d0)
+    ! As this codelet is run by each worker in the REDUX mode case
+    ! this initialization makes salient the number of copies spawned
+    write(*,*) "ini_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,") set to", comm_rank, "(was",a,")"
+    a = comm_rank
+    return
+  end subroutine cl_cpu_task_ini
+
+
+  subroutine sleep(t)
+    implicit none
+    integer :: t_start, t_end, t_rate
+    real(kind(1.d0))     :: ta, t
+    call system_clock(t_start)
+    do
+       call system_clock(t_end, t_rate)
+       ta = real(t_end-t_start)/real(t_rate)
+       if(ta.gt.t) return
+    end do
+  end subroutine sleep
+
+end program

+ 6 - 6
mpi/src/starpu_mpi_task_insert.c

@@ -100,7 +100,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 	{
 		STARPU_ASSERT_MSG(starpu_mpi_data_get_rank(data) == STARPU_MPI_PER_NODE, "If task is replicated, it has to access only per-node data");
 	}
-	if (data && mode & STARPU_R)
+	if (data && mode & STARPU_R && !(mode & STARPU_MPI_REDUX))
 	{
 		int mpi_rank = starpu_mpi_data_get_rank(data);
 		starpu_mpi_tag_t data_tag = starpu_mpi_data_get_tag(data);
@@ -142,7 +142,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 static
 void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
 {
-	if (mode & STARPU_W)
+	if (mode & STARPU_W && !(mode & STARPU_MPI_REDUX))
 	{
 		int mpi_rank = starpu_mpi_data_get_rank(data);
 		starpu_mpi_tag_t data_tag = starpu_mpi_data_get_tag(data);
@@ -179,7 +179,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 {
 	if (_starpu_cache_enabled)
 	{
-		if (mode & STARPU_W || mode & STARPU_REDUX)
+		if ((mode & STARPU_W && !(mode & STARPU_MPI_REDUX)) || mode & STARPU_REDUX)
 		{
 			/* The data has been modified, it MUST be removed from the cache */
 			starpu_mpi_cached_send_clear(data);
@@ -189,7 +189,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 	else
 	{
 		/* We allocated a temporary buffer for the received data, now drop it */
-		if ((mode & STARPU_R) && do_execute)
+		if ((mode & STARPU_R && !(mode & STARPU_MPI_REDUX)) && do_execute)
 		{
 			int mpi_rank = starpu_mpi_data_get_rank(data);
 			if (mpi_rank == STARPU_MPI_PER_NODE)
@@ -254,7 +254,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 				inconsistent_execute = 0;
 			}
 		}
-		else if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
+		else if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
@@ -902,7 +902,7 @@ void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle,
 	{
 		_STARPU_MPI_DEBUG(1, "Sending redux handle to %d ...\n", rank);
 		starpu_mpi_isend_detached_prio(data_handle, rank, data_tag, prio, comm, NULL, NULL);
-		starpu_task_insert(data_handle->init_cl, STARPU_W, data_handle, 0);
+		starpu_data_invalidate_submit(data_handle);
 	}
 	/* FIXME: In order to prevent simultaneous receive submissions
 	 * on the same handle, we need to wait that all the starpu_mpi

+ 1 - 1
mpi/src/starpu_mpi_task_insert_fortran.c

@@ -74,7 +74,7 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
 				inconsistent_execute = 0;
 			}
 		}
-		else if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
+		else if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
 		{
 			arg_i++;
 			starpu_data_handle_t data = arglist[arg_i];

+ 1 - 0
src/debug/traces/starpu_fxt.c

@@ -256,6 +256,7 @@ static void task_dump(struct task_info *task, struct starpu_fxt_options *options
 				(task->data[i].mode & STARPU_W)?"W":"",
 				(task->data[i].mode & STARPU_SCRATCH)?"S":"",
 				(task->data[i].mode & STARPU_REDUX)?"X":"",
+				(task->data[i].mode & STARPU_MPI_REDUX)?"X-mpi":"",
 				(task->data[i].mode & STARPU_COMMUTE)?"C":"");
 		fprintf(tasks_file, "\n");
 		fprintf(tasks_file, "Sizes:");

+ 2 - 0
src/util/fstarpu.c

@@ -27,6 +27,7 @@ static const intptr_t fstarpu_w	= STARPU_W;
 static const intptr_t fstarpu_rw	= STARPU_RW;
 static const intptr_t fstarpu_scratch	= STARPU_SCRATCH;
 static const intptr_t fstarpu_redux	= STARPU_REDUX;
+static const intptr_t fstarpu_mpi_redux	= STARPU_MPI_REDUX;
 static const intptr_t fstarpu_commute	= STARPU_COMMUTE;
 static const intptr_t fstarpu_ssend	= STARPU_SSEND;
 static const intptr_t fstarpu_locality	= STARPU_LOCALITY;
@@ -121,6 +122,7 @@ intptr_t fstarpu_get_constant(char *s)
 	else if	(!strcmp(s, "FSTARPU_RW"))	{ return fstarpu_rw; }
 	else if	(!strcmp(s, "FSTARPU_SCRATCH"))	{ return fstarpu_scratch; }
 	else if	(!strcmp(s, "FSTARPU_REDUX"))	{ return fstarpu_redux; }
+	else if	(!strcmp(s, "FSTARPU_MPI_REDUX"))	{ return fstarpu_mpi_redux; }
 	else if	(!strcmp(s, "FSTARPU_COMMUTE"))	{ return fstarpu_commute; }
 	else if	(!strcmp(s, "FSTARPU_SSEND"))	{ return fstarpu_ssend; }
 	else if	(!strcmp(s, "FSTARPU_LOCALITY"))	{ return fstarpu_locality; }

+ 20 - 11
src/util/starpu_task_insert_utils.c

@@ -364,21 +364,29 @@ void starpu_task_insert_data_make_room(struct starpu_codelet *cl, struct starpu_
 
 void starpu_task_insert_data_process_arg(struct starpu_codelet *cl, struct starpu_task *task, int *allocated_buffers, int *current_buffer, int arg_type, starpu_data_handle_t handle)
 {
-	enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type & ~STARPU_SSEND;
 	STARPU_ASSERT(cl != NULL);
 	STARPU_ASSERT_MSG(cl->nbuffers == STARPU_VARIABLE_NBUFFERS || *current_buffer < cl->nbuffers, "Too many data passed to starpu_task_insert");
 
 	starpu_task_insert_data_make_room(cl, task, allocated_buffers, *current_buffer, 1);
-
 	STARPU_TASK_SET_HANDLE(task, handle, *current_buffer);
-	if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS || (cl->nbuffers > STARPU_NMAXBUFS && !cl->dyn_modes))
-		STARPU_TASK_SET_MODE(task, mode,* current_buffer);
+	
+	enum starpu_data_access_mode arg_mode = (enum starpu_data_access_mode) arg_type & ~STARPU_SSEND;
+
+	/* MPI_REDUX should be interpreted as RW|COMMUTE by the "ground" StarPU layer.*/ 
+	if (arg_mode & STARPU_MPI_REDUX) 
+	{
+		arg_mode = STARPU_RW|STARPU_COMMUTE;
+	}
+	if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS || (cl->nbuffers > STARPU_NMAXBUFS && !cl->dyn_modes)) 
+	{ 
+		STARPU_TASK_SET_MODE(task, arg_mode,* current_buffer);
+	}
 	else if (STARPU_CODELET_GET_MODE(cl, *current_buffer))
 	{
-		STARPU_ASSERT_MSG(STARPU_CODELET_GET_MODE(cl, *current_buffer) == mode,
-				"The codelet <%s> defines the access mode %d for the buffer %d which is different from the mode %d given to starpu_task_insert\n",
-				cl->name, STARPU_CODELET_GET_MODE(cl, *current_buffer),
-				*current_buffer, mode);
+		STARPU_ASSERT_MSG(STARPU_CODELET_GET_MODE(cl, *current_buffer) == arg_mode,
+			"The codelet <%s> defines the access mode %d for the buffer %d which is different from the mode %d given to starpu_task_insert\n",
+			cl->name, STARPU_CODELET_GET_MODE(cl, *current_buffer),
+			*current_buffer, arg_mode);
 	}
 	else
 	{
@@ -386,7 +394,7 @@ void starpu_task_insert_data_process_arg(struct starpu_codelet *cl, struct starp
 #  warning shall we print a warning to the user
 		/* Morse uses it to avoid having to set it in the codelet structure */
 #endif
-		STARPU_CODELET_SET_MODE(cl, mode, *current_buffer);
+		STARPU_CODELET_SET_MODE(cl, arg_mode, *current_buffer);
 	}
 
 	(*current_buffer)++;
@@ -460,7 +468,7 @@ int _starpu_task_insert_create(struct starpu_codelet *cl, struct starpu_task *ta
 
 	while((arg_type = va_arg(varg_list, int)) != 0)
 	{
-		if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
+		if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
 		{
 			/* We have an access mode : we expect to find a handle */
 			starpu_data_handle_t handle = va_arg(varg_list, starpu_data_handle_t);
@@ -753,7 +761,8 @@ int _fstarpu_task_insert_create(struct starpu_codelet *cl, struct starpu_task *t
 		if (arg_type & STARPU_R
 			|| arg_type & STARPU_W
 			|| arg_type & STARPU_SCRATCH
-			|| arg_type & STARPU_REDUX)
+			|| arg_type & STARPU_REDUX
+			|| arg_type & STARPU_MPI_REDUX)
 		{
 			arg_i++;
 			starpu_data_handle_t handle = arglist[arg_i];