Procházet zdrojové kódy

- plug ICV nthreads

Olivier Aumage před 11 roky
rodič
revize
8ab945d406
1 změnil soubory, kde provedl 73 přidání a 36 odebrání
  1. 73 36
      src/util/openmp_runtime_support.c

+ 73 - 36
src/util/openmp_runtime_support.c

@@ -593,7 +593,29 @@ static void omp_initial_thread_exit()
 
 static void omp_initial_region_setup(void)
 {
+	omp_initial_thread_setup();
 	const int max_levels = _starpu_omp_initial_icv_values->max_active_levels_var;
+	const int max_threads = (int)starpu_cpu_worker_get_count();
+	
+	if (_starpu_omp_initial_icv_values->nthreads_var[0] == 0)
+	{
+		_starpu_omp_initial_icv_values->nthreads_var[0] = max_threads;
+		_starpu_omp_initial_icv_values->nthreads_var[1] = 0;
+	}
+	else
+	{
+		int i;
+		for (i = 0; i < max_levels; i++)
+		{
+			if (_starpu_omp_initial_icv_values->nthreads_var[i] == 0)
+				break;
+			if (_starpu_omp_initial_icv_values->nthreads_var[i] > max_threads)
+			{
+				_starpu_omp_initial_icv_values->nthreads_var[i] = max_threads;
+			}
+		}
+	}
+
 	_global_state.initial_device->icvs.max_active_levels_var = max_levels;
 	_global_state.initial_device->icvs.stacksize_var = _starpu_omp_initial_icv_values->stacksize_var;
 	_global_state.initial_device->icvs.wait_policy_var = _starpu_omp_initial_icv_values->wait_policy_var;
@@ -616,7 +638,7 @@ static void omp_initial_region_setup(void)
 	{
 		_global_state.initial_region->icvs.nthreads_var = malloc(2 * sizeof(*_global_state.initial_region->icvs.nthreads_var));
 		_global_state.initial_region->icvs.nthreads_var[0] = _starpu_omp_initial_icv_values->nthreads_var[0];
-		_global_state.initial_region->icvs.nthreads_var[0] = 0;
+		_global_state.initial_region->icvs.nthreads_var[1] = 0;
 	}
 
 	if (_starpu_omp_initial_icv_values->bind_var[1] != starpu_omp_bind_undefined)
@@ -633,7 +655,7 @@ static void omp_initial_region_setup(void)
 	{
 		_global_state.initial_region->icvs.bind_var = malloc(2 * sizeof(*_global_state.initial_region->icvs.bind_var));
 		_global_state.initial_region->icvs.bind_var[0] = _starpu_omp_initial_icv_values->bind_var[0];
-		_global_state.initial_region->icvs.bind_var[0] = starpu_omp_bind_undefined;
+		_global_state.initial_region->icvs.bind_var[1] = starpu_omp_bind_undefined;
 	}
 	_global_state.initial_region->icvs.thread_limit_var = _starpu_omp_initial_icv_values->thread_limit_var;
 	_global_state.initial_region->icvs.active_levels_var = 0;
@@ -641,7 +663,6 @@ static void omp_initial_region_setup(void)
 	_global_state.initial_region->icvs.default_device_var = _starpu_omp_initial_icv_values->default_device_var;
 	starpu_omp_task_list_push_back(_global_state.initial_region->implicit_task_list,
 			_global_state.initial_task);
-	omp_initial_thread_setup();
 }
 
 static void omp_initial_region_exit(void)
@@ -733,69 +754,85 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 {
 	struct starpu_omp_thread *master_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
-	struct starpu_omp_region *region = task->owner_region;
+	struct starpu_omp_region *generating_region = task->owner_region;
 	int ret;
 
-	const int max_levels = region->owner_device->icvs.max_active_levels_var;
+	const int max_levels = generating_region->owner_device->icvs.max_active_levels_var;
+	struct starpu_omp_region *new_region = 
+		create_omp_region_struct(generating_region, _global_state.initial_device);
 
-	/* TODO: compute the proper nb_threads and launch additional workers as needed.
-	 * for now, the level 1 parallel region spans all the threads
-	 * and level >= 2 parallel regions have only one thread */
-	int nb_threads = (attr->if_clause != 0 && region->level == 0)?starpu_cpu_worker_get_count():1;
+	int nb_threads = 1;
 
-	struct starpu_omp_region *new_region = 
-		create_omp_region_struct(region, _global_state.initial_device);
+	/* TODO: for now, nested parallel sections are not supported, thus we
+	 * open an active parallel section only if the generating region is the
+	 * initial region */
+	if (generating_region->level == 0)
+	{
+		if (attr->if_clause != 0)
+		{
+			const int max_threads = (int)starpu_cpu_worker_get_count();
+			if (generating_region->icvs.nthreads_var[0] < max_threads)
+			{
+				nb_threads = generating_region->icvs.nthreads_var[0];
+			}
+			else
+			{
+				nb_threads = max_threads;
+			}
+		}
+	}
+	STARPU_ASSERT(nb_threads > 0);
 
-	new_region->icvs.dyn_var = region->icvs.dyn_var;
-	new_region->icvs.nest_var = region->icvs.nest_var;
+	new_region->icvs.dyn_var = generating_region->icvs.dyn_var;
+	new_region->icvs.nest_var = generating_region->icvs.nest_var;
 	if (new_region->level < max_levels)
 	{
-		if (region->icvs.nthreads_var[1] != 0)
+		if (generating_region->icvs.nthreads_var[1] != 0)
 		{
 			new_region->icvs.nthreads_var = malloc((1+max_levels-new_region->level) * sizeof(*new_region->icvs.nthreads_var));
 			int i,j;
 			for (i = new_region->level, j = 0; i < max_levels; i++, j++)
 			{
-				new_region->icvs.nthreads_var[j] = region->icvs.nthreads_var[j+1];
+				new_region->icvs.nthreads_var[j] = generating_region->icvs.nthreads_var[j+1];
 			}
 			new_region->icvs.nthreads_var[j] = 0;
 		}
 		else
 		{
 			new_region->icvs.nthreads_var = malloc(2 * sizeof(*new_region->icvs.nthreads_var));
-			new_region->icvs.nthreads_var[0] = region->icvs.nthreads_var[0];
-			new_region->icvs.nthreads_var[0] = 0;
+			new_region->icvs.nthreads_var[0] = generating_region->icvs.nthreads_var[0];
+			new_region->icvs.nthreads_var[1] = 0;
 		}
 
-		if (region->icvs.bind_var[1] != starpu_omp_bind_undefined)
+		if (generating_region->icvs.bind_var[1] != starpu_omp_bind_undefined)
 		{
 			new_region->icvs.bind_var = malloc((1+max_levels-new_region->level) * sizeof(*new_region->icvs.bind_var));
 			int i,j;
 			for (i = new_region->level, j = 0; i < max_levels; i++, j++)
 			{
-				new_region->icvs.bind_var[j] = region->icvs.bind_var[j+1];
+				new_region->icvs.bind_var[j] = generating_region->icvs.bind_var[j+1];
 			}
 			new_region->icvs.bind_var[j] = starpu_omp_bind_undefined;
 		}
 		else
 		{
 			new_region->icvs.bind_var = malloc(2 * sizeof(*new_region->icvs.bind_var));
-			new_region->icvs.bind_var[0] = region->icvs.bind_var[0];
-			new_region->icvs.bind_var[0] = starpu_omp_bind_undefined;
+			new_region->icvs.bind_var[0] = generating_region->icvs.bind_var[0];
+			new_region->icvs.bind_var[1] = starpu_omp_bind_undefined;
 		}
 	}
 	else
 	{
 		new_region->icvs.nthreads_var = malloc(sizeof(*new_region->icvs.nthreads_var));
-		new_region->icvs.nthreads_var[0] = region->icvs.nthreads_var[0];
+		new_region->icvs.nthreads_var[0] = generating_region->icvs.nthreads_var[0];
 
 		new_region->icvs.bind_var = malloc(sizeof(*new_region->icvs.bind_var));
-		new_region->icvs.bind_var[0] = region->icvs.bind_var[0];
+		new_region->icvs.bind_var[0] = generating_region->icvs.bind_var[0];
 	}
-	new_region->icvs.thread_limit_var = region->icvs.thread_limit_var;
-	new_region->icvs.active_levels_var = (nb_threads > 1)?region->icvs.active_levels_var+1:region->icvs.active_levels_var;
-	new_region->icvs.levels_var = region->icvs.levels_var+1;
-	new_region->icvs.default_device_var = region->icvs.default_device_var;
+	new_region->icvs.thread_limit_var = generating_region->icvs.thread_limit_var;
+	new_region->icvs.active_levels_var = (nb_threads > 1)?generating_region->icvs.active_levels_var+1:generating_region->icvs.active_levels_var;
+	new_region->icvs.levels_var = generating_region->icvs.levels_var+1;
+	new_region->icvs.default_device_var = generating_region->icvs.default_device_var;
 
 	int i;
 	for (i = 0; i < nb_threads; i++)
@@ -812,7 +849,7 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 			/* TODO: specify actual starpu worker */
 
 			/* TODO: use a less arbitrary thread/worker mapping scheme */
-			if (region->level == 0)
+			if (generating_region->level == 0)
 			{
 				struct _starpu_worker *worker = _starpu_get_worker_struct(i);
 				new_thread = get_worker_thread(worker);
@@ -840,8 +877,8 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 
 	/* 
 	 * if task == initial_task, create a starpu task as a continuation to all the implicit
-	 * tasks of the new region, else prepare the task for preemption,
-	 * to become itself a continuation to the implicit tasks of the new region
 	 * tasks of the new region, else prepare the task for preemption,
 	 * to become itself a continuation to the implicit tasks of the new region
 	 */
 	if (task == _global_state.initial_task)
 	{
@@ -873,14 +910,14 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 	{
 		implicit_task->cl = attr->cl;
 		/*
-		 * save pointer to the regions user function from the parallel region codelet
 		 * save pointer to the region's user function from the parallel region codelet
 		 *
 		 * TODO: add support for multiple/heterogeneous implementations
 		 */
 		implicit_task->f = implicit_task->cl.cpu_funcs[0];
 
 		/*
 		 * plug the task wrapper into the parallel region codelet instead, to support task preemption
+		 * plug the task wrapper into the parallel generating_region codelet instead, to support task preemption
 		 */
 		implicit_task->cl.cpu_funcs[0] = starpu_omp_implicit_task_exec;
 
@@ -905,7 +942,7 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 	attr = NULL;
 
 	/*
-	 * submit all the region implicit starpu tasks
+	 * submit all the generating_region implicit starpu tasks
 	 */
 	for (implicit_task  = starpu_omp_task_list_begin(new_region->implicit_task_list);
 			implicit_task != starpu_omp_task_list_end(new_region->implicit_task_list);
@@ -916,7 +953,7 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 	}
 
 	/*
-	 * submit the region continuation starpu task if task == initial_task
+	 * submit the generating_region continuation starpu task if task == initial_task
 	 */
 	if (task == _global_state.initial_task)
 	{
@@ -925,7 +962,7 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 	}
 
 	/*
 	 * preempt for completion of the region
+	 * preempt for completion of the generating_region
 	 */
 	starpu_omp_task_preempt();
 	if (task == _global_state.initial_task)
@@ -938,7 +975,7 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 		new_region->continuation_starpu_task = NULL;
 	}
 	/*
-	 * TODO: free region resources
+	 * TODO: free generating_region resources
 	 */
 	for (i = 0; i < nb_threads; i++)
 	{