Procházet zdrojové kódy

mpi: new functions to register and unregister node selection policies

Nathalie Furmento před 10 roky
rodič
revize
ba89cf40f6

+ 3 - 0
ChangeLog

@@ -43,6 +43,9 @@ New features:
         - New flag STARPU_NODE_SELECTION_POLICY to specify a policy for
           selecting a node to execute the codelet when several nodes
 	  own data in W mode.
+	- New node selection policies can be registered and unregistered with
+	  the functions starpu_mpi_node_selection_register_policy() and
+	  starpu_mpi_node_selection_unregister_policy()
 
   * New STARPU_COMMUTE flag which can be passed along STARPU_W or STARPU_RW to
     let starpu commute write accesses.

+ 37 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -327,6 +327,10 @@ owner if needed. At least the target node and the owner have to call
 the function. On reception, the \p callback function is called with
 the argument \p arg.
 
+@name Node Selection Policy
+\anchor MPINodeSelectionPolicy
+\ingroup API_MPI_Support
+
 \fn int starpu_mpi_node_selection_get_current_policy()
 \ingroup API_MPI_Support
 Return the current policy used to select the node which will execute the codelet
@@ -338,6 +342,39 @@ execute the codelet. The policy "node_with_most_R_data" selects the
 node having the most data in R mode so as to minimize the amount of
 data to be transferred.
 
+\fn int starpu_mpi_node_selection_register_policy(starpu_mpi_select_node_policy_func_t policy_func)
+\ingroup API_MPI_Support
+Register a new policy which can then be used when there are several nodes owning data in W mode.
+The function returns the identifier of the newly registered policy.
+Here is an example of a function defining a node selection policy.
+The codelet will be executed on the node owning the first data with a size bigger than 1M, or on the node
+0 if no data fits the given size.
+\code{.c}
+int my_node_selection_policy(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data)
+{
+	// me:       rank of the current MPI process
+	// nb_nodes: number of MPI nodes
+	// descr:    description of the data given to starpu_mpi_task_insert
+	// nb_data:  number of entries in descr
+	int idx = 0;
+	while (idx < nb_data)
+	{
+		struct starpu_data_descr *entry = &descr[idx++];
+		// Only data accessed with the STARPU_R bit set are considered
+		if (!(entry->mode & STARPU_R)) continue;
+		int owner = starpu_data_get_rank(entry->handle);
+		size_t bytes = starpu_data_get_size(entry->handle);
+		// The first data bigger than 1M decides where the codelet runs
+		if (bytes > 1024*1024) return owner;
+	}
+	// No data is big enough: fall back to node 0
+	return 0;
+}
+\endcode
+
+\fn int starpu_mpi_node_selection_unregister_policy(int policy)
+\ingroup API_MPI_Support
+Unregister a previously registered policy.
+
 @name Collective Operations
 \anchor MPICollectiveOperations
 \ingroup API_MPI_Support

+ 6 - 2
mpi/include/starpu_mpi.h

@@ -82,8 +82,12 @@ void starpu_mpi_set_communication_tag(int tag);
 
 void starpu_mpi_data_register(starpu_data_handle_t data_handle, int tag, int rank);
 
-#define STARPU_MPI_NODE_SELECTION_CURRENT_POLICY 0
-#define STARPU_MPI_NODE_SELECTION_MOST_R_DATA    1
+#define STARPU_MPI_NODE_SELECTION_CURRENT_POLICY -1
+#define STARPU_MPI_NODE_SELECTION_MOST_R_DATA    0
+
+typedef int (*starpu_mpi_select_node_policy_func_t)(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data);
+int starpu_mpi_node_selection_register_policy(starpu_mpi_select_node_policy_func_t policy_func);
+int starpu_mpi_node_selection_unregister_policy(int policy);
 
 int starpu_mpi_node_selection_get_current_policy();
 int starpu_mpi_node_selection_set_current_policy(int policy);

+ 3 - 1
mpi/src/starpu_mpi.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2015  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -24,6 +24,7 @@
 #include <starpu_mpi_task_insert.h>
 #include <starpu_mpi_early_data.h>
 #include <starpu_mpi_early_request.h>
+#include <starpu_mpi_select_node.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -1120,6 +1121,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_add_sync_point_in_fxt();
 	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
 	_starpu_mpi_cache_init(MPI_COMM_WORLD);
+	_starpu_mpi_select_node_init();
 
 	_starpu_mpi_early_request_init(worldsize);
 	_starpu_mpi_early_data_init(worldsize);

+ 41 - 8
mpi/src/starpu_mpi_select_node.c

@@ -18,6 +18,7 @@
 #include <mpi.h>
 
 #include <starpu.h>
+#include <starpu_mpi.h>
 #include <starpu_data.h>
 #include <starpu_mpi_private.h>
 #include <starpu_mpi_select_node.h>
@@ -25,6 +26,19 @@
 #include <datawizard/coherency.h>
 
 static int _current_policy = STARPU_MPI_NODE_SELECTION_MOST_R_DATA;
+static int _last_predefined_policy = STARPU_MPI_NODE_SELECTION_MOST_R_DATA;
+static starpu_mpi_select_node_policy_func_t _policies[_STARPU_MPI_NODE_SELECTION_MAX_POLICY];
+
+int _starpu_mpi_select_node_with_most_R_data(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data);
+
+/* Initialise the table of node selection policies: install the predefined
+ * "most R data" policy in its slot and mark every following slot as unused. */
+void _starpu_mpi_select_node_init()
+{
+	int slot = _last_predefined_policy+1;
+
+	_policies[STARPU_MPI_NODE_SELECTION_MOST_R_DATA] = _starpu_mpi_select_node_with_most_R_data;
+	// A NULL entry denotes a slot available for user registration
+	while (slot < _STARPU_MPI_NODE_SELECTION_MAX_POLICY)
+	{
+		_policies[slot] = NULL;
+		slot++;
+	}
+}
 
 int starpu_mpi_node_selection_get_current_policy()
 {
@@ -33,13 +47,32 @@ int starpu_mpi_node_selection_get_current_policy()
 
 int starpu_mpi_node_selection_set_current_policy(int policy)
 {
-#ifdef STARPU_DEVEL
-#warning need to check the policy is valid
-#endif
+	STARPU_ASSERT_MSG(_policies[policy] != NULL, "Policy %d invalid.\n", policy);
 	_current_policy = policy;
 	return 0;
 }
 
+/* Register policy_func as a new node selection policy.
+ *
+ * The slots following the predefined policies are scanned for the first
+ * unused (NULL) entry of _policies; policy_func is stored there and the index
+ * of that slot is returned as the policy identifier.
+ * The STARPU_ASSERT_MSG check fails when every slot is already in use.
+ */
+int starpu_mpi_node_selection_register_policy(starpu_mpi_select_node_policy_func_t policy_func)
+{
+	int i=_last_predefined_policy+1;
+	// Look for an unregistered policy
+	while(i<_STARPU_MPI_NODE_SELECTION_MAX_POLICY)
+	{
+		if (_policies[i] == NULL) break;
+		i++;
+	}
+	// When the table is full, i equals the table size: test the index itself
+	// rather than reading _policies[i], which would be out of bounds.
+	STARPU_ASSERT_MSG(i < _STARPU_MPI_NODE_SELECTION_MAX_POLICY, "No unused policy available. Unregister existing policies before registering a new one.");
+	_policies[i] = policy_func;
+	return i;
+}
+
+/* Unregister a user-registered node selection policy.
+ *
+ * policy must designate a user slot, i.e. lie strictly after the predefined
+ * policies and inside the _policies table; predefined policies cannot be
+ * removed. The slot is reset to NULL, which marks it as unused.
+ * Returns 0.
+ */
+int starpu_mpi_node_selection_unregister_policy(int policy)
+{
+	STARPU_ASSERT_MSG(policy > _last_predefined_policy, "Policy %d invalid. Only user-registered policies can be unregistered\n", policy);
+	// Also reject identifiers past the end of the table before writing to it
+	STARPU_ASSERT_MSG(policy < _STARPU_MPI_NODE_SELECTION_MAX_POLICY, "Policy %d invalid.\n", policy);
+	_policies[policy] = NULL;
+	return 0;
+}
+
 int _starpu_mpi_select_node_with_most_R_data(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data)
 {
 	size_t *size_on_nodes;
@@ -77,9 +110,9 @@ int _starpu_mpi_select_node_with_most_R_data(int me, int nb_nodes, struct starpu
 
 int _starpu_mpi_select_node(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data, int policy)
 {
-	int current_policy = policy == STARPU_MPI_NODE_SELECTION_CURRENT_POLICY ? _current_policy : policy;
-	if (current_policy == STARPU_MPI_NODE_SELECTION_MOST_R_DATA)
-		return _starpu_mpi_select_node_with_most_R_data(me, nb_nodes, descr, nb_data);
-	else
-		STARPU_ABORT_MSG("Node selection policy <%d> unknown\n", current_policy);
+	int ppolicy = policy == STARPU_MPI_NODE_SELECTION_CURRENT_POLICY ? _current_policy : policy;
+	STARPU_ASSERT_MSG(ppolicy < _STARPU_MPI_NODE_SELECTION_MAX_POLICY, "Invalid policy %d\n", ppolicy);
+	STARPU_ASSERT_MSG(_policies[ppolicy], "Unregistered policy %d\n", ppolicy);
+	starpu_mpi_select_node_policy_func_t func = _policies[ppolicy];
+	return func(me, nb_nodes, descr, nb_data);
 }

+ 3 - 0
mpi/src/starpu_mpi_select_node.h

@@ -23,6 +23,9 @@
 extern "C" {
 #endif
 
+#define _STARPU_MPI_NODE_SELECTION_MAX_POLICY 24
+
+void _starpu_mpi_select_node_init();
 int _starpu_mpi_select_node(int me, int nb_nodes, struct starpu_data_descr *descr, int nb_data, int policy);
 
 #ifdef __cplusplus