Browse Source

execute parallel code in a context (do not backport on the release branch, it's not working correctly yet)

Andra Hugo 12 years ago
parent
commit
71d6907584

+ 10 - 3
examples/Makefile.am

@@ -46,8 +46,9 @@ EXTRA_DIST = 					\
 	lu/xlu_implicit_pivot.c			\
 	lu/xlu_kernels.c			\
 	lu/lu_example.c				\
-	sched_ctx_utils/sched_ctx_utils.c		\
-	sched_ctx/sched_ctx.c		\
+	sched_ctx_utils/sched_ctx_utils.c			\
+	sched_ctx/sched_ctx.c					\
+	sched_ctx/parallel_code.c				\
 	incrementer/incrementer_kernels_opencl_kernel.cl 	\
 	basic_examples/variable_kernels_opencl_kernel.cl	\
 	matvecmult/matvecmult_kernel.cl				\
@@ -181,6 +182,7 @@ examplebin_PROGRAMS +=				\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
 	sched_ctx/sched_ctx			\
+	sched_ctx/parallel_code			\
 	reductions/dot_product			\
 	reductions/minmax_reduction		\
 	mandelbrot/mandelbrot			\
@@ -253,7 +255,8 @@ STARPU_EXAMPLES +=				\
 	matvecmult/matvecmult			\
 	profiling/profiling			\
 	scheduler/dummy_sched			\
-	sched_ctx/sched_ctx				\
+	sched_ctx/sched_ctx			\
+	sched_ctx/parallel_code			\
 	reductions/dot_product			\
 	reductions/minmax_reduction
 
@@ -886,6 +889,10 @@ examplebin_PROGRAMS +=		\
 
 openmp_vector_scal_omp_CFLAGS = \
 	$(AM_CFLAGS) -fopenmp
+
+sched_ctx_parallel_code_CFLAGS = \
+	$(AM_CFLAGS) -fopenmp
+
 endif
 
 showcheck:

+ 125 - 0
examples/sched_ctx/parallel_code.c

@@ -0,0 +1,125 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+#ifdef STARPU_QUICK_CHECK
+#define NTASKS 64	/* task / iteration count used when STARPU_QUICK_CHECK is defined */
+#else
+#define NTASKS 1000	/* default task / iteration count */
+#endif
+
+int tasks_executed = 0;	/* incremented once per sched_ctx_func call, under mut */
+starpu_pthread_mutex_t mut;	/* protects tasks_executed; initialised in main() */
+
+/* Kernel run by every task of the example: it records that one more task
+ * has been executed by bumping the global tasks_executed counter while
+ * holding mut. Both arguments are ignored. */
+static void sched_ctx_func(void *descr[] __attribute__ ((unused)),
+			   void *arg __attribute__ ((unused)))
+{
+	starpu_pthread_mutex_lock(&mut);
+	tasks_executed += 1;
+	starpu_pthread_mutex_unlock(&mut);
+}
+
+/* Codelet attached to every task submitted by main(): it takes no data
+ * buffer, has no performance model, and uses sched_ctx_func as its
+ * implementation for CPU, CUDA and OpenCL workers alike. */
+static struct starpu_codelet sched_ctx_codelet =
+{
+	.name = "sched_ctx",
+	.nbuffers = 0,
+	.model = NULL,
+	.cpu_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {sched_ctx_func, NULL},
+	.opencl_funcs = {sched_ctx_func, NULL}
+};
+
+/* Count NTASKS loop iterations with an OpenMP team of nprocs threads and
+ * return the total, which is NTASKS when every iteration was accounted for.
+ *
+ * nprocs: requested number of OpenMP threads; values below 1 are raised to 1
+ *         because num_threads() requires a positive value.
+ *
+ * When the file is built without OpenMP support the pragma is ignored and the
+ * loop simply runs serially. */
+int parallel_code(int nprocs)
+{
+	int i;
+	int tasks = 0;
+
+	if (nprocs < 1)
+		nprocs = 1;
+
+	/* every thread increments the counter: the reduction clause gives each
+	 * thread a private copy and sums the copies at the end of the loop,
+	 * instead of racing on one shared variable */
+#pragma omp parallel for num_threads(nprocs) reduction(+:tasks)
+	for (i = 0; i < NTASKS; i++)
+		tasks++;
+	return tasks;
+}
+
+/* Argument/result record passed to run_parallel_code() through the void*
+ * parameter of starpu_sched_ctx_exec_parallel_code(). */
+struct parallel_code_args
+{
+	int nprocs;	/* in: number of OpenMP threads to request */
+	int result;	/* out: value returned by parallel_code() */
+};
+
+/* Adapter with the void *(*)(void *) type expected by
+ * starpu_sched_ctx_exec_parallel_code(): calls parallel_code() with its real
+ * prototype and stores the result in the record, so that no function-pointer
+ * or integer/pointer cast is needed. Always returns NULL. */
+static void *run_parallel_code(void *arg)
+{
+	struct parallel_code_args *args = arg;
+
+	args->result = parallel_code(args->nprocs);
+	return NULL;
+}
+
+/* Example driver: splits the CPU workers between two scheduling contexts,
+ * submits NTASKS tasks to the first one and runs an OpenMP region on the
+ * CPUs of the second one.
+ *
+ * Returns 0 on success and 77 (test skipped) when StarPU finds no device or
+ * when there are fewer than two CPU workers to split. */
+int main(int argc, char **argv)
+{
+	int ntasks = NTASKS;
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_pthread_mutex_init(&mut, NULL);
+	int nprocs1 = 1;
+	int nprocs2 = 1;
+	/* sized for the largest possible worker count rather than a fixed 20,
+	 * so that starpu_worker_get_ids_by_type() cannot overflow the array */
+	int procs1[STARPU_NMAXWORKERS], procs2[STARPU_NMAXWORKERS];
+	procs1[0] = 0;
+	procs2[0] = 0;
+
+#ifdef STARPU_USE_CPU
+	unsigned ncpus =  starpu_cpu_worker_get_count();
+	if (ncpus < 2)
+	{
+		/* each context needs at least one CPU worker of its own */
+		starpu_pthread_mutex_destroy(&mut);
+		starpu_shutdown();
+		return 77;
+	}
+	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
+
+	/* first half of the CPU workers goes to context 1 ... */
+	nprocs1 = ncpus/2;
+	nprocs2 =  nprocs1;
+	/* ... and the next nprocs2 worker ids go to context 2 */
+	int j, k = 0;
+	for(j = nprocs1; j < nprocs1 + nprocs2; j++)
+		procs2[k++] = procs1[j];
+#endif
+
+	/*create contexts however you want*/
+	unsigned sched_ctx1 = starpu_sched_ctx_create("dmda", procs1, nprocs1, "ctx1");
+	unsigned sched_ctx2 = starpu_sched_ctx_create("dmda", procs2, nprocs2, "ctx2");
+
+	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
+	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
+
+	int i;
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = NULL;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	/* tell starpu when you finished submitting tasks to this context
+	   in order to allow moving resources from this context to the inheritor one
+	   when its corresponding tasks finished executing */
+
+	starpu_sched_ctx_finished_submit(sched_ctx1);
+
+	/* execute an openmp code on the CPUs of context 2 */
+	struct parallel_code_args pc_args = { .nprocs = nprocs2, .result = 0 };
+	starpu_sched_ctx_exec_parallel_code(run_parallel_code, &pc_args, sched_ctx2);
+	printf("ctx%u: tasks %d out of %d\n", sched_ctx2, pc_args.result, NTASKS);
+	starpu_sched_ctx_finished_submit(sched_ctx2);
+
+	/* wait for all tasks at the end*/
+	starpu_task_wait_for_all();
+
+	starpu_sched_ctx_delete(sched_ctx1);
+	starpu_sched_ctx_delete(sched_ctx2);
+	printf("ctx%u: tasks executed %d out of %d\n", sched_ctx1, tasks_executed, ntasks);
+	/* no task can touch the counter any more */
+	starpu_pthread_mutex_destroy(&mut);
+	starpu_shutdown();
+
+	return 0;
+}

+ 2 - 0
include/starpu_sched_ctx.h

@@ -181,6 +181,8 @@ int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio);
  * statically allocate tasks with a default priority. */
 #define STARPU_DEFAULT_PRIO	0
 
+/* Run func(param) from the calling thread, bound to the CPUs of the workers of sched_ctx_id; those workers are asked to block while func runs. Returns the value returned by func. */
+void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void* param), void* param, unsigned sched_ctx_id);
 
 #ifdef __cplusplus
 }

+ 113 - 0
src/core/sched_ctx.c

@@ -215,6 +215,30 @@ static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sch
 
 }
 
+#ifdef STARPU_HAVE_HWLOC
+/* Build the CPU set of a context: allocate a new hwloc bitmap, store it in
+ * sched_ctx->hwloc_workers_set and OR into it the initial_hwloc_cpu_set of
+ * every worker currently present in sched_ctx->workers. */
+static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
+{
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	struct starpu_worker_collection *collection = sched_ctx->workers;
+	struct starpu_sched_ctx_iterator iter;
+
+	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
+	hwloc_bitmap_t cpus = sched_ctx->hwloc_workers_set;
+
+	/* not every collection type provides an iterator initialiser */
+	if(collection->init_iterator)
+		collection->init_iterator(collection, &iter);
+
+	while(collection->has_next(collection, &iter))
+	{
+		int workerid = collection->get_next(collection, &iter);
+
+		/* accumulate in place: cpus |= CPU set of this worker */
+		hwloc_bitmap_or(cpus, cpus, config->workers[workerid].initial_hwloc_cpu_set);
+	}
+}
+#endif
+
 struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerids,
 				  int nworkers_ctx, unsigned is_initial_sched,
 				  const char *sched_name)
@@ -259,6 +283,10 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
+#ifdef STARPU_HAVE_HWLOC
+	/* build hwloc tree of the context */
+	_starpu_sched_ctx_create_hwloc_tree(sched_ctx);
+#endif //STARPU_HAVE_HWLOC
 
 	/* if we create the initial big sched ctx we can update workers' status here
 	   because they haven't been launched yet */
@@ -1102,3 +1130,88 @@ int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
 	sched_ctx->max_priority = max_prio;
 	return 0;
 }
+
+/* Bind the calling thread to the CPU set stored in the context
+ * (sched_ctx->hwloc_workers_set). Nothing is done when hwloc reports that
+ * binding the current thread is not supported; a failing bind aborts the
+ * program. Without hwloc support the function is a no-op and a compile-time
+ * warning is emitted. */
+static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
+{
+#ifdef STARPU_HAVE_HWLOC
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
+
+	/* binding the current thread is not available on this platform */
+	if (!support->cpubind->set_thisthread_cpubind)
+		return;
+
+	if (hwloc_set_cpubind(config->topology.hwtopology,
+			      sched_ctx->hwloc_workers_set,
+			      HWLOC_CPUBIND_THREAD))
+	{
+		perror("binding thread");
+		STARPU_ABORT();
+	}
+#else
+#warning no sched ctx CPU binding support
+#endif
+}
+/* Request every worker of the context to block: the parallel_sect flag of
+ * each worker is set to 1 while holding that worker's sched_mutex. The
+ * function only raises the flags; it does not wait for the workers to
+ * actually stop. */
+static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id)
+{
+	struct starpu_worker_collection *collection = _starpu_get_sched_ctx_struct(sched_ctx_id)->workers;
+	struct starpu_sched_ctx_iterator iter;
+
+	if(collection->init_iterator)
+		collection->init_iterator(collection, &iter);
+
+	while(collection->has_next(collection, &iter))
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(collection->get_next(collection, &iter));
+
+		STARPU_PTHREAD_MUTEX_LOCK(&w->sched_mutex);
+		w->parallel_sect = 1;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&w->sched_mutex);
+	}
+}
+
+/* Release the workers of the context: for each of them, broadcast on its
+ * parallel_sect_cond while holding its parallel_sect_mutex.
+ *
+ * NOTE(review): the broadcast carries no state of its own; a worker that
+ * starts waiting on parallel_sect_cond only after this function has run
+ * would presumably not be woken by it -- to be confirmed against the waiting
+ * side in _starpu_get_worker_task(). */
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
+{
+	struct starpu_worker_collection *collection = _starpu_get_sched_ctx_struct(sched_ctx_id)->workers;
+	struct starpu_sched_ctx_iterator iter;
+
+	if(collection->init_iterator)
+		collection->init_iterator(collection, &iter);
+
+	while(collection->has_next(collection, &iter))
+	{
+		struct _starpu_worker *w = _starpu_get_worker_struct(collection->get_next(collection, &iter));
+
+		STARPU_PTHREAD_MUTEX_LOCK(&w->parallel_sect_mutex);
+		STARPU_PTHREAD_COND_BROADCAST(&w->parallel_sect_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&w->parallel_sect_mutex);
+	}
+}
+
+/* Run user-provided parallel code on the CPUs of a scheduling context.
+ *
+ * func:         function to call from the current thread
+ * param:        opaque argument forwarded to func
+ * sched_ctx_id: context whose workers lend their CPUs
+ *
+ * Steps: flag the context's workers so that they block, bind the calling
+ * thread to the context's CPUs, call func(param), then signal the workers
+ * to resume. Returns whatever func returned. */
+void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void* param), void* param, unsigned sched_ctx_id)
+{
+	void *result;
+
+	/* make room: workers of the context are asked to sleep, and the
+	 * current thread is bound on all their CPUs */
+	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id);
+	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
+
+	/* the parallel section itself */
+	result = func(param);
+
+	/* give the CPUs back to the starpu workers */
+	_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
+
+	return result;
+}
+
+

+ 5 - 0
src/core/sched_ctx.h

@@ -91,6 +91,11 @@ struct _starpu_sched_ctx
          * task (level 1) or it is not (level 0). */
      	int min_priority;
 	int max_priority;
+	
+	/* hwloc bitmap: union of the hwloc CPU sets of the context's workers */
+#ifdef STARPU_HAVE_HWLOC
+	hwloc_bitmap_t hwloc_workers_set;
+#endif
 
 #ifdef STARPU_USE_SC_HYPERVISOR
 	/* a structure containing a series of performance counters determining the resize procedure */

+ 3 - 0
src/core/workers.c

@@ -400,6 +400,9 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 
 		STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 		STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
+		STARPU_PTHREAD_MUTEX_INIT(&workerarg->parallel_sect_mutex, NULL);
+		STARPU_PTHREAD_COND_INIT(&workerarg->parallel_sect_cond, NULL);
+		workerarg->parallel_sect = 0;
 
 		/* if some codelet's termination cannot be handled directly :
 		 * for instance in the Gordon driver, Gordon tasks' callbacks

+ 9 - 0
src/core/workers.h

@@ -85,6 +85,15 @@ struct _starpu_worker
 	unsigned active_ctx;
 
 	unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS];
+
+	/* condition variable and its mutex, used to block the worker while a parallel section is executed in its context */
+	starpu_pthread_cond_t parallel_sect_cond;
+	starpu_pthread_mutex_t parallel_sect_mutex;
+
+	/* boolean indicating that workers should block in order to allow
+	   parallel sections to be executed on their allocated resources */
+	unsigned parallel_sect;
+
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;

+ 8 - 0
src/drivers/driver_common/driver_common.c

@@ -156,6 +156,14 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 	struct starpu_task *task;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+	if(args->parallel_sect)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
+		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
+		args->parallel_sect = 0;
+	}
+
 	task = _starpu_pop_task(args);
 
 	if (task == NULL)