disk_stdio.c 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 Corentin Salingue
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <fcntl.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <sys/stat.h>
  20. #include <sys/time.h>
  21. #ifdef HAVE_AIO_H
  22. #include <aio.h>
  23. #endif
  24. #include <errno.h>
  25. #include <time.h>
  26. #include <starpu.h>
  27. #include <core/disk.h>
  28. #include <core/perfmodel/perfmodel.h>
  29. #include <datawizard/copy_driver.h>
  30. #include <datawizard/memory_manager.h>
  31. #ifdef STARPU_HAVE_WINDOWS
  32. #include <io.h>
  33. #endif
  34. #define NITER 64
  35. /* ------------------- use STDIO to write on disk ------------------- */
  36. struct starpu_stdio_obj {
  37. int descriptor;
  38. FILE * file;
  39. char * path;
  40. double size;
  41. starpu_pthread_mutex_t mutex;
  42. };
  43. /* allocation memory on disk */
  44. static void *
  45. starpu_stdio_alloc (void *base, size_t size)
  46. {
  47. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  48. STARPU_ASSERT(obj != NULL);
  49. int id = -1;
  50. /* create template for mkstemp */
  51. char * baseCpy = malloc(strlen(base)+8);
  52. STARPU_ASSERT(baseCpy != NULL);
  53. char * tmp = "STARPU_XXXXXX";
  54. strcpy(baseCpy, (char *) base);
  55. strcat(baseCpy,"/");
  56. strcat(baseCpy,tmp);
  57. #ifdef STARPU_HAVE_WINDOWS
  58. _mktemp(baseCpy);
  59. id = open(baseCpy, O_RDWR);
  60. #else
  61. id = mkstemp(baseCpy);
  62. #endif
  63. /* fail */
  64. if (id < 0)
  65. {
  66. free(obj);
  67. free(baseCpy);
  68. return NULL;
  69. }
  70. FILE * f = fdopen(id, "rb+");
  71. /* fail */
  72. if (f == NULL)
  73. {
  74. /* delete fic */
  75. free(obj);
  76. free(baseCpy);
  77. unlink(baseCpy);
  78. return NULL;
  79. }
  80. #ifdef STARPU_HAVE_WINDOWS
  81. int val = _chsize(id, size);
  82. #else
  83. int val = ftruncate(id,size);
  84. #endif
  85. /* fail */
  86. if (val < 0)
  87. {
  88. free(obj);
  89. free(baseCpy);
  90. unlink(baseCpy);
  91. return NULL;
  92. }
  93. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  94. obj->descriptor = id;
  95. obj->file = f;
  96. obj->path = baseCpy;
  97. obj->size = size;
  98. return (void *) obj;
  99. }
  100. /* free memory on disk */
  101. static void
  102. starpu_stdio_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  103. {
  104. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  105. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  106. unlink(tmp->path);
  107. fclose(tmp->file);
  108. close(tmp->descriptor);
  109. free(tmp->path);
  110. free(tmp);
  111. }
  112. /* open an existing memory on disk */
  113. static void *
  114. starpu_stdio_open (void *base, void *pos, size_t size)
  115. {
  116. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  117. STARPU_ASSERT(obj != NULL);
  118. /* create template */
  119. char * baseCpy = malloc(strlen(base)+1+strlen(pos)+1);
  120. STARPU_ASSERT(baseCpy != NULL);
  121. strcpy(baseCpy,(char *) base);
  122. strcat(baseCpy,(char *) "/");
  123. strcat(baseCpy,(char *) pos);
  124. int id = open(baseCpy, O_RDWR);
  125. if (id < 0)
  126. {
  127. free(obj);
  128. free(baseCpy);
  129. return NULL;
  130. }
  131. FILE * f = fdopen(id,"rb+");
  132. if (f == NULL)
  133. {
  134. free(obj);
  135. free(baseCpy);
  136. return NULL;
  137. }
  138. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  139. obj->descriptor = id;
  140. obj->file = f;
  141. obj->path = baseCpy;
  142. obj->size = size;
  143. return (void *) obj;
  144. }
  145. /* free memory without delete it */
  146. static void
  147. starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  148. {
  149. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  150. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  151. fclose(tmp->file);
  152. close(tmp->descriptor);
  153. free(tmp->path);
  154. free(tmp);
  155. }
  156. /* read the memory disk */
  157. static int
  158. starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
  159. {
  160. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  161. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  162. int res = fseek(tmp->file, offset, SEEK_SET);
  163. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  164. ssize_t nb = fread (buf, 1, size, tmp->file);
  165. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  166. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  167. return 0;
  168. }
  169. static int
  170. starpu_stdio_full_read(unsigned node, void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
  171. {
  172. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  173. *size = tmp->size;
  174. *ptr = malloc(*size);
  175. return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
  176. }
  177. /* write on the memory disk */
  178. static int
  179. starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
  180. {
  181. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  182. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  183. int res = fseek(tmp->file, offset, SEEK_SET);
  184. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  185. ssize_t nb = fwrite (buf, 1, size, tmp->file);
  186. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  187. return nb;
  188. }
  189. static int
  190. starpu_stdio_full_write (unsigned node, void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
  191. {
  192. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  193. /* update file size to realise the next good full_read */
  194. if(size != tmp->size)
  195. {
  196. _starpu_memory_manager_deallocate_size(tmp->size, node);
  197. if (_starpu_memory_manager_can_allocate_size(size, node))
  198. {
  199. #ifdef STARPU_HAVE_WINDOWS
  200. int val = _chsize(tmp->descriptor, size);
  201. #else
  202. int val = ftruncate(tmp->descriptor,size);
  203. #endif
  204. STARPU_ASSERT_MSG(val >= 0,"StarPU Error to truncate file in STDIO full_write function");
  205. tmp->size = size;
  206. }
  207. else
  208. {
  209. STARPU_ASSERT_MSG(0, "Can't allocate size %u on the disk !", (int) size);
  210. }
  211. }
  212. return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
  213. }
  /* Build the base of a disk: a private heap copy of the NUL-terminated
   * string `parameter`.  The copy is returned as void * and is released by
   * starpu_stdio_unplug().
   */
  static void *
  starpu_stdio_plug (void *parameter)
  {
      const char *source = (const char *) parameter;
      size_t bytes = sizeof(char) * (strlen(source) + 1);

      char *copy = malloc(bytes);
      STARPU_ASSERT(copy != NULL);

      strcpy(copy, source);

      return (void *) copy;
  }
  /* Release the heap memory holding the base of a disk.
   * `base` is handed straight to free(), so NULL is accepted.
   */
  static void
  starpu_stdio_unplug (void *base)
  {
      free(base);
  }
  229. static int
  230. get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
  231. {
  232. unsigned iter;
  233. double timing_slowness, timing_latency;
  234. struct timeval start;
  235. struct timeval end;
  236. srand (time (NULL));
  237. char * buf = malloc(SIZE_DISK_MIN*sizeof(char));
  238. STARPU_ASSERT(buf != NULL);
  239. /* allocate memory */
  240. void * mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
  241. /* fail to alloc */
  242. if (mem == NULL)
  243. return 0;
  244. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) mem;
  245. /* Measure upload slowness */
  246. gettimeofday(&start, NULL);
  247. for (iter = 0; iter < NITER; ++iter)
  248. {
  249. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
  250. /* clean cache memory */
  251. int res = fflush (tmp->file);
  252. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  253. #ifdef STARPU_HAVE_WINDOWS
  254. res = _commit(tmp->descriptor);
  255. #else
  256. res = fsync(tmp->descriptor);
  257. #endif
  258. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  259. }
  260. gettimeofday(&end, NULL);
  261. timing_slowness = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
  262. /* free memory */
  263. free(buf);
  264. buf = malloc(sizeof(char));
  265. STARPU_ASSERT(buf != NULL);
  266. /* Measure latency */
  267. gettimeofday(&start, NULL);
  268. for (iter = 0; iter < NITER; ++iter)
  269. {
  270. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
  271. int res = fflush (tmp->file);
  272. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  273. #ifdef STARPU_HAVE_WINDOWS
  274. res = _commit(tmp->descriptor);
  275. #else
  276. res = fsync(tmp->descriptor);
  277. #endif
  278. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  279. }
  280. gettimeofday(&end, NULL);
  281. timing_latency = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
  282. _starpu_disk_free(node, mem, SIZE_DISK_MIN);
  283. free(buf);
  284. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*1000000, (NITER/timing_slowness)*1000000,
  285. timing_latency/NITER, timing_latency/NITER, node);
  286. return 1;
  287. }
  288. struct starpu_disk_ops starpu_disk_stdio_ops = {
  289. .alloc = starpu_stdio_alloc,
  290. .free = starpu_stdio_free,
  291. .open = starpu_stdio_open,
  292. .close = starpu_stdio_close,
  293. .read = starpu_stdio_read,
  294. .write = starpu_stdio_write,
  295. .plug = starpu_stdio_plug,
  296. .unplug = starpu_stdio_unplug,
  297. .copy = NULL,
  298. .bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
  299. .full_read = starpu_stdio_full_read,
  300. .full_write = starpu_stdio_full_write
  301. };