Просмотр исходного кода

implement isend, irecv, wait and test

Cédric Augonnet лет назад: 15
Родитель
Сommit
2b9b3e948b
4 измененных файлов с 293 добавлено и 13 удалено
  1. 8 0
      mpi/Makefile.am
  2. 193 9
      mpi/starpu_mpi.c
  3. 15 4
      mpi/starpu_mpi.h
  4. 77 0
      mpi/tests/mpi_isend.c

+ 8 - 0
mpi/Makefile.am

@@ -51,9 +51,17 @@ check_PROGRAMS =
 
 check_PROGRAMS +=					\
 	tests/pingpong					\
+	tests/mpi_isend					\
 	tests/ring					\
 	tests/block_interface
 
+tests_mpi_isend_LDADD =					\
+	libstarpumpi.la
+
+tests_mpi_isend_SOURCES =				\
+	tests/pingpong.c
+
+
 tests_pingpong_LDADD =					\
 	libstarpumpi.la
 

+ 193 - 9
mpi/starpu_mpi.c

@@ -17,21 +17,68 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
 
-pthread_cond_t cond;
-pthread_mutex_t mutex;
-pthread_t progress_thread;
+static void submit_mpi_req(struct starpu_mpi_req_s *req);
 
-int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req_t *req,
+static starpu_mpi_req_list_t new_requests; 
+static starpu_mpi_req_list_t pending_requests; 
+
+static pthread_cond_t cond;
+static pthread_mutex_t mutex;
+static pthread_t progress_thread;
+static int running = 0;
+
+static void _handle_new_mpi_isend(struct starpu_mpi_req_s *req)
+{
+	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
+	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
+
+	MPI_Isend(ptr, 1, req->datatype, req->dst, req->mpi_tag, req->comm, &req->request);
+}
+
+int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 		int dest, int mpi_tag, MPI_Comm comm,
 		void (*callback)(void *))
 {
+	req->submitted = 0;
+	pthread_mutex_init(&req->req_mutex, NULL);
+	pthread_cond_init(&req->req_cond, NULL);
+
+	req->dst = dest;
+	req->mpi_tag = mpi_tag;
+	req->comm = comm;
+
+	req->handle_new = _handle_new_mpi_isend;
+
+	submit_mpi_req(req);
+
 	return 0;
 }
 
-int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req_t *req,
+static void _handle_new_mpi_irecv(struct starpu_mpi_req_s *req)
+{
+	void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
+	starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
+
+	MPI_Irecv(ptr, 1, req->datatype, req->src, req->mpi_tag, req->comm, &req->request);
+}
+
+
+int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 		int source, int mpi_tag, MPI_Comm comm,
 		void (*callback)(void *))
 {
+	req->submitted = 0;
+	pthread_mutex_init(&req->req_mutex, NULL);
+	pthread_cond_init(&req->req_cond, NULL);
+
+	req->src = source;
+	req->mpi_tag = mpi_tag;
+	req->comm = comm;
+
+	req->handle_new = _handle_new_mpi_irecv;
+
+	submit_mpi_req(req);
+
 	return 0;
 }
 
@@ -74,22 +121,159 @@ int starpu_mpi_send(starpu_data_handle data_handle,
 	return 0;
 }
 
-int starpu_mpi_wait(starpu_mpi_req_t *req)
+int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
 {
-	return 0;
+	int ret;
+
+	pthread_mutex_lock(&req->req_mutex);
+
+	while (!req->submitted)
+		pthread_cond_wait(&req->req_cond, &req->req_mutex);
+
+	ret = MPI_Wait(&req->request, status);
+
+	MPI_Type_free(&req->datatype);
+
+	pthread_mutex_unlock(&req->req_mutex);
+
+	return ret;
 }
 
-int starpu_mpi_test(starpu_mpi_req_t *req, int *flag)
+int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
 {
-	return 0;
+	int ret = 0;
+
+	pthread_mutex_lock(&req->req_mutex);
+
+	if (req->submitted)
+	{
+		ret = MPI_Test(&req->request, flag, status);
+
+		if (*flag)
+			MPI_Type_free(&req->datatype);
+	}
+	else {
+		*flag = 0;
+	}
+
+	pthread_mutex_unlock(&req->req_mutex);
+
+	return ret;
 }
 
+/*
+ *	Requests
+ */
+
+void handle_request(struct starpu_mpi_req_s *req)
+{
+	STARPU_ASSERT(req);
+
+	pthread_mutex_lock(&req->req_mutex);
+
+	starpu_sync_data_with_mem(req->data_handle, req->mode);
+
+	/* submit the request to MPI */
+	req->handle_new(req);
+
+	/* perhaps somebody is waiting or trying to test */
+	req->submitted = 1;
+	pthread_cond_broadcast(&req->req_cond);
+
+	pthread_mutex_unlock(&req->req_mutex);
+}
+
+static void submit_mpi_req(struct starpu_mpi_req_s *req)
+{
+	pthread_mutex_lock(&mutex);
+	pthread_mutex_lock(&req->req_mutex);
+
+	starpu_mpi_req_list_push_front(new_requests, req);
+
+	pthread_cond_broadcast(&req->req_cond);
+
+	pthread_mutex_unlock(&req->req_mutex);
+	pthread_mutex_unlock(&mutex);
+}
+
+/*
+ *	Progression loop
+ */
+
+void *progress_thread_func(void *arg __attribute__((unused)))
+{
+	/* notify the main thread that the progression thread is ready */
+	pthread_mutex_lock(&mutex);
+	running = 1;
+	pthread_cond_signal(&cond);
+	pthread_mutex_unlock(&mutex);
+
+	pthread_mutex_lock(&mutex);
+	while (running) {
+		pthread_cond_wait(&cond, &mutex);
+		if (!running)
+			break;		
+
+		while (!starpu_mpi_req_list_empty(new_requests))
+		{
+			/* get one request */
+			struct starpu_mpi_req_s *req;
+			req = starpu_mpi_req_list_pop_back(new_requests);
+
+			/* handling a request is likely to block for a while
+			 * (on a sync_data_with_mem call), we want to let the
+			 * application submit requests in the meantime, so we
+			 * release the lock.  */
+			pthread_mutex_unlock(&mutex);
+
+			/* handle that request */
+			STARPU_ASSERT(req);
+			req->handle_new(req);
+
+			pthread_mutex_lock(&mutex);
+		}
+
+		pthread_mutex_unlock(&mutex);
+	}
+	pthread_mutex_unlock(&mutex);
+
+	return NULL;
+}
+
+/*
+ *	(De)Initialization methods 
+ */
+
 int starpu_mpi_initialize(void)
 {
+	pthread_mutex_init(&mutex, NULL);
+	pthread_cond_init(&cond, NULL);
+
+	/* requests that have not be submitted to MPI yet */
+	new_requests = starpu_mpi_req_list_new();
+	/* requests that are already submitted and which are not completed yet */
+	pending_requests = starpu_mpi_req_list_new();
+
+	int ret = pthread_create(&progress_thread, NULL, progress_thread_func, NULL);
+
+	pthread_mutex_lock(&mutex);
+	if (!running)
+		pthread_cond_wait(&cond, &mutex);
+	pthread_mutex_unlock(&mutex);
+
 	return 0;
 }
 
 int starpu_mpi_shutdown(void)
 {
+	/* kill the progression thread */
+	pthread_mutex_lock(&mutex);
+	running = 0;
+	pthread_cond_signal(&cond);
+	pthread_mutex_unlock(&mutex);
+
+	void *value;
+	pthread_join(progress_thread, &value);
+
 	return 0;
 }

+ 15 - 4
mpi/starpu_mpi.h

@@ -24,22 +24,33 @@
 
 LIST_TYPE(starpu_mpi_req,
 	void *ptr;
+	starpu_data_handle data_handle;
+	starpu_access_mode mode;
 	MPI_Datatype datatype;
 	MPI_Request request;
+	void (*handle_new)(struct starpu_mpi_req_s *);
+	void (*handle_pending)(struct starpu_mpi_req_s *);
+	unsigned submitted;
+	int dst;
+	int src;
+	int mpi_tag;
+	MPI_Comm comm;
+	pthread_mutex_t req_mutex;
+	pthread_cond_t req_cond;
 );
 
-int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req_t *req,
+int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 		int dest, int mpi_tag, MPI_Comm comm,
 		void (*callback)(void *));
-int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req_t *req,
+int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req,
 		int source, int mpi_tag, MPI_Comm comm,
 		void (*callback)(void *));
 int starpu_mpi_send(starpu_data_handle data_handle,
 		int dest, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_recv(starpu_data_handle data_handle,
 		int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
-int starpu_mpi_wait(starpu_mpi_req_t *req);
-int starpu_mpi_test(starpu_mpi_req_t *req, int *flag);
+int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status);
+int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status);
 int starpu_mpi_initialize(void);
 int starpu_mpi_shutdown(void);
 

+ 77 - 0
mpi/tests/mpi_isend.c

@@ -0,0 +1,77 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+
+#define NITER	2048
+#define SIZE	16
+
+float *tab;
+starpu_data_handle tab_handle;
+
+int main(int argc, char **argv)
+{
+	MPI_Init(NULL, NULL);
+
+	int rank, size;
+
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	if (size != 2)
+	{
+		if (rank == 0)
+			fprintf(stderr, "We need exactly 2 processes.\n");
+
+		MPI_Finalize();
+		return 0;
+	}
+
+	starpu_init(NULL);
+	starpu_mpi_initialize();
+
+	tab = malloc(SIZE*sizeof(float));
+
+	starpu_register_vector_data(&tab_handle, 0, (uintptr_t)tab, SIZE, sizeof(float));
+
+	unsigned nloops = NITER;
+	unsigned loop;
+
+	int other_rank = (rank + 1)%2;
+
+	for (loop = 0; loop < nloops; loop++)
+	{
+		if ((loop % 2) == rank)
+		{
+			MPI_Status status;
+			struct starpu_mpi_req_s req;
+			starpu_mpi_isend(tab_handle, &req, other_rank, loop,
+						MPI_COMM_WORLD, NULL);
+			starpu_mpi_wait(&req, &status);
+		}
+		else {
+			MPI_Status status;
+			starpu_mpi_recv(tab_handle, other_rank, loop, MPI_COMM_WORLD, &status);
+		}
+	}
+	
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+
+	return 0;
+}