Prechádzať zdrojové kódy

Drop graph nodes lazily, to avoid workers getting stuck on deallocating a job while a smart scheduler is holding the task graph lock

Samuel Thibault 8 rokov pred
rodič
commit
1691636086

+ 94 - 17
src/common/graph.c

@@ -16,14 +16,20 @@
 
 /*
  * This stores the task graph structure, to used by the schedulers which need
- * it.  We do not always enable it since it is costly.
+ * it.  We do not always enable it since it is costly.  To avoid interfering
+ * too much with execution, it may be a bit outdated, i.e. still contain jobs
+ * which have completed very recently.
+ *
+ * This is because we drop nodes lazily: when a job terminates, we just add the
+ * node to the dropped list (to avoid having to take the mutex on the whole
+ * graph).  The graph gets updated whenever the graph mutex becomes available.
  */
 
 #include <starpu.h>
 #include <core/jobs.h>
 #include <common/graph.h>
 
-/* Protects the whole task graph */
+/* Protects the whole task graph except the dropped list */
 static starpu_pthread_rwlock_t graph_lock;
 
 /* Whether we should enable recording the task graph */
@@ -36,12 +42,61 @@ struct _starpu_graph_node_multilist_bottom bottom;
 /* This list contains all nodes */
 struct _starpu_graph_node_multilist_all all;
 
+/* Protects the dropped list, always taken before graph lock */
+static starpu_pthread_mutex_t dropped_lock;
+/* This list contains all dropped nodes, i.e. the job terminated by the corresponding node is still int he graph */
+struct _starpu_graph_node_multilist_dropped dropped;
+
 void _starpu_graph_init(void)
 {
 	STARPU_PTHREAD_RWLOCK_INIT(&graph_lock, NULL);
 	_starpu_graph_node_multilist_init_top(&top);
 	_starpu_graph_node_multilist_init_bottom(&bottom);
 	_starpu_graph_node_multilist_init_all(&all);
+	STARPU_PTHREAD_MUTEX_INIT(&dropped_lock, NULL);
+	_starpu_graph_node_multilist_init_dropped(&dropped);
+}
+
+void _starpu_graph_wrlock(void)
+{
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+}
+
+void _starpu_graph_drop_node(struct _starpu_graph_node *node);
+void _starpu_graph_wrunlock(void)
+{
+	struct _starpu_graph_node *node, *next;
+	struct _starpu_graph_node_multilist_dropped dropping;
+
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	STARPU_PTHREAD_MUTEX_LOCK(&dropped_lock);
+	/* Pick up the list of dropped nodes */
+	_starpu_graph_node_multilist_move_dropped(&dropped, &dropping);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&dropped_lock);
+
+	/* And now process it if it's not empty.  */
+	if (!_starpu_graph_node_multilist_empty_dropped(&dropping))
+	{
+		STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+		for (node = _starpu_graph_node_multilist_begin_dropped(&dropping);
+		     node != _starpu_graph_node_multilist_end_dropped(&dropping);
+		     node = next)
+		{
+			next = _starpu_graph_node_multilist_next_dropped(node);
+			_starpu_graph_drop_node(node);
+		}
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	}
+}
+
+void _starpu_graph_rdlock(void)
+{
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&graph_lock);
+}
+
+void _starpu_graph_rdunlock(void)
+{
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
 }
 
 static void __starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
@@ -62,14 +117,14 @@ void _starpu_graph_add_job(struct _starpu_job *job)
 	job->graph_node = node;
 	STARPU_PTHREAD_MUTEX_INIT(&node->mutex, NULL);
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	_starpu_graph_wrlock();
 
 	/* It does not have any dependency yet, add to all lists */
 	_starpu_graph_node_multilist_push_back_top(&top, node);
 	_starpu_graph_node_multilist_push_back_bottom(&bottom, node);
 	_starpu_graph_node_multilist_push_back_all(&all, node);
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	_starpu_graph_wrunlock();
 }
 
 /* Add a node to an array of nodes */
@@ -95,7 +150,7 @@ static unsigned add_node(struct _starpu_graph_node *node, struct _starpu_graph_n
 void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev_job)
 {
 	unsigned rank_incoming, rank_outgoing;
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	_starpu_graph_wrlock();
 	struct _starpu_graph_node *node = job->graph_node;
 	struct _starpu_graph_node *prev_node = prev_job->graph_node;
 
@@ -111,16 +166,14 @@ void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev
 	rank_outgoing = add_node(node, &prev_node->outgoing, &prev_node->n_outgoing, &prev_node->alloc_outgoing, &prev_node->outgoing_slot);
 	prev_node->outgoing_slot[rank_outgoing] = rank_incoming;
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	_starpu_graph_wrunlock();
 }
 
 /* Drop a node, and thus its dependencies */
-void _starpu_graph_drop_job(struct _starpu_job *job)
+void _starpu_graph_drop_node(struct _starpu_graph_node *node)
 {
 	unsigned i;
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
-	struct _starpu_graph_node *node = job->graph_node;
-	job->graph_node = NULL;
+	STARPU_ASSERT(!node->job);
 
 	if (_starpu_graph_node_multilist_queued_bottom(node))
 		_starpu_graph_node_multilist_erase_bottom(&bottom, node);
@@ -135,7 +188,7 @@ void _starpu_graph_drop_job(struct _starpu_job *job)
 		struct _starpu_graph_node *next = node->outgoing[i];
 		next->incoming[node->outgoing_slot[i]] = NULL;
 	}
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+
 	node->n_outgoing = 0;
 	free(node->outgoing);
 	node->outgoing = NULL;
@@ -149,6 +202,30 @@ void _starpu_graph_drop_job(struct _starpu_job *job)
 	free(node);
 }
 
+/* Drop a job */
+void _starpu_graph_drop_job(struct _starpu_job *job)
+{
+	struct _starpu_graph_node *node = job->graph_node;
+	job->graph_node = NULL;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	/* Will not be able to use the job any more */
+	node->job = NULL;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&dropped_lock);
+	if (STARPU_PTHREAD_RWLOCK_TRYWRLOCK(&graph_lock) == 0)
+	{
+		/* Graph wrlock is available, drop node immediately */
+		_starpu_graph_drop_node(node);
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	}
+	else
+		/* Queue for removal when lock becomes available */
+		_starpu_graph_node_multilist_push_back_dropped(&dropped, node);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&dropped_lock);
+}
+
 static void _starpu_graph_set_n(void *data, struct _starpu_graph_node *node)
 {
 	int value = (intptr_t) data;
@@ -223,7 +300,7 @@ void _starpu_graph_compute_depths(void)
 {
 	struct _starpu_graph_node *node;
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	_starpu_graph_wrlock();
 
 	/* The bottom of the graph has depth 0 */
 	for (node = _starpu_graph_node_multilist_begin_bottom(&bottom);
@@ -233,7 +310,7 @@ void _starpu_graph_compute_depths(void)
 
 	_starpu_graph_compute_bottom_up(compute_depth, NULL);
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	_starpu_graph_wrunlock();
 }
 
 void _starpu_graph_compute_descendants(void)
@@ -244,7 +321,7 @@ void _starpu_graph_compute_descendants(void)
 	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
 	unsigned descendants;
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	_starpu_graph_wrlock();
 
 	/* Yes, this is O(|V|.(|V|+|E|)) :( */
 
@@ -305,7 +382,7 @@ void _starpu_graph_compute_descendants(void)
 		node->descendants = descendants;
 	}
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	_starpu_graph_wrunlock();
 
 	free(current_set);
 	free(next_set);
@@ -313,7 +390,7 @@ void _starpu_graph_compute_descendants(void)
 
 void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
 {
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+	_starpu_graph_wrlock();
 	__starpu_graph_foreach(func, data);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+	_starpu_graph_wrunlock();
 }

+ 10 - 2
src/common/graph.h

@@ -21,10 +21,11 @@
 MULTILIST_CREATE_TYPE(_starpu_graph_node, all)
 MULTILIST_CREATE_TYPE(_starpu_graph_node, top)
 MULTILIST_CREATE_TYPE(_starpu_graph_node, bottom)
+MULTILIST_CREATE_TYPE(_starpu_graph_node, dropped)
 
 struct _starpu_graph_node {
 	starpu_pthread_mutex_t mutex;	/* protects access to the job */
-	struct _starpu_job *job;
+	struct _starpu_job *job;	/* pointer to the job, if it is still alive, NULL otherwise */
 
 	/*
 	 * Fields for graph analysis for scheduling heuristics
@@ -35,6 +36,8 @@ struct _starpu_graph_node {
 	struct _starpu_graph_node_multilist_bottom bottom;
 	/* Member of list of all jobs */
 	struct _starpu_graph_node_multilist_all all;
+	/* Member of list of dropped jobs */
+	struct _starpu_graph_node_multilist_dropped dropped;
 
 	/* set of incoming dependencies */
 	struct _starpu_graph_node **incoming;	/* May contain NULLs for terminated jobs */
@@ -57,9 +60,14 @@ struct _starpu_graph_node {
 MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, all)
 MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, top)
 MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, bottom)
+MULTILIST_CREATE_INLINES(struct _starpu_graph_node, _starpu_graph_node, dropped)
 
-void _starpu_graph_init(void);
 extern int _starpu_graph_record;
+void _starpu_graph_init(void);
+void _starpu_graph_wrlock(void);
+void _starpu_graph_rdlock(void);
+void _starpu_graph_wrunlock(void);
+void _starpu_graph_rdunlock(void);
 
 /* Add a job to the graph, called before any _starpu_graph_add_job_dep call */
 void _starpu_graph_add_job(struct _starpu_job *job);

+ 14 - 0
src/common/list.h

@@ -285,6 +285,20 @@ static inline TYPE *ENAME##_multilist_end_##MEMBER(struct ENAME##_multilist_##ME
 /* Return the next element of the list.  */ \
 static inline TYPE *ENAME##_multilist_next_##MEMBER(TYPE *e) { \
 	return ENAME##_of_multilist_##MEMBER(e->MEMBER.next); \
+} \
+\
+ /* Move a list from its head to another head.  */ \
+static inline void ENAME##_multilist_move_##MEMBER(struct ENAME##_multilist_##MEMBER *head, struct ENAME##_multilist_##MEMBER *newhead) { \
+	if (ENAME##_multilist_empty_##MEMBER(head)) \
+		ENAME##_multilist_init_##MEMBER(newhead); \
+	else { \
+		newhead->next = head->next; \
+		newhead->next->prev = newhead; \
+		newhead->prev = head->prev; \
+		newhead->prev->next = newhead; \
+		head->next = head; \
+		head->prev = head; \
+	} \
 }
 
 #endif /* __LIST_H__ */

+ 9 - 4
src/sched_policies/graph_test_policy.c

@@ -148,11 +148,16 @@ static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _sta
 static void set_priority(void *_data, struct _starpu_graph_node *node)
 {
 	struct _starpu_graph_test_policy_data *data = _data;
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
 	struct _starpu_job *job = node->job;
-	if (data->descendants)
-		job->task->priority = node->descendants;
-	else
-		job->task->priority = node->depth;
+	if (job)
+	{
+		if (data->descendants)
+			job->task->priority = node->descendants;
+		else
+			job->task->priority = node->depth;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 }
 
 static void do_schedule_graph_test_policy(unsigned sched_ctx_id)