瀏覽代碼

Add possibility to delay the termination of a task with the functions
starpu_task_end_dep_add() which specifies the number of calls to the
function starpu_task_end_dep_release() needed to trigger the task
termination.

Nathalie Furmento 6 年之前
父節點
當前提交
e70a08efd4

+ 4 - 0
ChangeLog

@@ -46,6 +46,10 @@ New features:
     communication layer, so that the MPI progression thread started by
     StarPU-MPI can also execute tasks and not only take care of
     communications.
+  * Add the possibility to delay the termination of a task with the
+    function starpu_task_end_dep_add(), which specifies the number of
+    calls to the function starpu_task_end_dep_release() needed to
+    trigger the task termination.
 
 Small features:
   * Scheduling contexts may now be associated a user data pointer at creation

+ 25 - 0
doc/doxygen/chapters/301_tasks.doxy

@@ -55,6 +55,31 @@ other tasks and may thus be a bottleneck if not executed early
 enough), the field starpu_task::priority should be set to transmit the
 priority information to StarPU.
 
+\section TaskDependencies Task Dependencies
+
+By default, task dependencies are inferred from data dependencies (sequential
+consistency) by StarPU. The application can however disable sequential consistency
+for some data, and express dependencies explicitly.
+
+Setting (or unsetting) sequential consistency can be done at the data
+level by calling starpu_data_set_sequential_consistency_flag() for a
+specific data or starpu_data_set_default_sequential_consistency_flag()
+for all data.
+
+Sequential consistency can also be disabled at task
+level by setting the field starpu_task::sequential_consistency to 0.
+
+One can explicitly set dependencies between tasks using
+starpu_task_declare_deps_array(). Dependencies between tasks can also be
+expressed through tags, by associating a tag to a task with the field
+starpu_task::tag_id and using the function starpu_tag_declare_deps()
+or starpu_tag_declare_deps_array().
+
+The termination of a task can be delayed through the function
+starpu_task_end_dep_add() which specifies the number of calls to the
+function starpu_task_end_dep_release() needed to trigger the task
+termination.
+
 \section SettingManyDataHandlesForATask Setting Many Data Handles For a Task
 
 The maximum number of data a task can manage is fixed by the environment variable

+ 12 - 1
doc/doxygen/chapters/api/explicit_dependencies.doxy

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2013,2015,2017                      CNRS
+ * Copyright (C) 2010-2013,2015,2017,2018                      CNRS
  * Copyright (C) 2009-2011,2014-2016                      Université de Bordeaux
  * Copyright (C) 2011-2012                                Inria
  *
@@ -135,4 +135,15 @@ to execute the tasks. When called several times on the same tag,
 notification will be done only on first call, thus implementing "OR"
 dependencies, until the tag is restarted using starpu_tag_restart().
 
+\fn void starpu_task_end_dep_add(struct starpu_task *t, int nb_deps)
+\ingroup API_Explicit_Dependencies
+Add \p nb_deps end dependencies to the task \p t. This means the task
+will not terminate until the required number of calls to the function
+starpu_task_end_dep_release() has been made.
+
+\fn void starpu_task_end_dep_release(struct starpu_task *t)
+\ingroup API_Explicit_Dependencies
+Release one end dependency of the task \p t. This function must be called
+after starpu_task_end_dep_add().
+
 */

+ 2 - 1
examples/Makefile.am

@@ -249,7 +249,8 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/dummy_sched_with_ctx		\
 	worker_collections/worker_tree_example  \
 	reductions/dot_product			\
-	reductions/minmax_reduction
+	reductions/minmax_reduction		\
+	dependency/task_end_dep
 
 endif
 

+ 92 - 0
examples/dependency/task_end_dep.c

@@ -0,0 +1,92 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2018                                     CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0) /* fprintf() wrapper that prints nothing when the STARPU_SSILENT environment variable is set */
+
+/* CPU kernel of the inner task: it ignores both its buffers and its
+ * argument and performs no work. */
+void cpu_codelet2(void *descr[], void *args)
+{
+	(void) args;
+	(void) descr;
+}
+
+/* Codelet of the inner task; its only implementation is the CPU function
+ * cpu_codelet2 and it declares no data buffer. */
+struct starpu_codelet cl2 =
+{
+	.name = "codelet2",
+	.cpu_funcs_name = {"cpu_codelet2"},
+	.cpu_funcs = {cpu_codelet2}
+};
+
+/* CPU kernel of the outer task.
+ *
+ * It doubles the integer variable held in descr[0]. Before doing so it adds
+ * one end dependency to the task currently being executed and submits an
+ * inner task (codelet cl2) that is given starpu_task_end_dep_release and the
+ * current task through STARPU_CALLBACK_WITH_ARG.
+ *
+ * descr: buffers of the task, descr[0] is accessed as an int variable.
+ * args:  unused.
+ */
+void cpu_codelet(void *descr[], void *args)
+{
+	(void)args;
+	int *val = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	struct starpu_task *task;
+	int ret;
+
+	task = starpu_task_get_current();
+	/* One call to starpu_task_end_dep_release() is now required before
+	 * this task is allowed to terminate. */
+	starpu_task_end_dep_add(task, 1);
+
+	ret = starpu_task_insert(&cl2,
+				 STARPU_CALLBACK_WITH_ARG, starpu_task_end_dep_release, task,
+				 0);
+	/* A failed submission would leave the end dependency added above
+	 * without anyone to release it, so check the result the same way
+	 * main() does instead of silently ignoring it. */
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+	*val *= 2;
+}
+
+/* Codelet of the outer task: implemented on CPU by cpu_codelet, it works on
+ * a single buffer accessed in read-write mode. */
+struct starpu_codelet cl =
+{
+	.name = "codelet",
+	.nbuffers = 1,
+	.modes = {STARPU_RW},
+	.cpu_funcs_name = {"cpu_codelet"},
+	.cpu_funcs = {cpu_codelet}
+};
+
+/* Example entry point.
+ *
+ * Registers an integer variable, submits one task running the codelet cl on
+ * it, unregisters the variable, shuts StarPU down and prints the variable.
+ * Returns 77 when StarPU reports -ENODEV or when no CPU worker is available,
+ * and the checked return value of starpu_task_insert() otherwise.
+ */
+int main(void)
+{
+	starpu_data_handle_t handle;
+	int number = 12;
+	int status = starpu_init(NULL);
+
+	if (STARPU_UNLIKELY(status == -ENODEV))
+	{
+		return 77;
+	}
+	STARPU_CHECK_RETURN_VALUE(status, "starpu_init");
+
+	if (starpu_cpu_worker_get_count() < 1)
+	{
+		FPRINTF(stderr, "This application requires at least 1 cpu worker\n");
+		starpu_shutdown();
+		return 77;
+	}
+
+	/* Hand the local variable over to StarPU. */
+	starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)&number, sizeof(number));
+
+	status = starpu_task_insert(&cl, STARPU_RW, handle, 0);
+	STARPU_CHECK_RETURN_VALUE(status, "starpu_task_insert");
+
+	starpu_data_unregister(handle);
+	starpu_shutdown();
+
+	FPRINTF(stderr, "Value = %d\n", number);
+
+	return status;
+}

+ 5 - 1
include/starpu_task.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2011-2017                                Inria
  * Copyright (C) 2009-2018                                Université de Bordeaux
- * Copyright (C) 2010-2015,2017                           CNRS
+ * Copyright (C) 2010-2015,2017,2018                           CNRS
  * Copyright (C) 2011                                     Télécom-SudParis
  * Copyright (C) 2016                                     Uppsala University
  *
@@ -225,6 +225,7 @@ struct starpu_task
 #else
 	void *omp_task;
 #endif
+	unsigned nb_termination_call_required;
 	void *sched_data;
 };
 
@@ -299,6 +300,9 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 
 void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[]);
 
+void starpu_task_end_dep_add(struct starpu_task *t, int nb_deps);
+void starpu_task_end_dep_release(struct starpu_task *t);
+
 int starpu_task_get_task_succs(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[]);
 int starpu_task_get_task_scheduled_succs(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[]);
 

+ 21 - 1
src/core/jobs.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2011-2017                                Inria
  * Copyright (C) 2008-2018                                Université de Bordeaux
- * Copyright (C) 2010-2017                                CNRS
+ * Copyright (C) 2010-2018                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
  * Copyright (C) 2011                                     Télécom-SudParis
  *
@@ -261,8 +261,28 @@ void _starpu_handle_job_submission(struct _starpu_job *j)
 #endif
 }
 
+/* Release one end dependency of the task t: the job stored in
+ * t->starpu_private is passed to _starpu_handle_job_termination(). */
+void starpu_task_end_dep_release(struct starpu_task *t)
+{
+	_starpu_handle_job_termination((struct _starpu_job *)t->starpu_private);
+}
+
+/* Add nb_deps to the counter t->nb_termination_call_required of the task t.
+ *
+ * NOTE(review): nb_deps is a signed int added to an unsigned counter, so a
+ * negative value wraps around -- confirm callers only pass values >= 0.
+ * NOTE(review): the counter is updated here without any lock, whereas
+ * _starpu_handle_job_termination() decrements it under the job's sync_mutex
+ * -- confirm the two cannot run concurrently.
+ */
+void starpu_task_end_dep_add(struct starpu_task *t, int nb_deps)
+{
+	unsigned *required = &t->nb_termination_call_required;
+
+	*required = *required + nb_deps;
+}
+
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
+	if (j->task->nb_termination_call_required != 0)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+		int nb = j->task->nb_termination_call_required;
+		j->task->nb_termination_call_required -= 1;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+		if (nb != 0) return;
+	}
+
 	struct starpu_task *task = j->task;
 	unsigned sched_ctx = task->sched_ctx;
 	double flops = task->flops;