12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2017, 2019 CNRS
- * Copyright (C) 2017 Inria
- * Copyright (C) 2017 Université de Bordeaux
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <fcntl.h>
- #include <errno.h>
- #include <common/config.h>
- #ifdef HAVE_UNISTD_H
- #include <unistd.h>
- #endif
- #include <time.h>
- #include <hdf5.h>
- #include <starpu.h>
- #include <core/disk.h>
- #include <core/perfmodel/perfmodel.h>
/* O_BINARY is not defined on every platform: fall back to a flag that changes nothing */
#ifndef O_BINARY
#  define O_BINARY 0
#endif

/* Number of iterations of each timing loop in the bandwidth/latency benchmark */
#define NITER			_starpu_calibration_minimum

/* Chunk dimension given to H5Pset_chunk() when StarPU creates a dataset */
#define STARPU_CHUNK_DIM	4096

/* ------------------- use HDF5 to write on disk ------------------- */
- #ifndef H5_HAVE_THREADSAFE
- static int nb_disk_open = 0;
- static volatile int init_finished = 0;
- static starpu_pthread_t global_thread; /* This thread will perform each write/read because we don't have asynchronous functions */
- static volatile int global_run; /* Ask to the thread if he can continue */
- static starpu_pthread_mutex_t global_mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
- static starpu_pthread_cond_t global_cond;
- static struct _starpu_hdf5_work_list global_work_list; /* This list contains the work for the hdf5 thread */
- #endif
/*
 * Accessors for the worker-thread state.
 * Without a thread-safe HDF5 they name the file-scope globals; with a
 * thread-safe HDF5 they name the members of a local variable that MUST be
 * called `fileBase' at the point of use.
 */
#ifndef H5_HAVE_THREADSAFE
#  define HDF5_VAR_THREAD	global_thread
#  define HDF5_VAR_RUN		global_run
#  define HDF5_VAR_MUTEX	global_mutex
#  define HDF5_VAR_COND		global_cond
#  define HDF5_VAR_WORK_LIST	global_work_list
#else
#  define HDF5_VAR_THREAD	fileBase->thread
#  define HDF5_VAR_RUN		fileBase->run
#  define HDF5_VAR_MUTEX	fileBase->mutex
#  define HDF5_VAR_COND		fileBase->cond
#  define HDF5_VAR_WORK_LIST	fileBase->work_list
#endif
- enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE, COPY };
- LIST_TYPE(_starpu_hdf5_work,
- enum hdf5_work_type type;
- struct starpu_hdf5_base * base_src;
- struct starpu_hdf5_obj * obj_src;
- off_t offset_src;
- struct starpu_hdf5_base * base_dst;
- struct starpu_hdf5_obj * obj_dst;
- off_t offset_dst;
- void * ptr;
- size_t size;
- void * event;
- );
- struct starpu_hdf5_base
- {
- hid_t fileID;
- char * path;
- unsigned created; /* StarPU creates the HDF5 file */
- unsigned next_dataset_id;
- starpu_pthread_t thread; /* This thread will perform each write/read because we don't have asynchronous functions */
- int run; /* Ask to the thread if he can continue */
- starpu_pthread_mutex_t mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
- starpu_pthread_cond_t cond;
- struct _starpu_hdf5_work_list work_list; /* This list contains the work for the hdf5 thread */
- };
- struct starpu_hdf5_obj
- {
- hid_t dataset; /* describe this object in HDF5 file */
- char * path; /* path where data are stored in HDF5 file */
- size_t size;
- };
- static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
- {
- #ifndef H5_HAVE_THREADSAFE
- if (base != NULL)
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- #endif
- }
- static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
- {
- #ifndef H5_HAVE_THREADSAFE
- if (base != NULL)
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- #endif
- }
- /* ------------------ Functions for internal thread -------------------- */
- /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
- static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
- {
- herr_t status;
- status = H5Dread(work->obj_src->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
- STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj_src->path);
- }
- /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
- static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
- {
- herr_t status;
- /* Update size of dataspace */
- if (work->size > work->obj_dst->size)
- {
- /* Get official datatype */
- hid_t datatype = H5Dget_type(work->obj_dst->dataset);
- hsize_t sizeDatatype = H5Tget_size(datatype);
- /* Count in number of elements */
- hsize_t extendsdim[1] = {work->size/sizeDatatype};
- status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
- STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
- work->obj_dst->size = work->size;
- }
- /* Write ALL the dataspace */
- status = H5Dwrite(work->obj_dst->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
- STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj_dst->path);
- }
- static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
- {
- herr_t status;
- /* Get official datatype */
- hid_t datatype = H5Dget_type(work->obj_src->dataset);
- hsize_t sizeDatatype = H5Tget_size(datatype);
- /* count in element, not in byte */
- work->offset_src /= sizeDatatype;
- work->size /= sizeDatatype;
- /* duplicate the dataspace in the dataset */
- hid_t dataspace_select = H5Dget_space(work->obj_src->dataset);
- STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
- hsize_t offsets[1] = {work->offset_src};
- hsize_t count[1] = {work->size};
- /* stride and block size are NULL which is equivalent of a shift of 1 */
- status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
- STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- /* create the dataspace for the received data which describes ptr */
- hsize_t dims_receive[1] = {work->size};
- hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
- STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- /* Receiver has to be an hyperslabs */
- offsets[0] = 0;
- count[0] = work->size;
- H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
- STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- status = H5Dread(work->obj_src->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
- STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- /* don't need these dataspaces */
- status = H5Sclose(dataspace_select);
- STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- status = H5Sclose(dataspace_receive);
- STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
- }
- static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
- {
- herr_t status;
- /* Get official datatype */
- hid_t datatype = H5Dget_type(work->obj_dst->dataset);
- hsize_t sizeDatatype = H5Tget_size(datatype);
- /* Update size of dataspace */
- if (work->size + work->offset_dst > work->obj_dst->size)
- {
- /* Count in number of elements */
- hsize_t extendsdim[1] = {(work->offset_dst + work->size)/sizeDatatype};
- status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
- STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
- work->obj_dst->size = work->offset_dst + work->size;
- }
- /* count in element, not in byte */
- work->offset_dst /= sizeDatatype;
- work->size /= sizeDatatype;
- /* duplicate the dataspace in the dataset */
- hid_t dataspace_select = H5Dget_space(work->obj_dst->dataset);
- STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
- hsize_t offsets[1] = {work->offset_dst};
- hsize_t count[1] = {work->size};
- /* stride and block size are NULL which is equivalent of a shift of 1 */
- status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
- STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- /* create the dataspace for the received data which describes ptr */
- hsize_t dims_send[1] = {work->size};
- hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
- STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- /* Receiver has to be an hyperslabs */
- offsets[0] = 0;
- count[0] = work->size;
- H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
- STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- status = H5Dwrite(work->obj_dst->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
- STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- /* don't need these dataspaces */
- status = H5Sclose(dataspace_select);
- STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- status = H5Sclose(dataspace_send);
- STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
- }
- unsigned warned = 0;
- static void starpu_hdf5_copy_internal(struct _starpu_hdf5_work * work)
- {
- herr_t status;
- /* HDF5 H50copy supports only same size in both areas and copies the entire object */
- if (work->offset_src == 0 && work->offset_dst == 0 && work->size == work->obj_src->size && work->size == work->obj_dst->size)
- {
- H5Dclose(work->obj_dst->dataset);
- /* Dirty : Delete dataspace because H5Ocopy only works if destination does not exist */
- H5Ldelete(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);
- status = H5Ocopy(work->base_src->fileID, work->obj_src->path, work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT, H5P_DEFAULT);
- STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
- work->obj_dst->dataset = H5Dopen2(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);
- }
- else
- {
- if (!warned)
- {
- _STARPU_DISP("Direct disk to disk copy is not supported for a piece of data. Data will be transfered to RAM memory and then, be pushed on disk \n");
- warned = 1;
- }
- void * ptr;
- int ret = _starpu_malloc_flags_on_node(STARPU_MAIN_RAM, &ptr, work->size, 0);
- STARPU_ASSERT_MSG(ret == 0, "Cannot allocate %lu bytes to perform disk to disk operation", (unsigned long)work->size);
- /* buffer is only used internally to store intermediate data */
- work->ptr = ptr;
- starpu_hdf5_read_internal(work);
- starpu_hdf5_write_internal(work);
- _starpu_free_flags_on_node(STARPU_MAIN_RAM, ptr, work->size, 0);
- }
- }
- static void * _starpu_hdf5_internal_thread(void * arg)
- {
- #ifdef H5_HAVE_THREADSAFE
- struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) arg;
- #endif
- while (HDF5_VAR_RUN || !_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
- {
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- if (_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST) && HDF5_VAR_RUN)
- STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- /* We are the only consummer here, don't need to protect here */
- if (!_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
- {
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(&HDF5_VAR_WORK_LIST);
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- if (work->base_src < work->base_dst)
- {
- _starpu_hdf5_protect_start(work->base_src);
- #ifdef H5_HAVE_THREADSAFE
- _starpu_hdf5_protect_start(work->base_dst);
- #endif
- }
- else
- {
- _starpu_hdf5_protect_start(work->base_dst);
- #ifdef H5_HAVE_THREADSAFE
- if (work->base_src != work->base_dst)
- _starpu_hdf5_protect_start(work->base_src);
- #endif
- }
- switch(work->type)
- {
- case READ:
- starpu_hdf5_read_internal(work);
- break;
- case WRITE:
- starpu_hdf5_write_internal(work);
- break;
- case FULL_READ:
- starpu_hdf5_full_read_internal(work);
- break;
- case FULL_WRITE:
- starpu_hdf5_full_write_internal(work);
- break;
- case COPY:
- starpu_hdf5_copy_internal(work);
- break;
- default:
- STARPU_ABORT();
- }
- if (work->base_src < work->base_dst)
- {
- _starpu_hdf5_protect_stop(work->base_src);
- #ifdef H5_HAVE_THREADSAFE
- _starpu_hdf5_protect_stop(work->base_dst);
- #endif
- }
- else
- {
- _starpu_hdf5_protect_stop(work->base_dst);
- #ifdef H5_HAVE_THREADSAFE
- if (work->base_src != work->base_dst)
- _starpu_hdf5_protect_stop(work->base_src);
- #endif
- }
- /* Update event to tell it's finished */
- starpu_sem_post((starpu_sem_t *) work->event);
- free(work);
- }
- }
- return NULL;
- }
- static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * fileBase)
- {
- _starpu_hdf5_work_list_init(&HDF5_VAR_WORK_LIST);
- HDF5_VAR_RUN = 1;
- STARPU_PTHREAD_COND_INIT(&HDF5_VAR_COND, NULL);
- STARPU_PTHREAD_CREATE(&HDF5_VAR_THREAD, NULL, _starpu_hdf5_internal_thread, (void *) fileBase);
- }
- /* returns the size in BYTES */
- static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
- {
- herr_t status;
- hid_t dataspace = H5Dget_space(obj->dataset);
- STARPU_ASSERT_MSG(dataspace >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
- hsize_t dims[1];
- status = H5Sget_simple_extent_dims(dataspace, dims, NULL);
- STARPU_ASSERT_MSG(status >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
- hid_t datatype = H5Dget_type(obj->dataset);
- STARPU_ASSERT_MSG(datatype >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
- hsize_t sizeDatatype = H5Tget_size(datatype);
- STARPU_ASSERT_MSG(sizeDatatype > 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
- H5Sclose(dataspace);
- H5Tclose(datatype);
- return dims[0]*sizeDatatype;
- }
- static void starpu_hdf5_send_work(void *base_src, void *obj_src, off_t offset_src, void *base_dst, void *obj_dst, off_t offset_dst, void *buf, size_t size, void * event, enum hdf5_work_type type)
- {
- struct starpu_hdf5_obj * dataObj_src = (struct starpu_hdf5_obj *) obj_src;
- struct starpu_hdf5_obj * dataObj_dst = (struct starpu_hdf5_obj *) obj_dst;
- struct starpu_hdf5_base * fileBase_src = (struct starpu_hdf5_base *) base_src;
- struct starpu_hdf5_base * fileBase_dst = (struct starpu_hdf5_base *) base_dst;
- struct _starpu_hdf5_work * work;
- _STARPU_MALLOC(work, sizeof(*work));
- work->type = type;
- work->base_src = fileBase_src;
- work->obj_src = dataObj_src;
- work->offset_src = offset_src;
- work->base_dst = fileBase_dst;
- work->obj_dst = dataObj_dst;
- work->offset_dst = offset_dst;
- work->ptr = buf;
- work->size = size;
- work->event = event;
- #ifdef H5_HAVE_THREADSAFE
- struct starpu_hdf5_base * fileBase;
- if (fileBase_src != NULL)
- fileBase = fileBase_src;
- else
- fileBase = fileBase_dst;
- #endif
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- _starpu_hdf5_work_list_push_front(&HDF5_VAR_WORK_LIST, work);
- /* Wake up internal thread */
- STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- }
- static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase, char * name, size_t size)
- {
- struct starpu_hdf5_obj * obj;
- _STARPU_MALLOC(obj, sizeof(*obj));
- _starpu_hdf5_protect_start((void *) fileBase);
- /* create a dataspace with one dimension of size elements */
- hsize_t dim[1] = {size};
- hsize_t maxdim[1] = {H5S_UNLIMITED};
- hid_t dataspace = H5Screate_simple(1, dim, maxdim);
- if (dataspace < 0)
- {
- free(obj);
- return NULL;
- }
- hsize_t chunkdim[1] = {STARPU_CHUNK_DIM};
- hid_t prop = H5Pcreate (H5P_DATASET_CREATE);
- herr_t status = H5Pset_chunk (prop, 1, chunkdim);
- STARPU_ASSERT_MSG(status >= 0, "Error when setting HDF5 property \n");
- /* create a dataset at location name, with data described by the dataspace.
- * Each element are like char in C (expected one byte)
- */
- obj->dataset = H5Dcreate2(fileBase->fileID, name, H5T_NATIVE_CHAR, dataspace, H5P_DEFAULT, prop, H5P_DEFAULT);
- H5Sclose(dataspace);
- H5Pclose(prop);
- if (obj->dataset < 0)
- {
- free(obj);
- return NULL;
- }
- obj->path = name;
- obj->size = size;
- _starpu_hdf5_protect_stop((void *) fileBase);
- return obj;
- }
- static struct starpu_hdf5_obj * _starpu_hdf5_data_open(struct starpu_hdf5_base * fileBase, char * name, size_t size)
- {
- struct starpu_hdf5_obj * obj;
- _STARPU_MALLOC(obj, sizeof(*obj));
- _starpu_hdf5_protect_start((void *) fileBase);
- /* create a dataset at location name, with data described by the dataspace.
- * Each element are like char in C (expected one byte)
- */
- obj->dataset = H5Dopen2(fileBase->fileID, name, H5P_DEFAULT);
- _starpu_hdf5_protect_stop((void *) fileBase);
- if (obj->dataset < 0)
- {
- free(obj);
- return NULL;
- }
- obj->path = name;
- obj->size = size;
- return obj;
- }
- static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
- {
- struct starpu_hdf5_base * fileBase;
- _STARPU_MALLOC(fileBase, sizeof(struct starpu_hdf5_base));
- #ifndef H5_HAVE_THREADSAFE
- int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
- if (actual_nb_disk == 1)
- {
- #endif
- STARPU_PTHREAD_MUTEX_INIT(&HDF5_VAR_MUTEX, NULL);
- #ifndef H5_HAVE_THREADSAFE
- }
- else
- {
- while (!init_finished)
- ;
- }
- #endif
- _starpu_hdf5_protect_start(fileBase);
- struct stat buf;
- if (stat(parameter, &buf) != 0 || !S_ISREG(buf.st_mode))
- {
- /* The file doesn't exist or the directory exists => create the datafile */
- int id;
- _starpu_mkpath(parameter, S_IRWXU);
- fileBase->path = _starpu_mktemp(parameter, O_RDWR | O_BINARY, &id);
- if (!fileBase->path)
- {
- free(fileBase);
- _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
- return NULL;
- }
- /* just use _starpu_mktemp_many to create a file, close the file descriptor */
- close(id);
- /* Truncate it */
- fileBase->fileID = H5Fcreate((char *)fileBase->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
- if (fileBase->fileID < 0)
- {
- free(fileBase);
- _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
- return NULL;
- }
- fileBase->created = 1;
- }
- else
- {
- /* Well, open it ! */
- char *path = strdup((char *)parameter);
- STARPU_ASSERT(path);
- fileBase->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
- if (fileBase->fileID < 0)
- {
- free(fileBase);
- free(path);
- _STARPU_ERROR("Can not open the HDF5 file (%s)", (char *) parameter);
- return NULL;
- }
- fileBase->created = 0;
- fileBase->path = path;
- }
- #ifndef H5_HAVE_THREADSAFE
- if (actual_nb_disk == 1)
- {
- #endif
- _starpu_hdf5_create_thread(fileBase);
- #ifndef H5_HAVE_THREADSAFE
- init_finished = 1;
- }
- #endif
- #if H5_VERS_MAJOR > 1 || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR > 10) || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR == 10 && H5_VERS_RELEASE > 0)
- H5Pset_file_space_strategy(fileBase->fileID, H5F_FSPACE_STRATEGY_FSM_AGGR, 0, 0);
- #endif
- _starpu_hdf5_protect_stop(fileBase);
- fileBase->next_dataset_id = 0;
- return (void *) fileBase;
- }
- /* free memory allocated for the base */
- static void starpu_hdf5_unplug(void *base)
- {
- #ifndef H5_HAVE_THREADSAFE
- int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, -1);
- #endif
- struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
- herr_t status;
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- #ifndef H5_HAVE_THREADSAFE
- if (actual_nb_disk == 0)
- {
- #endif
- HDF5_VAR_RUN = 0;
- STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- STARPU_PTHREAD_JOIN(HDF5_VAR_THREAD, NULL);
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- STARPU_PTHREAD_COND_DESTROY(&HDF5_VAR_COND);
- STARPU_ASSERT(_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST));
- /* the internal thread is deleted */
- #ifndef H5_HAVE_THREADSAFE
- }
- #endif
- status = H5Fclose(fileBase->fileID);
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- #ifndef H5_HAVE_THREADSAFE
- if (actual_nb_disk == 0)
- {
- #endif
- STARPU_PTHREAD_MUTEX_DESTROY(&HDF5_VAR_MUTEX);
- #ifndef H5_HAVE_THREADSAFE
- init_finished = 0;
- }
- #endif
- STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
- if (fileBase->created)
- {
- unlink(fileBase->path);
- }
- else
- {
- /* Warn user about repack, because unlink dataset doesn't delete data in file */
- _STARPU_DISP("This disk (%s) was used to store temporary data. You may use the h5repack command to reduce the size of the file... \n", fileBase->path);
- }
- free(fileBase->path);
- free(fileBase);
- }
- static void *starpu_hdf5_alloc(void *base, size_t size)
- {
- struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
- struct starpu_hdf5_obj * obj;
- char * name;
- char * prefix = "STARPU_";
- char name_id[16];
- /* Save the name of the dataset */
- STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
- snprintf(name_id, sizeof(name_id), "%u", fileBase->next_dataset_id);
- fileBase->next_dataset_id++;
- STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
- /* name in HDF5 is like a path */
- _STARPU_MALLOC(name, 1+strlen(prefix)+strlen(name_id)+1);
- snprintf(name, 1+strlen(prefix)+strlen(name_id)+1, "/%s%s", prefix, name_id);
- obj = _starpu_hdf5_data_alloc(fileBase, name, size);
- if (!obj)
- {
- free(name);
- }
- return (void *) obj;
- }
- static void starpu_hdf5_free(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
- {
- struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
- struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
- herr_t status;
- _starpu_hdf5_protect_start(base);
- status = H5Dclose(dataObj->dataset);
- STARPU_ASSERT_MSG(status >= 0, "Can not free this HDF5 dataset (%s)\n", dataObj->path);
- /* remove the dataset link in the HDF5
- * But it doesn't delete the space in the file */
- status = H5Ldelete(fileBase->fileID, dataObj->path, H5P_DEFAULT);
- STARPU_ASSERT_MSG(status >= 0, "Can not delete the link associed to this dataset (%s)\n", dataObj->path);
- _starpu_hdf5_protect_stop(base);
- free(dataObj->path);
- free(dataObj);
- }
/* Open the dataset whose name is the string `pos' on the disk.
 * The object works on its own copy of the name.
 * Returns the object as an opaque pointer, or NULL when the dataset could
 * not be opened.
 */
static void *starpu_hdf5_open(void *base, void *pos, size_t size)
{
	char * name_copy = strdup((char *)pos);
	STARPU_ASSERT(name_copy);

	struct starpu_hdf5_obj * opened = _starpu_hdf5_data_open((struct starpu_hdf5_base *) base, name_copy, size);
	if (opened == NULL)
	{
		/* the copy only changes hands when the object exists */
		free(name_copy);
	}

	return (void *) opened;
}
- static void starpu_hdf5_close(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
- {
- struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
- herr_t status;
- _starpu_hdf5_protect_start(base);
- status = H5Dclose(dataObj->dataset);
- STARPU_ASSERT_MSG(status >= 0, "Can not close this HDF5 dataset (%s)\n", dataObj->path);
- _starpu_hdf5_protect_stop(base);
- free(dataObj->path);
- free(dataObj);
- }
- static void starpu_hdf5_wait(void * event)
- {
- starpu_sem_t * finished = (starpu_sem_t *) event;
- starpu_sem_wait(finished);
- }
- static int starpu_hdf5_test(void * event)
- {
- starpu_sem_t * finished = (starpu_sem_t *) event;
- return starpu_sem_trywait(finished) == 0;
- }
- static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size, unsigned dst_node)
- {
- struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
- starpu_sem_t finished;
- starpu_sem_init(&finished, 0, 0);
- _starpu_hdf5_protect_start(base);
- *size = _starpu_get_size_obj(dataObj);
- _starpu_hdf5_protect_stop(base);
- _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
- starpu_hdf5_send_work(base, obj, 0, NULL, NULL, 0, *ptr, *size, (void*) &finished, FULL_READ);
- starpu_hdf5_wait(&finished);
- starpu_sem_destroy(&finished);
- return 0;
- }
- static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
- {
- starpu_sem_t finished;
- starpu_sem_init(&finished, 0, 0);
- starpu_hdf5_send_work(NULL, NULL, 0, base, obj, 0, ptr, size, (void*) &finished, FULL_WRITE);
- starpu_hdf5_wait(&finished);
- starpu_sem_destroy(&finished);
- return 0;
- }
- static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size_t size)
- {
- starpu_sem_t finished;
- starpu_sem_init(&finished, 0, 0);
- starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) &finished, READ);
- starpu_hdf5_wait(&finished);
- starpu_sem_destroy(&finished);
- return 0;
- }
- static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offset, size_t size)
- {
- starpu_sem_t finished;
- starpu_sem_init(&finished, 0, 0);
- starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) &finished, WRITE);
- starpu_hdf5_wait(&finished);
- starpu_sem_destroy(&finished);
- return 0;
- }
- static void * starpu_hdf5_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
- {
- starpu_sem_t * finished;
- _STARPU_MALLOC(finished, sizeof(*finished));
- starpu_sem_init(finished, 0, 0);
- starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) finished, READ);
- return finished;
- }
- static void * starpu_hdf5_async_write(void *base, void *obj, void *buf, off_t offset, size_t size)
- {
- starpu_sem_t * finished;
- _STARPU_MALLOC(finished, sizeof(*finished));
- starpu_sem_init(finished, 0, 0);
- starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) finished, WRITE);
- return finished;
- }
- void * starpu_hdf5_async_full_read (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node)
- {
- struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
- starpu_sem_t * finished;
- _STARPU_MALLOC(finished, sizeof(*finished));
- starpu_sem_init(finished, 0, 0);
- _starpu_hdf5_protect_start(base);
- *size = _starpu_get_size_obj(dataObj);
- _starpu_hdf5_protect_stop(base);
- _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
- starpu_hdf5_send_work(base, obj, 0, NULL, NULL, 0, *ptr, *size, (void*) finished, FULL_READ);
- return finished;
- }
- void * starpu_hdf5_async_full_write (void * base, void * obj, void * ptr, size_t size)
- {
- starpu_sem_t * finished;
- _STARPU_MALLOC(finished, sizeof(*finished));
- starpu_sem_init(finished, 0, 0);
- starpu_hdf5_send_work(NULL, NULL, 0, base, obj, 0, ptr, size, (void*) finished, FULL_WRITE);
- return finished;
- }
- void * starpu_hdf5_copy(void *base_src, void* obj_src, off_t offset_src, void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
- {
- starpu_sem_t * finished;
- _STARPU_MALLOC(finished, sizeof(*finished));
- starpu_sem_init(finished, 0, 0);
- starpu_hdf5_send_work(base_src, obj_src, offset_src, base_dst, obj_dst, offset_dst, NULL, size, (void*) finished, COPY);
- return finished;
- }
- static void starpu_hdf5_free_request(void * event)
- {
- starpu_sem_destroy(event);
- free(event);
- }
- static int get_hdf5_bandwidth_between_disk_and_main_ram(unsigned node, void *base)
- {
- unsigned iter;
- double timing_slowness, timing_latency;
- double start;
- double end;
- char *buf;
- struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
- srand(time(NULL));
- starpu_malloc_flags((void **) &buf, STARPU_DISK_SIZE_MIN, 0);
- STARPU_ASSERT(buf != NULL);
- /* allocate memory */
- void *mem = _starpu_disk_alloc(node, STARPU_DISK_SIZE_MIN);
- /* fail to alloc */
- if (mem == NULL)
- return 0;
- memset(buf, 0, STARPU_DISK_SIZE_MIN);
- /* Measure upload slowness */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; ++iter)
- {
- _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, STARPU_DISK_SIZE_MIN, NULL);
- }
- end = starpu_timing_now();
- timing_slowness = end - start;
- /* free memory */
- starpu_free_flags(buf, STARPU_DISK_SIZE_MIN, 0);
- starpu_malloc_flags((void**) &buf, sizeof(char), 0);
- STARPU_ASSERT(buf != NULL);
- *buf = 0;
- /* Measure latency */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; ++iter)
- {
- _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (STARPU_DISK_SIZE_MIN -1) , 1, NULL);
- }
- end = starpu_timing_now();
- timing_latency = end - start;
- _starpu_disk_free(node, mem, STARPU_DISK_SIZE_MIN);
- starpu_free_flags(buf, sizeof(char), 0);
- _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*STARPU_DISK_SIZE_MIN, (NITER/timing_slowness)*STARPU_DISK_SIZE_MIN,
- timing_latency/NITER, timing_latency/NITER, node, fileBase->path);
- return 1;
- }
- struct starpu_disk_ops starpu_disk_hdf5_ops =
- {
- .alloc = starpu_hdf5_alloc,
- .free = starpu_hdf5_free,
- .open = starpu_hdf5_open,
- .close = starpu_hdf5_close,
- .read = starpu_hdf5_read,
- .write = starpu_hdf5_write,
- .plug = starpu_hdf5_plug,
- .unplug = starpu_hdf5_unplug,
- .copy = starpu_hdf5_copy,
- .bandwidth = get_hdf5_bandwidth_between_disk_and_main_ram,
- .full_read = starpu_hdf5_full_read,
- .full_write = starpu_hdf5_full_write,
- .async_read = starpu_hdf5_async_read,
- .async_write = starpu_hdf5_async_write,
- .async_full_read = starpu_hdf5_async_full_read,
- .async_full_write = starpu_hdf5_async_full_write,
- .wait_request = starpu_hdf5_wait,
- .test_request = starpu_hdf5_test,
- .free_request = starpu_hdf5_free_request
- };
|