瀏覽代碼

mpi: new functions to clear the communication cache used by starpu_mpi_insert_task

void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle);
void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
Nathalie Furmento 12 年之前
父節點
當前提交
644678ee6d
共有 3 個文件被更改,包括 73 次插入1 次删除
  1. 26 0
      doc/chapters/mpi-support.texi
  2. 4 0
      mpi/include/starpu_mpi.h
  3. 43 1
      mpi/src/starpu_mpi_insert_task.c

+ 26 - 0
doc/chapters/mpi-support.texi

@@ -30,6 +30,14 @@ according to the task graph and an application-provided distribution.
 @node The API
 @section The API
 
+@menu
+* Compilation::
+* Initialisation::
+* Communication::
+* Communication cache::
+@end menu
+
+@node Compilation
 @subsection Compilation
 
 The flags required to compile or link against the MPI layer are then
@@ -42,6 +50,7 @@ accessible with the following commands:
 
 Also pass the @code{--static} option if the application is to be linked statically.
 
+@node Initialisation
 @subsection Initialisation
 
 @deftypefun int starpu_mpi_init (int *@var{argc}, char ***@var{argv}, int initialize_mpi)
@@ -78,6 +87,7 @@ to the world size. Communications statistics must be enabled
 (@pxref{STARPU_COMM_STATS}).
 @end deftypefun
 
+@node Communication
 @subsection Communication
 
 The standard point to point communications of MPI have been
@@ -170,6 +180,22 @@ node of the array @var{source} using the n-th message tag of the array
 On completion of the all the requests, @var{tag} is unlocked.
 @end deftypefun
 
+@node Communication cache
+@subsection Communication cache
+
+@deftypefun void starpu_mpi_cache_flush (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle})
+Clear the send and receive communication cache for the data
+@var{data_handle}. The function has to be called synchronously by all
+the MPI nodes.
+The function does nothing if the cache mechanism is disabled (@pxref{STARPU_MPI_CACHE}).
+@end deftypefun
+
+@deftypefun void starpu_mpi_cache_flush_all_data (MPI_Comm @var{comm})
+Clear the send and receive communication cache for all data. The
+function has to be called synchronously by all the MPI nodes.
+The function does nothing if the cache mechanism is disabled (@pxref{STARPU_MPI_CACHE}).
+@end deftypefun
+
 @page
 @node Simple Example
 @section Simple Example

+ 4 - 0
mpi/include/starpu_mpi.h

@@ -66,6 +66,10 @@ int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_
 
 /* retrieve the current amount of communications from the current node */
 void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);
+
+void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle);
+void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
+
 #ifdef __cplusplus
 }
 #endif

+ 43 - 1
mpi/src/starpu_mpi_insert_task.c

@@ -64,7 +64,7 @@ void _starpu_mpi_cache_init(MPI_Comm comm)
 	for(i=0 ; i<nb_nodes ; i++) _cache_received_data[i] = NULL;
 }
 
-void _starpu_mpi_cache_free(int world_size)
+void _starpu_mpi_cache_empty_tables(int world_size)
 {
 	int i;
 
@@ -86,10 +86,52 @@ void _starpu_mpi_cache_free(int world_size)
 			free(entry);
 		}
 	}
+}
+
+void _starpu_mpi_cache_free(int world_size)
+{
+	if (_cache_enabled == 0) return;
+
+	_starpu_mpi_cache_empty_tables(world_size);
 	free(_cache_sent_data);
 	free(_cache_received_data);
 }
 
+void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
+{
+	int nb_nodes;
+
+	if (_cache_enabled == 0) return;
+
+	MPI_Comm_size(comm, &nb_nodes);
+	_starpu_mpi_cache_empty_tables(nb_nodes);
+}
+
+void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	struct _starpu_data_entry *avail;
+	int i, nb_nodes;
+
+	if (_cache_enabled == 0) return;
+
+	MPI_Comm_size(comm, &nb_nodes);
+	for(i=0 ; i<nb_nodes ; i++)
+	{
+		HASH_FIND_PTR(_cache_sent_data[i], &data_handle, avail);
+		if (avail)
+		{
+			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			HASH_DEL(_cache_sent_data[i], avail);
+		}
+		HASH_FIND_PTR(_cache_received_data[i], &data_handle, avail);
+		if (avail)
+		{
+			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			HASH_DEL(_cache_received_data[i], avail);
+		}
+	}
+}
+
 static
 void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
 {