Browse Source

Fix data migration: we need to flush the cache too. Add starpu_mpi_data_migrate which does things properly

Samuel Thibault 8 years ago
parent
commit
8bb9a66912

+ 1 - 3
doc/doxygen/chapters/16mpi_support.doxy

@@ -484,9 +484,7 @@ migrate the data, and register the new location.
                                               (uintptr_t)NULL, sizeof(unsigned));
             if (data_handles[x][y]) {
                 /* Migrate the data */
-                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
-                /* And register the new rank of the matrix */
-                starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
+                starpu_mpi_data_migrate(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
             }
         }
     }

+ 11 - 2
doc/doxygen/chapters/api/mpi.doxy

@@ -274,7 +274,7 @@ Symbol kept for backward compatibility. Calling function starpu_mpi_data_set_ran
 \ingroup API_MPI_Support
 Return the rank of the given data.
 
-\def starpu_data_get_rank(handle)
+\def starpu_data_get_rank(starpu_data_handle_t handle)
 \ingroup API_MPI_Support
 Return the rank of the given data.
 Symbol kept for backward compatibility. Calling function starpu_mpi_data_get_rank
@@ -283,11 +283,20 @@ Symbol kept for backward compatibility. Calling function starpu_mpi_data_get_ran
 \ingroup API_MPI_Support
 Return the tag of the given data.
 
-\def starpu_data_get_tag(handle)
+\def starpu_data_get_tag(starpu_data_handle_t handle)
 \ingroup API_MPI_Support
 Return the tag of the given data.
 Symbol kept for backward compatibility. Calling function starpu_mpi_data_get_tag
 
+\fn void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_rank)
+\ingroup API_MPI_Support
+Migrate the data onto the \p new_rank MPI node. This means both transferring
+the data to node \p new_rank if it hasn't been transferred already, and setting
+the home node of the data to the new node. Further data transfers triggered by
+starpu_mpi_task_insert() will be done from that new node. This function thus
+needs to be called on all nodes which have registered the data. This also
+flushes the cache for this data to avoid inconsistencies.
+
 \def STARPU_EXECUTE_ON_NODE
 \ingroup API_MPI_Support
 this macro is used when calling starpu_mpi_task_insert(), and must be

+ 2 - 4
mpi/examples/matrix_mult/mm.c

@@ -182,8 +182,7 @@ static void distribute_matrix_C(void)
 			int target_rank = (b_row+b_col)%comm_size;
 
 			/* Move the block on to its new owner. */
-			starpu_mpi_get_data_on_node(MPI_COMM_WORLD, h, target_rank);
-			starpu_mpi_data_set_rank(h, target_rank);
+			starpu_mpi_data_migrate(MPI_COMM_WORLD, h, target_rank);
 		}
 	}
 }
@@ -195,8 +194,7 @@ static void undistribute_matrix_C(void)
 	for (b_row = 0; b_row < NB; b_row++) {
 		for (b_col = 0; b_col < NB; b_col++) {
 			starpu_data_handle_t h = C_h[b_row*NB+b_col]; 
-			starpu_mpi_get_data_on_node(MPI_COMM_WORLD, h, 0);
-			starpu_mpi_data_set_rank(h, 0);
+			starpu_mpi_data_migrate(MPI_COMM_WORLD, h, 0);
 		}
 	}
 }

+ 2 - 4
mpi/examples/native_fortran/nf_mm.f90

@@ -159,8 +159,7 @@ program nf_mm
         do b_col=1,NB
         do b_row=1,NB
         rank = modulo(b_row+b_col, comm_size)
-        call fstarpu_mpi_get_data_on_node(comm_world, dh_c(b_row,b_col), rank)
-        call fstarpu_mpi_data_set_rank(dh_c(b_row,b_col), rank)
+        call fstarpu_mpi_data_migrate(comm_world, dh_c(b_row,b_col), rank)
         end do
         end do
 
@@ -179,8 +178,7 @@ program nf_mm
         ! undistribute matrix C
         do b_col=1,NB
         do b_row=1,NB
-        call fstarpu_mpi_get_data_on_node(comm_world, dh_c(b_row,b_col), 0)
-        call fstarpu_mpi_data_set_rank(dh_c(b_row,b_col), 0)
+        call fstarpu_mpi_data_migrate(comm_world, dh_c(b_row,b_col), 0)
         end do
         end do
 

+ 2 - 8
mpi/examples/stencil/stencil5.c

@@ -205,12 +205,8 @@ int main(int argc, char **argv)
 				starpu_mpi_data_register(data_handles[x][y], (y*X)+x, mpi_rank);
 			}
 			if (data_handles[x][y] && mpi_rank != starpu_mpi_data_get_rank(data_handles[x][y]))
-			{
 				/* Migrate the data */
-				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
-				/* And register new rank of the matrix */
-				starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
-			}
+				starpu_mpi_data_migrate(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
 		}
 	}
 
@@ -240,9 +236,7 @@ int main(int argc, char **argv)
 			{
 				int mpi_rank = my_distrib(x, y, size);
 				/* Get back data to original place where the user-provided buffer is. */
-				starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
-				/* Register original rank of the matrix (although useless) */
-				starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
+				starpu_mpi_data_migrate(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
 				/* And unregister it */
 				starpu_data_unregister(data_handles[x][y]);
 			}

+ 9 - 0
mpi/include/fstarpu_mpi_mod.f90

@@ -455,6 +455,15 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: dh
                 end function fstarpu_mpi_data_get_tag
 
+                ! void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t handle, int rank);
+                subroutine fstarpu_mpi_data_migrate(mpi_comm,dh,rank) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int), value, intent(in) :: mpi_comm ! Fortran MPI communicator handle (received as MPI_Fint on the C side)
+                        type(c_ptr), value, intent(in) :: dh ! StarPU data handle to migrate
+                        integer(c_int), value, intent(in) :: rank ! MPI rank of the node that becomes the new owner
+                end subroutine fstarpu_mpi_data_migrate
+
                 ! #define STARPU_MPI_NODE_SELECTION_CURRENT_POLICY -1
                 ! #define STARPU_MPI_NODE_SELECTION_MOST_R_DATA    0
 

+ 3 - 1
mpi/include/starpu_mpi.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2012, 2014-2015  Université de Bordeaux
+ * Copyright (C) 2009-2012, 2014-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
  * Copyright (C) 2016  Inria
  *
@@ -97,6 +97,8 @@ int starpu_mpi_data_get_tag(starpu_data_handle_t handle);
 #define starpu_data_get_rank starpu_mpi_data_get_rank
 #define starpu_data_get_tag starpu_mpi_data_get_tag
 
+void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t handle, int rank); /* make MPI node rank the new owner of handle; call on all nodes which registered it */
+
 #define STARPU_MPI_NODE_SELECTION_CURRENT_POLICY -1
 #define STARPU_MPI_NODE_SELECTION_MOST_R_DATA    0
 

+ 44 - 0
mpi/src/starpu_mpi.c

@@ -1732,6 +1732,9 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 	else
 	{
 		mpi_data = calloc(1, sizeof(struct _starpu_mpi_node_tag));
+		mpi_data->data_tag = -1;
+		mpi_data->rank = -1;
+		mpi_data->comm = MPI_COMM_WORLD;
 		data_handle->mpi_data = mpi_data;
 		_starpu_mpi_data_register_tag(data_handle, tag);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
@@ -1771,6 +1774,47 @@ int starpu_mpi_data_get_tag(starpu_data_handle_t data)
 	return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->data_tag;
 }
 
+/* Make MPI node \p new_rank the owner of \p data.
+ *
+ * Nothing happens when \p new_rank is already the rank recorded for the
+ * data. Otherwise the node that becomes the owner posts a detached receive,
+ * the node that currently owns the data posts a detached send, and every
+ * caller records the new rank and flushes its communication cache for
+ * \p data.
+ */
+void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_rank)
+{
+	int my_rank;
+	int owner = starpu_mpi_data_get_rank(data);
+	int tag = starpu_mpi_data_get_tag(data);
+
+	/* Already there */
+	if (owner == new_rank)
+		return;
+
+	starpu_mpi_comm_rank(comm, &my_rank);
+
+	/* First submit data migration if it's not already on destination.
+	 * The two roles are mutually exclusive since owner != new_rank here. */
+	if (my_rank == new_rank)
+	{
+		/* Destination side: only receive when the cache call returns
+		 * NULL, i.e. it did not report the data as already received */
+		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data, owner, new_rank);
+		if (_starpu_mpi_cache_received_data_set(data, owner) == NULL)
+		{
+			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data, owner);
+			starpu_mpi_irecv_detached(data, owner, tag, comm, NULL, NULL);
+		}
+	}
+	else if (my_rank == owner)
+	{
+		/* Source side: only send when the cache call returns NULL,
+		 * i.e. it did not report the data as already sent */
+		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data, owner, new_rank);
+		if (_starpu_mpi_cache_sent_data_set(data, new_rank) == NULL)
+		{
+			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data, new_rank);
+			starpu_mpi_isend_detached(data, new_rank, tag, comm, NULL, NULL);
+		}
+	}
+
+	/* And note new owner */
+	starpu_mpi_data_set_rank_comm(data, new_rank, comm);
+
+	/* Flush cache in all other nodes */
+	/* TODO: Ideally we'd transmit the knowledge of who owns it */
+	starpu_mpi_cache_flush(comm, data);
+}
+
 int starpu_mpi_comm_size(MPI_Comm comm, int *size)
 {
 #ifdef STARPU_SIMGRID

+ 2 - 1
mpi/src/starpu_mpi_cache.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011, 2012, 2013, 2014, 2015  CNRS
- * Copyright (C) 2011-2015  Université de Bordeaux
+ * Copyright (C) 2011-2016  Université de Bordeaux
  * Copyright (C) 2014 INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -293,6 +293,7 @@ void *_starpu_mpi_cache_received_data_set(starpu_data_handle_t data, int mpi_ran
 		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
 		entry->data = data;
 		HASH_ADD_PTR(_cache_received_data[mpi_rank], data, entry);
+		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data, mpi_rank);
 		_starpu_mpi_cache_stats_inc(mpi_rank, data);
 	}
 	else

+ 5 - 0
mpi/src/starpu_mpi_fortran.c

@@ -249,6 +249,11 @@ void fstarpu_mpi_data_set_rank(starpu_data_handle_t handle, int rank)
 	return starpu_mpi_data_set_rank_comm(handle, rank, MPI_COMM_WORLD);
 }
 
+/* Fortran entry point for starpu_mpi_data_migrate(): converts the Fortran
+ * communicator handle \p comm with MPI_Comm_f2c() and forwards \p handle and
+ * \p rank unchanged. The callee returns void, so it is invoked as a plain
+ * statement; "return expr;" is not allowed in a void function in ISO C. */
+void fstarpu_mpi_data_migrate(MPI_Fint comm, starpu_data_handle_t handle, int rank)
+{
+	starpu_mpi_data_migrate(MPI_Comm_f2c(comm), handle, rank);
+}
+
 int fstarpu_mpi_wait_for_all(MPI_Fint comm)
 {
 	return starpu_mpi_wait_for_all(MPI_Comm_f2c(comm));

+ 1 - 1
mpi/src/starpu_mpi_private.c

@@ -19,7 +19,7 @@
 
 int _starpu_debug_rank=-1;
 int _starpu_debug_level_min=0;
-int _starpu_debug_level_max=0;
+int _starpu_debug_level_max=2; /* NOTE(review): default raised from 0 to 2 alongside the new level-1/2 _STARPU_MPI_DEBUG messages; presumably a debugging leftover - confirm before merging */
 int _starpu_mpi_tag = 42;
 int _starpu_mpi_comm;
 

+ 1 - 1
src/datawizard/data_request.c

@@ -636,7 +636,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 			 * rather have the caller block, and explicitly wait
 			 * for eviction to happen.
 			 */
-			MSG_process_sleep(0.000010);
+			MSG_process_sleep(0.000001);
 			_starpu_wake_all_blocked_workers_on_node(src_node);
 		}
 #elif !defined(STARPU_NON_BLOCKING_DRIVERS)

+ 2 - 2
src/datawizard/datawizard.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2009-2010, 2012-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -31,7 +31,7 @@ int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsig
 
 #ifdef STARPU_SIMGRID
 	/* XXX */
-	MSG_process_sleep(0.000010);
+	MSG_process_sleep(0.000001);
 #endif
 	STARPU_UYIELD();
 

+ 2 - 0
src/datawizard/interfaces/data_interface.c

@@ -930,6 +930,8 @@ static void _starpu_data_invalidate(void *data)
 	size_t size = _starpu_data_get_size(handle);
 	_starpu_spin_lock(&handle->header_lock);
 
+	_STARPU_DEBUG("Really invalidating data %p\n", data);
+
 	unsigned node;
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{