Browse Source

allow submitting to several ctxs from 1 th + small example

Andra Hugo 12 years ago
parent
commit
b74fc182bb

+ 1 - 0
Makefile.am

@@ -70,6 +70,7 @@ versinclude_HEADERS = 				\
 	include/starpu_profiling.h		\
 	include/starpu_bound.h			\
 	include/starpu_scheduler.h		\
+	include/starpu_sched_ctx.h		\
 	include/starpu_top.h			\
 	include/starpu_deprecated_api.h         \
 	include/starpu_hash.h			\

+ 3 - 0
examples/Makefile.am

@@ -48,6 +48,7 @@ EXTRA_DIST = 					\
 	lu/xlu_kernels.c			\
 	lu/lu_example.c				\
 	sched_ctx_utils/sched_ctx_utils.c		\
+	sched_ctx/sched_ctx.c		\
 	incrementer/incrementer_kernels_opencl_kernel.cl 	\
 	basic_examples/variable_kernels_opencl_kernel.cl	\
 	matvecmult/matvecmult_kernel.cl				\
@@ -186,6 +187,7 @@ examplebin_PROGRAMS +=				\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
 	scheduler/dummy_sched			\
+	sched_ctx/sched_ctx			\
 	reductions/dot_product			\
 	reductions/minmax_reduction		\
 	ppm_downscaler/ppm_downscaler		\
@@ -252,6 +254,7 @@ STARPU_EXAMPLES +=				\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
 	scheduler/dummy_sched			\
+	sched_ctx/sched_ctx				\
 	reductions/dot_product			\
 	reductions/minmax_reduction
 

+ 112 - 0
examples/sched_ctx/sched_ctx.c

@@ -0,0 +1,112 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2012  Université de Bordeaux 1
+ * Copyright (C) 2010-2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include<starpu.h>
+#include<pthread.h>
+
+#define NTASKS 1000
+int tasks_executed = 0;
+pthread_mutex_t mut;
+static void sched_ctx_func(void *descr[] __attribute__ ((unused)), void *arg __attribute__ ((unused)))
+{
+	pthread_mutex_lock(&mut);
+	tasks_executed++;
+	pthread_mutex_unlock(&mut);
+}
+
+static struct starpu_codelet sched_ctx_codelet =
+{
+	.where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,
+	.cpu_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {sched_ctx_func, NULL},
+    .opencl_funcs = {sched_ctx_func, NULL},
+	.model = NULL,
+	.nbuffers = 0
+};
+
+
+int main(int argc, char **argv)
+{
+	int ntasks = NTASKS;
+	int ret;
+	struct starpu_conf conf;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+#ifdef STARPU_QUICK_CHECK
+	ntasks /= 100;
+#endif
+
+	pthread_mutex_init(&mut, NULL);
+
+    int cpus[9];
+    int ncpus = starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, cpus, 9);
+
+    int cudadevs[3];
+    int ncuda = starpu_worker_get_ids_by_type(STARPU_CUDA_WORKER, cudadevs, 3);
+
+
+	int nprocs1 = 9;
+	int nprocs2 = 3;
+
+	int procs1[nprocs1];
+	int procs2[nprocs2];
+
+	int k;
+	for(k = 0; k < nprocs1; k++)
+	{
+		if(k < ncpus)
+			procs1[k] = cpus[k];
+		else
+			procs1[k] = cudadevs[k-ncpus];
+	}
+
+	int j = 0;
+	for(k = nprocs1; k < nprocs1+nprocs2; k++)
+		procs2[j++] = cudadevs[k-ncpus];
+
+
+	unsigned sched_ctx1 = starpu_create_sched_ctx("heft", procs1, nprocs1, "ctx1");
+	unsigned sched_ctx2 = starpu_create_sched_ctx("heft", procs2, nprocs2, "ctx2");
+
+//	starpu_set_sched_ctx(NULL);
+	unsigned i;
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+	
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = NULL;
+	
+		if(i % 2 == 0)
+			ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+		else
+			ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	starpu_task_wait_for_all();
+
+	printf("tasks executed %d out of %d\n", tasks_executed, ntasks);
+	starpu_shutdown();
+
+	return 0;
+}

+ 1 - 0
include/starpu.h

@@ -51,6 +51,7 @@ typedef unsigned long long uint64_t;
 #endif
 #include <starpu_task_util.h>
 #include <starpu_scheduler.h>
+#include <starpu_sched_ctx.h>
 #include <starpu_expert.h>
 #include <starpu_rand.h>
 #include <starpu_cuda.h>

+ 124 - 0
include/starpu_sched_ctx.h

@@ -0,0 +1,124 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010 - 2012  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_SCHED_CTX_H__
+#define __STARPU_SCHED_CTX_H__
+
+#include <starpu.h>
+
+/* generic structure used by the scheduling contexts to iterated the workers */
+struct worker_collection {
+	/* hidden data structure used to memorize the workers */
+	void *workerids;
+	/* the number of workers in the collection */
+	unsigned nworkers;
+	/* the current cursor of the collection*/
+	pthread_key_t cursor_key;
+	/* the type of structure (WORKER_LIST,...) */
+	int type;
+	/* checks if there is another element in collection */
+	unsigned (*has_next)(struct worker_collection *workers);
+	/* return the next element in the collection */
+	int (*get_next)(struct worker_collection *workers);
+	/* add a new element in the collection */
+	int (*add)(struct worker_collection *workers, int worker);
+	/* remove an element from the collection */
+	int (*remove)(struct worker_collection *workers, int worker);
+	/* initialize the structure */
+	void* (*init)(struct worker_collection *workers);
+	/* free the structure */
+	void (*deinit)(struct worker_collection *workers);
+	/* initialize the cursor if there is one */
+	void (*init_cursor)(struct worker_collection *workers);
+	/* free the cursor if there is one */
+	void (*deinit_cursor)(struct worker_collection *workers);
+};
+
+/* types of structures the worker collection can implement */
+#define WORKER_LIST 0
+
+struct starpu_performance_counters {
+	void (*notify_idle_cycle)(unsigned sched_ctx, int worker, double idle_time);
+	void (*notify_idle_end)(unsigned sched_ctx, int worker);
+	void (*notify_pushed_task)(unsigned sched_ctx, int worker);
+	void (*notify_poped_task)(unsigned sched_ctx, int worker, double flops);
+	void (*notify_post_exec_hook)(unsigned sched_ctx, int taskid);
+	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint);
+};
+
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+void starpu_set_perf_counters(unsigned sched_ctx_id, struct starpu_performance_counters *perf_counters);
+void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops);
+void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
+
+unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name);
+
+unsigned starpu_create_sched_ctx_inside_interval(const char *policy_name, const char *sched_name, 
+						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
+						 unsigned allow_overlap);
+
+void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id);
+
+void starpu_add_workers_to_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
+
+void starpu_remove_workers_from_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
+
+void starpu_set_sched_ctx_policy_data(unsigned sched_ctx, void* policy_data);
+
+void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx);
+
+void starpu_worker_set_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond);
+
+void starpu_worker_get_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond);
+
+void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
+
+void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
+
+struct worker_collection* starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
+	
+void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id); 
+
+struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id);
+
+pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id);
+
+void starpu_set_sched_ctx(unsigned *sched_ctx);
+
+unsigned starpu_get_sched_ctx(void);
+
+void starpu_notify_hypervisor_exists(void);
+
+unsigned starpu_check_if_hypervisor_exists(void);
+
+unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx);
+
+unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2);
+
+unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id);
+
+unsigned starpu_are_overlapping_ctxs_on_worker(int workerid);
+
+unsigned starpu_is_ctxs_turn(int workerid, unsigned sched_ctx_id);
+
+void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id);
+
+double starpu_get_max_time_worker_on_ctx(void);
+
+void starpu_stop_task_submission(void);
+
+#endif /* __STARPU_SCHED_CTX_H__ */

+ 0 - 101
include/starpu_scheduler.h

@@ -118,107 +118,6 @@ struct starpu_sched_policy
 	const char *policy_description;
 };
 
-/* generic structure used by the scheduling contexts to iterated the workers */
-struct worker_collection {
-	/* hidden data structure used to memorize the workers */
-	void *workerids;
-	/* the number of workers in the collection */
-	unsigned nworkers;
-	/* the current cursor of the collection*/
-	pthread_key_t cursor_key;
-	/* the type of structure (WORKER_LIST,...) */
-	int type;
-	/* checks if there is another element in collection */
-	unsigned (*has_next)(struct worker_collection *workers);
-	/* return the next element in the collection */
-	int (*get_next)(struct worker_collection *workers);
-	/* add a new element in the collection */
-	int (*add)(struct worker_collection *workers, int worker);
-	/* remove an element from the collection */
-	int (*remove)(struct worker_collection *workers, int worker);
-	/* initialize the structure */
-	void* (*init)(struct worker_collection *workers);
-	/* free the structure */
-	void (*deinit)(struct worker_collection *workers);
-	/* initialize the cursor if there is one */
-	void (*init_cursor)(struct worker_collection *workers);
-	/* free the cursor if there is one */
-	void (*deinit_cursor)(struct worker_collection *workers);
-};
-
-/* types of structures the worker collection can implement */
-#define WORKER_LIST 0
-
-struct starpu_performance_counters {
-	void (*notify_idle_cycle)(unsigned sched_ctx, int worker, double idle_time);
-	void (*notify_idle_end)(unsigned sched_ctx, int worker);
-	void (*notify_pushed_task)(unsigned sched_ctx, int worker);
-	void (*notify_poped_task)(unsigned sched_ctx, int worker, double flops);
-	void (*notify_post_exec_hook)(unsigned sched_ctx, int taskid);
-	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint);
-};
-
-#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-void starpu_set_perf_counters(unsigned sched_ctx_id, struct starpu_performance_counters *perf_counters);
-void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops);
-void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
-#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
-
-unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name);
-
-unsigned starpu_create_sched_ctx_inside_interval(const char *policy_name, const char *sched_name, 
-						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
-						 unsigned allow_overlap);
-
-void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id);
-
-void starpu_add_workers_to_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
-
-void starpu_remove_workers_from_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
-
-void starpu_set_sched_ctx_policy_data(unsigned sched_ctx, void* policy_data);
-
-void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx);
-
-void starpu_worker_set_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond);
-
-void starpu_worker_get_sched_condition(unsigned sched_ctx, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond);
-
-void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
-
-void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
-
-struct worker_collection* starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
-	
-void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id); 
-
-struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id);
-
-pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id);
-
-void starpu_set_sched_ctx(unsigned *sched_ctx);
-
-unsigned starpu_get_sched_ctx(void);
-
-void starpu_notify_hypervisor_exists(void);
-
-unsigned starpu_check_if_hypervisor_exists(void);
-
-unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx);
-
-unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2);
-
-unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id);
-
-unsigned starpu_are_overlapping_ctxs_on_worker(int workerid);
-
-unsigned starpu_is_ctxs_turn(int workerid, unsigned sched_ctx_id);
-
-void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id);
-
-double starpu_get_max_time_worker_on_ctx(void);
-
-void starpu_stop_task_submission(void);
 
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);

+ 1 - 1
src/core/sched_ctx.c

@@ -683,7 +683,7 @@ unsigned starpu_get_sched_ctx()
 {
 	unsigned *sched_ctx = (unsigned*)pthread_getspecific(sched_ctx_key);
 	if(sched_ctx == NULL)
-		return 0;//STARPU_NMAX_SCHED_CTXS;
+		return STARPU_NMAX_SCHED_CTXS;
 	STARPU_ASSERT(*sched_ctx < STARPU_NMAX_SCHED_CTXS);
 	return *sched_ctx;
 }

+ 29 - 5
src/core/task.c

@@ -364,11 +364,13 @@ int starpu_task_submit(struct starpu_task *task)
 	STARPU_ASSERT(task);
 	STARPU_ASSERT(task->magic == 42);
 	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
-
+	unsigned set_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	
 	if(task->sched_ctx == 0 && nsched_ctxs != 1 && !task->control_task)
-		task->sched_ctx = starpu_get_sched_ctx();
-//	task->sched_ctx = (nsched_ctxs == 1 || task->control_task) ? 
-//	   0 : starpu_get_sched_ctx());
+		set_sched_ctx = starpu_get_sched_ctx();
+	if(set_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		task->sched_ctx = set_sched_ctx;
+
 	int ret;
 	unsigned is_sync = task->synchronous;
 	starpu_task_bundle_t bundle = task->bundle;
@@ -660,7 +662,29 @@ int starpu_task_wait_for_all(void)
 {
 	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
 	unsigned sched_ctx = nsched_ctxs == 1 ? 0 : starpu_get_sched_ctx();
-	_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
+	
+	/* if there is no indication about which context to wait,
+	   we wait for all tasks submitted to starpu */
+	if(sched_ctx == STARPU_NMAX_SCHED_CTXS)
+	{
+		if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+			return -EDEADLK;
+
+		_STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
+
+		_STARPU_TRACE_TASK_WAIT_FOR_ALL;
+
+		while (nsubmitted > 0)
+			_STARPU_PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex);
+
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+
+#ifdef HAVE_AYUDAME_H
+		if (AYU_event) AYU_event(AYU_BARRIER, 0, NULL);
+#endif
+	}
+	else
+		_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
 	return 0;
 }
 

+ 1 - 1
src/core/workers.c

@@ -793,7 +793,7 @@ int starpu_init(struct starpu_conf *user_conf)
 	else
 		sched_ctx = _starpu_create_sched_ctx(user_conf->sched_policy_name, NULL, -1, 1, "init");
 
-	starpu_set_sched_ctx(&sched_ctx->id);
+//	starpu_set_sched_ctx(&sched_ctx->id);
 	_starpu_initialize_registered_performance_models();
 
 	/* Launch "basic" workers (ie. non-combined workers) */