disk_hdf5.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2017, 2019 CNRS
  4. * Copyright (C) 2017 Inria
  5. * Copyright (C) 2017 Université de Bordeaux
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <fcntl.h>
  19. #include <errno.h>
  20. #include <common/config.h>
  21. #ifdef HAVE_UNISTD_H
  22. #include <unistd.h>
  23. #endif
  24. #include <time.h>
  25. #include <hdf5.h>
  26. #include <starpu.h>
  27. #include <core/disk.h>
  28. #include <core/perfmodel/perfmodel.h>
  29. #ifndef O_BINARY
  30. #define O_BINARY 0
  31. #endif
  /* NITER is an alias for _starpu_calibration_minimum, which is declared outside this file. */
  #define NITER _starpu_calibration_minimum
  /* Chunk length (in elements) handed to H5Pset_chunk() when this backend creates a dataset. */
  #define STARPU_CHUNK_DIM 4096
  34. /* ------------------- use HDF5 to write on disk ------------------- */
  35. #ifndef H5_HAVE_THREADSAFE
  36. static int nb_disk_open = 0;
  37. static volatile int init_finished = 0;
  38. static starpu_pthread_t global_thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  39. static volatile int global_run; /* Ask to the thread if he can continue */
  40. static starpu_pthread_mutex_t global_mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  41. static starpu_pthread_cond_t global_cond;
  42. static struct _starpu_hdf5_work_list global_work_list; /* This list contains the work for the hdf5 thread */
  43. #endif
  /*
   * HDF5_VAR_* name the worker thread, run flag, mutex, condition and work
   * list to use. The per-disk variants expect a local variable called
   * `fileBase` to be in scope at the point of use.
   */
  #ifndef H5_HAVE_THREADSAFE
  /* HDF5 is not thread-safe: one set of file-scope globals for all disks. */
  #define HDF5_VAR_THREAD global_thread
  #define HDF5_VAR_RUN global_run
  #define HDF5_VAR_MUTEX global_mutex
  #define HDF5_VAR_COND global_cond
  #define HDF5_VAR_WORK_LIST global_work_list
  #else
  /* HDF5 is thread-safe: every disk carries its own set inside its base. */
  #define HDF5_VAR_THREAD fileBase->thread
  #define HDF5_VAR_RUN fileBase->run
  #define HDF5_VAR_MUTEX fileBase->mutex
  #define HDF5_VAR_COND fileBase->cond
  #define HDF5_VAR_WORK_LIST fileBase->work_list
  #endif
  /* Kind of operation the worker thread must perform for a queued work item. */
  enum hdf5_work_type
  {
      READ,       /* read a byte range of a dataset into a buffer */
      WRITE,      /* write a buffer into a byte range of a dataset */
      FULL_READ,  /* read a whole dataset */
      FULL_WRITE, /* write a whole dataset */
      COPY        /* copy from one dataset to another */
  };
  58. LIST_TYPE(_starpu_hdf5_work,
  59. enum hdf5_work_type type;
  60. struct starpu_hdf5_base * base_src;
  61. struct starpu_hdf5_obj * obj_src;
  62. off_t offset_src;
  63. struct starpu_hdf5_base * base_dst;
  64. struct starpu_hdf5_obj * obj_dst;
  65. off_t offset_dst;
  66. void * ptr;
  67. size_t size;
  68. void * event;
  69. );
  70. struct starpu_hdf5_base
  71. {
  72. hid_t fileID;
  73. char * path;
  74. unsigned created; /* StarPU creates the HDF5 file */
  75. unsigned next_dataset_id;
  76. starpu_pthread_t thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  77. int run; /* Ask to the thread if he can continue */
  78. starpu_pthread_mutex_t mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  79. starpu_pthread_cond_t cond;
  80. struct _starpu_hdf5_work_list work_list; /* This list contains the work for the hdf5 thread */
  81. };
  82. struct starpu_hdf5_obj
  83. {
  84. hid_t dataset; /* describe this object in HDF5 file */
  85. char * path; /* path where data are stored in HDF5 file */
  86. size_t size;
  87. };
  88. static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
  89. {
  90. #ifndef H5_HAVE_THREADSAFE
  91. if (base != NULL)
  92. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  93. #endif
  94. }
  95. static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
  96. {
  97. #ifndef H5_HAVE_THREADSAFE
  98. if (base != NULL)
  99. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  100. #endif
  101. }
  102. /* ------------------ Functions for internal thread -------------------- */
  103. /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
  104. static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
  105. {
  106. herr_t status;
  107. status = H5Dread(work->obj_src->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  108. STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj_src->path);
  109. }
  110. /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
  111. static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
  112. {
  113. herr_t status;
  114. /* Update size of dataspace */
  115. if (work->size > work->obj_dst->size)
  116. {
  117. /* Get official datatype */
  118. hid_t datatype = H5Dget_type(work->obj_dst->dataset);
  119. hsize_t sizeDatatype = H5Tget_size(datatype);
  120. /* Count in number of elements */
  121. hsize_t extendsdim[1] = {work->size/sizeDatatype};
  122. status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
  123. STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
  124. work->obj_dst->size = work->size;
  125. }
  126. /* Write ALL the dataspace */
  127. status = H5Dwrite(work->obj_dst->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  128. STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj_dst->path);
  129. }
  130. static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
  131. {
  132. herr_t status;
  133. /* Get official datatype */
  134. hid_t datatype = H5Dget_type(work->obj_src->dataset);
  135. hsize_t sizeDatatype = H5Tget_size(datatype);
  136. /* count in element, not in byte */
  137. work->offset_src /= sizeDatatype;
  138. work->size /= sizeDatatype;
  139. /* duplicate the dataspace in the dataset */
  140. hid_t dataspace_select = H5Dget_space(work->obj_src->dataset);
  141. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  142. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  143. hsize_t offsets[1] = {work->offset_src};
  144. hsize_t count[1] = {work->size};
  145. /* stride and block size are NULL which is equivalent of a shift of 1 */
  146. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  147. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  148. /* create the dataspace for the received data which describes ptr */
  149. hsize_t dims_receive[1] = {work->size};
  150. hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
  151. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  152. /* Receiver has to be an hyperslabs */
  153. offsets[0] = 0;
  154. count[0] = work->size;
  155. H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
  156. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  157. status = H5Dread(work->obj_src->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
  158. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  159. /* don't need these dataspaces */
  160. status = H5Sclose(dataspace_select);
  161. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  162. status = H5Sclose(dataspace_receive);
  163. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj_src->path);
  164. }
  165. static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
  166. {
  167. herr_t status;
  168. /* Get official datatype */
  169. hid_t datatype = H5Dget_type(work->obj_dst->dataset);
  170. hsize_t sizeDatatype = H5Tget_size(datatype);
  171. /* Update size of dataspace */
  172. if (work->size + work->offset_dst > work->obj_dst->size)
  173. {
  174. /* Count in number of elements */
  175. hsize_t extendsdim[1] = {(work->offset_dst + work->size)/sizeDatatype};
  176. status = H5Dset_extent (work->obj_dst->dataset, extendsdim);
  177. STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
  178. work->obj_dst->size = work->offset_dst + work->size;
  179. }
  180. /* count in element, not in byte */
  181. work->offset_dst /= sizeDatatype;
  182. work->size /= sizeDatatype;
  183. /* duplicate the dataspace in the dataset */
  184. hid_t dataspace_select = H5Dget_space(work->obj_dst->dataset);
  185. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  186. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  187. hsize_t offsets[1] = {work->offset_dst};
  188. hsize_t count[1] = {work->size};
  189. /* stride and block size are NULL which is equivalent of a shift of 1 */
  190. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  191. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  192. /* create the dataspace for the received data which describes ptr */
  193. hsize_t dims_send[1] = {work->size};
  194. hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
  195. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  196. /* Receiver has to be an hyperslabs */
  197. offsets[0] = 0;
  198. count[0] = work->size;
  199. H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
  200. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  201. status = H5Dwrite(work->obj_dst->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
  202. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  203. /* don't need these dataspaces */
  204. status = H5Sclose(dataspace_select);
  205. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  206. status = H5Sclose(dataspace_send);
  207. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj_dst->path);
  208. }
  209. unsigned warned = 0;
  210. static void starpu_hdf5_copy_internal(struct _starpu_hdf5_work * work)
  211. {
  212. herr_t status;
  213. /* HDF5 H50copy supports only same size in both areas and copies the entire object */
  214. if (work->offset_src == 0 && work->offset_dst == 0 && work->size == work->obj_src->size && work->size == work->obj_dst->size)
  215. {
  216. H5Dclose(work->obj_dst->dataset);
  217. /* Dirty : Delete dataspace because H5Ocopy only works if destination does not exist */
  218. H5Ldelete(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);
  219. status = H5Ocopy(work->base_src->fileID, work->obj_src->path, work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT, H5P_DEFAULT);
  220. STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
  221. work->obj_dst->dataset = H5Dopen2(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);
  222. }
  223. else
  224. {
  225. if (!warned)
  226. {
  227. _STARPU_DISP("Direct disk to disk copy is not supported for a piece of data. Data will be transfered to RAM memory and then, be pushed on disk \n");
  228. warned = 1;
  229. }
  230. void * ptr;
  231. int ret = _starpu_malloc_flags_on_node(STARPU_MAIN_RAM, &ptr, work->size, 0);
  232. STARPU_ASSERT_MSG(ret == 0, "Cannot allocate %lu bytes to perform disk to disk operation", (unsigned long)work->size);
  233. /* buffer is only used internally to store intermediate data */
  234. work->ptr = ptr;
  235. starpu_hdf5_read_internal(work);
  236. starpu_hdf5_write_internal(work);
  237. _starpu_free_flags_on_node(STARPU_MAIN_RAM, ptr, work->size, 0);
  238. }
  239. }
  240. static void * _starpu_hdf5_internal_thread(void * arg)
  241. {
  242. #ifdef H5_HAVE_THREADSAFE
  243. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) arg;
  244. #endif
  245. while (HDF5_VAR_RUN || !_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
  246. {
  247. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  248. if (_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST) && HDF5_VAR_RUN)
  249. STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
  250. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  251. /* We are the only consummer here, don't need to protect here */
  252. if (!_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
  253. {
  254. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  255. struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(&HDF5_VAR_WORK_LIST);
  256. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  257. if (work->base_src < work->base_dst)
  258. {
  259. _starpu_hdf5_protect_start(work->base_src);
  260. #ifdef H5_HAVE_THREADSAFE
  261. _starpu_hdf5_protect_start(work->base_dst);
  262. #endif
  263. }
  264. else
  265. {
  266. _starpu_hdf5_protect_start(work->base_dst);
  267. #ifdef H5_HAVE_THREADSAFE
  268. if (work->base_src != work->base_dst)
  269. _starpu_hdf5_protect_start(work->base_src);
  270. #endif
  271. }
  272. switch(work->type)
  273. {
  274. case READ:
  275. starpu_hdf5_read_internal(work);
  276. break;
  277. case WRITE:
  278. starpu_hdf5_write_internal(work);
  279. break;
  280. case FULL_READ:
  281. starpu_hdf5_full_read_internal(work);
  282. break;
  283. case FULL_WRITE:
  284. starpu_hdf5_full_write_internal(work);
  285. break;
  286. case COPY:
  287. starpu_hdf5_copy_internal(work);
  288. break;
  289. default:
  290. STARPU_ABORT();
  291. }
  292. if (work->base_src < work->base_dst)
  293. {
  294. _starpu_hdf5_protect_stop(work->base_src);
  295. #ifdef H5_HAVE_THREADSAFE
  296. _starpu_hdf5_protect_stop(work->base_dst);
  297. #endif
  298. }
  299. else
  300. {
  301. _starpu_hdf5_protect_stop(work->base_dst);
  302. #ifdef H5_HAVE_THREADSAFE
  303. if (work->base_src != work->base_dst)
  304. _starpu_hdf5_protect_stop(work->base_src);
  305. #endif
  306. }
  307. /* Update event to tell it's finished */
  308. starpu_sem_post((starpu_sem_t *) work->event);
  309. free(work);
  310. }
  311. }
  312. return NULL;
  313. }
  314. static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * fileBase)
  315. {
  316. _starpu_hdf5_work_list_init(&HDF5_VAR_WORK_LIST);
  317. HDF5_VAR_RUN = 1;
  318. STARPU_PTHREAD_COND_INIT(&HDF5_VAR_COND, NULL);
  319. STARPU_PTHREAD_CREATE(&HDF5_VAR_THREAD, NULL, _starpu_hdf5_internal_thread, (void *) fileBase);
  320. }
  321. /* returns the size in BYTES */
  322. static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
  323. {
  324. herr_t status;
  325. hid_t dataspace = H5Dget_space(obj->dataset);
  326. STARPU_ASSERT_MSG(dataspace >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  327. hsize_t dims[1];
  328. status = H5Sget_simple_extent_dims(dataspace, dims, NULL);
  329. STARPU_ASSERT_MSG(status >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  330. hid_t datatype = H5Dget_type(obj->dataset);
  331. STARPU_ASSERT_MSG(datatype >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  332. hsize_t sizeDatatype = H5Tget_size(datatype);
  333. STARPU_ASSERT_MSG(sizeDatatype > 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  334. H5Sclose(dataspace);
  335. H5Tclose(datatype);
  336. return dims[0]*sizeDatatype;
  337. }
  338. static void starpu_hdf5_send_work(void *base_src, void *obj_src, off_t offset_src, void *base_dst, void *obj_dst, off_t offset_dst, void *buf, size_t size, void * event, enum hdf5_work_type type)
  339. {
  340. struct starpu_hdf5_obj * dataObj_src = (struct starpu_hdf5_obj *) obj_src;
  341. struct starpu_hdf5_obj * dataObj_dst = (struct starpu_hdf5_obj *) obj_dst;
  342. struct starpu_hdf5_base * fileBase_src = (struct starpu_hdf5_base *) base_src;
  343. struct starpu_hdf5_base * fileBase_dst = (struct starpu_hdf5_base *) base_dst;
  344. struct _starpu_hdf5_work * work;
  345. _STARPU_MALLOC(work, sizeof(*work));
  346. work->type = type;
  347. work->base_src = fileBase_src;
  348. work->obj_src = dataObj_src;
  349. work->offset_src = offset_src;
  350. work->base_dst = fileBase_dst;
  351. work->obj_dst = dataObj_dst;
  352. work->offset_dst = offset_dst;
  353. work->ptr = buf;
  354. work->size = size;
  355. work->event = event;
  356. #ifdef H5_HAVE_THREADSAFE
  357. struct starpu_hdf5_base * fileBase;
  358. if (fileBase_src != NULL)
  359. fileBase = fileBase_src;
  360. else
  361. fileBase = fileBase_dst;
  362. #endif
  363. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  364. _starpu_hdf5_work_list_push_front(&HDF5_VAR_WORK_LIST, work);
  365. /* Wake up internal thread */
  366. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  367. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  368. }
  369. static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase, char * name, size_t size)
  370. {
  371. struct starpu_hdf5_obj * obj;
  372. _STARPU_MALLOC(obj, sizeof(*obj));
  373. _starpu_hdf5_protect_start((void *) fileBase);
  374. /* create a dataspace with one dimension of size elements */
  375. hsize_t dim[1] = {size};
  376. hsize_t maxdim[1] = {H5S_UNLIMITED};
  377. hid_t dataspace = H5Screate_simple(1, dim, maxdim);
  378. if (dataspace < 0)
  379. {
  380. free(obj);
  381. return NULL;
  382. }
  383. hsize_t chunkdim[1] = {STARPU_CHUNK_DIM};
  384. hid_t prop = H5Pcreate (H5P_DATASET_CREATE);
  385. herr_t status = H5Pset_chunk (prop, 1, chunkdim);
  386. STARPU_ASSERT_MSG(status >= 0, "Error when setting HDF5 property \n");
  387. /* create a dataset at location name, with data described by the dataspace.
  388. * Each element are like char in C (expected one byte)
  389. */
  390. obj->dataset = H5Dcreate2(fileBase->fileID, name, H5T_NATIVE_CHAR, dataspace, H5P_DEFAULT, prop, H5P_DEFAULT);
  391. H5Sclose(dataspace);
  392. H5Pclose(prop);
  393. if (obj->dataset < 0)
  394. {
  395. free(obj);
  396. return NULL;
  397. }
  398. obj->path = name;
  399. obj->size = size;
  400. _starpu_hdf5_protect_stop((void *) fileBase);
  401. return obj;
  402. }
  403. static struct starpu_hdf5_obj * _starpu_hdf5_data_open(struct starpu_hdf5_base * fileBase, char * name, size_t size)
  404. {
  405. struct starpu_hdf5_obj * obj;
  406. _STARPU_MALLOC(obj, sizeof(*obj));
  407. _starpu_hdf5_protect_start((void *) fileBase);
  408. /* create a dataset at location name, with data described by the dataspace.
  409. * Each element are like char in C (expected one byte)
  410. */
  411. obj->dataset = H5Dopen2(fileBase->fileID, name, H5P_DEFAULT);
  412. _starpu_hdf5_protect_stop((void *) fileBase);
  413. if (obj->dataset < 0)
  414. {
  415. free(obj);
  416. return NULL;
  417. }
  418. obj->path = name;
  419. obj->size = size;
  420. return obj;
  421. }
  422. static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
  423. {
  424. struct starpu_hdf5_base * fileBase;
  425. _STARPU_MALLOC(fileBase, sizeof(struct starpu_hdf5_base));
  426. #ifndef H5_HAVE_THREADSAFE
  427. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
  428. if (actual_nb_disk == 1)
  429. {
  430. #endif
  431. STARPU_PTHREAD_MUTEX_INIT(&HDF5_VAR_MUTEX, NULL);
  432. #ifndef H5_HAVE_THREADSAFE
  433. }
  434. else
  435. {
  436. while (!init_finished)
  437. ;
  438. }
  439. #endif
  440. _starpu_hdf5_protect_start(fileBase);
  441. struct stat buf;
  442. if (stat(parameter, &buf) != 0 || !S_ISREG(buf.st_mode))
  443. {
  444. /* The file doesn't exist or the directory exists => create the datafile */
  445. int id;
  446. _starpu_mkpath(parameter, S_IRWXU);
  447. fileBase->path = _starpu_mktemp(parameter, O_RDWR | O_BINARY, &id);
  448. if (!fileBase->path)
  449. {
  450. free(fileBase);
  451. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  452. return NULL;
  453. }
  454. /* just use _starpu_mktemp_many to create a file, close the file descriptor */
  455. close(id);
  456. /* Truncate it */
  457. fileBase->fileID = H5Fcreate((char *)fileBase->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
  458. if (fileBase->fileID < 0)
  459. {
  460. free(fileBase);
  461. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  462. return NULL;
  463. }
  464. fileBase->created = 1;
  465. }
  466. else
  467. {
  468. /* Well, open it ! */
  469. char *path = strdup((char *)parameter);
  470. STARPU_ASSERT(path);
  471. fileBase->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
  472. if (fileBase->fileID < 0)
  473. {
  474. free(fileBase);
  475. free(path);
  476. _STARPU_ERROR("Can not open the HDF5 file (%s)", (char *) parameter);
  477. return NULL;
  478. }
  479. fileBase->created = 0;
  480. fileBase->path = path;
  481. }
  482. #ifndef H5_HAVE_THREADSAFE
  483. if (actual_nb_disk == 1)
  484. {
  485. #endif
  486. _starpu_hdf5_create_thread(fileBase);
  487. #ifndef H5_HAVE_THREADSAFE
  488. init_finished = 1;
  489. }
  490. #endif
  491. #if H5_VERS_MAJOR > 1 || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR > 10) || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR == 10 && H5_VERS_RELEASE > 0)
  492. H5Pset_file_space_strategy(fileBase->fileID, H5F_FSPACE_STRATEGY_FSM_AGGR, 0, 0);
  493. #endif
  494. _starpu_hdf5_protect_stop(fileBase);
  495. fileBase->next_dataset_id = 0;
  496. return (void *) fileBase;
  497. }
  498. /* free memory allocated for the base */
  499. static void starpu_hdf5_unplug(void *base)
  500. {
  501. #ifndef H5_HAVE_THREADSAFE
  502. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, -1);
  503. #endif
  504. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  505. herr_t status;
  506. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  507. #ifndef H5_HAVE_THREADSAFE
  508. if (actual_nb_disk == 0)
  509. {
  510. #endif
  511. HDF5_VAR_RUN = 0;
  512. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  513. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  514. STARPU_PTHREAD_JOIN(HDF5_VAR_THREAD, NULL);
  515. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  516. STARPU_PTHREAD_COND_DESTROY(&HDF5_VAR_COND);
  517. STARPU_ASSERT(_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST));
  518. /* the internal thread is deleted */
  519. #ifndef H5_HAVE_THREADSAFE
  520. }
  521. #endif
  522. status = H5Fclose(fileBase->fileID);
  523. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  524. #ifndef H5_HAVE_THREADSAFE
  525. if (actual_nb_disk == 0)
  526. {
  527. #endif
  528. STARPU_PTHREAD_MUTEX_DESTROY(&HDF5_VAR_MUTEX);
  529. #ifndef H5_HAVE_THREADSAFE
  530. init_finished = 0;
  531. }
  532. #endif
  533. STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
  534. if (fileBase->created)
  535. {
  536. unlink(fileBase->path);
  537. }
  538. else
  539. {
  540. /* Warn user about repack, because unlink dataset doesn't delete data in file */
  541. _STARPU_DISP("This disk (%s) was used to store temporary data. You may use the h5repack command to reduce the size of the file... \n", fileBase->path);
  542. }
  543. free(fileBase->path);
  544. free(fileBase);
  545. }
  546. static void *starpu_hdf5_alloc(void *base, size_t size)
  547. {
  548. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  549. struct starpu_hdf5_obj * obj;
  550. char * name;
  551. char * prefix = "STARPU_";
  552. char name_id[16];
  553. /* Save the name of the dataset */
  554. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  555. snprintf(name_id, sizeof(name_id), "%u", fileBase->next_dataset_id);
  556. fileBase->next_dataset_id++;
  557. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  558. /* name in HDF5 is like a path */
  559. _STARPU_MALLOC(name, 1+strlen(prefix)+strlen(name_id)+1);
  560. snprintf(name, 1+strlen(prefix)+strlen(name_id)+1, "/%s%s", prefix, name_id);
  561. obj = _starpu_hdf5_data_alloc(fileBase, name, size);
  562. if (!obj)
  563. {
  564. free(name);
  565. }
  566. return (void *) obj;
  567. }
  568. static void starpu_hdf5_free(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  569. {
  570. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  571. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  572. herr_t status;
  573. _starpu_hdf5_protect_start(base);
  574. status = H5Dclose(dataObj->dataset);
  575. STARPU_ASSERT_MSG(status >= 0, "Can not free this HDF5 dataset (%s)\n", dataObj->path);
  576. /* remove the dataset link in the HDF5
  577. * But it doesn't delete the space in the file */
  578. status = H5Ldelete(fileBase->fileID, dataObj->path, H5P_DEFAULT);
  579. STARPU_ASSERT_MSG(status >= 0, "Can not delete the link associed to this dataset (%s)\n", dataObj->path);
  580. _starpu_hdf5_protect_stop(base);
  581. free(dataObj->path);
  582. free(dataObj);
  583. }
  /*
   * Open an existing dataset of the disk. pos is the name of the dataset in
   * the HDF5 file (it is duplicated), size is recorded as the object size.
   * Returns the new object, or NULL if the dataset could not be opened.
   */
  static void *starpu_hdf5_open(void *base, void *pos, size_t size)
  {
      char *name = strdup((char *)pos);
      STARPU_ASSERT(name);

      struct starpu_hdf5_obj * obj = _starpu_hdf5_data_open((struct starpu_hdf5_base *) base, name, size);
      if (obj == NULL)
      {
          /* nobody took ownership of the copy */
          free(name);
      }

      return (void *) obj;
  }
  598. static void starpu_hdf5_close(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  599. {
  600. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  601. herr_t status;
  602. _starpu_hdf5_protect_start(base);
  603. status = H5Dclose(dataObj->dataset);
  604. STARPU_ASSERT_MSG(status >= 0, "Can not close this HDF5 dataset (%s)\n", dataObj->path);
  605. _starpu_hdf5_protect_stop(base);
  606. free(dataObj->path);
  607. free(dataObj);
  608. }
  609. static void starpu_hdf5_wait(void * event)
  610. {
  611. starpu_sem_t * finished = (starpu_sem_t *) event;
  612. starpu_sem_wait(finished);
  613. }
  614. static int starpu_hdf5_test(void * event)
  615. {
  616. starpu_sem_t * finished = (starpu_sem_t *) event;
  617. return starpu_sem_trywait(finished) == 0;
  618. }
  619. static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size, unsigned dst_node)
  620. {
  621. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  622. starpu_sem_t finished;
  623. starpu_sem_init(&finished, 0, 0);
  624. _starpu_hdf5_protect_start(base);
  625. *size = _starpu_get_size_obj(dataObj);
  626. _starpu_hdf5_protect_stop(base);
  627. _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
  628. starpu_hdf5_send_work(base, obj, 0, NULL, NULL, 0, *ptr, *size, (void*) &finished, FULL_READ);
  629. starpu_hdf5_wait(&finished);
  630. starpu_sem_destroy(&finished);
  631. return 0;
  632. }
  633. static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
  634. {
  635. starpu_sem_t finished;
  636. starpu_sem_init(&finished, 0, 0);
  637. starpu_hdf5_send_work(NULL, NULL, 0, base, obj, 0, ptr, size, (void*) &finished, FULL_WRITE);
  638. starpu_hdf5_wait(&finished);
  639. starpu_sem_destroy(&finished);
  640. return 0;
  641. }
  642. static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  643. {
  644. starpu_sem_t finished;
  645. starpu_sem_init(&finished, 0, 0);
  646. starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) &finished, READ);
  647. starpu_hdf5_wait(&finished);
  648. starpu_sem_destroy(&finished);
  649. return 0;
  650. }
  651. static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offset, size_t size)
  652. {
  653. starpu_sem_t finished;
  654. starpu_sem_init(&finished, 0, 0);
  655. starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) &finished, WRITE);
  656. starpu_hdf5_wait(&finished);
  657. starpu_sem_destroy(&finished);
  658. return 0;
  659. }
  660. static void * starpu_hdf5_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  661. {
  662. starpu_sem_t * finished;
  663. _STARPU_MALLOC(finished, sizeof(*finished));
  664. starpu_sem_init(finished, 0, 0);
  665. starpu_hdf5_send_work(base, obj, offset, NULL, NULL, 0, buf, size, (void*) finished, READ);
  666. return finished;
  667. }
  668. static void * starpu_hdf5_async_write(void *base, void *obj, void *buf, off_t offset, size_t size)
  669. {
  670. starpu_sem_t * finished;
  671. _STARPU_MALLOC(finished, sizeof(*finished));
  672. starpu_sem_init(finished, 0, 0);
  673. starpu_hdf5_send_work(NULL, NULL, 0, base, obj, offset, (void *) buf, size, (void*) finished, WRITE);
  674. return finished;
  675. }
  676. void * starpu_hdf5_async_full_read (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node)
  677. {
  678. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  679. starpu_sem_t * finished;
  680. _STARPU_MALLOC(finished, sizeof(*finished));
  681. starpu_sem_init(finished, 0, 0);
  682. _starpu_hdf5_protect_start(base);
  683. *size = _starpu_get_size_obj(dataObj);
  684. _starpu_hdf5_protect_stop(base);
  685. _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
  686. starpu_hdf5_send_work(base, obj, 0, NULL, NULL, 0, *ptr, *size, (void*) finished, FULL_READ);
  687. return finished;
  688. }
  689. void * starpu_hdf5_async_full_write (void * base, void * obj, void * ptr, size_t size)
  690. {
  691. starpu_sem_t * finished;
  692. _STARPU_MALLOC(finished, sizeof(*finished));
  693. starpu_sem_init(finished, 0, 0);
  694. starpu_hdf5_send_work(NULL, NULL, 0, base, obj, 0, ptr, size, (void*) finished, FULL_WRITE);
  695. return finished;
  696. }
  697. void * starpu_hdf5_copy(void *base_src, void* obj_src, off_t offset_src, void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
  698. {
  699. starpu_sem_t * finished;
  700. _STARPU_MALLOC(finished, sizeof(*finished));
  701. starpu_sem_init(finished, 0, 0);
  702. starpu_hdf5_send_work(base_src, obj_src, offset_src, base_dst, obj_dst, offset_dst, NULL, size, (void*) finished, COPY);
  703. return finished;
  704. }
  705. static void starpu_hdf5_free_request(void * event)
  706. {
  707. starpu_sem_destroy(event);
  708. free(event);
  709. }
  710. static int get_hdf5_bandwidth_between_disk_and_main_ram(unsigned node, void *base)
  711. {
  712. unsigned iter;
  713. double timing_slowness, timing_latency;
  714. double start;
  715. double end;
  716. char *buf;
  717. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  718. srand(time(NULL));
  719. starpu_malloc_flags((void **) &buf, STARPU_DISK_SIZE_MIN, 0);
  720. STARPU_ASSERT(buf != NULL);
  721. /* allocate memory */
  722. void *mem = _starpu_disk_alloc(node, STARPU_DISK_SIZE_MIN);
  723. /* fail to alloc */
  724. if (mem == NULL)
  725. return 0;
  726. memset(buf, 0, STARPU_DISK_SIZE_MIN);
  727. /* Measure upload slowness */
  728. start = starpu_timing_now();
  729. for (iter = 0; iter < NITER; ++iter)
  730. {
  731. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, STARPU_DISK_SIZE_MIN, NULL);
  732. }
  733. end = starpu_timing_now();
  734. timing_slowness = end - start;
  735. /* free memory */
  736. starpu_free_flags(buf, STARPU_DISK_SIZE_MIN, 0);
  737. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  738. STARPU_ASSERT(buf != NULL);
  739. *buf = 0;
  740. /* Measure latency */
  741. start = starpu_timing_now();
  742. for (iter = 0; iter < NITER; ++iter)
  743. {
  744. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (STARPU_DISK_SIZE_MIN -1) , 1, NULL);
  745. }
  746. end = starpu_timing_now();
  747. timing_latency = end - start;
  748. _starpu_disk_free(node, mem, STARPU_DISK_SIZE_MIN);
  749. starpu_free_flags(buf, sizeof(char), 0);
  750. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*STARPU_DISK_SIZE_MIN, (NITER/timing_slowness)*STARPU_DISK_SIZE_MIN,
  751. timing_latency/NITER, timing_latency/NITER, node, fileBase->path);
  752. return 1;
  753. }
  754. struct starpu_disk_ops starpu_disk_hdf5_ops =
  755. {
  756. .alloc = starpu_hdf5_alloc,
  757. .free = starpu_hdf5_free,
  758. .open = starpu_hdf5_open,
  759. .close = starpu_hdf5_close,
  760. .read = starpu_hdf5_read,
  761. .write = starpu_hdf5_write,
  762. .plug = starpu_hdf5_plug,
  763. .unplug = starpu_hdf5_unplug,
  764. .copy = starpu_hdf5_copy,
  765. .bandwidth = get_hdf5_bandwidth_between_disk_and_main_ram,
  766. .full_read = starpu_hdf5_full_read,
  767. .full_write = starpu_hdf5_full_write,
  768. .async_read = starpu_hdf5_async_read,
  769. .async_write = starpu_hdf5_async_write,
  770. .async_full_read = starpu_hdf5_async_full_read,
  771. .async_full_write = starpu_hdf5_async_full_write,
  772. .wait_request = starpu_hdf5_wait,
  773. .test_request = starpu_hdf5_test,
  774. .free_request = starpu_hdf5_free_request
  775. };