瀏覽代碼

allow passing the pointer of a scheduler to a context

Andra Hugo 11 年之前
父節點
當前提交
5fa6ed1bb7

+ 3 - 0
examples/Makefile.am

@@ -49,6 +49,7 @@ EXTRA_DIST = 					\
 	sched_ctx_utils/sched_ctx_utils.c			\
 	sched_ctx/sched_ctx.c					\
 	sched_ctx/parallel_code.c				\
+	sched_ctx/dummy_sched_with_ctx.c			\
 	incrementer/incrementer_kernels_opencl_kernel.cl 	\
 	basic_examples/variable_kernels_opencl_kernel.cl	\
 	matvecmult/matvecmult_kernel.cl				\
@@ -188,6 +189,7 @@ examplebin_PROGRAMS +=				\
 	profiling/profiling			\
 	sched_ctx/sched_ctx			\
 	sched_ctx/parallel_code			\
+	sched_ctx/dummy_sched_with_ctx		\
 	reductions/dot_product			\
 	reductions/minmax_reduction		\
 	mandelbrot/mandelbrot			\
@@ -262,6 +264,7 @@ STARPU_EXAMPLES +=				\
 	scheduler/dummy_sched			\
 	sched_ctx/sched_ctx			\
 	sched_ctx/parallel_code			\
+	sched_ctx/dummy_sched_with_ctx		\
 	reductions/dot_product			\
 	reductions/minmax_reduction
 

+ 183 - 0
examples/sched_ctx/dummy_sched_with_ctx.c

@@ -0,0 +1,183 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <starpu_scheduler.h>
+
+#define NTASKS	32000
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+struct dummy_sched_data
+{
+	struct starpu_task_list sched_list;
+     	starpu_pthread_mutex_t policy_mutex;
+};
+
+static void init_dummy_sched(unsigned sched_ctx_id)
+{
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+
+	struct dummy_sched_data *data = (struct dummy_sched_data*)malloc(sizeof(struct dummy_sched_data));
+
+
+	/* Create a linked-list of tasks and a condition variable to protect it */
+	starpu_task_list_init(&data->sched_list);
+
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+
+	starpu_pthread_mutex_init(&data->policy_mutex, NULL);
+	FPRINTF(stderr, "Initialising Dummy scheduler\n");
+}
+
+static void deinit_dummy_sched(unsigned sched_ctx_id)
+{
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	STARPU_ASSERT(starpu_task_list_empty(&data->sched_list));
+
+	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+
+	starpu_pthread_mutex_destroy(&data->policy_mutex);
+
+	free(data);
+
+	FPRINTF(stderr, "Destroying Dummy scheduler\n");
+}
+
+static int push_task_dummy(struct starpu_task *task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	/* NB: In this simplistic strategy, we assume that the context in which
+	   we push task has at least one worker*/
+
+
+	/* lock all workers when pushing tasks on a list where all
+	   of them would pop for tasks */
+        starpu_pthread_mutex_lock(&data->policy_mutex);
+
+	starpu_task_list_push_front(&data->sched_list, task);
+
+	starpu_push_task_end(task);
+	starpu_pthread_mutex_unlock(&data->policy_mutex);
+
+
+        /*if there are no tasks block */
+        /* wake people waiting for a task */
+        unsigned worker = 0;
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+        struct starpu_sched_ctx_iterator it;
+        if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+        {
+                worker = workers->get_next(workers, &it);
+		starpu_pthread_mutex_t *sched_mutex;
+                starpu_pthread_cond_t *sched_cond;
+                starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		starpu_pthread_mutex_lock(sched_mutex);
+                starpu_pthread_cond_signal(sched_cond);
+                starpu_pthread_mutex_unlock(sched_mutex);
+        }
+
+	return 0;
+}
+
+/* The mutex associated to the calling worker is already taken by StarPU */
+static struct starpu_task *pop_task_dummy(unsigned sched_ctx_id)
+{
+	/* NB: In this simplistic strategy, we assume that all workers are able
+	 * to execute all tasks, otherwise, it would have been necessary to go
+	 * through the entire list until we find a task that is executable from
+	 * the calling worker. So we just take the head of the list and give it
+	 * to the worker. */
+	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	starpu_pthread_mutex_lock(&data->policy_mutex);
+	struct starpu_task *task = starpu_task_list_pop_back(&data->sched_list);
+	starpu_pthread_mutex_unlock(&data->policy_mutex);
+	return task;
+}
+
+static struct starpu_sched_policy dummy_sched_policy =
+{
+	.init_sched = init_dummy_sched,
+	.add_workers = NULL,
+	.remove_workers = NULL,
+	.deinit_sched = deinit_dummy_sched,
+	.push_task = push_task_dummy,
+	.pop_task = pop_task_dummy,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,
+	.policy_name = "dummy",
+	.policy_description = "dummy scheduling strategy"
+};
+
+static void dummy_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STARPU_ATTRIBUTE_UNUSED)
+{
+}
+
+static struct starpu_codelet dummy_codelet =
+{
+	.cpu_funcs = {dummy_func, NULL},
+	.cuda_funcs = {dummy_func, NULL},
+        .opencl_funcs = {dummy_func, NULL},
+	.model = NULL,
+	.nbuffers = 0,
+	.name = "dummy",
+};
+
+
+int main(int argc, char **argv)
+{
+	int ntasks = NTASKS;
+	int ret;
+/* 	struct starpu_conf conf; */
+
+/* 	starpu_conf_init(&conf); */
+/* 	conf.sched_policy = &dummy_sched_policy, */
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	unsigned sched_ctx = starpu_sched_ctx_create_with_custom_policy(&dummy_sched_policy, NULL, -1, "dummy");
+#ifdef STARPU_QUICK_CHECK
+	ntasks /= 100;
+#endif
+
+	starpu_sched_ctx_set_context(&sched_ctx);
+	int i;
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &dummy_codelet;
+		task->cl_arg = NULL;
+
+		ret = starpu_task_submit(task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	starpu_task_wait_for_all();
+
+	starpu_shutdown();
+
+	return 0;
+}

+ 5 - 0
include/starpu_sched_ctx.h

@@ -24,8 +24,13 @@ extern "C"
 {
 #endif
 
+
 unsigned starpu_sched_ctx_create(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_ctx_name);
 
+#ifdef __STARPU_SCHEDULER_H__
+unsigned starpu_sched_ctx_create_with_custom_policy(struct starpu_sched_policy *policy, int *workerids, int nworkers, const char *sched_name);
+#endif
+
 unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_name, int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus, unsigned allow_overlap);
 
 void starpu_sched_ctx_add_workers(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx_id);

+ 24 - 5
src/core/sched_ctx.c

@@ -163,7 +163,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
 		}
-}
+	}
 
 	if(sched_ctx->sched_policy->add_workers)
 	{
@@ -242,7 +242,7 @@ static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_
 }
 #endif
 
-struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerids,
+struct _starpu_sched_ctx*  _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
 				  int nworkers_ctx, unsigned is_initial_sched,
 				  const char *sched_name)
 {
@@ -279,7 +279,7 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
 	/*init the strategy structs and the worker_collection of the ressources of the context */
-	_starpu_init_sched_policy(config, sched_ctx, policy_name);
+	_starpu_init_sched_policy(config, sched_ctx, policy);
 
 	/* construct the collection of workers(list/tree/etc.) */
 	sched_ctx->workers->init(sched_ctx->workers);
@@ -437,6 +437,9 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
 						 unsigned allow_overlap)
 {
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);
+
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	int workers[max_ncpus + max_ngpus];
 	int nw = 0;
@@ -449,7 +452,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	for(i = 0; i < nw; i++)
 		printf("%d ", workers[i]);
 	printf("\n");
-	sched_ctx = _starpu_create_sched_ctx(policy_name, workers, nw, 0, sched_name);
+	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_name);
 	sched_ctx->min_ncpus = min_ncpus;
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
@@ -462,11 +465,27 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	return sched_ctx->id;
 
 }
+
 unsigned starpu_sched_ctx_create(const char *policy_name, int *workerids,
 				 int nworkers, const char *sched_name)
 {
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);
+
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	sched_ctx = _starpu_create_sched_ctx(selected_policy, workerids, nworkers, 0, sched_name);
+
+	_starpu_update_workers_with_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+#ifdef STARPU_USE_SC_HYPERVISOR
+	sched_ctx->perf_counters = NULL;
+#endif
+	return sched_ctx->id;
+}
+
+unsigned starpu_sched_ctx_create_with_custom_policy(struct starpu_sched_policy *policy, int *workerids, int nworkers, const char *sched_name)
+{
 	struct _starpu_sched_ctx *sched_ctx = NULL;
-	sched_ctx = _starpu_create_sched_ctx(policy_name, workerids, nworkers, 0, sched_name);
+	sched_ctx = _starpu_create_sched_ctx(policy, workerids, nworkers, 0, sched_name);
 
 	_starpu_update_workers_with_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
 #ifdef STARPU_USE_SC_HYPERVISOR

+ 1 - 1
src/core/sched_ctx.h

@@ -124,7 +124,7 @@ void _starpu_init_sched_ctx_for_worker(unsigned workerid);
 void _starpu_delete_sched_ctx_for_worker(unsigned workerid);
 
 /* allocate all structures belonging to a context */
-struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
+struct _starpu_sched_ctx*  _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
 
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();

+ 2 - 5
src/core/sched_policy.c

@@ -126,7 +126,7 @@ static void display_sched_help_message(void)
 	 }
 }
 
-static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
+struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
 {
 	struct starpu_sched_policy *selected_policy = NULL;
 	struct starpu_conf *user_conf = config->conf;
@@ -155,7 +155,7 @@ static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_co
 	return &_starpu_sched_eager_policy;
 }
 
-void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, const char *required_policy)
+void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
 {
 	/* Perhaps we have to display some help */
 	display_sched_help_message();
@@ -168,9 +168,6 @@ void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _st
 	/* Set calibrate flag */
 	_starpu_set_calibrate_flag(config->conf->calibrate);
 
-	struct starpu_sched_policy *selected_policy;
-	selected_policy = select_sched_policy(config, required_policy);
-
 	load_sched_policy(selected_policy, sched_ctx);
 
 	sched_ctx->sched_policy->init_sched(sched_ctx->id);

+ 3 - 1
src/core/sched_policy.h

@@ -27,10 +27,12 @@ struct starpu_machine_config;
 struct starpu_sched_policy *_starpu_get_sched_policy( struct _starpu_sched_ctx *sched_ctx);
 
 void _starpu_init_sched_policy(struct _starpu_machine_config *config,
-			       struct _starpu_sched_ctx *sched_ctx, const char *required_policy);
+			       struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *policy);
 
 void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx);
 
+struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy);
+
 int _starpu_push_task(struct _starpu_job *task);
 
 /* actually pushes the tasks to the specific worker or to the scheduler */

+ 5 - 1
src/core/workers.c

@@ -1002,7 +1002,11 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	_starpu_initialize_current_task_key();
 
 	if (!is_a_sink)
-		_starpu_create_sched_ctx(config.conf->sched_policy_name, NULL, -1, 1, "init");
+	{
+		struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(&config, config.conf->sched_policy_name);
+		_starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init");
+
+	}
 
 	_starpu_initialize_registered_performance_models();