Pārlūkot izejas kodu

add asynchronous i/o for unistd and unistd with O_DIRECT backends

Corentin Salingue 11 gadi atpakaļ
vecāks
revīzija
dfefc33e84

+ 4 - 2
src/core/disk_ops/disk_unistd.c

@@ -62,6 +62,8 @@ struct starpu_disk_ops starpu_disk_unistd_ops = {
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,
 	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
-	.async_read = NULL,
-	.async_write = NULL
+	.async_read = starpu_unistd_global_async_read,
+	.async_write = starpu_unistd_global_async_write,
+	.wait_request = starpu_unistd_global_wait_request,
+	.test_request = starpu_unistd_global_test_request
 };

+ 5 - 2
src/core/disk_ops/disk_unistd_o_direct.c

@@ -92,9 +92,12 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops = {
 	.close = starpu_unistd_global_close,
 	.read = starpu_unistd_o_direct_read,
 	.write = starpu_unistd_o_direct_write,
-	.async_write = NULL,
 	.plug = starpu_unistd_o_direct_plug,
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,
-	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram
+	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
+        .async_read = starpu_unistd_global_async_read,
+        .async_write = starpu_unistd_global_async_write,
+        .wait_request = starpu_unistd_global_wait_request,
+        .test_request = starpu_unistd_global_test_request
 };

+ 90 - 0
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -20,11 +20,14 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <stdint.h>
+#include <aio.h>
+#include <errno.h>
 
 #include <starpu.h>
 #include <core/disk.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/disk_ops/unistd/disk_unistd_global.h>
+#include <datawizard/copy_driver.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -176,6 +179,28 @@ starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *
 }
 
 
+int
+starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP;
+
+        return aio_read(aiocb);
+}
+
+
+
 /* write on the memory disk */
  int 
 starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
@@ -196,6 +221,27 @@ starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const
 }
 
 
+int
+starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP;
+
+        return aio_write(aiocb);
+}
+
+
+
 /* create a new copy of parameter == base */
  void * 
 starpu_unistd_global_plug (void *parameter)
@@ -287,3 +333,47 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 					       timing_latency/NITER, timing_latency/NITER, node);
 	return 1;
 }
+
+void
+starpu_unistd_global_wait_request(void * async_channel)
+{
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+        while(values < 0 || error_disk == EAGAIN)
+        {
+                /* Wait the answer of the request TIMESTAMP IS NULL */
+                values = aio_suspend(list, 1, NULL);
+                error_disk = errno;
+        }
+}
+
+int
+starpu_unistd_global_test_request(void * async_channel)
+{
+        struct timespec time_wait_request;
+        time_wait_request.tv_sec = 0;
+        time_wait_request.tv_nsec = 0;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+
+        /* Wait the answer of the request */
+        values = aio_suspend(list, 1, &time_wait_request);
+        error_disk = errno;
+        /* request is finished */
+        if (values == 0)
+                return 1;
+        /* values == -1 */
+        if (error_disk == EAGAIN)
+                return 0;
+        /* an error occured */
+        STARPU_ABORT();
+}

+ 4 - 0
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -34,5 +34,9 @@ struct starpu_unistd_global_obj {
  void * starpu_unistd_global_plug (void *parameter);
  void starpu_unistd_global_unplug (void *base);
  int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
+int starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+void starpu_unistd_global_wait_request(void * async_channel);
+int starpu_unistd_global_test_request(void * async_channel);
 
 #endif

+ 1 - 1
tests/disk/disk_compute.c

@@ -65,7 +65,7 @@ int main(int argc, char **argv)
 
 
 	/* register a disk */
-	int new_dd = starpu_disk_register(&starpu_disk_stdio_ops, (void *) base, 1024*1024*1);
+	int new_dd = starpu_disk_register(&starpu_disk_unistd_o_direct_ops, (void *) base, 1024*1024*1);
 	/* can't write on /tmp/ */
 	if (new_dd == -ENOENT) goto enoent;