瀏覽代碼

mpi/src: move all functionalities related to early application requests to starpu_mpi_early_request

Nathalie Furmento 11 年之前
父節點
當前提交
f5ce9ebd53
共有 4 個文件被更改,包括 162 次插入80 次删除
  1. 4 2
      mpi/src/Makefile.am
  2. 10 78
      mpi/src/starpu_mpi.c
  3. 104 0
      mpi/src/starpu_mpi_early_request.c
  4. 44 0
      mpi/src/starpu_mpi_early_request.h

+ 4 - 2
mpi/src/Makefile.am

@@ -40,7 +40,8 @@ noinst_HEADERS =					\
 	starpu_mpi_datatype.h				\
 	starpu_mpi_cache.h				\
 	starpu_mpi_cache_stats.h			\
-	starpu_mpi_early_data.h
+	starpu_mpi_early_data.h				\
+	starpu_mpi_early_request.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
@@ -52,7 +53,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_private.c				\
 	starpu_mpi_cache.c				\
 	starpu_mpi_cache_stats.c			\
-	starpu_mpi_early_data.c
+	starpu_mpi_early_data.c				\
+	starpu_mpi_early_request.c
 
 showcheck:
 	-cat /dev/null

+ 10 - 78
mpi/src/starpu_mpi.c

@@ -23,6 +23,7 @@
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_task_insert.h>
 #include <starpu_mpi_early_data.h>
+#include <starpu_mpi_early_request.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -66,70 +67,6 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
-/********************************************************/
-/*                                                      */
-/*  Hashmap's requests functionalities                  */
-/*                                                      */
-/********************************************************/
-
-/** stores application requests for which data have not been received yet */
-static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
-static int _starpu_mpi_app_req_hashmap_count = 0;
-
-static struct _starpu_mpi_req* find_app_req(int mpi_tag, int source)
-{
-	struct _starpu_mpi_req* req;
-
-	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
-
-	return req;
-}
-
-static void add_app_req(struct _starpu_mpi_req *req)
-{
-	struct _starpu_mpi_req *test_req;
-
-	test_req = find_app_req(req->mpi_tag, req->srcdst);
-
-	if (test_req == NULL)
-	{
-		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
-		_starpu_mpi_app_req_hashmap_count ++;
-		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
-		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
-		if (seq_const &&  req->sequential_consistency)
-		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
-		}
-		else
-		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
-		}
-	}
-}
-
-static void delete_app_req(struct _starpu_mpi_req *req)
-{
-	struct _starpu_mpi_req *test_req;
-
-	test_req = find_app_req(req->mpi_tag, req->srcdst);
-
-	if (test_req != NULL)
-	{
-		HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
-		_starpu_mpi_app_req_hashmap_count --;
-		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
-	}
-}
-
 static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 {
 	*req = malloc(sizeof(struct _starpu_mpi_req));
@@ -929,7 +866,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 			else
 			{
 				_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
-				add_app_req(req);
+				_starpu_mpi_early_request_add(req);
 			}
 		}
 	}
@@ -1128,13 +1065,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
 	_starpu_mpi_cache_init(MPI_COMM_WORLD);
 
-	{
-		int nb_nodes, k;
-		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
-		_starpu_mpi_app_req_hashmap = malloc(nb_nodes * sizeof(struct _starpu_mpi_req *));
-		for(k=0 ; k<nb_nodes ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
-		_starpu_mpi_early_data_init(nb_nodes);
-	}
+	_starpu_mpi_early_request_init(worldsize);
+	_starpu_mpi_early_data_init(worldsize);
 
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -1151,7 +1083,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (_starpu_mpi_app_req_hashmap_count == 0);
+		unsigned block = _starpu_mpi_req_list_empty(new_requests) && _starpu_mpi_early_request_count() == 0;
 
 #ifndef STARPU_MPI_ACTIVITY
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -1191,7 +1123,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
 		 * requests in our side, we resubmit a header request. */
 		MPI_Request header_req;
-		if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
+		if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
 		{
 			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
 			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
@@ -1217,7 +1149,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			{
 				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
 
-				struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag, status.MPI_SOURCE);
+				struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
 
 				/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
 				 * We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
@@ -1288,7 +1220,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				{
 					_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
 
-					delete_app_req(found_req);
+					_starpu_mpi_early_request_delete(found_req);
 
 					_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
 					if (found_req->user_datatype == 0)
@@ -1325,7 +1257,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
-	STARPU_ASSERT_MSG(_starpu_mpi_app_req_hashmap_count == 0, "Number of receive requests left is not zero");
+	_starpu_mpi_early_request_check_termination();
 	_starpu_mpi_early_data_check_termination();
 
 	if (argc_argv->initialize_mpi)
@@ -1337,7 +1269,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	_starpu_mpi_early_data_free(worldsize);
-	free(_starpu_mpi_app_req_hashmap);
+	_starpu_mpi_early_request_free();
 	free(argc_argv);
 	free(recv_env);
 

+ 104 - 0
mpi/src/starpu_mpi_early_request.c

@@ -0,0 +1,104 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_early_request.h>
+#include <common/uthash.h>
+
+/** stores application requests for which data have not been received yet */
+static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
+static int _starpu_mpi_app_req_hashmap_count = 0;
+
+void _starpu_mpi_early_request_init(int world_size)
+{
+	int k;
+
+	_starpu_mpi_app_req_hashmap = malloc(world_size * sizeof(struct _starpu_mpi_req *));
+	for(k=0 ; k<world_size ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
+}
+
+void _starpu_mpi_early_request_free()
+{
+	free(_starpu_mpi_app_req_hashmap);
+}
+
+int _starpu_mpi_early_request_count()
+{
+	return _starpu_mpi_app_req_hashmap_count;
+}
+
+void _starpu_mpi_early_request_check_termination()
+{
+	STARPU_ASSERT_MSG(_starpu_mpi_early_request_count() == 0, "Number of receive requests left is not zero");
+}
+
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int mpi_tag, int source)
+{
+	struct _starpu_mpi_req* req;
+
+	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
+
+	return req;
+}
+
+void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = _starpu_mpi_early_request_find(req->mpi_tag, req->srcdst);
+
+	if (test_req == NULL)
+	{
+		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
+		_starpu_mpi_app_req_hashmap_count ++;
+		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
+		if (seq_const &&  req->sequential_consistency)
+		{
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
+		}
+		else
+		{
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
+		}
+	}
+}
+
+void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = _starpu_mpi_early_request_find(req->mpi_tag, req->srcdst);
+
+	if (test_req != NULL)
+	{
+		HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
+		_starpu_mpi_app_req_hashmap_count --;
+		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+	}
+}
+

+ 44 - 0
mpi/src/starpu_mpi_early_request.h

@@ -0,0 +1,44 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_EARLY_REQUEST_H__
+#define __STARPU_MPI_EARLY_REQUEST_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+#include <common/config.h>
+#include <common/list.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void _starpu_mpi_early_request_init(int world_size);
+void _starpu_mpi_early_request_free();
+int _starpu_mpi_early_request_count();
+void _starpu_mpi_early_request_check_termination();
+
+void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req);
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int mpi_tag, int source);
+void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __STARPU_MPI_EARLY_REQUEST_H__ */