Parcourir la source

mpi: Deal with new parameter STARPU_DATA_ARRAY in starpu_mpi_insert_task

Nathalie Furmento il y a 12 ans
Parent
commit
8d892f8554
1 fichiers modifiés avec 233 ajouts et 138 suppressions
  1. 233 138
      mpi/starpu_mpi_insert_task.c

+ 233 - 138
mpi/starpu_mpi_insert_task.c

@@ -49,6 +49,165 @@ static void _starpu_mpi_tables_init()
 	}
 }
 
+static
+int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest, size_t *size_on_nodes)
+{
+	if (data && mode & STARPU_R) {
+		struct starpu_data_interface_ops *ops;
+		int rank = starpu_data_get_rank(data);
+
+		ops = data->ops;
+		size_on_nodes[rank] += ops->get_size(data);
+	}
+
+	if (mode & STARPU_W) {
+		if (!data) {
+			/* We don't have anything allocated for this.
+			 * The application knows we won't do anything
+			 * about this task */
+			/* Yes, the app could actually not call
+			 * insert_task at all itself, this is just a
+			 * safeguard. */
+			_STARPU_MPI_DEBUG("oh oh\n");
+			_STARPU_MPI_LOG_OUT();
+			return -EINVAL;
+		}
+		int mpi_rank = starpu_data_get_rank(data);
+		if (mpi_rank == me) {
+			if (*do_execute == 0) {
+				*inconsistent_execute = 1;
+			}
+			else {
+				*do_execute = 1;
+			}
+		}
+		else if (mpi_rank != -1) {
+			if (*do_execute == 1) {
+				*inconsistent_execute = 1;
+			}
+			else {
+				*do_execute = 0;
+				*dest = mpi_rank;
+				/* That's the rank which needs the data to be sent to */
+			}
+		}
+		else {
+			_STARPU_ERROR("rank invalid\n");
+		}
+	}
+	return 0;
+}
+
+void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int dest, int do_execute, MPI_Comm comm)
+{
+	if (data && mode & STARPU_R) {
+		int mpi_rank = starpu_data_get_rank(data);
+		int mpi_tag = starpu_data_get_tag(data);
+		STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
+		/* The task needs to read this data */
+		if (do_execute && mpi_rank != me && mpi_rank != -1) {
+			/* I will have to execute but I don't have the data, receive */
+#ifdef MPI_CACHE
+			struct _starpu_data_entry *already_received;
+			HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
+			if (already_received == NULL) {
+				struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
+				entry->data = data;
+				HASH_ADD_PTR(received_data[mpi_rank], data, entry);
+			}
+			else {
+				_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+			}
+			if (!already_received)
+#endif
+			{
+				_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
+				starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
+			}
+		}
+		if (!do_execute && mpi_rank == me) {
+			/* Somebody else will execute it, and I have the data, send it. */
+#ifdef MPI_CACHE
+			struct _starpu_data_entry *already_sent;
+			HASH_FIND_PTR(sent_data[mpi_rank], &data, already_sent);
+			if (already_sent == NULL) {
+				struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
+				entry->data = data;
+				HASH_ADD_PTR(sent_data[dest], data, entry);
+			}
+			else {
+				_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
+			}
+			if (!already_sent)
+#endif
+			{
+				_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
+				starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
+			}
+		}
+	}
+}
+
+void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int xrank, int dest, int do_execute, MPI_Comm comm)
+{
+	if (mode & STARPU_W) {
+		int mpi_rank = starpu_data_get_rank(data);
+		int mpi_tag = starpu_data_get_tag(data);
+		STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
+		if (mpi_rank == me) {
+			if (xrank != -1 && me != xrank) {
+				_STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
+				starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
+			}
+		}
+		else if (do_execute) {
+			_STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
+			starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
+		}
+	}
+}
+
+void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int do_execute, MPI_Comm comm)
+{
+#ifdef MPI_CACHE
+	if (mode & STARPU_W) {
+		if (do_execute) {
+			/* Note that all copies I've sent to neighbours are now invalid */
+			int n, size;
+			MPI_Comm_size(comm, &size);
+			for(n=0 ; n<size ; n++) {
+				struct _starpu_data_entry *already_sent;
+				HASH_FIND_PTR(sent_data[n], &data, already_sent);
+				if (already_sent) {
+					_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
+					_starpu_mpi_clear_cache_request(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
+				}
+			}
+		}
+		else {
+			int mpi_rank = starpu_data_get_rank(data);
+			struct _starpu_data_entry *already_received;
+			HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
+			if (already_received) {
+				/* Somebody else will write to the data, so discard our cached copy if any */
+				/* TODO: starpu_mpi could just remember itself. */
+				_STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
+				_starpu_mpi_clear_cache_request(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
+				starpu_data_invalidate_submit(data);
+			}
+		}
+	}
+#else
+	/* We allocated a temporary buffer for the received data, now drop it */
+	if ((mode & STARPU_R) && do_execute) {
+		int mpi_rank = starpu_data_get_rank(data);
+		if (mpi_rank != me && mpi_rank != -1) {
+			starpu_data_invalidate_submit(data);
+		}
+	}
+#endif
+}
+
 int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	int arg_type;
@@ -58,6 +217,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	size_t arg_buffer_size = 0;
 	char *arg_buffer;
 	int dest=0, inconsistent_execute;
+	int current_data = 0;
 
 	_STARPU_MPI_LOG_IN();
 
@@ -95,49 +255,30 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX) {
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-
-			if (data && arg_type & STARPU_R) {
-				int rank = starpu_data_get_rank(data);
-				struct starpu_data_interface_ops *ops;
-				ops = data->ops;
-				size_on_nodes[rank] += ops->get_size(data);
+			enum starpu_access_mode mode = (enum starpu_access_mode) arg_type;
+			int ret = _starpu_mpi_find_executee_node(data, mode, me, &do_execute, &inconsistent_execute, &dest, size_on_nodes);
+			if (ret == -EINVAL)
+			{
+				free(size_on_nodes);
+				return ret;
 			}
-
-			if (arg_type & STARPU_W) {
-				if (!data) {
-					/* We don't have anything allocated for this.
-					 * The application knows we won't do anything
-					 * about this task */
-					/* Yes, the app could actually not call
-					 * insert_task at all itself, this is just a
-					 * safeguard. */
-					_STARPU_MPI_DEBUG("oh oh\n");
-					_STARPU_MPI_LOG_OUT();
+			current_data ++;
+		}
+		else if (arg_type == STARPU_DATA_ARRAY)
+		{
+			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list, int);
+			int i;
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				enum starpu_access_mode mode = codelet->modes[current_data];
+				int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, &do_execute, &inconsistent_execute, &dest, size_on_nodes);
+				if (ret == -EINVAL)
+				{
 					free(size_on_nodes);
-					return -EINVAL;
-				}
-				int mpi_rank = starpu_data_get_rank(data);
-				if (mpi_rank == me) {
-					if (do_execute == 0) {
-						inconsistent_execute = 1;
-					}
-					else {
-						do_execute = 1;
-					}
-				}
-				else if (mpi_rank != -1) {
-					if (do_execute == 1) {
-						inconsistent_execute = 1;
-					}
-					else {
-						do_execute = 0;
-						dest = mpi_rank;
-						/* That's the rank which needs the data to be sent to */
-					}
-				}
-				else {
-					_STARPU_ERROR("rank invalid\n");
+					return ret;
 				}
+				current_data ++;
 			}
 		}
 		else if (arg_type==STARPU_VALUE) {
@@ -203,54 +344,26 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	/* Send and receive data as requested */
 	va_start(varg_list, codelet);
+	current_data = 0;
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
-		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX) {
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-			if (data && arg_type & STARPU_R) {
-				int mpi_rank = starpu_data_get_rank(data);
-				int mpi_tag = starpu_data_get_tag(data);
-				STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
-				/* The task needs to read this data */
-				if (do_execute && mpi_rank != me && mpi_rank != -1) {
-					/* I will have to execute but I don't have the data, receive */
-#ifdef MPI_CACHE
-					struct _starpu_data_entry *already_received;
-					HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
-					if (already_received == NULL) {
-						struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-						entry->data = data;
-						HASH_ADD_PTR(received_data[mpi_rank], data, entry);
-					}
-					else {
-						_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
-					}
-					if (!already_received)
-#endif
-					{
-						_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
-						starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
-					}
-				}
-				if (!do_execute && mpi_rank == me) {
-					/* Somebody else will execute it, and I have the data, send it. */
-#ifdef MPI_CACHE
-					struct _starpu_data_entry *already_sent;
-					HASH_FIND_PTR(sent_data[mpi_rank], &data, already_sent);
-					if (already_sent == NULL) {
-						struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
-						entry->data = data;
-						HASH_ADD_PTR(sent_data[dest], data, entry);
-					}
-					else {
-						_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
-					}
-					if (!already_sent)
-#endif
-					{
-						_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
-						starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
-					}
-				}
+			enum starpu_access_mode mode = (enum starpu_access_mode) arg_type;
+
+			_starpu_mpi_exchange_data_before_execution(data, mode, me, dest, do_execute, comm);
+			current_data ++;
+
+		}
+		else if (arg_type == STARPU_DATA_ARRAY)
+		{
+			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list, int);
+			int i;
+
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				_starpu_mpi_exchange_data_before_execution(datas[i], codelet->modes[current_data], me, dest, do_execute, comm);
+				current_data++;
 			}
 		}
 		else if (arg_type==STARPU_VALUE) {
@@ -290,23 +403,25 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	if (inconsistent_execute) {
 		va_start(varg_list, codelet);
+		current_data = 0;
 		while ((arg_type = va_arg(varg_list, int)) != 0) {
-			if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+			if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX) {
 				starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-				if (arg_type & STARPU_W) {
-					int mpi_rank = starpu_data_get_rank(data);
-					int mpi_tag = starpu_data_get_tag(data);
-					STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
-					if (mpi_rank == me) {
-						if (xrank != -1 && me != xrank) {
-							_STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
-							starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
-						}
-					}
-					else if (do_execute) {
-						_STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
-						starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
-					}
+				enum starpu_access_mode mode = (enum starpu_access_mode) arg_type;
+
+				_starpu_mpi_exchange_data_after_execution(data, mode, me, xrank, dest, do_execute, comm);
+				current_data++;
+			}
+			else if (arg_type == STARPU_DATA_ARRAY)
+			{
+				starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
+				int nb_handles = va_arg(varg_list, int);
+				int i;
+
+				for(i=0 ; i<nb_handles ; i++)
+				{
+					_starpu_mpi_exchange_data_after_execution(datas[i], codelet->modes[current_data], me, xrank, dest, do_execute, comm);
+					current_data++;
 				}
 			}
 			else if (arg_type==STARPU_VALUE) {
@@ -337,46 +452,26 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	}
 
 	va_start(varg_list, codelet);
+	current_data = 0;
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
-		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH || arg_type == STARPU_REDUX) {
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-#ifdef MPI_CACHE
-			if (arg_type & STARPU_W) {
-				if (do_execute) {
-					/* Note that all copies I've sent to neighbours are now invalid */
-					int n, size;
-					MPI_Comm_size(comm, &size);
-					for(n=0 ; n<size ; n++) {
-						struct _starpu_data_entry *already_sent;
-						HASH_FIND_PTR(sent_data[n], &data, already_sent);
-						if (already_sent) {
-							_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
-							_starpu_mpi_clear_cache_request(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
-						}
-					}
-				}
-				else {
-					int mpi_rank = starpu_data_get_rank(data);
-					struct _starpu_data_entry *already_received;
-					HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
-					if (already_received) {
-						/* Somebody else will write to the data, so discard our cached copy if any */
-						/* TODO: starpu_mpi could just remember itself. */
-						_STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
-						_starpu_mpi_clear_cache_request(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
-						starpu_data_invalidate_submit(data);
-					}
-				}
-			}
-#else
-			/* We allocated a temporary buffer for the received data, now drop it */
-			if ((arg_type & STARPU_R) && do_execute) {
-				int mpi_rank = starpu_data_get_rank(data);
-				if (mpi_rank != me && mpi_rank != -1) {
-					starpu_data_invalidate_submit(data);
-				}
+			enum starpu_access_mode mode = (enum starpu_access_mode) arg_type;
+
+			_starpu_mpi_clear_data_after_execution(data, mode, me, do_execute, comm);
+			current_data++;
+		}
+		else if (arg_type == STARPU_DATA_ARRAY)
+		{
+			starpu_data_handle_t *datas = va_arg(varg_list, starpu_data_handle_t *);
+			int nb_handles = va_arg(varg_list, int);
+			int i;
+
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				_starpu_mpi_clear_data_after_execution(datas[i], codelet->modes[current_data], me, do_execute, comm);
+				current_data++;
 			}
-#endif
 		}
 		else if (arg_type==STARPU_VALUE) {
 			va_arg(varg_list, void *);