Prechádzať zdrojové kódy

Move context from write/read to plug/unplug to reduce number of io_setup/io_destroy

Corentin Salingue 8 rokov pred
rodič
commit
71a5919691

+ 88 - 26
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -35,8 +35,10 @@
 #include <core/perfmodel/perfmodel.h>
 #include <core/disk_ops/unistd/disk_unistd_global.h>
 #include <datawizard/copy_driver.h>
+#include <datawizard/data_request.h>
 #include <datawizard/memory_manager.h>
 #include <starpu_parameters.h>
+#include <common/uthash.h>
 
 #ifdef STARPU_HAVE_WINDOWS
 #  include <io.h>
@@ -59,14 +61,26 @@ struct starpu_unistd_base
 {
 	char * path;
 	int created;
+#if defined(HAVE_LIBAIO_H)
+	io_context_t ctx;
+        struct starpu_unistd_aiocb_link * hashtable;
+        starpu_pthread_mutex_t mutex;
+#endif
 };
 
 #if defined(HAVE_LIBAIO_H)
+struct starpu_unistd_aiocb_link
+{
+        UT_hash_handle hh;
+        void * starpu_aiocb;
+        void * aiocb;
+};
 struct starpu_unistd_aiocb
 {
+        int finished;
 	struct iocb iocb;
-	io_context_t ctx;
 	struct starpu_unistd_global_obj *obj;
+        struct starpu_unistd_base *base;
 	size_t len;
 };
 #elif defined(HAVE_AIO_H)
@@ -240,24 +254,24 @@ int starpu_unistd_global_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, voi
 }
 
 #if defined(HAVE_LIBAIO_H)
-void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+void *starpu_unistd_global_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
 {
+        struct starpu_unistd_base * fileBase = (struct starpu_unistd_base *) base;
         struct starpu_unistd_global_obj *tmp = obj;
         struct starpu_unistd_aiocb *starpu_aiocb;
 	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
         struct iocb *iocb = &starpu_aiocb->iocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
-	int ret;
 
         if (fd < 0)
                 fd = _starpu_unistd_reopen(obj);
 
-	ret = io_setup(1, &starpu_aiocb->ctx);
-	STARPU_ASSERT(ret == 0);
 	starpu_aiocb->len = size;
+        starpu_aiocb->finished = 0;
+        starpu_aiocb->base = fileBase;
 	io_prep_pread(iocb, fd, buf, size, offset);
-	if (io_submit(starpu_aiocb->ctx, 1, &iocb) < 0)
+	if (io_submit(fileBase->ctx, 1, &iocb) < 0)
 	{
                 free(iocb);
                 if (tmp->descriptor < 0)
@@ -265,6 +279,14 @@ void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *
                 iocb = NULL;
         }
 
+        struct starpu_unistd_aiocb_link *l;
+        _STARPU_MALLOC(l, sizeof(*l));
+        l->aiocb = iocb;
+        l->starpu_aiocb = starpu_aiocb;
+        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        HASH_ADD_PTR(fileBase->hashtable, aiocb, l);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+
         return starpu_aiocb;
 }
 #elif defined(HAVE_AIO_H)
@@ -357,24 +379,24 @@ int starpu_unistd_global_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, co
 }
 
 #if defined(HAVE_LIBAIO_H)
-void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+void *starpu_unistd_global_async_write(void *base, void *obj, void *buf, off_t offset, size_t size)
 {
+        struct starpu_unistd_base * fileBase = (struct starpu_unistd_base *) base;
         struct starpu_unistd_global_obj *tmp = obj;
         struct starpu_unistd_aiocb *starpu_aiocb;
 	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
         struct iocb *iocb = &starpu_aiocb->iocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
-	int ret;
 
         if (fd < 0)
                 fd = _starpu_unistd_reopen(obj);
 
-	ret = io_setup(1, &starpu_aiocb->ctx);
-	STARPU_ASSERT(ret == 0);
 	starpu_aiocb->len = size;
+        starpu_aiocb->finished = 0;
+        starpu_aiocb->base = fileBase;
 	io_prep_pwrite(iocb, fd, buf, size, offset);
-	if (io_submit(starpu_aiocb->ctx, 1, &iocb) < 0)
+	if (io_submit(fileBase->ctx, 1, &iocb) < 0)
         {
                 free(iocb);
                 if (tmp->descriptor < 0)
@@ -382,6 +404,14 @@ void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void
                 iocb = NULL;
         }
 
+        struct starpu_unistd_aiocb_link *l;
+        _STARPU_MALLOC(l, sizeof(*l));
+        l->aiocb = iocb;
+        l->starpu_aiocb = starpu_aiocb;
+        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        HASH_ADD_PTR(fileBase->hashtable, aiocb, l);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+
         return starpu_aiocb;
 }
 #elif defined(HAVE_AIO_H)
@@ -456,6 +486,15 @@ void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTR
 		}
 	}
 
+#if defined(HAVE_LIBAIO_H)
+        STARPU_PTHREAD_MUTEX_INIT(&base->mutex, NULL);
+        base->hashtable = NULL;
+        unsigned nb_event = MAX_PENDING_REQUESTS_PER_NODE + MAX_PENDING_PREFETCH_REQUESTS_PER_NODE + MAX_PENDING_IDLE_REQUESTS_PER_NODE;
+        memset(&base->ctx, 0, sizeof(base->ctx));
+	int ret = io_setup(nb_event, &base->ctx);
+	STARPU_ASSERT(ret == 0);
+#endif
+
 	return (void *) base;
 }
 
@@ -463,6 +502,10 @@ void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTR
 void starpu_unistd_global_unplug(void *base)
 {
 	struct starpu_unistd_base * fileBase = (struct starpu_unistd_base *) base;
+#if defined(HAVE_LIBAIO_H)
+        STARPU_PTHREAD_MUTEX_DESTROY(&fileBase->mutex);
+        io_destroy(fileBase->ctx);
+#endif
 	if (fileBase->created)
 		rmdir(fileBase->path);
 	free(fileBase->path);
@@ -561,16 +604,26 @@ void starpu_unistd_global_wait_request(void *async_channel)
 
         int values = -1;
         int myerrno = EAGAIN;
-        while(values <= 0 && (myerrno == EAGAIN || myerrno == EINTR))
+        while(!starpu_aiocb->finished || (values <= 0 && (myerrno == EAGAIN || myerrno == EINTR)))
         {
                 /* Wait the answer of the request timeout IS NULL */
-		values = io_getevents(starpu_aiocb->ctx, 1, 1, &event, NULL);
+		values = io_getevents(starpu_aiocb->base->ctx, 1, 1, &event, NULL);
 		if (values < 0)
 			myerrno = -values;
+                if (values > 0)
+                {
+                        //we may catch an other request...
+                        STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
+                        struct starpu_unistd_aiocb_link *l = NULL;
+                        while (l == NULL)
+                                HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
+
+                        HASH_DEL(starpu_aiocb->base->hashtable, l);
+                        STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
+                        ((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
+                        free(l);
+                }
         }
-	STARPU_ASSERT(&starpu_aiocb->iocb == event.obj);
-        STARPU_ASSERT_MSG(!myerrno, "aio_error returned %d", myerrno);
-	STARPU_ASSERT(event.res == starpu_aiocb->len);
 }
 
 int starpu_unistd_global_test_request(void *async_channel)
@@ -580,22 +633,32 @@ int starpu_unistd_global_test_request(void *async_channel)
 	struct timespec ts;
         int ret;
 
+        if (starpu_aiocb->finished)
+                return 1;
+
 	memset(&ts, 0, sizeof(ts));
 
         /* Test the answer of the request */
-	ret = io_getevents(starpu_aiocb->ctx, 0, 1, &event, &ts);
+	ret = io_getevents(starpu_aiocb->base->ctx, 0, 1, &event, &ts);
 
         if (ret == 1)
 	{
-		STARPU_ASSERT(&starpu_aiocb->iocb == event.obj);
-		STARPU_ASSERT(event.res == starpu_aiocb->len);
-                /* request is finished */
-                return 1;
+                //we may catch an other request...
+                STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
+                struct starpu_unistd_aiocb_link *l = NULL;
+                while (l == NULL)
+                        HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
+
+                HASH_DEL(starpu_aiocb->base->hashtable, l);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
+                ((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
+                free(l);
+
+                if (starpu_aiocb->finished)
+                        return 1;
 	}
-        if (ret == 0 || ret == -EINTR || ret == -EINPROGRESS || ret == -EAGAIN)
-                return 0;
-        /* an error occured */
-        STARPU_ABORT_MSG("aio_error returned %d", ret);
+
+        return 0;
 }
 
 void starpu_unistd_global_free_request(void *async_channel)
@@ -604,7 +667,6 @@ void starpu_unistd_global_free_request(void *async_channel)
         struct iocb *iocb = &starpu_aiocb->iocb;
         if (starpu_aiocb->obj->descriptor < 0)
                 _starpu_unistd_reclose(iocb->aio_fildes);
-        io_destroy(starpu_aiocb->ctx);
         free(starpu_aiocb);
 }
 #elif defined(HAVE_AIO_H)

+ 0 - 9
src/datawizard/data_request.c

@@ -24,15 +24,6 @@
 #include <core/disk.h>
 #include <core/simgrid.h>
 
-/* TODO: This should be tuned according to driver capabilities
- * Data interfaces should also have to declare how many asynchronous requests
- * they have actually started (think of e.g. csr).
- */
-#define MAX_PENDING_REQUESTS_PER_NODE 20
-#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 10
-#define MAX_PENDING_IDLE_REQUESTS_PER_NODE 1
-#define MAX_PUSH_TIME 1000 /* Maximum time in us that we can afford pushing requests before going back to the driver loop, e.g. for checking GPU task termination */
-
 /* requests that have not been treated at all */
 static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
 static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES];

+ 9 - 0
src/datawizard/data_request.h

@@ -27,6 +27,15 @@
 #include <common/prio_list.h>
 #include <common/starpu_spinlock.h>
 
+/* TODO: This should be tuned according to driver capabilities
+ * Data interfaces should also have to declare how many asynchronous requests
+ * they have actually started (think of e.g. csr).
+ */
+#define MAX_PENDING_REQUESTS_PER_NODE 20
+#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 10
+#define MAX_PENDING_IDLE_REQUESTS_PER_NODE 1
+#define MAX_PUSH_TIME 1000 /* Maximum time in us that we can afford pushing requests before going back to the driver loop, e.g. for checking GPU task termination */
+
 struct _starpu_data_replicate;
 
 struct _starpu_callback_list