浏览代码

When many files are open, switch to opening files on demand for each request

Samuel Thibault 9 年之前
父节点
当前提交
5c7fec04ee
共有 2 个文件被更改,包括 214 次插入和 43 次删除
  1. 99 17
      src/core/disk_ops/disk_stdio.c
  2. 115 26
      src/core/disk_ops/unistd/disk_unistd_global.c

+ 99 - 17
src/core/disk_ops/disk_stdio.c

@@ -40,9 +40,11 @@
 #define O_BINARY 0
 #endif
 
+#define MAX_OPEN_FILES 64
 #define TEMP_HIERARCHY_DEPTH 2
 
 /* ------------------- use STDIO to write on disk -------------------  */
+static int starpu_stdio_opened_files;
 
 struct starpu_stdio_obj
 {
@@ -65,6 +67,18 @@ static struct starpu_stdio_obj *_starpu_stdio_init(int descriptor, char *path, s
 		return NULL;
 	}
 
+	STARPU_HG_DISABLE_CHECKING(starpu_stdio_opened_files);
+	if (starpu_stdio_opened_files >= MAX_OPEN_FILES)
+	{
+		/* Too many opened files, avoid keeping this one opened */
+		fclose(f);
+		f = NULL;
+		close(descriptor);
+		descriptor = -1;
+	}
+	else
+		(void) STARPU_ATOMIC_ADD(&starpu_stdio_opened_files, 1);
+
 	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
 
 	obj->descriptor = descriptor;
@@ -75,8 +89,32 @@ static struct starpu_stdio_obj *_starpu_stdio_init(int descriptor, char *path, s
 	return (void *) obj;
 }
 
+static FILE *_starpu_stdio_reopen(struct starpu_stdio_obj *obj)
+{
+	int id = open(obj->path, O_RDWR);
+	STARPU_ASSERT(id >= 0);
+
+	FILE *f = fdopen(id,"rb+");
+	STARPU_ASSERT(f);
+
+	return f;
+}
+
+static void _starpu_stdio_reclose(FILE *f)
+{
+	int id = fileno(f);
+	fclose(f);
+	close(id);
+}
+
 static void _starpu_stdio_close(struct starpu_stdio_obj *obj)
 {
+	if (obj->descriptor < 0)
+		return;
+
+	if (starpu_stdio_opened_files < MAX_OPEN_FILES)
+		(void) STARPU_ATOMIC_ADD(&starpu_stdio_opened_files, -1);
+
 	fclose(obj->file);
 	close(obj->descriptor);
 }
@@ -179,16 +217,23 @@ static void starpu_stdio_close(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, si
 static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
 	struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
+	FILE *f = tmp->file;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	if (f)
+		STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	else
+		f = _starpu_stdio_reopen(obj);
 
-	int res = fseek(tmp->file, offset, SEEK_SET);
+	int res = fseek(f, offset, SEEK_SET);
 	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
-	starpu_ssize_t nb = fread(buf, 1, size, tmp->file);
+	starpu_ssize_t nb = fread(buf, 1, size, f);
 	STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	if (tmp->file)
+		STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	else
+		_starpu_stdio_reclose(f);
 
 	return 0;
 }
@@ -196,33 +241,58 @@ static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void
 static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size)
 {
 	struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
+	FILE *f = tmp->file;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	if (f)
+		STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	else
+		f = _starpu_stdio_reopen(obj);
 
-	int res = fseek(tmp->file, 0, SEEK_END);
+	int res = fseek(f, 0, SEEK_END);
 	STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
-	*size = ftell(tmp->file);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	*size = ftell(f);
 
+	if (tmp->file)
+		STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
 	/* Alloc aligned buffer */
 	starpu_malloc_flags(ptr, *size, 0);
-	return starpu_stdio_read(base, obj, *ptr, 0, *size);
+	if (tmp->file)
+		STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
+	res = fseek(f, 0, SEEK_SET);
+	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
+
+	starpu_ssize_t nb = fread(*ptr, 1, *size, f);
+	STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
+
+	if (tmp->file)
+		STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	else
+		_starpu_stdio_reclose(f);
+
+	return 0;
 }
 
 /* write on the memory disk */
 static int starpu_stdio_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
 {
 	struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
+	FILE *f = tmp->file;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	if (f)
+		STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	else
+		f = _starpu_stdio_reopen(obj);
 
-	int res = fseek(tmp->file, offset, SEEK_SET);
+	int res = fseek(f, offset, SEEK_SET);
 	STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
 
-	fwrite(buf, 1, size, tmp->file);
+	fwrite(buf, 1, size, f);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	if (tmp->file)
+		STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+	else
+		_starpu_stdio_reclose(f);
 
 	return 0;
 }
@@ -230,21 +300,33 @@ static int starpu_stdio_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, con
 static int starpu_stdio_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *ptr, size_t size)
 {
 	struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
+	FILE *f = tmp->file;
+	int fd;
+
+	if (!f)
+		f = _starpu_stdio_reopen(obj);
+	fd = fileno(f);
 
 	/* update file size to realise the next good full_read */
 	if(size != tmp->size)
 	{
 #ifdef STARPU_HAVE_WINDOWS
-		int val = _chsize(tmp->descriptor, size);
+		int val = _chsize(fd, size);
 #else
-		int val = ftruncate(tmp->descriptor,size);
+		int val = ftruncate(fd,size);
 #endif
 		STARPU_ASSERT(val == 0);
 
 		tmp->size = size;
 	}
 
-	starpu_stdio_write(base, obj, ptr, 0, size);
+	int res = fseek(f, 0, SEEK_SET);
+	STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
+
+	fwrite(ptr, 1, size, f);
+
+	if (!tmp->file)
+		_starpu_stdio_reclose(f);
 
 	return 0;
 }

+ 115 - 26
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -48,14 +48,34 @@
 #  define MEM_SIZE 1
 #endif
 
+#define MAX_OPEN_FILES 64
 #define TEMP_HIERARCHY_DEPTH 2
 
 /* TODO: on Linux, use io_submit */
 
+static int starpu_unistd_opened_files;
+
+#ifdef HAVE_AIO_H
+struct starpu_unistd_aiocb {
+	struct aiocb aiocb;
+	struct starpu_unistd_global_obj *obj;
+};
+#endif
+
 /* ------------------- use UNISTD to write on disk -------------------  */
 
 static void _starpu_unistd_init(struct starpu_unistd_global_obj *obj, int descriptor, char *path, size_t size)
 {
+	STARPU_HG_DISABLE_CHECKING(starpu_unistd_opened_files);
+	if (starpu_unistd_opened_files >= MAX_OPEN_FILES)
+	{
+		/* Too many opened files, avoid keeping this one opened */
+		close(descriptor);
+		descriptor = -1;
+	}
+	else
+		(void) STARPU_ATOMIC_ADD(&starpu_unistd_opened_files, 1);
+
 	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
 
 	obj->descriptor = descriptor;
@@ -63,8 +83,26 @@ static void _starpu_unistd_init(struct starpu_unistd_global_obj *obj, int descri
 	obj->size = size;
 }
 
+static int _starpu_unistd_reopen(struct starpu_unistd_global_obj *obj)
+{
+	int id = open(obj->path, obj->flags);
+	STARPU_ASSERT(id >= 0);
+	return id;
+}
+
+static void _starpu_unistd_reclose(int id)
+{
+	close(id);
+}
+
 static void _starpu_unistd_close(struct starpu_unistd_global_obj *obj)
 {
+	if (obj->descriptor < 0)
+		return;
+
+	if (starpu_unistd_opened_files < MAX_OPEN_FILES)
+		(void) STARPU_ATOMIC_ADD(&starpu_unistd_opened_files, -1);
+
 	close(obj->descriptor);
 }
 
@@ -162,21 +200,33 @@ int starpu_unistd_global_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, voi
 {
 	struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
 	starpu_ssize_t nb;
+	int fd = tmp->descriptor;
 
 #ifdef HAVE_PREAD
-	nb = pread(tmp->descriptor, buf, size, offset);
-#else
-	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	if (fd >= 0)
+		nb = pread(fd, buf, size, offset);
+	else
+#endif
+	{
+		if (fd >= 0)
+			STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+		else
+			fd = _starpu_unistd_reopen(obj);
 
-	int res = lseek(tmp->descriptor, offset, SEEK_SET);
-	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for read failed: offset %lu got errno %d", (unsigned long) offset, errno);
+		int res = lseek(fd, offset, SEEK_SET);
+		STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for read failed: offset %lu got errno %d", (unsigned long) offset, errno);
 
-	nb = read(tmp->descriptor, buf, size);
+		nb = read(fd, buf, size);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
-#endif
+		if (tmp->descriptor >= 0)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+		else
+			_starpu_unistd_reclose(fd);
+
+	}
 
 	STARPU_ASSERT_MSG(nb >= 0, "Starpu Disk unistd read failed: size %lu got errno %d", (unsigned long) size, errno);
+
 	return nb;
 }
 
@@ -184,10 +234,15 @@ int starpu_unistd_global_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, voi
 void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
+        struct starpu_unistd_aiocb *starpu_aiocb = calloc(1,sizeof(*starpu_aiocb));
+        struct aiocb *aiocb = &starpu_aiocb->aiocb;
+        starpu_aiocb->obj = obj;
+        int fd = tmp->descriptor;
 
-        struct aiocb *aiocb = calloc(1,sizeof(*aiocb));
+        if (fd < 0)
+                fd = _starpu_unistd_reopen(obj);
 
-        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_fildes = fd;
         aiocb->aio_offset = offset;
         aiocb->aio_nbytes = size;
         aiocb->aio_buf = buf;
@@ -197,6 +252,8 @@ void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *
         if (aio_read(aiocb) < 0)
         {
                 free(aiocb);
+                if (tmp->descriptor < 0)
+                        _starpu_unistd_reclose(fd);
                 aiocb = NULL;
         }
 
@@ -207,15 +264,20 @@ void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *
 int starpu_unistd_global_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size)
 {
         struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
+	int fd = tmp->descriptor;
 
+	if (fd < 0)
+		fd = _starpu_unistd_reopen(obj);
 #ifdef STARPU_HAVE_WINDOWS
-	*size = _filelength(tmp->descriptor);
+	*size = _filelength(fd);
 #else
 	struct stat st;
-	fstat(tmp->descriptor, &st);
+	fstat(fd, &st);
 
 	*size = st.st_size;
 #endif
+	if (tmp->descriptor < 0)
+		_starpu_unistd_reclose(fd);
 
 	/* Allocated aligned buffer */
 	starpu_malloc_flags(ptr, *size, 0);
@@ -227,19 +289,29 @@ int starpu_unistd_global_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, co
 {
 	struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
 	int res;
+	int fd = tmp->descriptor;
 
 #ifdef HAVE_PWRITE
-	res = pwrite(tmp->descriptor, buf, size, offset);
-#else
-	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	if (fd >= 0)
+		res = pwrite(fd, buf, size, offset);
+	else
+#endif
+	{
+		if (fd >= 0)
+			STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+		else
+			fd = _starpu_unistd_reopen(obj);
 
-	res = lseek(tmp->descriptor, offset, SEEK_SET);
-	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for write failed: offset %lu got errno %d", (unsigned long) offset, errno);
+		res = lseek(fd, offset, SEEK_SET);
+		STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for write failed: offset %lu got errno %d", (unsigned long) offset, errno);
 
-	res = write(tmp->descriptor, buf, size);
+		res = write(fd, buf, size);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
-#endif
+		if (tmp->descriptor >= 0)
+			STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+		else
+			_starpu_unistd_reclose(fd);
+	}
 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd write failed: size %lu got errno %d", (unsigned long) size, errno);
 	return 0;
@@ -249,9 +321,15 @@ int starpu_unistd_global_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, co
 void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
-        struct aiocb *aiocb = calloc(1,sizeof(*aiocb));
+        struct starpu_unistd_aiocb *starpu_aiocb = calloc(1,sizeof(*starpu_aiocb));
+        struct aiocb *aiocb = &starpu_aiocb->aiocb;
+        starpu_aiocb->obj = obj;
+        int fd = tmp->descriptor;
+
+        if (fd < 0)
+                fd = _starpu_unistd_reopen(obj);
 
-        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_fildes = fd;
         aiocb->aio_offset = offset;
         aiocb->aio_nbytes = size;
         aiocb->aio_buf = buf;
@@ -261,6 +339,8 @@ void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void
         if (aio_write(aiocb) < 0)
         {
                 free(aiocb);
+                if (tmp->descriptor < 0)
+                        _starpu_unistd_reclose(fd);
                 aiocb = NULL;
         }
 
@@ -275,11 +355,17 @@ int starpu_unistd_global_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *ob
         /* update file size to realise the next good full_read */
         if(size != tmp->size)
         {
+		int fd = tmp->descriptor;
+
+		if (fd < 0)
+			fd = _starpu_unistd_reopen(obj);
 #ifdef STARPU_HAVE_WINDOWS
-		int val = _chsize(tmp->descriptor, size);
+		int val = _chsize(fd, size);
 #else
-		int val = ftruncate(tmp->descriptor,size);
+		int val = ftruncate(fd,size);
 #endif
+		if (tmp->descriptor < 0)
+			_starpu_unistd_reclose(fd);
 		STARPU_ASSERT(val == 0);
 		tmp->size = size;
         }
@@ -425,8 +511,11 @@ int starpu_unistd_global_test_request(void *async_channel)
 
 void starpu_unistd_global_free_request(void *async_channel)
 {
-        struct aiocb *aiocb = async_channel;
+        struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
+        struct aiocb *aiocb = &starpu_aiocb->aiocb;
+        if (starpu_aiocb->obj->descriptor < 0)
+                _starpu_unistd_reclose(aiocb->aio_fildes);
         aio_return(aiocb);
-        free(aiocb);
+        free(starpu_aiocb);
 }
 #endif