Browse Source

mpi: fix for receiving early several datas (and not only one)

Nathalie Furmento 12 years ago
parent
commit
e2675531f5
2 changed files with 111 additions and 27 deletions
  1. 2 2
      ChangeLog
  2. 109 25
      mpi/src/starpu_mpi.c

+ 2 - 2
ChangeLog

@@ -72,9 +72,9 @@ Changes:
   * Data interfaces (variable, vector, matrix and block) now define
   * Data interfaces (variable, vector, matrix and block) now define
     pack und unpack functions
     pack und unpack functions
   * Fix for properly dealing with NAN on windows systems
   * Fix for properly dealing with NAN on windows systems
-  * StarPU-MPI: Fix for being able to receive a data which has not yet
+  * StarPU-MPI: Fix for being able to receive data which have not yet
     been registered by the application (i.e it did not call
     been registered by the application (i.e it did not call
-    starpu_data_set_tag(), the data is received as a raw memory)
+    starpu_data_set_tag(), data are received as a raw memory)
   * StarPU-MPI: Fix for being able to receive data with the same tag
   * StarPU-MPI: Fix for being able to receive data with the same tag
     from several nodes (see mpi/tests/gather.c)
     from several nodes (see mpi/tests/gather.c)
 
 

+ 109 - 25
mpi/src/starpu_mpi.c

@@ -62,28 +62,33 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
 
-struct _starpu_mpi_copy_handle
+LIST_TYPE(_starpu_mpi_copy_handle,
+	  starpu_data_handle_t handle;
+	  struct _starpu_mpi_envelope *env;
+	  struct _starpu_mpi_req *req;
+	  void *buffer;
+	  int mpi_tag;
+	  int source;
+);
+
+struct _starpu_mpi_copy_handle_hashlist
 {
 {
-	starpu_data_handle_t handle;
-	struct _starpu_mpi_envelope *env;
-	int mpi_tag;
-	int source;
+	struct _starpu_mpi_copy_handle_list *list;
 	UT_hash_handle hh;
 	UT_hash_handle hh;
-	struct _starpu_mpi_req *req;
-	void *buffer;
+	int mpi_tag;
 };
 };
 
 
- /********************************************************/
- /*                                                      */
- /*  Hashmap's requests functionalities                  */
- /*                                                      */
- /********************************************************/
+/********************************************************/
+/*                                                      */
+/*  Hashmap's requests functionalities                  */
+/*                                                      */
+/********************************************************/
 
 
 /** stores application requests for which data have not been received yet */
 /** stores application requests for which data have not been received yet */
 static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
 static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
 static int _starpu_mpi_app_req_hashmap_count = 0;
 static int _starpu_mpi_app_req_hashmap_count = 0;
 /** stores data which have been received by MPI but have not been requested by the application */
 /** stores data which have been received by MPI but have not been requested by the application */
-static struct _starpu_mpi_copy_handle **_starpu_mpi_copy_handle_hashmap = NULL;
+static struct _starpu_mpi_copy_handle_hashlist **_starpu_mpi_copy_handle_hashmap = NULL;
 static int _starpu_mpi_copy_handle_hashmap_count = 0;
 static int _starpu_mpi_copy_handle_hashmap_count = 0;
 
 
 static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
 static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
@@ -140,36 +145,101 @@ static void delete_app_req(struct _starpu_mpi_req *req)
 	}
 	}
 }
 }
 
 
-static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag, int source)
+static void _starpu_mpi_copy_handle_display_hash(int source, int tag)
 {
 {
-	struct _starpu_mpi_copy_handle* chandle;
+	struct _starpu_mpi_copy_handle_hashlist *hashlist;
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &tag, hashlist);
 
 
-	_STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
+	if (hashlist == NULL)
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
+	}
+	else if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
+	}
+	else
+	{
+		struct _starpu_mpi_copy_handle *cur;
+		for (cur = _starpu_mpi_copy_handle_list_begin(hashlist->list) ;
+		     cur != _starpu_mpi_copy_handle_list_end(hashlist->list);
+		     cur = _starpu_mpi_copy_handle_list_next(cur))
+		{
+			_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
+		}
+	}
+}
 
 
-	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, chandle);
+static struct _starpu_mpi_copy_handle *pop_chandle(int mpi_tag, int source, int delete)
+{
+	struct _starpu_mpi_copy_handle_hashlist *hashlist;
+	struct _starpu_mpi_copy_handle *chandle;
+
+	_STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, hashlist);
+	if (hashlist == NULL)
+	{
+		chandle = NULL;
+	}
+	else
+	{
+		if (_starpu_mpi_copy_handle_list_empty(hashlist->list))
+		{
+			chandle = NULL;
+		}
+		else
+		{
+			if (delete == 1)
+			{
+				chandle = _starpu_mpi_copy_handle_list_pop_front(hashlist->list);
+			}
+			else
+			{
+				chandle = _starpu_mpi_copy_handle_list_front(hashlist->list);
+			}
+		}
+	}
+	_STARPU_MPI_DEBUG(60, "Found chandle %p with tag %d in the hashmap[%d]\n", chandle, mpi_tag, source);
 	return chandle;
 	return chandle;
 }
 }
 
 
+static struct _starpu_mpi_copy_handle *find_chandle(int mpi_tag, int source)
+{
+	return pop_chandle(mpi_tag, source, 0);
+}
+
 static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
 static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
 {
 {
 	_STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 	_STARPU_MPI_DEBUG(60, "Trying to add chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
-	STARPU_ASSERT_MSG(find_chandle(chandle->mpi_tag, chandle->source) == NULL,
-			  "Error add_chandle : chandle %p with tag %d already in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 
 
-	HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, chandle);
+	struct _starpu_mpi_copy_handle_hashlist *hashlist;
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], &chandle->mpi_tag, hashlist);
+	if (hashlist == NULL)
+	{
+		hashlist = malloc(sizeof(struct _starpu_mpi_copy_handle_hashlist));
+		hashlist->list = _starpu_mpi_copy_handle_list_new();
+		hashlist->mpi_tag = chandle->mpi_tag;
+		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, hashlist);
+	}
+	_starpu_mpi_copy_handle_list_push_back(hashlist->list, chandle);
 	_starpu_mpi_copy_handle_hashmap_count ++;
 	_starpu_mpi_copy_handle_hashmap_count ++;
+#ifdef STARPU_VERBOSE
+	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
+#endif
 }
 }
 
 
 static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
 static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
 {
 {
-	struct _starpu_mpi_copy_handle *test_chandle;
-
 	_STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 	_STARPU_MPI_DEBUG(60, "Trying to delete chandle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
-	STARPU_ASSERT_MSG(find_chandle(chandle->mpi_tag, chandle->source) != NULL,
+	struct _starpu_mpi_copy_handle *found = pop_chandle(chandle->mpi_tag, chandle->source, 1);
+
+	STARPU_ASSERT_MSG(found == chandle,
 			  "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 			  "Error delete_chandle : chandle %p with tag %d is NOT in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 
 
-	HASH_DEL(_starpu_mpi_copy_handle_hashmap[chandle->source], chandle);
 	_starpu_mpi_copy_handle_hashmap_count --;
 	_starpu_mpi_copy_handle_hashmap_count --;
+#ifdef STARPU_VERBOSE
+	_starpu_mpi_copy_handle_display_hash(chandle->source, chandle->mpi_tag);
+#endif
 }
 }
 
 
 static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
@@ -1348,6 +1418,20 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 
+	{
+		int nb_nodes, n;
+		struct _starpu_mpi_copy_handle_hashlist *hashlist;
+
+		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
+		for(n=0 ; n<nb_nodes; n++)
+		{
+			for(hashlist=_starpu_mpi_copy_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
+			{
+				_starpu_mpi_copy_handle_list_delete(hashlist->list);
+			}
+		}
+	}
+
 	free(_starpu_mpi_app_req_hashmap);
 	free(_starpu_mpi_app_req_hashmap);
 	free(_starpu_mpi_copy_handle_hashmap);
 	free(_starpu_mpi_copy_handle_hashmap);
 	free(argc_argv);
 	free(argc_argv);
@@ -1439,7 +1523,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
 		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
 		_starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
 		_starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
 		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
 		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
-		_starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle *));
+		_starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle_hash_list *));
 		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
 		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
 	}
 	}