瀏覽代碼

mpi: externalise function which select the node to execute the codelet
when there is several nodes having data in W mode

Nathalie Furmento 10 年之前
父節點
當前提交
d9685df9a7

+ 2 - 0
mpi/src/Makefile.am

@@ -39,6 +39,7 @@ noinst_HEADERS =					\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_datatype.h				\
 	starpu_mpi_datatype.h				\
 	starpu_mpi_cache.h				\
 	starpu_mpi_cache.h				\
+	starpu_mpi_select_node.h			\
 	starpu_mpi_cache_stats.h			\
 	starpu_mpi_cache_stats.h			\
 	starpu_mpi_early_data.h				\
 	starpu_mpi_early_data.h				\
 	starpu_mpi_early_request.h
 	starpu_mpi_early_request.h
@@ -52,6 +53,7 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_stats.c				\
 	starpu_mpi_stats.c				\
 	starpu_mpi_private.c				\
 	starpu_mpi_private.c				\
 	starpu_mpi_cache.c				\
 	starpu_mpi_cache.c				\
+	starpu_mpi_select_node.c			\
 	starpu_mpi_cache_stats.c			\
 	starpu_mpi_cache_stats.c			\
 	starpu_mpi_early_data.c				\
 	starpu_mpi_early_data.c				\
 	starpu_mpi_early_request.c
 	starpu_mpi_early_request.c

+ 59 - 0
mpi/src/starpu_mpi_select_node.c

@@ -0,0 +1,59 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdarg.h>
+#include <mpi.h>
+
+#include <starpu.h>
+#include <starpu_data.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_select_node.h>
+#include <starpu_mpi_task_insert.h>
+#include <datawizard/coherency.h>
+
+int _starpu_mpi_select_node(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data)
+{
+	size_t *size_on_nodes;
+	size_t max_size;
+	int i;
+	int xrank;
+
+	size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
+
+	for(i= 0 ; i<nb_data ; i++)
+	{
+		starpu_data_handle_t data = descr[i].handle;
+		enum starpu_data_access_mode mode = descr[i].mode;
+		if (mode & STARPU_R)
+		{
+			int rank = starpu_data_get_rank(data);
+			size_on_nodes[rank] += data->ops->get_size(data);
+		}
+	}
+
+	// We select the node which has the most data in R mode
+	max_size = 0;
+	for(i=0 ; i<nb_nodes ; i++)
+	{
+		if (size_on_nodes[i] > max_size)
+		{
+			max_size = size_on_nodes[i];
+			xrank = i;
+		}
+	}
+
+	return xrank;
+}

+ 32 - 0
mpi/src/starpu_mpi_select_node.h

@@ -0,0 +1,32 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_SELECT_NODE_H__
+#define __STARPU_MPI_SELECT_NODE_H__
+
+#include <mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int _starpu_mpi_select_node(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_SELECT_NODE_H__

+ 142 - 60
mpi/src/starpu_mpi_task_insert.c

@@ -29,21 +29,11 @@
 #include <starpu_mpi_private.h>
 #include <starpu_mpi_private.h>
 #include <starpu_mpi_task_insert.h>
 #include <starpu_mpi_task_insert.h>
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_cache.h>
-
-typedef void (*_starpu_callback_func_t)(void *);
+#include <starpu_mpi_select_node.h>
 
 
 static
 static
-int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest, size_t *size_on_nodes)
+int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest)
 {
 {
-	if (data && mode & STARPU_R)
-	{
-		struct starpu_data_interface_ops *ops;
-		int rank = starpu_data_get_rank(data);
-
-		ops = data->ops;
-		size_on_nodes[rank] += ops->get_size(data);
-	}
-
 	if (mode & STARPU_W)
 	if (mode & STARPU_W)
 	{
 	{
 		if (!data)
 		if (!data)
@@ -58,35 +48,43 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_a
 			_STARPU_MPI_LOG_OUT();
 			_STARPU_MPI_LOG_OUT();
 			return -EINVAL;
 			return -EINVAL;
 		}
 		}
+
 		int mpi_rank = starpu_data_get_rank(data);
 		int mpi_rank = starpu_data_get_rank(data);
+		if (mpi_rank == -1)
+		{
+			_STARPU_ERROR("Data %p with mode STARPU_W needs to have a valid rank", data);
+		}
+
 		if (mpi_rank == me)
 		if (mpi_rank == me)
 		{
 		{
+			// This node owns the data
 			if (*do_execute == 0)
 			if (*do_execute == 0)
 			{
 			{
+				// Another node has already been selected to execute the codelet
 				*inconsistent_execute = 1;
 				*inconsistent_execute = 1;
 			}
 			}
 			else
 			else
 			{
 			{
+				// This node is going to execute the codelet
 				*do_execute = 1;
 				*do_execute = 1;
 			}
 			}
 		}
 		}
-		else if (mpi_rank != -1)
+		else
 		{
 		{
+			// Another node owns the data
 			if (*do_execute == 1)
 			if (*do_execute == 1)
 			{
 			{
+				// But this node has already been selected to execute the codelet
 				*inconsistent_execute = 1;
 				*inconsistent_execute = 1;
 			}
 			}
 			else
 			else
 			{
 			{
+				// This node will not execute the codelet
 				*do_execute = 0;
 				*do_execute = 0;
 				*dest = mpi_rank;
 				*dest = mpi_rank;
 				/* That's the rank which needs the data to be sent to */
 				/* That's the rank which needs the data to be sent to */
 			}
 			}
 		}
 		}
-		else
-		{
-			_STARPU_ERROR("rank %d invalid\n", mpi_rank);
-		}
 	}
 	}
 	return 0;
 	return 0;
 }
 }
@@ -198,15 +196,128 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 }
 }
 
 
 static
 static
+int _starpu_mpi_task_select_node(struct starpu_codelet *codelet, int me, int nb_nodes, va_list varg_list, int nb_data)
+{
+	va_list varg_list_copy;
+	int arg_type, arg_type_nocommute;
+	int current_data=0;
+	int rank;
+	struct starpu_data_descr *descr;
+
+	descr = (struct starpu_data_descr *) malloc(nb_data * sizeof(struct starpu_data_descr));
+	va_copy(varg_list_copy, varg_list);
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
+	{
+		arg_type_nocommute = arg_type & ~STARPU_COMMUTE;
+		if (arg_type==STARPU_EXECUTE_ON_NODE)
+		{
+			(void) va_arg(varg_list_copy, int);
+		}
+		else if (arg_type==STARPU_EXECUTE_ON_DATA)
+		{
+			(void) va_arg(varg_list_copy, starpu_data_handle_t);
+		}
+		else if (arg_type==STARPU_EXECUTE_ON_WORKER)
+		{
+			(void)va_arg(varg_list_copy, int);
+		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			(void)va_arg(varg_list_copy, unsigned);
+		}
+		else if (arg_type_nocommute==STARPU_R || arg_type_nocommute==STARPU_W || arg_type_nocommute==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
+		{
+			descr[current_data].handle = va_arg(varg_list_copy, starpu_data_handle_t);
+			descr[current_data].mode = (enum starpu_data_access_mode) arg_type;
+			current_data ++;
+		}
+		else if (arg_type == STARPU_DATA_ARRAY)
+		{
+			starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list_copy, int);
+			int i;
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				descr[current_data].handle = datas[i];
+				descr[current_data].mode = STARPU_CODELET_GET_MODE(codelet, current_data);
+				current_data ++;
+			}
+		}
+		else if (arg_type==STARPU_VALUE)
+		{
+			(void)va_arg(varg_list_copy, void *);
+			(void)va_arg(varg_list_copy, size_t);
+		}
+		else if (arg_type==STARPU_CALLBACK)
+		{
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
+		}
+		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
+		{
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
+			(void)va_arg(varg_list_copy, void *);
+		}
+		else if (arg_type==STARPU_CALLBACK_ARG)
+		{
+			(void)va_arg(varg_list_copy, void *);
+		}
+		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
+                {
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
+		}
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
+                {
+                        (void)va_arg(varg_list_copy, void *);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
+                {
+			(void)va_arg(varg_list_copy, _starpu_callback_func_t);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
+                {
+                        (void)va_arg(varg_list_copy, void *);
+		}
+		else if (arg_type==STARPU_PRIORITY)
+		{
+			(void)va_arg(varg_list_copy, int);
+		}
+		else if (arg_type==STARPU_HYPERVISOR_TAG)
+		{
+			(void)va_arg(varg_list_copy, int);
+		}
+		else if (arg_type==STARPU_FLOPS)
+		{
+			(void)va_arg(varg_list_copy, double);
+		}
+		else if (arg_type==STARPU_TAG_ONLY)
+		{
+			(void)va_arg(varg_list, starpu_tag_t);
+		}
+		else if (arg_type==STARPU_TAG)
+		{
+			STARPU_ASSERT_MSG(0, "STARPU_TAG is not supported in MPI mode\n");
+		}
+		else
+		{
+			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
+		}
+
+	}
+	va_end(varg_list_copy);
+
+	rank = _starpu_mpi_select_node(me, nb_nodes, descr, nb_data);
+	free(descr);
+	return rank;
+}
+
+static
 int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *dest, int *do_execute, va_list varg_list)
 int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *dest, int *do_execute, va_list varg_list)
 {
 {
 	va_list varg_list_copy;
 	va_list varg_list_copy;
 	int inconsistent_execute = 0;
 	int inconsistent_execute = 0;
 	int arg_type, arg_type_nocommute;
 	int arg_type, arg_type_nocommute;
-	size_t *size_on_nodes;
 	int current_data = 0;
 	int current_data = 0;
 
 
-	size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
 	*do_execute = -1;
 	*do_execute = -1;
 	*xrank = -1;
 	*xrank = -1;
 	va_copy(varg_list_copy, varg_list);
 	va_copy(varg_list_copy, varg_list);
@@ -216,7 +327,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		if (arg_type==STARPU_EXECUTE_ON_NODE)
 		if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
 		{
 			*xrank = va_arg(varg_list_copy, int);
 			*xrank = va_arg(varg_list_copy, int);
-			_STARPU_MPI_DEBUG(1, "Executing on node %d\n", *xrank);
+			_STARPU_MPI_DEBUG(100, "Executing on node %d\n", *xrank);
 			*do_execute = 1;
 			*do_execute = 1;
 		}
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
@@ -224,7 +335,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			*xrank = starpu_data_get_rank(data);
 			*xrank = starpu_data_get_rank(data);
 			STARPU_ASSERT_MSG(*xrank != -1, "Rank of the data must be set using starpu_mpi_data_register() or starpu_data_set_rank()");
 			STARPU_ASSERT_MSG(*xrank != -1, "Rank of the data must be set using starpu_mpi_data_register() or starpu_data_set_rank()");
-			_STARPU_MPI_DEBUG(1, "Executing on data node %d\n", *xrank);
+			_STARPU_MPI_DEBUG(100, "Executing on data node %d\n", *xrank);
 			STARPU_ASSERT_MSG(*xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", *xrank, nb_nodes);
 			STARPU_ASSERT_MSG(*xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", *xrank, nb_nodes);
 			*do_execute = 1;
 			*do_execute = 1;
 		}
 		}
@@ -244,10 +355,9 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		{
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
-			int ret = _starpu_mpi_find_executee_node(data, mode, me, do_execute, &inconsistent_execute, dest, size_on_nodes);
+			int ret = _starpu_mpi_find_executee_node(data, mode, me, do_execute, &inconsistent_execute, dest);
 			if (ret == -EINVAL)
 			if (ret == -EINVAL)
 			{
 			{
-				free(size_on_nodes);
 				return ret;
 				return ret;
 			}
 			}
 			current_data ++;
 			current_data ++;
@@ -260,10 +370,9 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 			for(i=0 ; i<nb_handles ; i++)
 			for(i=0 ; i<nb_handles ; i++)
 			{
 			{
 				enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(codelet, current_data);
 				enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(codelet, current_data);
-				int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, do_execute, &inconsistent_execute, dest, size_on_nodes);
+				int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, do_execute, &inconsistent_execute, dest);
 				if (ret == -EINVAL)
 				if (ret == -EINVAL)
 				{
 				{
-					free(size_on_nodes);
 					return ret;
 					return ret;
 				}
 				}
 				current_data ++;
 				current_data ++;
@@ -331,42 +440,15 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 	}
 	}
 	va_end(varg_list_copy);
 	va_end(varg_list_copy);
 
 
-	if (*do_execute == -1)
-	{
-		int i;
-		size_t max_size = 0;
-		for(i=0 ; i<nb_nodes ; i++)
-		{
-			if (size_on_nodes[i] > max_size)
-			{
-				max_size = size_on_nodes[i];
-				*xrank = i;
-			}
-		}
-		if (*xrank != -1)
-		{
-			_STARPU_MPI_DEBUG(1, "Node %d is having the most R data\n", *xrank);
-			*do_execute = 1;
-		}
-	}
-	free(size_on_nodes);
-
-	STARPU_ASSERT_MSG(*do_execute != -1, "StarPU needs to see a W or a REDUX data which will tell it where to execute the task");
-
-	if (inconsistent_execute == 1)
+	if (inconsistent_execute == 1 && *xrank == -1)
 	{
 	{
-		if (*xrank == -1)
-		{
-			_STARPU_MPI_DEBUG(1, "Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
-			return -EINVAL;
-		}
-		else
-		{
-			*do_execute = (me == *xrank);
-			*dest = *xrank;
-		}
+		// We need to find out which node is going to execute the codelet.
+		_STARPU_MPI_DISP("Different nodes are owning W data. Need to specify which node is going to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
+		*xrank = _starpu_mpi_task_select_node(codelet, me, nb_nodes, varg_list, current_data);
+		*do_execute = (me == *xrank);
+		*dest = *xrank;
 	}
 	}
-	else if (*xrank != -1)
+	else
 	{
 	{
 		*do_execute = (me == *xrank);
 		*do_execute = (me == *xrank);
 		*dest = *xrank;
 		*dest = *xrank;
@@ -517,7 +599,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 			va_end(varg_list_copy);
 			va_end(varg_list_copy);
 		}
 		}
 
 
-		_STARPU_MPI_DEBUG(1, "Execution of the codelet %p (%s)\n", codelet, codelet->name);
+		_STARPU_MPI_DEBUG(100, "Execution of the codelet %p (%s)\n", codelet, codelet->name);
 
 
 		*task = starpu_task_create();
 		*task = starpu_task_create();
 		(*task)->cl_arg_free = 1;
 		(*task)->cl_arg_free = 1;

+ 3 - 1
mpi/src/starpu_mpi_task_insert.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2012, 2014  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -23,6 +23,8 @@
 extern "C" {
 extern "C" {
 #endif
 #endif
 
 
+typedef void (*_starpu_callback_func_t)(void *);
+
 void _starpu_mpi_cache_init(MPI_Comm comm);
 void _starpu_mpi_cache_init(MPI_Comm comm);
 void _starpu_mpi_cache_free(int world_size);
 void _starpu_mpi_cache_free(int world_size);