Quellcode durchsuchen

Factorize task submission code

Samuel Thibault vor 10 Jahren
Ursprung
Commit
bb817246ac
3 geänderte Dateien mit 74 neuen und 118 gelöschten Zeilen
  1. 22 15
      src/core/dependencies/data_concurrency.c
  2. 3 1
      src/core/dependencies/data_concurrency.h
  3. 49 102
      src/core/task.c

+ 22 - 15
src/core/dependencies/data_concurrency.c

@@ -277,33 +277,40 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	return 0;
 }
 
-/* Sort the data used by the given job by handle pointer value order, and
- * acquire them in that order */
-/* No  lock is held */
-unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j)
+void _starpu_job_set_ordered_buffers(struct _starpu_job *j)
 {
-	struct starpu_codelet *cl = j->task->cl;
-
-	if ((cl == NULL) || (STARPU_TASK_GET_NBUFFERS(j->task) == 0))
-		return 0;
-
 	/* Compute an ordered list of the different pieces of data so that we
 	 * grab then according to a total order, thus avoiding a deadlock
 	 * condition */
 	unsigned i;
-	for (i=0 ; i<STARPU_TASK_GET_NBUFFERS(j->task); i++)
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+	struct starpu_task *task = j->task;
+
+	for (i=0 ; i<nbuffers; i++)
 	{
-		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
 		_STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
-		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(j->task, i);
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, i);
 		_STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
 		int node = -1;
-		if (j->task->cl->specific_nodes)
-			node = STARPU_CODELET_GET_NODE(j->task->cl, i);
+		if (task->cl->specific_nodes)
+			node = STARPU_CODELET_GET_NODE(task->cl, i);
 		_STARPU_JOB_SET_ORDERED_BUFFER_NODE(j, node, i);
 	}
+	_starpu_sort_task_handles(_STARPU_JOB_GET_ORDERED_BUFFERS(j), nbuffers);
+}
+
+/* Sort the data used by the given job by handle pointer value order, and
+ * acquire them in that order */
+/* No  lock is held */
+unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j)
+{
+	struct starpu_codelet *cl = j->task->cl;
+
+	if ((cl == NULL) || (STARPU_TASK_GET_NBUFFERS(j->task) == 0))
+		return 0;
 
-	_starpu_sort_task_handles(_STARPU_JOB_GET_ORDERED_BUFFERS(j), STARPU_TASK_GET_NBUFFERS(j->task));
+	_starpu_job_set_ordered_buffers(j);
 
 	return _submit_job_enforce_data_deps(j, 0);
 }

+ 3 - 1
src/core/dependencies/data_concurrency.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012  Université de Bordeaux
+ * Copyright (C) 2010, 2012, 2015  Université de Bordeaux
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -20,6 +20,8 @@
 
 #include <core/jobs.h>
 
+void _starpu_job_set_ordered_buffers(struct _starpu_job *j);
+
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
 
 int _starpu_notify_data_dependencies(starpu_data_handle_t handle);

+ 49 - 102
src/core/task.c

@@ -24,6 +24,7 @@
 #include <core/jobs.h>
 #include <core/task.h>
 #include <core/task_bundle.h>
+#include <core/dependencies/data_concurrency.h>
 #include <common/config.h>
 #include <common/utils.h>
 #include <common/fxt.h>
@@ -502,28 +503,10 @@ void _starpu_task_check_deprecated_fields(struct starpu_task *task STARPU_ATTRIB
 	/* None any more */
 }
 
-/* application should submit new tasks to StarPU through this function */
-int starpu_task_submit(struct starpu_task *task)
+static int _starpu_task_submit_head(struct starpu_task *task)
 {
-	_STARPU_LOG_IN();
-	STARPU_ASSERT(task);
-	STARPU_ASSERT_MSG(task->magic == 42, "Tasks must be created with starpu_task_create, or initialized with starpu_task_init.");
-
-	int ret;
 	unsigned is_sync = task->synchronous;
-	starpu_task_bundle_t bundle = task->bundle;
-	/* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
-	* task structure, it is possible that this job structure was already
-	* allocated. */
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-	const unsigned continuation =
-#ifdef STARPU_OPENMP
-		j->continuation
-#else
-		0
-#endif
-		;
-
 
 	if (j->internal)
 	{
@@ -589,12 +572,6 @@ int starpu_task_submit(struct starpu_task *task)
 			return -ENODEV;
 		}
 
-		/* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
-		if (!continuation)
-		{
-			_starpu_detect_implicit_data_deps(task);
-		}
-
 		if (task->cl->model)
 			_starpu_init_and_load_perfmodel(task->cl->model);
 
@@ -602,6 +579,40 @@ int starpu_task_submit(struct starpu_task *task)
 			_starpu_init_and_load_perfmodel(task->cl->power_model);
 	}
 
+	return 0;
+}
+
+/* application should submit new tasks to StarPU through this function */
+int starpu_task_submit(struct starpu_task *task)
+{
+	_STARPU_LOG_IN();
+	STARPU_ASSERT(task);
+	STARPU_ASSERT_MSG(task->magic == 42, "Tasks must be created with starpu_task_create, or initialized with starpu_task_init.");
+
+	int ret;
+	unsigned is_sync = task->synchronous;
+	starpu_task_bundle_t bundle = task->bundle;
+	/* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
+	* task structure, it is possible that this job structure was already
+	* allocated. */
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+	const unsigned continuation =
+#ifdef STARPU_OPENMP
+		j->continuation
+#else
+		0
+#endif
+		;
+
+
+	ret = _starpu_task_submit_head(task);
+	if (ret)
+		return ret;
+
+	/* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
+	if (task->cl && !continuation)
+		_starpu_detect_implicit_data_deps(task);
+
 	if (bundle)
 	{
 		/* We need to make sure that models for other tasks of the
@@ -637,7 +648,6 @@ int starpu_task_submit(struct starpu_task *task)
 	 * dependency. */
 	task->status = STARPU_TASK_BLOCKED;
 
-
 	if (profiling)
 		_starpu_clock_gettime(&info->submit_time);
 
@@ -672,54 +682,21 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
  * skipping dependencies completely (when it knows what it is doing).  */
 int _starpu_task_submit_nodeps(struct starpu_task *task)
 {
-	_starpu_task_check_deprecated_fields(task);
-	_starpu_codelet_check_deprecated_fields(task->cl);
-
-	if (task->cl)
-	{
-		if (task->cl->model)
-			_starpu_init_and_load_perfmodel(task->cl->model);
-
-		if (task->cl->power_model)
-			_starpu_init_and_load_perfmodel(task->cl->power_model);
-	}
+	int ret;
+	ret = _starpu_task_submit_head(task);
+	STARPU_ASSERT(ret == 0);
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
-	if (j->internal)
-	{
-		// Internal tasks are submitted to initial context
-		j->task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
-	}
-	else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
-	{
-		// If the task has not specified a context, we set the current context
-		j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
-	}
-
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
-	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
+	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 1;
-
+	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
 	if (task->cl)
-	{
 		/* This would be done by data dependencies checking */
-		unsigned i;
-		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-		for (i=0 ; i<nbuffers ; i++)
-		{
-			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
-			_STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
-			enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(j->task, i);
-			_STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
-			int node = -1;
-			if (j->task->cl->specific_nodes)
-				node = STARPU_CODELET_GET_NODE(j->task->cl, i);
-			_STARPU_JOB_SET_ORDERED_BUFFER_NODE(j, node, i);
-		}
-	}
-
+		_starpu_job_set_ordered_buffers(j);
+	task->status = STARPU_TASK_READY;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 	return _starpu_push_task(j);
@@ -731,20 +708,14 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 int _starpu_task_submit_conversion_task(struct starpu_task *task,
 					unsigned int workerid)
 {
+	int ret;
 	STARPU_ASSERT(task->cl);
 	STARPU_ASSERT(task->execute_on_a_specific_worker);
 
-	_starpu_task_check_deprecated_fields(task);
-	_starpu_codelet_check_deprecated_fields(task->cl);
-
-	/* We should factorize that */
-	if (task->cl->model)
-		_starpu_init_and_load_perfmodel(task->cl->model);
+	ret = _starpu_task_submit_head(task);
+	STARPU_ASSERT(ret == 0);
 
-	if (task->cl->power_model)
-		_starpu_init_and_load_perfmodel(task->cl->power_model);
-
-	/* We retain handle reference count */
+	/* We retain handle reference count that would have been acquired by data dependencies.  */
 	unsigned i;
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	for (i=0; i<nbuffers; i++)
@@ -757,34 +728,12 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 
-	if (j->internal)
-	{
-		// Internal tasks are submitted to initial context
-		j->task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
-	}
-	else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
-	{
-		// If the task has not specified a context, we set the current context
-		j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
-	}
-
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
+
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 1;
 	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
-	for (i=0 ; i<nbuffers ; i++)
-	{
-		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
-		_STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
-		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(j->task, i);
-		_STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
-		int node = -1;
-		if (j->task->cl->specific_nodes)
-			node = STARPU_CODELET_GET_NODE(j->task->cl, i);
-		_STARPU_JOB_SET_ORDERED_BUFFER_NODE(j, node, i);
-	}
-
-        _STARPU_LOG_IN();
+	_starpu_job_set_ordered_buffers(j);
 
 	task->status = STARPU_TASK_READY;
 	_starpu_profiling_set_task_push_start_time(task);
@@ -799,9 +748,7 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 
 	_starpu_profiling_set_task_push_end_time(task);
 
-        _STARPU_LOG_OUT();
 	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
 	return 0;
 }