Browse Source

Use linux' aio syscalls when available instead of the userland glibc emulation

Samuel Thibault 8 years ago
parent
commit
af7be88375
3 changed files with 124 additions and 12 deletions
  1. 2 0
      configure.ac
  2. 2 2
      src/core/disk_ops/disk_unistd_o_direct.c
  3. 120 10
      src/core/disk_ops/unistd/disk_unistd_global.c

+ 2 - 0
configure.ac

@@ -753,7 +753,9 @@ fi
 AC_CHECK_FUNC([sched_yield], [AC_DEFINE([STARPU_HAVE_SCHED_YIELD], [1], [Define to 1 if the function sched_yield is available.])])
 
 AC_CHECK_HEADERS([aio.h])
+AC_CHECK_HEADERS([libaio.h])
 AC_CHECK_LIB([rt], [aio_read])
+AC_CHECK_LIB([aio], [io_setup])
 
 AC_CHECK_FUNCS([mkostemp])
 AC_CHECK_FUNCS([mkdtemp])

+ 2 - 2
src/core/disk_ops/disk_unistd_o_direct.c

@@ -78,7 +78,7 @@ static void *starpu_unistd_o_direct_plug(void *parameter, starpu_ssize_t size)
 	return starpu_unistd_global_plug (parameter, size);
 }
 
-#ifdef HAVE_AIO_H
+#if defined(HAVE_AIO_H) || defined(HAVE_LIBAIO_H)
 void *starpu_unistd_o_direct_global_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "The unistd_o_direct variant can only read a multiple of page size %lu Bytes (Here %lu). Use the non-o_direct unistd variant if your data is not a multiple of %lu",
@@ -112,7 +112,7 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops =
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,
 	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
-#ifdef HAVE_AIO_H
+#if defined(HAVE_AIO_H) || defined(HAVE_LIBAIO_H)
         .async_read = starpu_unistd_o_direct_global_async_read,
         .async_write = starpu_unistd_o_direct_global_async_write,
         .wait_request = starpu_unistd_global_wait_request,

+ 120 - 10
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -19,12 +19,14 @@
 #include <stdlib.h>
 #include <sys/stat.h>
 #include <stdint.h>
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
 #include <errno.h>
 
 #include <common/config.h>
+#if defined(HAVE_LIBAIO_H)
+#include <libaio.h>
+#elif defined(HAVE_AIO_H)
+#include <aio.h>
+#endif
 #ifdef HAVE_UNISTD_H
 #  include <unistd.h>
 #endif
@@ -51,11 +53,16 @@
 #define MAX_OPEN_FILES 64
 #define TEMP_HIERARCHY_DEPTH 2
 
-/* TODO: on Linux, use io_submit */
-
 static unsigned starpu_unistd_opened_files;
 
-#ifdef HAVE_AIO_H
+#if defined(HAVE_LIBAIO_H)
+struct starpu_unistd_aiocb
+{
+	struct iocb iocb;
+	io_context_t ctx;
+	struct starpu_unistd_global_obj *obj;
+};
+#elif defined(HAVE_AIO_H)
 struct starpu_unistd_aiocb
 {
 	struct aiocb aiocb;
@@ -223,7 +230,32 @@ int starpu_unistd_global_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, voi
 	return nb;
 }
 
-#ifdef HAVE_AIO_H
+#if defined(HAVE_LIBAIO_H)
+void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+{
+        struct starpu_unistd_global_obj *tmp = obj;
+        struct starpu_unistd_aiocb *starpu_aiocb;
+	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+        struct iocb *iocb = &starpu_aiocb->iocb;
+        starpu_aiocb->obj = obj;
+        int fd = tmp->descriptor;
+
+        if (fd < 0)
+                fd = _starpu_unistd_reopen(obj);
+
+	io_setup(1, &starpu_aiocb->ctx);
+	io_prep_pread(iocb, fd, buf, size, offset);
+	if (io_submit(starpu_aiocb->ctx, 1, &iocb) < 0)
+	{
+                free(iocb);
+                if (tmp->descriptor < 0)
+                        _starpu_unistd_reclose(fd);
+                iocb = NULL;
+        }
+
+        return starpu_aiocb;
+}
+#elif defined(HAVE_AIO_H)
 void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
@@ -312,7 +344,32 @@ int starpu_unistd_global_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, co
 	return 0;
 }
 
-#ifdef HAVE_AIO_H
+#if defined(HAVE_LIBAIO_H)
+void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+{
+        struct starpu_unistd_global_obj *tmp = obj;
+        struct starpu_unistd_aiocb *starpu_aiocb;
+	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+        struct iocb *iocb = &starpu_aiocb->iocb;
+        starpu_aiocb->obj = obj;
+        int fd = tmp->descriptor;
+
+        if (fd < 0)
+                fd = _starpu_unistd_reopen(obj);
+
+	io_setup(1, &starpu_aiocb->ctx);
+	io_prep_pwrite(iocb, fd, buf, size, offset);
+	if (io_submit(starpu_aiocb->ctx, 1, &iocb) < 0)
+        {
+                free(iocb);
+                if (tmp->descriptor < 0)
+                        _starpu_unistd_reclose(fd);
+                iocb = NULL;
+        }
+
+        return starpu_aiocb;
+}
+#elif defined(HAVE_AIO_H)
 void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
@@ -473,7 +530,60 @@ int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	return 1;
 }
 
-#ifdef HAVE_AIO_H
+#if defined(HAVE_LIBAIO_H)
+void starpu_unistd_global_wait_request(void *async_channel)
+{
+	struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
+	struct io_event event;
+
+        int values = -1;
+        int ret, myerrno = EAGAIN;
+        while(values <= 0 && (myerrno == EAGAIN || myerrno == EINTR))
+        {
+                /* Wait the answer of the request timeout IS NULL */
+		values = io_getevents(starpu_aiocb->ctx, 1, 1, &event, NULL);
+		if (values < 0)
+			myerrno = -values;
+        }
+	STARPU_ASSERT(&starpu_aiocb->iocb == event.obj);
+	ret = event.res > 0;
+        STARPU_ASSERT_MSG(!ret, "aio_error returned %d", ret);
+}
+
+int starpu_unistd_global_test_request(void *async_channel)
+{
+	struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
+	struct io_event event;
+	struct timespec ts;
+        int ret;
+
+	memset(&ts, 0, sizeof(ts));
+
+        /* Test the answer of the request */
+	ret = io_getevents(starpu_aiocb->ctx, 0, 1, &event, &ts);
+
+        if (ret == 1)
+	{
+		STARPU_ASSERT(&starpu_aiocb->iocb == event.obj);
+                /* request is finished */
+                return 1;
+	}
+        if (ret == 0 || ret == -EINTR || ret == -EINPROGRESS || ret == -EAGAIN)
+                return 0;
+        /* an error occured */
+        STARPU_ABORT_MSG("aio_error returned %d", ret);
+}
+
+void starpu_unistd_global_free_request(void *async_channel)
+{
+        struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
+        struct iocb *iocb = &starpu_aiocb->iocb;
+        if (starpu_aiocb->obj->descriptor < 0)
+                _starpu_unistd_reclose(iocb->aio_fildes);
+        io_destroy(starpu_aiocb->ctx);
+        free(starpu_aiocb);
+}
+#elif defined(HAVE_AIO_H)
 void starpu_unistd_global_wait_request(void *async_channel)
 {
         const struct aiocb *aiocb = async_channel;
@@ -507,7 +617,7 @@ int starpu_unistd_global_test_request(void *async_channel)
         if (ret == 0)
                 /* request is finished */
                 return 1;
-        if (ret == EINPROGRESS || ret == EAGAIN)
+        if (ret == EINTR || ret == EINPROGRESS || ret == EAGAIN)
                 return 0;
         /* an error occured */
         STARPU_ABORT_MSG("aio_error returned %d", ret);