浏览代码

Lock performance model files while writing and reading them to avoid issues on parallel launches, MPI runs notably.

Samuel Thibault 10 年之前
父节点
当前提交
14b8f531a6
共有 6 个文件被更改,包括 91 次插入2 次删除
  1. 4 0
      ChangeLog
  2. 44 0
      src/common/utils.c
  3. 5 0
      src/common/utils.h
  4. 26 2
      src/core/perfmodel/perfmodel_bus.c
  5. 9 0
      src/core/perfmodel/perfmodel_history.c
  6. 3 0
      src/core/simgrid.c

+ 4 - 0
ChangeLog

@@ -101,6 +101,10 @@ New features:
   * One can register an existing on-GPU buffer to be used by a handle.
   * Add the starpu_paje_summary statistics tool.
 
+Small changes:
+  * Lock performance model files while writing and reading them to avoid
+    issues on parallel launches, MPI runs notably.
+
 StarPU 1.1.2 (svn revision xxx)
 ==============================================
 The scheduling context release

+ 44 - 0
src/common/utils.c

@@ -21,6 +21,7 @@
 #include <libgen.h>
 #include <errno.h>
 #include <unistd.h>
+#include <fcntl.h>
 
 #ifdef __MINGW32__
 #include <io.h>
@@ -102,6 +103,49 @@ void _starpu_mkpath_and_check(const char *path, mode_t mode)
 	}
 }
 
+int _starpu_ftruncate(FILE *file)
+{
+	return ftruncate(fileno(file), 0);
+}
+
+int _starpu_frdlock(FILE *file)
+{
+	struct flock lock = {
+		.l_type = F_RDLCK,
+		.l_whence = SEEK_SET,
+		.l_start = 0,
+		.l_len = 0
+	};
+	return fcntl(fileno(file), F_SETLKW, &lock);
+}
+
+int _starpu_frdunlock(FILE *file)
+{
+	struct flock lock = {
+		.l_type = F_UNLCK,
+		.l_whence = SEEK_SET,
+		.l_start = 0,
+		.l_len = 0
+	};
+	return fcntl(fileno(file), F_SETLKW, &lock);
+}
+
+int _starpu_fwrlock(FILE *file)
+{
+	struct flock lock = {
+		.l_type = F_WRLCK,
+		.l_whence = SEEK_SET,
+		.l_start = 0,
+		.l_len = 0
+	};
+	return fcntl(fileno(file), F_SETLKW, &lock);
+}
+
+int _starpu_fwrunlock(FILE *file)
+{
+	return _starpu_frdunlock(file);
+}
+
 int _starpu_check_mutex_deadlock(starpu_pthread_mutex_t *mutex)
 {
 	int ret;

+ 5 - 0
src/common/utils.h

@@ -105,6 +105,11 @@
 
 int _starpu_mkpath(const char *s, mode_t mode);
 void _starpu_mkpath_and_check(const char *s, mode_t mode);
+int _starpu_ftruncate(FILE *file);
+int _starpu_frdlock(FILE *file);
+int _starpu_frdunlock(FILE *file);
+int _starpu_fwrlock(FILE *file);
+int _starpu_fwrunlock(FILE *file);
 char *_starpu_get_home_path(void);
 void _starpu_gethostname(char *hostname, size_t size);
 

+ 26 - 2
src/core/perfmodel/perfmodel_bus.c

@@ -783,6 +783,8 @@ static void load_bus_affinity_file_content(void)
 	f = fopen(path, "r");
 	STARPU_ASSERT(f);
 
+	_starpu_frdlock(f);
+
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	ncpus = _starpu_topology_get_nhwcpu(config);
         unsigned gpu;
@@ -835,6 +837,7 @@ static void load_bus_affinity_file_content(void)
 		STARPU_ASSERT(ret == 0);
 	}
 #endif /* !STARPU_USE_OPENCL */
+	_starpu_frdunlock(f);
 
 	fclose(f);
 #endif /* !(STARPU_USE_CUDA_ || STARPU_USE_OPENCL */
@@ -862,6 +865,7 @@ static void write_bus_affinity_file_content(void)
 		STARPU_ABORT();
 	}
 
+	_starpu_frdlock(f);
 	unsigned cpu;
         unsigned gpu;
 
@@ -897,6 +901,7 @@ static void write_bus_affinity_file_content(void)
 	}
 #endif
 
+	_starpu_frdunlock(f);
 	fclose(f);
 #endif
 }
@@ -1006,6 +1011,7 @@ static int load_bus_latency_file_content(void)
 		fflush(stderr);
 		STARPU_ABORT();
 	}
+	_starpu_frdlock(f);
 
 	for (src = 0; src < STARPU_MAXNODES; src++)
 	{
@@ -1073,13 +1079,14 @@ static int load_bus_latency_file_content(void)
 			break;
 		ungetc(n, f);
 	}
+	_starpu_frdunlock(f);
+	fclose(f);
 
 	/* No more values, take NAN */
 	for ( ; src < STARPU_MAXNODES; src++)
 		for (dst = 0; dst < STARPU_MAXNODES; dst++)
 			latency_matrix[src][dst] = NAN;
 
-	fclose(f);
 	return 1;
 }
 
@@ -1104,6 +1111,8 @@ static void write_bus_latency_file_content(void)
 		fflush(stderr);
 		STARPU_ABORT();
 	}
+	_starpu_fwrlock(f);
+	_starpu_ftruncate(f);
 
 	fprintf(f, "# ");
 	for (dst = 0; dst < STARPU_MAXNODES; dst++)
@@ -1163,6 +1172,7 @@ static void write_bus_latency_file_content(void)
 
 		fprintf(f, "\n");
 	}
+	_starpu_fwrunlock(f);
 
 	fclose(f);
 }
@@ -1223,6 +1233,7 @@ static int load_bus_bandwidth_file_content(void)
 		fflush(stderr);
 		STARPU_ABORT();
 	}
+	_starpu_frdlock(f);
 
 	for (src = 0; src < STARPU_MAXNODES; src++)
 	{
@@ -1290,13 +1301,14 @@ static int load_bus_bandwidth_file_content(void)
 			break;
 		ungetc(n, f);
 	}
+	_starpu_frdunlock(f);
+	fclose(f);
 
 	/* No more values, take NAN */
 	for ( ; src < STARPU_MAXNODES; src++)
 		for (dst = 0; dst < STARPU_MAXNODES; dst++)
 			latency_matrix[src][dst] = NAN;
 
-	fclose(f);
 	return 1;
 }
 
@@ -1316,6 +1328,9 @@ static void write_bus_bandwidth_file_content(void)
 	f = fopen(path, "w+");
 	STARPU_ASSERT(f);
 
+	_starpu_fwrlock(f);
+	_starpu_ftruncate(f);
+
 	fprintf(f, "# ");
 	for (dst = 0; dst < STARPU_MAXNODES; dst++)
 		fprintf(f, "to %d\t\t", dst);
@@ -1387,6 +1402,7 @@ static void write_bus_bandwidth_file_content(void)
 		fprintf(f, "\n");
 	}
 
+	_starpu_fwrunlock(f);
 	fclose(f);
 }
 #endif /* STARPU_SIMGRID */
@@ -1551,6 +1567,7 @@ static void check_bus_config_file(void)
                 // Loading configuration from file
                 f = fopen(path, "r");
                 STARPU_ASSERT(f);
+		_starpu_frdlock(f);
                 _starpu_drop_comments(f);
                 ret = fscanf(f, "%u\t", &read_cpus);
 		STARPU_ASSERT(ret == 1);
@@ -1565,6 +1582,7 @@ static void check_bus_config_file(void)
 		if (ret == 0)
 			read_mic = 0;
                 _starpu_drop_comments(f);
+		_starpu_frdunlock(f);
                 fclose(f);
 
                 // Loading current configuration
@@ -1619,6 +1637,8 @@ static void write_bus_config_file_content(void)
 
         f = fopen(path, "w+");
 	STARPU_ASSERT(f);
+	_starpu_fwrlock(f);
+	_starpu_ftruncate(f);
 
         fprintf(f, "# Current configuration\n");
         fprintf(f, "%u # Number of CPUs\n", ncpus);
@@ -1626,6 +1646,7 @@ static void write_bus_config_file_content(void)
         fprintf(f, "%d # Number of OpenCL devices\n", nopencl);
         fprintf(f, "%d # Number of MIC devices\n", nmic);
 
+	_starpu_fwrunlock(f);
         fclose(f);
 }
 
@@ -1664,6 +1685,8 @@ static void write_bus_platform_file_content(void)
 		fflush(stderr);
 		STARPU_ABORT();
 	}
+	_starpu_fwrlock(f);
+	_starpu_ftruncate(f);
 
 	fprintf(f,
 "<?xml version='1.0'?>\n"
@@ -1810,6 +1833,7 @@ static void write_bus_platform_file_content(void)
 " </platform>\n"
 		);
 
+	_starpu_fwrunlock(f);
 	fclose(f);
 }
 

+ 9 - 0
src/core/perfmodel/perfmodel_history.c

@@ -834,7 +834,10 @@ static void save_history_based_model(struct starpu_perfmodel *model)
 	f = fopen(path, "w+");
 	STARPU_ASSERT_MSG(f, "Could not save performance model %s\n", path);
 
+	_starpu_fwrlock(f);
+	_starpu_ftruncate(f);
 	dump_model_file(f, model);
+	_starpu_fwrunlock(f);
 
 	fclose(f);
 }
@@ -1009,7 +1012,9 @@ void _starpu_load_history_based_model(struct starpu_perfmodel *model, unsigned s
 				f = fopen(path, "r");
 				STARPU_ASSERT(f);
 
+				_starpu_frdlock(f);
 				parse_model_file(f, model, scan_history);
+				_starpu_frdunlock(f);
 
 				fclose(f);
 			}
@@ -1099,10 +1104,12 @@ int starpu_perfmodel_load_symbol(const char *symbol, struct starpu_perfmodel *mo
 	FILE *f = fopen(path, "r");
 	STARPU_ASSERT(f);
 
+	_starpu_frdlock(f);
 	starpu_perfmodel_init_with_file(f, model);
 	rewind(f);
 
 	parse_model_file(f, model, 1);
+	_starpu_frdunlock(f);
 
 	STARPU_ASSERT(fclose(f) == 0);
 
@@ -1412,6 +1419,7 @@ void _starpu_update_perfmodel_history(struct _starpu_job *j, struct starpu_perfm
 			_STARPU_DISP("Error <%s> when opening file <%s>\n", strerror(errno), per_arch_model->debug_path);
 			STARPU_ABORT();
 		}
+		_starpu_fwrlock(f);
 
 		if (!j->footprint_is_computed)
 			(void) _starpu_compute_buffers_footprint(model, arch, nimpl, j);
@@ -1431,6 +1439,7 @@ void _starpu_update_perfmodel_history(struct _starpu_job *j, struct starpu_perfm
 			handle->ops->display(handle, f);
 		}
 		fprintf(f, "\n");
+		_starpu_fwrunlock(f);
 		fclose(f);
 #endif
 		STARPU_PTHREAD_RWLOCK_UNLOCK(&model->model_rwlock);

+ 3 - 0
src/core/simgrid.c

@@ -217,6 +217,7 @@ void _starpu_simgrid_init()
 		/* Get XML platform */
 		_starpu_simgrid_get_platform_path(path, sizeof(path));
 		in = fopen(path, "r");
+		_starpu_frdlock(in);
 		STARPU_ASSERT_MSG(in, "Could not open platform file %s", path);
 #ifdef HAVE_MKSTEMPS
 		out = mkstemps(template, strlen(".xml"));
@@ -230,6 +231,8 @@ void _starpu_simgrid_init()
 		snprintf(cmdline, sizeof(cmdline), "xsltproc --novalid --stringparam ASname %s -o %s "STARPU_DATADIR"/starpu/starpu_smpi.xslt %s", asname, template, path);
 		ret = system(cmdline);
 		STARPU_ASSERT_MSG(ret == 0, "running xsltproc to generate SMPI platforms %s from %s failed", template, path);
+		_starpu_frdunlock(in);
+		fclose(in);
 
 		/* And create it */
 		MSG_create_environment(template);