Quellcode durchsuchen

redesigned algorithm for peager scheduling policy

Olivier Aumage vor 7 Jahren
Ursprung
Commit
1b8631f4a5

+ 5 - 4
examples/sched_ctx/sched_ctx.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012-2014,2017                           Inria
+ * Copyright (C) 2012-2014,2017-2018                      Inria
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2010-2014                                Université de Bordeaux
  *
@@ -79,8 +79,10 @@ int main(void)
 	int nprocs1 = 0;
 	int nprocs2 = 0;
 	int procs1[STARPU_NMAXWORKERS], procs2[STARPU_NMAXWORKERS];
-	char *sched;
-
+	char *sched = getenv("STARPU_SCHED");
+	if (sched && !strcmp(sched, "peager"))
+		/* FIXME peager does not support multiple sched_ctx */
+		return 77;
 	ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;
@@ -104,7 +106,6 @@ int main(void)
 	}
 
 	/*create contexts however you want*/
-	sched = getenv("STARPU_SCHED");
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, sched?sched:"eager", 0);
 	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, sched?sched:"eager", 0);
 

+ 5 - 2
examples/sched_ctx/two_cpu_contexts.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2017                                     CNRS
- * Copyright (C) 2016                                     Inria
+ * Copyright (C) 2016,2018                                Inria
  * Copyright (C) 2016                                     Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -46,7 +46,10 @@ int main(void)
 	int *procs2 = NULL;
 	int i;
 	int n = 20;
-
+	char *starpu_sched = getenv("STARPU_SCHED");
+	if (starpu_sched && !strcmp(starpu_sched, "peager"))
+		/* FIXME peager does not support multiple sched_ctx */
+		return 77;
 	int ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;

+ 138 - 129
src/sched_policies/parallel_eager.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011-2013,2015,2017                      Inria
+ * Copyright (C) 2011-2013,2015,2017,2018                 Inria
  * Copyright (C) 2011-2016                                Université de Bordeaux
  * Copyright (C) 2011-2014,2016-2017                      CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
@@ -26,84 +26,59 @@ struct _starpu_peager_data
 {
 	struct _starpu_fifo_taskq *fifo;
 	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
-
-	int master_id[STARPU_NMAXWORKERS];
-        starpu_pthread_mutex_t policy_mutex;
+	starpu_pthread_mutex_t policy_mutex;
+	int possible_combinations_cnt[STARPU_NMAXWORKERS];
+	int *possible_combinations[STARPU_NMAXWORKERS];
+	int *possible_combinations_size[STARPU_NMAXWORKERS];
+	int max_combination_size[STARPU_NMAXWORKERS];
 };
 
-#define STARPU_NMAXCOMBINED_WORKERS 520
-/* instead of STARPU_NMAXCOMBINED_WORKERS, we should use some "MAX combination .."*/
-static int possible_combinations_cnt[STARPU_NMAXWORKERS];
-static int possible_combinations[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
-static int possible_combinations_size[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
-
-
-/*!!!!!!! It doesn't work with several contexts because the combined workers are constructed
-  from the workers available to the program, and not to the context !!!!!!!!!!!!!!!!!!!!!!!
- */
-
 /* Set up the peager policy data of context sched_ctx_id for the nworkers
  * workers whose ids are listed in workerids.
  *
  * NOTE(review): this text is a diff hunk: '-' lines are the removed
  * implementation and '+' lines its replacement; the comments added here
  * describe the '+' (new) code.
  *
  * New code, in order:
  *  1. each listed worker gets freshly allocated possible_combinations and
  *     possible_combinations_size arrays whose first entry is the worker
  *     itself with size 1, and its max_combination_size is set to 1;
  *  2. every combined worker (ids nbasic_workers .. nbasic_workers +
  *     ncombined_workers - 1) is appended to the arrays of its first member
  *     (workers[0], called the master) and raises that master's
  *     max_combination_size when its size is larger;
  *  3. each listed worker gets its own local fifo. */
 static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
 	_starpu_sched_find_worker_combinations(workerids, nworkers);
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	unsigned nbasic_workers = starpu_worker_get_count();
-	unsigned ncombined_workers= starpu_combined_worker_get_count();
-	unsigned workerid, i;
+	const unsigned nbasic_workers = starpu_worker_get_count();
+	const unsigned ncombined_workers = starpu_combined_worker_get_count();
+	unsigned i;
 
-	/* Find the master of each worker. We first assign the worker as its
-	 * own master, and then iterate over the different worker combinations
-	 * to find the biggest combination containing this worker. */
 	for(i = 0; i < nworkers; i++)
 	{
-		workerid = workerids[i];
+		unsigned workerid = workerids[i];
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
-		int cnt = possible_combinations_cnt[workerid]++;
-		possible_combinations[workerid][cnt] = workerid;
-		possible_combinations_size[workerid][cnt] = 1;
-
-		data->master_id[workerid] = workerid;
+		data->possible_combinations_cnt[workerid] = 0;
+		int cnt = data->possible_combinations_cnt[workerid]++;
 		/* NOTE(review): both arrays get ncombined_workers entries, but entry 0
 		 * is used for the worker itself just below and the next loop may append
 		 * one entry per combined worker; this looks one entry short (and
 		 * zero-sized when ncombined_workers is 0) -- assuming _STARPU_CALLOC
 		 * takes (ptr, nmemb, size) like calloc; TODO confirm. */
+		_STARPU_CALLOC(data->possible_combinations[workerid], ncombined_workers, sizeof(int));
+		_STARPU_CALLOC(data->possible_combinations_size[workerid], ncombined_workers, sizeof(int));
+		data->possible_combinations[workerid][cnt] = workerid;
+		data->possible_combinations_size[workerid][cnt] = 1;
+		data->max_combination_size[workerid] = 1;
 	}
 
-
 	for (i = 0; i < ncombined_workers; i++)
 	{
-		workerid = nbasic_workers + i;
-
-		/* Note that we ASSUME that the workers are sorted by size ! */
+		unsigned combined_workerid = nbasic_workers + i;
 		int *workers;
 		int size;
-		starpu_combined_worker_get_description(workerid, &size, &workers);
-
+		starpu_combined_worker_get_description(combined_workerid, &size, &workers);
 		/* The first member of the combination is treated as its master.
 		 * NOTE(review): nothing here checks that master is one of workerids,
 		 * i.e. that its arrays were allocated by the loop above -- confirm. */
 		int master = workers[0];
-		int j;
-		for (j = 0; j < size; j++)
+		if (size > data->max_combination_size[master])
 		{
-			if (data->master_id[workers[j]] > master)
-				data->master_id[workers[j]] = master;
-			int cnt = possible_combinations_cnt[workers[j]]++;
-			possible_combinations[workers[j]][cnt] = workerid;
-			possible_combinations_size[workers[j]][cnt] = size;
+			data->max_combination_size[master] = size;
 		}
 		/* Only the master records the combination (the removed code recorded
 		 * it for every member). */
+		int cnt = data->possible_combinations_cnt[master]++;
+		data->possible_combinations[master][cnt] = combined_workerid;
+		data->possible_combinations_size[master][cnt] = size;
 	}
 
 	for(i = 0; i < nworkers; i++)
 	{
-		workerid = workerids[i];
+		unsigned workerid = workerids[i];
 
 		/* Slaves pick up tasks from their local queue; their master
 		 * puts tasks directly in that local list when a parallel
 		 * task comes. */
 		data->local_fifo[workerid] = _starpu_create_fifo();
 	}
-
-#if 0
-	for(i = 0; i < nworkers; i++)
-        {
-		workerid = workerids[i];
-
-		_STARPU_MSG("MASTER of %d = %d\n", workerid, master_id[workerid]);
-	}
-#endif
 }
 
 static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
@@ -114,7 +89,13 @@ static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigne
         {
 		int workerid = workerids[i];
 		if(!starpu_worker_is_combined_worker(workerid))
+		{
 			_starpu_destroy_fifo(data->local_fifo[workerid]);
+			free(data->possible_combinations[workerid]);
+			data->possible_combinations[workerid] = NULL;
+			free(data->possible_combinations_size[workerid]);
+			data->possible_combinations_size[workerid] = NULL;
+		}
 	}
 }
 
@@ -151,6 +132,9 @@ static int push_task_peager_policy(struct starpu_task *task)
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+	int is_parallel_task = task->cl && task->cl->max_parallelism > 1;
+#endif
 	starpu_push_task_end(task);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
@@ -160,18 +144,29 @@ static int push_task_peager_policy(struct starpu_task *task)
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 	struct starpu_sched_ctx_iterator it;
-
+	const unsigned ncombined_workers = starpu_combined_worker_get_count();
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
 		int worker = workers->get_next(workers, &it);
-		int master = data->master_id[worker];
 		/* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
-		if ((!starpu_worker_is_combined_worker(worker) &&
-		    starpu_worker_get_type(worker) != STARPU_MIC_WORKER &&
-		    starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
-			|| (master == worker))
+		if (starpu_worker_is_combined_worker(worker))
+		{
+			continue;
+		}
+		if (starpu_worker_get_type(worker) != STARPU_MIC_WORKER
+				&& starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
+		{
 			_starpu_wake_worker_relax_light(worker);
+			continue;
+		}
+		if ((!is_parallel_task) /* This is not a parallel task, can wake any worker */
+				|| (ncombined_workers == 0) /* There is no combined worker */
+				|| (data->max_combination_size[worker] > 1) /* This is a combined worker master and the task is parallel */
+		   )
+		{
+			_starpu_wake_worker_relax_light(worker);
+		}
 	}
 #endif
 
@@ -197,95 +192,109 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		return task;
 	}
 
-	int master = data->master_id[workerid];
-
-	//_STARPU_DEBUG("workerid:%d, master:%d\n",workerid,master);
-
-
+	const unsigned ncombined_workers = starpu_combined_worker_get_count();
 	struct starpu_task *task = NULL;
-	if (master == workerid)
+	int slave_task = 0;
+	_starpu_worker_relax_on();
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	_starpu_worker_relax_off();
+	/* check if a slave task is available in the local queue */
+	task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
+	if (!task)
 	{
-		/* The worker is a master */
-		_starpu_worker_relax_on();
-		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-		_starpu_worker_relax_off();
+		/* no slave task, try to pop a task as master */
 		task = _starpu_fifo_pop_task(data->fifo, workerid);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-
-		if (!task)
-			goto ret;
-
-		/* Find the largest compatible worker combination */
-		int best_size = -1;
-		int best_workerid = -1;
-		int i;
-		for (i = 0; i < possible_combinations_cnt[master]; i++)
+		if (task)
 		{
-			if (possible_combinations_size[workerid][i] > best_size)
-			{
-				int combined_worker = possible_combinations[workerid][i];
-				if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
-				{
-					best_size = possible_combinations_size[workerid][i];
-					best_workerid = combined_worker;
-				}
-			}
+			_STARPU_DEBUG("%d: poping master task %p\n", task);
 		}
 
-		/* In case nobody can execute this task, we let the master
-		 * worker take it anyway, so that it can discard it afterward.
-		 * */
-		if (best_workerid == -1)
-			goto ret;
-
-		/* Is this a basic worker or a combined worker ? */
-		int nbasic_workers = (int)starpu_worker_get_count();
-		int is_basic_worker = (best_workerid < nbasic_workers);
-		if (is_basic_worker)
+#if 1
+		/* Optional heuristic to filter out purely slave workers for parallel tasks */
+		if (task && task->cl && task->cl->max_parallelism > 1 && data->max_combination_size[workerid] == 1 && ncombined_workers > 0)
 		{
-
-			/* The master is alone */
-			goto ret;
+			/* task is potentially parallel, leave it for a combined worker master */
+			_starpu_fifo_push_back_task(data->fifo, task);
+			task = NULL;
 		}
-		else
+#endif
+	}
+	else
+	{
+		slave_task = 1;
+		_STARPU_DEBUG("%d: poping slave task %p\n", task);
+	}
+	if (!task || slave_task)
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		goto ret;
+	}
+	/* Find the largest compatible worker combination */
+	int best_size = -1;
+	int best_workerid = -1;
+	int i;
+	for (i = 0; i < data->possible_combinations_cnt[workerid]; i++)
+	{
+		if (data->possible_combinations_size[workerid][i] > best_size)
 		{
-			starpu_parallel_task_barrier_init(task, best_workerid);
-			int worker_size = 0;
-			int *combined_workerid;
-			starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
-
-			/* Dispatch task aliases to the different slaves */
-			for (i = 1; i < worker_size; i++)
+			int combined_worker = data->possible_combinations[workerid][i];
+			if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
 			{
-				struct starpu_task *alias = starpu_task_dup(task);
-				int local_worker = combined_workerid[i];
-
-				alias->destroy = 1;
-				_starpu_worker_lock(local_worker);
-				_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
-
-#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
-				starpu_wake_worker_locked(local_worker);
-#endif
-				_starpu_worker_unlock(local_worker);
+				best_size = data->possible_combinations_size[workerid][i];
+				best_workerid = combined_worker;
 			}
-
-			/* The master also manipulated an alias */
-			struct starpu_task *master_alias = starpu_task_dup(task);
-			master_alias->destroy = 1;
-			task = master_alias;
-			goto ret;
 		}
 	}
-	else
+	_STARPU_DEBUG("task %p, best_workerid=%d, best_size=%d\n", task, best_workerid, best_size);
+
+	/* In case nobody can execute this task, we let the master
+	 * worker take it anyway, so that it can discard it afterward.
+	 * */
+	if (best_workerid == -1)
 	{
-		/* The worker is a slave */
-		_starpu_worker_relax_on();
-		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-		_starpu_worker_relax_off();
-		task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		goto ret;
 	}
+
+	/* Is this a basic worker or a combined worker ? */
+	if (best_workerid < starpu_worker_get_count())
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		/* The master is alone */
+		goto ret;
+	}
+	starpu_parallel_task_barrier_init(task, best_workerid);
+	int worker_size = 0;
+	int *combined_workerid;
+	starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
+
+	_STARPU_DEBUG("dispatching task %p on combined worker %d of size %d\n", task, best_workerid, worker_size);
+	/* Dispatch task aliases to the different slaves */
+	for (i = 1; i < worker_size; i++)
+	{
+		struct starpu_task *alias = starpu_task_dup(task);
+		int local_worker = combined_workerid[i];
+		alias->destroy = 1;
+		_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
+	}
+
+	/* The master also manipulated an alias */
+	struct starpu_task *master_alias = starpu_task_dup(task);
+	master_alias->destroy = 1;
+	task = master_alias;
+	
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+	for (i = 1; i < worker_size; i++)
+	{
+		int local_worker = combined_workerid[i];
+		_starpu_worker_lock(local_worker);
+#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
+		starpu_wake_worker_locked(local_worker);
+#endif
+		_starpu_worker_unlock(local_worker);
+	}
+
 ret:
 	return task;
 }