Quellcode durchsuchen

externalize task graph into its own graph structure which does not rely on the existence of jobs

Samuel Thibault vor 8 Jahren
Ursprung
Commit
2da4df7056
5 geänderte Dateien mit 172 neuen und 148 gelöschten Zeilen
  1. 119 109
      src/common/graph.c
  2. 47 1
      src/common/graph.h
  3. 1 1
      src/core/jobs.c
  4. 1 34
      src/core/jobs.h
  5. 4 3
      src/sched_policies/graph_test_policy.c

+ 119 - 109
src/common/graph.c

@@ -29,127 +29,137 @@ static starpu_pthread_rwlock_t graph_lock;
 /* Whether we should enable recording the task graph */
 int _starpu_graph_record;
 
-/* This list contains all jobs without incoming dependency */
-struct _starpu_job_multilist_top top;
-/* This list contains all jobs without outgoing dependency */
-struct _starpu_job_multilist_bottom bottom;
-/* This list contains all jobs */
-struct _starpu_job_multilist_all all;
+/* This list contains all nodes without incoming dependency */
+struct _starpu_graph_node_multilist_top top;
+/* This list contains all nodes without outgoing dependency */
+struct _starpu_graph_node_multilist_bottom bottom;
+/* This list contains all nodes */
+struct _starpu_graph_node_multilist_all all;
 
 void _starpu_graph_init(void)
 {
 	STARPU_PTHREAD_RWLOCK_INIT(&graph_lock, NULL);
-	_starpu_job_multilist_init_top(&top);
-	_starpu_job_multilist_init_bottom(&bottom);
-	_starpu_job_multilist_init_all(&all);
+	_starpu_graph_node_multilist_init_top(&top);
+	_starpu_graph_node_multilist_init_bottom(&bottom);
+	_starpu_graph_node_multilist_init_all(&all);
 }
 
-static void __starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
+static void __starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
 {
-	struct _starpu_job *job;
+	struct _starpu_graph_node *node;
 
-	for (job = _starpu_job_multilist_begin_all(&all);
-	     job != _starpu_job_multilist_end_all(&all);
-	     job = _starpu_job_multilist_next_all(job))
-		func(data, job);
+	for (node = _starpu_graph_node_multilist_begin_all(&all);
+	     node != _starpu_graph_node_multilist_end_all(&all);
+	     node = _starpu_graph_node_multilist_next_all(node))
+		func(data, node);
 }
 
-/* Add a job to the graph */
+/* Add a node to the graph */
 void _starpu_graph_add_job(struct _starpu_job *job)
 {
+	struct _starpu_graph_node *node = calloc(1, sizeof(*node));
+	node->job = job;
+	job->graph_node = node;
+	STARPU_PTHREAD_MUTEX_INIT(&node->mutex, NULL);
+
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
 
 	/* It does not have any dependency yet, add to all lists */
-	_starpu_job_multilist_push_back_top(&top, job);
-	_starpu_job_multilist_push_back_bottom(&bottom, job);
-	_starpu_job_multilist_push_back_all(&all, job);
+	_starpu_graph_node_multilist_push_back_top(&top, node);
+	_starpu_graph_node_multilist_push_back_bottom(&bottom, node);
+	_starpu_graph_node_multilist_push_back_all(&all, node);
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
 }
 
-/* Add a job to an array of jobs */
-static unsigned add_job(struct _starpu_job *job, struct _starpu_job ***jobs, unsigned *n_jobs, unsigned *alloc_jobs, unsigned **slot)
+/* Add a node to an array of nodes */
+static unsigned add_node(struct _starpu_graph_node *node, struct _starpu_graph_node ***nodes, unsigned *n_nodes, unsigned *alloc_nodes, unsigned **slot)
 {
 	unsigned ret;
-	if (*n_jobs == *alloc_jobs)
+	if (*n_nodes == *alloc_nodes)
 	{
-		if (*alloc_jobs)
-			*alloc_jobs *= 2;
+		if (*alloc_nodes)
+			*alloc_nodes *= 2;
 		else
-			*alloc_jobs = 4;
-		*jobs = realloc(*jobs, *alloc_jobs * sizeof(**jobs));
+			*alloc_nodes = 4;
+		*nodes = realloc(*nodes, *alloc_nodes * sizeof(**nodes));
 		if (slot)
-			*slot = realloc(*slot, *alloc_jobs * sizeof(**slot));
+			*slot = realloc(*slot, *alloc_nodes * sizeof(**slot));
 	}
-	ret = (*n_jobs)++;
-	(*jobs)[ret] = job;
+	ret = (*n_nodes)++;
+	(*nodes)[ret] = node;
 	return ret;
 }
 
-/* Add a dependency between jobs */
+/* Add a dependency between nodes */
 void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev_job)
 {
 	unsigned rank_incoming, rank_outgoing;
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	struct _starpu_graph_node *node = job->graph_node;
+	struct _starpu_graph_node *prev_node = prev_job->graph_node;
 
-	if (_starpu_job_multilist_queued_bottom(prev_job))
-		/* Previous job is not at bottom any more */
-		_starpu_job_multilist_erase_bottom(&bottom, prev_job);
+	if (_starpu_graph_node_multilist_queued_bottom(prev_node))
+		/* Previous node is not at bottom any more */
+		_starpu_graph_node_multilist_erase_bottom(&bottom, prev_node);
 
-	if (_starpu_job_multilist_queued_top(job))
-		/* Next job is not at top any more */
-		_starpu_job_multilist_erase_top(&top, job);
+	if (_starpu_graph_node_multilist_queued_top(node))
+		/* Next node is not at top any more */
+		_starpu_graph_node_multilist_erase_top(&top, node);
 
-	rank_incoming = add_job(prev_job, &job->incoming, &job->n_incoming, &job->alloc_incoming, NULL);
-	rank_outgoing = add_job(job, &prev_job->outgoing, &prev_job->n_outgoing, &prev_job->alloc_outgoing, &prev_job->outgoing_slot);
-	prev_job->outgoing_slot[rank_outgoing] = rank_incoming;
+	rank_incoming = add_node(prev_node, &node->incoming, &node->n_incoming, &node->alloc_incoming, NULL);
+	rank_outgoing = add_node(node, &prev_node->outgoing, &prev_node->n_outgoing, &prev_node->alloc_outgoing, &prev_node->outgoing_slot);
+	prev_node->outgoing_slot[rank_outgoing] = rank_incoming;
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
 }
 
-/* Drop a job, and thus its dependencies */
+/* Drop a node, and thus its dependencies */
 void _starpu_graph_drop_job(struct _starpu_job *job)
 {
 	unsigned i;
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
-
-	if (_starpu_job_multilist_queued_bottom(job))
-		_starpu_job_multilist_erase_bottom(&bottom, job);
-	if (_starpu_job_multilist_queued_top(job))
-		_starpu_job_multilist_erase_top(&top, job);
-	if (_starpu_job_multilist_queued_all(job))
-		_starpu_job_multilist_erase_all(&all, job);
-
-	/* Drop ourself from the incoming part of the outgoing jobs */
-	for (i = 0; i < job->n_outgoing; i++)
+	struct _starpu_graph_node *node = job->graph_node;
+	job->graph_node = NULL;
+
+	if (_starpu_graph_node_multilist_queued_bottom(node))
+		_starpu_graph_node_multilist_erase_bottom(&bottom, node);
+	if (_starpu_graph_node_multilist_queued_top(node))
+		_starpu_graph_node_multilist_erase_top(&top, node);
+	if (_starpu_graph_node_multilist_queued_all(node))
+		_starpu_graph_node_multilist_erase_all(&all, node);
+
+	/* Drop ourself from the incoming part of the outgoing nodes */
+	for (i = 0; i < node->n_outgoing; i++)
 	{
-		struct _starpu_job *next = job->outgoing[i];
-		next->incoming[job->outgoing_slot[i]] = NULL;
+		struct _starpu_graph_node *next = node->outgoing[i];
+		next->incoming[node->outgoing_slot[i]] = NULL;
 	}
-	job->n_outgoing = 0;
-	free(job->outgoing);
-	job->outgoing = NULL;
-	free(job->outgoing_slot);
-	job->outgoing_slot = NULL;
-	job->alloc_outgoing = 0;
-	job->n_incoming = 0;
-	free(job->incoming);
-	job->incoming = NULL;
-	job->alloc_incoming = 0;
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	node->n_outgoing = 0;
+	free(node->outgoing);
+	node->outgoing = NULL;
+	free(node->outgoing_slot);
+	node->outgoing_slot = NULL;
+	node->alloc_outgoing = 0;
+	node->n_incoming = 0;
+	free(node->incoming);
+	node->incoming = NULL;
+	node->alloc_incoming = 0;
+	free(node);
 }
 
-static void _starpu_graph_set_n(void *data, struct _starpu_job *job)
+static void _starpu_graph_set_n(void *data, struct _starpu_graph_node *node)
 {
 	int value = (intptr_t) data;
-	job->graph_n = value;
+	node->graph_n = value;
 }
 
 /* Call func for each vertex of the task graph, from bottom to top, in topological order */
-static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_job *next_job, struct _starpu_job *prev_job, void *data), void *data)
+static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_graph_node *next_node, struct _starpu_graph_node *prev_node, void *data), void *data)
 {
-	struct _starpu_job *job, *job2;
-	struct _starpu_job **current_set = NULL, **next_set = NULL, **swap_set;
+	struct _starpu_graph_node *node, *node2;
+	struct _starpu_graph_node **current_set = NULL, **next_set = NULL, **swap_set;
 	unsigned current_n, next_n, i, j;
 	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
 
@@ -160,10 +170,10 @@ static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_job *nex
 
 	/* Start with the bottom of the graph */
 	current_n = 0;
-	for (job = _starpu_job_multilist_begin_bottom(&bottom);
-	     job != _starpu_job_multilist_end_bottom(&bottom);
-	     job = _starpu_job_multilist_next_bottom(job))
-		add_job(job, &current_set, &current_n, &current_alloc, NULL);
+	for (node = _starpu_graph_node_multilist_begin_bottom(&bottom);
+	     node != _starpu_graph_node_multilist_end_bottom(&bottom);
+	     node = _starpu_graph_node_multilist_next_bottom(node))
+		add_node(node, &current_set, &current_n, &current_alloc, NULL);
 
 	/* Now propagate to top as long as we have current nodes */
 	while (current_n)
@@ -174,19 +184,19 @@ static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_job *nex
 		/* For each node in the current set */
 		for (i = 0; i < current_n; i++)
 		{
-			job = current_set[i];
-			/* For each parent of this job */
-			for (j = 0; j < job->n_incoming; j++)
+			node = current_set[i];
+			/* For each parent of this node */
+			for (j = 0; j < node->n_incoming; j++)
 			{
-				job2 = job->incoming[j];
-				if (!job2)
+				node2 = node->incoming[j];
+				if (!node2)
 					continue;
-				job2->graph_n++;
-				func(job, job2, data);
+				node2->graph_n++;
+				func(node, node2, data);
 
-				if ((unsigned) job2->graph_n == job2->n_outgoing)
+				if ((unsigned) node2->graph_n == node2->n_outgoing)
 					/* All outgoing edges were processed, can now add to next set */
-					add_job(job2, &next_set, &next_n, &next_alloc, NULL);
+					add_node(node2, &next_set, &next_n, &next_alloc, NULL);
 			}
 		}
 
@@ -203,23 +213,23 @@ static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_job *nex
 	free(next_set);
 }
 
-static void compute_depth(struct _starpu_job *next_job, struct _starpu_job *prev_job, void *data STARPU_ATTRIBUTE_UNUSED)
+static void compute_depth(struct _starpu_graph_node *next_node, struct _starpu_graph_node *prev_node, void *data STARPU_ATTRIBUTE_UNUSED)
 {
-	if (prev_job->depth < next_job->depth + 1)
-		prev_job->depth = next_job->depth + 1;
+	if (prev_node->depth < next_node->depth + 1)
+		prev_node->depth = next_node->depth + 1;
 }
 
 void _starpu_graph_compute_depths(void)
 {
-	struct _starpu_job *job;
+	struct _starpu_graph_node *node;
 
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
 
 	/* The bottom of the graph has depth 0 */
-	for (job = _starpu_job_multilist_begin_bottom(&bottom);
-	     job != _starpu_job_multilist_end_bottom(&bottom);
-	     job = _starpu_job_multilist_next_bottom(job))
-		job->depth = 0;
+	for (node = _starpu_graph_node_multilist_begin_bottom(&bottom);
+	     node != _starpu_graph_node_multilist_end_bottom(&bottom);
+	     node = _starpu_graph_node_multilist_next_bottom(node))
+		node->depth = 0;
 
 	_starpu_graph_compute_bottom_up(compute_depth, NULL);
 
@@ -228,8 +238,8 @@ void _starpu_graph_compute_depths(void)
 
 void _starpu_graph_compute_descendants(void)
 {
-	struct _starpu_job *job, *job2, *job3;
-	struct _starpu_job **current_set = NULL, **next_set = NULL, **swap_set;
+	struct _starpu_graph_node *node, *node2, *node3;
+	struct _starpu_graph_node **current_set = NULL, **next_set = NULL, **swap_set;
 	unsigned current_n, next_n, i, j;
 	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
 	unsigned descendants;
@@ -243,20 +253,20 @@ void _starpu_graph_compute_descendants(void)
 	 * |E| is usually O(|V|), though (bounded number of data dependencies,
 	 * and we use synchronization tasks) */
 
-	for (job = _starpu_job_multilist_begin_all(&all);
-	     job != _starpu_job_multilist_end_all(&all);
-	     job = _starpu_job_multilist_next_all(job))
+	for (node = _starpu_graph_node_multilist_begin_all(&all);
+	     node != _starpu_graph_node_multilist_end_all(&all);
+	     node = _starpu_graph_node_multilist_next_all(node))
 	{
 		/* Mark all nodes as unseen */
-		for (job2 = _starpu_job_multilist_begin_all(&all);
-		     job2 != _starpu_job_multilist_end_all(&all);
-		     job2 = _starpu_job_multilist_next_all(job2))
-			job2->graph_n = 0;
+		for (node2 = _starpu_graph_node_multilist_begin_all(&all);
+		     node2 != _starpu_graph_node_multilist_end_all(&all);
+		     node2 = _starpu_graph_node_multilist_next_all(node2))
+			node2->graph_n = 0;
 
 		/* Start with the node we want to compute the number of descendants of */
 		current_n = 0;
-		add_job(job, &current_set, &current_n, &current_alloc, NULL);
-		job->graph_n = 1;
+		add_node(node, &current_set, &current_n, &current_alloc, NULL);
+		node->graph_n = 1;
 
 		descendants = 0;
 		/* While we have descendants, count their descendants */
@@ -267,20 +277,20 @@ void _starpu_graph_compute_descendants(void)
 			/* For each node in the current set */
 			for (i = 0; i < current_n; i++)
 			{
-				job2 = current_set[i];
-				/* For each child of this job2 */
-				for (j = 0; j < job2->n_outgoing; j++)
+				node2 = current_set[i];
+				/* For each child of this node2 */
+				for (j = 0; j < node2->n_outgoing; j++)
 				{
-					job3 = job2->outgoing[j];
-					if (!job3)
+					node3 = node2->outgoing[j];
+					if (!node3)
 						continue;
-					if (job3->graph_n)
+					if (node3->graph_n)
 						/* Already seen */
 						continue;
 					/* Add this node */
-					job3->graph_n = 1;
+					node3->graph_n = 1;
 					descendants++;
-					add_job(job3, &next_set, &next_n, &next_alloc, NULL);
+					add_node(node3, &next_set, &next_n, &next_alloc, NULL);
 				}
 			}
 			/* Swap next set with current set */
@@ -292,7 +302,7 @@ void _starpu_graph_compute_descendants(void)
 			current_alloc = swap_alloc;
 			current_n = next_n;
 		}
-		job->descendants = descendants;
+		node->descendants = descendants;
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
@@ -301,7 +311,7 @@ void _starpu_graph_compute_descendants(void)
 	free(next_set);
 }
 
-void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
+void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
 {
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
 	__starpu_graph_foreach(func, data);

+ 47 - 1
src/common/graph.h

@@ -14,6 +14,50 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#ifndef __GRAPH_H__
+#define __GRAPH_H__
+
+#include <common/list.h>
+MULTILIST_CREATE_TYPE(_starpu_graph_node, all)
+MULTILIST_CREATE_TYPE(_starpu_graph_node, top)
+MULTILIST_CREATE_TYPE(_starpu_graph_node, bottom)
+
+struct _starpu_graph_node {
+	starpu_pthread_mutex_t mutex;	/* protects access to the job */
+	struct _starpu_job *job;
+
+	/*
+	 * Fields for graph analysis for scheduling heuristics
+	 */
+	/* Member of list of all jobs without incoming dependency */
+	struct _starpu_graph_node_multilist_top top;
+	/* Member of list of all jobs without outgoing dependency */
+	struct _starpu_graph_node_multilist_bottom bottom;
+	/* Member of list of all jobs */
+	struct _starpu_graph_node_multilist_all all;
+
+	/* set of incoming dependencies */
+	struct _starpu_graph_node **incoming;	/* May contain NULLs for terminated jobs */
+	unsigned n_incoming;		/* Number of slots used */
+	unsigned alloc_incoming;	/* Size of incoming */
+	/* set of outgoing dependencies */
+	struct _starpu_graph_node **outgoing;
+	unsigned *outgoing_slot;	/* Index within corresponding incoming array */
+	unsigned n_outgoing;		/* Number of slots used */
+	unsigned alloc_outgoing;	/* Size of outgoing */
+
+	unsigned depth;			/* Rank from bottom, in number of jobs */
+					/* Only available if _starpu_graph_compute_depths was called */
+	unsigned descendants;		/* Number of children, grand-children, etc. */
+					/* Only available if _starpu_graph_compute_descendants was called */
+
+	int graph_n;			/* Variable available for graph flow */
+};
+
+MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, all)
+MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, top)
+MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, bottom)
+
 void _starpu_graph_init(void);
 extern int _starpu_graph_record;
 
@@ -35,4 +79,6 @@ void _starpu_graph_compute_descendants(void);
 
 /* This calls \e func for each node of the task graph, passing also \e data as it */
 /* Apply func on each job of the graph */
-void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data);
+void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data);
+
+#endif /* __GRAPH_H__ */

+ 1 - 1
src/core/jobs.c

@@ -148,7 +148,7 @@ void _starpu_job_destroy(struct _starpu_job *j)
 		j->dyn_dep_slots = NULL;
 	}
 
-	if (_starpu_graph_record)
+	if (_starpu_graph_record && j->graph_node)
 		_starpu_graph_drop_job(j);
 
 	if (max_memory_use)

+ 1 - 34
src/core/jobs.h

@@ -64,10 +64,6 @@ struct _starpu_data_descr
 	int node;
 };
 
-MULTILIST_CREATE_TYPE(_starpu_job, all)
-MULTILIST_CREATE_TYPE(_starpu_job, top)
-MULTILIST_CREATE_TYPE(_starpu_job, bottom)
-
 /* A job is the internal representation of a task. */
 struct _starpu_job {
 
@@ -192,32 +188,7 @@ struct _starpu_job {
 	starpu_pthread_barrier_t after_work_barrier;
 	unsigned after_work_busy_barrier;
 
-	/*
-	 * Fields for graph analysis for scheduling heuristics
-	 */
-	/* Member of list of all jobs without incoming dependency */
-	struct _starpu_job_multilist_top top;
-	/* Member of list of all jobs without outgoing dependency */
-	struct _starpu_job_multilist_bottom bottom;
-	/* Member of list of all jobs */
-	struct _starpu_job_multilist_all all;
-
-	/* set of incoming dependencies */
-	struct _starpu_job **incoming;	/* May contain NULLs for terminated jobs */
-	unsigned n_incoming;		/* Number of slots used */
-	unsigned alloc_incoming;	/* Size of incoming */
-	/* set of outgoing dependencies */
-	struct _starpu_job **outgoing;
-	unsigned *outgoing_slot;	/* Index within corresponding incoming array */
-	unsigned n_outgoing;		/* Number of slots used */
-	unsigned alloc_outgoing;	/* Size of outgoing */
-
-	unsigned depth;			/* Rank from bottom, in number of jobs */
-					/* Only available if _starpu_graph_compute_depths was called */
-	unsigned descendants;		/* Number of children, grand-children, etc. */
-					/* Only available if _starpu_graph_compute_descendants was called */
-
-	int graph_n;			/* Variable available for graph flow */
+	struct _starpu_graph_node *graph_node;
 
 #ifdef STARPU_DEBUG
 	/* Linked-list of all jobs, for debugging */
@@ -225,10 +196,6 @@ struct _starpu_job {
 #endif
 };
 
-MULTILIST_CREATE_INLINES(struct _starpu_job, _starpu_job, all)
-MULTILIST_CREATE_INLINES(struct _starpu_job, _starpu_job, top)
-MULTILIST_CREATE_INLINES(struct _starpu_job, _starpu_job, bottom)
-
 void _starpu_job_init(void);
 void _starpu_job_fini(void);
 

+ 4 - 3
src/sched_policies/graph_test_policy.c

@@ -145,13 +145,14 @@ static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _sta
 
 }
 
-static void set_priority(void *_data, struct _starpu_job *job)
+static void set_priority(void *_data, struct _starpu_graph_node *node)
 {
 	struct _starpu_graph_test_policy_data *data = _data;
+	struct _starpu_job *job = node->job;
 	if (data->descendants)
-		job->task->priority = job->descendants;
+		job->task->priority = node->descendants;
 	else
-		job->task->priority = job->depth;
+		job->task->priority = node->depth;
 }
 
 static void do_schedule_graph_test_policy(unsigned sched_ctx_id)