disk_stdio.c 11 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2013 Corentin Salingue
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <fcntl.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #include <sys/stat.h>
  21. #include <errno.h>
  22. #include <time.h>
  23. #include <starpu.h>
  24. #include <core/disk.h>
  25. #include <core/perfmodel/perfmodel.h>
  26. #include <datawizard/copy_driver.h>
  27. #include <datawizard/memory_manager.h>
  28. #include <datawizard/memory_nodes.h>
  29. #include <starpu_parameters.h>
  30. #ifdef STARPU_HAVE_WINDOWS
  31. # include <io.h>
  32. #endif
  33. #define NITER _starpu_calibration_minimum
  34. #ifndef O_BINARY
  35. #define O_BINARY 0
  36. #endif
  37. #define MAX_OPEN_FILES 64
  38. #define TEMP_HIERARCHY_DEPTH 2
  39. /* ------------------- use STDIO to write on disk ------------------- */
  40. static unsigned starpu_stdio_opened_files;
  41. struct starpu_stdio_obj
  42. {
  43. int descriptor;
  44. FILE * file;
  45. char * path;
  46. size_t size;
  47. starpu_pthread_mutex_t mutex;
  48. };
  49. struct starpu_stdio_base
  50. {
  51. char * path;
  52. int created;
  53. };
  54. static struct starpu_stdio_obj *_starpu_stdio_init(int descriptor, char *path, size_t size)
  55. {
  56. struct starpu_stdio_obj *obj;
  57. _STARPU_MALLOC(obj, sizeof(struct starpu_stdio_obj));
  58. FILE *f = fdopen(descriptor,"rb+");
  59. if (f == NULL)
  60. {
  61. free(obj);
  62. return NULL;
  63. }
  64. STARPU_HG_DISABLE_CHECKING(starpu_stdio_opened_files);
  65. if (starpu_stdio_opened_files >= MAX_OPEN_FILES)
  66. {
  67. /* Too many opened files, avoid keeping this one opened */
  68. fclose(f);
  69. f = NULL;
  70. descriptor = -1;
  71. }
  72. else
  73. (void) STARPU_ATOMIC_ADD(&starpu_stdio_opened_files, 1);
  74. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  75. obj->descriptor = descriptor;
  76. obj->file = f;
  77. obj->path = path;
  78. obj->size = size;
  79. return (void *) obj;
  80. }
  81. static FILE *_starpu_stdio_reopen(struct starpu_stdio_obj *obj)
  82. {
  83. int id = open(obj->path, O_RDWR);
  84. STARPU_ASSERT(id >= 0);
  85. FILE *f = fdopen(id,"rb+");
  86. STARPU_ASSERT(f);
  87. return f;
  88. }
  89. static void _starpu_stdio_reclose(FILE *f)
  90. {
  91. fclose(f);
  92. }
  93. static void _starpu_stdio_close(struct starpu_stdio_obj *obj)
  94. {
  95. if (obj->descriptor < 0)
  96. return;
  97. if (starpu_stdio_opened_files < MAX_OPEN_FILES)
  98. (void) STARPU_ATOMIC_ADD(&starpu_stdio_opened_files, -1);
  99. fclose(obj->file);
  100. }
  101. static void _starpu_stdio_fini(struct starpu_stdio_obj *obj)
  102. {
  103. STARPU_PTHREAD_MUTEX_DESTROY(&obj->mutex);
  104. free(obj->path);
  105. free(obj);
  106. }
  107. /* allocation memory on disk */
  108. static void *starpu_stdio_alloc(void *base, size_t size)
  109. {
  110. struct starpu_stdio_obj *obj;
  111. struct starpu_stdio_base * fileBase = (struct starpu_stdio_base *) base;
  112. int id;
  113. char *baseCpy = _starpu_mktemp_many(fileBase->path, TEMP_HIERARCHY_DEPTH, O_RDWR | O_BINARY, &id);
  114. /* fail */
  115. if (!baseCpy)
  116. return NULL;
  117. int val = _starpu_ftruncate(id,size);
  118. /* fail */
  119. if (val < 0)
  120. {
  121. _STARPU_DISP("Could not truncate file, ftruncate failed with error '%s'\n", strerror(errno));
  122. close(id);
  123. unlink(baseCpy);
  124. free(baseCpy);
  125. return NULL;
  126. }
  127. obj = _starpu_stdio_init(id, baseCpy, size);
  128. if (!obj)
  129. {
  130. close(id);
  131. unlink(baseCpy);
  132. free(baseCpy);
  133. }
  134. return obj;
  135. }
  136. /* free memory on disk */
  137. static void starpu_stdio_free(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  138. {
  139. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  140. _starpu_stdio_close(tmp);
  141. unlink(tmp->path);
  142. _starpu_rmtemp_many(tmp->path, TEMP_HIERARCHY_DEPTH);
  143. _starpu_stdio_fini(tmp);
  144. }
  145. /* open an existing memory on disk */
  146. static void *starpu_stdio_open(void *base, void *pos, size_t size)
  147. {
  148. struct starpu_stdio_base * fileBase = (struct starpu_stdio_base *) base;
  149. struct starpu_stdio_obj *obj;
  150. /* create template */
  151. char *baseCpy;
  152. _STARPU_MALLOC(baseCpy, strlen(fileBase->path)+1+strlen(pos)+1);
  153. snprintf(baseCpy, strlen(fileBase->path)+1+strlen(pos)+1, "%s/%s", fileBase->path, (char *)pos);
  154. int id = open(baseCpy, O_RDWR);
  155. if (id < 0)
  156. {
  157. free(baseCpy);
  158. return NULL;
  159. }
  160. obj = _starpu_stdio_init(id, baseCpy, size);
  161. if (!obj)
  162. free(baseCpy);
  163. return obj;
  164. }
  165. /* free memory without delete it */
  166. static void starpu_stdio_close(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  167. {
  168. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  169. _starpu_stdio_close(tmp);
  170. _starpu_stdio_fini(tmp);
  171. }
  172. /* read the memory disk */
  173. static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
  174. {
  175. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  176. FILE *f = tmp->file;
  177. if (f)
  178. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  179. else
  180. f = _starpu_stdio_reopen(obj);
  181. int res = fseek(f, offset, SEEK_SET);
  182. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  183. starpu_ssize_t nb = fread(buf, 1, size, f);
  184. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  185. if (tmp->file)
  186. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  187. else
  188. _starpu_stdio_reclose(f);
  189. return 0;
  190. }
  191. static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size, unsigned dst_node)
  192. {
  193. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  194. FILE *f = tmp->file;
  195. starpu_ssize_t ssize;
  196. if (f)
  197. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  198. else
  199. f = _starpu_stdio_reopen(obj);
  200. int res = fseek(f, 0, SEEK_END);
  201. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  202. ssize = ftell(f);
  203. STARPU_ASSERT_MSG(ssize >= 0, "Stdio write failed");
  204. *size = ssize;
  205. if (tmp->file)
  206. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  207. /* Alloc aligned buffer */
  208. _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
  209. if (tmp->file)
  210. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  211. res = fseek(f, 0, SEEK_SET);
  212. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  213. starpu_ssize_t nb = fread(*ptr, 1, *size, f);
  214. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  215. if (tmp->file)
  216. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  217. else
  218. _starpu_stdio_reclose(f);
  219. return 0;
  220. }
  221. /* write on the memory disk */
  222. static int starpu_stdio_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
  223. {
  224. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  225. FILE *f = tmp->file;
  226. if (f)
  227. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  228. else
  229. f = _starpu_stdio_reopen(obj);
  230. int res = fseek(f, offset, SEEK_SET);
  231. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  232. fwrite(buf, 1, size, f);
  233. if (tmp->file)
  234. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  235. else
  236. _starpu_stdio_reclose(f);
  237. return 0;
  238. }
  239. static int starpu_stdio_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *ptr, size_t size)
  240. {
  241. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
  242. FILE *f = tmp->file;
  243. if (!f)
  244. f = _starpu_stdio_reopen(obj);
  245. /* update file size to realise the next good full_read */
  246. if(size != tmp->size)
  247. {
  248. int val = _starpu_fftruncate(f,size);
  249. STARPU_ASSERT(val == 0);
  250. tmp->size = size;
  251. }
  252. int res = fseek(f, 0, SEEK_SET);
  253. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  254. fwrite(ptr, 1, size, f);
  255. if (!tmp->file)
  256. _starpu_stdio_reclose(f);
  257. return 0;
  258. }
  259. static void *starpu_stdio_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
  260. {
  261. struct starpu_stdio_base * base;
  262. struct stat buf;
  263. _STARPU_MALLOC(base, sizeof(*base));
  264. base->created = 0;
  265. base->path = strdup((char *) parameter);
  266. STARPU_ASSERT(base->path);
  267. if (!(stat(base->path, &buf) == 0 && S_ISDIR(buf.st_mode)))
  268. {
  269. _starpu_mkpath(base->path, S_IRWXU);
  270. base->created = 1;
  271. }
  272. return (void *) base;
  273. }
  274. /* free memory allocated for the base */
  275. static void starpu_stdio_unplug(void *base)
  276. {
  277. struct starpu_stdio_base * fileBase = (struct starpu_stdio_base *) base;
  278. if (fileBase->created)
  279. rmdir(fileBase->path);
  280. free(fileBase->path);
  281. free(fileBase);
  282. }
  283. static int get_stdio_bandwidth_between_disk_and_main_ram(unsigned node, void *base)
  284. {
  285. unsigned iter;
  286. double timing_slowness, timing_latency;
  287. double start;
  288. double end;
  289. char *buf;
  290. struct starpu_stdio_base * fileBase = (struct starpu_stdio_base *) base;
  291. srand(time(NULL));
  292. starpu_malloc_flags((void **) &buf, STARPU_DISK_SIZE_MIN, 0);
  293. STARPU_ASSERT(buf != NULL);
  294. /* allocate memory */
  295. void *mem = _starpu_disk_alloc(node, STARPU_DISK_SIZE_MIN);
  296. /* fail to alloc */
  297. if (mem == NULL)
  298. return 0;
  299. struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) mem;
  300. memset(buf, 0, STARPU_DISK_SIZE_MIN);
  301. /* Measure upload slowness */
  302. start = starpu_timing_now();
  303. for (iter = 0; iter < NITER; ++iter)
  304. {
  305. FILE *f = tmp->file;
  306. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, STARPU_DISK_SIZE_MIN, NULL);
  307. if (!f)
  308. f = _starpu_stdio_reopen(tmp);
  309. /* clean cache memory */
  310. int res = fflush(f);
  311. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  312. #ifdef STARPU_HAVE_WINDOWS
  313. res = _commit(fileno(f));
  314. #else
  315. res = fsync(fileno(f));
  316. #endif
  317. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  318. if (!tmp->file)
  319. _starpu_stdio_reclose(f);
  320. }
  321. end = starpu_timing_now();
  322. timing_slowness = end - start;
  323. /* free memory */
  324. starpu_free_flags(buf, STARPU_DISK_SIZE_MIN, 0);
  325. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  326. STARPU_ASSERT(buf != NULL);
  327. *buf = 0;
  328. /* Measure latency */
  329. start = starpu_timing_now();
  330. for (iter = 0; iter < NITER; ++iter)
  331. {
  332. FILE *f = tmp->file;
  333. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (STARPU_DISK_SIZE_MIN -1) , 1, NULL);
  334. if (!f)
  335. f = _starpu_stdio_reopen(tmp);
  336. int res = fflush(f);
  337. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  338. #ifdef STARPU_HAVE_WINDOWS
  339. res = _commit(fileno(f));
  340. #else
  341. res = fsync(fileno(f));
  342. #endif
  343. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  344. if (!tmp->file)
  345. _starpu_stdio_reclose(f);
  346. }
  347. end = starpu_timing_now();
  348. timing_latency = end - start;
  349. _starpu_disk_free(node, mem, STARPU_DISK_SIZE_MIN);
  350. starpu_free_flags(buf, sizeof(char), 0);
  351. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*STARPU_DISK_SIZE_MIN, (NITER/timing_slowness)*STARPU_DISK_SIZE_MIN,
  352. timing_latency/NITER, timing_latency/NITER, node, fileBase->path);
  353. return 1;
  354. }
  355. struct starpu_disk_ops starpu_disk_stdio_ops =
  356. {
  357. .alloc = starpu_stdio_alloc,
  358. .free = starpu_stdio_free,
  359. .open = starpu_stdio_open,
  360. .close = starpu_stdio_close,
  361. .read = starpu_stdio_read,
  362. .write = starpu_stdio_write,
  363. .plug = starpu_stdio_plug,
  364. .unplug = starpu_stdio_unplug,
  365. .copy = NULL,
  366. .bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
  367. .full_read = starpu_stdio_full_read,
  368. .full_write = starpu_stdio_full_write
  369. };