Sfoglia il codice sorgente

Add modular-eager-prio scheduler

Samuel Thibault 5 anni fa
parent
commit
dfd3c72b8b

+ 5 - 2
doc/doxygen/chapters/320_scheduling.doxy

@@ -208,12 +208,15 @@ pre-defined Modularized Schedulers :
 - Eager-based Schedulers (with/without prefetching : \c modular-eager ,
 \c modular-eager-prefetching) : \n
 Naive scheduler, which tries to map a task on the first available resource
-it finds.
+it finds. The prefecthing variant queues several tasks in advance to be able to
+do data prefetching. This may however degrade load balancing a bit.
 
 - Prio-based Schedulers (with/without prefetching :
-\c modular-prio, \c modular-prio-prefetching) : \n
+\c modular-prio, \c modular-prio-prefetching , \c modular-eager-prio) : \n
 Similar to Eager-Based Schedulers. Can handle tasks which have a defined
 priority and schedule them accordingly.
+The \c modular-eager-prio variant integrates the eager and priority queue in a
+single component. This allows it to do a better job at pushing tasks.
 
 - Random-based Schedulers (with/without prefetching: \c modular-random,
 \c modular-random-prio, \c modular-random-prefetching, \c

+ 12 - 0
include/starpu_sched_component.h

@@ -494,6 +494,18 @@ int starpu_sched_component_is_random(struct starpu_sched_component *);
 struct starpu_sched_component *starpu_sched_component_eager_create(struct starpu_sched_tree *tree, void *arg) STARPU_ATTRIBUTE_MALLOC;
 int starpu_sched_component_is_eager(struct starpu_sched_component *);
 
+/** @} */
+
+/**
+   @name Resource-mapping Eager Prio Component API
+   @{
+*/
+
+struct starpu_sched_component *starpu_sched_component_eager_prio_create(struct starpu_sched_tree *tree, void *arg) STARPU_ATTRIBUTE_MALLOC;
+int starpu_sched_component_is_eager_prio(struct starpu_sched_component *);
+
+/** @} */
+
 /**
    @name Resource-mapping Eager-Calibration Component API
    @{

+ 2 - 0
src/Makefile.am

@@ -283,6 +283,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/component_prio.c 				\
 	sched_policies/component_random.c				\
 	sched_policies/component_eager.c				\
+	sched_policies/component_eager_prio.c				\
 	sched_policies/component_eager_calibration.c				\
 	sched_policies/component_mct.c				\
 	sched_policies/component_heft.c				\
@@ -291,6 +292,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/component_composed.c				\
 	sched_policies/component_work_stealing.c				\
 	sched_policies/modular_eager.c				\
+	sched_policies/modular_eager_prio.c				\
 	sched_policies/modular_eager_prefetching.c				\
 	sched_policies/modular_gemm.c				\
 	sched_policies/modular_prio.c				\

+ 1 - 0
src/core/sched_policy.c

@@ -56,6 +56,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 {
 	&_starpu_sched_modular_eager_policy,
 	&_starpu_sched_modular_eager_prefetching_policy,
+	&_starpu_sched_modular_eager_prio_policy,
 	&_starpu_sched_modular_gemm_policy,
 	&_starpu_sched_modular_prio_policy,
 	&_starpu_sched_modular_prio_prefetching_policy,

+ 1 - 0
src/core/sched_policy.h

@@ -93,6 +93,7 @@ extern struct starpu_sched_policy _starpu_sched_peager_policy;
 extern struct starpu_sched_policy _starpu_sched_heteroprio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_eager_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_eager_prefetching_policy;
+extern struct starpu_sched_policy _starpu_sched_modular_eager_prio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_gemm_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_prio_policy;
 extern struct starpu_sched_policy _starpu_sched_modular_prio_prefetching_policy;

+ 2 - 0
src/sched_policies/component_eager.c

@@ -55,6 +55,7 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 		}
 	}
 
+	/* FIXME: should rather just loop over children before looping over its workers */
 	int workerid;
 	for(workerid = starpu_bitmap_first(component->workers_in_ctx);
 	    workerid != -1;
@@ -63,6 +64,7 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 		int nimpl;
 		for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
+			/* FIXME: use starpu_worker_can_execute_task_first_impl instead */
 			if(starpu_worker_can_execute_task(workerid,task,nimpl)
 			   || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
 			{

+ 165 - 0
src/sched_policies/component_eager_prio.c

@@ -0,0 +1,165 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013,2017,2018                           Inria
+ * Copyright (C) 2014-2018                                CNRS
+ * Copyright (C) 2013-2019                                Université de Bordeaux
+ * Copyright (C) 2013                                     Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* eager component which has its own priority queue. It can thus eagerly push
+ * tasks to lower queues without having to wait for being pulled from.  */
+
+#include <starpu_sched_component.h>
+#include "prio_deque.h"
+#include <starpu_perfmodel.h>
+#include <float.h>
+#include <core/sched_policy.h>
+#include <core/task.h>
+
+struct _starpu_eager_prio_data
+{
+	struct _starpu_prio_deque prio;
+	starpu_pthread_mutex_t mutex;
+};
+
+static int eager_prio_progress_one(struct starpu_sched_component *component)
+{
+	struct _starpu_eager_prio_data * data = component->data;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	struct _starpu_prio_deque * prio = &data->prio;
+	struct starpu_task *task;
+	int ret;
+
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	task = _starpu_prio_deque_pop_task(prio);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+
+	if (!task)
+	{
+		return 1;
+	}
+
+	/* FIXME: should rather just loop over children before looping over its workers */
+	int workerid;
+	for(workerid = starpu_bitmap_first(component->workers_in_ctx);
+	    workerid != -1;
+	    workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
+	{
+		int nimpl;
+		for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+		{
+			/* FIXME: use starpu_worker_can_execute_task_first_impl instead */
+			if(starpu_worker_can_execute_task(workerid,task,nimpl)
+			   || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
+			{
+				unsigned i;
+				for (i = 0; i < component->nchildren; i++)
+				{
+					int idworker;
+					for(idworker = starpu_bitmap_first(component->children[i]->workers);
+						idworker != -1;
+						idworker = starpu_bitmap_next(component->children[i]->workers, idworker))
+					{
+						if (idworker == workerid)
+						{
+							STARPU_ASSERT(!starpu_sched_component_is_worker(component->children[i]));
+							ret = starpu_sched_component_push_task(component,component->children[i],task);
+							if (!ret)
+								return 0;
+						}
+					}
+				}
+			}
+		}
+	}
+
+	/* Could not push to child actually, push that one back too */
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	_starpu_prio_deque_push_front_task(prio, task);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+	return 1;
+}
+
+/* Try to push some tasks below */
+static void eager_prio_progress(struct starpu_sched_component *component)
+{
+	STARPU_ASSERT(component && starpu_sched_component_is_eager_prio(component));
+	while (!eager_prio_progress_one(component))
+		;
+}
+
+static int eager_prio_push_task(struct starpu_sched_component * component, struct starpu_task * task)
+{
+	STARPU_ASSERT(component && task && starpu_sched_component_is_eager_prio(component));
+	struct _starpu_eager_prio_data * data = component->data;
+	struct _starpu_prio_deque * prio = &data->prio;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
+	_starpu_prio_deque_push_back_task(prio,task);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+
+	eager_prio_progress(component);
+
+	return 0;
+}
+
+static int eager_prio_can_push(struct starpu_sched_component *component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
+{
+	eager_prio_progress(component);
+	int ret = 0;
+	unsigned j;
+	for(j=0; j < component->nparents; j++)
+	{
+		if(component->parents[j] == NULL)
+			continue;
+		else
+		{
+			ret = component->parents[j]->can_push(component->parents[j], component);
+			if(ret)
+				break;
+		}
+	}
+	return ret;
+}
+
+static void eager_prio_component_deinit_data(struct starpu_sched_component * component)
+{
+	STARPU_ASSERT(starpu_sched_component_is_eager_prio(component));
+	struct _starpu_eager_prio_data * d = component->data;
+	_starpu_prio_deque_destroy(&d->prio);
+	free(d);
+}
+
+int starpu_sched_component_is_eager_prio(struct starpu_sched_component * component)
+{
+	return component->push_task == eager_prio_push_task;
+}
+
+struct starpu_sched_component * starpu_sched_component_eager_prio_create(struct starpu_sched_tree *tree)
+{
+	struct starpu_sched_component * component = starpu_sched_component_create(tree, "eager_prio");
+	struct _starpu_eager_prio_data *data;
+	_STARPU_MALLOC(data, sizeof(*data));
+
+	_starpu_prio_deque_init(&data->prio);
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	component->data = data;
+
+	component->push_task = eager_prio_push_task;
+	component->can_push = eager_prio_can_push;
+	component->deinit_data = eager_prio_component_deinit_data;
+
+	return component;
+}

+ 54 - 0
src/sched_policies/modular_eager_prio.c

@@ -0,0 +1,54 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2015,2017                           Inria
+ * Copyright (C) 2014,2015,2017                           CNRS
+ * Copyright (C) 2013-2015,2017,2018-2019                 Université de Bordeaux
+ * Copyright (C) 2013                                     Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_sched_component.h>
+#include <starpu_scheduler.h>
+#include <float.h>
+#include <limits.h>
+
+static void initialize_eager_prio_center_policy(unsigned sched_ctx_id)
+{
+	starpu_sched_component_initialize_simple_scheduler((starpu_sched_component_create_t) starpu_sched_component_eager_prio_create, NULL,
+			STARPU_SCHED_SIMPLE_DECIDE_WORKERS |
+			STARPU_SCHED_SIMPLE_FIFOS_BELOW |
+			STARPU_SCHED_SIMPLE_FIFOS_BELOW_PRIO |
+			STARPU_SCHED_SIMPLE_IMPL, sched_ctx_id);
+}
+
+static void deinitialize_eager_prio_center_policy(unsigned sched_ctx_id)
+{
+	struct starpu_sched_tree *t = (struct starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	starpu_sched_tree_destroy(t);
+}
+
+struct starpu_sched_policy _starpu_sched_modular_eager_prio_policy =
+{
+	.init_sched = initialize_eager_prio_center_policy,
+	.deinit_sched = deinitialize_eager_prio_center_policy,
+	.add_workers = starpu_sched_tree_add_workers,
+	.remove_workers = starpu_sched_tree_remove_workers,
+	.push_task = starpu_sched_tree_push_task,
+	.pop_task = starpu_sched_tree_pop_task,
+	.pre_exec_hook = starpu_sched_component_worker_pre_exec_hook,
+	.post_exec_hook = starpu_sched_component_worker_post_exec_hook,
+	.pop_every_task = NULL,
+	.policy_name = "modular-eager-prio",
+	.policy_description = "eager-prio modular policy",
+	.worker_type = STARPU_WORKER_LIST,
+};