Просмотр исходного кода

Use an internal thread to perform sync read/write operations on HDF5

Corentin Salingue лет назад: 8
Родитель
Сommit
331c23d466
1 измененных файлов с 246 добавлено и 107 удалено
  1. 246 107
      src/core/disk_ops/disk_hdf5.c

+ 246 - 107
src/core/disk_ops/disk_hdf5.c

@@ -37,13 +37,29 @@ static int nb_disk_open = 0;
 
 /* ------------------- use HDF5 to write on disk -------------------  */
 
+enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE };
+
+LIST_TYPE(_starpu_hdf5_work,
+        enum hdf5_work_type type;
+        struct starpu_hdf5_obj * obj;
+        struct starpu_hdf5_base * base;
+        void * ptr;
+        off_t offset;
+        size_t size;
+        void * event;
+);
+
 struct starpu_hdf5_base
 {
         hid_t fileID;
         char * path;
         unsigned created;       /* StarPU creates the HDF5 file */
         unsigned next_dataset_id;
-	starpu_pthread_mutex_t mutex;
+        starpu_pthread_t thread;        /* This thread will perform each write/read because we don't have asynchronous functions */
+        int run;                        /* Ask to the thread if he can continue */
+	starpu_pthread_mutex_t mutex;   /* Mutex is used to protect work_list and if HDF5 library is not safe */
+        starpu_pthread_cond_t cond;
+        struct _starpu_hdf5_work_list * work_list;        /* This list contains the work for the hdf5 thread */
 };
 
 struct starpu_hdf5_obj
@@ -68,6 +84,177 @@ static inline void _starpu_hdf5_protect_stop(void * base)
 #endif
 }
 
+/* ------------------ Functions for internal thread -------------------- */
+
+static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
+{
+        herr_t status;
+
+        status = H5Dread(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj->path);
+}
+
+static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
+{
+        herr_t status;
+
+        /* Write ALL the dataspace */
+        status = H5Dwrite(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj->path);
+}
+
+static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
+{
+        herr_t status;
+
+        /* Get official datatype */
+        hid_t datatype = H5Dget_type(work->obj->dataset);
+        hsize_t sizeDatatype = H5Tget_size(datatype);
+
+        /* count in element, not in byte */
+        work->offset /= sizeDatatype;
+        work->size /= sizeDatatype;
+
+        /* duplicate the dataspace in the dataset */
+        hid_t dataspace_select = H5Dget_space(work->obj->dataset);
+        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
+        hsize_t offsets[1] = {work->offset};
+        hsize_t count[1] = {work->size};
+        /* stride and block size are NULL which is equivalent of a shift of 1 */
+        status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* create the dataspace for the received data which describes ptr */
+        hsize_t dims_receive[1] = {work->size};
+        hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
+        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* Receiver has to be an hyperslabs */
+        offsets[0] = 0;
+        count[0] = work->size;
+        status = H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
+        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+
+        status = H5Dread(work->obj->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* don't need these dataspaces */
+        status = H5Sclose(dataspace_select);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        status = H5Sclose(dataspace_receive);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+}
+
+static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
+{
+        herr_t status;
+
+        /* Get official datatype */
+        hid_t datatype = H5Dget_type(work->obj->dataset);
+        hsize_t sizeDatatype = H5Tget_size(datatype);
+
+        /* count in element, not in byte */
+        work->offset /= sizeDatatype;
+        work->size /= sizeDatatype;
+
+        /* duplicate the dataspace in the dataset */
+        hid_t dataspace_select = H5Dget_space(work->obj->dataset);
+        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
+        hsize_t offsets[1] = {work->offset};
+        hsize_t count[1] = {work->size};
+        /* stride and block size are NULL which is equivalent of a shift of 1 */
+        status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* create the dataspace for the received data which describes ptr */
+        hsize_t dims_send[1] = {work->size};
+        hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
+        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* Receiver has to be an hyperslabs */
+        offsets[0] = 0;
+        count[0] = work->size;
+        status = H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
+        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+
+        status = H5Dwrite(work->obj->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+
+        /* don't need these dataspaces */
+        status = H5Sclose(dataspace_select);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        status = H5Sclose(dataspace_send);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+}
+
+static void * _starpu_hdf5_internal_thread(void * arg)
+{
+        struct starpu_hdf5_base * base = (struct starpu_hdf5_base *) arg;
+        while (base->run || !_starpu_hdf5_work_list_empty(base->work_list))
+        {
+                STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
+                if (_starpu_hdf5_work_list_empty(base->work_list) && base->run)
+                        STARPU_PTHREAD_COND_WAIT(&base->cond, &base->mutex);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+
+                /* We are the only consummer here, don't need to protect here */
+                if (!_starpu_hdf5_work_list_empty(base->work_list))
+                {
+                        STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
+                        struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(base->work_list);
+                        STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+
+                        _starpu_hdf5_protect_start(work->base);
+                        switch(work->type)
+                        {
+                                case READ:
+                                        starpu_hdf5_read_internal(work);
+                                        break;
+
+                                case WRITE:
+                                        starpu_hdf5_write_internal(work);
+                                        break;
+
+                                case FULL_READ:
+                                        starpu_hdf5_full_read_internal(work);
+                                        break;
+
+                                case FULL_WRITE:
+                                        starpu_hdf5_full_write_internal(work);
+                                        break;
+
+                                default:
+                                        STARPU_ABORT();
+                        }
+                        _starpu_hdf5_protect_stop(work->base);
+
+                        /* Update event to tell it's finished */
+                        *((int *) work->event) = 1;
+
+                        free(work);
+                }
+        }
+
+        STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
+        STARPU_PTHREAD_COND_BROADCAST(&base->cond);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+
+        return NULL;
+}
+
+static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * base)
+{
+        base->work_list = _starpu_hdf5_work_list_new();
+        base->run = 1;
+
+        STARPU_PTHREAD_COND_INIT(&base->cond, NULL);
+        STARPU_PTHREAD_CREATE(&base->thread, NULL, _starpu_hdf5_internal_thread, (void *) base); 
+}
+
 /* returns the size in BYTES */
 static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
 {
@@ -92,6 +279,29 @@ static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
         return dims[0]*sizeDatatype;
 }
 
+static void starpu_hdf5_send_work(void *base, void *obj, void *buf, off_t offset, size_t size, void * event, enum hdf5_work_type type)
+{
+        struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
+        struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
+
+        struct _starpu_hdf5_work * work;
+        _STARPU_MALLOC(work, sizeof(*work));
+
+        work->type = type;
+        work->obj = dataObj;
+        work->base = fileBase;
+        work->ptr = buf;
+        work->offset = offset;
+        work->size = size;
+        work->event = event;
+
+        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        _starpu_hdf5_work_list_push_front(fileBase->work_list, work);
+        /* Wake up internal thread */
+        STARPU_PTHREAD_COND_BROADCAST(&fileBase->cond);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+}
+
 static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase,  char * name, size_t size)
 {
         struct starpu_hdf5_obj * obj;
@@ -166,7 +376,6 @@ static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIB
         _STARPU_MALLOC(base, sizeof(struct starpu_hdf5_base));
 
 	STARPU_PTHREAD_MUTEX_INIT(&base->mutex, NULL);
-
         _starpu_hdf5_protect_start(base);
 
         struct stat buf;
@@ -210,6 +419,8 @@ static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIB
                 base->path = path;
         }
 
+        _starpu_hdf5_create_thread(base);
+
         _starpu_hdf5_protect_stop(base);
 
         base->next_dataset_id = 0;
@@ -227,12 +438,17 @@ static void starpu_hdf5_unplug(void *base)
         struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
         herr_t status;
 
-        _starpu_hdf5_protect_start(base);
+        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+
+        fileBase->run = 0;
+        STARPU_PTHREAD_COND_BROADCAST(&fileBase->cond);
+        STARPU_PTHREAD_COND_WAIT(&fileBase->cond, &fileBase->mutex);
+        /* the internal thread is deleted */
 
-	STARPU_PTHREAD_MUTEX_DESTROY(&fileBase->mutex);
         status = H5Fclose(fileBase->fileID);
 
-        _starpu_hdf5_protect_stop(base);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&fileBase->mutex);
 
         STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
         if (fileBase->created)
@@ -286,7 +502,6 @@ static void starpu_hdf5_free(void *base, void *obj, size_t size STARPU_ATTRIBUTE
 
         _starpu_hdf5_protect_start(base);
 
-        /* TODO delete dataset */
         status = H5Dclose(dataObj->dataset);
         STARPU_ASSERT_MSG(status >= 0, "Can not free this HDF5 dataset (%s)\n", dataObj->path);
 
@@ -336,138 +551,62 @@ static void starpu_hdf5_close(void *base, void *obj, size_t size STARPU_ATTRIBUT
         free(dataObj);
 }
 
+static void starpu_hdf5_wait(void * event)
+{
+        volatile int * finished = (int *) event;
+
+        while (*finished == 0)
+                ;
+}
+
 static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size)
 {
         struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
-        herr_t status;
 
-        _starpu_hdf5_protect_start(base);
+        int finished = 0;
 
-        /* Get the size of the dataspace (only 1 dimension) */
+        _starpu_hdf5_protect_start(base);
         *size = _starpu_get_size_obj(dataObj);
+        _starpu_hdf5_protect_stop(base);
 
         starpu_malloc_flags(ptr, *size, 0); 
 
-        status = H5Dread(dataObj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, *ptr);
-        STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", dataObj->path);
-
-        _starpu_hdf5_protect_stop(base);
-
+        starpu_hdf5_send_work(base, obj, *ptr, 0, *size, (void*) &finished, FULL_READ);
+        
+        starpu_hdf5_wait(&finished);
+        
         return 0;
 }
 
 static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
 {
-        struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
-        herr_t status;
+        int finished = 0;
 
-        /* Write ALL the dataspace */
-        _starpu_hdf5_protect_start(base);
-        status = H5Dwrite(dataObj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, ptr);
-        _starpu_hdf5_protect_stop(base);
-        STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", dataObj->path);
+        starpu_hdf5_send_work(base, obj, ptr, 0, size, (void*) &finished, FULL_WRITE);
+
+        starpu_hdf5_wait(&finished);
 
         return 0;
 }
 
 static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size_t size)
 {
-        struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
-        herr_t status;
-
-        _starpu_hdf5_protect_start(base);
-
-        /* Get official datatype */
-        hid_t datatype = H5Dget_type(dataObj->dataset);
-        hsize_t sizeDatatype = H5Tget_size(datatype);
-
-        /* count in element, not in byte */
-        offset /= sizeDatatype;
-        size /= sizeDatatype;
-
-        /* duplicate the dataspace in the dataset */
-        hid_t dataspace_select = H5Dget_space(dataObj->dataset);
-        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
-        hsize_t offsets[1] = {offset};
-        hsize_t count[1] = {size};
-        /* stride and block size are NULL which is equivalent of a shift of 1 */
-        status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* create the dataspace for the received data which describes ptr */
-        hsize_t dims_receive[1] = {size};
-        hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
-        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* Receiver has to be an hyperslabs */
-        offsets[0] = 0;
-        count[0] = size;
-        status = H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
-
-        status = H5Dread(dataObj->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, buf);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
+        int finished = 0;
 
-        /* don't need these dataspaces */
-        status = H5Sclose(dataspace_select);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
-        status = H5Sclose(dataspace_receive);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", dataObj->path);
+        starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) &finished, READ);
 
-        _starpu_hdf5_protect_stop(base);
+        starpu_hdf5_wait(&finished);
 
         return 0;
 }
 
 static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offset, size_t size)
 {
-        struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
-        herr_t status;
-
-        _starpu_hdf5_protect_start(base);
-
-        /* Get official datatype */
-        hid_t datatype = H5Dget_type(dataObj->dataset);
-        hsize_t sizeDatatype = H5Tget_size(datatype);
-
-        /* count in element, not in byte */
-        offset /= sizeDatatype;
-        size /= sizeDatatype;
-
-        /* duplicate the dataspace in the dataset */
-        hid_t dataspace_select = H5Dget_space(dataObj->dataset);
-        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
-        hsize_t offsets[1] = {offset};
-        hsize_t count[1] = {size};
-        /* stride and block size are NULL which is equivalent of a shift of 1 */
-        status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* create the dataspace for the received data which describes ptr */
-        hsize_t dims_send[1] = {size};
-        hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
-        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
-
-        /* Receiver has to be an hyperslabs */
-        offsets[0] = 0;
-        count[0] = size;
-        status = H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
+        int finished = 0;
 
-        status = H5Dwrite(dataObj->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, buf);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
+        starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) &finished, WRITE);
 
-        /* don't need these dataspaces */
-        status = H5Sclose(dataspace_select);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
-        status = H5Sclose(dataspace_send);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", dataObj->path);
-
-        _starpu_hdf5_protect_stop(base);
+        starpu_hdf5_wait(&finished);
 
         return 0;
 }