소스 검색

mpi: move all functionalities related to early received data in starpu_mpi_early_data

Nathalie Furmento 11 년 전
부모
커밋
ddaf047511
4개의 파일 변경, 231줄 추가, 146줄 삭제
  1. 4 2
      mpi/src/Makefile.am
  2. 4 144
      mpi/src/starpu_mpi.c
  3. 167 0
      mpi/src/starpu_mpi_early_data.c
  4. 56 0
      mpi/src/starpu_mpi_early_data.h

+ 4 - 2
mpi/src/Makefile.am

@@ -39,7 +39,8 @@ noinst_HEADERS =					\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_datatype.h				\
 	starpu_mpi_cache.h				\
-	starpu_mpi_cache_stats.h
+	starpu_mpi_cache_stats.h			\
+	starpu_mpi_early_data.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
@@ -50,7 +51,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_stats.c				\
 	starpu_mpi_private.c				\
 	starpu_mpi_cache.c				\
-	starpu_mpi_cache_stats.c
+	starpu_mpi_cache_stats.c			\
+	starpu_mpi_early_data.c
 
 showcheck:
 	-cat /dev/null

+ 4 - 144
mpi/src/starpu_mpi.c

@@ -22,6 +22,7 @@
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_task_insert.h>
+#include <starpu_mpi_early_data.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -65,25 +66,6 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
-LIST_TYPE(_starpu_mpi_copy_handle,
-	  starpu_data_handle_t handle;
-	  struct _starpu_mpi_envelope *env;
-	  struct _starpu_mpi_req *req;
-	  void *buffer;
-	  int mpi_tag;
-	  int source;
-	  int req_ready;
-	  starpu_pthread_mutex_t req_mutex;
-	  starpu_pthread_cond_t req_cond;
-);
-
-struct _starpu_mpi_copy_handle_hashlist
-{
-	struct _starpu_mpi_copy_handle_list *list;
-	UT_hash_handle hh;
-	int mpi_tag;
-};
-
 /********************************************************/
 /*                                                      */
 /*  Hashmap's requests functionalities                  */
@@ -93,9 +75,6 @@ struct _starpu_mpi_copy_handle_hashlist
 /** stores application requests for which data have not been received yet */
 static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
 static int _starpu_mpi_app_req_hashmap_count = 0;
-/** stores data which have been received by MPI but have not been requested by the application */
-static struct _starpu_mpi_copy_handle_hashlist **_starpu_mpi_copy_handle_hashmap = NULL;
-static int _starpu_mpi_copy_handle_hashmap_count = 0;
 
 static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
 {
@@ -151,105 +130,6 @@ static void delete_app_req(struct _starpu_mpi_req *req)
 	}
 }
 
-#ifdef STARPU_VERBOSE
-static void _starpu_mpi_copy_handle_display_hash(int source, int tag)
-{
-	struct _starpu_mpi_copy_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &tag, hashlist);
-
-	if (hashlist == NULL)
-	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
-	}
-	else if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
-	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
-	}
-	else
-	{
-		struct _starpu_mpi_copy_handle *cur;
-		for (cur = _starpu_mpi_copy_handle_list_begin(hashlist->list) ;
-		     cur != _starpu_mpi_copy_handle_list_end(hashlist->list);
-		     cur = _starpu_mpi_copy_handle_list_next(cur))
-		{
-			_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
-		}
-	}
-}
-#endif
-
-static struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete)
-{
-	struct _starpu_mpi_copy_handle_hashlist *hashlist;
-	struct _starpu_mpi_copy_handle *chandle;
-
-	_STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
-	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, hashlist);
-	if (hashlist == NULL)
-	{
-		chandle = NULL;
-	}
-	else
-	{
-		if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
-		{
-			chandle = NULL;
-		}
-		else
-		{
-			if (delete == 1)
-			{
-				chandle = _starpu_mpi_copy_handle_list_pop_front(hashlist->list);
-			}
-			else
-			{
-				chandle = _starpu_mpi_copy_handle_list_front(hashlist->list);
-			}
-		}
-	}
-	_STARPU_MPI_DEBUG(60, "Found chandle %p with tag %d in the hashmap[%d]\n", chandle, mpi_tag, source);
-	return chandle;
-}
-
-static struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source)
-{
-	return pop_chandle(mpi_tag, source, 0);
-}
-
-static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
-{
-	_STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
-
-	struct _starpu_mpi_copy_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], &chandle->mpi_tag, hashlist);
-	if (hashlist == NULL)
-	{
-		hashlist = malloc(sizeof(struct _starpu_mpi_copy_handle_hashlist));
-		hashlist->list = _starpu_mpi_copy_handle_list_new();
-		hashlist->mpi_tag = chandle->mpi_tag;
-		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, hashlist);
-	}
-	_starpu_mpi_copy_handle_list_push_back(hashlist->list, chandle);
-	_starpu_mpi_copy_handle_hashmap_count ++;
-#ifdef STARPU_VERBOSE
-	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
-#endif
-}
-
-static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
-{
-	_STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
-	struct _starpu_mpi_copy_handle *found = pop_chandle(chandle->mpi_tag, chandle->source, 1);
-
-	STARPU_ASSERT_MSG(found == chandle,
-			  "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
-
-	_starpu_mpi_copy_handle_hashmap_count --;
-#ifdef STARPU_VERBOSE
-	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
-#endif
-}
-
 static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 {
 	/* Initialize the request structure */
@@ -1257,8 +1137,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
 		_starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
 		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
-		_starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle_hash_list *));
-		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
+		_starpu_mpi_early_data_init(nb_nodes);
 	}
 
 	/* notify the main thread that the progression thread is ready */
@@ -1449,7 +1328,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 	STARPU_ASSERT_MSG(_starpu_mpi_app_req_hashmap_count == 0, "Number of receive requests left is not zero");
-	STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap_count == 0, "Number of copy requests left is not zero");
+	_starpu_mpi_early_data_check_termination();
 
 	if (argc_argv->initialize_mpi)
 	{
@@ -1459,27 +1338,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
-	{
-		int n;
-		struct _starpu_mpi_copy_handle_hashlist *hashlist;
-
-		for(n=0 ; n<worldsize; n++)
-		{
-			for(hashlist=_starpu_mpi_copy_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
-			{
-				_starpu_mpi_copy_handle_list_delete(hashlist->list);
-			}
-			struct _starpu_mpi_copy_handle_hashlist *current, *tmp;
-			HASH_ITER(hh, _starpu_mpi_copy_handle_hashmap[n], current, tmp)
-			{
-				HASH_DEL(_starpu_mpi_copy_handle_hashmap[n], current);
-				free(current);
-			}
-		}
-	}
-
+	_starpu_mpi_early_data_free(worldsize);
 	free(_starpu_mpi_app_req_hashmap);
-	free(_starpu_mpi_copy_handle_hashmap);
 	free(argc_argv);
 	free(recv_env);
 

+ 167 - 0
mpi/src/starpu_mpi_early_data.c

@@ -0,0 +1,167 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_early_data.h>
+#include <starpu_mpi_private.h>
+#include <common/uthash.h>
+
+struct _starpu_mpi_copy_handle_hashlist	/* one hash entry: the list of early-data elements stored under a given tag */
+{
+	struct _starpu_mpi_copy_handle_list *list;	/* elements queued for this tag; add_chandle() pushes at the back, pop_chandle() reads/pops the front */
+	UT_hash_handle hh;	/* uthash bookkeeping field, referenced by name in HASH_ITER(hh, ...) */
+	int mpi_tag;	/* hash key (see HASH_ADD_INT in add_chandle()) */
+};
+
+/** stores data which have been received by MPI but have not been requested by the application.
+ * Array of world_size slots allocated by _starpu_mpi_early_data_init() and indexed by source;
+ * each slot is the head of a uthash table keyed by mpi_tag (NULL while empty). */
+static struct _starpu_mpi_copy_handle_hashlist **_starpu_mpi_copy_handle_hashmap = NULL;
+static int _starpu_mpi_copy_handle_hashmap_count = 0;	/* elements currently stored: incremented by add_chandle(), decremented by delete_chandle() */
+
+/**
+ * Allocate the early-data hashmap: an array of world_size hash-table heads
+ * (one per source), each initialised to NULL, i.e. an empty table.
+ *
+ * @param world_size number of slots to allocate; the other functions of this
+ *                   file index the array with the source of the data.
+ */
+void _starpu_mpi_early_data_init(int world_size)
+{
+	int k;
+
+	/* Size the allocation from the pointee type so that it cannot drift
+	 * from the declared type of the hashmap. */
+	_starpu_mpi_copy_handle_hashmap = malloc(world_size * sizeof(*_starpu_mpi_copy_handle_hashmap));
+	STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap != NULL, "Cannot allocate the early data hashmap");
+	for(k=0 ; k<world_size ; k++)
+		_starpu_mpi_copy_handle_hashmap[k] = NULL;
+}
+
+/**
+ * Sanity check to run at shutdown: asserts that the counter of stored
+ * early-data elements is back to zero, i.e. that every element added with
+ * add_chandle() has been removed with delete_chandle().
+ */
+void _starpu_mpi_early_data_check_termination(void)
+{
+	STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap_count == 0, "Number of copy requests left is not zero");
+}
+
+/**
+ * Release everything owned by the early-data hashmap: for each of the
+ * world_size slots, delete the list attached to every hash entry, remove the
+ * entry from its table and free it; then free the array of table heads.
+ *
+ * @param world_size number of slots in the hashmap; must match the value
+ *                   given to _starpu_mpi_early_data_init().
+ */
+void _starpu_mpi_early_data_free(int world_size)
+{
+	int n;
+
+	/* Nothing to release if the hashmap was never allocated or was already freed */
+	if (_starpu_mpi_copy_handle_hashmap == NULL)
+		return;
+
+	for(n=0 ; n<world_size; n++)
+	{
+		struct _starpu_mpi_copy_handle_hashlist *current, *tmp;
+
+		/* HASH_ITER keeps the next entry in tmp, so the current entry
+		 * can be removed and freed while iterating. */
+		HASH_ITER(hh, _starpu_mpi_copy_handle_hashmap[n], current, tmp)
+		{
+			_starpu_mpi_copy_handle_list_delete(current->list);
+			HASH_DEL(_starpu_mpi_copy_handle_hashmap[n], current);
+			free(current);
+		}
+	}
+	free(_starpu_mpi_copy_handle_hashmap);
+	/* Do not leave a dangling pointer behind: a later lookup or a second
+	 * call to this function must not touch freed memory. */
+	_starpu_mpi_copy_handle_hashmap = NULL;
+}
+
+#ifdef STARPU_VERBOSE
+/* Debug helper: trace, through _STARPU_MPI_DEBUG, the state of the list stored
+ * for the given source and tag (missing entry, empty list, or each element). */
+static void _starpu_mpi_copy_handle_display_hash(int source, int tag)
+{
+	struct _starpu_mpi_copy_handle_hashlist *entry;
+	struct _starpu_mpi_copy_handle *elem;
+
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &tag, entry);
+
+	if (entry == NULL)
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
+		return;
+	}
+
+	if (_starpu_mpi_copy_handle_list_empty(entry->list))
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
+		return;
+	}
+
+	/* Walk the list from its first element up to the end marker */
+	elem = _starpu_mpi_copy_handle_list_begin(entry->list);
+	while (elem != _starpu_mpi_copy_handle_list_end(entry->list))
+	{
+		_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, elem);
+		elem = _starpu_mpi_copy_handle_list_next(elem);
+	}
+}
+#endif
+
+/* Return the first element stored for the given tag and source without
+ * removing it from its list, or NULL when nothing is stored. */
+struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source)
+{
+	/* 0: peek only, the element stays in the list */
+	const int keep_in_list = 0;
+
+	return pop_chandle(mpi_tag, source, keep_in_list);
+}
+
+/**
+ * Store an early-data element in the hashmap.
+ *
+ * The element is appended to the list registered under chandle->mpi_tag in
+ * the table of chandle->source; the hash entry and its list are created on
+ * first use of that tag. The global element counter is incremented.
+ *
+ * @param chandle element to store; its mpi_tag and source fields select the
+ *                list it is appended to.
+ */
+void add_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	_STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
+
+	struct _starpu_mpi_copy_handle_hashlist *hashlist;
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], &chandle->mpi_tag, hashlist);
+	if (hashlist == NULL)
+	{
+		/* First element for this tag: create the hash entry and its list */
+		hashlist = malloc(sizeof(*hashlist));
+		STARPU_ASSERT_MSG(hashlist != NULL,
+				  "Error add_chandle : cannot allocate hashlist for tag %d in the hashmap[%d]\n", chandle->mpi_tag, chandle->source);
+		hashlist->list = _starpu_mpi_copy_handle_list_new();
+		hashlist->mpi_tag = chandle->mpi_tag;
+		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, hashlist);
+	}
+	_starpu_mpi_copy_handle_list_push_back(hashlist->list, chandle);
+	_starpu_mpi_copy_handle_hashmap_count ++;
+#ifdef STARPU_VERBOSE
+	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
+#endif
+}
+
+/* Remove an early-data element from the hashmap and decrement the element
+ * counter. The element actually removed is the front of the list stored for
+ * chandle's tag and source; an assertion checks that it is chandle itself. */
+void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	struct _starpu_mpi_copy_handle *popped;
+
+	_STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
+
+	/* 1: really remove the front element from its list */
+	popped = pop_chandle(chandle->mpi_tag, chandle->source, 1);
+	STARPU_ASSERT_MSG(popped == chandle,
+			  "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
+
+	_starpu_mpi_copy_handle_hashmap_count -= 1;
+#ifdef STARPU_VERBOSE
+	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
+#endif
+}
+
+/* Look up the first element stored for the given tag and source.
+ * Returns NULL when no hash entry exists for the tag or when its list is
+ * empty. The element is removed from the list only when delete is exactly 1;
+ * for any other value it is returned but left in place. */
+struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete)
+{
+	struct _starpu_mpi_copy_handle_hashlist *entry;
+	struct _starpu_mpi_copy_handle *result = NULL;
+
+	_STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, entry);
+
+	if (entry != NULL && !_starpu_mpi_copy_handle_list_empty(entry->list))
+	{
+		result = (delete == 1)
+			? _starpu_mpi_copy_handle_list_pop_front(entry->list)
+			: _starpu_mpi_copy_handle_list_front(entry->list);
+	}
+
+	_STARPU_MPI_DEBUG(60, "Found chandle %p with tag %d in the hashmap[%d]\n", result, mpi_tag, source);
+	return result;
+}
+

+ 56 - 0
mpi/src/starpu_mpi_early_data.h

@@ -0,0 +1,56 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_EARLY_DATA_H__
+#define __STARPU_MPI_EARLY_DATA_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+#include <common/config.h>
+#include <common/list.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+LIST_TYPE(_starpu_mpi_copy_handle,	/* element type for early data; the _starpu_mpi_copy_handle_list_* helpers used in starpu_mpi_early_data.c come from this declaration */
+	  starpu_data_handle_t handle;
+	  struct _starpu_mpi_envelope *env;
+	  struct _starpu_mpi_req *req;
+	  void *buffer;
+	  int mpi_tag;	/* hash key under which add_chandle() files the element */
+	  int source;	/* index of the per-source hash table the element is filed in */
+	  int req_ready;	/* NOTE(review): presumably a flag protected by req_mutex/req_cond -- confirm in starpu_mpi.c */
+	  starpu_pthread_mutex_t req_mutex;
+	  starpu_pthread_cond_t req_cond;
+);
+
+/** Allocate the early-data hashmap with world_size empty slots (one per source). */
+void _starpu_mpi_early_data_init(int world_size);
+/** Assert that no early-data element is left in the hashmap. */
+void _starpu_mpi_early_data_check_termination(void);
+/** Free every hash entry and list of the world_size slots, then the hashmap itself. */
+void _starpu_mpi_early_data_free(int world_size);
+
+/** Return the first element stored for (mpi_tag, source) without removing it, or NULL. */
+struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source);
+/** Append chandle to the list stored for its mpi_tag and source. */
+void add_chandle(struct _starpu_mpi_copy_handle *chandle);
+/** Remove the front element stored for chandle's mpi_tag and source; asserts that it is chandle. */
+void delete_chandle(struct _starpu_mpi_copy_handle *chandle);
+/** Return the first element stored for (mpi_tag, source), or NULL; it is removed from its list only when delete is 1. */
+struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __STARPU_MPI_EARLY_DATA_H__ */