Преглед на файлове

Add HDF5 disk to disk function

Corentin Salingue преди 8 години
родител
ревизия
7b77630c8e
променени са 1 файла, в които са добавени 153 реда и са изтрити 61 реда
  1. 153 61
      src/core/disk_ops/disk_hdf5.c

+ 153 - 61
src/core/disk_ops/disk_hdf5.c

@@ -34,8 +34,6 @@
 #define NITER	_starpu_calibration_minimum
 #define STARPU_CHUNK_DIM 4096
 
-/* TODO: support disk-to-disk copy with HD5Ocopy */
-
 /* ------------------- use HDF5 to write on disk -------------------  */
 
 #ifndef H5_HAVE_THREADSAFE
@@ -66,16 +64,20 @@ static struct _starpu_hdf5_work_list global_work_list;        /* This list conta
 
 #endif									
 
-
-
-enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE };
+enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE, COPY };
 
 LIST_TYPE(_starpu_hdf5_work,
         enum hdf5_work_type type;
-        struct starpu_hdf5_obj * obj;
-        struct starpu_hdf5_base * base;
+
+        struct starpu_hdf5_base * base_src;
+        struct starpu_hdf5_obj * obj_src;
+        off_t offset_src;
+
+        struct starpu_hdf5_base * base_dst;
+        struct starpu_hdf5_obj * obj_dst;
+        off_t offset_dst;
+
         void * ptr;
-        off_t offset;
         size_t size;
         void * event;
 );
@@ -103,14 +105,16 @@ struct starpu_hdf5_obj
 static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
 {
 #ifndef H5_HAVE_THREADSAFE
-        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+	if (base != NULL)
+		STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
 #endif
 }
 
 static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
 {
 #ifndef H5_HAVE_THREADSAFE
-        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
+	if (base != NULL)
+		STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 #endif
 }
 
@@ -121,8 +125,8 @@ static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
 {
         herr_t status;
 
-        status = H5Dread(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
-        STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj->path);
+        status = H5Dread(work->obj_src->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj_src->path);
 }
 
 /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
@@ -131,22 +135,22 @@ static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
         herr_t status;
 
 	/* Update size of dataspace */
-	if (work->size > work->obj->size)
+	if (work->size > work->obj_dst->size)
 	{
 		/* Get official datatype */
-		hid_t datatype = H5Dget_type(work->obj->dataset);
+		hid_t datatype = H5Dget_type(work->obj_dst->dataset);
 		hsize_t sizeDatatype = H5Tget_size(datatype);
 		
 		/* Count in number of elements */
 		hsize_t extendsdim[1] = {work->size/sizeDatatype};
-		status = H5Dset_extent (work->obj->dataset, extendsdim);
+		status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
 		STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
-		work->obj->size = work->size;
+		work->obj_dst->size = work->size;
 	}
 
         /* Write ALL the dataspace */
-        status = H5Dwrite(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
-        STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj->path);
+        status = H5Dwrite(work->obj_dst->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj_dst->path);
 }
 
 static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
@@ -154,43 +158,43 @@ static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
         herr_t status;
 
         /* Get official datatype */
-        hid_t datatype = H5Dget_type(work->obj->dataset);
+        hid_t datatype = H5Dget_type(work->obj_src->dataset);
         hsize_t sizeDatatype = H5Tget_size(datatype);
 
         /* count in element, not in byte */
-        work->offset /= sizeDatatype;
+        work->offset_src /= sizeDatatype;
         work->size /= sizeDatatype;
 
         /* duplicate the dataspace in the dataset */
-        hid_t dataspace_select = H5Dget_space(work->obj->dataset);
-        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        hid_t dataspace_select = H5Dget_space(work->obj_src->dataset);
+        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 
         /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
-        hsize_t offsets[1] = {work->offset};
+        hsize_t offsets[1] = {work->offset_src};
         hsize_t count[1] = {work->size};
         /* stride and block size are NULL which is equivalent of a shift of 1 */
         status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 
         /* create the dataspace for the received data which describes ptr */
         hsize_t dims_receive[1] = {work->size};
         hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
-        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 
         /* Receiver has to be an hyperslabs */
         offsets[0] = 0;
         count[0] = work->size;
         status = H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 
-        status = H5Dread(work->obj->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        status = H5Dread(work->obj_src->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 
         /* don't need these dataspaces */
         status = H5Sclose(dataspace_select);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
         status = H5Sclose(dataspace_receive);
-        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
 }
 
 static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
@@ -198,53 +202,98 @@ static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
         herr_t status;
 
         /* Get official datatype */
-        hid_t datatype = H5Dget_type(work->obj->dataset);
+        hid_t datatype = H5Dget_type(work->obj_dst->dataset);
         hsize_t sizeDatatype = H5Tget_size(datatype);
 
 	/* Update size of dataspace */
-	if (work->size + work->offset > work->obj->size)
+	if (work->size + work->offset_dst > work->obj_dst->size)
 	{
 		/* Count in number of elements */
-		hsize_t extendsdim[1] = {(work->offset + work->size)/sizeDatatype};
-		status = H5Dset_extent (work->obj->dataset, extendsdim);
+		hsize_t extendsdim[1] = {(work->offset_dst + work->size)/sizeDatatype};
+		status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
 		STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
-		work->obj->size = work->offset + work->size;
+		work->obj_dst->size = work->offset_dst + work->size;
 	}
 
         /* count in element, not in byte */
-        work->offset /= sizeDatatype;
+        work->offset_dst /= sizeDatatype;
         work->size /= sizeDatatype;
 
         /* duplicate the dataspace in the dataset */
-        hid_t dataspace_select = H5Dget_space(work->obj->dataset);
-        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        hid_t dataspace_select = H5Dget_space(work->obj_dst->dataset);
+        STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
 
         /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
-        hsize_t offsets[1] = {work->offset};
+        hsize_t offsets[1] = {work->offset_dst};
         hsize_t count[1] = {work->size};
         /* stride and block size are NULL which is equivalent of a shift of 1 */
         status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
 
         /* create the dataspace for the received data which describes ptr */
         hsize_t dims_send[1] = {work->size};
         hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
-        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
 
         /* Receiver has to be an hyperslabs */
         offsets[0] = 0;
         count[0] = work->size;
         status = H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
-        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
 
-        status = H5Dwrite(work->obj->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        status = H5Dwrite(work->obj_dst->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
 
         /* don't need these dataspaces */
         status = H5Sclose(dataspace_select);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
         status = H5Sclose(dataspace_send);
-        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
+        STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
+}
+
+unsigned warned = 0;
+static void starpu_hdf5_copy_internal(struct _starpu_hdf5_work * work)
+{
+	herr_t status;
+
+	/* HDF5 H50copy supports only same size in both areas and copies the entire object */
+	if (work->offset_src == 0 && work->offset_dst == 0 && work->size == work->obj_src->size && work->size == work->obj_dst->size)
+	{
+		/* Add creation of intermediate group. It's not really used in our case but HDF5 doc is very weird */
+		hid_t grp_prop = H5Pcreate(H5P_LINK_CREATE);
+		status = H5Pset_create_intermediate_group(grp_prop, 1);
+		STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
+
+		hid_t copy_prop = H5Pcreate(H5P_OBJECT_COPY);
+		status = H5Pset_copy_object(copy_prop, H5O_COPY_SHALLOW_HIERARCHY_FLAG);
+		STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
+
+		status = H5Ocopy(work->obj_src->dataset, work->obj_src->path, work->obj_dst->dataset, work->obj_dst->path, copy_prop, grp_prop); 
+		STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
+
+		H5Pclose(grp_prop);
+		H5Pclose(copy_prop);
+	}
+	else
+	{
+		if (!warned)
+		{
+			_STARPU_DISP("Direct disk to disk copy is not supported for a piece of data. Data will be transfered to RAM memory and then, be pushed on disk \n");
+			warned = 1;
+		}
+
+		void ** ptr = NULL;
+		int ret = _starpu_malloc_flags_on_node(STARPU_MAIN_RAM, ptr, work->size, 0);
+		STARPU_ASSERT_MSG(ret == 0, "Cannot allocate %lu bytes to perform disk to disk operation", work->size);
+
+		/* buffer is only used internally to store intermediate data */
+		work->ptr = *ptr;
+		
+		starpu_hdf5_read_internal(work);
+		starpu_hdf5_write_internal(work);
+
+		_starpu_free_flags_on_node(STARPU_MAIN_RAM, *ptr, work->size, 0);
+	}
 }
 
 static void * _starpu_hdf5_internal_thread(void * arg)
@@ -266,7 +315,19 @@ static void * _starpu_hdf5_internal_thread(void * arg)
                         struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(&HDF5_VAR_WORK_LIST);
                         STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 
-                        _starpu_hdf5_protect_start(work->base);
+			if (work->base_src < work->base_dst)
+			{
+				_starpu_hdf5_protect_start(work->base_src);
+				if (work->base_src != work->base_dst)
+					_starpu_hdf5_protect_start(work->base_dst);
+			}
+			else
+			{
+				_starpu_hdf5_protect_start(work->base_dst);
+				if (work->base_src != work->base_dst)
+					_starpu_hdf5_protect_start(work->base_src);
+			}
+
                         switch(work->type)
                         {
                                 case READ:
@@ -284,11 +345,16 @@ static void * _starpu_hdf5_internal_thread(void * arg)
                                 case FULL_WRITE:
                                         starpu_hdf5_full_write_internal(work);
                                         break;
+				
+				case COPY:
+					starpu_hdf5_copy_internal(work);
+					break;
 
                                 default:
                                         STARPU_ABORT();
                         }
-                        _starpu_hdf5_protect_stop(work->base);
+                        _starpu_hdf5_protect_stop(work->base_src);
+                        _starpu_hdf5_protect_stop(work->base_dst);
 
                         /* Update event to tell it's finished */
                         starpu_sem_post((starpu_sem_t *) work->event);
@@ -337,22 +403,36 @@ static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
         return dims[0]*sizeDatatype;
 }
 
-static void starpu_hdf5_send_work(void *base, void *obj, void *buf, off_t offset, size_t size, void * event, enum hdf5_work_type type)
+static void starpu_hdf5_send_work(void *base_src, void *obj_src, off_t offset_src, void *base_dst, void *obj_dst, off_t offset_dst, void *buf, size_t size, void * event, enum hdf5_work_type type)
 {
-        struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
-        struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
+        struct starpu_hdf5_obj * dataObj_src = (struct starpu_hdf5_obj *) obj_src;
+        struct starpu_hdf5_obj * dataObj_dst = (struct starpu_hdf5_obj *) obj_dst;
+        struct starpu_hdf5_base * fileBase_src = (struct starpu_hdf5_base *) base_src;
+        struct starpu_hdf5_base * fileBase_dst = (struct starpu_hdf5_base *) base_dst;
 
         struct _starpu_hdf5_work * work;
         _STARPU_MALLOC(work, sizeof(*work));
 
         work->type = type;
-        work->obj = dataObj;
-        work->base = fileBase;
+
+        work->base_src = fileBase_src;
+        work->obj_src = dataObj_src;
+        work->offset_src = offset_src;
+
+        work->base_dst = fileBase_dst;
+        work->obj_dst = dataObj_dst;
+        work->offset_dst = offset_dst;
+
         work->ptr = buf;
-        work->offset = offset;
         work->size = size;
         work->event = event;
 
+        struct starpu_hdf5_base * fileBase;
+	if (fileBase_src != NULL)
+		fileBase = fileBase_src;
+	else	
+		fileBase = fileBase_dst;
+
         STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
         _starpu_hdf5_work_list_push_front(&HDF5_VAR_WORK_LIST, work);
         /* Wake up internal thread */
@@ -685,7 +765,7 @@ static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size
 
         _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0); 
 
-        starpu_hdf5_send_work(base, obj, *ptr, 0, *size, (void*) &finished, FULL_READ);
+        starpu_hdf5_send_work(base, obj, 0, NULL, NULL, 0, *ptr, *size, (void*) &finished, FULL_READ);
         
         starpu_hdf5_wait(&finished);
 
@@ -699,7 +779,7 @@ static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
         starpu_sem_t finished;
         starpu_sem_init(&finished, 0, 0);
 
-        starpu_hdf5_send_work(base, obj, ptr, 0, size, (void*) &finished, FULL_WRITE);
+        starpu_hdf5_send_work(NULL, NULL, 0, base, obj, 0, ptr, size, (void*) &finished, FULL_WRITE);
 
         starpu_hdf5_wait(&finished);
 
@@ -713,7 +793,7 @@ static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size
         starpu_sem_t finished;
         starpu_sem_init(&finished, 0, 0);
 
-        starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) &finished, READ);
+        starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) &finished, READ);
 
         starpu_hdf5_wait(&finished);
 
@@ -727,7 +807,7 @@ static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offse
         starpu_sem_t finished;
         starpu_sem_init(&finished, 0, 0);
 
-        starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) &finished, WRITE);
+        starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) &finished, WRITE);
 
         starpu_hdf5_wait(&finished);
 
@@ -742,7 +822,7 @@ static void * starpu_hdf5_async_read(void *base, void *obj, void *buf, off_t off
         _STARPU_MALLOC(finished, sizeof(*finished));
         starpu_sem_init(finished, 0, 0);
 
-        starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) finished, READ);
+        starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) finished, READ);
 
         return finished;
 }
@@ -753,11 +833,23 @@ static void * starpu_hdf5_async_write(void *base, void *obj, void *buf, off_t of
         _STARPU_MALLOC(finished, sizeof(*finished));
         starpu_sem_init(finished, 0, 0);
 
-        starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) finished, WRITE);
+        starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) finished, WRITE);
 
         return finished;
 }
 
+void *  starpu_hdf5_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
+{
+        starpu_sem_t * finished;
+        _STARPU_MALLOC(finished, sizeof(*finished));
+        starpu_sem_init(finished, 0, 0);
+
+	starpu_hdf5_send_work(base_src, obj_src, offset_src, base_dst, obj_dst, offset_dst, NULL, size, (void*) finished, COPY);
+
+        return finished;
+}
+
+
 static void starpu_hdf5_free_request(void * event)
 {
         starpu_sem_destroy(event);
@@ -830,7 +922,7 @@ struct starpu_disk_ops starpu_disk_hdf5_ops =
 	.write = starpu_hdf5_write,
 	.plug = starpu_hdf5_plug,
 	.unplug = starpu_hdf5_unplug,
-	.copy = NULL,
+	.copy = starpu_hdf5_copy,
 	.bandwidth = get_hdf5_bandwidth_between_disk_and_main_ram,
 	.full_read = starpu_hdf5_full_read,
 	.full_write = starpu_hdf5_full_write,