Przeglądaj źródła

Make starpu_shutdown wait for all ready tasks. Otherwise, we may stop a worker that was supposed to perform a data transfer needed by a task which is currently running on another worker and waiting for that data.

Samuel Thibault 13 lat temu
rodzic
commit
a9f7dfbaa3
5 zmienionych plików z 54 dodań i 1 usunięć
  1. 3 0
      include/starpu_task.h
  2. 3 0
      src/core/jobs.c
  3. 42 1
      src/core/task.c
  4. 4 0
      src/core/task.h
  5. 2 0
      src/core/workers.c

+ 3 - 0
include/starpu_task.h

@@ -280,6 +280,9 @@ int starpu_task_wait(struct starpu_task *task);
  * been executed. */
 int starpu_task_wait_for_all(void);
 
+/* This function waits until there are no more ready tasks. */
+int starpu_task_wait_for_no_ready(void);
+
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl);
 
 /* Return the task currently executed by the worker, or NULL if this is called

+ 3 - 0
src/core/jobs.c

@@ -232,6 +232,7 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 		STARPU_ASSERT(!ret);
 	}
 	_starpu_decrement_nsubmitted_tasks();
+	_starpu_decrement_nready_tasks();
 }
 
 /* This function is called when a new task is submitted to StarPU 
@@ -332,6 +333,8 @@ unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_alrea
 		return 0;
         }
 
+	_starpu_increment_nready_tasks();
+
 	ret = _starpu_push_task(j, job_is_already_locked);
 
         _STARPU_LOG_OUT();

+ 42 - 1
src/core/task.c

@@ -32,7 +32,7 @@
 /* TODO we could make this hierarchical to avoid contention ? */
 static pthread_cond_t submitted_cond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t submitted_mutex = PTHREAD_MUTEX_INITIALIZER;
-static long int nsubmitted = 0;
+static long int nsubmitted = 0, nready;
 
 static void _starpu_increment_nsubmitted_tasks(void);
 
@@ -349,6 +349,27 @@ int starpu_task_wait_for_all(void)
 	return 0;
 }
 
+/*
+ * We wait until there is no ready task any more (i.e. StarPU will not be able
+ * to progress any more).
+ *
+ * The calling thread blocks on submitted_cond, holding submitted_mutex, for
+ * as long as the ready-task counter (nready) is strictly positive.
+ *
+ * Returns -EDEADLK without waiting when
+ * _starpu_worker_may_perform_blocking_calls() reports that the caller is not
+ * allowed to block, and 0 once nready has been observed to be <= 0.
+ */
+int starpu_task_wait_for_no_ready(void)
+{
+	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+		return -EDEADLK;
+
+	PTHREAD_MUTEX_LOCK(&submitted_mutex); /* nready is only read under this mutex */
+
+	STARPU_TRACE_TASK_WAIT_FOR_ALL; /* NOTE(review): reuses the "wait for all" trace macro -- confirm this is intended */
+
+	while (nready > 0) /* loop so the counter is re-checked after every wakeup */
+		PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex);
+	
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+
+	return 0;
+}
+
 void _starpu_decrement_nsubmitted_tasks(void)
 {
 	PTHREAD_MUTEX_LOCK(&submitted_mutex);
@@ -373,6 +394,26 @@ static void _starpu_increment_nsubmitted_tasks(void)
 	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 }
 
+void _starpu_increment_nready_tasks(void) /* record that one more task has become ready */
+{
+	PTHREAD_MUTEX_LOCK(&submitted_mutex); /* nready is updated under submitted_mutex */
+
+	nready++; /* no signal here: only the transition to zero is broadcast, by the decrement side */
+
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+}
+
+void _starpu_decrement_nready_tasks(void) /* record that one previously ready task is no longer counted */
+{
+	PTHREAD_MUTEX_LOCK(&submitted_mutex); /* nready is updated under submitted_mutex */
+
+	if (--nready == 0) /* counter just reached zero: wake every thread blocked on submitted_cond */
+		PTHREAD_COND_BROADCAST(&submitted_cond);
+
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+
+}
+
 void _starpu_initialize_current_task_key(void)
 {
 	pthread_key_create(&current_task_key, NULL);

+ 4 - 0
src/core/task.h

@@ -25,6 +25,10 @@
 /* In order to implement starpu_task_wait_for_all, we keep track of the number of
  * task currently submitted */
 void _starpu_decrement_nsubmitted_tasks(void);
+/* In order to implement starpu_task_wait_for_no_ready, we keep track of the
+ * number of tasks that are currently ready */
+void _starpu_increment_nready_tasks(void);
+void _starpu_decrement_nready_tasks(void);
 
 /* A pthread key is used to store the task currently executed on the thread.
  * _starpu_initialize_current_task_key initializes this pthread key and

+ 2 - 0
src/core/workers.c

@@ -521,6 +521,8 @@ void starpu_shutdown(void)
 	initialized = CHANGING;
 	PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
+	starpu_task_wait_for_no_ready();
+
 	_starpu_display_msi_stats();
 	_starpu_display_alloc_cache_stats();