浏览代码

Adding documentation on newly implemented public functions, comments in mpi/src/starpu_mpi.c, and getter/setter functions so that the application can access the internal starpu_mpi_tag used by StarPU-MPI.

Marc Sergent 12 年之前
父节点
当前提交
6832511f53

+ 8 - 8
ChangeLog

@@ -18,6 +18,14 @@ StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 
 New features:
+  * MPI:
+        - New internal communication system : a unique tag called
+	  starpu_mpi_tag is now used for all communications, and a system
+	  of hashmaps on each node which stores pending receives has been
+	  implemented. Every message is now coupled with an envelope, sent
+	  before the corresponding data, which allows the receiver to
+	  allocate data correctly, and to submit the matching receive of
+	  the envelope.
 
 StarPU 1.1.0 (svn revision xxxx)
 ==============================================
@@ -67,14 +75,6 @@ New features:
 	  data with dynamic size can now be exchanged with StarPU-MPI.
         - New functionality starpu_mpi_irecv_probe_detached which
   	  first tests if the message is available before calling MPI_Recv
-	  (now deprecated).
-        - New internal communication system : a unique tag called
-	  STARPU_MPI_TAG is now used for all communications, and a system
-	  of hashmaps on each node which stores pending receives has been
-	  implemented. Every message is now coupled with an envelope, sent
-	  before the corresponding data, which allows the receiver to
-	  allocate data correctly, and to submit the matching receive of
-	  the envelope.
   * Add experimental simgrid support, to simulate execution with various
     number of CPUs, GPUs, amount of memory, etc.
   * Add support for OpenCL simulators (which provide simulated execution time)

+ 6 - 5
configure.ac

@@ -146,15 +146,16 @@ esac
 if test x"$enable_native_winthreads" = xyes
 then
     CPPFLAGS+=" -I$STARPU_SRC_DIR/include/pthread_win32 "
+    AC_COMPILE_IFELSE(
+          [AC_LANG_PROGRAM([[
+    	        #include <pthread.h>
+	  	]],
+		[[ pthread_t t; pthread_create(&t, NULL, NULL, NULL); ]])]
+		,,AC_MSG_ERROR([pthread_create unavailable]))
 else
     AC_CHECK_LIB([pthread], [pthread_create])
 fi
 
-AC_COMPILE_IFELSE(
-  [AC_LANG_PROGRAM([[
-    #include <pthread.h>
-  ]], [[ pthread_t t; pthread_create(&t, NULL, NULL, NULL); ]])],,
-  AC_MSG_ERROR([pthread_create unavailable]))
 AC_SEARCH_LIBS([sqrt],[m],,AC_MSG_ERROR([math library unavailable]))
 AC_HAVE_LIBRARY([ws2_32])
 AC_CHECK_FUNCS([sysconf])

+ 9 - 0
doc/chapters/basic-api.texi

@@ -506,6 +506,11 @@ matrix) which can be registered by the means of helper functions (e.g.
 @code{starpu_vector_data_register} or @code{starpu_matrix_data_register}).
 @end deftypefun
 
+@deftypefun struct starpu_data_interface_ops* starpu_handle_get_interface (starpu_data_handle_t @var{handle})
+Return the associated interface_ops of a data handle, which is different from the interface of
+data stored in it.
+@end deftypefun
+
 @deftypefun void starpu_data_register_same ({starpu_data_handle_t *}@var{handledst}, starpu_data_handle_t @var{handlesrc})
 Register a new piece of data into the handle @var{handledst} with the
 same interface as the handle @var{handlesrc}.
@@ -2146,6 +2151,10 @@ mode of a specific data handle with the
 @code{starpu_data_set_sequential_consistency_flag} function.
 @end deftypefun
 
+@deftypefun unsigned starpu_data_get_sequential_consistency_flag (starpu_data_handle_t @var{handle})
+Return the sequential consistency flag associated to a data handle.
+@end deftypefun
+
 @deftypefun unsigned starpu_data_get_default_sequential_consistency_flag (void)
 Return the default sequential consistency flag
 @end deftypefun

+ 21 - 7
doc/chapters/mpi-support.texi

@@ -151,13 +151,23 @@ the list of the ready requests if it is a send request, or in an
 hashmap if it is a receive request.
 
 Internally, all MPI communications submitted by StarPU uses a unique
-tag called STARPU_MPI_TAG. The matching of tags with corresponding
-requests is done into StarPU-MPI. To handle this, any communication is 
-a double-communication based on a envelope + data system. Every data 
-which will be sent needs to send an envelope which describes the data 
-(particularly its tag) before sending the data, so the receiver can
-get the matching pending receive request from the hashmap, and submit 
-it to recieve the data correctly.
+tag called starpu_mpi_tag, which can be accessed with getter/setter
+functions.
+
+@deftypefun void starpu_mpi_set_starpu_mpi_tag (int @var{tag})
+Tell StarPU-MPI which MPI tag to use for all its communications.
+@end deftypefun
+
+@deftypefun int starpu_mpi_get_starpu_mpi_tag (void)
+Returns the MPI tag which will be used for all StarPU-MPI communications.
+@end deftypefun
+
+The matching of tags with corresponding requests is done inside StarPU-MPI.
+To handle this, any communication is a double communication based on an
+envelope + data system. Before any data is sent, an envelope which
+describes the data (particularly its tag) must be sent first, so the
+receiver can get the matching pending receive request from the hashmap,
+and submit it to receive the data correctly.
 
 To this aim, the StarPU-MPI progression thread has a permanent-submitted 
 request destined to receive incoming envelopes from all sources.
@@ -269,6 +279,10 @@ Tell StarPU-MPI which MPI tag to use when exchanging the data.
 Returns the MPI tag to be used when exchanging the data.
 @end deftypefun
 
+@deftypefun starpu_data_handle_t starpu_data_get_data_handle_from_tag (int @var{tag})
+Returns the data handle associated with the MPI tag, or NULL if there is none.
+@end deftypefun
+
 @deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{rank})
 Tell StarPU-MPI which MPI node "owns" a given data, that is, the node which will
 always keep an up-to-date value, and will by default execute tasks which write

+ 2 - 2
examples/basic_examples/hello_world.c

@@ -59,10 +59,9 @@ void cpu_func(void *buffers[], void *cl_arg)
 	FPRINTF(stdout, "Hello world (params = {%i, %f} )\n", params->i, params->f);
 }
 
-struct starpu_codelet cl = {};
-
 int main(int argc, char **argv)
 {
+	struct starpu_codelet cl;
 	struct starpu_task *task;
 	struct params params = {1, 2.0f};
 	int ret;
@@ -80,6 +79,7 @@ int main(int argc, char **argv)
 	 * called */
 	task = starpu_task_create();
 
+	starpu_codelet_init(&cl);
 	/* this codelet may only be executed on a CPU, and its cpu
  	 * implementation is function "cpu_func" */
 	cl.cpu_funcs[0] = cpu_func;

+ 3 - 2
include/starpu.h

@@ -23,11 +23,12 @@
 #ifndef _MSC_VER
 #include <stdint.h>
 #else
+#include <windows.h>
 typedef unsigned char uint8_t;
 typedef unsigned short uint16_t;
-typedef unsigned long uint32_t;
+typedef unsigned int uint32_t;
 typedef unsigned long long uint64_t;
-typedef unsigned long int uintptr_t;
+typedef UINT_PTR uintptr_t;
 #endif
 
 #include <starpu_config.h>

+ 8 - 0
include/starpu_config.h.in

@@ -98,6 +98,14 @@ typedef ssize_t starpu_ssize_t;
 #  define __starpu_inline __inline__
 #endif
 
+#ifdef _MSC_VER
+struct timespec
+{
+  time_t  tv_sec;  /* Seconds */
+  long    tv_nsec; /* Nanoseconds */
+};
+#endif
+
 #undef STARPU_QUICK_CHECK
 #undef STARPU_USE_DRAND48
 #undef STARPU_USE_ERAND48_R

+ 4 - 0
mpi/include/starpu_mpi.h

@@ -71,6 +71,10 @@ void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);
 void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
 
+/* getter/setter for internal starpu_mpi_tag used for all communications in StarPU-MPI. */
+int starpu_mpi_get_starpu_mpi_tag(void);
+void starpu_mpi_set_starpu_mpi_tag(int tag);
+
 #ifdef __cplusplus
 }
 #endif

+ 55 - 45
mpi/src/starpu_mpi.c

@@ -62,7 +62,7 @@ struct _starpu_mpi_envelope
 	ssize_t psize;
 	int mpi_tag;
 };
- 
+
 struct _starpu_mpi_copy_handle
 {
 	starpu_data_handle_t handle;
@@ -242,7 +242,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
-	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, STARPU_MPI_TAG, req->comm, &req->request);
+	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, starpu_mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
@@ -273,11 +273,8 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		
 		env->psize = (ssize_t)req->count;
 		
-		int type_size = 0;
-		MPI_Type_size(req->datatype,&type_size);
-		
-		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %d request to %d with tag %d\n",req->count,type_size,req->srcdst,STARPU_MPI_TAG);
-		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
+		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %d request to %d with tag %d\n",req->count,starpu_handle_get_size(req->data_handle),req->srcdst,starpu_mpi_tag);
+		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
 	}
 	else
 	{
@@ -289,9 +286,9 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (env->psize != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), STARPU_MPI_TAG, req->srcdst);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), starpu_mpi_tag, req->srcdst);
 			req->count = env->psize;
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
  
@@ -300,8 +297,8 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (env->psize == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), STARPU_MPI_TAG, req->srcdst);
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), starpu_mpi_tag, req->srcdst);
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
  		else
@@ -377,7 +374,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, STARPU_MPI_TAG, req->comm, &req->request);
+	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, starpu_mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
@@ -796,7 +793,13 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 	if (req->request_type == RECV_REQ)
 	{
+		/* test whether the receive request has already been submitted internally by StarPU-MPI*/
 		struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
+		
+		/* Case: the request has already been submitted internally by StarPU.
+		 * We asynchronously ask for read permission on the temporary handle, so that when
+		 * the internal receive is over, the _starpu_mpi_copy_cb function is called to
+		 * bring the data back to the original data handle associated with the request. */
 		if (chandle && (req->data_handle != chandle->handle))
 		{
 			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
@@ -808,45 +811,43 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 			_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
 			starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) arg);
-
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-			_STARPU_MPI_LOG_OUT();
 		}
-		else if (chandle && (req->data_handle == chandle->handle))
+		else 
 		{
-			_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-			if (req->user_datatype == 0)
+			/* Case: the request is the internal receive request submitted by StarPU-MPI to receive
+			 * incoming data without a matching pending receive already submitted by the application.
+			 * We immediately allocate the pointer associated with the data_handle, and push the request
+			 * into the list of new_requests, so that the real MPI request can be submitted before the
+			 * next submission of the envelope-catching request. */
+			if (chandle && (req->data_handle == chandle->handle))
 			{
-				req->count = 1;
-				req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+				_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+				if (req->user_datatype == 0)
+				{
+					req->count = 1;
+					req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+				}
+				else
+				{
+					req->count = chandle->env->psize;
+					req->ptr = malloc(req->count);
+
+					STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+				}
+
+				_starpu_mpi_req_list_push_front(new_requests, req);
+
+				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 			}
+			/* Case : a classic receive request with no send received earlier than expected.
+			 * We just add the pending receive request to the requests' hashmap. */
 			else
 			{
-				req->count = chandle->env->psize;
-				req->ptr = malloc(req->count);
-
-				STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+				add_req(req);
 			}
 
-			_starpu_mpi_req_list_push_front(new_requests, req);
-
-			newer_requests = 1;
-			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-					  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-			_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-			_STARPU_MPI_LOG_OUT();
-		}
-		else
-		{
-			add_req(req);
-
 			newer_requests = 1;
-			_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-					  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 			_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-			_STARPU_MPI_LOG_OUT();
 		}
 	}
 	else
@@ -857,9 +858,10 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
 				  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 		_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		_STARPU_MPI_LOG_OUT();
 	}
+
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	_STARPU_MPI_LOG_OUT();
 }
 
 #ifdef STARPU_MPI_ACTIVITY
@@ -1081,9 +1083,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		}
 
+		/* If no header_req is currently submitted to catch envelopes from senders, and there are some pending receive
+		 * requests on our side, we resubmit a header request. */
 		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
 		{
-			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, STARPU_MPI_TAG, MPI_COMM_WORLD, &header_req);
+			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
 
 			_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
 			header_req_submitted = 1;
@@ -1100,17 +1104,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			MPI_Status status;
 			_STARPU_MPI_DEBUG(3, "Test of header_req\n");
 
+			/* test whether an envelope has arrived. */
 			res = MPI_Test(&header_req, &flag, &status);
 			STARPU_ASSERT(res == MPI_SUCCESS);
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "Request received !\n");
+				_STARPU_MPI_DEBUG(3, "header_req received !\n");
 
 				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
 
 				struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
 
+				/* Case: data will arrive before the matching receive has been submitted on our side of the application.
+				 * We create a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
+				 * on this handle, and register it so that the StarPU-MPI layer can remember it. */
 				if (!found_req)
 				{
 					_STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
@@ -1138,6 +1146,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 					_STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE); 
 				}
+				/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
+				 * the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
 				else
 				{
 					_STARPU_MPI_DEBUG(3, "Found !\n");

+ 12 - 0
mpi/src/starpu_mpi_private.c

@@ -15,11 +15,23 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#include <starpu_mpi_private.h>
+
 int _debug_rank=-1;
 int _debug_level=3;
+int starpu_mpi_tag = 42;
 
 void _starpu_mpi_set_debug_level(int level)
 {
 	_debug_level = level;
 }
 
+int starpu_mpi_get_starpu_mpi_tag(void)
+{
+	return starpu_mpi_tag;
+}
+
+void starpu_mpi_set_starpu_mpi_tag(int tag)
+{
+	starpu_mpi_tag = tag;
+}

+ 1 - 1
mpi/src/starpu_mpi_private.h

@@ -69,7 +69,7 @@ void _starpu_mpi_set_debug_level(int level);
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
-#define STARPU_MPI_TAG 42
+extern int starpu_mpi_tag;
 
 enum _starpu_mpi_request_type
 {