Browse Source

- OMP support ongoing work
- rename preemption into continuation at the StarPU task/job level
- preliminary implementation of StarPU support for continuations

Olivier Aumage 11 years ago
parent
commit
e41dda4d04

+ 3 - 0
include/starpu_task.h

@@ -62,6 +62,9 @@ enum starpu_task_status
 	STARPU_TASK_BLOCKED_ON_TAG,
 	STARPU_TASK_BLOCKED_ON_TASK,
 	STARPU_TASK_BLOCKED_ON_DATA
+#ifdef STARPU_OPENMP
+	, STARPU_TASK_STOPPED
+#endif
 };
 
 typedef uint64_t starpu_tag_t;

+ 6 - 1
src/core/dependencies/task_deps.c

@@ -67,7 +67,12 @@ void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, s
 
 	STARPU_PTHREAD_MUTEX_LOCK(&job->sync_mutex);
 	if (check)
-		STARPU_ASSERT_MSG(!job->submitted || !task->destroy || task->detach, "Task dependencies have to be set before submission (submitted %u destroy %d detach %d)", job->submitted, task->destroy, task->detach);
+		STARPU_ASSERT_MSG(
+				!job->submitted || !task->destroy || task->detach
+#ifdef STARPU_OPENMP
+				|| job->continuation
+#endif
+				, "Task dependencies have to be set before submission (submitted %u destroy %d detach %d)", job->submitted, task->destroy, task->detach);
 	else
 		STARPU_ASSERT_MSG(job->terminated <= 1, "Task dependencies have to be set before termination (terminated %u)", job->terminated);
 

+ 132 - 44
src/core/jobs.c

@@ -146,6 +146,15 @@ int _starpu_test_job_termination(struct _starpu_job *j)
 	STARPU_ASSERT(!j->task->detach);
 	return (j->terminated == 2);
 }
+/* Prepare a currently running job for accepting a new set of
+ * dependencies in anticipation of becoming a continuation. */
+void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
+{
+	STARPU_ASSERT(!j->continuation);
+	/* continuation are not supported for parallel tasks for now */
+	STARPU_ASSERT(j->task_size == 1);
+	j->continuation = 1;
+}
 #endif
 
 void _starpu_handle_job_termination(struct _starpu_job *j)
@@ -153,15 +162,31 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	struct starpu_task *task = j->task;
 	unsigned sched_ctx = task->sched_ctx;
 	double flops = task->flops;
-	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+	const unsigned continuation =
+#ifdef STARPU_OPENMP
+		j->continuation
+#else
+		0
+#endif
+		;
 
-	task->status = STARPU_TASK_FINISHED;
+	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+#ifdef STARPU_OPENMP
+	if (continuation)
+	{
+		task->status = STARPU_TASK_STOPPED;
+	}
+	else
+#endif
+	{
+		task->status = STARPU_TASK_FINISHED;
 
-	/* We must have set the j->terminated flag early, so that it is
-	 * possible to express task dependencies within the callback
-	 * function. A value of 1 means that the codelet was executed but that
-	 * the callback is not done yet. */
-	j->terminated = 1;
+		/* We must have set the j->terminated flag early, so that it is
+		 * possible to express task dependencies within the callback
+		 * function. A value of 1 means that the codelet was executed but that
+		 * the callback is not done yet. */
+		j->terminated = 1;
+	}
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
@@ -178,7 +203,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 #endif //STARPU_USE_SC_HYPERVISOR
 
 	/* We release handle reference count */
-	if (task->cl)
+	if (task->cl && !continuation)
 	{
 		unsigned i;
 		for (i=0; i<task->cl->nbuffers; i++)
@@ -190,14 +215,23 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 				_starpu_spin_unlock(&handle->header_lock);
 		}
 	}
-
-	/* Tell other tasks that we don't exist any more, thus no need for
-	 * implicit dependencies any more.  */
-	_starpu_release_task_enforce_sequential_consistency(j);
+	/* If this is a continuation, we do not release task dependencies now.
+	 * Task dependencies will be released only when the continued task
+	 * fully completes */
+	if (!continuation)
+	{
+		/* Tell other tasks that we don't exist any more, thus no need for
+		 * implicit dependencies any more.  */
+		_starpu_release_task_enforce_sequential_consistency(j);
+	}
 	/* Task does not have a cl, but has explicit data dependencies, we need
 	 * to tell them that we will not exist any more before notifying the
-	 * tasks waiting for us */
-	if (j->implicit_dep_handle) {
+	 * tasks waiting for us
+	 *
+	 * For continuations, implicit dependency handles are only released 
+	 * when the task fully completes */
+	if (j->implicit_dep_handle && !continuation)
+	{
 		starpu_data_handle_t handle = j->implicit_dep_handle;
 		_starpu_release_data_enforce_sequential_consistency(j->task, handle);
 		/* Release reference taken while setting implicit_dep_handle */
@@ -207,46 +241,68 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 			_starpu_spin_unlock(&handle->header_lock);
 	}
 
-	/* in case there are dependencies, wake up the proper tasks */
-	_starpu_notify_dependencies(j);
+	/* If this is a continuation, we do not notify task/tag dependencies
+	 * now. Task/tag dependencies will be notified only when the continued
+	 * task fully completes */
+	if (!continuation)
+	{
+		/* in case there are dependencies, wake up the proper tasks */
+		_starpu_notify_dependencies(j);
+	}
 
-	/* the callback is executed after the dependencies so that we may remove the tag
- 	 * of the task itself */
-	if (task->callback_func)
+	/* If this is a continuation, we do not execute the callback
+	 * now. The callback will be executed only when the continued
+	 * task fully completes */
+	if (!continuation)
 	{
-		int profiling = starpu_profiling_status_get();
-		if (profiling && task->profiling_info)
-			_starpu_clock_gettime(&task->profiling_info->callback_start_time);
+		/* the callback is executed after the dependencies so that we may remove the tag
+		 * of the task itself */
+		if (task->callback_func)
+		{
+			int profiling = starpu_profiling_status_get();
+			if (profiling && task->profiling_info)
+				_starpu_clock_gettime(&task->profiling_info->callback_start_time);
 
-		/* so that we can check whether we are doing blocking calls
-		 * within the callback */
-		_starpu_set_local_worker_status(STATUS_CALLBACK);
+			/* so that we can check whether we are doing blocking calls
+			 * within the callback */
+			_starpu_set_local_worker_status(STATUS_CALLBACK);
 
 
-		/* Perhaps we have nested callbacks (eg. with chains of empty
-		 * tasks). So we store the current task and we will restore it
-		 * later. */
-		struct starpu_task *current_task = starpu_task_get_current();
+			/* Perhaps we have nested callbacks (eg. with chains of empty
+			 * tasks). So we store the current task and we will restore it
+			 * later. */
+			struct starpu_task *current_task = starpu_task_get_current();
 
-		_starpu_set_current_task(task);
+			_starpu_set_current_task(task);
 
-		_STARPU_TRACE_START_CALLBACK(j);
-		task->callback_func(task->callback_arg);
-		_STARPU_TRACE_END_CALLBACK(j);
+			_STARPU_TRACE_START_CALLBACK(j);
+			task->callback_func(task->callback_arg);
+			_STARPU_TRACE_END_CALLBACK(j);
 
-		_starpu_set_current_task(current_task);
+			_starpu_set_current_task(current_task);
 
-		_starpu_set_local_worker_status(STATUS_UNKNOWN);
+			_starpu_set_local_worker_status(STATUS_UNKNOWN);
 
-		if (profiling && task->profiling_info)
-			_starpu_clock_gettime(&task->profiling_info->callback_end_time);
+			if (profiling && task->profiling_info)
+				_starpu_clock_gettime(&task->profiling_info->callback_end_time);
+		}
 	}
 
 	/* If the job was executed on a combined worker there is no need for the
 	 * scheduler to process it : the task structure doesn't contain any valuable
 	 * data as it's not linked to an actual worker */
 	/* control task should not execute post_exec_hook */
-	if(j->task_size == 1 && task->cl != NULL && !j->internal)
+	if(j->task_size == 1 && task->cl != NULL && !j->internal
+#ifdef STARPU_OPENMP
+	/* If this is a continuation, we do not execute the post_exec_hook. The
+	 * post_exec_hook will be run only when the continued task fully
+	 * completes.
+	 *
+	 * Note: If needed, a specific hook could be added to handle stopped
+	 * tasks */
+	&& !continuation
+#endif
+			)
 	{
 		_starpu_sched_post_exec_hook(task);
 #ifdef STARPU_USE_SC_HYPERVISOR
@@ -255,6 +311,9 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 
 	}
 
+	/* Note: For now, we keep the TASK_DONE trace event for continuation,
+	 * however we could add a specific event for stopped tasks if needed.
+	 */
 	_STARPU_TRACE_TASK_DONE(j);
 
 	/* NB: we do not save those values before the callback, in case the
@@ -267,9 +326,12 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	/* we do not desallocate the job structure if some is going to
 	 * wait after the task */
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-	/* A value of 2 is put to specify that not only the codelet but
-	 * also the callback were executed. */
-	j->terminated = 2;
+	if (!continuation)
+	{
+		/* A value of 2 is put to specify that not only the codelet but
+		 * also the callback were executed. */
+		j->terminated = 2;
+	}
 	STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
 
 #ifdef HAVE_AYUDAME_H
@@ -278,7 +340,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
-	if (detach)
+	if (detach && !continuation)
 	{
 		/* no one is going to synchronize with that task so we release
 		 * the data structures now. In case the job was already locked
@@ -288,9 +350,12 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 			_starpu_task_destroy(task);
 	}
 
-	if (regenerate)
+	/* A continuation is not much different from a regenerated task. */
+	if (regenerate || continuation)
 	{
-		STARPU_ASSERT_MSG(detach && !destroy && !task->synchronous, "Regenerated task must be detached (was %d), and not have detroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);
+		STARPU_ASSERT_MSG((detach && !destroy && !task->synchronous)
+				|| continuation
+				, "Regenerated task must be detached (was %d), and not have detroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);
 
 #ifdef HAVE_AYUDAME_H
 		if (AYU_event)
@@ -450,6 +515,29 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
 	return ret;
 }
 
+#ifdef STARPU_OPENMP
+/* When waking up a continuation, we only enforce new task dependencies */
+unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
+{
+	unsigned ret;
+        _STARPU_LOG_IN();
+	STARPU_ASSERT(j->discontinuous);
+
+	/* enfore task dependencies */
+	if (_starpu_not_all_task_deps_are_fulfilled(j))
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
+		return 0;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+	ret = _starpu_push_task(j);
+
+	_STARPU_LOG_OUT();
+	return ret;
+}
+#endif
+
 /* This function must be called with worker->sched_mutex taken */
 struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 {

+ 12 - 25
src/core/jobs.h

@@ -60,28 +60,6 @@ struct _starpu_data_descr {
 	int node;
 };
 
-#ifdef STARPU_OPENMP
-enum _starpu_job_preemption_state {
-	/* Job has never met any preemption event. */
-	_starpu_job_preemption_state_clear                    = 0,
-	/* Job has gone through the preemption preparation step
-	 * and is ready to be preempted. */
-	_starpu_job_preemption_state_prepared_for_preemption  = 1 << 0,
-	/* Job has just been preempted and is being temporarily removed from
-	 * its worker. If this bit is unset and the job is being removed from
-	 * its worker, it means that the job is simply terminating */
-	_starpu_job_preemption_state_is_preempted             = 1 << 1,
-	/* Preempted job has just been unlocked and is being put back on a
-	 * worker. If this bit is unset and the job is being put on a worker,
-	 * it means that the job is simply starting */
-	_starpu_job_preemption_state_is_restarted             = 1 << 2,
-	/* Job was preempted at least once in its life time. Useful for
-	 * determinating whether profiling data should be cumulated with
-	 * previous runs */
-	_starpu_job_preemption_state_was_preempted            = 1 << 3
-};
-#endif
-
 /* A job is the internal representation of a task. */
 LIST_TYPE(_starpu_job,
 
@@ -131,8 +109,11 @@ LIST_TYPE(_starpu_job,
 	unsigned terminated;
 
 #ifdef STARPU_OPENMP
-	/* Job preemption state and history. */
-	unsigned preemption_state;
+	/* Job is a continuation or a regular task. */
+	unsigned continuation;
+
+	/* Job has been stopped at least once. */
+	unsigned discontinuous;
 #endif
 
 	/* Should that task appear in the debug tools ? (eg. the DAG generated
@@ -185,6 +166,9 @@ void _starpu_wait_job(struct _starpu_job *j);
 #ifdef STARPU_OPENMP
 /* Test for the termination of the job */
 int _starpu_test_job_termination(struct _starpu_job *j);
+
+/* Prepare the job for accepting new dependencies before becoming a continuation. */
+void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
 #endif
 
 /* Specify that the task should not appear in the DAG generated by debug tools. */
@@ -193,7 +177,10 @@ void _starpu_exclude_task_from_dag(struct starpu_task *task);
 /* try to submit job j, enqueue it if it's not schedulable yet. The job's sync mutex is supposed to be held already */
 unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j);
 unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
-
+#ifdef STARPU_OPENMP
+/* When waking up a continuation, we only enforce new task dependencies */
+unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
+#endif
 
 /* This function must be called after the execution of a job, this triggers all
  * job's dependencies and perform the callback function if any. */

+ 47 - 5
src/core/task.c

@@ -259,8 +259,11 @@ struct _starpu_job *_starpu_get_job_associated_to_task(struct starpu_task *task)
  * already counted. */
 int _starpu_submit_job(struct _starpu_job *j)
 {
-
 	struct starpu_task *task = j->task;
+	int ret;
+#ifdef STARPU_OPENMP
+	const unsigned continuation = j->continuation;
+#endif
 
 	_STARPU_LOG_IN();
 	/* notify bound computation of a new task */
@@ -311,12 +314,29 @@ int _starpu_submit_job(struct _starpu_job *j)
 	/* Need to atomically set submitted to 1 and check dependencies, since
 	 * this is concucrent with _starpu_notify_cg */
 	j->terminated = 0;
+#ifdef STARPU_OPENMP
+	if (continuation)
+	{
+		j->discontinuous = 1;
+		j->continuation  = 0;
+	}
+#endif
+
 	if (!j->submitted)
 		j->submitted = 1;
 	else
 		j->submitted = 2;
 
-	int ret = _starpu_enforce_deps_and_schedule(j);
+#ifdef STARPU_OPENMP
+	if (continuation)
+	{
+		ret = _starpu_reenforce_task_deps_and_schedule(j);
+	}
+	else
+#endif
+	{
+		ret = _starpu_enforce_deps_and_schedule(j);
+	}
 
 	_STARPU_LOG_OUT();
 	return ret;
@@ -442,11 +462,18 @@ int starpu_task_submit(struct starpu_task *task)
 	int ret;
 	unsigned is_sync = task->synchronous;
 	starpu_task_bundle_t bundle = task->bundle;
-
 	/* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
 	* task structure, it is possible that this job structure was already
 	* allocated. */
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+	const unsigned continuation =
+#ifdef STARPU_OPENMP
+		j->continuation
+#else
+		0
+#endif
+		;
+
 
 	if (j->internal)
 	{
@@ -510,8 +537,11 @@ int starpu_task_submit(struct starpu_task *task)
 			return -ENODEV;
 		}
 
-		_starpu_detect_implicit_data_deps(task);
-
+		/* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
+		if (!continuation)
+		{
+			_starpu_detect_implicit_data_deps(task);
+		}
 
 		if (task->cl->model && task->cl->model->symbol)
 			_starpu_load_perfmodel(task->cl->model);
@@ -916,6 +946,18 @@ void _starpu_set_current_task(struct starpu_task *task)
 	STARPU_PTHREAD_SETSPECIFIC(current_task_key, task);
 }
 
+#ifdef STARPU_OPENMP
+/* Prepare the fields of the currentl task for accepting a new set of
+ * dependencies in anticipation of becoming a continuation.
+ *
+ * When the task becomes 'continued', it will only be queued again when the new
+ * set of dependencies is fulfilled. */
+void _starpu_task_prepare_for_continuation(void)
+{
+	_starpu_job_prepare_for_continuation(_starpu_get_job_associated_to_task(starpu_task_get_current()));
+}
+#endif
+
 /*
  * Returns 0 if tasks does not use any multiformat handle, 1 otherwise.
  */

+ 5 - 0
src/core/task.h

@@ -59,6 +59,11 @@ int
 _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
 				     enum starpu_node_kind node_kind);
 
+#ifdef STARPU_OPENMP
+/* Prepare the current task for accepting new dependencies before becoming a continuation. */
+void _starpu_task_prepare_for_continuation(void);
+#endif
+
 int _starpu_task_uses_multiformat_handles(struct starpu_task *task);
 
 int _starpu_task_submit_conversion_task(struct starpu_task *task,

+ 3 - 0
src/datawizard/coherency.c

@@ -799,6 +799,9 @@ enomem:
 
 void _starpu_push_task_output(struct _starpu_job *j)
 {
+#ifdef STARPU_OPENMP
+	STARPU_ASSERT(!j->continuation);
+#endif
 	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();

+ 6 - 1
src/drivers/cpu/driver_cpu.c

@@ -114,7 +114,12 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 	{
 		_starpu_driver_update_job_feedback(j, cpu_args,
 				perf_arch, &codelet_start, &codelet_end, profiling);
-		_starpu_push_task_output(j);
+#ifdef STARPU_OPENMP
+		if (!j->continuation)
+#endif
+		{
+			_starpu_push_task_output(j);
+		}
 	}
 
 	return 0;

+ 3 - 0
src/drivers/driver_common/driver_common.c

@@ -116,6 +116,9 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 	int calibrate_model = 0;
 	int updated = 0;
 
+#ifdef STARPU_OPENMP
+#warning "TODO: [OPENMP] split profiling for discontinuous tasks"
+#endif
 #ifndef STARPU_SIMGRID
 	if (cl->model && cl->model->benchmarking)
 		calibrate_model = 1;

+ 6 - 34
src/util/openmp_runtime_support.c

@@ -119,7 +119,7 @@ static void omp_initial_thread_func(void)
 
 static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_region *owner_region)
 {
-	struct starpu_omp_thread *thread = malloc(sizeof(*thread));
+	struct starpu_omp_thread *thread = starpu_omp_thread_new();
 	if (thread == NULL)
 		_STARPU_ERROR("memory allocation failed");
 	/* .current_task */
@@ -141,7 +141,7 @@ static void destroy_omp_thread_struct(struct starpu_omp_thread *thread)
 	STARPU_ASSERT(thread->current_task == NULL);
 	STARPU_ASSERT(thread->primary_task == NULL);
 	memset(thread, 0, sizeof(*thread));
-	free(thread);
+	starpu_omp_thread_delete(thread);
 }
 
 static void starpu_omp_task_entry(struct starpu_omp_task *task)
@@ -178,16 +178,6 @@ static void starpu_omp_task_preempt(void)
 }
 
 /*
- * set a flag in starpu_task to indicate that the terminating/destroying sequence should not be applied on this task upon return,
- * the preempting sequence should be performed instead
- */
-static void _starpu_task_set_preempted(struct starpu_task *starpu_task)
-{
-	(void)starpu_task;
-	abort();/* TODO: implement */
-}
-
-/*
  * wrap a task function to allow the task to be preempted
  */
 static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
@@ -255,32 +245,14 @@ static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 			task = NULL;
 		}
 	}
-	else if (task->state == starpu_omp_task_state_preempted)
-	{
-		_starpu_task_set_preempted(task->starpu_task);
-	}
-	else
+	else if (task->state != starpu_omp_task_state_preempted)
 		_STARPU_ERROR("invalid omp task state");
 }
 
-/*
- * prepare the starpu_task fields of a currently running task
- * for accepting a new set of dependencies in anticipation of a preemption
- *
- * when the task becomes preempted, it will only be queued again when the new
- * set of dependencies is fulfilled
- */
-static void _starpu_task_prepare_for_preemption(struct starpu_task *starpu_task)
-{
-	/* TODO: implement function */
-	(void)starpu_task;
-	abort();/* TODO: implement */
-}
-
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit)
 {
-	struct starpu_omp_task *task = malloc(sizeof(*task));
+	struct starpu_omp_task *task = starpu_omp_task_new();
 	if (task == NULL)
 		_STARPU_ERROR("memory allocation failed");
 	task->parent_task = parent_task;
@@ -335,7 +307,7 @@ static void destroy_omp_task_struct(struct starpu_omp_task *task)
 		free(task->stack);
 	}
 	memset(task, 0, sizeof(*task));
-	free(task);
+	starpu_omp_task_delete(task);
 }
 
 /*
@@ -502,7 +474,7 @@ void starpu_parallel_region(struct starpu_codelet *parallel_region_cl, void *par
 	else
 	{
 		/* through the preemption, the parent starpu task becomes the continuation task */
-		_starpu_task_prepare_for_preemption(parent_task->starpu_task);
+		_starpu_task_prepare_for_continuation();
 		new_region->continuation_starpu_task = parent_task->starpu_task;
 	}