Sfoglia il codice sorgente

starpu_mpi_insert_task had been renamed in starpu_mpi_task_insert and
kept as a macro to avoid breaking old codes. However some codes (morse
for example) test the availability of starpu by checking if the
function starpu_mpi_insert_task exists. Hence the symbol
starpu_mpi_insert_task has been redefined as a function.

Nathalie Furmento 11 anni fa
parent
commit
e030f1a6df
2 ha cambiato i file con 93 aggiunte e 80 eliminazioni
  1. 3 1
      mpi/include/starpu_mpi.h
  2. 90 79
      mpi/src/starpu_mpi_task_insert.c

+ 3 - 1
mpi/include/starpu_mpi.h

@@ -48,7 +48,9 @@ int starpu_mpi_initialize_extended(int *rank, int *world_size) STARPU_DEPRECATED
 int starpu_mpi_shutdown(void);
 
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...);
-#define starpu_mpi_insert_task starpu_mpi_task_insert
+/* the function starpu_mpi_insert_task has the same semantics as starpu_mpi_task_insert, it is kept to avoid breaking old codes */
+int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...);
+
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node);
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg);
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle);

+ 90 - 79
mpi/src/starpu_mpi_task_insert.c

@@ -363,10 +363,10 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 	}
 }
 
-int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_list varg_list)
 {
 	int arg_type;
-	va_list varg_list;
+	va_list varg_list_copy;
 	int me, do_execute, xrank, nb_nodes;
 	size_t *size_on_nodes;
 	size_t arg_buffer_size = 0;
@@ -385,18 +385,18 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	inconsistent_execute = 0;
 	do_execute = -1;
 	xrank = -1;
-	va_start(varg_list, codelet);
-	while ((arg_type = va_arg(varg_list, int)) != 0)
+	va_copy(varg_list_copy, varg_list);
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 	{
 		if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
-			xrank = va_arg(varg_list, int);
+			xrank = va_arg(varg_list_copy, int);
 			_STARPU_MPI_DEBUG(1, "Executing on node %d\n", xrank);
 			do_execute = 1;
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		{
-			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			xrank = starpu_data_get_rank(data);
 			_STARPU_MPI_DEBUG(1, "Executing on data node %d\n", xrank);
 			STARPU_ASSERT_MSG(xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", xrank, nb_nodes);
@@ -404,7 +404,7 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
-			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 			int ret = _starpu_mpi_find_executee_node(data, mode, me, &do_execute, &inconsistent_execute, &dest, size_on_nodes);
 			if (ret == -EINVAL)
@@ -416,8 +416,8 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type == STARPU_DATA_ARRAY)
 		{
-			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
-			int nb_handles = va_arg(varg_list, int);
+			starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list_copy, int);
 			int i;
 			for(i=0 ; i<nb_handles ; i++)
 			{
@@ -433,33 +433,33 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type==STARPU_VALUE)
 		{
-			va_arg(varg_list, void *);
-			va_arg(varg_list, size_t);
+			va_arg(varg_list_copy, void *);
+			va_arg(varg_list_copy, size_t);
 		}
 		else if (arg_type==STARPU_CALLBACK)
 		{
-			va_arg(varg_list, void (*)(void *));
+			va_arg(varg_list_copy, void (*)(void *));
 		}
 		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
 		{
-			va_arg(varg_list, void (*)(void *));
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void (*)(void *));
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_CALLBACK_ARG)
 		{
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			va_arg(varg_list, int);
+			va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
-			(void)va_arg(varg_list, int);
+			(void)va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_FLOPS)
 		{
-			(void)va_arg(varg_list, double);
+			(void)va_arg(varg_list_copy, double);
 		}
 		else if (arg_type==STARPU_TAG)
 		{
@@ -467,7 +467,7 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 
 	}
-	va_end(varg_list);
+	va_end(varg_list_copy);
 
 	if (do_execute == -1)
 	{
@@ -511,13 +511,13 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	}
 
 	/* Send and receive data as requested */
-	va_start(varg_list, codelet);
+	va_copy(varg_list_copy, varg_list);
 	current_data = 0;
-	while ((arg_type = va_arg(varg_list, int)) != 0)
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 	{
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
-			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 
 			_starpu_mpi_exchange_data_before_execution(data, mode, me, dest, do_execute, comm);
@@ -526,8 +526,8 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type == STARPU_DATA_ARRAY)
 		{
-			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
-			int nb_handles = va_arg(varg_list, int);
+			starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list_copy, int);
 			int i;
 
 			for(i=0 ; i<nb_handles ; i++)
@@ -538,62 +538,62 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type==STARPU_VALUE)
 		{
-			va_arg(varg_list, void *);
-			va_arg(varg_list, size_t);
+			va_arg(varg_list_copy, void *);
+			va_arg(varg_list_copy, size_t);
 		}
 		else if (arg_type==STARPU_CALLBACK)
 		{
-			va_arg(varg_list, void (*)(void *));
+			va_arg(varg_list_copy, void (*)(void *));
 		}
 		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
 		{
-			va_arg(varg_list, void (*)(void *));
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void (*)(void *));
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_CALLBACK_ARG)
 		{
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			va_arg(varg_list, int);
+			va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
-			va_arg(varg_list, int);
+			va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		{
-			va_arg(varg_list, starpu_data_handle_t);
+			va_arg(varg_list_copy, starpu_data_handle_t);
 		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
-			(void)va_arg(varg_list, int);
+			(void)va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_FLOPS)
 		{
-			(void)va_arg(varg_list, double);
+			(void)va_arg(varg_list_copy, double);
 		}
 		else if (arg_type==STARPU_TAG)
 		{
 			STARPU_ASSERT_MSG(0, "STARPU_TAG is not supported in MPI mode\n");
 		}
 	}
-	va_end(varg_list);
+	va_end(varg_list_copy);
 
 	if (do_execute)
 	{
 		/* Get the number of buffers and the size of the arguments */
-		va_start(varg_list, codelet);
-		arg_buffer_size = _starpu_task_insert_get_arg_size(varg_list);
-		va_end(varg_list);
+		va_copy(varg_list_copy, varg_list);
+		arg_buffer_size = _starpu_task_insert_get_arg_size(varg_list_copy);
+		va_end(varg_list_copy);
 
 		/* Pack arguments if needed */
 		if (arg_buffer_size)
 		{
-			va_start(varg_list, codelet);
-			_starpu_codelet_pack_args(&arg_buffer, arg_buffer_size, varg_list);
-			va_end(varg_list);
+			va_copy(varg_list_copy, varg_list);
+			_starpu_codelet_pack_args(&arg_buffer, arg_buffer_size, varg_list_copy);
+			va_end(varg_list_copy);
 		}
 
 		_STARPU_MPI_DEBUG(1, "Execution of the codelet %p (%s)\n", codelet, codelet->name);
@@ -606,22 +606,22 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 			task->dyn_handles = malloc(codelet->nbuffers * sizeof(starpu_data_handle_t));
 		}
 
-		va_start(varg_list, codelet);
-		int ret = _starpu_task_insert_create_and_submit(arg_buffer, arg_buffer_size, codelet, &task, varg_list);
-		va_end(varg_list);
+		va_copy(varg_list_copy, varg_list);
+		int ret = _starpu_task_insert_create_and_submit(arg_buffer, arg_buffer_size, codelet, &task, varg_list_copy);
+		va_end(varg_list_copy);
 
 		STARPU_ASSERT_MSG(ret==0, "_starpu_task_insert_create_and_submit failure %d", ret);
 	}
 
 	if (inconsistent_execute)
 	{
-		va_start(varg_list, codelet);
+		va_copy(varg_list_copy, varg_list);
 		current_data = 0;
-		while ((arg_type = va_arg(varg_list, int)) != 0)
+		while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 		{
 			if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 			{
-				starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+				starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 				enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 
 				_starpu_mpi_exchange_data_after_execution(data, mode, me, xrank, dest, do_execute, comm);
@@ -629,8 +629,8 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 			}
 			else if (arg_type == STARPU_DATA_ARRAY)
 			{
-				starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
-				int nb_handles = va_arg(varg_list, int);
+				starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
+				int nb_handles = va_arg(varg_list_copy, int);
 				int i;
 
 				for(i=0 ; i<nb_handles ; i++)
@@ -641,57 +641,57 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 			}
 			else if (arg_type==STARPU_VALUE)
 			{
-				va_arg(varg_list, void *);
-				va_arg(varg_list, size_t);
+				va_arg(varg_list_copy, void *);
+				va_arg(varg_list_copy, size_t);
 			}
 			else if (arg_type==STARPU_CALLBACK)
 			{
-				va_arg(varg_list, void (*)(void *));
+				va_arg(varg_list_copy, void (*)(void *));
 			}
 			else if (arg_type==STARPU_CALLBACK_WITH_ARG)
 			{
-				va_arg(varg_list, void (*)(void *));
-				va_arg(varg_list, void *);
+				va_arg(varg_list_copy, void (*)(void *));
+				va_arg(varg_list_copy, void *);
 			}
 			else if (arg_type==STARPU_CALLBACK_ARG)
 			{
-				va_arg(varg_list, void *);
+				va_arg(varg_list_copy, void *);
 			}
 			else if (arg_type==STARPU_PRIORITY)
 			{
-				va_arg(varg_list, int);
+				va_arg(varg_list_copy, int);
 			}
 			else if (arg_type==STARPU_EXECUTE_ON_NODE)
 			{
-				va_arg(varg_list, int);
+				va_arg(varg_list_copy, int);
 			}
 			else if (arg_type==STARPU_EXECUTE_ON_DATA)
 			{
-				va_arg(varg_list, starpu_data_handle_t);
+				va_arg(varg_list_copy, starpu_data_handle_t);
 			}
 			else if (arg_type==STARPU_HYPERVISOR_TAG)
 			{
-				(void)va_arg(varg_list, int);
+				(void)va_arg(varg_list_copy, int);
 			}
 			else if (arg_type==STARPU_FLOPS)
 			{
-				(void)va_arg(varg_list, double);
+				(void)va_arg(varg_list_copy, double);
 			}
 			else if (arg_type==STARPU_TAG)
 			{
 				STARPU_ASSERT_MSG(0, "STARPU_TAG is not supported in MPI mode\n");
 			}
 			}
-		va_end(varg_list);
+		va_end(varg_list_copy);
 	}
 
-	va_start(varg_list, codelet);
+	va_copy(varg_list_copy, varg_list);
 	current_data = 0;
-	while ((arg_type = va_arg(varg_list, int)) != 0)
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 	{
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH || arg_type == STARPU_REDUX)
 		{
-			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 
 			_starpu_mpi_clear_data_after_execution(data, mode, me, do_execute, comm);
@@ -699,8 +699,8 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type == STARPU_DATA_ARRAY)
 		{
-			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
-			int nb_handles = va_arg(varg_list, int);
+			starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list_copy, int);
 			int i;
 
 			for(i=0 ; i<nb_handles ; i++)
@@ -711,41 +711,41 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type==STARPU_VALUE)
 		{
-			va_arg(varg_list, void *);
-			va_arg(varg_list, size_t);
+			va_arg(varg_list_copy, void *);
+			va_arg(varg_list_copy, size_t);
 		}
 		else if (arg_type==STARPU_CALLBACK)
 		{
-			va_arg(varg_list, void (*)(void *));
+			va_arg(varg_list_copy, void (*)(void *));
 		}
 		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
 		{
-			va_arg(varg_list, void (*)(void *));
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void (*)(void *));
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_CALLBACK_ARG)
 		{
-			va_arg(varg_list, void *);
+			va_arg(varg_list_copy, void *);
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			va_arg(varg_list, int);
+			va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
-			va_arg(varg_list, int);
+			va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		{
-			va_arg(varg_list, starpu_data_handle_t);
+			va_arg(varg_list_copy, starpu_data_handle_t);
 		}
 		else if (arg_type==STARPU_HYPERVISOR_TAG)
 		{
-			(void)va_arg(varg_list, int);
+			(void)va_arg(varg_list_copy, int);
 		}
 		else if (arg_type==STARPU_FLOPS)
 		{
-			(void)va_arg(varg_list, double);
+			(void)va_arg(varg_list_copy, double);
 		}
 		else if (arg_type==STARPU_TAG)
 		{
@@ -753,11 +753,22 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 	}
 
-	va_end(varg_list);
+	va_end(varg_list_copy);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
+int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+{
+	va_list varg_list;
+	int ret;
+
+	va_start(varg_list, codelet);
+	ret = _starpu_mpi_task_insert_v(comm, codelet, varg_list);
+	va_end(varg_list);
+	return ret;
+}
+
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg)
 {
 	int me, rank, tag;