Browse Source

mpi: Communication cache mechanism is enabled by default, and can only be disabled at execution time by setting the environment variable STARPU_MPI_CACHE to 0.

Nathalie Furmento 12 years ago
parent
commit
c26e1ccdc6
5 changed files with 72 additions and 61 deletions
  1. 7 2
      ChangeLog
  2. 0 8
      configure.ac
  3. 6 0
      doc/chapters/configuration.texi
  4. 4 2
      doc/chapters/mpi-support.texi
  5. 55 49
      mpi/src/starpu_mpi_insert_task.c

+ 7 - 2
ChangeLog

@@ -36,8 +36,13 @@ New features:
   * SOCL
         - Manual mapping of commands on specific devices is now possible
   * New interface: COO matrix.
-  * Communication statistics for MPI can only be enabled at execution
-    time by defining the environment variable STARPU_COMM_STATS
+  * MPI: 
+        - Communication statistics for MPI can only be enabled at
+	  execution time by defining the environment variable
+	  STARPU_COMM_STATS
+        - Communication cache mechanism is enabled by default, and can
+	  only be disabled at execution time by setting the
+	  environment variable STARPU_MPI_CACHE to 0.
 
 Changes:
   * Fix the block filter functions.

+ 0 - 8
configure.ac

@@ -1064,14 +1064,6 @@ else
 fi
 AC_SUBST(CC_OR_MPICC, $cc_or_mpicc)
 
-# Check if user wants to disable mpi cache
-AC_ARG_ENABLE(mpi-cache, AC_HELP_STRING([--enable-mpi-cache],
-			 [Enable MPI communication cache]),
-			 enable_mpi_cache=$enableval, enable_mpi_cache=true)
-if  test x$enable_mpi_cache = xtrue; then
-    	AC_DEFINE(STARPU_MPI_CACHE, [1], [enable MPI communication cache])
-fi
-
 # If the user specifically asks for it, or if we are in a developer checkout, we enable mpi check
 AC_ARG_ENABLE(mpi-check, AC_HELP_STRING([--enable-mpi-check], [Enable execution of MPI testcases]))
 running_mpi_check=no

+ 6 - 0
doc/chapters/configuration.texi

@@ -415,6 +415,12 @@ Communication statistics for starpumpi (@pxref{StarPU MPI support})
 will be enabled when the environment variable @code{STARPU_COMM_STATS}
 is defined to an value other than 0.
 
+@item @code{STARPU_MPI_CACHE}
+@anchor{STARPU_MPI_CACHE}
+Communication cache for starpumpi (@pxref{StarPU MPI support}) will be
+disabled when the environment variable @code{STARPU_MPI_CACHE} is set
+to 0. It is enabled by default or for any other values of the variable
+@code{STARPU_MPI_CACHE}.
 @end table
 
 @node Misc

+ 4 - 2
doc/chapters/mpi-support.texi

@@ -375,8 +375,10 @@ latter receives them.
 written data back to their owners.
 @end enumerate
 
-The algorithm also includes a cache mechanism that allows not to send
-data twice to the same MPI node, unless the data has been modified.
+The algorithm also includes a communication cache mechanism that
+allows not to send data twice to the same MPI node, unless the data
+has been modified. The cache can be disabled
+(@pxref{STARPU_MPI_CACHE}).
 
 @end deftypefun
 

+ 55 - 49
mpi/src/starpu_mpi_insert_task.c

@@ -28,7 +28,6 @@
 //#define STARPU_MPI_VERBOSE 1
 #include <starpu_mpi_private.h>
 
-#ifdef STARPU_MPI_CACHE
 /* Whether we are allowed to keep copies of remote data. */
 struct _starpu_data_entry
 {
@@ -36,32 +35,41 @@ struct _starpu_data_entry
 	void *data;
 };
 
-struct _starpu_data_entry **sent_data = NULL;
-struct _starpu_data_entry **received_data = NULL;
-#endif /* STARPU_MPI_CACHE */
+static struct _starpu_data_entry **sent_data = NULL;
+static struct _starpu_data_entry **received_data = NULL;
+static int cache_enabled=1;
 
 void _starpu_mpi_tables_init(MPI_Comm comm)
 {
-#ifdef STARPU_MPI_CACHE
 	int nb_nodes;
 	int i;
 
+	cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
+	if (cache_enabled == -1)
+	{
+		cache_enabled = 1;
+	}
+
+	if (cache_enabled == 0)
+	{
+		if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU MPI Communication cache is disabled\n");
+		return;
+	}
+
 	MPI_Comm_size(comm, &nb_nodes);
 	_STARPU_MPI_DEBUG("Initialising htable for cache\n");
 	sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
 	for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
 	received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
 	for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
-#else
-	if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU was configured with --disable-mpi-cache\n");
-#endif /* STARPU_MPI_CACHE */
 }
 
 void _starpu_mpi_tables_free(int world_size)
 {
-#ifdef STARPU_MPI_CACHE
 	int i;
 
+	if (cache_enabled == 0) return;
+
 	_STARPU_MPI_DEBUG("Clearing htable for cache\n");
 
 	for(i=0 ; i<world_size ; i++)
@@ -80,7 +88,6 @@ void _starpu_mpi_tables_free(int world_size)
 	}
 	free(sent_data);
 	free(received_data);
-#endif
 }
 
 static
@@ -145,7 +152,8 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access
 static
 void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
 {
-#ifdef STARPU_MPI_CACHE
+	if (cache_enabled == 0) return NULL;
+
 	struct _starpu_data_entry *already_received;
 	HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
 	if (already_received == NULL)
@@ -159,15 +167,13 @@ void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
 		_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
 	}
 	return already_received;
-#else
-	return NULL;
-#endif
 }
 
 static
 void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
 {
-#ifdef STARPU_MPI_CACHE
+	if (cache_enabled == 0) return NULL;
+
 	struct _starpu_data_entry *already_sent;
 	HASH_FIND_PTR(sent_data[dest], &data, already_sent);
 	if (already_sent == NULL)
@@ -182,9 +188,6 @@ void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
 		_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
 	}
 	return already_sent;
-#else
-	return NULL;
-#endif
 }
 
 static
@@ -263,51 +266,54 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 
 void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int do_execute, MPI_Comm comm)
 {
-#ifdef STARPU_MPI_CACHE
-	if (mode & STARPU_W)
+	if (cache_enabled)
 	{
-		if (do_execute)
+		if (mode & STARPU_W)
 		{
-			/* Note that all copies I've sent to neighbours are now invalid */
-			int n, size;
-			MPI_Comm_size(comm, &size);
-			for(n=0 ; n<size ; n++)
+			if (do_execute)
 			{
-				struct _starpu_data_entry *already_sent;
-				HASH_FIND_PTR(sent_data[n], &data, already_sent);
-				if (already_sent)
+				/* Note that all copies I've sent to neighbours are now invalid */
+				int n, size;
+				MPI_Comm_size(comm, &size);
+				for(n=0 ; n<size ; n++)
 				{
-					_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data);
-					HASH_DEL(sent_data[n], already_sent);
+					struct _starpu_data_entry *already_sent;
+					HASH_FIND_PTR(sent_data[n], &data, already_sent);
+					if (already_sent)
+					{
+						_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data);
+						HASH_DEL(sent_data[n], already_sent);
+					}
 				}
 			}
-		}
-		else
-		{
-			int mpi_rank = starpu_data_get_rank(data);
-			struct _starpu_data_entry *already_received;
-			HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
-			if (already_received)
+			else
 			{
-				/* Somebody else will write to the data, so discard our cached copy if any */
-				/* TODO: starpu_mpi could just remember itself. */
-				_STARPU_MPI_DEBUG("Clearing receive cache for data %p\n", data);
-				HASH_DEL(received_data[mpi_rank], already_received);
-				starpu_data_invalidate_submit(data);
+				int mpi_rank = starpu_data_get_rank(data);
+				struct _starpu_data_entry *already_received;
+				HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
+				if (already_received)
+				{
+					/* Somebody else will write to the data, so discard our cached copy if any */
+					/* TODO: starpu_mpi could just remember itself. */
+					_STARPU_MPI_DEBUG("Clearing receive cache for data %p\n", data);
+					HASH_DEL(received_data[mpi_rank], already_received);
+					starpu_data_invalidate_submit(data);
+				}
 			}
 		}
 	}
-#else
-	/* We allocated a temporary buffer for the received data, now drop it */
-	if ((mode & STARPU_R) && do_execute)
+	else
 	{
-		int mpi_rank = starpu_data_get_rank(data);
-		if (mpi_rank != me && mpi_rank != -1)
+		/* We allocated a temporary buffer for the received data, now drop it */
+		if ((mode & STARPU_R) && do_execute)
 		{
-			starpu_data_invalidate_submit(data);
+			int mpi_rank = starpu_data_get_rank(data);
+			if (mpi_rank != me && mpi_rank != -1)
+			{
+				starpu_data_invalidate_submit(data);
+			}
 		}
 	}
-#endif
 }
 
 int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)