Sfoglia il codice sorgente

Measure bandwidth and latency for mpi Master-Slave

Corentin Salingue 8 anni fa
parent
commit
b73c8258d6

+ 127 - 74
src/core/perfmodel/perfmodel_bus.c

@@ -36,6 +36,7 @@
 #include <core/perfmodel/perfmodel.h>
 #include <core/simgrid.h>
 #include <common/utils.h>
+#include <drivers/mpi/driver_mpi_common.h>
 
 #ifdef STARPU_USE_OPENCL
 #include <starpu_opencl.h>
@@ -123,6 +124,8 @@ static double mic_time_device_to_host[STARPU_MAXNODES] = {0.0};
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
 static double mpi_time_host_to_device[STARPU_MAXNODES] = {0.0};
 static double mpi_time_device_to_host[STARPU_MAXNODES] = {0.0};
+static double mpi_latency_host_to_device[STARPU_MAXNODES] = {0.0};
+static double mpi_latency_device_to_host[STARPU_MAXNODES] = {0.0};
 #endif
 
 #ifdef STARPU_HAVE_HWLOC
@@ -671,7 +674,7 @@ static void benchmark_all_gpu_devices(void)
 	_STARPU_DISP("can not measure bus in simgrid mode, please run starpu_calibrate_bus in non-simgrid mode to make sure the bus performance model was calibrated\n");
 	STARPU_ABORT();
 #else /* !SIMGRID */
-#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_MIC)
+#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL) || defined(STARPU_USE_MIC) || defined(STARPU_USE_MPI_MASTER_SLAVE)
 	unsigned i;
 #endif
 #ifdef HAVE_CUDA_MEMCPY_PEER
@@ -748,17 +751,10 @@ static void benchmark_all_gpu_devices(void)
 #endif /* STARPU_USE_MIC */
 
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
-	/* TODO: implement real calibration ! For now we only put an arbitrary
-	 * value for each device during at the declaration as a bug fix, else
-	 * we get problems on heft scheduler */
-        nmpi_ms = _starpu_mpi_src_get_device_count();
+    
+    _starpu_mpi_common_measure_bandwidth_latency(mpi_time_host_to_device, mpi_time_device_to_host, mpi_latency_host_to_device, mpi_latency_device_to_host);
 
-	for (i = 0; i < STARPU_MAXNODES; i++)
-	{
-		mpi_time_host_to_device[i] = 0.1;
-		mpi_time_device_to_host[i] = 0.1;
-	}
-#endif /* STARPU_USE_MIC */
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
 
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_set_cpubind(hwtopology, former_cpuset, HWLOC_CPUBIND_THREAD);
@@ -944,6 +940,12 @@ static void generate_bus_affinity_file(void)
 	if (!was_benchmarked)
 		benchmark_all_gpu_devices();
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Slaves don't write files */
+    if (!_starpu_mpi_common_is_src_node())
+        return;
+#endif
+
 	write_bus_affinity_file_content();
 }
 
@@ -1125,7 +1127,7 @@ static int load_bus_latency_file_content(void)
 #ifndef STARPU_SIMGRID
 static void write_bus_latency_file_content(void)
 {
-        unsigned src, dst, maxnode;
+    unsigned src, dst, maxnode;
 	FILE *f;
 
 	STARPU_ASSERT(was_benchmarked);
@@ -1193,11 +1195,18 @@ static void write_bus_latency_file_content(void)
 				}
 #endif
 #ifdef STARPU_USE_OPENCL
-				if (src > ncuda)
+				if (src > ncuda && src <= ncuda + nopencl)
 					latency += opencldev_latency_dtoh[src-ncuda];
-				if (dst > ncuda)
+				if (dst > ncuda && dst <= ncuda + nopencl)
 					latency += opencldev_latency_htod[dst-ncuda];
 #endif
+                /* TODO Latency MIC */
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+				if (src > ncuda + nopencl + nmic && src <= ncuda + nopencl + nmic + nmpi_ms)
+					latency += mpi_latency_device_to_host[src - (ncuda + nopencl + nmic) - 1];
+				if (dst > ncuda + nopencl + nmic && dst <= ncuda + nopencl + nmic + nmpi_ms)
+					latency += mpi_latency_host_to_device[dst - (ncuda + nopencl + nmic) - 1];
+#endif
 			}
 
 			if (dst)
@@ -1218,6 +1227,12 @@ static void generate_bus_latency_file(void)
 	if (!was_benchmarked)
 		benchmark_all_gpu_devices();
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Slaves don't write files */
+    if (!_starpu_mpi_common_is_src_node())
+        return;
+#endif
+
 #ifndef STARPU_SIMGRID
 	write_bus_latency_file_content();
 #endif
@@ -1424,10 +1439,11 @@ static void write_bus_bandwidth_file_content(void)
 					slowness += mic_time_host_to_device[dst - (ncuda + nopencl)];
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
-				if (src > ncuda + nopencl + nmic)
-					slowness += mpi_time_device_to_host[src - (ncuda + nopencl + nmic)];
-				if (dst > ncuda + nopencl + nmic)
-					slowness += mpi_time_host_to_device[dst - (ncuda + nopencl + nmic)];
+                /* unlike the MIC timings added above, the MPI arrays hold bandwidths: invert them before adding them to the slowness */
+				if (src > ncuda + nopencl + nmic && src <= ncuda + nopencl + nmic + nmpi_ms)
+					slowness += 1.0/mpi_time_device_to_host[src - (ncuda + nopencl + nmic) - 1];
+				if (dst > ncuda + nopencl + nmic && dst <= ncuda + nopencl + nmic +nmpi_ms)
+					slowness += 1.0/mpi_time_host_to_device[dst - (ncuda + nopencl + nmic) - 1];
 #endif
 				bandwidth = 1.0/slowness;
 			}
@@ -1579,6 +1595,12 @@ static void generate_bus_bandwidth_file(void)
 {
 	if (!was_benchmarked)
 		benchmark_all_gpu_devices();
+    
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Slaves don't write files */
+    if (!_starpu_mpi_common_is_src_node())
+        return;
+#endif
 
 #ifndef STARPU_SIMGRID
 	write_bus_bandwidth_file_content();
@@ -1609,6 +1631,25 @@ static void get_config_path(char *path, size_t maxlen)
 	get_bus_path("config", path, maxlen);
 }
 
+/* Compare a resource count stored in the bus configuration file with the
+ * count detected at runtime, and force a bus recalibration when they differ.
+ *
+ *   msg          label of the resource kind, inserted in the displayed message
+ *   val_file     count read from the stored configuration file
+ *   val_detected count detected on the current machine
+ *
+ * When MPI Master-Slave support is compiled in, every process forces the
+ * sampling, but only the source (master) node displays the messages.
+ */
+static void compare_value_and_recalibrate(const char *msg, unsigned val_file, unsigned val_detected)
+{
+    int verbose = 1;
+
+    if (val_file == val_detected)
+        return;
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Only the master prints the messages */
+    verbose = _starpu_mpi_common_is_src_node();
+#endif
+
+    if (verbose)
+    {
+        /* both counts are unsigned, hence %u rather than %d */
+        _STARPU_DISP("Current configuration does not match the bus performance model (%s: (stored) %u != (current) %u), recalibrating...\n", msg, val_file, val_detected);
+    }
+
+    _starpu_bus_force_sampling();
+
+    if (verbose)
+    {
+        _STARPU_DISP("... done\n");
+    }
+}
+
 static void check_bus_config_file(void)
 {
         int res;
@@ -1627,70 +1668,58 @@ static void check_bus_config_file(void)
         }
         else
 	{
-                FILE *f;
-                int ret;
-		unsigned read_cuda = -1, read_opencl = -1, read_mic = -1;
-                unsigned read_cpus = -1;
-
-                // Loading configuration from file
-                f = fopen(path, "r");
-                STARPU_ASSERT(f);
-		_starpu_frdlock(f);
-                _starpu_drop_comments(f);
-                ret = fscanf(f, "%u\t", &read_cpus);
-		STARPU_ASSERT(ret == 1);
-                _starpu_drop_comments(f);
-		ret = fscanf(f, "%d\t", &read_cuda);
-		STARPU_ASSERT(ret == 1);
-                _starpu_drop_comments(f);
-		ret = fscanf(f, "%d\t", &read_opencl);
-		STARPU_ASSERT(ret == 1);
-                _starpu_drop_comments(f);
-		ret = fscanf(f, "%d\t", &read_mic);
-		if (ret == 0)
-			read_mic = 0;
-                _starpu_drop_comments(f);
-		_starpu_frdunlock(f);
-                fclose(f);
-
-                // Loading current configuration
-                ncpus = _starpu_topology_get_nhwcpu(config);
+        FILE *f;
+        int ret;
+        unsigned read_cuda = -1, read_opencl = -1, read_mic = -1, read_mpi_ms = -1;
+        unsigned read_cpus = -1;
+
+        // Loading configuration from file
+        f = fopen(path, "r");
+        STARPU_ASSERT(f);
+        _starpu_frdlock(f);
+        _starpu_drop_comments(f);
+        ret = fscanf(f, "%u\t", &read_cpus);
+        STARPU_ASSERT(ret == 1);
+        _starpu_drop_comments(f);
+        ret = fscanf(f, "%d\t", &read_cuda);
+        STARPU_ASSERT(ret == 1);
+        _starpu_drop_comments(f);
+        ret = fscanf(f, "%d\t", &read_opencl);
+        STARPU_ASSERT(ret == 1);
+        _starpu_drop_comments(f);
+        ret = fscanf(f, "%d\t", &read_mic);
+        if (ret == 0)
+            read_mic = 0;
+        _starpu_drop_comments(f);
+        ret = fscanf(f, "%d\t", &read_mpi_ms);
+        if (ret == 0)
+            read_mpi_ms = 0;
+        _starpu_drop_comments(f);
+        _starpu_frdunlock(f);
+        fclose(f);
+
+        // Loading current configuration
+        ncpus = _starpu_topology_get_nhwcpu(config);
 #ifdef STARPU_USE_CUDA
 		ncuda = _starpu_get_cuda_device_count();
 #endif
 #ifdef STARPU_USE_OPENCL
-                nopencl = _starpu_opencl_get_device_count();
+        nopencl = _starpu_opencl_get_device_count();
 #endif
 #ifdef STARPU_USE_MIC
-                nmic = _starpu_mic_src_get_device_count();
+        nmic = _starpu_mic_src_get_device_count();
 #endif /* STARPU_USE_MIC */
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+        nmpi_ms = _starpu_mpi_src_get_device_count();
+#endif /* STARPU_USE_MPI_MASTER_SLAVE */
 
-                // Checking if both configurations match
-                if (read_cpus != ncpus)
-		{
-			_STARPU_DISP("Current configuration does not match the bus performance model (CPUS: (stored) %u != (current) %u), recalibrating...\n", read_cpus, ncpus);
-                        _starpu_bus_force_sampling();
-			_STARPU_DISP("... done\n");
-                }
-                else if (read_cuda != ncuda)
-		{
-                        _STARPU_DISP("Current configuration does not match the bus performance model (CUDA: (stored) %d != (current) %d), recalibrating...\n", read_cuda, ncuda);
-                        _starpu_bus_force_sampling();
-			_STARPU_DISP("... done\n");
-                }
-                else if (read_opencl != nopencl)
-		{
-                        _STARPU_DISP("Current configuration does not match the bus performance model (OpenCL: (stored) %d != (current) %d), recalibrating...\n", read_opencl, nopencl);
-                        _starpu_bus_force_sampling();
-			_STARPU_DISP("... done\n");
-                }
-                else if (read_mic != nmic)
-		{
-                        _STARPU_DISP("Current configuration does not match the bus performance model (MIC: (stored) %d != (current) %d), recalibrating...\n", read_mic, nmic);
-                        _starpu_bus_force_sampling();
-			_STARPU_DISP("... done\n");
-                }
-        }
+        // Checking if both configurations match
+        compare_value_and_recalibrate("CPUS", read_cpus, ncpus);
+        compare_value_and_recalibrate("CUDA", read_cuda, ncuda);
+        compare_value_and_recalibrate("OpenCL", read_opencl, nopencl);
+        compare_value_and_recalibrate("MIC", read_mic, nmic);
+        compare_value_and_recalibrate("MPI Master-Slave", read_mpi_ms, nmpi_ms);
+    }
 }
 
 static void write_bus_config_file_content(void)
@@ -1713,6 +1742,7 @@ static void write_bus_config_file_content(void)
         fprintf(f, "%d # Number of CUDA devices\n", ncuda);
         fprintf(f, "%d # Number of OpenCL devices\n", nopencl);
         fprintf(f, "%d # Number of MIC devices\n", nmic);
+        fprintf(f, "%d # Number of MPI devices\n", nmpi_ms);
 
 	_starpu_fwrunlock(f);
         fclose(f);
@@ -1722,6 +1752,12 @@ static void generate_bus_config_file(void)
 {
 	if (!was_benchmarked)
 		benchmark_all_gpu_devices();
+    
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Slaves don't write files */
+    if (!_starpu_mpi_common_is_src_node())
+        return;
+#endif
 
 	write_bus_config_file_content();
 }
@@ -2448,6 +2484,12 @@ static void generate_bus_platform_file(void)
 	if (!was_benchmarked)
 		benchmark_all_gpu_devices();
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    /* Slaves don't write files */
+    if (!_starpu_mpi_common_is_src_node())
+        return;
+#endif
+
 	write_bus_platform_file_content(3);
 	write_bus_platform_file_content(4);
 }
@@ -2501,9 +2543,20 @@ void _starpu_load_bus_performance_files(void)
 #if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_SIMGRID)
 	nopencl = _starpu_opencl_get_device_count();
 #endif
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) || defined(STARPU_USE_SIMGRID)
+    nmpi_ms = _starpu_mpi_src_get_device_count();
+#endif
 
 #ifndef STARPU_SIMGRID
         check_bus_config_file();
+#endif
+
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+        /* be sure that master wrote the perf files */
+        _starpu_mpi_common_barrier();
+#endif
+
+#ifndef STARPU_SIMGRID
 	load_bus_affinity_file();
 #endif
 	load_bus_latency_file();

+ 116 - 1
src/drivers/mpi/driver_mpi_common.c

@@ -17,8 +17,12 @@
 
 #include <mpi.h>
 #include <core/workers.h>
+#include <core/perfmodel/perfmodel.h>
 #include "driver_mpi_common.h"
 
+#define NITER 32
+#define SIZE_BANDWIDTH (1024*1024)
+
 #define DRIVER_MPI_MASTER_NODE_DEFAULT 0
 
 static int mpi_initialized;
@@ -100,7 +104,6 @@ void _starpu_mpi_common_mp_deinit()
     mpi_initialized = 0;
 }
 
-
 int _starpu_mpi_common_is_src_node()
 {   
     int id_proc;
@@ -209,3 +212,115 @@ void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int
     res = MPI_Recv(msg, len, MPI_BYTE, src_devid, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
     STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
 }
+
+/* Synchronize all the processes of MPI_COMM_WORLD on a barrier.
+ * A failure of the barrier is reported through an assertion, like the
+ * send/receive wrappers of this file do. */
+void _starpu_mpi_common_barrier(void)
+{
+    int ret = MPI_Barrier(MPI_COMM_WORLD);
+    STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI Master/Slave cannot synchronize on a barrier !");
+}
+
+/* Send NITER messages of len bytes to rank peer with the given tag.
+ * what names the measured quantity in the assertion message. */
+static void measure_send_burst(void *buf, int len, int peer, int tag, const char *what)
+{
+    unsigned iter;
+
+    for (iter = 0; iter < NITER; iter++)
+    {
+        int ret = MPI_Send(buf, len, MPI_BYTE, peer, tag, MPI_COMM_WORLD);
+        STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "%s of MPI Master/Slave cannot be measured !", what);
+    }
+}
+
+/* Receive NITER messages of len bytes from rank peer with the given tag.
+ * what names the measured quantity in the assertion message. */
+static void measure_recv_burst(void *buf, int len, int peer, int tag, const char *what)
+{
+    unsigned iter;
+
+    for (iter = 0; iter < NITER; iter++)
+    {
+        int ret = MPI_Recv(buf, len, MPI_BYTE, peer, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+        STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "%s of MPI Master/Slave cannot be measured !", what);
+    }
+}
+
+/* Compute bandwidth and latency between the source node and each sink node.
+ *
+ * Must be called by every process of MPI_COMM_WORLD: each loop iteration
+ * starts with a barrier, then the source node and the sink whose rank is
+ * being evaluated exchange NITER messages in each direction, first with
+ * SIZE_BANDWIDTH bytes (bandwidth), then with 1 byte (latency).
+ *
+ * Only the source node writes the output arrays. Entry i describes the i-th
+ * rank different from the source node, in increasing rank order:
+ *   bandwidth_htod / bandwidth_dtoh: bytes transferred per time unit of
+ *       starpu_timing_now() (presumably microseconds -- TODO confirm)
+ *   latency_htod / latency_dtoh: mean duration of a 1-byte transfer, in the
+ *       same time unit
+ *
+ * NOTE(review): nothing here bounds the number of sinks by the size of the
+ * caller's arrays -- confirm that callers provide at least (number of
+ * processes - 1) entries.
+ */
+void _starpu_mpi_common_measure_bandwidth_latency(double * bandwidth_htod, double * bandwidth_dtoh, double * latency_htod, double * latency_dtoh)
+{
+    int nb_proc, id_proc;
+    int node;
+    unsigned id = 0;
+    char *buf;
+
+    MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
+    MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
+
+    buf = malloc(SIZE_BANDWIDTH);
+    STARPU_ASSERT_MSG(buf != NULL, "MPI Master/Slave cannot allocate the buffer used to measure the links !");
+    memset(buf, 0, SIZE_BANDWIDTH);
+
+    /* MPI ranks are ints: iterate with an int to avoid mixed-sign comparisons */
+    for (node = 0; node < nb_proc; node++)
+    {
+        /* every process takes part in the barrier, even those which are
+         * not involved in the measurement of this iteration */
+        MPI_Barrier(MPI_COMM_WORLD);
+
+        /* Don't measure link master <-> master */
+        if (node == src_node_id)
+            continue;
+
+        if (_starpu_mpi_common_is_src_node())
+        {
+            double start, end;
+
+            /* measure bandwidth host to device */
+            start = starpu_timing_now();
+            measure_send_burst(buf, SIZE_BANDWIDTH, node, node, "Bandwidth");
+            end = starpu_timing_now();
+            /* count the bytes really transferred: NITER messages of SIZE_BANDWIDTH bytes */
+            bandwidth_htod[id] = ((double) NITER * SIZE_BANDWIDTH) / (end - start);
+
+            /* measure bandwidth device to host */
+            start = starpu_timing_now();
+            measure_recv_burst(buf, SIZE_BANDWIDTH, node, node, "Bandwidth");
+            end = starpu_timing_now();
+            bandwidth_dtoh[id] = ((double) NITER * SIZE_BANDWIDTH) / (end - start);
+
+            /* measure latency host to device */
+            start = starpu_timing_now();
+            measure_send_burst(buf, 1, node, node, "Latency");
+            end = starpu_timing_now();
+            latency_htod[id] = (end - start) / NITER;
+
+            /* measure latency device to host */
+            start = starpu_timing_now();
+            measure_recv_burst(buf, 1, node, node, "Latency");
+            end = starpu_timing_now();
+            latency_dtoh[id] = (end - start) / NITER;
+        }
+        else if (node == id_proc) /* if we are the sink node evaluated */
+        {
+            /* mirror the four phases of the source node, in the same order */
+
+            /* bandwidth host to device */
+            measure_recv_burst(buf, SIZE_BANDWIDTH, src_node_id, node, "Bandwidth");
+
+            /* bandwidth device to host */
+            measure_send_burst(buf, SIZE_BANDWIDTH, src_node_id, node, "Bandwidth");
+
+            /* latency host to device */
+            measure_recv_burst(buf, 1, src_node_id, node, "Latency");
+
+            /* latency device to host */
+            measure_send_burst(buf, 1, src_node_id, node, "Latency");
+        }
+
+        /* next slot of the output arrays: one per non-source rank */
+        id++;
+    }
+
+    free(buf);
+}

+ 4 - 0
src/drivers/mpi/driver_mpi_common.h

@@ -40,6 +40,10 @@ void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int
 void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len);
 void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len);
 
+void _starpu_mpi_common_barrier(void);
+
+void _starpu_mpi_common_measure_bandwidth_latency(double * bandwidth_htod, double * bandwidth_dtoh, double * latency_htod, double * latency_dtoh);
+
 
 #endif  /* STARPU_USE_MPI_MASTER_SLAVE */