
Add graph inspection facility for schedulers

Samuel Thibault, 9 years ago
Commit 26a7949eed

+ 1 - 0
ChangeLog

@@ -116,6 +116,7 @@ New features:
   * Generate animated html trace of modular schedulers.
   * Add asynchronous partition planning. It only supports coherency through
     the home node of data for now.
+  * Add graph inspection facility for schedulers.
 
 Small features:
   * Tasks can now have a name (via the field const char *name of

+ 24 - 3
doc/doxygen/chapters/08scheduling.doxy

@@ -179,13 +179,34 @@ static struct starpu_sched_policy dummy_sched_policy = {
     .add_workers = dummy_sched_add_workers,
     .remove_workers = dummy_sched_remove_workers,
     .push_task = push_task_dummy,
-    .push_prio_task = NULL,
     .pop_task = pop_task_dummy,
-    .post_exec_hook = NULL,
-    .pop_every_task = NULL,
     .policy_name = "dummy",
     .policy_description = "dummy scheduling strategy"
 };
 \endcode
 
+\section GraphScheduling Graph-based scheduling
+
+For performance reasons, most of the schedulers shipped with StarPU use simple
+list-scheduling heuristics and assume that the application has already set
+priorities.  That is why they make their scheduling decisions only between the
+moment tasks become available for execution and the moment a worker becomes
+idle, without looking at the task graph as a whole.
+
+Other heuristics can however look at the task graph. Recording the task graph
+is expensive, so it is not enabled by default; a scheduling heuristic has to
+set _starpu_graph_record to 1 from its initialization function to make it
+available. The <c>_starpu_graph*</c> functions can then be used.
+
+<c>src/sched_policies/graph_test_policy.c</c> is an example of a simple greedy
+policy which automatically computes priorities from the bottom-up rank of tasks.
+
+The idea is that while the application submits tasks, they are only pushed
+to a bag of tasks. When the application has finished submitting tasks,
+it calls starpu_do_schedule (or starpu_task_wait_for_all, which calls
+starpu_do_schedule), and the starpu_sched_policy::do_schedule method of the
+scheduler is invoked. This method calls _starpu_graph_compute_depths to compute
+the bottom-up ranks, and then uses these ranks to set task priorities. The
+tasks can then be moved to a priority queue, from which workers pop them.
+
 */

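To make the workflow described in the new GraphScheduling section concrete, here is a minimal sketch of a starpu_sched_policy::do_schedule method built on these facilities. It is not part of this commit and merely mirrors src/sched_policies/graph_test_policy.c further below; the names set_task_priority and do_schedule_example are illustrative only.

\code
/* Illustrative callback for _starpu_graph_foreach(): copy the computed
 * depth (bottom-up rank) into the task priority. */
static void set_task_priority(void *data, struct _starpu_job *job)
{
	(void) data;
	job->task->priority = job->depth;
}

/* Sketch of a do_schedule method: once the whole graph has been submitted,
 * compute bottom-up ranks and turn them into task priorities. */
static void do_schedule_example(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	_starpu_graph_compute_depths();
	_starpu_graph_foreach(set_task_priority, NULL);
	/* ... then move tasks from the bag to a priority queue ... */
}
\endcode
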
+ 11 - 1
doc/doxygen/chapters/api/scheduling_policy.doxy

@@ -76,7 +76,7 @@ For each task not going through the scheduler (because starpu_task::execute_on_a
 \var void (*starpu_sched_policy::do_schedule)(unsigned sched_ctx_id)
         Optional field. This method is called when it is a good time to start
         scheduling tasks. This is notably called when the application calls
-        starpu_task_wait_for_all.
+        starpu_task_wait_for_all or starpu_do_schedule explicitly.
 \var void (*starpu_sched_policy::add_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
         Initialize scheduling structures corresponding to each worker used by the policy.
 \var void (*starpu_sched_policy::remove_workers)(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
@@ -224,4 +224,14 @@ Prefetch data for a given task on a given node when the bus is idle
 The scheduling policies indicates if the worker may pop tasks from the list of other workers
 or if there is a central list with task for all the workers
 
+\fn void _starpu_graph_compute_depths(void)
+\ingroup API_Scheduling_policy
+This makes StarPU compute, for each task, its depth, i.e. the length of the
+longest path from that task to a task without outgoing dependencies.
+
+\fn void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
+\ingroup API_Scheduling_policy
+This calls \e func for each node of the task graph, also passing \e data
+through unchanged.
+
 */

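For reference, an application-side sketch of when do_schedule gets triggered; this is illustrative only, and ntasks and tasks[] are assumed to have been set up by the application beforehand.

\code
/* Submit the whole task graph first... */
unsigned i;
for (i = 0; i < ntasks; i++)
	starpu_task_submit(tasks[i]);

/* ...then trigger scheduling explicitly.  Calling only
 * starpu_task_wait_for_all() would have the same effect, since it calls
 * starpu_do_schedule() itself. */
starpu_do_schedule();
starpu_task_wait_for_all();
\endcode
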
+ 1 - 5
examples/scheduler/dummy_sched.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2015  Université de Bordeaux
+ * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010-2013  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -119,13 +119,9 @@ static struct starpu_task *pop_task_dummy(unsigned sched_ctx_id)
 static struct starpu_sched_policy dummy_sched_policy =
 {
 	.init_sched = init_dummy_sched,
-	.add_workers = NULL,
-	.remove_workers = NULL,
 	.deinit_sched = deinit_dummy_sched,
 	.push_task = push_task_dummy,
 	.pop_task = pop_task_dummy,
-	.post_exec_hook = NULL,
-	.pop_every_task = NULL,
 	.policy_name = "dummy",
 	.policy_description = "dummy scheduling strategy",
 	.worker_type = STARPU_WORKER_LIST,

+ 3 - 0
src/Makefile.am

@@ -118,6 +118,7 @@ noinst_HEADERS = 						\
 	common/rbtree.h						\
 	common/rbtree_i.h					\
 	common/prio_list.h					\
+	common/graph.h						\
 	drivers/driver_common/driver_common.h			\
 	drivers/mp_common/mp_common.h				\
 	drivers/mp_common/source_common.h			\
@@ -160,6 +161,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	common/utils.c						\
 	common/thread.c						\
 	common/rbtree.c						\
+	common/graph.c						\
 	core/jobs.c						\
 	core/task.c						\
 	core/task_bundle.c					\
@@ -202,6 +204,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/parallel_heft.c				\
 	sched_policies/parallel_eager.c				\
 	sched_policies/heteroprio.c				\
+	sched_policies/graph_test_policy.c			\
 	drivers/driver_common/driver_common.c			\
 	drivers/disk/driver_disk.c				\
 	datawizard/memory_nodes.c				\

+ 234 - 0
src/common/graph.c

@@ -0,0 +1,234 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This stores the task graph structure, to be used by the schedulers which need
+ * it.  We do not always enable it since it is costly.
+ */
+
+#include <starpu.h>
+#include <core/jobs.h>
+#include <common/graph.h>
+
+/* Protects the whole task graph */
+static starpu_pthread_rwlock_t graph_lock;
+
+/* Whether we should enable recording the task graph */
+int _starpu_graph_record;
+
+/* This list contains all jobs without incoming dependency */
+struct _starpu_job_list top;
+/* This list contains all jobs without outgoing dependency */
+struct _starpu_job_list bottom;
+/* This list contains all jobs */
+struct _starpu_job_list all;
+
+void _starpu_graph_init(void)
+{
+	STARPU_PTHREAD_RWLOCK_INIT(&graph_lock, NULL);
+	_starpu_job_list_init(&top);
+	_starpu_job_list_init(&bottom);
+	_starpu_job_list_init(&all);
+}
+
+static void __starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
+{
+	struct _starpu_job *job;
+
+	for (job = _starpu_job_list_begin(&all, all);
+	     job != _starpu_job_list_end(&all, all);
+	     job = _starpu_job_list_next(&all, job, all))
+		func(data, job);
+}
+
+/* Add a job to the graph */
+void _starpu_graph_add_job(struct _starpu_job *job)
+{
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+
+	/* It does not have any dependency yet, add to all lists */
+	_starpu_job_list_push_back(&top, job, top);
+	_starpu_job_list_push_back(&bottom, job, bottom);
+	_starpu_job_list_push_back(&all, job, all);
+
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}
+
+/* Add a job to an array of jobs */
+static unsigned add_job(struct _starpu_job *job, struct _starpu_job ***jobs, unsigned *n_jobs, unsigned *alloc_jobs, unsigned **slot)
+{
+	unsigned ret;
+	if (*n_jobs == *alloc_jobs)
+	{
+		if (*alloc_jobs)
+			*alloc_jobs *= 2;
+		else
+			*alloc_jobs = 4;
+		*jobs = realloc(*jobs, *alloc_jobs * sizeof(**jobs));
+		if (slot)
+			*slot = realloc(*slot, *alloc_jobs * sizeof(**slot));
+	}
+	ret = (*n_jobs)++;
+	(*jobs)[ret] = job;
+	return ret;
+}
+
+/* Add a dependency between jobs */
+void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev_job)
+{
+	unsigned rank_incoming, rank_outgoing;
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+
+	if (_starpu_job_list_queued(prev_job, bottom))
+		/* Previous job is not at bottom any more */
+		_starpu_job_list_erase(bottom, prev_job, bottom);
+
+	if (_starpu_job_list_queued(job, top))
+		/* Next job is not at top any more */
+		_starpu_job_list_erase(top, job, top);
+
+	rank_incoming = add_job(prev_job, &job->incoming, &job->n_incoming, &job->alloc_incoming, NULL);
+	rank_outgoing = add_job(job, &prev_job->outgoing, &prev_job->n_outgoing, &prev_job->alloc_outgoing, &prev_job->outgoing_slot);
+	prev_job->outgoing_slot[rank_outgoing] = rank_incoming;
+
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}
+
+/* Drop a job, and thus its dependencies */
+void _starpu_graph_drop_job(struct _starpu_job *job)
+{
+	unsigned i;
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+
+	if (_starpu_job_list_queued(job, bottom))
+		_starpu_job_list_erase(bottom, job, bottom);
+	if (_starpu_job_list_queued(job, top))
+		_starpu_job_list_erase(top, job, top);
+	if (_starpu_job_list_queued(job, all))
+		_starpu_job_list_erase(all, job, all);
+
+	/* Drop ourself from the incoming part of the outgoing jobs */
+	for (i = 0; i < job->n_outgoing; i++)
+	{
+		struct _starpu_job *next = job->outgoing[i];
+		next->incoming[job->outgoing_slot[i]] = NULL;
+	}
+	job->n_outgoing = 0;
+	free(job->outgoing);
+	job->outgoing = NULL;
+	free(job->outgoing_slot);
+	job->outgoing_slot = NULL;
+	job->alloc_outgoing = 0;
+	job->n_incoming = 0;
+	free(job->incoming);
+	job->incoming = NULL;
+	job->alloc_incoming = 0;
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}
+
+static void _starpu_graph_set_n(void *data, struct _starpu_job *job)
+{
+	int value = (intptr_t) data;
+	job->graph_n = value;
+}
+
+/* Call func for each vertex of the task graph, from bottom to top, in topological order */
+static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_job *next_job, struct _starpu_job *prev_job, void *data), void *data)
+{
+	struct _starpu_job *job, *job2;
+	struct _starpu_job **current_set = NULL, **next_set = NULL, **swap_set;
+	unsigned current_n, next_n, i, j;
+	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
+
+	/* Classical flow algorithm: start from bottom, and propagate depths to top */
+
+	/* Set number of processed outgoing edges to 0 for each node */
+	__starpu_graph_foreach(_starpu_graph_set_n, (void*) 0);
+
+	/* Start with the bottom of the graph */
+	current_n = 0;
+	for (job = _starpu_job_list_begin(&bottom, bottom);
+	     job != _starpu_job_list_end(&bottom, bottom);
+	     job = _starpu_job_list_next(&bottom, job, bottom))
+		add_job(job, &current_set, &current_n, &current_alloc, NULL);
+
+	/* Now propagate to top as long as we have current nodes */
+	while (current_n)
+	{
+		/* Next set is initially empty */
+		next_n = 0;
+
+		/* For each node in the current set */
+		for (i = 0; i < current_n; i++)
+		{
+			job = current_set[i];
+			/* For each parent of this job */
+			for (j = 0; j < job->n_incoming; j++)
+			{
+				job2 = job->incoming[j];
+				if (!job2)
+					continue;
+				job2->graph_n++;
+				func(job, job2, data);
+				if (job2->depth < job->depth + 1)
+					job2->depth = job->depth + 1;
+
+				if ((unsigned) job2->graph_n == job2->n_outgoing)
+					/* All outgoing edges were processed, can now add to next set */
+					add_job(job2, &next_set, &next_n, &next_alloc, NULL);
+			}
+		}
+
+		/* Swap next set with current set */
+		swap_set = next_set;
+		swap_alloc = next_alloc;
+		next_set = current_set;
+		next_alloc = current_alloc;
+		current_set = swap_set;
+		current_alloc = swap_alloc;
+		current_n = next_n;
+	}
+}
+
+static void compute_depth(struct _starpu_job *next_job, struct _starpu_job *prev_job, void *data STARPU_ATTRIBUTE_UNUSED)
+{
+	if (prev_job->depth < next_job->depth + 1)
+		prev_job->depth = next_job->depth + 1;
+}
+
+void _starpu_graph_compute_depths(void)
+{
+	struct _starpu_job *job;
+
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+
+	/* The bottom of the graph has depth 0 */
+	for (job = _starpu_job_list_begin(&bottom, bottom);
+	     job != _starpu_job_list_end(&bottom, bottom);
+	     job = _starpu_job_list_next(&bottom, job, bottom))
+		job->depth = 0;
+
+	_starpu_graph_compute_bottom_up(compute_depth, NULL);
+
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}
+
+void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
+{
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	__starpu_graph_foreach(func, data);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}

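As a quick sanity check of the bottom-up propagation above, consider a hypothetical four-task diamond (task names A to D are made up for illustration): D depends on B and C, which both depend on A.

\code
/* Hypothetical diamond graph, an edge X -> Y meaning "Y depends on X":
 *
 *        A
 *       / \
 *      B   C
 *       \ /
 *        D        bottom = {D}, so depth(D) = 0
 *                 depth(B) = depth(C) = 1
 *                 depth(A) = 2 (longest path A -> B -> D)
 *
 * A is pushed to the next set only once both of its outgoing edges (to B
 * and to C) have been processed, i.e. when graph_n == n_outgoing. */
\endcode
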
+ 34 - 0
src/common/graph.h

@@ -0,0 +1,34 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+void _starpu_graph_init(void);
+extern int _starpu_graph_record;
+
+/* Add a job to the graph, called before any _starpu_graph_add_job_dep call */
+void _starpu_graph_add_job(struct _starpu_job *job);
+
+/* Add a dependency between jobs */
+void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev_job);
+
+/* Remove a job from the graph */
+void _starpu_graph_drop_job(struct _starpu_job *job);
+
+/* Compute the depth of jobs in the graph */
+/* This does not take job duration into account, just the number of jobs */
+void _starpu_graph_compute_depths(void);
+
+/* Apply func on each job of the graph */
+void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data);

+ 3 - 0
src/core/dependencies/task_deps.c

@@ -19,6 +19,7 @@
 #include <starpu.h>
 #include <common/config.h>
 #include <common/utils.h>
+#include <common/graph.h>
 #include <core/dependencies/tags.h>
 #include <core/jobs.h>
 #include <core/task.h>
@@ -118,6 +119,8 @@ void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, s
 			AYU_event(AYU_ADDDEPENDENCY, job->job_id, AYU_data);
 		}
 #endif
+		if (_starpu_graph_record)
+			_starpu_graph_add_job_dep(job, dep_job);
 
 		_starpu_task_add_succ(dep_job, cg);
 		if (dep_job->task->regenerate)

+ 11 - 0
src/core/jobs.c

@@ -24,6 +24,7 @@
 #include <core/dependencies/data_concurrency.h>
 #include <common/config.h>
 #include <common/utils.h>
+#include <common/graph.h>
 #include <profiling/profiling.h>
 #include <profiling/bound.h>
 #include <starpu_top.h>
@@ -103,6 +104,9 @@ struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_tas
 	if (task->use_tag)
 		_starpu_tag_declare(task->tag_id, job);
 
+	if (_starpu_graph_record)
+		_starpu_graph_add_job(job);
+
         _STARPU_LOG_OUT();
 	return job;
 }
@@ -135,6 +139,9 @@ void _starpu_job_destroy(struct _starpu_job *j)
 		j->dyn_dep_slots = NULL;
 	}
 
+	if (_starpu_graph_record)
+		_starpu_graph_drop_job(j);
+
 	free(j);
 }
 
@@ -317,6 +324,10 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		_starpu_release_task_enforce_sequential_consistency(j);
 	}
 
+	/* Remove ourself from the graph before notifying dependencies */
+	if (_starpu_graph_record)
+		_starpu_graph_drop_job(j);
+
 	/* Task does not have a cl, but has explicit data dependencies, we need
 	 * to tell them that we will not exist any more before notifying the
 	 * tasks waiting for us

+ 24 - 0
src/core/jobs.h

@@ -252,6 +252,30 @@ struct _starpu_job {
 	starpu_pthread_barrier_t after_work_barrier;
 	unsigned after_work_busy_barrier;
 
+	/*
+	 * Fields for graph analysis for scheduling heuristics
+	 */
+	/* Member of list of all jobs without incoming dependency */
+	struct _starpu_job_list top;
+	/* Member of list of all jobs without outgoing dependency */
+	struct _starpu_job_list bottom;
+	/* Member of list of all jobs */
+	struct _starpu_job_list all;
+
+	/* set of incoming dependencies */
+	struct _starpu_job **incoming;	/* May contain NULLs for terminated jobs */
+	unsigned n_incoming;		/* Number of slots used */
+	unsigned alloc_incoming;	/* Size of incoming */
+	/* set of outgoing dependencies */
+	struct _starpu_job **outgoing;
+	unsigned *outgoing_slot;	/* Index within corresponding incoming array */
+	unsigned n_outgoing;		/* Number of slots used */
+	unsigned alloc_outgoing;	/* Size of outgoing */
+
+	unsigned depth;			/* Rank from bottom, in number of jobs */
+
+	int graph_n;			/* Variable available for graph flow */
+
 #ifdef STARPU_DEBUG
 	/* Linked-list of all jobs, for debugging */
 	struct _starpu_job_list all_submitted;

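To help reading the incoming/outgoing fields above, here is the invariant they encode, restated from the graph.c hunk earlier; the loop is the one _starpu_graph_drop_job uses to clear its back-references.

\code
/* For every outgoing edge i of a job, outgoing_slot[i] is the index of the
 * matching back-reference inside the successor's incoming[] array:
 *
 *     job->outgoing[i]->incoming[job->outgoing_slot[i]] == job
 *
 * which is what allows a terminating job to clear only its own slots: */
for (i = 0; i < job->n_outgoing; i++)
	job->outgoing[i]->incoming[job->outgoing_slot[i]] = NULL;
\endcode
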
+ 1 - 0
src/core/sched_policy.c

@@ -59,6 +59,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_peager_policy,
 	&_starpu_sched_heteroprio_policy,
+	&_starpu_sched_graph_test_policy,
 	NULL
 };
 

+ 1 - 0
src/core/sched_policy.h

@@ -87,4 +87,5 @@ extern struct starpu_sched_policy _starpu_sched_modular_random_prio_prefetching_
 extern struct starpu_sched_policy _starpu_sched_modular_ws_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_heft_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_heft2_policy;
+extern struct starpu_sched_policy _starpu_sched_graph_test_policy;
 #endif // __SCHED_POLICY_H__

+ 2 - 0
src/core/workers.c

@@ -22,6 +22,7 @@
 #include <stdio.h>
 #include <common/config.h>
 #include <common/utils.h>
+#include <common/graph.h>
 #include <core/progress_hook.h>
 #include <core/workers.h>
 #include <core/debug.h>
@@ -1178,6 +1179,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	}
 
 	_starpu_job_init();
+	_starpu_graph_init();
 
 	_starpu_init_all_sched_ctxs(&_starpu_config);
 	_starpu_init_progression_hooks();

+ 3 - 3
src/sched_policies/fifo_queues.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2015  Université de Bordeaux
+ * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  CNRS
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -220,10 +220,10 @@ struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue,
 	     task != starpu_task_list_end(&fifo_queue->taskq);
 	     task  = starpu_task_list_next(task))
 	{
-		unsigned nimpl;
+		unsigned nimpl = 0;
 		STARPU_ASSERT(task);
 
-		if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
+		if (workerid < 0 || starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
 		{
 			starpu_task_set_implementation(task, nimpl);
 			starpu_task_list_erase(&fifo_queue->taskq, task);

+ 254 - 0
src/sched_policies/graph_test_policy.c

@@ -0,0 +1,254 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2016  Université de Bordeaux
+ * Copyright (C) 2010-2013  CNRS
+ * Copyright (C) 2011  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ *	This is just a test policy for using task graph information
+ *
+ *	We keep tasks in the fifo queue, and store the graph of tasks, until we
+ *	get the do_schedule call from the application, which tells us all tasks
+ *	were queued, and we can now compute task depths and let a simple
+ *	central-queue greedy algorithm proceed.
+ *
+ *	TODO: let workers start running tasks before the whole graph is submitted?
+ */
+
+#include <starpu_scheduler.h>
+#include <sched_policies/fifo_queues.h>
+#include <sched_policies/prio_deque.h>
+#include <common/graph.h>
+#include <common/thread.h>
+#include <starpu_bitmap.h>
+#include <core/task.h>
+
+struct _starpu_graph_test_policy_data
+{
+	struct _starpu_fifo_taskq *fifo;	/* Bag of tasks which are ready before do_schedule is called */
+	struct _starpu_prio_deque prio;
+	starpu_pthread_mutex_t policy_mutex;
+	struct starpu_bitmap *waiters;
+	unsigned computed;
+};
+
+static void initialize_graph_test_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)malloc(sizeof(struct _starpu_graph_test_policy_data));
+
+	/* there is only a single queue in that trivial design */
+	data->fifo = _starpu_create_fifo();
+	_starpu_prio_deque_init(&data->prio);
+	data->waiters = starpu_bitmap_create();
+	data->computed = 0;
+
+	_starpu_graph_record = 1;
+
+	 /* Tell helgrind that it's fine to check for empty fifo in
+	  * pop_task_graph_test_policy without actual mutex (it's just an integer)
+	  */
+	STARPU_HG_DISABLE_CHECKING(data->fifo->ntasks);
+
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
+}
+
+static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_fifo_taskq *fifo = data->fifo;
+
+	STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
+
+	/* deallocate the job queue */
+	_starpu_destroy_fifo(fifo);
+	starpu_bitmap_destroy(data->waiters);
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
+	free(data);
+}
+
+static void set_priority(void *_data STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *job)
+{
+	job->task->priority = job->depth;
+}
+
+static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	_starpu_graph_compute_depths();
+	data->computed = 1;
+	_starpu_graph_foreach(set_priority, NULL);
+
+	/* Now that we have priorities, move tasks from bag to priority queue */
+	while(!_starpu_fifo_empty(data->fifo))
+	{
+		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, -1);
+		_starpu_prio_deque_push_back_task(&data->prio, task);
+	}
+
+	/* And unleash the beast! */
+	unsigned worker;
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	struct starpu_sched_ctx_iterator it;
+	workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
+	{
+		/* Tell each worker it shouldn't sleep any more */
+		worker = workers->get_next(workers, &it);
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		starpu_bitmap_unset(data->waiters, worker);
+#endif
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+	workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
+	{
+		/* Wake each worker */
+		worker = workers->get_next(workers, &it);
+		starpu_wake_worker(worker);
+	}
+#endif
+}
+
+static int push_task_graph_test_policy(struct starpu_task *task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	if (!data->computed)
+	{
+		/* Priorities are not computed, leave the task in the bag for now */
+		starpu_task_list_push_back(&data->fifo->taskq,task);
+		data->fifo->ntasks++;
+		data->fifo->nprocessed++;
+		starpu_push_task_end(task);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		return 0;
+	}
+
+	/* Priorities are computed, we can push to execution */
+	_starpu_prio_deque_push_back_task(&data->prio,task);
+
+	starpu_push_task_end(task);
+
+	/* Wake workers that are waiting for a task, if any */
+	unsigned worker = 0;
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	
+	struct starpu_sched_ctx_iterator it;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	char dowake[STARPU_NMAXWORKERS] = { 0 };
+#endif
+
+	workers->init_iterator_for_parallel_tasks(workers, &it, task);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		if (!starpu_bitmap_get(data->waiters, worker))
+			/* This worker is not waiting for a task */
+			continue;
+#endif
+
+		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
+		{
+			/* It can execute this one, tell him! */
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+			starpu_bitmap_unset(data->waiters, worker);
+			/* We really woke at least somebody, no need to wake somebody else */
+			break;
+#else
+			dowake[worker] = 1;
+#endif
+		}
+	}
+	/* Let the task free */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+	/* Now that we have a list of potential workers, try to wake one */
+
+	workers->init_iterator_for_parallel_tasks(workers, &it, task);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		if (dowake[worker])
+			if (starpu_wake_worker(worker))
+				break; // wake up a single worker
+	}
+#endif
+
+	starpu_sched_ctx_list_task_counters_increment_all(task, sched_ctx_id);
+
+	return 0;
+}
+
+static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
+{
+	struct starpu_task *chosen_task = NULL;
+	unsigned workerid = starpu_worker_get_id();
+	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	/* block until some event happens */
+	/* Here helgrind would shout that this is unprotected, this is just an
+	 * integer access, and we hold the sched mutex, so we can not miss any
+	 * wake up. */
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(&data->prio))
+		return NULL;
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+	if (!STARPU_RUNNING_ON_VALGRIND && !data->computed)
+		/* Not computed yet */
+		return NULL;
+	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
+		/* Nobody woke us, avoid bothering the mutex */
+		return NULL;
+#endif
+
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	if (!data->computed)
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		return NULL;
+	}
+
+	chosen_task = _starpu_prio_deque_pop_task_for_worker(&data->prio, workerid);
+	if (!chosen_task)
+		/* Tell pushers that we are waiting for tasks for us */
+		starpu_bitmap_set(data->waiters, workerid);
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+	return chosen_task;
+}
+
+struct starpu_sched_policy _starpu_sched_graph_test_policy =
+{
+	.init_sched = initialize_graph_test_policy,
+	.deinit_sched = deinitialize_graph_test_policy,
+	.do_schedule = do_schedule_graph_test_policy,
+	.push_task = push_task_graph_test_policy,
+	.pop_task = pop_task_graph_test_policy,
+	.policy_name = "graph_test",
+	.policy_description = "test policy for using graphs in scheduling decisions",
+	.worker_type = STARPU_WORKER_LIST,
+};