disk_hdf5.c 26 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2017 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <fcntl.h>
  17. #include <errno.h>
  18. #include <common/config.h>
  19. #ifdef HAVE_UNISTD_H
  20. #include <unistd.h>
  21. #endif
  22. #include <time.h>
  23. #include <hdf5.h>
  24. #include <starpu.h>
  25. #include <core/disk.h>
  26. #include <core/perfmodel/perfmodel.h>
  27. #ifndef O_BINARY
  28. #define O_BINARY 0
  29. #endif
  30. #define NITER _starpu_calibration_minimum
  31. /* TODO: support disk-to-disk copy with HD5Ocopy */
  32. /* ------------------- use HDF5 to write on disk ------------------- */
  33. #ifndef H5_HAVE_THREADSAFE
  34. static int nb_disk_open = 0;
  35. static volatile int init_finished = 0;
  36. static starpu_pthread_t global_thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  37. static volatile int global_run; /* Ask to the thread if he can continue */
  38. static starpu_pthread_mutex_t global_mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  39. static starpu_pthread_cond_t global_cond;
  40. static struct _starpu_hdf5_work_list * global_work_list; /* This list contains the work for the hdf5 thread */
  41. #endif
  42. #ifdef H5_HAVE_THREADSAFE
  43. #define HDF5_VAR_THREAD fileBase->thread
  44. #define HDF5_VAR_RUN fileBase->run
  45. #define HDF5_VAR_MUTEX fileBase->mutex
  46. #define HDF5_VAR_COND fileBase->cond
  47. #define HDF5_VAR_WORK_LIST fileBase->work_list
  48. #else
  49. #define HDF5_VAR_THREAD global_thread
  50. #define HDF5_VAR_RUN global_run
  51. #define HDF5_VAR_MUTEX global_mutex
  52. #define HDF5_VAR_COND global_cond
  53. #define HDF5_VAR_WORK_LIST global_work_list
  54. #endif
  55. enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE };
  56. LIST_TYPE(_starpu_hdf5_work,
  57. enum hdf5_work_type type;
  58. struct starpu_hdf5_obj * obj;
  59. struct starpu_hdf5_base * base;
  60. void * ptr;
  61. off_t offset;
  62. size_t size;
  63. void * event;
  64. );
  65. struct starpu_hdf5_base
  66. {
  67. hid_t fileID;
  68. char * path;
  69. unsigned created; /* StarPU creates the HDF5 file */
  70. unsigned next_dataset_id;
  71. starpu_pthread_t thread; /* This thread will perform each write/read because we don't have asynchronous functions */
  72. int run; /* Ask to the thread if he can continue */
  73. starpu_pthread_mutex_t mutex; /* Mutex is used to protect work_list and if HDF5 library is not safe */
  74. starpu_pthread_cond_t cond;
  75. struct _starpu_hdf5_work_list * work_list; /* This list contains the work for the hdf5 thread */
  76. };
  77. struct starpu_hdf5_obj
  78. {
  79. hid_t dataset; /* describe this object in HDF5 file */
  80. char * path; /* path where data are stored in HDF5 file */
  81. };
  82. static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
  83. {
  84. #ifndef H5_HAVE_THREADSAFE
  85. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  86. #endif
  87. }
  88. static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
  89. {
  90. #ifndef H5_HAVE_THREADSAFE
  91. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  92. #endif
  93. }
  94. /* ------------------ Functions for internal thread -------------------- */
  95. static void starpu_hdf5_full_read_internal(struct _starpu_hdf5_work * work)
  96. {
  97. herr_t status;
  98. status = H5Dread(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  99. STARPU_ASSERT_MSG(status >= 0, "Can not read data associed to this dataset (%s)\n", work->obj->path);
  100. }
  101. static void starpu_hdf5_full_write_internal(struct _starpu_hdf5_work * work)
  102. {
  103. herr_t status;
  104. /* Write ALL the dataspace */
  105. status = H5Dwrite(work->obj->dataset, H5T_NATIVE_CHAR, H5S_ALL, H5S_ALL, H5P_DEFAULT, work->ptr);
  106. STARPU_ASSERT_MSG(status >= 0, "Can not write data to this dataset (%s)\n", work->obj->path);
  107. }
  108. static void starpu_hdf5_read_internal(struct _starpu_hdf5_work * work)
  109. {
  110. herr_t status;
  111. /* Get official datatype */
  112. hid_t datatype = H5Dget_type(work->obj->dataset);
  113. hsize_t sizeDatatype = H5Tget_size(datatype);
  114. /* count in element, not in byte */
  115. work->offset /= sizeDatatype;
  116. work->size /= sizeDatatype;
  117. /* duplicate the dataspace in the dataset */
  118. hid_t dataspace_select = H5Dget_space(work->obj->dataset);
  119. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  120. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  121. hsize_t offsets[1] = {work->offset};
  122. hsize_t count[1] = {work->size};
  123. /* stride and block size are NULL which is equivalent of a shift of 1 */
  124. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  125. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  126. /* create the dataspace for the received data which describes ptr */
  127. hsize_t dims_receive[1] = {work->size};
  128. hid_t dataspace_receive = H5Screate_simple(1, dims_receive, NULL);
  129. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  130. /* Receiver has to be an hyperslabs */
  131. offsets[0] = 0;
  132. count[0] = work->size;
  133. status = H5Sselect_hyperslab(dataspace_receive, H5S_SELECT_SET, offsets, NULL, count, NULL);
  134. STARPU_ASSERT_MSG(dataspace_receive >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  135. status = H5Dread(work->obj->dataset, datatype, dataspace_receive, dataspace_select, H5P_DEFAULT, work->ptr);
  136. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  137. /* don't need these dataspaces */
  138. status = H5Sclose(dataspace_select);
  139. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  140. status = H5Sclose(dataspace_receive);
  141. STARPU_ASSERT_MSG(status >= 0, "Error when reading this HDF5 dataset (%s)\n", work->obj->path);
  142. }
  143. static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
  144. {
  145. herr_t status;
  146. /* Get official datatype */
  147. hid_t datatype = H5Dget_type(work->obj->dataset);
  148. hsize_t sizeDatatype = H5Tget_size(datatype);
  149. /* count in element, not in byte */
  150. work->offset /= sizeDatatype;
  151. work->size /= sizeDatatype;
  152. /* duplicate the dataspace in the dataset */
  153. hid_t dataspace_select = H5Dget_space(work->obj->dataset);
  154. STARPU_ASSERT_MSG(dataspace_select >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  155. /* Select what we want of the duplicated dataspace (it's called an hyperslab). This operation is done on place */
  156. hsize_t offsets[1] = {work->offset};
  157. hsize_t count[1] = {work->size};
  158. /* stride and block size are NULL which is equivalent of a shift of 1 */
  159. status = H5Sselect_hyperslab(dataspace_select, H5S_SELECT_SET, offsets, NULL, count, NULL);
  160. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  161. /* create the dataspace for the received data which describes ptr */
  162. hsize_t dims_send[1] = {work->size};
  163. hid_t dataspace_send = H5Screate_simple(1, dims_send, NULL);
  164. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  165. /* Receiver has to be an hyperslabs */
  166. offsets[0] = 0;
  167. count[0] = work->size;
  168. status = H5Sselect_hyperslab(dataspace_send, H5S_SELECT_SET, offsets, NULL, count, NULL);
  169. STARPU_ASSERT_MSG(dataspace_send >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  170. status = H5Dwrite(work->obj->dataset, datatype, dataspace_send, dataspace_select, H5P_DEFAULT, work->ptr);
  171. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  172. /* don't need these dataspaces */
  173. status = H5Sclose(dataspace_select);
  174. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  175. status = H5Sclose(dataspace_send);
  176. STARPU_ASSERT_MSG(status >= 0, "Error when writing this HDF5 dataset (%s)\n", work->obj->path);
  177. }
  178. static void * _starpu_hdf5_internal_thread(void * arg)
  179. {
  180. #ifdef H5_HAVE_THREADSAFE
  181. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) arg;
  182. #endif
  183. while (HDF5_VAR_RUN || !_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST))
  184. {
  185. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  186. if (_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST) && HDF5_VAR_RUN)
  187. STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
  188. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  189. /* We are the only consummer here, don't need to protect here */
  190. if (!_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST))
  191. {
  192. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  193. struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(HDF5_VAR_WORK_LIST);
  194. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  195. _starpu_hdf5_protect_start(work->base);
  196. switch(work->type)
  197. {
  198. case READ:
  199. starpu_hdf5_read_internal(work);
  200. break;
  201. case WRITE:
  202. starpu_hdf5_write_internal(work);
  203. break;
  204. case FULL_READ:
  205. starpu_hdf5_full_read_internal(work);
  206. break;
  207. case FULL_WRITE:
  208. starpu_hdf5_full_write_internal(work);
  209. break;
  210. default:
  211. STARPU_ABORT();
  212. }
  213. _starpu_hdf5_protect_stop(work->base);
  214. /* Update event to tell it's finished */
  215. starpu_sem_post((starpu_sem_t *) work->event);
  216. free(work);
  217. }
  218. }
  219. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  220. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  221. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  222. return NULL;
  223. }
  224. static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * fileBase)
  225. {
  226. HDF5_VAR_WORK_LIST = _starpu_hdf5_work_list_new();
  227. HDF5_VAR_RUN = 1;
  228. STARPU_PTHREAD_COND_INIT(&HDF5_VAR_COND, NULL);
  229. STARPU_PTHREAD_CREATE(&HDF5_VAR_THREAD, NULL, _starpu_hdf5_internal_thread, (void *) fileBase);
  230. }
  231. /* returns the size in BYTES */
  232. static hsize_t _starpu_get_size_obj(struct starpu_hdf5_obj * obj)
  233. {
  234. herr_t status;
  235. hid_t dataspace = H5Dget_space(obj->dataset);
  236. STARPU_ASSERT_MSG(dataspace >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  237. hsize_t dims[1];
  238. status = H5Sget_simple_extent_dims(dataspace, dims, NULL);
  239. STARPU_ASSERT_MSG(status >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  240. hid_t datatype = H5Dget_type(obj->dataset);
  241. STARPU_ASSERT_MSG(datatype >= 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  242. hsize_t sizeDatatype = H5Tget_size(datatype);
  243. STARPU_ASSERT_MSG(sizeDatatype > 0, "Can not get the size of this HDF5 dataset (%s)\n", obj->path);
  244. H5Sclose(dataspace);
  245. H5Tclose(datatype);
  246. return dims[0]*sizeDatatype;
  247. }
  248. static void starpu_hdf5_send_work(void *base, void *obj, void *buf, off_t offset, size_t size, void * event, enum hdf5_work_type type)
  249. {
  250. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  251. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  252. struct _starpu_hdf5_work * work;
  253. _STARPU_MALLOC(work, sizeof(*work));
  254. work->type = type;
  255. work->obj = dataObj;
  256. work->base = fileBase;
  257. work->ptr = buf;
  258. work->offset = offset;
  259. work->size = size;
  260. work->event = event;
  261. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  262. _starpu_hdf5_work_list_push_front(HDF5_VAR_WORK_LIST, work);
  263. /* Wake up internal thread */
  264. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  265. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  266. }
  267. static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase, char * name, size_t size)
  268. {
  269. struct starpu_hdf5_obj * obj;
  270. _STARPU_MALLOC(obj, sizeof(*obj));
  271. _starpu_hdf5_protect_start((void *) fileBase);
  272. /* create a dataspace with one dimension of size elements */
  273. hsize_t dim[1] = {size};
  274. hid_t dataspace = H5Screate_simple(1, dim, NULL);
  275. if (dataspace < 0)
  276. {
  277. free(obj);
  278. return NULL;
  279. }
  280. /* create a dataset at location name, with data described by the dataspace.
  281. * Each element are like char in C (expected one byte)
  282. */
  283. obj->dataset = H5Dcreate2(fileBase->fileID, name, H5T_NATIVE_CHAR, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
  284. H5Sclose(dataspace);
  285. if (obj->dataset < 0)
  286. {
  287. free(obj);
  288. return NULL;
  289. }
  290. obj->path = name;
  291. _starpu_hdf5_protect_stop((void *) fileBase);
  292. return obj;
  293. }
  294. static struct starpu_hdf5_obj * _starpu_hdf5_data_open(struct starpu_hdf5_base * fileBase, char * name, size_t size STARPU_ATTRIBUTE_UNUSED)
  295. {
  296. struct starpu_hdf5_obj * obj;
  297. _STARPU_MALLOC(obj, sizeof(*obj));
  298. _starpu_hdf5_protect_start((void *) fileBase);
  299. /* create a dataset at location name, with data described by the dataspace.
  300. * Each element are like char in C (expected one byte)
  301. */
  302. obj->dataset = H5Dopen2(fileBase->fileID, name, H5P_DEFAULT);
  303. _starpu_hdf5_protect_stop((void *) fileBase);
  304. if (obj->dataset < 0)
  305. {
  306. free(obj);
  307. return NULL;
  308. }
  309. obj->path = name;
  310. return obj;
  311. }
  312. static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
  313. {
  314. struct starpu_hdf5_base * fileBase;
  315. _STARPU_MALLOC(fileBase, sizeof(struct starpu_hdf5_base));
  316. #ifndef H5_HAVE_THREADSAFE
  317. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
  318. if (actual_nb_disk == 1)
  319. {
  320. #endif
  321. STARPU_PTHREAD_MUTEX_INIT(&HDF5_VAR_MUTEX, NULL);
  322. #ifndef H5_HAVE_THREADSAFE
  323. }
  324. else
  325. {
  326. while (!init_finished)
  327. ;
  328. }
  329. #endif
  330. _starpu_hdf5_protect_start(fileBase);
  331. struct stat buf;
  332. if (stat(parameter, &buf) != 0 || !S_ISREG(buf.st_mode))
  333. {
  334. /* The file doesn't exist or the directory exists => create the datafile */
  335. int id;
  336. fileBase->path = _starpu_mktemp_many(parameter, 0, O_RDWR | O_BINARY, &id);
  337. if (!fileBase->path)
  338. {
  339. free(fileBase);
  340. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  341. return NULL;
  342. }
  343. /* just use _starpu_mktemp_many to create a file, close the file descriptor */
  344. close(id);
  345. /* Truncate it */
  346. fileBase->fileID = H5Fcreate((char *)fileBase->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
  347. if (fileBase->fileID < 0)
  348. {
  349. free(fileBase);
  350. _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
  351. return NULL;
  352. }
  353. fileBase->created = 1;
  354. }
  355. else
  356. {
  357. /* Well, open it ! */
  358. char *path = strdup((char *)parameter);
  359. STARPU_ASSERT(path);
  360. fileBase->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
  361. if (fileBase->fileID < 0)
  362. {
  363. free(fileBase);
  364. free(path);
  365. _STARPU_ERROR("Can not open the HDF5 file (%s)", (char *) parameter);
  366. return NULL;
  367. }
  368. fileBase->created = 0;
  369. fileBase->path = path;
  370. }
  371. #ifndef H5_HAVE_THREADSAFE
  372. if (actual_nb_disk == 1)
  373. {
  374. #endif
  375. _starpu_hdf5_create_thread(fileBase);
  376. #ifndef H5_HAVE_THREADSAFE
  377. init_finished = 1;
  378. }
  379. #endif
  380. #if H5_VERS_MAJOR > 1 || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR > 10) || (H5_VERS_MAJOR == 1 && H5_VERS_MINOR == 10 && H5_VERS_RELEASE > 0)
  381. H5Pset_file_space_strategy(fileBase->fileID, H5F_FSPACE_STRATEGY_FSM_AGGR, 0, 0);
  382. #endif
  383. _starpu_hdf5_protect_stop(fileBase);
  384. fileBase->next_dataset_id = 0;
  385. return (void *) fileBase;
  386. }
  387. /* free memory allocated for the base */
  388. static void starpu_hdf5_unplug(void *base)
  389. {
  390. #ifndef H5_HAVE_THREADSAFE
  391. int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, -1);
  392. #endif
  393. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  394. herr_t status;
  395. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  396. #ifndef H5_HAVE_THREADSAFE
  397. if (actual_nb_disk == 0)
  398. {
  399. #endif
  400. HDF5_VAR_RUN = 0;
  401. STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
  402. STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
  403. _starpu_hdf5_work_list_delete(HDF5_VAR_WORK_LIST);
  404. /* the internal thread is deleted */
  405. #ifndef H5_HAVE_THREADSAFE
  406. }
  407. #endif
  408. status = H5Fclose(fileBase->fileID);
  409. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  410. #ifndef H5_HAVE_THREADSAFE
  411. if (actual_nb_disk == 0)
  412. {
  413. #endif
  414. STARPU_PTHREAD_MUTEX_DESTROY(&HDF5_VAR_MUTEX);
  415. #ifndef H5_HAVE_THREADSAFE
  416. init_finished = 0;
  417. }
  418. #endif
  419. STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
  420. if (fileBase->created)
  421. {
  422. unlink(fileBase->path);
  423. }
  424. else
  425. {
  426. /* Warn user about repack, because unlink dataset doesn't delete data in file */
  427. _STARPU_DISP("This disk (%s) was used to store temporary data. You may use the h5repack command to reduce the size of the file... \n", fileBase->path);
  428. }
  429. free(fileBase->path);
  430. free(fileBase);
  431. }
  432. static void *starpu_hdf5_alloc(void *base, size_t size)
  433. {
  434. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  435. struct starpu_hdf5_obj * obj;
  436. char * name;
  437. char * prefix = "STARPU_";
  438. char name_id[16];
  439. /* Save the name of the dataset */
  440. STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
  441. snprintf(name_id, sizeof(name_id), "%u", fileBase->next_dataset_id);
  442. fileBase->next_dataset_id++;
  443. STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
  444. /* name in HDF5 is like a path */
  445. _STARPU_MALLOC(name, 1+strlen(prefix)+strlen(name_id)+1);
  446. snprintf(name, 1+strlen(prefix)+strlen(name_id)+1, "/%s%s", prefix, name_id);
  447. obj = _starpu_hdf5_data_alloc(fileBase, name, size);
  448. if (!obj)
  449. {
  450. free(name);
  451. }
  452. return (void *) obj;
  453. }
  454. static void starpu_hdf5_free(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  455. {
  456. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  457. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  458. herr_t status;
  459. _starpu_hdf5_protect_start(base);
  460. status = H5Dclose(dataObj->dataset);
  461. STARPU_ASSERT_MSG(status >= 0, "Can not free this HDF5 dataset (%s)\n", dataObj->path);
  462. /* remove the dataset link in the HDF5
  463. * But it doesn't delete the space in the file */
  464. status = H5Ldelete(fileBase->fileID, dataObj->path, H5P_DEFAULT);
  465. STARPU_ASSERT_MSG(status >= 0, "Can not delete the link associed to this dataset (%s)\n", dataObj->path);
  466. _starpu_hdf5_protect_stop(base);
  467. free(dataObj->path);
  468. free(dataObj);
  469. }
  470. static void *starpu_hdf5_open(void *base, void *pos, size_t size)
  471. {
  472. struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
  473. struct starpu_hdf5_obj * obj;
  474. char *name;
  475. name = strdup((char *)pos);
  476. STARPU_ASSERT(name);
  477. obj = _starpu_hdf5_data_open(fileBase, name, size);
  478. if (!obj)
  479. {
  480. free(name);
  481. }
  482. return (void *) obj;
  483. }
  484. static void starpu_hdf5_close(void *base, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  485. {
  486. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  487. herr_t status;
  488. _starpu_hdf5_protect_start(base);
  489. status = H5Dclose(dataObj->dataset);
  490. STARPU_ASSERT_MSG(status >= 0, "Can not close this HDF5 dataset (%s)\n", dataObj->path);
  491. _starpu_hdf5_protect_stop(base);
  492. free(dataObj->path);
  493. free(dataObj);
  494. }
  495. static void starpu_hdf5_wait(void * event)
  496. {
  497. starpu_sem_t * finished = (starpu_sem_t *) event;
  498. starpu_sem_wait(finished);
  499. }
  500. static int starpu_hdf5_test(void * event)
  501. {
  502. starpu_sem_t * finished = (starpu_sem_t *) event;
  503. return starpu_sem_trywait(finished) == 0;
  504. }
  505. static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size)
  506. {
  507. struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
  508. starpu_sem_t finished;
  509. starpu_sem_init(&finished, 0, 0);
  510. _starpu_hdf5_protect_start(base);
  511. *size = _starpu_get_size_obj(dataObj);
  512. _starpu_hdf5_protect_stop(base);
  513. starpu_malloc_flags(ptr, *size, 0);
  514. starpu_hdf5_send_work(base, obj, *ptr, 0, *size, (void*) &finished, FULL_READ);
  515. starpu_hdf5_wait(&finished);
  516. starpu_sem_destroy(&finished);
  517. return 0;
  518. }
  519. static int starpu_hdf5_full_write(void *base, void *obj, void *ptr, size_t size)
  520. {
  521. starpu_sem_t finished;
  522. starpu_sem_init(&finished, 0, 0);
  523. starpu_hdf5_send_work(base, obj, ptr, 0, size, (void*) &finished, FULL_WRITE);
  524. starpu_hdf5_wait(&finished);
  525. starpu_sem_destroy(&finished);
  526. return 0;
  527. }
  528. static int starpu_hdf5_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  529. {
  530. starpu_sem_t finished;
  531. starpu_sem_init(&finished, 0, 0);
  532. starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) &finished, READ);
  533. starpu_hdf5_wait(&finished);
  534. starpu_sem_destroy(&finished);
  535. return 0;
  536. }
  537. static int starpu_hdf5_write(void *base, void *obj, const void *buf, off_t offset, size_t size)
  538. {
  539. starpu_sem_t finished;
  540. starpu_sem_init(&finished, 0, 0);
  541. starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) &finished, WRITE);
  542. starpu_hdf5_wait(&finished);
  543. starpu_sem_destroy(&finished);
  544. return 0;
  545. }
  546. static void * starpu_hdf5_async_read(void *base, void *obj, void *buf, off_t offset, size_t size)
  547. {
  548. starpu_sem_t * finished;
  549. _STARPU_MALLOC(finished, sizeof(*finished));
  550. starpu_sem_init(finished, 0, 0);
  551. starpu_hdf5_send_work(base, obj, buf, offset, size, (void*) finished, READ);
  552. return finished;
  553. }
  554. static void * starpu_hdf5_async_write(void *base, void *obj, void *buf, off_t offset, size_t size)
  555. {
  556. starpu_sem_t * finished;
  557. _STARPU_MALLOC(finished, sizeof(*finished));
  558. starpu_sem_init(finished, 0, 0);
  559. starpu_hdf5_send_work(base, obj, (void *) buf, offset, size, (void*) finished, WRITE);
  560. return finished;
  561. }
  562. static void starpu_hdf5_free_request(void * event)
  563. {
  564. starpu_sem_destroy(event);
  565. free(event);
  566. }
  567. static int get_hdf5_bandwidth_between_disk_and_main_ram(unsigned node)
  568. {
  569. unsigned iter;
  570. double timing_slowness, timing_latency;
  571. double start;
  572. double end;
  573. char *buf;
  574. srand(time(NULL));
  575. starpu_malloc_flags((void **) &buf, STARPU_DISK_SIZE_MIN, 0);
  576. STARPU_ASSERT(buf != NULL);
  577. /* allocate memory */
  578. void *mem = _starpu_disk_alloc(node, STARPU_DISK_SIZE_MIN);
  579. /* fail to alloc */
  580. if (mem == NULL)
  581. return 0;
  582. memset(buf, 0, STARPU_DISK_SIZE_MIN);
  583. /* Measure upload slowness */
  584. start = starpu_timing_now();
  585. for (iter = 0; iter < NITER; ++iter)
  586. {
  587. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, STARPU_DISK_SIZE_MIN, NULL);
  588. }
  589. end = starpu_timing_now();
  590. timing_slowness = end - start;
  591. /* free memory */
  592. starpu_free_flags(buf, STARPU_DISK_SIZE_MIN, 0);
  593. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  594. STARPU_ASSERT(buf != NULL);
  595. *buf = 0;
  596. /* Measure latency */
  597. start = starpu_timing_now();
  598. for (iter = 0; iter < NITER; ++iter)
  599. {
  600. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (STARPU_DISK_SIZE_MIN -1) , 1, NULL);
  601. }
  602. end = starpu_timing_now();
  603. timing_latency = end - start;
  604. _starpu_disk_free(node, mem, STARPU_DISK_SIZE_MIN);
  605. starpu_free_flags(buf, sizeof(char), 0);
  606. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*STARPU_DISK_SIZE_MIN, (NITER/timing_slowness)*STARPU_DISK_SIZE_MIN,
  607. timing_latency/NITER, timing_latency/NITER, node);
  608. return 1;
  609. }
  610. struct starpu_disk_ops starpu_disk_hdf5_ops =
  611. {
  612. .alloc = starpu_hdf5_alloc,
  613. .free = starpu_hdf5_free,
  614. .open = starpu_hdf5_open,
  615. .close = starpu_hdf5_close,
  616. .read = starpu_hdf5_read,
  617. .write = starpu_hdf5_write,
  618. .plug = starpu_hdf5_plug,
  619. .unplug = starpu_hdf5_unplug,
  620. .copy = NULL,
  621. .bandwidth = get_hdf5_bandwidth_between_disk_and_main_ram,
  622. .full_read = starpu_hdf5_full_read,
  623. .full_write = starpu_hdf5_full_write,
  624. .async_read = starpu_hdf5_async_read,
  625. .async_write = starpu_hdf5_async_write,
  626. .wait_request = starpu_hdf5_wait,
  627. .test_request = starpu_hdf5_test,
  628. .free_request = starpu_hdf5_free_request
  629. };