Browse Source

- use a more generic interface for additional continuation features

Olivier Aumage 11 years ago
parent
commit
b908eaae3e
5 changed files with 57 additions and 29 deletions
  1. 28 21
      src/core/jobs.c
  2. 15 3
      src/core/jobs.h
  3. 4 2
      src/core/task.c
  4. 3 2
      src/core/task.h
  5. 7 1
      src/util/openmp_runtime_support.c

+ 28 - 21
src/core/jobs.c

@@ -146,23 +146,22 @@ int _starpu_test_job_termination(struct _starpu_job *j)
 	STARPU_ASSERT(!j->task->detach);
 	return (j->terminated == 2);
 }
-/* Prepare a currently running job for accepting a new set of
- * dependencies in anticipation of becoming a continuation. */
-void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
+void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
+		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
 {
 	STARPU_ASSERT(!j->continuation);
 	/* continuation are not supported for parallel tasks for now */
 	STARPU_ASSERT(j->task_size == 1);
 	j->continuation = 1;
+	j->continuation_resubmit = continuation_resubmit;
+	j->continuation_callback_on_sleep = continuation_callback_on_sleep;
+	j->continuation_callback_on_sleep_arg = continuation_callback_on_sleep_arg;
 }
-
-void _starpu_job_prepare_for_conditional_continuation(struct _starpu_job *j, struct _starpu_spinlock *lock_ptr)
+/* Prepare a currently running job for accepting a new set of
+ * dependencies in anticipation of becoming a continuation. */
+void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
 {
-	STARPU_ASSERT(!j->continuation);
-	/* continuation are not supported for parallel tasks for now */
-	STARPU_ASSERT(j->task_size == 1);
-	j->continuation_lock_ptr = lock_ptr;
-	j->continuation = 1;
+	_starpu_job_prepare_for_continuation_ext(j, 1, NULL, NULL);
 }
 #endif
 
@@ -374,19 +373,27 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 #endif
 
-#ifdef STARPU_OPENMP
-		if (continuation && j->continuation_lock_ptr != NULL)
 		{
-			struct _starpu_spinlock *lock_ptr = j->continuation_lock_ptr;
-			j->continuation_lock_ptr = NULL;
-			_starpu_spin_unlock(lock_ptr);
-		}
-		else
+#ifdef STARPU_OPENMP
+			unsigned continuation_resubmit = j->continuation_resubmit;
+			void (*continuation_callback_on_sleep)(void *arg) = j->continuation_callback_on_sleep;
+			void *continuation_callback_on_sleep_arg = j->continuation_callback_on_sleep_arg;
+			j->continuation_resubmit = 1;
+			j->continuation_callback_on_sleep = NULL;
+			j->continuation_callback_on_sleep_arg = NULL;
+			if (!continuation || continuation_resubmit)
+#endif
+			{
+				/* We reuse the same job structure */
+				int ret = _starpu_submit_job(j);
+				STARPU_ASSERT(!ret);
+			}
+#ifdef STARPU_OPENMP
+			if (continuation && continuation_callback_on_sleep != NULL)
+			{
+				continuation_callback_on_sleep(continuation_callback_on_sleep_arg);
+			}
 #endif
-		{
-			/* We reuse the same job structure */
-			int ret = _starpu_submit_job(j);
-			STARPU_ASSERT(!ret);
 		}
 	}
 

+ 15 - 3
src/core/jobs.h

@@ -112,7 +112,18 @@ LIST_TYPE(_starpu_job,
 	/* Job is a continuation or a regular task. */
 	unsigned continuation;
 
-	struct _starpu_spinlock *continuation_lock_ptr;
+	/* If 0, the prepared continuation is not resubmitted automatically
+	 * when going to sleep, if 1, the prepared continuation is immediately
+	 * resubmitted when going to sleep. */
+	unsigned continuation_resubmit;
+
+	/* Callback function called when:
+	 * - The continuation starpu task is ready to be submitted again if
+	 *   continuation_resubmit = 0;
+	 * - The continuation starpu task has just been re-submitted if
+	 *   continuation_resubmit = 1. */
+	void (*continuation_callback_on_sleep)(void *arg);
+	void *continuation_callback_on_sleep_arg;
 
 	/* Job has been stopped at least once. */
 	unsigned discontinuous;
@@ -170,9 +181,10 @@ void _starpu_wait_job(struct _starpu_job *j);
 int _starpu_test_job_termination(struct _starpu_job *j);
 
 /* Prepare the job for accepting new dependencies before becoming a continuation. */
-void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
 
-void _starpu_job_prepare_for_conditional_continuation(struct _starpu_job *j, struct _starpu_spinlock *lock_ptr);
+void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
+		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
+void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
 #endif
 
 /* Specify that the task should not appear in the DAG generated by debug tools. */

+ 4 - 2
src/core/task.c

@@ -957,9 +957,11 @@ void _starpu_task_prepare_for_continuation(void)
 	_starpu_job_prepare_for_continuation(_starpu_get_job_associated_to_task(starpu_task_get_current()));
 }
 
-void _starpu_task_prepare_for_conditional_continuation(struct _starpu_spinlock *lock_ptr)
+void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
+		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
 {
-	_starpu_job_prepare_for_conditional_continuation(_starpu_get_job_associated_to_task(starpu_task_get_current()), lock_ptr);
+	_starpu_job_prepare_for_continuation_ext(_starpu_get_job_associated_to_task(starpu_task_get_current()),
+		continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
 }
 #endif
 

+ 3 - 2
src/core/task.h

@@ -61,9 +61,10 @@ _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
 
 #ifdef STARPU_OPENMP
 /* Prepare the current task for accepting new dependencies before becoming a continuation. */
-void _starpu_task_prepare_for_continuation(void);
+void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
+		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
 
-void _starpu_task_prepare_for_conditional_continuation(struct _starpu_spinlock *lock_ptr);
+void _starpu_task_prepare_for_continuation(void);
 #endif
 
 int _starpu_task_uses_multiformat_handles(struct starpu_task *task);

+ 7 - 1
src/util/openmp_runtime_support.c

@@ -603,6 +603,12 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 	destroy_omp_region_struct(new_region);
 }
 
+static void barrier__sleep_callback(void *_task)
+{
+	struct starpu_omp_task *task = _task;
+	_starpu_spin_unlock(&task->lock);
+}
+
 void starpu_omp_barrier(void)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -638,7 +644,7 @@ void starpu_omp_barrier(void)
 		 * . sleep
 		 */
 
-		_starpu_task_prepare_for_conditional_continuation(&task->lock);
+		_starpu_task_prepare_for_continuation_ext(0, barrier__sleep_callback, task);
 		starpu_omp_task_preempt();
 	}
 }