Browse Source

merge trunk

Samuel Thibault 10 years ago
parent
commit
dcfa9853ab

+ 2 - 0
ChangeLog

@@ -83,6 +83,8 @@ Small features:
   * On Linux x86, spinlocks now block after a hundred tries. This avoids
     typical 10ms pauses when the application thread tries to submit tasks.
   * New function char *starpu_worker_get_type_as_string(enum starpu_worker_archtype type)
+  * Improve static scheduling by adding support for specifying the task
+    execution order.
 
 Changes:
   * Data interfaces (variable, vector, matrix and block) now define

+ 11 - 0
doc/doxygen/chapters/08scheduling.doxy

@@ -146,6 +146,17 @@ task->execute_on_a_specific_worker = 1;
 task->worker = starpu_worker_get_by_type(STARPU_CUDA_WORKER, 0);
 \endcode
 
+One can also specify the order in which tasks must be executed by setting the
+starpu_task::workerorder field. If this field is set to a non-zero value, it
+provides the per-worker consecutive order in which tasks will be executed,
+starting from 1. For a given such task, the worker will thus not execute
+it before all the tasks with smaller order value have been executed, notably
+in case those tasks are not available yet due to some dependencies. This
+eventually gives total control of task scheduling, and StarPU will only serve as
+a "self-timed" task runtime. Of course, the provided order has to be runnable,
+i.e. a task should not depend on another task bound to the same worker
+with a bigger order.
+
 Note however that using scheduling contexts while statically scheduling tasks on workers
 could be tricky. Be careful to schedule the tasks exactly on the workers of the corresponding
 contexts, otherwise the workers' corresponding scheduling structures may not be allocated or

+ 8 - 0
doc/doxygen/chapters/21simgrid.doxy

@@ -121,6 +121,14 @@ case. Since during simgrid execution, the functions of the codelet are actually
 not called, one can use dummy functions such as the following to still permit
 CUDA or OpenCL execution:
 
+\section DebuggingApplications Debugging applications
+
+By default, simgrid uses its own implementation of threads, which prevents gdb
+from being able to inspect stacks of all threads.  To be able to fully debug an
+application running with simgrid, pass the <c>--cfg=contexts/factory:thread</c>
+option to the application, to make simgrid use system threads, which gdb will be
+able to manipulate as usual.
+
 \snippet simgrid.c To be included. You should update doxygen if you see this text.
 
 

+ 8 - 0
doc/doxygen/chapters/api/codelet_and_tasks.doxy

@@ -549,6 +549,14 @@ process this task (as returned by starpu_worker_get_id()). This field
 is ignored if the field starpu_task::execute_on_a_specific_worker is
 set to 0.
 
+\var starpu_task::workerorder
+Optional field. If the field starpu_task::execute_on_a_specific_worker is
+set, this field indicates the per-worker consecutive order in which tasks
+should be executed on the worker. Tasks will be executed in consecutive
+starpu_task::workerorder values, thus ignoring the availability order or task
+priority. See \ref StaticScheduling for more details. This field is ignored if
+the field starpu_task::execute_on_a_specific_worker is set to 0.
+
 \var starpu_task::bundle
 Optional field. The bundle that includes this task. If no bundle is
 used, this should be NULL.

+ 1 - 1
doc/doxygen/refman.tex

@@ -10,7 +10,7 @@
 ~\\
 \vspace*{15cm}
 \begin{flushright}
-Generated by Doxygen $doxygenversion on $datetime
+Generated by Doxygen.
 \end{flushright}
 \end{titlepage}
 

+ 2 - 0
include/starpu_task.h

@@ -167,6 +167,7 @@ struct starpu_task
 	unsigned regenerate:1;
 
 	unsigned workerid;
+	unsigned workerorder;
 
 	unsigned scheduled:1;
 
@@ -206,6 +207,7 @@ struct starpu_task
 	.use_tag = 0,					\
 	.synchronous = 0,				\
 	.execute_on_a_specific_worker = 0,		\
+	.workerorder = 0,				\
 	.bundle = NULL,					\
 	.detach = 1,					\
 	.destroy = 0,					\

+ 52 - 3
src/core/jobs.c

@@ -461,11 +461,28 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
 	return ret;
 }
 
+/* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
+ * ring buffer, indexed by order, and pulled from its head. */
+
 /* This function must be called with worker->sched_mutex taken */
 struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 {
 	struct starpu_task *task = NULL;
 
+	if (worker->local_ordered_tasks_size)
+	{
+		task = worker->local_ordered_tasks[worker->current_ordered_task];
+		if (task)
+		{
+			worker->local_ordered_tasks[worker->current_ordered_task] = NULL;
+			STARPU_ASSERT(task->workerorder == worker->current_ordered_task_order);
+			/* Next ordered task is there, return it */
+			worker->current_ordered_task = (worker->current_ordered_task + 1) % worker->local_ordered_tasks_size;
+			worker->current_ordered_task_order++;
+			return task;
+		}
+	}
+
 	if (!starpu_task_list_empty(&worker->local_tasks))
 		task = starpu_task_list_pop_front(&worker->local_tasks);
 
@@ -481,10 +498,42 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
-	if (prio)
-		starpu_task_list_push_front(&worker->local_tasks, task);
+	if (task->execute_on_a_specific_worker && task->workerorder)
+	{
+		/* Put it in the ordered task ring */
+		unsigned needed = task->workerorder - worker->current_ordered_task_order + 1;
+		if (worker->local_ordered_tasks_size < needed)
+		{
+			/* Increase the size */
+			unsigned alloc = worker->local_ordered_tasks_size;
+			struct starpu_task **new;
+			unsigned copied;
+
+			if (!alloc)
+				alloc = 1;
+			while (alloc < needed)
+				alloc *= 2;
+			new = malloc(alloc * sizeof(*new));
+
+			/* Put existing tasks at the beginning of the new ring */
+			copied = worker->local_ordered_tasks_size - worker->current_ordered_task;
+			memcpy(new, &worker->local_ordered_tasks[worker->current_ordered_task], copied * sizeof(*new));
+			memcpy(new + copied, worker->local_ordered_tasks, (worker->local_ordered_tasks_size - copied) * sizeof(*new));
+			memset(new + worker->local_ordered_tasks_size, 0, (alloc - worker->local_ordered_tasks_size) * sizeof(*new));
+			free(worker->local_ordered_tasks);
+			worker->local_ordered_tasks = new;
+			worker->local_ordered_tasks_size = alloc;
+			worker->current_ordered_task = 0;
+		}
+		worker->local_ordered_tasks[(worker->current_ordered_task + task->workerorder - worker->current_ordered_task_order) % worker->local_ordered_tasks_size] = task;
+	}
 	else
-		starpu_task_list_push_back(&worker->local_tasks, task);
+	{
+		if (prio)
+			starpu_task_list_push_front(&worker->local_tasks, task);
+		else
+			starpu_task_list_push_back(&worker->local_tasks, task);
+	}
 
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	starpu_push_task_end(task);

+ 8 - 4
src/core/perfmodel/perfmodel_history.c

@@ -814,10 +814,14 @@ static void get_model_path(struct starpu_perfmodel *model, char *path, size_t ma
 	_starpu_get_perf_model_dir_codelets(path, maxlen);
 	strncat(path, model->symbol, maxlen);
 
-	char hostname[65];
-	_starpu_gethostname(hostname, sizeof(hostname));
-	strncat(path, ".", maxlen);
-	strncat(path, hostname, maxlen);
+	const char *dot = strrchr(model->symbol, '.');
+	if (dot == NULL)
+	{
+		char hostname[65];
+		_starpu_gethostname(hostname, sizeof(hostname));
+		strncat(path, ".", maxlen);
+		strncat(path, hostname, maxlen);
+	}
 }
 
 static void save_history_based_model(struct starpu_perfmodel *model)

+ 7 - 0
src/core/workers.c

@@ -428,6 +428,10 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 	starpu_task_list_init(&workerarg->local_tasks);
+	workerarg->local_ordered_tasks = NULL;
+	workerarg->local_ordered_tasks_size = 0;
+	workerarg->current_ordered_task = 0;
+	workerarg->current_ordered_task_order = 1;
 	workerarg->current_task = NULL;
 	workerarg->first_task = 0;
 	workerarg->ntasks = 0;
@@ -1157,6 +1161,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 {
 	int status = 0;
 	unsigned workerid;
+	unsigned n;
 
 	for (workerid = 0; workerid < pconfig->topology.nworkers; workerid++)
 	{
@@ -1209,6 +1214,8 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 
 out:
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
+		for (n = 0; n < worker->local_ordered_tasks_size; n++)
+			STARPU_ASSERT(worker->local_ordered_tasks[n] == NULL);
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
 		_starpu_job_list_delete(worker->terminated_jobs);
 	}

+ 4 - 0
src/core/workers.h

@@ -75,6 +75,10 @@ LIST_TYPE(_starpu_worker,
 	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
+	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
+	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
+	unsigned current_ordered_task; /* this records the index (within local_ordered_tasks) of the next ordered task to be executed */
+	unsigned current_ordered_task_order; /* this records the order of the next ordered task to be executed */
 	struct starpu_task *current_task; /* task currently executed by this worker (non-pipelined version) */
 	struct starpu_task *current_tasks[STARPU_MAX_PIPELINE]; /* tasks currently executed by this worker (pipelined version) */
 	unsigned char first_task; /* Index of first task in the pipeline */

+ 23 - 6
src/debug/traces/starpu_fxt.c

@@ -29,12 +29,19 @@
 #include <inttypes.h>
 #include <starpu_hash.h>
 
-static char *cpus_worker_colors[STARPU_NMAXWORKERS] = {"/greens9/7", "/greens9/6", "/greens9/5", "/greens9/4",  "/greens9/9", "/greens9/3",  "/greens9/2",  "/greens9/1"  };
-static char *cuda_worker_colors[STARPU_NMAXWORKERS] = {"/ylorrd9/9", "/ylorrd9/6", "/ylorrd9/3", "/ylorrd9/1", "/ylorrd9/8", "/ylorrd9/7", "/ylorrd9/4", "/ylorrd9/2",  "/ylorrd9/1"};
-static char *opencl_worker_colors[STARPU_NMAXWORKERS] = {"/blues9/9", "/blues9/6", "/blues9/3", "/blues9/1", "/blues9/8", "/blues9/7", "/blues9/4", "/blues9/2",  "/blues9/1"};
-static char *mic_worker_colors[STARPU_NMAXWORKERS] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
-static char *scc_worker_colors[STARPU_NMAXWORKERS] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
-static char *other_worker_colors[STARPU_NMAXWORKERS] = {"/greys9/9", "/greys9/8", "/greys9/7", "/greys9/6"};
+#define CPUS_WORKER_COLORS_NB	8
+#define CUDA_WORKER_COLORS_NB	9
+#define OPENCL_WORKER_COLORS_NB 8
+#define MIC_WORKER_COLORS_NB	9
+#define SCC_WORKER_COLORS_NB	9
+#define OTHER_WORKER_COLORS_NB	4
+
+static char *cpus_worker_colors[CPUS_WORKER_COLORS_NB] = {"/greens9/7", "/greens9/6", "/greens9/5", "/greens9/4",  "/greens9/9", "/greens9/3",  "/greens9/2",  "/greens9/1"  };
+static char *cuda_worker_colors[CUDA_WORKER_COLORS_NB] = {"/ylorrd9/9", "/ylorrd9/6", "/ylorrd9/3", "/ylorrd9/1", "/ylorrd9/8", "/ylorrd9/7", "/ylorrd9/4", "/ylorrd9/2",  "/ylorrd9/1"};
+static char *opencl_worker_colors[OPENCL_WORKER_COLORS_NB] = {"/blues9/9", "/blues9/6", "/blues9/3", "/blues9/1", "/blues9/8", "/blues9/7", "/blues9/4", "/blues9/2",  "/blues9/1"};
+static char *mic_worker_colors[MIC_WORKER_COLORS_NB] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
+static char *scc_worker_colors[SCC_WORKER_COLORS_NB] = {"/reds9/9", "/reds9/6", "/reds9/3", "/reds9/1", "/reds9/8", "/reds9/7", "/reds9/4", "/reds9/2",  "/reds9/1"};
+static char *other_worker_colors[OTHER_WORKER_COLORS_NB] = {"/greys9/9", "/greys9/8", "/greys9/7", "/greys9/6"};
 static char *worker_colors[STARPU_NMAXWORKERS];
 
 static unsigned opencl_index = 0;
@@ -49,6 +56,7 @@ static void set_next_other_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = other_worker_colors[other_index++];
+	if (other_index == OTHER_WORKER_COLORS_NB) other_index = 0;
 }
 
 static void set_next_cpu_worker_color(int workerid)
@@ -56,6 +64,7 @@ static void set_next_cpu_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = cpus_worker_colors[cpus_index++];
+	if (cpus_index == CPUS_WORKER_COLORS_NB) cpus_index = 0;
 }
 
 static void set_next_cuda_worker_color(int workerid)
@@ -63,6 +72,7 @@ static void set_next_cuda_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = cuda_worker_colors[cuda_index++];
+	if (cuda_index == CUDA_WORKER_COLORS_NB) cuda_index = 0;
 }
 
 static void set_next_opencl_worker_color(int workerid)
@@ -70,16 +80,23 @@ static void set_next_opencl_worker_color(int workerid)
 	if (workerid >= STARPU_NMAXWORKERS)
 		return;
 	worker_colors[workerid] = opencl_worker_colors[opencl_index++];
+	if (opencl_index == OPENCL_WORKER_COLORS_NB) opencl_index = 0;
 }
 
 static void set_next_mic_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = mic_worker_colors[mic_index++];
+	if (mic_index == MIC_WORKER_COLORS_NB) mic_index = 0;
 }
 
 static void set_next_scc_worker_color(int workerid)
 {
+	if (workerid >= STARPU_NMAXWORKERS)
+		return;
 	worker_colors[workerid] = scc_worker_colors[scc_index++];
+	if (scc_index == SCC_WORKER_COLORS_NB) scc_index = 0;
 }
 
 static const char *get_worker_color(int workerid)

+ 1 - 0
tests/Makefile.am

@@ -113,6 +113,7 @@ noinst_PROGRAMS =				\
 	main/deploop                            \
 	main/restart				\
 	main/execute_on_a_specific_worker	\
+	main/execute_schedule			\
 	main/insert_task			\
 	main/insert_task_nullcodelet			\
 	main/insert_task_array			\

+ 109 - 0
tests/main/execute_schedule.c

@@ -0,0 +1,109 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Université de Bordeaux 1
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+#include "../helper.h"
+#include <common/thread.h>
+
+#ifdef STARPU_QUICK_CHECK
+  #define K 2
+#else
+  #define K 16
+#endif
+
+#define N 64
+
+static unsigned current = 1;
+
+void codelet(STARPU_ATTRIBUTE_UNUSED void *descr[], void *_args)
+{
+	uintptr_t me = (uintptr_t) _args;
+	STARPU_ASSERT(current == me);
+	current++;
+}
+
+static struct starpu_codelet cl =
+{
+	.cpu_funcs = {codelet, NULL},
+	.cuda_funcs = {codelet, NULL},
+	.opencl_funcs = {codelet, NULL},
+	.nbuffers = 0,
+};
+
+int main(int argc, char **argv)
+{
+	int ret;
+	struct starpu_task *dep_task[N];
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	unsigned n, i, k;
+
+	for (k = 0; k < K; k++)
+	{
+		for (n = 0; n < N; n++)
+		{
+			struct starpu_task *task;
+
+			dep_task[n] = starpu_task_create();
+
+			dep_task[n]->cl = NULL;
+
+			task = starpu_task_create();
+
+			task->cl = &cl;
+
+			task->execute_on_a_specific_worker = 1;
+			task->workerid = 0;
+			task->workerorder = k*N + n+1;
+			task->cl_arg = (void*) (uintptr_t) (k*N + n+1);
+
+			starpu_task_declare_deps_array(task, 1, &dep_task[n]);
+
+			ret = starpu_task_submit(task);
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+
+		for (n = 0; n < N; n++)
+		{
+			i = random()%(N-n);
+			ret = starpu_task_submit(dep_task[i]);
+			memmove(&dep_task[i], &dep_task[i+1], (N-i-1)*sizeof(dep_task[i]));
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+	}
+
+	starpu_task_wait_for_all();
+
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enodev:
+	starpu_shutdown();
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	return STARPU_TEST_SKIPPED;
+}