disk_hdf5.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2017 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <fcntl.h>
  17. #include <errno.h>
  18. #include <common/config.h>
  19. #ifdef HAVE_UNISTD_H
  20. #include <unistd.h>
  21. #endif
  22. #include <time.h>
  23. #include <hdf5.h>
  24. #include <starpu.h>
  25. #include <core/disk.h>
  26. #include <core/perfmodel/perfmodel.h>
  /* Some platforms do not define O_BINARY: make it a no-op open() flag there */
  #if !defined(O_BINARY)
  #  define O_BINARY 0
  #endif

  /* Number of iterations of each calibration loop in the bandwidth measurement */
  #define NITER _starpu_calibration_minimum

  /* Chunk size, in elements, of the extendible datasets created by this driver */
  #define STARPU_CHUNK_DIM 4096

  /* TODO: support disk-to-disk copy with H5Ocopy */
  33. /* ------------------- use HDF5 to write on disk ------------------- */
  34. #ifndef H5_HAVE_THREADSAFE
  35. static int nb_disk_open = 0;
  36. static volatile int init_finished = 0;
  37. static starpu_pthread_t global_thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  38. static volatile int global_run; /* Ask to the thread if he can continue */
  39. static starpu_pthread_mutex_t global_mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  40. static starpu_pthread_cond_t global_cond;
  41. static struct _starpu_hdf5_work_list global_work_list; /* This list contains the work for the hdf5 thread */
  42. #endif
  43. #ifdef H5_HAVE_THREADSAFE
  44. #define HDF5_VAR_THREAD fileBase->thread
  45. #define HDF5_VAR_RUN fileBase->run
  46. #define HDF5_VAR_MUTEX fileBase->mutex
  47. #define HDF5_VAR_COND fileBase->cond
  48. #define HDF5_VAR_WORK_LIST fileBase->work_list
  49. #else
  50. #define HDF5_VAR_THREAD global_thread
  51. #define HDF5_VAR_RUN global_run
  52. #define HDF5_VAR_MUTEX global_mutex
  53. #define HDF5_VAR_COND global_cond
  54. #define HDF5_VAR_WORK_LIST global_work_list
  55. #endif
  56. enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE };
  57. LIST_TYPE(_starpu_hdf5_work,
  58. enum hdf5_work_type type;
  59. struct starpu_hdf5_obj * obj;
  60. struct starpu_hdf5_base * base;
  61. void * ptr;
  62. off_t offset;
  63. size_t size;
  64. void * event;
  65. );
  66. struct starpu_hdf5_base
  67. {
  68. hid_t fileID;
  69. char * path;
  70. unsigned created; /* StarPU creates the HDF5 file */
  71. unsigned next_dataset_id;
  72. starpu_pthread_t thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  73. int run; /* Ask to the thread if he can continue */
  74. starpu_pthread_mutex_t mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  75. starpu_pthread_cond_t cond;
  76. struct _starpu_hdf5_work_list work_list; /* This list contains the work for the hdf5 thread */
  77. };
  78. struct starpu_hdf5_obj
  79. {
  80. hid_t dataset; /* describe this object in HDF5 file */
  81. char * path; /* path where data are stored in HDF5 file */
  82. size_t size;
  83. };
  84. static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
  85. {
  86. #ifndef H5_HAVE_THREADSAFE
  87. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  88. #endif
  89. }
  90. static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
  91. {
  92. #ifndef H5_HAVE_THREADSAFE
  93. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  94. #endif
  95. }
  96. /* ------------------ Functions for internal thread -------------------- */
  97. /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
  98. static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
  99. {
  100. herr_t status;
  101. status = H5Dread(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  102. STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj->path);
  103. }
  104. /* TODO : Dataspace may not be NATIVE_CHAR for opened data */
  105. static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
  106. {
  107. herr_t status;
  108. /* Update size of dataspace */
  109. if (work->size > work->obj->size)
  110. {
  111. /* Get official datatype */
  112. hid_t datatype = H5Dget_type(work->obj->dataset);
  113. hsize_t sizeDatatype = H5Tget_size(datatype);
  114. /* Count in number of elements */
  115. hsize_t extendsdim[1] = {work->size/sizeDatatype};
  116. status = H5Dset_extent (work->obj->dataset, extendsdim);
  117. STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
  118. work->obj->size = work->size;
  119. }
  120. /* Write ALL the dataspace */
  121. status = H5Dwrite(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  122. STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj->path);
  123. }
  124. static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
  125. {
  126. herr_t status;
  127. /* Get official datatype */
  128. hid_t datatype = H5Dget_type(work->obj->dataset);
  129. hsize_t sizeDatatype = H5Tget_size(datatype);
  130. /* count in element, not in byte */
  131. work->offset /= sizeDatatype;
  132. work->size /= sizeDatatype;
  133. /* duplicate the dataspace in the dataset */
  134. hid_t dataspace_select = H5Dget_space(work->obj->dataset);
  135. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  136. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  137. hsize_t offsets[1] = {work->offset};
  138. hsize_t count[1] = {work->size};
  139. /* stride and block size are NULL which is equivalent of a shift of 1 */
  140. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  141. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  142. /* create the dataspace for the received data which describes ptr */
  143. hsize_t dims_receive[1] = {work->size};
  144. hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
  145. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  146. /* Receiver has to be an hyperslabs */
  147. offsets[0] = 0;
  148. count[0] = work->size;
  149. status = H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
  150. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  151. status = H5Dread(work->obj->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
  152. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  153. /* don't need these dataspaces */
  154. status = H5Sclose(dataspace_select);
  155. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  156. status = H5Sclose(dataspace_receive);
  157. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  158. }
  159. static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
  160. {
  161. herr_t status;
  162. /* Get official datatype */
  163. hid_t datatype = H5Dget_type(work->obj->dataset);
  164. hsize_t sizeDatatype = H5Tget_size(datatype);
  165. /* Update size of dataspace */
  166. if (work->size + work->offset > work->obj->size)
  167. {
  168. /* Count in number of elements */
  169. hsize_t extendsdim[1] = {(work->offset + work->size)/sizeDatatype};
  170. status = H5Dset_extent (work->obj->dataset, extendsdim);
  171. STARPU_ASSERT_MSG(status >= 0, "Error when extending HDF5 dataspace !\n");
  172. work->obj->size = work->offset + work->size;
  173. }
  174. /* count in element, not in byte */
  175. work->offset /= sizeDatatype;
  176. work->size /= sizeDatatype;
  177. /* duplicate the dataspace in the dataset */
  178. hid_t dataspace_select = H5Dget_space(work->obj->dataset);
  179. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  180. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  181. hsize_t offsets[1] = {work->offset};
  182. hsize_t count[1] = {work->size};
  183. /* stride and block size are NULL which is equivalent of a shift of 1 */
  184. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  185. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  186. /* create the dataspace for the received data which describes ptr */
  187. hsize_t dims_send[1] = {work->size};
  188. hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
  189. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  190. /* Receiver has to be an hyperslabs */
  191. offsets[0] = 0;
  192. count[0] = work->size;
  193. status = H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
  194. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  195. status = H5Dwrite(work->obj->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
  196. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  197. /* don't need these dataspaces */
  198. status = H5Sclose(dataspace_select);
  199. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  200. status = H5Sclose(dataspace_send);
  201. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  202. }
  203. static void * _starpu_hdf5_internal_thread(void * arg)
  204. {
  205. #ifdef H5_HAVE_THREADSAFE
  206. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) arg;
  207. #endif
  208. while (HDF5_VAR_RUN || !_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
  209. {
  210. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  211. if (_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST) && HDF5_VAR_RUN)
  212. STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
  213. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  214. /* We are the only consummer here, don't need to protect here */
  215. if (!_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST))
  216. {
  217. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  218. struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(&HDF5_VAR_WORK_LIST);
  219. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  220. _starpu_hdf5_protect_start(work->base);
  221. switch(work->type)
  222. {
  223. case READ:
  224. starpu_hdf5_read_internal(work);
  225. break;
  226. case WRITE:
  227. starpu_hdf5_write_internal(work);
  228. break;
  229. case FULL_READ:
  230. starpu_hdf5_full_read_internal(work);
  231. break;
  232. case FULL_WRITE:
  233. starpu_hdf5_full_write_internal(work);
  234. break;
  235. default:
  236. STARPU_ABORT();
  237. }
  238. _starpu_hdf5_protect_stop(work->base);
  239. /* Update event to tell it's finished */
  240. starpu_sem_post((starpu_sem_t *) work->event);
  241. free(work);
  242. }
  243. }
  244. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  245. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  246. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  247. return NULL;
  248. }
  249. static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * fileBase)
  250. {
  251. _starpu_hdf5_work_list_init(&HDF5_VAR_WORK_LIST);
  252. HDF5_VAR_RUN = 1;
  253. STARPU_PTHREAD_COND_INIT(&HDF5_VAR_COND, NULL);
  254. STARPU_PTHREAD_CREATE(&HDF5_VAR_THREAD, NULL, _starpu_hdf5_internal_thread, (void *) fileBase);
  255. }
  256. /* returns the size in BYTES */
  257. static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
  258. {
  259. herr_t status;
  260. hid_t dataspace = H5Dget_space(obj->dataset);
  261. STARPU_ASSERT_MSG(dataspace >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  262. hsize_t dims[1];
  263. status = H5Sget_simple_extent_dims(dataspace, dims, NULL);
  264. STARPU_ASSERT_MSG(status >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  265. hid_t datatype = H5Dget_type(obj->dataset);
  266. STARPU_ASSERT_MSG(datatype >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  267. hsize_t sizeDatatype = H5Tget_size(datatype);
  268. STARPU_ASSERT_MSG(sizeDatatype > 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  269. H5Sclose(dataspace);
  270. H5Tclose(datatype);
  271. return dims[0]*sizeDatatype;
  272. }
  273. static void starpu_hdf5_send_work(void *base, void *obj, void *buf, off_t offset, size_t size, void * event, enum hdf5_work_type type)
  274. {
  275. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  276. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  277. struct _starpu_hdf5_work * work;
  278. _STARPU_MALLOC(work, sizeof(*work));
  279. work->type = type;
  280. work->obj = dataObj;
  281. work->base = fileBase;
  282. work->ptr = buf;
  283. work->offset = offset;
  284. work->size = size;
  285. work->event = event;
  286. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  287. _starpu_hdf5_work_list_push_front(&HDF5_VAR_WORK_LIST, work);
  288. /* Wake up internal thread */
  289. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  290. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  291. }
  292. static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase, char * name, size_t size)
  293. {
  294. struct starpu_hdf5_obj * obj;
  295. _STARPU_MALLOC(obj, sizeof(*obj));
  296. _starpu_hdf5_protect_start((void *) fileBase);
  297. /* create a dataspace with one dimension of size elements */
  298. hsize_t dim[1] = {size};
  299. hsize_t maxdim[1] = {H5S_UNLIMITED};
  300. hid_t dataspace = H5Screate_simple(1, dim, maxdim);
  301. if (dataspace < 0)
  302. {
  303. free(obj);
  304. return NULL;
  305. }
  306. hsize_t chunkdim[1] = {STARPU_CHUNK_DIM};
  307. hid_t prop = H5Pcreate (H5P_DATASET_CREATE);
  308. herr_t status = H5Pset_chunk (prop, 1, chunkdim);
  309. STARPU_ASSERT_MSG(status >= 0, "Error when setting HDF5 property \n");
  310. /* create a dataset at location name, with data described by the dataspace.
  311. * Each element are like char in C (expected one byte)
  312. */
  313. obj->dataset = H5Dcreate2(fileBase->fileID, name, H5T_NATIVE_CHAR, dataspace, H5P_DEFAULT, prop, H5P_DEFAULT);
  314. H5Sclose(dataspace);
  315. H5Pclose(prop);
  316. if (obj->dataset < 0)
  317. {
  318. free(obj);
  319. return NULL;
  320. }
  321. obj->path = name;
  322. obj->size = size;
  323. _starpu_hdf5_protect_stop((void *) fileBase);
  324. return obj;
  325. }
  326. static struct starpu_hdf5_obj * _starpu_hdf5_data_open(struct starpu_hdf5_base * fileBase, char * name, size_t size)
  327. {
  328. struct starpu_hdf5_obj * obj;
  329. _STARPU_MALLOC(obj, sizeof(*obj));
  330. _starpu_hdf5_protect_start((void *) fileBase);
  331. /* create a dataset at location name, with data described by the dataspace.
  332. * Each element are like char in C (expected one byte)
  333. */
  334. obj->dataset = H5Dopen2(fileBase->fileID, name, H5P_DEFAULT);
  335. _starpu_hdf5_protect_stop((void *) fileBase);
  336. if (obj->dataset < 0)
  337. {
  338. free(obj);
  339. return NULL;
  340. }
  341. obj->path = name;
  342. obj->size = size;
  343. return obj;
  344. }
  345. static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
  346. {
  347. struct starpu_hdf5_base * fileBase;
  348. _STARPU_MALLOC(fileBase, sizeof(struct starpu_hdf5_base));
  349. #ifndef H5_HAVE_THREADSAFE
  350. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
  351. if (actual_nb_disk == 1)
  352. {
  353. #endif
  354. STARPU_PTHREAD_MUTEX_INIT(&HDF5_VAR_MUTEX, NULL);
  355. #ifndef H5_HAVE_THREADSAFE
  356. }
  357. else
  358. {
  359. while (!init_finished)
  360. ;
  361. }
  362. #endif
  363. _starpu_hdf5_protect_start(fileBase);
  364. struct stat buf;
  365. if (stat(parameter, &buf) != 0 || !S_ISREG(buf.st_mode))
  366. {
  367. /* The file doesn't exist or the directory exists => create the datafile */
  368. int id;
  369. _starpu_mkpath(parameter, S_IRWXU);
  370. fileBase->path = _starpu_mktemp(parameter, O_RDWR | O_BINARY, &id);
  371. if (!fileBase->path)
  372. {
  373. free(fileBase);
  374. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  375. return NULL;
  376. }
  377. /* just use _starpu_mktemp_many to create a file, close the file descriptor */
  378. close(id);
  379. /* Truncate it */
  380. fileBase->fileID = H5Fcreate((char *)fileBase->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
  381. if (fileBase->fileID < 0)
  382. {
  383. free(fileBase);
  384. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  385. return NULL;
  386. }
  387. fileBase->created = 1;
  388. }
  389. else
  390. {
  391. /* Well, open it ! */
  392. char *path = strdup((char *)parameter);
  393. STARPU_ASSERT(path);
  394. fileBase->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
  395. if (fileBase->fileID < 0)
  396. {
  397. free(fileBase);
  398. free(path);
  399. _STARPU_ERROR("Can not open the HDF5 file (%s)", (char *) parameter);
  400. return NULL;
  401. }
  402. fileBase->created = 0;
  403. fileBase->path = path;
  404. }
  405. #ifndef H5_HAVE_THREADSAFE
  406. if (actual_nb_disk == 1)
  407. {
  408. #endif
  409. _starpu_hdf5_create_thread(fileBase);
  410. #ifndef H5_HAVE_THREADSAFE
  411. init_finished = 1;
  412. }
  413. #endif
  414. #if H5_VERS_MAJOR > 1 || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR > 10) || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR == 10 && H5_VERS_RELEASE > 0)
  415. H5Pset_file_space_strategy(fileBase->fileID, H5F_FSPACE_STRATEGY_FSM_AGGR, 0, 0);
  416. #endif
  417. _starpu_hdf5_protect_stop(fileBase);
  418. fileBase->next_dataset_id = 0;
  419. return (void *) fileBase;
  420. }
  421. /* free memory allocated for the base */
  422. static void starpu_hdf5_unplug(void *base)
  423. {
  424. #ifndef H5_HAVE_THREADSAFE
  425. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, -1);
  426. #endif
  427. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  428. herr_t status;
  429. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  430. #ifndef H5_HAVE_THREADSAFE
  431. if (actual_nb_disk == 0)
  432. {
  433. #endif
  434. HDF5_VAR_RUN = 0;
  435. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  436. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  437. STARPU_PTHREAD_JOIN(HDF5_VAR_THREAD, NULL);
  438. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  439. STARPU_ASSERT(_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST));
  440. /* the internal thread is deleted */
  441. #ifndef H5_HAVE_THREADSAFE
  442. }
  443. #endif
  444. status = H5Fclose(fileBase->fileID);
  445. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  446. #ifndef H5_HAVE_THREADSAFE
  447. if (actual_nb_disk == 0)
  448. {
  449. #endif
  450. STARPU_PTHREAD_MUTEX_DESTROY(&HDF5_VAR_MUTEX);
  451. #ifndef H5_HAVE_THREADSAFE
  452. init_finished = 0;
  453. }
  454. #endif
  455. STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
  456. if (fileBase->created)
  457. {
  458. unlink(fileBase->path);
  459. }
  460. else
  461. {
  462. /* Warn user about repack, because unlink dataset doesn't delete data in file */
  463. _STARPU_DISP("This disk (%s) was used to store temporary data. You may use the h5repack command to reduce the size of the file... \n", fileBase->path);
  464. }
  465. free(fileBase->path);
  466. free(fileBase);
  467. }
  468. static void *starpu_hdf5_alloc(void *base, size_t size)
  469. {
  470. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  471. struct starpu_hdf5_obj * obj;
  472. char * name;
  473. char * prefix = "STARPU_";
  474. char name_id[16];
  475. /* Save the name of the dataset */
  476. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  477. snprintf(name_id, sizeof(name_id), "%u", fileBase->next_dataset_id);
  478. fileBase->next_dataset_id++;
  479. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  480. /* name in HDF5 is like a path */
  481. _STARPU_MALLOC(name, 1+strlen(prefix)+strlen(name_id)+1);
  482. snprintf(name, 1+strlen(prefix)+strlen(name_id)+1, "/%s%s", prefix, name_id);
  483. obj = _starpu_hdf5_data_alloc(fileBase, name, size);
  484. if (!obj)
  485. {
  486. free(name);
  487. }
  488. return (void *) obj;
  489. }
  490. static void starpu_hdf5_free(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  491. {
  492. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  493. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  494. herr_t status;
  495. _starpu_hdf5_protect_start(base);
  496. status = H5Dclose(dataObj->dataset);
  497. STARPU_ASSERT_MSG(status >= 0, "Can not free this HDF5 dataset (%s)\n", dataObj->path);
  498. /* remove the dataset link in the HDF5
  499. * But it doesn't delete the space in the file */
  500. status = H5Ldelete(fileBase->fileID, dataObj->path, H5P_DEFAULT);
  501. STARPU_ASSERT_MSG(status >= 0, "Can not delete the link associed to this dataset (%s)\n", dataObj->path);
  502. _starpu_hdf5_protect_stop(base);
  503. free(dataObj->path);
  504. free(dataObj);
  505. }
  /* Open the existing dataset named pos (a C string) in the file of base.
   * The object keeps its own copy of the name. Returns NULL if the dataset
   * cannot be opened. */
  static void *starpu_hdf5_open(void *base, void *pos, size_t size)
  {
      char *name = strdup((char *)pos);
      STARPU_ASSERT(name);

      struct starpu_hdf5_obj * obj = _starpu_hdf5_data_open((struct starpu_hdf5_base *) base, name, size);
      if (obj == NULL)
          free(name);
      return (void *) obj;
  }
  520. static void starpu_hdf5_close(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  521. {
  522. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  523. herr_t status;
  524. _starpu_hdf5_protect_start(base);
  525. status = H5Dclose(dataObj->dataset);
  526. STARPU_ASSERT_MSG(status >= 0, "Can not close this HDF5 dataset (%s)\n", dataObj->path);
  527. _starpu_hdf5_protect_stop(base);
  528. free(dataObj->path);
  529. free(dataObj);
  530. }
  531. static void starpu_hdf5_wait(void * event)
  532. {
  533. starpu_sem_t * finished = (starpu_sem_t *) event;
  534. starpu_sem_wait(finished);
  535. }
  536. static int starpu_hdf5_test(void * event)
  537. {
  538. starpu_sem_t * finished = (starpu_sem_t *) event;
  539. return starpu_sem_trywait(finished) == 0;
  540. }
  541. static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size)
  542. {
  543. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  544. starpu_sem_t finished;
  545. starpu_sem_init(&finished, 0, 0);
  546. _starpu_hdf5_protect_start(base);
  547. *size = _starpu_get_size_obj(dataObj);
  548. _starpu_hdf5_protect_stop(base);
  549. starpu_malloc_flags(ptr, *size, 0);
  550. starpu_hdf5_send_work(base, obj, *ptr, 0, *size, (void*) &finished, FULL_READ);
  551. starpu_hdf5_wait(&finished);
  552. starpu_sem_destroy(&finished);
  553. return 0;
  554. }
  555. static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
  556. {
  557. starpu_sem_t finished;
  558. starpu_sem_init(&finished, 0, 0);
  559. starpu_hdf5_send_work(base, obj, ptr, 0, size, (void*) &finished, FULL_WRITE);
  560. starpu_hdf5_wait(&finished);
  561. starpu_sem_destroy(&finished);
  562. return 0;
  563. }
  564. static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  565. {
  566. starpu_sem_t finished;
  567. starpu_sem_init(&finished, 0, 0);
  568. starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) &finished, READ);
  569. starpu_hdf5_wait(&finished);
  570. starpu_sem_destroy(&finished);
  571. return 0;
  572. }
  573. static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offset, size_t size)
  574. {
  575. starpu_sem_t finished;
  576. starpu_sem_init(&finished, 0, 0);
  577. starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) &finished, WRITE);
  578. starpu_hdf5_wait(&finished);
  579. starpu_sem_destroy(&finished);
  580. return 0;
  581. }
  582. static void * starpu_hdf5_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  583. {
  584. starpu_sem_t * finished;
  585. _STARPU_MALLOC(finished, sizeof(*finished));
  586. starpu_sem_init(finished, 0, 0);
  587. starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) finished, READ);
  588. return finished;
  589. }
  590. static void * starpu_hdf5_async_write(void *base, void *obj, void *buf, off_t offset, size_t size)
  591. {
  592. starpu_sem_t * finished;
  593. _STARPU_MALLOC(finished, sizeof(*finished));
  594. starpu_sem_init(finished, 0, 0);
  595. starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) finished, WRITE);
  596. return finished;
  597. }
  598. static void starpu_hdf5_free_request(void * event)
  599. {
  600. starpu_sem_destroy(event);
  601. free(event);
  602. }
  603. static int get_hdf5_bandwidth_between_disk_and_main_ram(unsigned node, void *base)
  604. {
  605. unsigned iter;
  606. double timing_slowness, timing_latency;
  607. double start;
  608. double end;
  609. char *buf;
  610. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  611. srand(time(NULL));
  612. starpu_malloc_flags((void **) &buf, STARPU_DISK_SIZE_MIN, 0);
  613. STARPU_ASSERT(buf != NULL);
  614. /* allocate memory */
  615. void *mem = _starpu_disk_alloc(node, STARPU_DISK_SIZE_MIN);
  616. /* fail to alloc */
  617. if (mem == NULL)
  618. return 0;
  619. memset(buf, 0, STARPU_DISK_SIZE_MIN);
  620. /* Measure upload slowness */
  621. start = starpu_timing_now();
  622. for (iter = 0; iter < NITER; ++iter)
  623. {
  624. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, STARPU_DISK_SIZE_MIN, NULL);
  625. }
  626. end = starpu_timing_now();
  627. timing_slowness = end - start;
  628. /* free memory */
  629. starpu_free_flags(buf, STARPU_DISK_SIZE_MIN, 0);
  630. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  631. STARPU_ASSERT(buf != NULL);
  632. *buf = 0;
  633. /* Measure latency */
  634. start = starpu_timing_now();
  635. for (iter = 0; iter < NITER; ++iter)
  636. {
  637. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (STARPU_DISK_SIZE_MIN -1) , 1, NULL);
  638. }
  639. end = starpu_timing_now();
  640. timing_latency = end - start;
  641. _starpu_disk_free(node, mem, STARPU_DISK_SIZE_MIN);
  642. starpu_free_flags(buf, sizeof(char), 0);
  643. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*STARPU_DISK_SIZE_MIN, (NITER/timing_slowness)*STARPU_DISK_SIZE_MIN,
  644. timing_latency/NITER, timing_latency/NITER, node, fileBase->path);
  645. return 1;
  646. }
  647. struct starpu_disk_ops starpu_disk_hdf5_ops =
  648. {
  649. .alloc = starpu_hdf5_alloc,
  650. .free = starpu_hdf5_free,
  651. .open = starpu_hdf5_open,
  652. .close = starpu_hdf5_close,
  653. .read = starpu_hdf5_read,
  654. .write = starpu_hdf5_write,
  655. .plug = starpu_hdf5_plug,
  656. .unplug = starpu_hdf5_unplug,
  657. .copy = NULL,
  658. .bandwidth = get_hdf5_bandwidth_between_disk_and_main_ram,
  659. .full_read = starpu_hdf5_full_read,
  660. .full_write = starpu_hdf5_full_write,
  661. .async_read = starpu_hdf5_async_read,
  662. .async_write = starpu_hdf5_async_write,
  663. .wait_request = starpu_hdf5_wait,
  664. .test_request = starpu_hdf5_test,
  665. .free_request = starpu_hdf5_free_request
  666. };