Browse Source

Allow parallel OpenMP workers to be viewed as sequential ones for specific tasks, then rebind automatically.

Terry Cojean 9 years ago
parent
commit
548ff4b4c6

+ 8 - 1
examples/sched_ctx/parallel_tasks_with_cluster_api.c

@@ -101,7 +101,14 @@ int main(int argc, char **argv)
 				    STARPU_VALUE,&size,sizeof(int),
 				    STARPU_VALUE,&size,sizeof(int),
 				    0);
 				    0);
 		t->destroy = 1;
 		t->destroy = 1;
-		t->possibly_parallel = 1;
+		/* For two tasks, try out the case when the task isn't parallel and expect
+			 the configuration to be sequential due to this, then automatically changed
+			 back to the parallel one */
+		if (i<=4 || i > 6)
+			t->possibly_parallel = 1;
+		/* Note that this mode requires that you put a prologue callback managing
+			 this on all tasks to be taken into account. */
+		t->prologue_callback_pop_func = &starpu_openmp_prologue;
 
 
 		ret=starpu_task_submit(t);
 		ret=starpu_task_submit(t);
 		if (ret == -ENODEV)
 		if (ret == -ENODEV)

+ 2 - 2
include/starpu_clusters_util.h

@@ -72,10 +72,10 @@ int starpu_uncluster_machine(struct starpu_cluster_machine* clusters);
 int starpu_cluster_print(struct starpu_cluster_machine* clusters);
 int starpu_cluster_print(struct starpu_cluster_machine* clusters);
 
 
 /* Prologue functions */
 /* Prologue functions */
-void starpu_openmp_prologue(void * sched_ctx_id);
+void starpu_openmp_prologue(void*);
 #define starpu_intel_openmp_mkl_prologue starpu_openmp_prologue
 #define starpu_intel_openmp_mkl_prologue starpu_openmp_prologue
 #ifdef STARPU_MKL
 #ifdef STARPU_MKL
-void starpu_gnu_openmp_mkl_prologue(void * sched_ctx_id);
+void starpu_gnu_openmp_mkl_prologue(void*);
 #endif /* STARPU_MKL */
 #endif /* STARPU_MKL */
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus

+ 1 - 0
src/core/sched_ctx.c

@@ -566,6 +566,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		sched_ctx->sleeping[w] = 0;
 		sched_ctx->sleeping[w] = 0;
 	}
 	}
 
 
+	sched_ctx->parallel_view = 0;
 
 
         /*init the strategy structs and the worker_collection of the ressources of the context */
         /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)
 	if(policy)

+ 4 - 0
src/core/sched_ctx.h

@@ -168,6 +168,10 @@ struct _starpu_sched_ctx
 	/* perf model for the device comb of the ctx */
 	/* perf model for the device comb of the ctx */
 	struct starpu_perfmodel_arch perf_arch;
 	struct starpu_perfmodel_arch perf_arch;
 
 
+	/* For parallel workers, say whether it is viewed as sequential or not. This
+		 is a helper for the prologue code. */
+	unsigned parallel_view;
+
 	/* for ctxs without policy: flag to indicate that we want to get
 	/* for ctxs without policy: flag to indicate that we want to get
 	   the threads to sleep in order to replace them with other threads or leave
 	   the threads to sleep in order to replace them with other threads or leave
 	   them awake & use them in the parallel code*/
 	   them awake & use them in the parallel code*/

+ 4 - 0
src/core/sched_policy.c

@@ -989,7 +989,11 @@ profiling:
 	}
 	}
 
 
 	if(task->prologue_callback_pop_func)
 	if(task->prologue_callback_pop_func)
+	{
+		_starpu_set_current_task(task);
 		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
 		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
+		_starpu_set_current_task(NULL);
+	}
 
 
 	return task;
 	return task;
 }
 }

+ 44 - 22
src/util/starpu_clusters_create.c

@@ -45,45 +45,67 @@ starpu_binding_function _starpu_cluster_type_get_func(starpu_cluster_types type)
 	return prologue_func;
 	return prologue_func;
 }
 }
 
 
-void starpu_openmp_prologue(void *sched_ctx_id)
+void starpu_openmp_prologue(void* arg)
 {
 {
-	int sched_ctx = *(int*)sched_ctx_id;
-	int *cpuids = NULL;
-	int ncpuids = 0;
 	int workerid = starpu_worker_get_id_check();
 	int workerid = starpu_worker_get_id_check();
 
 
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	{
 	{
-		starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
-		omp_set_num_threads(ncpuids);
-#pragma omp parallel
+		struct starpu_task *task = starpu_task_get_current();
+		int sched_ctx = task->sched_ctx;
+		struct _starpu_sched_ctx *ctx_struct = _starpu_get_sched_ctx_struct(sched_ctx);
+		/* If the view of the worker doesn't correspond to the view of the task,
+			 adapt the thread team */
+		if (ctx_struct->parallel_view != task->possibly_parallel)
 		{
 		{
-			starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			int *cpuids = NULL;
+			int ncpuids = 0;
+
+			starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+			if (!task->possibly_parallel)
+				ncpuids=1;
+			omp_set_num_threads(ncpuids);
+#pragma omp parallel
+			{
+				starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			}
+			free(cpuids);
+			ctx_struct->parallel_view = !ctx_struct->parallel_view;
 		}
 		}
-		free(cpuids);
 	}
 	}
 	return;
 	return;
 }
 }
 
 
 #ifdef STARPU_MKL
 #ifdef STARPU_MKL
-void starpu_gnu_openmp_mkl_prologue(void *sched_ctx_id)
+void starpu_gnu_openmp_mkl_prologue(void* arg)
 {
 {
-	int sched_ctx = *(int*)sched_ctx_id;
-	int *cpuids = NULL;
-	int ncpuids = 0;
 	int workerid = starpu_worker_get_id();
 	int workerid = starpu_worker_get_id();
 
 
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	{
 	{
-		starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
-		omp_set_num_threads(ncpuids);
-		mkl_set_num_threads(ncpuids);
-		mkl_set_dynamic(0);
-#pragma omp parallel
+		struct starpu_task *task = starpu_task_get_current();
+		int sched_ctx = task->sched_ctx;
+		struct _starpu_sched_ctx *ctx_struct = _starpu_get_sched_ctx_struct(sched_ctx);
+		/* If the view of the worker doesn't correspond to the view of the task,
+			 adapt the thread team */
+		if (ctx_struct->parallel_view != task->possibly_parallel)
 		{
 		{
-			starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			int *cpuids = NULL;
+			int ncpuids = 0;
+
+			starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+			if (!task->possibly_parallel)
+				ncpuids=1;
+			omp_set_num_threads(ncpuids);
+			mkl_set_num_threads(ncpuids);
+			mkl_set_dynamic(0);
+#pragma omp parallel
+			{
+				starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			}
+			free(cpuids);
+			ctx_struct->parallel_view = !ctx_struct->parallel_view;
 		}
 		}
-		free(cpuids);
 	}
 	}
 	return;
 	return;
 }
 }
@@ -324,8 +346,8 @@ int _starpu_cluster_bind(struct _starpu_cluster *cluster)
 	else
 	else
 	{
 	{
 		func = _starpu_cluster_type_get_func(cluster->params->type);
 		func = _starpu_cluster_type_get_func(cluster->params->type);
-		func_arg = (void*) &cluster->id;
-		}
+		func_arg = NULL;
+	}
 
 
 	return starpu_task_insert(&_starpu_cluster_bind_cl,
 	return starpu_task_insert(&_starpu_cluster_bind_cl,
 				  STARPU_SCHED_CTX, cluster->id,
 				  STARPU_SCHED_CTX, cluster->id,