浏览代码

mpi/src: refactor code for mpi_cache

Nathalie Furmento 12 年之前
父节点
当前提交
aa3e868b7a
共有 4 个文件被更改，包括 91 次插入和 44 次删除
  1. 1 0
      mpi/src/Makefile.am
  2. 2 4
      mpi/src/starpu_mpi.c
  3. 56 40
      mpi/src/starpu_mpi_insert_task.c
  4. 32 0
      mpi/src/starpu_mpi_insert_task.h

+ 1 - 0
mpi/src/Makefile.am

@@ -36,6 +36,7 @@ noinst_HEADERS =					\
 	starpu_mpi_private.h				\
 	starpu_mpi_fxt.h				\
 	starpu_mpi_stats.h				\
+	starpu_mpi_insert_task.h			\
 	starpu_mpi_datatype.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\

+ 2 - 4
mpi/src/starpu_mpi.c

@@ -22,6 +22,7 @@
 #include <starpu_mpi_private.h>
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
+#include <starpu_mpi_insert_task.h>
 
 /* TODO find a better way to select the polling method (perhaps during the
  * configuration) */
@@ -781,10 +782,6 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 static
 int _starpu_mpi_initialize(int initialize_mpi, int *rank, int *world_size)
 {
-#ifndef STARPU_MPI_CACHE
-	if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU was configured with --disable-mpi-cache\n");
-#endif
-
 	_STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
 	_STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
 	_STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
@@ -822,6 +819,7 @@ int _starpu_mpi_initialize(int initialize_mpi, int *rank, int *world_size)
 
 	_starpu_mpi_add_sync_point_in_fxt();
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
+	_starpu_mpi_tables_init(MPI_COMM_WORLD);
 	return 0;
 }
 

+ 56 - 40
mpi/src/starpu_mpi_insert_task.c

@@ -40,20 +40,20 @@ struct _starpu_data_entry **sent_data = NULL;
 struct _starpu_data_entry **received_data = NULL;
 #endif /* STARPU_MPI_CACHE */
 
-static void _starpu_mpi_tables_init()
+void _starpu_mpi_tables_init(MPI_Comm comm) /* Allocate the sent_data/received_data cache tables, one slot per node of comm; without STARPU_MPI_CACHE it only prints a warning. */
 {
 #ifdef STARPU_MPI_CACHE
-	if (sent_data == NULL) {
-		int nb_nodes;
-		int i;
+	int nb_nodes; /* NOTE(review): the old "sent_data == NULL" guard is gone, so a second call would overwrite both tables without freeing them */
+	int i;
 
-		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
-		_STARPU_MPI_DEBUG("Initialising htable for cache\n");
-		sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
-		for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
-		received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
-		for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
-	}
+	MPI_Comm_size(comm, &nb_nodes); /* table length now follows the communicator passed in, no longer hard-wired to MPI_COMM_WORLD */
+	_STARPU_MPI_DEBUG("Initialising htable for cache\n");
+	sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *)); /* NOTE(review): malloc result is not checked before use */
+	for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL; /* each slot is later handed to HASH_FIND_PTR/HASH_ADD_PTR as a table head */
+	received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *)); /* NOTE(review): malloc result is not checked before use */
+	for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
+#else
+	if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU was configured with --disable-mpi-cache\n"); /* warning moved here from _starpu_mpi_initialize; suppressed when STARPU_SILENT is set */
 #endif /* STARPU_MPI_CACHE */
 }
 
@@ -107,6 +107,47 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access
 }
 
 static
+void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank) /* Check whether data is already recorded as received from mpi_rank, recording it if not. Returns the existing entry, or NULL when it was not yet recorded (or the cache is compiled out). */
+{
+#ifdef STARPU_MPI_CACHE
+	struct _starpu_data_entry *already_received;
+	HASH_FIND_PTR(received_data[mpi_rank], &data, already_received); /* look the handle up in the table kept for mpi_rank */
+	if (already_received == NULL) {
+		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry)); /* NOTE(review): malloc result is not checked before the write below */
+		entry->data = data;
+		HASH_ADD_PTR(received_data[mpi_rank], data, entry); /* record the handle so later lookups for this rank find it */
+	}
+	else {
+		_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+	}
+	return already_received; /* still NULL on a miss even though an entry was just inserted; the caller receives only when this is NULL */
+#else
+	return NULL; /* cache disabled: always report "not yet received" */
+#endif
+}
+
+static
+void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest) /* Check whether data is already recorded as sent to dest, recording it if not. Returns the existing entry, or NULL when it was not yet recorded (or the cache is compiled out). */
+{
+#ifdef STARPU_MPI_CACHE
+	struct _starpu_data_entry *already_sent;
+	HASH_FIND_PTR(sent_data[dest], &data, already_sent); /* look the handle up in the table kept for dest */
+	if (already_sent == NULL) {
+		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry)); /* NOTE(review): malloc result is not checked before the write below */
+		entry->data = data;
+		HASH_ADD_PTR(sent_data[dest], data, entry); /* record the handle so later lookups for this destination find it */
+		_STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
+	}
+	else {
+		_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
+	}
+	return already_sent; /* still NULL on a miss even though an entry was just inserted; the caller sends only when this is NULL */
+#else
+	return NULL; /* cache disabled: always report "not yet sent" */
+#endif
+}
+
+static
 void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int dest, int do_execute, MPI_Comm comm)
 {
 	if (data && mode & STARPU_R) {
@@ -123,19 +164,8 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 		/* The task needs to read this data */
 		if (do_execute && mpi_rank != me && mpi_rank != -1) {
 			/* I will have to execute but I don't have the data, receive */
-#ifdef STARPU_MPI_CACHE
-			struct _starpu_data_entry *already_received;
-			HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
-			if (already_received == NULL) {
-				struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-				entry->data = data;
-				HASH_ADD_PTR(received_data[mpi_rank], data, entry);
-			}
-			else {
-				_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
-			}
-			if (!already_received)
-#endif
+			void *already_received = _starpu_mpi_already_received(data, mpi_rank);
+			if (already_received == NULL)
 			{
 				_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
 				starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
@@ -143,20 +173,8 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 		}
 		if (!do_execute && mpi_rank == me) {
 			/* Somebody else will execute it, and I have the data, send it. */
-#ifdef STARPU_MPI_CACHE
-			struct _starpu_data_entry *already_sent;
-			HASH_FIND_PTR(sent_data[dest], &data, already_sent);
-			if (already_sent == NULL) {
-				struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-				entry->data = data;
-				HASH_ADD_PTR(sent_data[dest], data, entry);
-				_STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
-			}
-			else {
-				_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
-			}
-			if (!already_sent)
-#endif
+			void *already_sent = _starpu_mpi_already_sent(data, dest);
+			if (already_sent == NULL)
 			{
 				_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
 				starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
@@ -251,8 +269,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
 
-	_starpu_mpi_tables_init();
-
 	/* Get the number of buffers and the size of the arguments */
 	va_start(varg_list, codelet);
 	arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);

+ 32 - 0
mpi/src/starpu_mpi_insert_task.h

@@ -0,0 +1,32 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_INSERT_TASK_H__
+#define __STARPU_MPI_INSERT_TASK_H__
+
+#include <mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void _starpu_mpi_tables_init(MPI_Comm comm); /* Defined in starpu_mpi_insert_task.c and called from _starpu_mpi_initialize(); sets up the per-node cache tables for comm, or only prints a warning when the MPI cache is compiled out. */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __STARPU_MPI_INSERT_TASK_H__ */