Quellcode durchsuchen

mpi/src: add capability to deal with several early requests with the same tag and source.

Nathalie Furmento vor 10 Jahren
Ursprung
Commit
49a28d19d8
3 geänderte Dateien mit 36 neuen und 43 gelöschten Zeilen
  1. 2 4
      mpi/src/starpu_mpi.c
  2. 31 35
      mpi/src/starpu_mpi_early_request.c
  3. 3 4
      mpi/src/starpu_mpi_early_request.h

+ 2 - 4
mpi/src/starpu_mpi.c

@@ -252,7 +252,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				else
 				{
 					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->node_tag.rank, req->node_tag.data_tag);
-					_starpu_mpi_early_request_add(req);
+					_starpu_mpi_early_request_enqueue(req);
 				}
 			}
 		}
@@ -1352,7 +1352,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				{
 					_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
 
-					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
+					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_dequeue(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 
 					/* Case: a data will arrive before a matching receive is
 					 * posted by the application. Create a temporary handle to
@@ -1400,8 +1400,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						_STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %d\n", envelope->data_tag);
 						_STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
 
-						_starpu_mpi_early_request_delete(early_request);
-
 						early_request->sync = envelope->sync;
 						_starpu_mpi_handle_allocate_datatype(early_request->data_handle, &early_request->datatype, &early_request->user_datatype);
 						if (early_request->user_datatype == 0)

+ 31 - 35
mpi/src/starpu_mpi_early_request.c

@@ -22,7 +22,14 @@
 #include <common/uthash.h>
 
 /** stores application requests for which data have not been received yet */
-struct _starpu_mpi_req *_starpu_mpi_early_request_hash;
+struct _starpu_mpi_early_request_hashlist
+{
+	struct _starpu_mpi_req_list *list;
+	UT_hash_handle hh;
+	struct _starpu_mpi_node_tag node_tag;
+};
+
+struct _starpu_mpi_early_request_hashlist *_starpu_mpi_early_request_hash;
 int _starpu_mpi_early_request_hash_count;
 
 void _starpu_mpi_early_request_init()
@@ -46,63 +53,52 @@ void _starpu_mpi_early_request_check_termination()
 	STARPU_ASSERT_MSG(_starpu_mpi_early_request_count() == 0, "Number of early requests left is not zero");
 }
 
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source, MPI_Comm comm)
+struct _starpu_mpi_req* _starpu_mpi_early_request_dequeue(int data_tag, int source, MPI_Comm comm)
 {
 	struct _starpu_mpi_node_tag node_tag;
 	struct _starpu_mpi_req *found;
+	struct _starpu_mpi_early_request_hashlist *hashlist;
 
 	memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));
 	node_tag.comm = comm;
 	node_tag.rank = source;
 	node_tag.data_tag = data_tag;
 
-	HASH_FIND(hh, _starpu_mpi_early_request_hash, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
-
-	return found;
-}
-
-void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req)
-{
-	struct _starpu_mpi_req *test_req;
-
-	test_req = _starpu_mpi_early_request_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
-
-	if (test_req == NULL)
+	_STARPU_MPI_DEBUG(100, "Looking for early_request with comm %p source %d tag %d\n", node_tag.comm, node_tag.rank, node_tag.data_tag);
+	HASH_FIND(hh, _starpu_mpi_early_request_hash, &node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
+	if (hashlist == NULL)
 	{
-		HASH_ADD(hh, _starpu_mpi_early_request_hash, node_tag, sizeof(req->node_tag), req);
-		_starpu_mpi_early_request_hash_count ++;
-		_STARPU_MPI_DEBUG(3, "Adding request %p with comm %p source %d tag %d in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
+		found = NULL;
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Error] request %p with comm %p source %d tag %d already in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
-		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
-		if (seq_const &&  req->sequential_consistency)
+		if (_starpu_mpi_req_list_empty(hashlist->list))
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with comm %p source %d tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag, test_req);
+			found = NULL;
 		}
 		else
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with comm %p source %d tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag, test_req);
+			found = _starpu_mpi_req_list_pop_front(hashlist->list);
+			_starpu_mpi_early_request_hash_count --;
 		}
 	}
+	_STARPU_MPI_DEBUG(100, "Found early_request %p with comm %p source %d tag %d\n", found, node_tag.comm, node_tag.rank, node_tag.data_tag);
+	return found;
 }
 
-void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req)
+void _starpu_mpi_early_request_enqueue(struct _starpu_mpi_req *req)
 {
-	struct _starpu_mpi_req *test_req;
-
-	test_req = _starpu_mpi_early_request_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
+	_STARPU_MPI_DEBUG(100, "Adding request %p with comm %p source %d tag %d in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
 
-	if (test_req != NULL)
+	struct _starpu_mpi_early_request_hashlist *hashlist;
+	HASH_FIND(hh, _starpu_mpi_early_request_hash, &req->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
+	if (hashlist == NULL)
 	{
-		HASH_DEL(_starpu_mpi_early_request_hash, req);
-		_starpu_mpi_early_request_hash_count --;
-		_STARPU_MPI_DEBUG(3, "Deleting application request %p with comm %p source %d tag %d from the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(3, "[Warning] request %p with comm %p source %d tag %d is NOT in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
+		hashlist = malloc(sizeof(struct _starpu_mpi_early_request_hashlist));
+		hashlist->list = _starpu_mpi_req_list_new();
+		hashlist->node_tag = req->node_tag;
+		HASH_ADD(hh, _starpu_mpi_early_request_hash, node_tag, sizeof(hashlist->node_tag), hashlist);
 	}
+	_starpu_mpi_req_list_push_back(hashlist->list, req);
+	_starpu_mpi_early_request_hash_count ++;
 }
-

+ 3 - 4
mpi/src/starpu_mpi_early_request.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -33,9 +33,8 @@ void _starpu_mpi_early_request_free(void);
 int _starpu_mpi_early_request_count(void);
 void _starpu_mpi_early_request_check_termination(void);
 
-void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req);
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source, MPI_Comm comm);
-void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req);
+void _starpu_mpi_early_request_enqueue(struct _starpu_mpi_req *req);
+struct _starpu_mpi_req* _starpu_mpi_early_request_dequeue(int data_tag, int source, MPI_Comm comm);
 
 #ifdef __cplusplus
 }