disk_stdio.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 Corentin Salingue
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  #include <fcntl.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <sys/stat.h>
  #include <sys/time.h>
  #include <aio.h>
  #include <errno.h>
  #include <time.h>
  #include <starpu.h>
  #include <core/disk.h>
  #include <core/perfmodel/perfmodel.h>
  #include <datawizard/copy_driver.h>
  #include <datawizard/memory_manager.h>
  #ifdef STARPU_HAVE_WINDOWS
  #include <io.h>
  #else
  #include <unistd.h>
  #endif
  32. #define NITER 64
  33. /* ------------------- use STDIO to write on disk ------------------- */
  34. struct starpu_stdio_obj {
  35. int descriptor;
  36. FILE * file;
  37. char * path;
  38. double size;
  39. starpu_pthread_mutex_t mutex;
  40. };
  41. /* allocation memory on disk */
  42. static void *
  43. starpu_stdio_alloc (void *base, size_t size)
  44. {
  45. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  46. STARPU_ASSERT(obj != NULL);
  47. int id = -1;
  48. /* create template for mkstemp */
  49. char * baseCpy = malloc(strlen(base)+8);
  50. STARPU_ASSERT(baseCpy != NULL);
  51. char * tmp = "STARPU_XXXXXX";
  52. strcpy(baseCpy, (char *) base);
  53. strcat(baseCpy,"/");
  54. strcat(baseCpy,tmp);
  55. #ifdef STARPU_HAVE_WINDOWS
  56. _mktemp(baseCpy);
  57. id = open(baseCpy, "rb+");
  58. #else
  59. id = mkstemp(baseCpy);
  60. #endif
  61. /* fail */
  62. if (id < 0)
  63. {
  64. free(obj);
  65. free(baseCpy);
  66. return NULL;
  67. }
  68. FILE * f = fdopen(id, "rb+");
  69. /* fail */
  70. if (f == NULL)
  71. {
  72. /* delete fic */
  73. free(obj);
  74. free(baseCpy);
  75. unlink(baseCpy);
  76. return NULL;
  77. }
  78. #ifdef STARPU_HAVE_WINDOWS
  79. int val = _chsize(id, size);
  80. #else
  81. int val = ftruncate(id,size);
  82. #endif
  83. /* fail */
  84. if (val < 0)
  85. {
  86. free(obj);
  87. free(baseCpy);
  88. unlink(baseCpy);
  89. return NULL;
  90. }
  91. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  92. obj->descriptor = id;
  93. obj->file = f;
  94. obj->path = baseCpy;
  95. obj->size = size;
  96. return (void *) obj;
  97. }
  98. /* free memory on disk */
  99. static void
  100. starpu_stdio_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  101. {
  102. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  103. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  104. unlink(tmp->path);
  105. fclose(tmp->file);
  106. close(tmp->descriptor);
  107. free(tmp->path);
  108. free(tmp);
  109. }
  110. /* open an existing memory on disk */
  111. static void *
  112. starpu_stdio_open (void *base, void *pos, size_t size)
  113. {
  114. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  115. STARPU_ASSERT(obj != NULL);
  116. /* create template */
  117. char * baseCpy = malloc(strlen(base)+1+strlen(pos)+1);
  118. STARPU_ASSERT(baseCpy != NULL);
  119. strcpy(baseCpy,(char *) base);
  120. strcat(baseCpy,(char *) "/");
  121. strcat(baseCpy,(char *) pos);
  122. int id = open(baseCpy, O_RDWR);
  123. if (id < 0)
  124. {
  125. free(obj);
  126. free(baseCpy);
  127. return NULL;
  128. }
  129. FILE * f = fdopen(id,"rb+");
  130. if (f == NULL)
  131. {
  132. free(obj);
  133. free(baseCpy);
  134. return NULL;
  135. }
  136. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  137. obj->descriptor = id;
  138. obj->file = f;
  139. obj->path = baseCpy;
  140. obj->size = size;
  141. return (void *) obj;
  142. }
  143. /* free memory without delete it */
  144. static void
  145. starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  146. {
  147. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  148. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  149. fclose(tmp->file);
  150. close(tmp->descriptor);
  151. free(tmp->path);
  152. free(tmp);
  153. }
  154. /* read the memory disk */
  155. static int
  156. starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
  157. {
  158. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  159. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  160. int res = fseek(tmp->file, offset, SEEK_SET);
  161. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  162. ssize_t nb = fread (buf, 1, size, tmp->file);
  163. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  164. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  165. return 0;
  166. }
  167. static int
  168. starpu_stdio_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
  169. {
  170. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  171. struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
  172. struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
  173. memset(aiocb, 0, sizeof(struct aiocb));
  174. aiocb->aio_fildes = tmp->descriptor;
  175. aiocb->aio_offset = offset;
  176. aiocb->aio_nbytes = size;
  177. aiocb->aio_buf = buf;
  178. aiocb->aio_reqprio = 0;
  179. aiocb->aio_lio_opcode = LIO_NOP;
  180. return aio_read(aiocb);
  181. }
  182. static int
  183. starpu_stdio_full_read(unsigned node, void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
  184. {
  185. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  186. *size = tmp->size;
  187. *ptr = malloc(*size);
  188. return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
  189. }
  190. /* write on the memory disk */
  191. static int
  192. starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
  193. {
  194. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  195. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  196. int res = fseek(tmp->file, offset, SEEK_SET);
  197. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  198. ssize_t nb = fwrite (buf, 1, size, tmp->file);
  199. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  200. return nb;
  201. }
  202. static int
  203. starpu_stdio_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
  204. {
  205. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  206. struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
  207. struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
  208. memset(aiocb, 0, sizeof(struct aiocb));
  209. aiocb->aio_fildes = tmp->descriptor;
  210. aiocb->aio_offset = offset;
  211. aiocb->aio_nbytes = size;
  212. aiocb->aio_buf = buf;
  213. aiocb->aio_reqprio = 0;
  214. aiocb->aio_lio_opcode = LIO_NOP;
  215. return aio_write(aiocb);
  216. }
  217. static int
  218. starpu_stdio_full_write (unsigned node, void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
  219. {
  220. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  221. /* update file size to realise the next good full_read */
  222. if(size != tmp->size)
  223. {
  224. _starpu_memory_manager_deallocate_size(tmp->size, node);
  225. if (_starpu_memory_manager_can_allocate_size(size, node))
  226. {
  227. #ifdef STARPU_HAVE_WINDOWS
  228. int val = _chsize(tmp->descriptor, size);
  229. #else
  230. int val = ftruncate(tmp->descriptor,size);
  231. #endif
  232. STARPU_ASSERT_MSG(val >= 0,"StarPU Error to truncate file in STDIO full_write function");
  233. tmp->size = size;
  234. }
  235. else
  236. {
  237. STARPU_ASSERT_MSG(0, "Can't allocate size %u on the disk !", (int) size);
  238. }
  239. }
  240. return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
  241. }
  /*
   * Plug a disk: the base is a private heap copy of the NUL-terminated string
   * `parameter'.  Returns that copy; starpu_stdio_unplug() frees it.
   */
  static void *
  starpu_stdio_plug (void *parameter)
  {
      const char * src = (const char *) parameter;
      /* bytes to duplicate, terminating NUL included */
      size_t bytes = strlen(src) + 1;

      char * tmp = malloc(bytes);
      STARPU_ASSERT(tmp != NULL);
      memcpy(tmp, src, bytes);

      return (void *) tmp;
  }
  /*
   * Unplug a disk: release the base string that starpu_stdio_plug() allocated.
   */
  static void
  starpu_stdio_unplug (void *base)
  {
      char * root = (char *) base;

      free(root);
  }
  257. static int
  258. get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
  259. {
  260. unsigned iter;
  261. double timing_slowness, timing_latency;
  262. struct timeval start;
  263. struct timeval end;
  264. srand (time (NULL));
  265. char * buf = malloc(SIZE_DISK_MIN*sizeof(char));
  266. STARPU_ASSERT(buf != NULL);
  267. /* allocate memory */
  268. void * mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
  269. /* fail to alloc */
  270. if (mem == NULL)
  271. return 0;
  272. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) mem;
  273. /* Measure upload slowness */
  274. gettimeofday(&start, NULL);
  275. for (iter = 0; iter < NITER; ++iter)
  276. {
  277. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
  278. /* clean cache memory */
  279. int res = fflush (tmp->file);
  280. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  281. #ifdef STARPU_HAVE_WINDOWS
  282. res = _commit(tmp->descriptor);
  283. #else
  284. res = fsync(tmp->descriptor);
  285. #endif
  286. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  287. }
  288. gettimeofday(&end, NULL);
  289. timing_slowness = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
  290. /* free memory */
  291. free(buf);
  292. buf = malloc(sizeof(char));
  293. STARPU_ASSERT(buf != NULL);
  294. /* Measure latency */
  295. gettimeofday(&start, NULL);
  296. for (iter = 0; iter < NITER; ++iter)
  297. {
  298. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
  299. int res = fflush (tmp->file);
  300. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  301. #ifdef STARPU_HAVE_WINDOWS
  302. res = _commit(tmp->descriptor);
  303. #else
  304. res = fsync(tmp->descriptor);
  305. #endif
  306. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  307. }
  308. gettimeofday(&end, NULL);
  309. timing_latency = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
  310. _starpu_disk_free(node, mem, SIZE_DISK_MIN);
  311. free(buf);
  312. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*1000000, (NITER/timing_slowness)*1000000,
  313. timing_latency/NITER, timing_latency/NITER, node);
  314. return 1;
  315. }
  316. static void
  317. starpu_stdio_wait_request(void * async_channel)
  318. {
  319. struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
  320. const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
  321. const struct aiocb * list[1];
  322. list[0] = aiocb;
  323. int values = -1;
  324. int error_disk = EAGAIN;
  325. while(values < 0 || error_disk == EAGAIN)
  326. {
  327. /* Wait the answer of the request TIMESTAMP IS NULL */
  328. values = aio_suspend(list, 1, NULL);
  329. error_disk = errno;
  330. }
  331. }
  332. static int
  333. starpu_stdio_test_request(void * async_channel)
  334. {
  335. struct timespec time_wait_request;
  336. time_wait_request.tv_sec = 0;
  337. time_wait_request.tv_nsec = 0;
  338. struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
  339. const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
  340. const struct aiocb * list[1];
  341. list[0] = aiocb;
  342. int values = -1;
  343. int error_disk = EAGAIN;
  344. /* Wait the answer of the request */
  345. values = aio_suspend(list, 1, &time_wait_request);
  346. error_disk = errno;
  347. /* request is finished */
  348. if (values == 0)
  349. return 1;
  350. /* values == -1 */
  351. if (error_disk == EAGAIN)
  352. return 0;
  353. /* an error occured */
  354. STARPU_ABORT();
  355. }
  356. struct starpu_disk_ops starpu_disk_stdio_ops = {
  357. .alloc = starpu_stdio_alloc,
  358. .free = starpu_stdio_free,
  359. .open = starpu_stdio_open,
  360. .close = starpu_stdio_close,
  361. .read = starpu_stdio_read,
  362. .async_read = starpu_stdio_async_read,
  363. .write = starpu_stdio_write,
  364. .async_write = starpu_stdio_async_write,
  365. .plug = starpu_stdio_plug,
  366. .unplug = starpu_stdio_unplug,
  367. .copy = NULL,
  368. .bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
  369. .wait_request = starpu_stdio_wait_request,
  370. .test_request = starpu_stdio_test_request,
  371. .full_read = starpu_stdio_full_read,
  372. .full_write = starpu_stdio_full_write
  373. };