瀏覽代碼

mpi: move cache functionalities to starpu_mpi_cache

Nathalie Furmento 11 年之前
父節點
當前提交
1e9e024d8d
共有 4 個文件被更改,包括 288 次插入211 次删除
  1. 2 0
      mpi/src/Makefile.am
  2. 242 0
      mpi/src/starpu_mpi_cache.c
  3. 41 0
      mpi/src/starpu_mpi_cache.h
  4. 3 211
      mpi/src/starpu_mpi_task_insert.c

+ 2 - 0
mpi/src/Makefile.am

@@ -38,6 +38,7 @@ noinst_HEADERS =					\
 	starpu_mpi_stats.h				\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_datatype.h				\
+	starpu_mpi_cache.h				\
 	starpu_mpi_cache_stats.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -48,6 +49,7 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_collective.c				\
 	starpu_mpi_stats.c				\
 	starpu_mpi_private.c				\
+	starpu_mpi_cache.c				\
 	starpu_mpi_cache_stats.c
 
 showcheck:

+ 242 - 0
mpi/src/starpu_mpi_cache.c

@@ -0,0 +1,242 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011-2014  Université de Bordeaux 1
+ * Copyright (C) 2014 INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/uthash.h>
+
+#include <starpu_mpi_cache.h>
+#include <starpu_mpi_cache_stats.h>
+#include <starpu_mpi_private.h>
+
+/* Whether we are allowed to keep copies of remote data. */
+struct _starpu_data_entry
+{
+	UT_hash_handle hh;
+	void *data;
+};
+
+static struct _starpu_data_entry **_cache_sent_data = NULL;
+static struct _starpu_data_entry **_cache_received_data = NULL;
+int _cache_enabled=1;
+
+void _starpu_mpi_cache_init(MPI_Comm comm)
+{
+	int nb_nodes;
+	int i;
+
+	_cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
+	if (_cache_enabled == -1)
+	{
+		_cache_enabled = 1;
+	}
+
+	if (_cache_enabled == 0)
+	{
+		if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU MPI Communication cache is disabled\n");
+		return;
+	}
+
+	MPI_Comm_size(comm, &nb_nodes);
+	_STARPU_MPI_DEBUG(2, "Initialising htable for cache\n");
+	_cache_sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
+	for(i=0 ; i<nb_nodes ; i++) _cache_sent_data[i] = NULL;
+	_cache_received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
+	for(i=0 ; i<nb_nodes ; i++) _cache_received_data[i] = NULL;
+	_starpu_mpi_cache_stats_init(comm);
+}
+
+static
+void _starpu_mpi_cache_empty_tables(int world_size)
+{
+	int i;
+
+	if (_cache_enabled == 0) return;
+
+	_STARPU_MPI_DEBUG(2, "Clearing htable for cache\n");
+
+	for(i=0 ; i<world_size ; i++)
+	{
+		struct _starpu_data_entry *entry, *tmp;
+		HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
+		{
+			HASH_DEL(_cache_sent_data[i], entry);
+			free(entry);
+		}
+		HASH_ITER(hh, _cache_received_data[i], entry, tmp)
+		{
+			HASH_DEL(_cache_received_data[i], entry);
+			_starpu_mpi_cache_stats_dec(-1, i, (starpu_data_handle_t) entry->data);
+			free(entry);
+		}
+	}
+}
+
+void _starpu_mpi_cache_free(int world_size)
+{
+	if (_cache_enabled == 0) return;
+
+	_starpu_mpi_cache_empty_tables(world_size);
+	free(_cache_sent_data);
+	free(_cache_received_data);
+}
+
+void _starpu_mpi_cache_flush_sent(MPI_Comm comm, starpu_data_handle_t data)
+{
+	int n, size;
+	MPI_Comm_size(comm, &size);
+
+	for(n=0 ; n<size ; n++)
+	{
+		struct _starpu_data_entry *already_sent;
+		HASH_FIND_PTR(_cache_sent_data[n], &data, already_sent);
+		if (already_sent)
+		{
+			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data);
+			HASH_DEL(_cache_sent_data[n], already_sent);
+			free(already_sent);
+		}
+	}
+}
+
+void _starpu_mpi_cache_flush_recv(starpu_data_handle_t data, int me)
+{
+	int mpi_rank = starpu_data_get_rank(data);
+	struct _starpu_data_entry *already_received;
+
+	HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
+	if (already_received)
+	{
+#ifdef STARPU_DEVEL
+#  warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
+#endif
+		_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data);
+		HASH_DEL(_cache_received_data[mpi_rank], already_received);
+		_starpu_mpi_cache_stats_dec(me, mpi_rank, data);
+		free(already_received);
+		starpu_data_invalidate_submit(data);
+	}
+}
+
+void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
+{
+	int nb_nodes, i;
+	int mpi_rank, my_rank;
+
+	if (_cache_enabled == 0) return;
+
+	MPI_Comm_size(comm, &nb_nodes);
+	MPI_Comm_rank(comm, &my_rank);
+
+	for(i=0 ; i<nb_nodes ; i++)
+	{
+		struct _starpu_data_entry *entry, *tmp;
+		HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
+		{
+			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
+			if (mpi_rank != my_rank && mpi_rank != -1)
+				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
+			HASH_DEL(_cache_sent_data[i], entry);
+			free(entry);
+		}
+		HASH_ITER(hh, _cache_received_data[i], entry, tmp)
+		{
+			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
+			if (mpi_rank != my_rank && mpi_rank != -1)
+				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
+			HASH_DEL(_cache_received_data[i], entry);
+			_starpu_mpi_cache_stats_dec(my_rank, i, (starpu_data_handle_t) entry->data);
+			free(entry);
+		}
+	}
+}
+
+void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	struct _starpu_data_entry *avail;
+	int i, my_rank, nb_nodes;
+	int mpi_rank;
+
+	if (_cache_enabled == 0) return;
+
+	MPI_Comm_size(comm, &nb_nodes);
+	MPI_Comm_rank(comm, &my_rank);
+	mpi_rank = starpu_data_get_rank(data_handle);
+
+	for(i=0 ; i<nb_nodes ; i++)
+	{
+		HASH_FIND_PTR(_cache_sent_data[i], &data_handle, avail);
+		if (avail)
+		{
+			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
+			HASH_DEL(_cache_sent_data[i], avail);
+			free(avail);
+		}
+		HASH_FIND_PTR(_cache_received_data[i], &data_handle, avail);
+		if (avail)
+		{
+			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
+			HASH_DEL(_cache_received_data[i], avail);
+			_starpu_mpi_cache_stats_dec(my_rank, i, data_handle);
+			free(avail);
+		}
+	}
+
+	if (mpi_rank != my_rank && mpi_rank != -1)
+		starpu_data_invalidate_submit(data_handle);
+}
+
+void *_starpu_mpi_already_received(int src, starpu_data_handle_t data, int mpi_rank)
+{
+	if (_cache_enabled == 0) return NULL;
+
+	struct _starpu_data_entry *already_received;
+	HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
+	if (already_received == NULL)
+	{
+		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
+		entry->data = data;
+		HASH_ADD_PTR(_cache_received_data[mpi_rank], data, entry);
+		_starpu_mpi_cache_stats_inc(src, mpi_rank, data);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+	}
+	return already_received;
+}
+
+void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
+{
+	if (_cache_enabled == 0) return NULL;
+
+	struct _starpu_data_entry *already_sent;
+	HASH_FIND_PTR(_cache_sent_data[dest], &data, already_sent);
+	if (already_sent == NULL)
+	{
+		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
+		entry->data = data;
+		HASH_ADD_PTR(_cache_sent_data[dest], data, entry);
+		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data, dest);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data, dest);
+	}
+	return already_sent;
+}
+

+ 41 - 0
mpi/src/starpu_mpi_cache.h

@@ -0,0 +1,41 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011-2014  Université de Bordeaux 1
+ * Copyright (C) 2014 INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_CACHE_H__
+#define __STARPU_MPI_CACHE_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern int _cache_enabled;
+void _starpu_mpi_cache_init(MPI_Comm comm);
+void *_starpu_mpi_already_received(int src, starpu_data_handle_t data, int mpi_rank);
+void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest);
+void _starpu_mpi_cache_flush_sent(MPI_Comm comm, starpu_data_handle_t data);
+void _starpu_mpi_cache_flush_recv(starpu_data_handle_t data, int me);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_CACHE_H__

+ 3 - 211
mpi/src/starpu_mpi_task_insert.c

@@ -22,196 +22,13 @@
 #include <starpu.h>
 #include <starpu_data.h>
 #include <common/utils.h>
-#include <common/uthash.h>
 #include <util/starpu_task_insert_utils.h>
 #include <datawizard/coherency.h>
 #include <core/task.h>
 
-#include <starpu_mpi_cache_stats.h>
 #include <starpu_mpi_private.h>
 #include <starpu_mpi_task_insert.h>
-
-/* Whether we are allowed to keep copies of remote data. */
-struct _starpu_data_entry
-{
-	UT_hash_handle hh;
-	void *data;
-};
-
-static struct _starpu_data_entry **_cache_sent_data = NULL;
-static struct _starpu_data_entry **_cache_received_data = NULL;
-static int _cache_enabled=1;
-
-void _starpu_mpi_cache_init(MPI_Comm comm)
-{
-	int nb_nodes;
-	int i;
-
-	_cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
-	if (_cache_enabled == -1)
-	{
-		_cache_enabled = 1;
-	}
-
-	if (_cache_enabled == 0)
-	{
-		if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU MPI Communication cache is disabled\n");
-		return;
-	}
-
-	MPI_Comm_size(comm, &nb_nodes);
-	_STARPU_MPI_DEBUG(2, "Initialising htable for cache\n");
-	_cache_sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
-	for(i=0 ; i<nb_nodes ; i++) _cache_sent_data[i] = NULL;
-	_cache_received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
-	for(i=0 ; i<nb_nodes ; i++) _cache_received_data[i] = NULL;
-	_starpu_mpi_cache_stats_init(comm);
-}
-
-static
-void _starpu_mpi_cache_empty_tables(int world_size)
-{
-	int i;
-
-	if (_cache_enabled == 0) return;
-
-	_STARPU_MPI_DEBUG(2, "Clearing htable for cache\n");
-
-	for(i=0 ; i<world_size ; i++)
-	{
-		struct _starpu_data_entry *entry, *tmp;
-		HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
-		{
-			HASH_DEL(_cache_sent_data[i], entry);
-			free(entry);
-		}
-		HASH_ITER(hh, _cache_received_data[i], entry, tmp)
-		{
-			HASH_DEL(_cache_received_data[i], entry);
-			_starpu_mpi_cache_stats_dec(-1, i, (starpu_data_handle_t) entry->data);
-			free(entry);
-		}
-	}
-}
-
-void _starpu_mpi_cache_free(int world_size)
-{
-	if (_cache_enabled == 0) return;
-
-	_starpu_mpi_cache_empty_tables(world_size);
-	free(_cache_sent_data);
-	free(_cache_received_data);
-}
-
-void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
-{
-	int nb_nodes, i;
-	int mpi_rank, my_rank;
-
-	if (_cache_enabled == 0) return;
-
-	MPI_Comm_size(comm, &nb_nodes);
-	MPI_Comm_rank(comm, &my_rank);
-
-	for(i=0 ; i<nb_nodes ; i++)
-	{
-		struct _starpu_data_entry *entry, *tmp;
-		HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
-		{
-			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
-			if (mpi_rank != my_rank && mpi_rank != -1)
-				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
-			HASH_DEL(_cache_sent_data[i], entry);
-			free(entry);
-		}
-		HASH_ITER(hh, _cache_received_data[i], entry, tmp)
-		{
-			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
-			if (mpi_rank != my_rank && mpi_rank != -1)
-				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
-			HASH_DEL(_cache_received_data[i], entry);
-			_starpu_mpi_cache_stats_dec(my_rank, i, (starpu_data_handle_t) entry->data);
-			free(entry);
-		}
-	}
-}
-
-void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
-{
-	struct _starpu_data_entry *avail;
-	int i, my_rank, nb_nodes;
-	int mpi_rank;
-
-	if (_cache_enabled == 0) return;
-
-	MPI_Comm_size(comm, &nb_nodes);
-	MPI_Comm_rank(comm, &my_rank);
-	mpi_rank = starpu_data_get_rank(data_handle);
-
-	for(i=0 ; i<nb_nodes ; i++)
-	{
-		HASH_FIND_PTR(_cache_sent_data[i], &data_handle, avail);
-		if (avail)
-		{
-			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
-			HASH_DEL(_cache_sent_data[i], avail);
-			free(avail);
-		}
-		HASH_FIND_PTR(_cache_received_data[i], &data_handle, avail);
-		if (avail)
-		{
-			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
-			HASH_DEL(_cache_received_data[i], avail);
-			_starpu_mpi_cache_stats_dec(my_rank, i, data_handle);
-			free(avail);
-		}
-	}
-
-	if (mpi_rank != my_rank && mpi_rank != -1)
-		starpu_data_invalidate_submit(data_handle);
-}
-
-static
-void *_starpu_mpi_already_received(int src, starpu_data_handle_t data, int mpi_rank)
-{
-	if (_cache_enabled == 0) return NULL;
-
-	struct _starpu_data_entry *already_received;
-	HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
-	if (already_received == NULL)
-	{
-		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-		entry->data = data;
-		HASH_ADD_PTR(_cache_received_data[mpi_rank], data, entry);
-		_starpu_mpi_cache_stats_inc(src, mpi_rank, data);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
-	}
-	return already_received;
-}
-
-static
-void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
-{
-	if (_cache_enabled == 0) return NULL;
-
-	struct _starpu_data_entry *already_sent;
-	HASH_FIND_PTR(_cache_sent_data[dest], &data, already_sent);
-	if (already_sent == NULL)
-	{
-		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-		entry->data = data;
-		HASH_ADD_PTR(_cache_sent_data[dest], data, entry);
-		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data, dest);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data, dest);
-	}
-	return already_sent;
-}
+#include <starpu_mpi_cache.h>
 
 static
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest, size_t *size_on_nodes)
@@ -356,36 +173,11 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 			if (do_execute)
 			{
 				/* Note that all copies I've sent to neighbours are now invalid */
-				int n, size;
-				MPI_Comm_size(comm, &size);
-				for(n=0 ; n<size ; n++)
-				{
-					struct _starpu_data_entry *already_sent;
-					HASH_FIND_PTR(_cache_sent_data[n], &data, already_sent);
-					if (already_sent)
-					{
-						_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data);
-						HASH_DEL(_cache_sent_data[n], already_sent);
-						free(already_sent);
-					}
-				}
+				_starpu_mpi_cache_flush_sent(comm, data);
 			}
 			else
 			{
-				int mpi_rank = starpu_data_get_rank(data);
-				struct _starpu_data_entry *already_received;
-				HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
-				if (already_received)
-				{
-#ifdef STARPU_DEVEL
-#  warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
-#endif
-					_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data);
-					HASH_DEL(_cache_received_data[mpi_rank], already_received);
-					_starpu_mpi_cache_stats_dec(me, mpi_rank, data);
-					free(already_received);
-					starpu_data_invalidate_submit(data);
-				}
+				_starpu_mpi_cache_flush_recv(data, me);
 			}
 		}
 	}