Browse Source

mpi: Move MPI cache functions into the public API

Nathalie Furmento 5 years ago
parent
commit
ee34c4c332

+ 1 - 0
ChangeLog

@@ -77,6 +77,7 @@ Small features:
     calling hwloc-ps
     calling hwloc-ps
 Small changes:
 Small changes:
   * New configure option --disable-build-doc-pdf
   * New configure option --disable-build-doc-pdf
+  * Move MPI cache functions into the public API
 
 
 StarPU 1.3.3 (git revision 11afc5b007fe1ab1c729b55b47a5a98ef7f3cfad)
 StarPU 1.3.3 (git revision 11afc5b007fe1ab1c729b55b47a5a98ef7f3cfad)
 ====================================================================
 ====================================================================

+ 10 - 2
doc/doxygen/chapters/410_mpi_support.doxy

@@ -759,8 +759,16 @@ add fine-graph starpu_mpi_cache_flush() calls during the algorithm; the effect
 for the data deallocation will be the same, but it will additionally release some
 for the data deallocation will be the same, but it will additionally release some
 pressure from the StarPU-MPI cache hash table during task submission.
 pressure from the StarPU-MPI cache hash table during task submission.
 
 
-One can determine whether a piece of is cached with starpu_mpi_cached_receive()
+One can determine whether a piece of data is cached with
-and starpu_mpi_cached_send().
+starpu_mpi_cached_receive() and starpu_mpi_cached_send().
+
+Functions starpu_mpi_cached_receive_set() and
+starpu_mpi_cached_send_set() are automatically called by
+starpu_mpi_task_insert() but can also be called directly by the
+application. Functions starpu_mpi_cached_send_clear() and
+starpu_mpi_cached_receive_clear() must be called to clear data from
+the cache. They are also automatically called when using
+starpu_mpi_task_insert().
 
 
 The whole caching behavior can be disabled thanks to the \ref STARPU_MPI_CACHE
 The whole caching behavior can be disabled thanks to the \ref STARPU_MPI_CACHE
 environment variable. The variable \ref STARPU_MPI_CACHE_STATS can be set to <c>1</c>
 environment variable. The variable \ref STARPU_MPI_CACHE_STATS can be set to <c>1</c>

+ 11 - 0
mpi/examples/Makefile.am

@@ -128,6 +128,17 @@ starpu_mpi_EXAMPLES	+=	\
 endif
 endif
 
 
 ##################
 ##################
+# Cache examples #
+##################
+examplebin_PROGRAMS +=		\
+	cache/cache		\
+	cache/cache_disable
+starpu_mpi_EXAMPLES +=		\
+	cache/cache		\
+	cache/cache_disable
+
+
+##################
 # MPI LU example #
 # MPI LU example #
 ##################
 ##################
 
 

+ 1 - 2
mpi/tests/cache.c

@@ -17,7 +17,6 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi.h>
 #include <math.h>
 #include <math.h>
 #include "helper.h"
 #include "helper.h"
-#include <starpu_mpi_cache.h>
 
 
 void func_cpu(void *descr[], void *_args)
 void func_cpu(void *descr[], void *_args)
 {
 {
@@ -57,7 +56,7 @@ void test(struct starpu_codelet *codelet, enum starpu_data_access_mode mode, sta
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, codelet, mode, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, codelet, mode, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
 
 
-	cache = _starpu_mpi_cache_received_data_get(data);
+	cache = starpu_mpi_cached_receive(data);
 
 
 	if (rank == 1)
 	if (rank == 1)
 	{
 	{

+ 3 - 4
mpi/tests/cache_disable.c

@@ -17,7 +17,6 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi.h>
 #include <math.h>
 #include <math.h>
 #include "helper.h"
 #include "helper.h"
-#include <starpu_mpi_cache.h>
 
 
 void func_cpu(void *descr[], void *_args)
 void func_cpu(void *descr[], void *_args)
 {
 {
@@ -63,7 +62,7 @@ int main(int argc, char **argv)
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, &mycodelet_r, STARPU_R, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, &mycodelet_r, STARPU_R, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
 
 
-	in_cache = _starpu_mpi_cache_received_data_get(data);
+	in_cache = starpu_mpi_cached_receive(data);
 	if (rank == 1)
 	if (rank == 1)
 	{
 	{
 		STARPU_ASSERT_MSG(in_cache == 1, "Data should be in cache\n");
 		STARPU_ASSERT_MSG(in_cache == 1, "Data should be in cache\n");
@@ -73,7 +72,7 @@ int main(int argc, char **argv)
 	starpu_mpi_cache_set(0);
 	starpu_mpi_cache_set(0);
 
 
 	// We check the data is no longer in the cache
 	// We check the data is no longer in the cache
-	in_cache = _starpu_mpi_cache_received_data_get(data);
+	in_cache = starpu_mpi_cached_receive(data);
 	if (rank == 1)
 	if (rank == 1)
 	{
 	{
 		STARPU_ASSERT_MSG(in_cache == 0, "Data should NOT be in cache\n");
 		STARPU_ASSERT_MSG(in_cache == 0, "Data should NOT be in cache\n");
@@ -81,7 +80,7 @@ int main(int argc, char **argv)
 
 
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, &mycodelet_r, STARPU_R, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	ret = starpu_mpi_task_insert(MPI_COMM_WORLD, &mycodelet_r, STARPU_R, data, STARPU_EXECUTE_ON_NODE, 1, 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_task_insert");
-	in_cache = _starpu_mpi_cache_received_data_get(data);
+	in_cache = starpu_mpi_cached_receive(data);
 	if (rank == 1)
 	if (rank == 1)
 	{
 	{
 		STARPU_ASSERT_MSG(in_cache == 0, "Data should NOT be in cache\n");
 		STARPU_ASSERT_MSG(in_cache == 0, "Data should NOT be in cache\n");

+ 27 - 0
mpi/include/starpu_mpi.h

@@ -422,12 +422,39 @@ void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
 int starpu_mpi_cached_receive(starpu_data_handle_t data_handle);
 int starpu_mpi_cached_receive(starpu_data_handle_t data_handle);
 
 
 /**
 /**
+ * If \p data is already available in the reception cache, return 1
+ * If \p data is NOT available in the reception cache, add it to the
+ * cache and return 0
+ * Return 0 if the communication cache is not enabled
+ */
+int starpu_mpi_cached_receive_set(starpu_data_handle_t data);
+
+/**
+ * Remove \p data from the reception cache
+ */
+void starpu_mpi_cached_receive_clear(starpu_data_handle_t data);
+
+/**
    Test whether \p data_handle is cached for emission to node \p dest,
    Test whether \p data_handle is cached for emission to node \p dest,
    i.e. the value was previously sent to \p dest, and not flushed
    i.e. the value was previously sent to \p dest, and not flushed
    since then.
    since then.
 */
 */
 int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest);
 int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest);
 
 
+/**
+ * If \p data is already available in the emission cache for node
+ * \p dest, return 1
+ * If \p data is NOT available in the emission cache for node \p dest,
+ * add it to the cache and return 0
+ * Return 0 if the communication cache is not enabled
+ */
+int starpu_mpi_cached_send_set(starpu_data_handle_t data, int dest);
+
+/**
+ * Remove \p data from the emission cache
+ */
+void starpu_mpi_cached_send_clear(starpu_data_handle_t data);
+
 /** @} */
 /** @} */
 
 
 /**
 /**

+ 4 - 4
mpi/src/starpu_mpi.c

@@ -346,7 +346,7 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
 	if (me == node)
 	if (me == node)
 	{
 	{
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+		int already_received = starpu_mpi_cached_receive_set(data_handle);
 		if (already_received == 0)
 		if (already_received == 0)
 		{
 		{
 			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
 			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
@@ -356,7 +356,7 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
 	else if (me == rank)
 	else if (me == rank)
 	{
 	{
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
+		int already_sent = starpu_mpi_cached_send_set(data_handle, node);
 		if (already_sent == 0)
 		if (already_sent == 0)
 		{
 		{
 			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);
 			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);
@@ -389,7 +389,7 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
 	{
 	{
 		MPI_Status status;
 		MPI_Status status;
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+		int already_received = starpu_mpi_cached_receive_set(data_handle);
 		if (already_received == 0)
 		if (already_received == 0)
 		{
 		{
 			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
 			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
@@ -399,7 +399,7 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
 	else if (me == rank)
 	else if (me == rank)
 	{
 	{
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
 		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
+		int already_sent = starpu_mpi_cached_send_set(data_handle, node);
 		if (already_sent == 0)
 		if (already_sent == 0)
 		{
 		{
 			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);
 			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);

+ 6 - 16
mpi/src/starpu_mpi_cache.c

@@ -172,7 +172,7 @@ static void _starpu_mpi_cache_data_remove_nolock(starpu_data_handle_t data_handl
 /**************************************
 /**************************************
  * Received cache
  * Received cache
  **************************************/
  **************************************/
-void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
+void starpu_mpi_cached_receive_clear(starpu_data_handle_t data_handle)
 {
 {
 	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
@@ -198,7 +198,7 @@ void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 }
 }
 
 
-int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
+int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 {
 {
 	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
@@ -226,7 +226,7 @@ int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
 	return already_received;
 	return already_received;
 }
 }
 
 
-int _starpu_mpi_cache_received_data_get(starpu_data_handle_t data_handle)
+int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
 {
 {
 	int already_received;
 	int already_received;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
@@ -241,15 +241,10 @@ int _starpu_mpi_cache_received_data_get(starpu_data_handle_t data_handle)
 	return already_received;
 	return already_received;
 }
 }
 
 
-int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
-{
-	return _starpu_mpi_cache_received_data_get(data_handle);
-}
-
 /**************************************
 /**************************************
  * Send cache
  * Send cache
  **************************************/
  **************************************/
-void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data_handle)
+void starpu_mpi_cached_send_clear(starpu_data_handle_t data_handle)
 {
 {
 	int n, size;
 	int n, size;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
@@ -271,7 +266,7 @@ void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data_handle)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 }
 }
 
 
-int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
+int starpu_mpi_cached_send_set(starpu_data_handle_t data_handle, int dest)
 {
 {
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 
 
@@ -296,7 +291,7 @@ int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
 	return already_sent;
 	return already_sent;
 }
 }
 
 
-int _starpu_mpi_cache_sent_data_get(starpu_data_handle_t data_handle, int dest)
+int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
 {
 {
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	int already_sent;
 	int already_sent;
@@ -311,11 +306,6 @@ int _starpu_mpi_cache_sent_data_get(starpu_data_handle_t data_handle, int dest)
 	return already_sent;
 	return already_sent;
 }
 }
 
 
-int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
-{
-	return _starpu_mpi_cache_sent_data_get(data_handle, dest);
-}
-
 static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
 static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
 {
 {
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;

+ 0 - 16
mpi/src/starpu_mpi_cache.h

@@ -32,22 +32,6 @@ void _starpu_mpi_cache_shutdown();
 void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle);
 void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle);
 void _starpu_mpi_cache_data_clear(starpu_data_handle_t data_handle);
 void _starpu_mpi_cache_data_clear(starpu_data_handle_t data_handle);
 
 
-/*
- * If the data is already available in the cache, return a pointer to the data
- * If the data is NOT available in the cache, add it to the cache and return NULL
- */
-int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data);
-int _starpu_mpi_cache_received_data_get(starpu_data_handle_t data);
-void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data);
-
-/*
- * If the data is already available in the cache, return a pointer to the data
- * If the data is NOT available in the cache, add it to the cache and return NULL
- */
-int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data, int dest);
-int _starpu_mpi_cache_sent_data_get(starpu_data_handle_t data, int dest);
-void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data);
-
 void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle);
 void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle);
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus

+ 4 - 4
mpi/src/starpu_mpi_task_insert.c

@@ -112,7 +112,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 		if (do_execute && mpi_rank != STARPU_MPI_PER_NODE && mpi_rank != me)
 		if (do_execute && mpi_rank != STARPU_MPI_PER_NODE && mpi_rank != me)
 		{
 		{
 			/* The node is going to execute the codelet, but it does not own the data, it needs to receive the data from the owner node */
 			/* The node is going to execute the codelet, but it does not own the data, it needs to receive the data from the owner node */
-			int already_received = _starpu_mpi_cache_received_data_set(data);
+			int already_received = starpu_mpi_cached_receive_set(data);
 			if (already_received == 0)
 			if (already_received == 0)
 			{
 			{
 				if (data_tag == -1)
 				if (data_tag == -1)
@@ -126,7 +126,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 		if (!do_execute && mpi_rank == me)
 		if (!do_execute && mpi_rank == me)
 		{
 		{
 			/* The node owns the data, but another node is going to execute the codelet, the node needs to send the data to the executee node. */
 			/* The node owns the data, but another node is going to execute the codelet, the node needs to send the data to the executee node. */
-			int already_sent = _starpu_mpi_cache_sent_data_set(data, xrank);
+			int already_sent = starpu_mpi_cached_send_set(data, xrank);
 			if (already_sent == 0)
 			if (already_sent == 0)
 			{
 			{
 				if (data_tag == -1)
 				if (data_tag == -1)
@@ -182,8 +182,8 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 		if (mode & STARPU_W || mode & STARPU_REDUX)
 		if (mode & STARPU_W || mode & STARPU_REDUX)
 		{
 		{
 			/* The data has been modified, it MUST be removed from the cache */
 			/* The data has been modified, it MUST be removed from the cache */
-			_starpu_mpi_cache_sent_data_clear(data);
+			starpu_mpi_cached_send_clear(data);
-			_starpu_mpi_cache_received_data_clear(data);
+			starpu_mpi_cached_receive_clear(data);
 		}
 		}
 	}
 	}
 	else
 	else

+ 0 - 4
mpi/tests/Makefile.am

@@ -96,8 +96,6 @@ starpu_mpi_TESTS =
 
 
 starpu_mpi_TESTS +=				\
 starpu_mpi_TESTS +=				\
 	broadcast				\
 	broadcast				\
-	cache					\
-	cache_disable				\
 	callback				\
 	callback				\
 	driver					\
 	driver					\
 	early_request				\
 	early_request				\
@@ -192,8 +190,6 @@ noinst_PROGRAMS +=				\
 	block_interface_pinned			\
 	block_interface_pinned			\
 	attr					\
 	attr					\
 	broadcast				\
 	broadcast				\
-	cache					\
-	cache_disable				\
 	callback				\
 	callback				\
 	matrix					\
 	matrix					\
 	matrix2					\
 	matrix2					\