Browse Source

Supply more helper functions: the "array" version of the helpers make it
possible to submit multiple MPI requests at the same time and to liberate a
single tag once all transfers are done.

Cédric Augonnet 15 years ago
parent
commit
332daf9df2
2 changed files with 66 additions and 0 deletions
  1. 9 0
      mpi/starpu_mpi.h
  2. 57 0
      mpi/starpu_mpi_helper.c

+ 9 - 0
mpi/starpu_mpi.h

@@ -39,4 +39,13 @@ int starpu_mpi_shutdown(void);
 int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle data_handle, int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
 int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
 
+/* Asynchronously send an array of buffers, and unlocks the tag once all of
+ * them are transmitted. */
+int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
+		starpu_data_handle *data_handle, int *dest, int *mpi_tag,
+		MPI_Comm *comm, starpu_tag_t tag);
+int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size,
+		starpu_data_handle *data_handle, int *source, int *mpi_tag,
+		MPI_Comm *comm, starpu_tag_t tag);
+
 #endif // __STARPU_MPI_H__

+ 57 - 0
mpi/starpu_mpi_helper.c

@@ -44,3 +44,60 @@ int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle data_handle, int sou
 	return starpu_mpi_irecv_detached(data_handle, source, mpi_tag, comm,
 						starpu_mpi_unlock_tag_callback, tagptr);
 }
+
+struct arg_array {
+	int array_size;
+	starpu_tag_t tag;
+};
+
+static void starpu_mpi_array_unlock_callback(void *_arg)
+{
+	struct arg_array *arg = _arg;
+
+	int remaining = STARPU_ATOMIC_ADD(&arg->array_size, -1);
+
+	if (remaining == 0)
+	{
+		starpu_tag_notify_from_apps(arg->tag);
+		free(arg);
+	}
+}
+
+int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
+		starpu_data_handle *data_handle, int *dest, int *mpi_tag,
+		MPI_Comm *comm, starpu_tag_t tag)
+{
+	struct arg_array *arg = malloc(sizeof(struct arg_array));
+
+	arg->array_size = array_size;
+	arg->tag = tag;
+
+	unsigned elem;
+	for (elem = 0; elem < array_size; elem++)
+	{
+		starpu_mpi_isend_detached(data_handle[elem], dest[elem],
+				mpi_tag[elem], comm[elem],
+				starpu_mpi_array_unlock_callback, arg);
+	}
+
+	return 0;
+}
+
+
+int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag)
+{
+	struct arg_array *arg = malloc(sizeof(struct arg_array));
+
+	arg->array_size = array_size;
+	arg->tag = tag;
+
+	unsigned elem;
+	for (elem = 0; elem < array_size; elem++)
+	{
+		starpu_mpi_irecv_detached(data_handle[elem], source[elem],
+				mpi_tag[elem], comm[elem],
+				starpu_mpi_array_unlock_callback, arg);
+	}
+
+	return 0;
+}