Browse Source

enable parallel eager to work with scheduling contexts

Olivier Aumage 7 years ago
parent
commit
c85979b76c

+ 0 - 3
examples/sched_ctx/sched_ctx.c

@@ -80,9 +80,6 @@ int main(void)
 	int nprocs2 = 0;
 	int procs1[STARPU_NMAXWORKERS], procs2[STARPU_NMAXWORKERS];
 	char *sched = getenv("STARPU_SCHED");
-	if (sched && !strcmp(sched, "peager"))
-		/* FIXME peager does not support multiple sched_ctx */
-		return 77;
 	ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;

+ 0 - 4
examples/sched_ctx/two_cpu_contexts.c

@@ -46,10 +46,6 @@ int main(void)
 	int *procs2 = NULL;
 	int i;
 	int n = 20;
-	char *starpu_sched = getenv("STARPU_SCHED");
-	if (starpu_sched && !strcmp(starpu_sched, "peager"))
-		/* FIXME peager does not support multiple sched_ctx */
-		return 77;
 	int ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;

+ 119 - 53
src/sched_policies/parallel_eager.c

@@ -22,57 +22,123 @@
 #include <starpu_scheduler.h>
 #include <core/workers.h>
 
-struct _starpu_peager_data
+struct _starpu_peager_common_data
 {
-	struct _starpu_fifo_taskq *fifo;
-	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
-	starpu_pthread_mutex_t policy_mutex;
 	int possible_combinations_cnt[STARPU_NMAXWORKERS];
 	int *possible_combinations[STARPU_NMAXWORKERS];
 	int *possible_combinations_size[STARPU_NMAXWORKERS];
 	int max_combination_size[STARPU_NMAXWORKERS];
+	int no_combined_workers;
+	int ref_count;
 };
 
-static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+struct _starpu_peager_common_data *_peager_common_data = NULL;
+
+struct _starpu_peager_data
 {
-	_starpu_sched_find_worker_combinations(workerids, nworkers);
-	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	const unsigned nbasic_workers = starpu_worker_get_count();
-	const unsigned ncombined_workers = starpu_combined_worker_get_count();
-	unsigned i;
+	starpu_pthread_mutex_t policy_mutex;
+	struct _starpu_fifo_taskq *fifo;
+	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
+};
 
-	for(i = 0; i < nworkers; i++)
+static void initialize_peager_common(void)
+{
+	if (_peager_common_data == NULL)
 	{
-		unsigned workerid = workerids[i];
-		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
-		data->possible_combinations_cnt[workerid] = 0;
-		int cnt = data->possible_combinations_cnt[workerid]++;
-		_STARPU_CALLOC(data->possible_combinations[workerid], ncombined_workers, sizeof(int));
-		_STARPU_CALLOC(data->possible_combinations_size[workerid], ncombined_workers, sizeof(int));
-		data->possible_combinations[workerid][cnt] = workerid;
-		data->possible_combinations_size[workerid][cnt] = 1;
-		data->max_combination_size[workerid] = 1;
+		struct _starpu_peager_common_data *common_data = NULL;
+		_STARPU_CALLOC(common_data, 1, sizeof(struct _starpu_peager_common_data));
+		common_data->ref_count = 1;
+		_peager_common_data = common_data;
+
+		const unsigned nbasic_workers = starpu_worker_get_count();
+		int basic_workerids[nbasic_workers];
+		int i;
+		for(i = 0; i < nbasic_workers; i++)
+		{
+			basic_workerids[i] = i;
+		}
+
+		_starpu_sched_find_worker_combinations(basic_workerids, nbasic_workers);
+		const unsigned ncombined_workers = starpu_combined_worker_get_count();
+		common_data->no_combined_workers = ncombined_workers == 0;
+
+		for(i = 0; i < nbasic_workers; i++)
+		{
+			common_data->possible_combinations_cnt[i] = 0;
+			int cnt = common_data->possible_combinations_cnt[i]++;
+			/* Allocate ncombined_workers + 1 for the singleton worker itself */
+			_STARPU_CALLOC(common_data->possible_combinations[i], 1+ncombined_workers, sizeof(int));
+			_STARPU_CALLOC(common_data->possible_combinations_size[i], 1+ncombined_workers, sizeof(int));
+			common_data->possible_combinations[i][cnt] = i;
+			common_data->possible_combinations_size[i][cnt] = 1;
+			common_data->max_combination_size[i] = 1;
+		}
+
+		for (i = 0; i < ncombined_workers; i++)
+		{
+			unsigned combined_workerid = nbasic_workers + i;
+			int *workers;
+			int size;
+			starpu_combined_worker_get_description(combined_workerid, &size, &workers);
+			int master = workers[0];
+			if (size > common_data->max_combination_size[master])
+			{
+				common_data->max_combination_size[master] = size;
+			}
+			int cnt = common_data->possible_combinations_cnt[master]++;
+			common_data->possible_combinations[master][cnt] = combined_workerid;
+			common_data->possible_combinations_size[master][cnt] = size;
+		}
+	}
+	else
+	{
+		_peager_common_data->ref_count++;
 	}
+}
 
-	for (i = 0; i < ncombined_workers; i++)
+static void deinitialize_peager_common(void)
+{
+	STARPU_ASSERT(_peager_common_data != NULL);
+	_peager_common_data->ref_count--;
+	if (_peager_common_data->ref_count == 0)
 	{
-		unsigned combined_workerid = nbasic_workers + i;
-		int *workers;
-		int size;
-		starpu_combined_worker_get_description(combined_workerid, &size, &workers);
-		int master = workers[0];
-		if (size > data->max_combination_size[master])
+		const unsigned nbasic_workers = starpu_worker_get_count();
+		int i;
+		for(i = 0; i < nbasic_workers; i++)
 		{
-			data->max_combination_size[master] = size;
+			free(_peager_common_data->possible_combinations[i]);
+			_peager_common_data->possible_combinations[i] = NULL;
+			free(_peager_common_data->possible_combinations_size[i]);
+			_peager_common_data->possible_combinations_size[i] = NULL;
 		}
-		int cnt = data->possible_combinations_cnt[master]++;
-		data->possible_combinations[master][cnt] = combined_workerid;
-		data->possible_combinations_size[master][cnt] = size;
+		free(_peager_common_data);
+		_peager_common_data = NULL;
+	}
+
+}
+
+static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	if (sched_ctx_id == 0)
+	{
+		/* FIXME Fix scheduling contexts initialization or combined
+		 * worker management, to make the initialize_peager_common()
+		 * call to work right from initialize_peager_policy. For now,
+		 * this fails because it causes combined workers to be generated
+		 * too early. */
+		initialize_peager_common();
 	}
+	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
 
 	for(i = 0; i < nworkers; i++)
 	{
 		unsigned workerid = workerids[i];
+		if(starpu_worker_is_combined_worker(workerid))
+		{
+			continue;
+		}
+		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
 
 		/* slaves pick up tasks from their local queue, their master
 		 * will put tasks directly in that local list when a parallel
@@ -91,18 +157,18 @@ static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigne
 		if(!starpu_worker_is_combined_worker(workerid))
 		{
 			_starpu_destroy_fifo(data->local_fifo[workerid]);
-			free(data->possible_combinations[workerid]);
-			data->possible_combinations[workerid] = NULL;
-			free(data->possible_combinations_size[workerid]);
-			data->possible_combinations_size[workerid] = NULL;
 		}
 	}
+	if (sched_ctx_id == 0)
+	{
+		deinitialize_peager_common();
+	}
 }
 
 static void initialize_peager_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_peager_data *data;
-	_STARPU_MALLOC(data, sizeof(struct _starpu_peager_data));
+	_STARPU_CALLOC(data, 1, sizeof(struct _starpu_peager_data));
 	/* masters pick tasks from that queue */
 	data->fifo = _starpu_create_fifo();
 
@@ -139,33 +205,33 @@ static int push_task_peager_policy(struct starpu_task *task)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
+	struct _starpu_peager_common_data *common_data = _peager_common_data;
 	/* if there are no tasks block */
 	/* wake people waiting for a task */
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 	struct starpu_sched_ctx_iterator it;
-	const unsigned ncombined_workers = starpu_combined_worker_get_count();
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
-		int worker = workers->get_next(workers, &it);
-		/* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
-		if (starpu_worker_is_combined_worker(worker))
+		int workerid = workers->get_next(workers, &it);
+		/* Combined workers are skipped. If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
+		if (starpu_worker_is_combined_worker(workerid))
 		{
 			continue;
 		}
-		if (starpu_worker_get_type(worker) != STARPU_MIC_WORKER
-				&& starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
+		if (starpu_worker_get_type(workerid) != STARPU_MIC_WORKER
+				&& starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
 		{
-			_starpu_wake_worker_relax_light(worker);
+			_starpu_wake_worker_relax_light(workerid);
 			continue;
 		}
-		if ((!is_parallel_task) /* This is not a parallel task, can wake any worker */
-				|| (ncombined_workers == 0) /* There is no combined worker */
-				|| (data->max_combination_size[worker] > 1) /* This is a combined worker master and the task is parallel */
+		if ((!is_parallel_task) /* This is not a parallel task, can wake any worker */
+				|| (common_data->no_combined_workers) /* There is no combined worker */
+				|| (common_data->max_combination_size[workerid] > 1) /* This worker is a combined worker master and the task is parallel */
 		   )
 		{
-			_starpu_wake_worker_relax_light(worker);
+			_starpu_wake_worker_relax_light(workerid);
 		}
 	}
 #endif
@@ -175,6 +241,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 
 static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 {
+	struct _starpu_peager_common_data *common_data = _peager_common_data;
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	int workerid = starpu_worker_get_id_check();
@@ -192,7 +259,6 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		return task;
 	}
 
-	const unsigned ncombined_workers = starpu_combined_worker_get_count();
 	struct starpu_task *task = NULL;
 	int slave_task = 0;
 	_starpu_worker_relax_on();
@@ -211,7 +277,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 
 #if 1
 		/* Optional heuristic to filter out purely slave workers for parallel tasks */
-		if (task && task->cl && task->cl->max_parallelism > 1 && data->max_combination_size[workerid] == 1 && ncombined_workers > 0)
+		if (task && task->cl && task->cl->max_parallelism > 1 && common_data->max_combination_size[workerid] == 1 /* == 1: this worker masters no combination, i.e. it is a pure slave */ && !common_data->no_combined_workers)
 		{
 			/* task is potentially parallel, leave it for a combined worker master */
 			_starpu_fifo_push_back_task(data->fifo, task);
@@ -233,14 +299,14 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 	int best_size = -1;
 	int best_workerid = -1;
 	int i;
-	for (i = 0; i < data->possible_combinations_cnt[workerid]; i++)
+	for (i = 0; i < common_data->possible_combinations_cnt[workerid]; i++)
 	{
-		if (data->possible_combinations_size[workerid][i] > best_size)
+		if (common_data->possible_combinations_size[workerid][i] > best_size)
 		{
-			int combined_worker = data->possible_combinations[workerid][i];
+			int combined_worker = common_data->possible_combinations[workerid][i];
 			if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
 			{
-				best_size = data->possible_combinations_size[workerid][i];
+				best_size = common_data->possible_combinations_size[workerid][i];
 				best_workerid = combined_worker;
 			}
 		}