소스 검색

Add STARPU_NOWHERE to create synchronization tasks with data.

Samuel Thibault 10 년 전
부모
커밋
75d0d38234

+ 1 - 0
ChangeLog

@@ -151,6 +151,7 @@ Small features:
     disable CUDA costs simulation in simgrid mode.
   * Add starpu_memory_pin and starpu_memory_unpin to pin memory allocated
     another way than starpu_malloc.
+  * Add STARPU_NOWHERE to create synchronization tasks with data.
 
 Changes:
   * Data interfaces (variable, vector, matrix and block) now define

+ 32 - 0
doc/doxygen/chapters/06tasks.doxy

@@ -469,5 +469,37 @@ CPU and GPU tasks are not affected and can be run concurrently). The parallel
 task scheduler will however still however still try varying combined worker
 sizes to look for the most efficient ones.
 
+\subsection SynchronizationTasks Synchronization tasks
+
+For the application conveniency, it may be useful to define tasks which do not
+actually make any computation, but wear for instance dependencies between other
+tasks or tags, or to be submitted in callbacks, etc.
+
+The obvious way is of course to make kernel functions empty, but such task will
+thus have to wait for a worker to become ready, transfer data, etc.
+
+A much lighter way to define a synchronization task is to set its <c>cl</c>
+field to <c>NULL</c>. The task will thus be a mere synchronization point,
+without any data access or execution content: as soon as its dependencies become
+available, it will terminate, call the callbacks, and release dependencies.
+
+An intermediate solution is to define a codelet with its <c>where</c> field set
+to STARPU_NOWHERE, for instance this:
+
+\code{.c}
+struct starpu_codelet {
+	.where = NOWHERE,
+	.nbuffers = 1,
+	.modes = { STARPU_R },
+}
+
+task = starpu_task_create();
+task->cl = starpu_codelet;
+task->handles[0] = handle;
+starpu_task_submit(task);
+\endcode
+
+will create a task which simply waits for the value of <c>handle</c> to be
+available for read. This task can then be depended on, etc.
 
 */

+ 9 - 0
doc/doxygen/chapters/api/codelet_and_tasks.doxy

@@ -56,6 +56,13 @@ The task is waiting for a task.
 \ingroup API_Codelet_And_Tasks
 The task is waiting for some data.
 
+\def STARPU_NOWHERE
+\ingroup API_Codelet_And_Tasks
+This macro is used when setting the field starpu_codelet::where
+to specify that the codelet has no computation part, and thus does not need
+to be scheduled, and data does not need to be actually loaded. This is thus
+essentially used for synchronization tasks.
+
 \def STARPU_CPU
 \ingroup API_Codelet_And_Tasks
 This macro is used when setting the field starpu_codelet::where
@@ -170,6 +177,8 @@ indicates that the codelet is implemented for both CPU cores and CUDA
 devices while ::STARPU_OPENCL indicates that it is only available on
 OpenCL devices. If the field is unset, its value will be automatically
 set based on the availability of the XXX_funcs fields defined below.
+It can also be set to ::STARPU_NOWHERE to specify that no computation
+has to be actually done.
 
 \var int (*starpu_codelet::can_execute)(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 Define a function which should return 1 if the worker designated by

+ 1 - 0
include/starpu_task.h

@@ -35,6 +35,7 @@ extern "C"
 {
 #endif
 
+#define STARPU_NOWHERE	((1ULL)<<0)
 #define STARPU_CPU	((1ULL)<<1)
 #define STARPU_CUDA	((1ULL)<<3)
 #define STARPU_OPENCL	((1ULL)<<6)

+ 4 - 0
src/core/jobs.c

@@ -246,6 +246,10 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	size_t data_size = 0;
 #endif //STARPU_USE_SC_HYPERVISOR
 
+	if (task->cl && task->cl->where == STARPU_NOWHERE)
+		/* push_task_output hasn't been done */
+		_starpu_release_nowhere_task_output(j);
+
 	/* We release handle reference count */
 	if (task->cl && !continuation)
 	{

+ 1 - 1
src/core/sched_policy.c

@@ -402,7 +402,7 @@ int _starpu_repush_task(struct _starpu_job *j)
 	/* in case there is no codelet associated to the task (that's a control
 	 * task), we directly execute its callback and enforce the
 	 * corresponding dependencies */
-	if (task->cl == NULL)
+	if (task->cl == NULL || task->cl->where == STARPU_NOWHERE)
 	{
 		if(task->prologue_callback_pop_func)
 			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

+ 2 - 0
src/core/workers.c

@@ -174,6 +174,8 @@ static uint32_t _starpu_worker_exists_and_can_execute(struct starpu_task *task,
 uint32_t _starpu_worker_exists(struct starpu_task *task)
 {
 	_starpu_codelet_check_deprecated_fields(task->cl);
+	if (task->cl->where == STARPU_NOWHERE)
+		return 1;
 
 	/* if the task belongs to the init context we can
 	   check out all the worker mask of the machine

+ 41 - 0
src/datawizard/coherency.c

@@ -971,6 +971,47 @@ void _starpu_push_task_output(struct _starpu_job *j)
 	_STARPU_TRACE_END_PUSH_OUTPUT(NULL);
 }
 
+/* Version of _starpu_push_task_output used by NOWHERE tasks, for which
+ * _starpu_fetch_task_input was not called. We just release the handle */
+void _starpu_release_nowhere_task_output(struct _starpu_job *j)
+{
+#ifdef STARPU_OPENMP
+	STARPU_ASSERT(!j->continuation);
+#endif
+	int profiling = starpu_profiling_status_get();
+	struct starpu_task *task = j->task;
+	if (profiling && task->profiling_info)
+		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);
+
+        struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
+        unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+
+	unsigned index;
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = descrs[index].handle;
+
+		if (index && descrs[index-1].handle == descrs[index].handle)
+			/* We have already released this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles */
+			continue;
+
+		/* Keep a reference for future
+		 * _starpu_release_task_enforce_sequential_consistency call */
+		_starpu_spin_lock(&handle->header_lock);
+		handle->busy_count++;
+		_starpu_spin_unlock(&handle->header_lock);
+
+		_starpu_spin_lock(&handle->header_lock);
+		if (!_starpu_notify_data_dependencies(handle))
+			_starpu_spin_unlock(&handle->header_lock);
+	}
+
+	if (profiling && task->profiling_info)
+		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
+}
+
 /* NB : this value can only be an indication of the status of a data
 	at some point, but there is no strong garantee ! */
 unsigned _starpu_is_data_present_or_requested(starpu_data_handle_t handle, unsigned node)

+ 2 - 0
src/datawizard/coherency.h

@@ -268,6 +268,8 @@ uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle);
 
 void _starpu_push_task_output(struct _starpu_job *j);
 
+void _starpu_release_nowhere_task_output(struct _starpu_job *j);
+
 STARPU_ATTRIBUTE_WARN_UNUSED_RESULT
 int _starpu_fetch_task_input(struct _starpu_job *j);
 

+ 1 - 0
tests/Makefile.am

@@ -187,6 +187,7 @@ noinst_PROGRAMS =				\
 	datawizard/lazy_unregister		\
 	datawizard/no_unregister		\
 	datawizard/noreclaim			\
+	datawizard/nowhere			\
 	datawizard/interfaces/copy_interfaces	\
 	datawizard/interfaces/block/block_interface \
 	datawizard/interfaces/bcsr/bcsr_interface \

+ 119 - 0
tests/datawizard/nowhere.c

@@ -0,0 +1,119 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+#include "../helper.h"
+
+static int x, y;
+
+static void prod(void *descr[], void *_args STARPU_ATTRIBUTE_UNUSED)
+{
+	int *v = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	*v = 1;
+}
+
+static struct starpu_codelet cl_prod =
+{
+	.cpu_funcs = { prod },
+	.nbuffers = 1,
+	.modes = { STARPU_W },
+};
+
+static void callback(void *callback_arg)
+{
+	STARPU_ASSERT(x>=1);
+	STARPU_ASSERT(y>=1);
+}
+
+static struct starpu_codelet cl_nowhere =
+{
+	.where = STARPU_NOWHERE,
+	.nbuffers = 2,
+	.modes = { STARPU_R, STARPU_R },
+};
+
+static void cons(void *descr[], void *_args STARPU_ATTRIBUTE_UNUSED)
+{
+	int *v = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	STARPU_ASSERT(*v == 1);
+	*v = 2;
+}
+
+static struct starpu_codelet cl_cons =
+{
+	.cpu_funcs = { cons },
+	.nbuffers = 1,
+	.modes = { STARPU_RW },
+};
+
+int main(int argc, char **argv)
+{
+	starpu_data_handle_t handle_x, handle_y;
+	int ret;
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&handle_x, STARPU_MAIN_RAM, (uintptr_t)&x, sizeof(x));
+	starpu_variable_data_register(&handle_y, STARPU_MAIN_RAM, (uintptr_t)&y, sizeof(y));
+
+	ret = starpu_task_insert(&cl_prod, STARPU_W, handle_x, 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	ret = starpu_task_insert(&cl_prod, STARPU_W, handle_y, 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	ret = starpu_task_insert(&cl_nowhere, STARPU_R, handle_x, STARPU_R, handle_y, STARPU_CALLBACK, callback, 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	ret = starpu_task_insert(&cl_cons, STARPU_RW, handle_x, 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	ret = starpu_task_insert(&cl_cons, STARPU_RW, handle_y, 0);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	ret = starpu_task_wait_for_all();
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_wait_for_all");
+
+	starpu_data_unregister(handle_x);
+	starpu_data_unregister(handle_y);
+
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enodev:
+	starpu_data_unregister(handle_x);
+	starpu_data_unregister(handle_y);
+
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+}