Procházet zdrojové kódy

Add STARPU_MAIN_THREAD_CPUID and STARPU_MPI_THREAD_CPUID environment variables

Samuel Thibault před 7 roky
rodič
revize
28fcff7473

+ 2 - 0
ChangeLog

@@ -50,6 +50,8 @@ Small features:
   * Add SubmitOrder trace field.
   * Add workerids and workerids_len task fields.
   * Add priority management to StarPU-MPI.
+  * Add STARPU_MAIN_THREAD_CPUID and STARPU_MPI_THREAD_CPUID environment
+    variables.
 
 Changes:
   * Vastly improve simgrid simulation time.

+ 15 - 0
doc/doxygen/chapters/501_environment_variables.doxy

@@ -205,6 +205,21 @@ set.
 
 </dd>
 
+<dt>STARPU_MAIN_THREAD_CPUID</dt>
+<dd>
+\anchor STARPU_MAIN_THREAD_CPUID
+\addindex __env__STARPU_MAIN_THREAD_CPUID
+When defined, this makes StarPU bind the thread that calls starpu_initialize() to
+the given CPU ID.
+</dd>
+
+<dt>STARPU_MPI_THREAD_CPUID</dt>
+<dd>
+\anchor STARPU_MPI_THREAD_CPUID
+\addindex __env__STARPU_MPI_THREAD_CPUID
+When defined, this makes StarPU bind its MPI thread to the given CPU ID.
+</dd>
+
 <dt>STARPU_WORKERS_CUDAID</dt>
 <dd>
 \anchor STARPU_WORKERS_CUDAID

+ 10 - 0
mpi/src/starpu_mpi.c

@@ -38,6 +38,8 @@
 #include <datawizard/coherency.h>
 #include <core/simgrid.h>
 #include <core/task.h>
+#include <core/topology.h>
+#include <core/workers.h>
 
 /* Number of ready requests to process before polling for completed requests */
 static unsigned nready_process;
@@ -45,6 +47,8 @@ static unsigned nready_process;
 /* Number of send requests to submit to MPI at the same time */
 static unsigned ndetached_send;
 
+static int mpi_thread_cpuid = -1;
+
 static void _starpu_mpi_add_sync_point_in_fxt(void);
 static void _starpu_mpi_submit_ready_request(void *arg);
 static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
@@ -1369,7 +1373,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	starpu_pthread_setname("MPI");
 
 #ifndef STARPU_SIMGRID
+	if (mpi_thread_cpuid >= 0)
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
 	_starpu_mpi_do_initialize(argc_argv);
+	if (mpi_thread_cpuid >= 0)
+		/* In case MPI changed the binding */
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
 #endif
 
 	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
@@ -1720,6 +1729,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
         _starpu_mpi_comm_debug = starpu_getenv("STARPU_MPI_COMM") != NULL;
 	nready_process = starpu_get_env_number_default("STARPU_MPI_NREADY_PROCESS", 10);
 	ndetached_send = starpu_get_env_number_default("STARPU_MPI_NDETACHED_SEND", 10);
+	mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
 
 #ifdef STARPU_SIMGRID
 	STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);

+ 11 - 11
src/core/perfmodel/perfmodel_bus.c

@@ -142,7 +142,7 @@ hwloc_topology_t _starpu_perfmodel_get_hwtopology()
 static void measure_bandwidth_between_host_and_dev_on_numa_with_cuda(int dev, int numa, int cpu, struct dev_timing *dev_timing_per_cpu)
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 	size_t size = SIZE;
 
 	const unsigned nnuma_nodes = _starpu_topology_get_nnumanodes(config);
@@ -153,13 +153,13 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_cuda(int dev, in
 	cudaSetDevice(dev);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	/* hack to force the initialization */
 	cudaFree(0);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
         /* Get the maximum size which can be allocated on the device */
 	struct cudaDeviceProp prop;
@@ -175,7 +175,7 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_cuda(int dev, in
 	STARPU_ASSERT(cures == cudaSuccess);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	/* Allocate a buffer on the host */
 	unsigned char *h_buffer;
@@ -198,14 +198,14 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_cuda(int dev, in
 	STARPU_ASSERT(cures == cudaSuccess);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	/* Fill them */
 	memset(h_buffer, 0, size);
 	cudaMemset(d_buffer, 0, size);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	const unsigned timing_numa_index = dev*STARPU_MAXNUMANODES + numa;
 	unsigned iter;
@@ -396,7 +396,7 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_opencl(int dev,
 	int not_initialized;
 
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	const unsigned nnuma_nodes = _starpu_topology_get_nnumanodes(config);
 
@@ -430,7 +430,7 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_opencl(int dev,
 	}
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	/* Allocate a buffer on the device */
 	cl_mem d_buffer;
@@ -438,7 +438,7 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_opencl(int dev,
 	if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 	/* Allocate a buffer on the host */
 	unsigned char *h_buffer;
 #if defined(STARPU_HAVE_HWLOC)
@@ -456,14 +456,14 @@ static void measure_bandwidth_between_host_and_dev_on_numa_with_opencl(int dev,
 	}
 
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 	/* Fill them */
 	memset(h_buffer, 0, size);
 	err = clEnqueueWriteBuffer(queue, d_buffer, CL_TRUE, 0, size, h_buffer, 0, NULL, NULL);
 	if (STARPU_UNLIKELY(err != CL_SUCCESS)) STARPU_OPENCL_REPORT_ERROR(err);
 	clFinish(queue);
 	/* hack to avoid third party libs to rebind threads */
-	_starpu_bind_thread_on_cpu(config, cpu, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpu, STARPU_NOWORKERID);
 
 	const unsigned timing_numa_index = dev*STARPU_MAXNUMANODES + numa;
 	unsigned iter;

+ 1 - 1
src/core/sched_ctx.c

@@ -2096,7 +2096,7 @@ unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
 
 void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
 {
-	_starpu_bind_thread_on_cpu(_starpu_get_machine_config(), cpuid, STARPU_NOWORKERID);
+	_starpu_bind_thread_on_cpu(cpuid, STARPU_NOWORKERID);
 }
 
 unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)

+ 2 - 2
src/core/topology.c

@@ -1805,7 +1805,6 @@ void _starpu_destroy_machine_config(struct _starpu_machine_config *config)
 
 void
 _starpu_bind_thread_on_cpu (
-	struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED,
 	int cpuid STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
@@ -1834,6 +1833,7 @@ _starpu_bind_thread_on_cpu (
 #ifdef STARPU_USE_CUDA
 	_starpu_init_cuda();
 #endif
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	_starpu_init_topology(config);
 
 	support = hwloc_topology_get_support (config->topology.hwtopology);
@@ -1886,7 +1886,6 @@ _starpu_bind_thread_on_cpu (
 
 void
 _starpu_bind_thread_on_cpus (
-	struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED,
 	struct _starpu_combined_worker *combined_worker STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
@@ -1901,6 +1900,7 @@ _starpu_bind_thread_on_cpus (
 #ifdef STARPU_USE_CUDA
 	_starpu_init_cuda();
 #endif
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	_starpu_init_topology(config);
 
 	support = hwloc_topology_get_support(config->topology.hwtopology);

+ 2 - 2
src/core/topology.h

@@ -64,11 +64,11 @@ void _starpu_topology_filter(hwloc_topology_t topology);
 /* Bind the current thread on the CPU logically identified by "cpuid". The
  * logical ordering of the processors is either that of hwloc (if available),
  * or the ordering exposed by the OS. */
-void _starpu_bind_thread_on_cpu(struct _starpu_machine_config *config, int cpuid, int workerid);
+void _starpu_bind_thread_on_cpu(int cpuid, int workerid);
 
 struct _starpu_combined_worker;
 /* Bind the current thread on the set of CPUs for the given combined worker. */
-void _starpu_bind_thread_on_cpus(struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED, struct _starpu_combined_worker *combined_worker);
+void _starpu_bind_thread_on_cpus(struct _starpu_combined_worker *combined_worker);
 
 struct _starpu_worker *_starpu_get_worker_from_driver(struct starpu_driver *d);
 

+ 5 - 1
src/core/workers.c

@@ -646,7 +646,7 @@ void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsig
 	STARPU_PTHREAD_COND_SIGNAL(&worker->started_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
 
-	_starpu_bind_thread_on_cpu(worker->config, worker->bindid, worker->workerid);
+	_starpu_bind_thread_on_cpu(worker->bindid, worker->workerid);
 
 #if defined(STARPU_PERF_DEBUG) && !defined(STARPU_SIMGRID)
 	setitimer(ITIMER_PROF, &prof_itimer, NULL);
@@ -1405,6 +1405,10 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	STARPU_PTHREAD_COND_BROADCAST(&init_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
+	int main_thread_cpuid = starpu_get_env_number_default("STARPU_MAIN_THREAD_CPUID", -1);
+	if (main_thread_cpuid >= 0)
+		_starpu_bind_thread_on_cpu(main_thread_cpuid, STARPU_NOWORKERID);
+
 	_STARPU_DEBUG("Initialisation finished\n");
 
 #ifdef STARPU_USE_MP

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -79,7 +79,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 		_starpu_cl_func_t func = _starpu_task_get_cpu_nth_implementation(cl, j->nimpl);
 		if (is_parallel_task && cl->type == STARPU_FORKJOIN)
 			/* bind to parallel worker */
-			_starpu_bind_thread_on_cpus(cpu_args->config, _starpu_get_combined_worker_struct(j->combined_workerid));
+			_starpu_bind_thread_on_cpus(_starpu_get_combined_worker_struct(j->combined_workerid));
 		STARPU_ASSERT_MSG(func, "when STARPU_CPU is defined in 'where', cpu_func or cpu_funcs has to be defined");
 		if (_starpu_get_disable_kernels() <= 0)
 		{
@@ -102,7 +102,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 		}
 		if (is_parallel_task && cl->type == STARPU_FORKJOIN)
 			/* rebind to single CPU */
-			_starpu_bind_thread_on_cpu(cpu_args->config, cpu_args->bindid, cpu_args->workerid);
+			_starpu_bind_thread_on_cpu(cpu_args->bindid, cpu_args->workerid);
 	}
 
 	_starpu_driver_end_job(cpu_args, j, perf_arch, rank, profiling);

+ 1 - 1
src/drivers/cuda/driver_cuda.c

@@ -659,7 +659,7 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	}
 
 	/* one more time to avoid hacks from third party lib :) */
-	_starpu_bind_thread_on_cpu(worker0->config, worker0->bindid, worker0->workerid);
+	_starpu_bind_thread_on_cpu(worker0->bindid, worker0->workerid);
 
 	for (i = 0; i < worker_set->nworkers; i++)
 	{

+ 2 - 2
src/drivers/gordon/driver_gordon.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2015  Université de Bordeaux
+ * Copyright (C) 2009-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2016, 2017  CNRS
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -55,7 +55,7 @@ void *gordon_worker_progress(void *arg)
 	struct _starpu_worker_set *gordon_set_arg = arg;
 	unsigned prog_thread_bind_id =
 		(gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
-	_starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id, gordon_set_arg->workers[0].workerid);
+	_starpu_bind_thread_on_cpu(prog_thread_bind_id, gordon_set_arg->workers[0].workerid);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	progress_thread_is_inited = 1;

+ 1 - 1
src/drivers/opencl/driver_opencl.c

@@ -617,7 +617,7 @@ int _starpu_opencl_driver_init(struct _starpu_worker *worker)
 	_starpu_opencl_init_context(devid);
 
 	/* one more time to avoid hacks from third party lib :) */
-	_starpu_bind_thread_on_cpu(worker->config, worker->bindid, worker->workerid);
+	_starpu_bind_thread_on_cpu(worker->bindid, worker->workerid);
 
 	_starpu_opencl_limit_gpu_mem_if_needed(devid);
 	_starpu_memory_manager_set_global_memory_size(worker->memory_node, _starpu_opencl_get_global_mem_size(devid));