Parcourir la source

mpi: fix for being able to receive a data with the same tag from
different nodes

Nathalie Furmento il y a 11 ans
Parent
commit
4e5b001319
1 fichiers modifiés avec 58 ajouts et 43 suppressions
  1. 58 43
      mpi/src/starpu_mpi.c

+ 58 - 43
mpi/src/starpu_mpi.c

@@ -67,6 +67,7 @@ struct _starpu_mpi_copy_handle
 	starpu_data_handle_t handle;
 	struct _starpu_mpi_envelope *env;
 	int mpi_tag;
+	int source;
 	UT_hash_handle hh;
 	struct _starpu_mpi_req *req;
 	void *buffer;
@@ -79,15 +80,17 @@ struct _starpu_mpi_copy_handle
  /********************************************************/
 
 /** stores application requests for which data have not been received yet */
-static struct _starpu_mpi_req *_starpu_mpi_app_req_hashmap = NULL;
+static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
+static int _starpu_mpi_app_req_hashmap_count = 0;
 /** stores data which have been received by MPI but have not been requested by the application */
-static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
+static struct _starpu_mpi_copy_handle **_starpu_mpi_copy_handle_hashmap = NULL;
+static int _starpu_mpi_copy_handle_hashmap_count = 0;
 
-static struct _starpu_mpi_req* find_app_req(int mpi_tag)
+static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
 {
 	struct _starpu_mpi_req* req;
 
-	HASH_FIND_INT(_starpu_mpi_app_req_hashmap, &mpi_tag, req);
+	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
 
 	return req;
 }
@@ -96,24 +99,25 @@ static void add_app_req(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = find_app_req(req->mpi_tag);
+	test_req = find_app_req(req->mpi_tag, req->srcdst);
 
 	if (test_req == NULL)
 	{
-		HASH_ADD_INT(_starpu_mpi_app_req_hashmap, mpi_tag, req);
-		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap. \n", req, req->mpi_tag);
+		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
+		_starpu_mpi_app_req_hashmap_count ++;
+		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap. \n", req, req->mpi_tag);
+		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
 		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
 		if (seq_const &&  req->sequential_consistency)
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
 		}
 		else
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
 		}
 	}
 }
@@ -122,60 +126,55 @@ static void delete_app_req(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = find_app_req(req->mpi_tag);
+	test_req = find_app_req(req->mpi_tag, req->srcdst);
 
 	if (test_req != NULL)
 	{
-		HASH_DEL(_starpu_mpi_app_req_hashmap, req);
-		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap. \n", req, req->mpi_tag);
+		HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
+		_starpu_mpi_app_req_hashmap_count --;
+		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap. \n", req, req->mpi_tag);
+		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
 	}
 }
 
-static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag)
+static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag, int source)
 {
 	struct _starpu_mpi_copy_handle* chandle;
 
-	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap, &mpi_tag, chandle);
+	_STARPU_MPI_DEBUG(60, "Looking for chandle with tag %d in the hashmap[%d]\n", mpi_tag, source);
 
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap[source], &mpi_tag, chandle);
 	return chandle;
 }
 
 static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
 {
-	struct _starpu_mpi_copy_handle *test_chandle;
-
-	test_chandle = find_chandle(chandle->mpi_tag);
+	STARPU_ASSERT_MSG(find_chandle(chandle->mpi_tag, chandle->source) == NULL,
+			  "Error add_chandle : copied handle %p with tag %d already in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 
-	if (test_chandle == NULL)
-	{
-		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap, mpi_tag, chandle);
-		_STARPU_MPI_DEBUG(3, "Adding copied handle %p with tag %d in the hashmap. \n", chandle, chandle->mpi_tag);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(3, "Error add_chandle : copied handle %p with tag %d already in the hashmap. \n", chandle, chandle->mpi_tag);
-		STARPU_ASSERT(test_chandle != NULL);
-	}
+	HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap[chandle->source], mpi_tag, chandle);
+	_starpu_mpi_copy_handle_hashmap_count ++;
+	_STARPU_MPI_DEBUG(60, "Adding copied handle %p with tag %d in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 }
 
 static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
 {
 	struct _starpu_mpi_copy_handle *test_chandle;
 
-	test_chandle = find_chandle(chandle->mpi_tag);
+	test_chandle = find_chandle(chandle->mpi_tag, chandle->source);
 
 	if (test_chandle != NULL)
 	{
-		HASH_DEL(_starpu_mpi_copy_handle_hashmap, chandle);
-		_STARPU_MPI_DEBUG(3, "Deleting copied handle %p with tag %d from the hashmap. \n", chandle, chandle->mpi_tag);
+		HASH_DEL(_starpu_mpi_copy_handle_hashmap[chandle->source], chandle);
+		_starpu_mpi_copy_handle_hashmap_count --;
+		_STARPU_MPI_DEBUG(3, "Deleting copied handle %p with tag %d from the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "Warning delete_chandle : copied handle %p with tag %d isn't in the hashmap. \n", chandle, chandle->mpi_tag);
+		_STARPU_MPI_DEBUG(3, "Warning delete_chandle : copied handle %p with tag %d isn't in the hashmap[%d]\n", chandle, chandle->mpi_tag, chandle->source);
 	}
 }
 
@@ -781,7 +780,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 
 	if (req->internal_req)
 	{
-		struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
+		struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle), req->srcdst);
+		STARPU_ASSERT_MSG(chandle, "Could not find a copy data handle with the tag %d and the node %d\n", starpu_data_get_tag(req->data_handle), req->srcdst);
 		_STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
 		delete_chandle(chandle);
 		free(chandle);
@@ -903,7 +903,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
-	_STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p tag %d and type %s\n", req, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
+	_STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -943,7 +943,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 		else
 		{
 			/* test whether the receive request has already been submitted internally by StarPU-MPI*/
-			struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
+			struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
 
 			/* Case : the request has already been submitted internally by StarPU.
 			 * We'll asynchronously ask a Read permission over the temporary handle, so as when
@@ -1177,7 +1177,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0);
+		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (_starpu_mpi_app_req_hashmap_count == 0);
 
 #ifndef STARPU_MPI_ACTIVITY
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -1217,7 +1217,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
 		 * requests in our side, we resubmit a header request. */
 		MPI_Request header_req;
-		if ((HASH_COUNT(_starpu_mpi_app_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
+		if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
 		{
 			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
 			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
@@ -1241,16 +1241,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->size);
+				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
 
-				struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag);
+				struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag, status.MPI_SOURCE);
 
 				/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
 				 * We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
 				 * on this handle, and register this so as the StarPU-MPI layer can remember it.*/
 				if (!found_req)
 				{
-					_STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
+					_STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a copy_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
+
 					starpu_data_handle_t data_handle = NULL;
 
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
@@ -1261,6 +1262,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					STARPU_ASSERT(chandle);
 					chandle->mpi_tag = recv_env->mpi_tag;
 					chandle->env = recv_env;
+					chandle->source = status.MPI_SOURCE;
 
 					if (data_handle)
 					{
@@ -1340,8 +1342,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
-	STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0, "Number of receive requests left is not zero");
-	STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0, "Number of copy requests left is not zero");
+	STARPU_ASSERT_MSG(_starpu_mpi_app_req_hashmap_count == 0, "Number of receive requests left is not zero");
+	STARPU_ASSERT_MSG(_starpu_mpi_copy_handle_hashmap_count == 0, "Number of copy requests left is not zero");
+
 	if (argc_argv->initialize_mpi)
 	{
 		_STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
@@ -1350,6 +1353,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
+	free(_starpu_mpi_app_req_hashmap);
+	free(_starpu_mpi_copy_handle_hashmap);
 	free(argc_argv);
 	free(recv_env);
 
@@ -1433,6 +1438,16 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 	_starpu_mpi_add_sync_point_in_fxt();
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
 	_starpu_mpi_cache_init(MPI_COMM_WORLD);
+
+	{
+		int nb_nodes, k;
+		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
+		_starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
+		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
+		_starpu_mpi_copy_handle_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_copy_handle *));
+		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_copy_handle_hashmap[k] = NULL;
+	}
+
 	return 0;
 }