瀏覽代碼

mpi/src/starpu_mpi.c: revert commit 8620 which intended to fix mechanism to send data with user-defined datatype.

 We actually waited for the completion of the receive of the size before receiving the data.

 There is a problem but it is not this one. The truth is out there ....

 Furthermore, this fix meant the size of the data was sent too early, i.e before the pack of the data was done.
Nathalie Furmento 12 年之前
父節點
當前提交
52ac5fca66
共有 1 個文件被更改,包括 41 次插入42 次删除
  1. 41 42
      mpi/src/starpu_mpi.c

+ 41 - 42
mpi/src/starpu_mpi.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2012  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -69,7 +69,6 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 /********************************************************/
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-							      size_t size,
 							      int srcdst, int mpi_tag, MPI_Comm comm,
 							      unsigned detached, void (*callback)(void *), void *arg,
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
@@ -91,7 +90,6 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	req->request_type = request_type;
 
 	req->data_handle = data_handle;
-	req->count = size;
 	req->srcdst = srcdst;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
@@ -145,7 +143,13 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_isend_pack_func(struct _starpu_mpi_req *req)
+static void _starpu_mpi_isend_size_callback(void *arg)
+{
+	struct _starpu_mpi_req *req = (struct _starpu_mpi_req *) arg;
+	_starpu_mpi_isend_data_func(req);
+}
+
+static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 	if (req->user_datatype == 0)
@@ -156,8 +160,12 @@ static void _starpu_mpi_isend_pack_func(struct _starpu_mpi_req *req)
 	}
 	else
 	{
+		starpu_data_handle_t count_handle;
+
 		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
-		_starpu_mpi_isend_data_func(req);
+		starpu_variable_data_register(&count_handle, 0, (uintptr_t)&req->count, sizeof(req->count));
+		_starpu_mpi_isend_common(count_handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_isend_size_callback, req);
+		starpu_data_unregister_submit(count_handle);
 	}
 }
 
@@ -165,20 +173,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 							int dest, int mpi_tag, MPI_Comm comm,
 							unsigned detached, void (*callback)(void *), void *arg)
 {
-	enum starpu_data_interface_id id = starpu_handle_get_interface_id(data_handle);
-	size_t size;
-
-	size = starpu_handle_get_size(data_handle);
-
-	if (id >= STARPU_MAX_INTERFACE_ID)
-	{
-		starpu_data_handle_t size_handle;
-		starpu_variable_data_register(&size_handle, 0, (uintptr_t)&(size), sizeof(size));
-		starpu_mpi_send(size_handle, dest, mpi_tag, comm);
-		starpu_data_unregister(size_handle);
-	}
-
-	return _starpu_mpi_isend_irecv_common(data_handle, size, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_pack_func, STARPU_R);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R);
 }
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -253,7 +248,27 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_irecv_pack_func(struct _starpu_mpi_req *req)
+struct _starpu_mpi_irecv_size_callback
+{
+	starpu_data_handle_t handle;
+	struct _starpu_mpi_req *req;
+};
+
+static void _starpu_mpi_irecv_size_callback(void *arg)
+{
+	struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
+
+	starpu_data_unregister(callback->handle);
+	callback->req->ptr = malloc(callback->req->count);
+#ifdef STARPU_DEVEL
+#warning TODO: in some cases, callback->req->count is incorrect, we need to fix that
+#endif
+	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld\n", callback->req->count);
+	_starpu_mpi_irecv_data_func(callback->req);
+	free(callback);
+}
+
+static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
@@ -266,27 +281,16 @@ static void _starpu_mpi_irecv_pack_func(struct _starpu_mpi_req *req)
 	}
 	else
 	{
-		req->ptr = malloc(req->count);
-		_starpu_mpi_irecv_data_func(req);
+		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
+		callback->req = req;
+		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
+		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
 	}
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
 {
-	enum starpu_data_interface_id id = starpu_handle_get_interface_id(data_handle);
-	size_t size=0;
-
-	if (id >= STARPU_MAX_INTERFACE_ID)
-	{
-		starpu_data_handle_t size_handle;
-		MPI_Status status;
-
-		starpu_variable_data_register(&size_handle, 0, (uintptr_t)&(size), sizeof(size));
-		starpu_mpi_recv(size_handle, source, mpi_tag, comm, &status);
-		starpu_data_unregister(size_handle);
-	}
-
-	return _starpu_mpi_isend_irecv_common(data_handle, size, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_pack_func, STARPU_W);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -351,13 +355,8 @@ static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
 
 int starpu_mpi_irecv_probe_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
-	size_t size;
-
 	_STARPU_MPI_LOG_IN();
-
-	size = starpu_handle_get_size(data_handle);
-	_starpu_mpi_isend_irecv_common(data_handle, size, source, mpi_tag, comm, 1, callback, arg, PROBE_REQ, _starpu_mpi_probe_func, STARPU_W);
-
+	_starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, PROBE_REQ, _starpu_mpi_probe_func, STARPU_W);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }