
While sampling the performance of the bus, we now measure which CPUs have the best affinity to the CUDA devices, so that CUDA workers can be put close to their GPU whenever possible.

Cédric Augonnet 15 years ago
commit 07a84c2b9f

+ 4 - 0
src/core/perfmodel/perfmodel.h

@@ -101,5 +101,9 @@ void create_sampling_directory_if_needed(void);
 
 void load_bus_performance_files(void);
 double predict_transfer_time(unsigned src_node, unsigned dst_node, size_t size);
+
+#ifdef USE_CUDA
+int *get_gpu_affinity_vector(unsigned gpuid);
+#endif
  
 #endif // __PERFMODEL_H__
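For context, get_gpu_affinity_vector() is meant to return, for a given CUDA device, the array of CPU identifiers sorted from best to worst measured transfer bandwidth; the topology code below consumes it as the list of preferred cores for the corresponding CUDA worker. A minimal usage sketch, where the statically initialized table is a hypothetical stand-in for the benchmark-filled affinity_matrix of perfmodel_bus.c:

#include <stdio.h>

#define MAXCUDADEVS 4
#define MAXCPUS     32

/* Hypothetical stand-in for the table filled by the bus benchmark:
 * each row lists the CPUs sorted from best to worst bandwidth to that GPU. */
static int affinity_matrix[MAXCUDADEVS][MAXCPUS] = {
	{ 2, 3, 0, 1 },	/* GPU 0 transfers fastest from CPUs 2 and 3 */
	{ 0, 1, 2, 3 },	/* GPU 1 transfers fastest from CPUs 0 and 1 */
};

int *get_gpu_affinity_vector(unsigned gpuid)
{
	return affinity_matrix[gpuid];
}

int main(void)
{
	/* A worker driving GPU 0 would preferably be bound to the first
	 * entry of its affinity vector. */
	unsigned gpu = 0;
	int *preferred = get_gpu_affinity_vector(gpu);
	printf("GPU %u: preferred CPU %d\n", gpu, preferred[0]);
	return 0;
}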

+ 225 - 4
src/core/perfmodel/perfmodel_bus.c

@@ -16,31 +16,51 @@
 
 #include <unistd.h>
 #include <sys/time.h>
+#include <stdlib.h>
+#include <math.h>
 
 #include <starpu.h>
 #include <common/config.h>
+#include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
 #include <datawizard/data_parameters.h>
 
 #define SIZE	(32*1024*1024*sizeof(char))
 #define NITER	128
 
+#define MAXCPUS	32
+
+struct cudadev_timing {
+	int cpu_id;
+	double timing_htod;
+	double timing_dtoh;
+};
+
 static double bandwith_matrix[MAXNODES][MAXNODES] = {{-1.0}};
 static double latency_matrix[MAXNODES][MAXNODES] = {{ -1.0}};
 static unsigned was_benchmarked = 0;
 static int ncuda = 0;
 
+static int affinity_matrix[MAXCUDADEVS][MAXCPUS];
+
 /* Benchmarking the performance of the bus */
 
 #ifdef USE_CUDA
 static double cudadev_timing_htod[MAXNODES] = {0.0};
 static double cudadev_timing_dtoh[MAXNODES] = {0.0};
 
-static void measure_bandwith_between_host_and_dev(int dev)
+static struct cudadev_timing cudadev_timing_per_cpu[MAXNODES][MAXCPUS];
+
+static void measure_bandwith_between_host_and_dev_on_cpu(int dev, int cpu)
 {
+	struct machine_config_s *config = get_machine_config();
+	bind_thread_on_cpu(config, cpu);
+
 	/* Initialize CUDA context on the device */
 	cudaSetDevice(dev);
 
+	/* hack to prevent third-party libs from rebinding the thread */
+	bind_thread_on_cpu(config, cpu);
+
 	/* hack to force the initialization */
 	cudaFree(0);
 
@@ -63,6 +83,8 @@ static void measure_bandwith_between_host_and_dev(int dev)
 	struct timeval start;
 	struct timeval end;
 
+	cudadev_timing_per_cpu[dev+1][cpu].cpu_id = cpu;
+
 	/* Measure upload bandwith */
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; iter++)
@@ -73,7 +95,7 @@ static void measure_bandwith_between_host_and_dev(int dev)
 	gettimeofday(&end, NULL);
 	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 
-	cudadev_timing_htod[dev+1] = timing/NITER;
+	cudadev_timing_per_cpu[dev+1][cpu].timing_htod = timing/NITER;
 
 	/* Measure download bandwith */
 	gettimeofday(&start, NULL);
@@ -85,34 +107,117 @@ static void measure_bandwith_between_host_and_dev(int dev)
 	gettimeofday(&end, NULL);
 	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 
-	cudadev_timing_dtoh[dev+1] = timing/NITER;
+	cudadev_timing_per_cpu[dev+1][cpu].timing_dtoh = timing/NITER;
 
 	/* Free buffers */
 	cudaFreeHost(h_buffer);
 	cudaFree(d_buffer);
 
 	cudaThreadExit();
+
+}
+
+/* NB: we want to sort the bandwith by DECREASING order */
+int compar_cudadev_timing(const void *left_cudadev_timing, const void *right_cudadev_timing)
+{
+	const struct cudadev_timing *left = left_cudadev_timing;
+	const struct cudadev_timing *right = right_cudadev_timing;
+	
+	double left_dtoh = left->timing_dtoh;
+	double left_htod = left->timing_htod;
+	double right_dtoh = right->timing_dtoh;
+	double right_htod = right->timing_htod;
+	
+	double bandwith_sum2_left = left_dtoh*left_dtoh + left_htod*left_htod;
+	double bandwith_sum2_right = right_dtoh*right_dtoh + right_htod*right_htod;
+
+	/* inverted comparison, so that qsort() sorts in decreasing order */
+	return (bandwith_sum2_left < bandwith_sum2_right);
+}
+
+static void measure_bandwith_between_host_and_dev(int dev, unsigned ncores)
+{
+	unsigned core;
+	for (core = 0; core < ncores; core++)
+	{
+		measure_bandwith_between_host_and_dev_on_cpu(dev, core);
+	}
+
+	/* sort the results */
+	qsort(cudadev_timing_per_cpu[dev+1], ncores,
+			sizeof(struct cudadev_timing),
+			compar_cudadev_timing);
+	
+#ifdef VERBOSE
+	/* find the best candidate(s) */
+	unsigned best_core = 0;
+	double best_bandwith_sum2 = 0.0;
+	for (core = 0; core < ncores; core++)
+	{
+		unsigned current_core = cudadev_timing_per_cpu[dev+1][core].cpu_id;
+		double bandwith_dtoh = cudadev_timing_per_cpu[dev+1][core].timing_dtoh;
+		double bandwith_htod = cudadev_timing_per_cpu[dev+1][core].timing_htod;
+
+		double bandwith_sum2 = bandwith_dtoh*bandwith_dtoh + bandwith_htod*bandwith_htod;
+
+		fprintf(stderr, "BANDWITH GPU %d CPU %d - htod %lf - dtoh %lf - %lf\n", dev, current_core, bandwith_htod, bandwith_dtoh, sqrt(bandwith_sum2));
+
+		if (bandwith_sum2 > best_bandwith_sum2)
+		{
+			best_bandwith_sum2 = bandwith_sum2;
+			best_core = current_core;
+		}
+	}
+
+	fprintf(stderr, "BANDWITH GPU %d BEST CPU %d\n", dev, best_core);
+#endif
+
+	/* The results are sorted in a decreasing order, so that the best
+	 * measurement is currently the first entry. */
+	cudadev_timing_dtoh[dev+1] = cudadev_timing_per_cpu[dev+1][0].timing_dtoh;
+	cudadev_timing_htod[dev+1] = cudadev_timing_per_cpu[dev+1][0].timing_htod;
 }
 #endif
 
 static void benchmark_all_cuda_devices(void)
 {
+	int ret;
+
 #ifdef VERBOSE
 	fprintf(stderr, "Benchmarking the speed of the bus\n");
 #endif
 
+	/* Save the current cpu binding */
+	cpu_set_t former_process_affinity;
+	ret = sched_getaffinity(0, sizeof(former_process_affinity), &former_process_affinity);
+	if (ret)
+	{
+		perror("sched_getaffinity");
+		STARPU_ABORT();
+	}
+
 #ifdef USE_CUDA
+	struct machine_config_s *config = get_machine_config();
+	unsigned ncores = topology_get_nhwcore(config);
+
         cudaGetDeviceCount(&ncuda);
 	int i;
 	for (i = 0; i < ncuda; i++)
 	{
 		/* measure bandwith between Host and Device i */
-		measure_bandwith_between_host_and_dev(i);
+		measure_bandwith_between_host_and_dev(i, ncores);
 	}
 #endif
 
 	was_benchmarked = 1;
 
+	/* Restore the former affinity */
+	ret = sched_setaffinity(0, sizeof(former_process_affinity), &former_process_affinity);
+	if (ret)
+	{
+		perror("sched_setaffinity");
+		STARPU_ABORT();
+	}
+
 #ifdef VERBOSE
 	fprintf(stderr, "Benchmarking the speed of the bus is done.\n");
 #endif
@@ -130,6 +235,121 @@ static void get_bus_path(const char *type, char *path, size_t maxlen)
 }
 
 /*
+ *	Affinity
+ */
+
+static void get_affinity_path(char *path, size_t maxlen)
+{
+	get_bus_path("affinity", path, maxlen);
+}
+
+static void load_bus_affinity_file_content(void)
+{
+	FILE *f;
+
+	char path[256];
+	get_affinity_path(path, 256);
+
+	f = fopen(path, "r");
+	STARPU_ASSERT(f);
+
+#ifdef USE_CUDA
+	struct machine_config_s *config = get_machine_config();
+	unsigned ncores = topology_get_nhwcore(config);
+
+        cudaGetDeviceCount(&ncuda);
+
+	int gpu;
+	for (gpu = 0; gpu < ncuda; gpu++)
+	{
+		int ret;
+
+		int dummy;
+
+		ret = fscanf(f, "%d\t", &dummy);
+		STARPU_ASSERT(ret == 1);
+
+		STARPU_ASSERT(dummy == gpu);
+
+		unsigned core;
+		for (core = 0; core < ncores; core++)
+		{
+			ret = fscanf(f, "%d\t", &affinity_matrix[gpu][core]);
+			STARPU_ASSERT(ret == 1);
+		}
+
+		ret = fscanf(f, "\n");
+		STARPU_ASSERT(ret == 0);
+	}
+#endif
+
+	fclose(f);
+}
+
+static void write_bus_affinity_file_content(void)
+{
+	FILE *f;
+
+	STARPU_ASSERT(was_benchmarked);
+
+	char path[256];
+	get_affinity_path(path, 256);
+
+	f = fopen(path, "w+");
+	if (!f)
+	{
+		perror("fopen");
+		STARPU_ABORT();
+	}
+
+#ifdef USE_CUDA
+	struct machine_config_s *config = get_machine_config();
+	unsigned ncores = topology_get_nhwcore(config);
+
+	int gpu;
+	for (gpu = 0; gpu < ncuda; gpu++)
+	{
+		fprintf(f, "%d\t", gpu);
+
+		unsigned core;
+		for (core = 0; core < ncores; core++)
+		{
+			fprintf(f, "%d\t", cudadev_timing_per_cpu[gpu+1][core].cpu_id);
+		}
+
+		fprintf(f, "\n");
+	}
+#endif
+
+	fclose(f);
+}
+
+static void load_bus_affinity_file(void)
+{
+	int res;
+
+	char path[256];
+	get_affinity_path(path, 256);
+
+	res = access(path, F_OK);
+	if (res)
+	{
+		/* File does not exist yet */
+		if (!was_benchmarked)
+			benchmark_all_cuda_devices();
+
+		write_bus_affinity_file_content();
+	}
+
+	load_bus_affinity_file_content();
+}
+
+int *get_gpu_affinity_vector(unsigned gpuid)
+{
+	return affinity_matrix[gpuid];
+}
+
+/*
  *	Latency
  */
 
@@ -355,6 +575,7 @@ void load_bus_performance_files(void)
 {
 	create_sampling_directory_if_needed();
 
+	load_bus_affinity_file();
 	load_bus_latency_file();
 	load_bus_bandwith_file();
 }
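For reference, a minimal standalone sketch of the sampling idea implemented above. This is not the StarPU code: it assumes a Linux host and the CUDA runtime API, probes GPU 0 only, times only the host-to-device direction, and omits error checking. Each CPU is tried in turn by pinning the calling thread, timing a batch of pinned-memory copies to the device, and keeping the CPU with the lowest timing:

#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <cuda_runtime.h>

#define BUF_SIZE (32*1024*1024)
#define NITER    32

/* Pin the calling thread to a single CPU (Linux-specific). */
static void pin_to_cpu(int cpu)
{
	cpu_set_t set;
	CPU_ZERO(&set);
	CPU_SET(cpu, &set);
	sched_setaffinity(0, sizeof(set), &set);
}

/* Time NITER host-to-device copies issued from the given CPU (microseconds). */
static double probe_htod(int dev, int cpu)
{
	pin_to_cpu(cpu);
	cudaSetDevice(dev);

	void *h_buf, *d_buf;
	cudaMallocHost(&h_buf, BUF_SIZE);	/* pinned host memory */
	cudaMalloc(&d_buf, BUF_SIZE);

	struct timeval start, end;
	gettimeofday(&start, NULL);
	for (int i = 0; i < NITER; i++)
		cudaMemcpy(d_buf, h_buf, BUF_SIZE, cudaMemcpyHostToDevice);
	cudaDeviceSynchronize();
	gettimeofday(&end, NULL);

	cudaFree(d_buf);
	cudaFreeHost(h_buf);

	return (end.tv_sec - start.tv_sec)*1e6 + (end.tv_usec - start.tv_usec);
}

int main(void)
{
	int ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	int best_cpu = 0;
	double best = -1.0;

	for (int cpu = 0; cpu < ncpus; cpu++)
	{
		double t = probe_htod(0, cpu);
		printf("CPU %d: %.0f us for %d uploads\n", cpu, t, NITER);
		if (best < 0.0 || t < best)
		{
			best = t;
			best_cpu = cpu;
		}
	}

	printf("best CPU for GPU 0: %d\n", best_cpu);
	return 0;
}

The real code above additionally measures the download direction, keeps per-CPU records, and sorts them so that the whole ranking, not just the winner, can be written to the affinity file.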

+ 93 - 20
src/core/topology.c

@@ -25,6 +25,10 @@
 #ifdef HAVE_HWLOC
 #include <hwloc.h>
 #endif
+		
+static unsigned topology_is_initialized = 0;
+
+static unsigned may_bind_automatically = 0;
 
 static void initialize_workers_bindid(struct machine_config_s *config);
 
@@ -100,6 +104,9 @@ static void initialize_workers_gpuid(struct machine_config_s *config)
 		/* by default, we take a round robin policy */
 		for (i = 0; i < STARPU_NMAXWORKERS; i++)
 			config->workers_gpuid[i] = (unsigned)i;
+
+		/* StarPU can use sampling techniques to bind threads correctly */
+		may_bind_automatically = 1;
 	}
 }
 #endif
@@ -111,31 +118,48 @@ static inline int get_next_gpuid(struct machine_config_s *config)
 	return (int)config->workers_gpuid[i];
 }
 
-static int init_machine_config(struct machine_config_s *config,
-				struct starpu_conf *user_conf)
+static void init_topology(struct machine_config_s *config)
 {
-	int explicitval __attribute__((unused));
-	unsigned use_accelerator = 0;
-
-	config->nworkers = 0;
-
+	if (!topology_is_initialized)
+	{
 #ifdef HAVE_HWLOC
-	hwloc_topology_init(&config->hwtopology);
-	hwloc_topology_load(config->hwtopology);
+		hwloc_topology_init(&config->hwtopology);
+		hwloc_topology_load(config->hwtopology);
 
-	config->core_depth = hwloc_get_type_depth(config->hwtopology, HWLOC_OBJ_CORE);
+		config->core_depth = hwloc_get_type_depth(config->hwtopology, HWLOC_OBJ_CORE);
 
-	/* Would be very odd */
-	STARPU_ASSERT(config->core_depth != HWLOC_TYPE_DEPTH_MULTIPLE);
+		/* Would be very odd */
+		STARPU_ASSERT(config->core_depth != HWLOC_TYPE_DEPTH_MULTIPLE);
 
-	if (config->core_depth == HWLOC_TYPE_DEPTH_UNKNOWN)
-		/* unknown, using logical procesors as fallback */
-		config->core_depth = hwloc_get_type_depth(config->hwtopology, HWLOC_OBJ_PROC);
+		if (config->core_depth == HWLOC_TYPE_DEPTH_UNKNOWN)
+			/* unknown, using logical processors as fallback */
+			config->core_depth = hwloc_get_type_depth(config->hwtopology, HWLOC_OBJ_PROC);
 
-	config->nhwcores = hwloc_get_nbobjs_by_depth(config->hwtopology, config->core_depth);
+		config->nhwcores = hwloc_get_nbobjs_by_depth(config->hwtopology, config->core_depth);
 #else
-	config->nhwcores = sysconf(_SC_NPROCESSORS_ONLN);
+		config->nhwcores = sysconf(_SC_NPROCESSORS_ONLN);
 #endif
+	
+		topology_is_initialized = 1;
+	}
+}
+
+unsigned topology_get_nhwcore(struct machine_config_s *config)
+{
+	init_topology(config);
+	
+	return config->nhwcores;
+}
+
+static int init_machine_config(struct machine_config_s *config,
+				struct starpu_conf *user_conf)
+{
+	int explicitval __attribute__((unused));
+	unsigned use_accelerator = 0;
+
+	config->nworkers = 0;
+
+	init_topology(config);
 
 	initialize_workers_bindid(config);
 
@@ -336,8 +360,44 @@ static void initialize_workers_bindid(struct machine_config_s *config)
 	}
 }
 
-static inline int get_next_bindid(struct machine_config_s *config)
+/* This function gets the identifier of the next core on which to bind a
+ * worker. In case a list of preferred cores was specified, we look for an
+ * available core among the list if possible, otherwise a round-robin policy is
+ * used. */
+static inline int get_next_bindid(struct machine_config_s *config,
+				int *preferred_binding, int npreferred)
 {
+	unsigned found = 0;
+	int current_preferred;
+
+	for (current_preferred = 0; current_preferred < npreferred; current_preferred++)
+	{
+		if (found)
+			break;
+
+		unsigned requested_core = preferred_binding[current_preferred];
+
+		/* can we bind the worker on the requested core ? */
+		unsigned ind;
+		for (ind = config->current_bindid; ind < config->nhwcores; ind++)
+		{
+			if (config->workers_bindid[ind] == requested_core)
+			{
+				/* the core is available, we use it! In order
+				 * to make sure that it will not be used again
+				 * later on, we remove the entry from the list */
+				config->workers_bindid[ind] =
+					config->workers_bindid[config->current_bindid];
+				config->workers_bindid[config->current_bindid] = requested_core;
+
+				found = 1;
+
+				break;
+			}
+		}
+	}
+
 	unsigned i = ((config->current_bindid++) % STARPU_NMAXWORKERS);
 
 	return (int)config->workers_bindid[i];
@@ -348,6 +408,8 @@ void bind_thread_on_cpu(struct machine_config_s *config __attribute__((unused)),
 	int ret;
 
 #ifdef HAVE_HWLOC
+	init_topology(config);
+
 	hwloc_obj_t obj = hwloc_get_obj_by_depth(config->hwtopology, config->core_depth, coreid);
 	hwloc_cpuset_t set = obj->cpuset;
 	hwloc_cpuset_singlify(set);
@@ -396,6 +458,10 @@ static void init_workers_binding(struct machine_config_s *config)
 		unsigned memory_node = -1;
 		unsigned is_a_set_of_accelerators = 0;
 		struct worker_s *workerarg = &config->workers[worker];
+
+		/* Perhaps the worker has some "favourite" bindings  */
+		int *preferred_binding = NULL;
+		int npreferred = 0;
 		
 		/* select the memory node that contains worker's memory */
 		switch (workerarg->arch) {
@@ -412,6 +478,12 @@ static void init_workers_binding(struct machine_config_s *config)
 #endif
 #ifdef USE_CUDA
 			case STARPU_CUDA_WORKER:
+				if (may_bind_automatically)
+				{
+					/* StarPU is allowed to bind threads automatically */
+					preferred_binding = get_gpu_affinity_vector(workerarg->id);
+					npreferred = config->nhwcores;
+				}
 				is_a_set_of_accelerators = 0;
 				memory_node = register_memory_node(CUDA_RAM);
 				break;
@@ -422,11 +494,12 @@ static void init_workers_binding(struct machine_config_s *config)
 
 		if (is_a_set_of_accelerators) {
 			if (accelerator_bindid == -1)
-				accelerator_bindid = get_next_bindid(config);
+				accelerator_bindid = get_next_bindid(config, preferred_binding, npreferred);
+
 			workerarg->bindid = accelerator_bindid;
 		}
 		else {
-			workerarg->bindid = get_next_bindid(config);
+			workerarg->bindid = get_next_bindid(config, preferred_binding, npreferred);
 		}
 
 		workerarg->memory_node = memory_node;
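The preferred-binding logic added to get_next_bindid() boils down to scanning the not-yet-consumed binding slots for one of the preferred cores and swapping it to the front, so that it is handed out now and cannot be handed out again; when no preferred core is available it falls back to the usual round robin. A simplified standalone sketch with hypothetical names (no StarPU types):

#include <stdio.h>

/* Pick the next CPU to bind a worker on. If one of the preferred CPUs is
 * still available (i.e. appears at or after position *current in bindids),
 * swap it into the current slot and consume it; otherwise fall back to
 * plain round robin over the remaining slots. */
static int next_bindid(int *bindids, int nbindids, int *current,
		       const int *preferred, int npreferred)
{
	for (int p = 0; p < npreferred; p++)
	{
		for (int i = *current; i < nbindids; i++)
		{
			if (bindids[i] == preferred[p])
			{
				int tmp = bindids[*current];
				bindids[*current] = bindids[i];
				bindids[i] = tmp;
				return bindids[(*current)++];
			}
		}
	}

	return bindids[(*current)++ % nbindids];
}

int main(void)
{
	int bindids[] = {0, 1, 2, 3};
	int current = 0;
	int gpu_affinity[] = {2, 3, 0, 1};	/* CPUs sorted by affinity to some GPU */

	/* The CUDA worker gets CPU 2 (its best CPU); the next plain core
	 * worker gets CPU 1 via round robin over the remaining slots. */
	printf("CUDA worker -> CPU %d\n", next_bindid(bindids, 4, &current, gpu_affinity, 4));
	printf("core worker -> CPU %d\n", next_bindid(bindids, 4, &current, NULL, 0));
	return 0;
}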

+ 5 - 0
src/core/topology.h

@@ -66,4 +66,9 @@ int starpu_build_topology(struct machine_config_s *config);
 
 void starpu_destroy_topology(struct machine_config_s *config);
 
+/* returns the number of physical cores */
+unsigned topology_get_nhwcore(struct machine_config_s *config);
+
+void bind_thread_on_cpu(struct machine_config_s *config, unsigned coreid);
+
 #endif // __TOPOLOGY_H__

+ 5 - 0
src/core/workers.c

@@ -24,6 +24,11 @@ static pthread_key_t worker_key;
 
 static struct machine_config_s config;
 
+struct machine_config_s *get_machine_config(void)
+{
+	return &config;
+}
+
 /* in case a task is submitted, we may check whether there exists a worker
    that may execute the task or not */
 

+ 2 - 2
src/core/workers.h

@@ -140,8 +140,6 @@ inline uint32_t may_submit_cuda_task(void);
 inline uint32_t may_submit_core_task(void);
 inline uint32_t worker_may_execute_task(unsigned workerid, uint32_t where);
 
-void bind_thread_on_cpu(struct machine_config_s *config, unsigned coreid);
-
 inline void lock_all_queues_attached_to_node(unsigned node);
 inline void unlock_all_queues_attached_to_node(unsigned node);
 inline void broadcast_all_queues_attached_to_node(unsigned node);
@@ -151,4 +149,6 @@ struct worker_s *get_local_worker_key(void);
 
 struct worker_s *get_worker_struct(unsigned id);
 
+struct machine_config_s *get_machine_config(void);
+
 #endif // __WORKERS_H__