Browse Source

Ensure the progression of detached MPI requests thanks to a progression hook
rather than a mere busy waiting.

Cédric Augonnet 15 years ago
parent
commit
187b338b16
1 changed files with 50 additions and 2 deletions
  1. 50 2
      mpi/starpu_mpi.c

+ 50 - 2
mpi/starpu_mpi.c

@@ -17,6 +17,10 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
 
+/* TODO find a better way to select the polling method (perhaps during the
+ * configuration) */
+#define USE_STARPU_ACTIVITY	1
+
 static void submit_mpi_req(void *arg);
 static void handle_request_termination(struct starpu_mpi_req_s *req);
 
@@ -374,6 +378,25 @@ void submit_mpi_req(void *arg)
 }
 
 /*
+ *	Scheduler hook
+ */
+
+unsigned progression_hook_func(void *arg __attribute__((unused)))
+{
+	unsigned may_block = 1;
+
+	pthread_mutex_lock(&mutex);
+	if (!starpu_mpi_req_list_empty(detached_requests))
+	{
+		pthread_cond_signal(&cond);
+		may_block = 0;
+	}
+	pthread_mutex_unlock(&mutex);
+
+	return may_block;
+}
+
+/*
  *	Progression loop
  */
 
@@ -415,10 +438,15 @@ void handle_new_request(struct starpu_mpi_req_s *req)
 
 	if (req->detached)
 	{
+		pthread_mutex_lock(&mutex);
+		starpu_mpi_req_list_push_front(detached_requests, req);
+		pthread_mutex_unlock(&mutex);
+
+		wake_all_blocked_workers();
+
 		/* put the submitted request into the list of pending requests
 		 * so that it can be handled by the progression mechanisms */
 		pthread_mutex_lock(&mutex);
-		starpu_mpi_req_list_push_front(detached_requests, req);
 		pthread_cond_signal(&cond);
 		pthread_mutex_unlock(&mutex);
 	}
@@ -434,7 +462,14 @@ void *progress_thread_func(void *arg __attribute__((unused)))
 
 	pthread_mutex_lock(&mutex);
 	while (running) {
-		if (starpu_mpi_req_list_empty(new_requests) && starpu_mpi_req_list_empty(detached_requests))
+		/* shall we block ? */
+		unsigned block = starpu_mpi_req_list_empty(new_requests);
+
+#ifndef USE_STARPU_ACTIVITY
+		block = block && starpu_mpi_req_list_empty(detached_requests);
+#endif
+
+		if (block)
 			pthread_cond_wait(&cond, &mutex);
 
 		if (!running)
@@ -470,6 +505,10 @@ void *progress_thread_func(void *arg __attribute__((unused)))
  *	(De)Initialization methods 
  */
 
+#ifdef USE_STARPU_ACTIVITY
+int hookid = - 1;
+#endif
+
 int starpu_mpi_initialize(void)
 {
 	pthread_mutex_init(&mutex, NULL);
@@ -486,6 +525,11 @@ int starpu_mpi_initialize(void)
 		pthread_cond_wait(&cond, &mutex);
 	pthread_mutex_unlock(&mutex);
 
+#ifdef USE_STARPU_ACTIVITY
+	hookid = starpu_register_progression_hook(progression_hook_func, NULL);
+	STARPU_ASSERT(hookid >= 0);
+#endif
+
 	return 0;
 }
 
@@ -501,6 +545,10 @@ int starpu_mpi_shutdown(void)
 
 	pthread_join(progress_thread, &value);
 
+#ifdef USE_STARPU_ACTIVITY
+	starpu_deregister_progression_hook(hookid);
+#endif 
+
 	/* liberate the request queues */
 	starpu_mpi_req_list_delete(detached_requests);
 	starpu_mpi_req_list_delete(new_requests);