瀏覽代碼

Add "descendants" computation to graph scheduling

Samuel Thibault 9 年之前
父節點
當前提交
981a1d84f1
共有 4 個文件被更改,包括 92 次插入、5 次刪除
  1. 72 0
      src/common/graph.c
  2. 3 0
      src/common/graph.h
  3. 3 0
      src/core/jobs.h
  4. 14 5
      src/sched_policies/graph_test_policy.c

+ 72 - 0
src/common/graph.c

@@ -224,6 +224,78 @@ void _starpu_graph_compute_depths(void)
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
 }
 
+/* Compute, for each job currently recorded in the graph, the total number
+ * of its descendants (children, grand-children, etc.), and store it in
+ * job->descendants.  This runs a breadth-first traversal from each node,
+ * using job->graph_n as the "already seen" mark.
+ *
+ * Must not be called concurrently with graph modifications: it takes
+ * graph_lock for writing for the whole computation. */
+void _starpu_graph_compute_descendants(void)
+{
+	struct _starpu_job *job, *job2, *job3;
+	struct _starpu_job **current_set = NULL, **next_set = NULL, **swap_set;
+	unsigned current_n, next_n, i, j;
+	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
+	unsigned descendants;
+
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
+
+	/* Yes, this is O(|V|.(|V|+|E|)) :( */
+
+	/* We could get O(|V|.|E|) by doing a topological sort first.
+	 *
+	 * |E| is usually O(|V|), though (bounded number of data dependencies,
+	 * and we use synchronization tasks) */
+
+	for (job = _starpu_job_list_begin(&all, all);
+	     job != _starpu_job_list_end(&all, all);
+	     job = _starpu_job_list_next(&all, job, all))
+	{
+		/* Mark all nodes as unseen */
+		for (job2 = _starpu_job_list_begin(&all, all);
+		     job2 != _starpu_job_list_end(&all, all);
+		     job2 = _starpu_job_list_next(&all, job2, all))
+			job2->graph_n = 0;
+
+		/* Start with the node we want to compute the number of descendants of */
+		current_n = 0;
+		add_job(job, &current_set, &current_n, &current_alloc, NULL);
+		job->graph_n = 1;
+
+		descendants = 0;
+		/* While we have descendants, count their descendants */
+		while (current_n)
+		{
+			/* Next set is initially empty */
+			next_n = 0;
+
+			/* For each node in the current set */
+			for (i = 0; i < current_n; i++)
+			{
+				job2 = current_set[i];
+				/* For each child of this job2 */
+				for (j = 0; j < job2->n_outgoing; j++)
+				{
+					job3 = job2->outgoing[j];
+					if (!job3)
+						/* Dropped edge, skip */
+						continue;
+					if (job3->graph_n)
+						/* Already seen */
+						continue;
+					/* Add this node */
+					job3->graph_n = 1;
+					descendants++;
+					add_job(job3, &next_set, &next_n, &next_alloc, NULL);
+				}
+			}
+			/* Swap next set with current set */
+			swap_set = next_set;
+			swap_alloc = next_alloc;
+			next_set = current_set;
+			next_alloc = current_alloc;
+			current_set = swap_set;
+			current_alloc = swap_alloc;
+			current_n = next_n;
+		}
+		job->descendants = descendants;
+	}
+
+	/* NOTE(review): add_job appears to grow these sets on the heap
+	 * (NULL start + alloc counters imply realloc — confirm against
+	 * add_job's definition); release them so repeated calls do not
+	 * leak.  free(NULL) is a no-op, so this is safe for an empty
+	 * graph too. */
+	free(current_set);
+	free(next_set);
+
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
+}
+
 void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data)
 {
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);

+ 3 - 0
src/common/graph.h

@@ -30,5 +30,8 @@ void _starpu_graph_drop_job(struct _starpu_job *job);
 /* This does not take job duration into account, just the number */
 void _starpu_graph_compute_depths(void);
 
+/* Compute the descendants of jobs in the graph */
+void _starpu_graph_compute_descendants(void);
+
 /* Apply func on each job of the graph */
 void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_job *job), void *data);

+ 3 - 0
src/core/jobs.h

@@ -273,6 +273,9 @@ struct _starpu_job {
 	unsigned alloc_outgoing;	/* Size of outgoing */
 
 	unsigned depth;			/* Rank from bottom, in number of jobs */
+					/* Only available if _starpu_graph_compute_depths was called */
+	unsigned descendants;		/* Number of children, grand-children, etc. */
+					/* Only available if _starpu_graph_compute_descendants was called */
 
 	int graph_n;			/* Variable available for graph flow */
 

+ 14 - 5
src/sched_policies/graph_test_policy.c

@@ -21,7 +21,7 @@
  *
  *	We keep tasks in the fifo queue, and store the graph of tasks, until we
  *	get the do_schedule call from the application, which tells us all tasks
- *	were queued, and we can now compute task depths and let a simple
+ *	were queued, and we can now compute task depths or descendants and let a simple
  *	central-queue greedy algorithm proceed.
  *
  *	TODO: let workers starting running tasks before the whole graph is submitted?
@@ -43,6 +43,7 @@ struct _starpu_graph_test_policy_data
 	starpu_pthread_mutex_t policy_mutex;
 	struct starpu_bitmap *waiters;
 	unsigned computed;
+	unsigned descendants;			/* Whether we use descendants, or depths, for priorities */
 };
 
 static void initialize_graph_test_policy(unsigned sched_ctx_id)
@@ -55,6 +56,7 @@ static void initialize_graph_test_policy(unsigned sched_ctx_id)
 	 _starpu_prio_deque_init(&data->prio_gpu);
 	data->waiters = starpu_bitmap_create();
 	data->computed = 0;
+	data->descendants = starpu_get_env_number_default("STARPU_SCHED_GRAPH_TEST_DESCENDANTS", 0);
 
 	_starpu_graph_record = 1;
 
@@ -140,9 +142,13 @@ static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _sta
 
 }
 
-static void set_priority(void *_data STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *job)
+static void set_priority(void *_data, struct _starpu_job *job)
 {
-	job->task->priority = job->depth;
+	struct _starpu_graph_test_policy_data *data = _data;
+	if (data->descendants)
+		job->task->priority = job->descendants;
+	else
+		job->task->priority = job->depth;
 }
 
 static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
@@ -150,9 +156,12 @@ static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
 	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	_starpu_graph_compute_depths();
+	if (data->descendants)
+		_starpu_graph_compute_descendants();
+	else
+		_starpu_graph_compute_depths();
 	data->computed = 1;
-	_starpu_graph_foreach(set_priority, NULL);
+	_starpu_graph_foreach(set_priority, data);
 
 	/* Now that we have priorities, move tasks from bag to priority queue */
 	while(!_starpu_fifo_empty(data->fifo)) {