disk_stdio.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 Corentin Salingue
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <fcntl.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <sys/stat.h>
  20. #include <errno.h>
  21. #include <time.h>
  22. #include <starpu.h>
  23. #include <core/disk.h>
  24. #include <core/perfmodel/perfmodel.h>
  25. #include <datawizard/copy_driver.h>
  26. #include <datawizard/memory_manager.h>
  27. #include <starpu_parameters.h>
  28. #ifdef STARPU_HAVE_WINDOWS
  29. #include <io.h>
  30. #endif
  31. #define NITER _STARPU_CALIBRATION_MINIMUM
  32. /* ------------------- use STDIO to write on disk ------------------- */
  33. struct starpu_stdio_obj
  34. {
  35. int descriptor;
  36. FILE * file;
  37. char * path;
  38. size_t size;
  39. starpu_pthread_mutex_t mutex;
  40. };
  41. /* allocation memory on disk */
  42. static void *starpu_stdio_alloc (void *base, size_t size)
  43. {
  44. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  45. STARPU_ASSERT(obj != NULL);
  46. int id = -1;
  47. /* create template for mkstemp */
  48. char * tmp = "STARPU_XXXXXX";
  49. char * baseCpy = malloc(strlen(base)+1+strlen(tmp)+1);
  50. STARPU_ASSERT(baseCpy != NULL);
  51. strcpy(baseCpy, (char *) base);
  52. strcat(baseCpy,"/");
  53. strcat(baseCpy,tmp);
  54. #ifdef STARPU_HAVE_WINDOWS
  55. _mktemp(baseCpy);
  56. id = open(baseCpy, O_RDWR | O_BINARY);
  57. #else
  58. id = mkstemp(baseCpy);
  59. #endif
  60. /* fail */
  61. if (id < 0)
  62. {
  63. free(obj);
  64. free(baseCpy);
  65. return NULL;
  66. }
  67. FILE * f = fdopen(id, "rb+");
  68. /* fail */
  69. if (f == NULL)
  70. {
  71. /* delete fic */
  72. close(id);
  73. unlink(baseCpy);
  74. free(baseCpy);
  75. free(obj);
  76. return NULL;
  77. }
  78. #ifdef STARPU_HAVE_WINDOWS
  79. int val = _chsize(id, size);
  80. #else
  81. int val = ftruncate(id,size);
  82. #endif
  83. /* fail */
  84. if (val < 0)
  85. {
  86. fclose(f);
  87. close(id);
  88. unlink(baseCpy);
  89. free(baseCpy);
  90. free(obj);
  91. return NULL;
  92. }
  93. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  94. obj->descriptor = id;
  95. obj->file = f;
  96. obj->path = baseCpy;
  97. obj->size = size;
  98. return (void *) obj;
  99. }
  100. /* free memory on disk */
  101. static void starpu_stdio_free(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  102. {
  103. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  104. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  105. fclose(tmp->file);
  106. close(tmp->descriptor);
  107. unlink(tmp->path);
  108. free(tmp->path);
  109. free(tmp);
  110. }
  111. /* open an existing memory on disk */
  112. static void *starpu_stdio_open (void *base, void *pos, size_t size)
  113. {
  114. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  115. STARPU_ASSERT(obj != NULL);
  116. /* create template */
  117. char * baseCpy = malloc(strlen(base)+1+strlen(pos)+1);
  118. STARPU_ASSERT(baseCpy != NULL);
  119. strcpy(baseCpy,(char *) base);
  120. strcat(baseCpy,(char *) "/");
  121. strcat(baseCpy,(char *) pos);
  122. int id = open(baseCpy, O_RDWR);
  123. if (id < 0)
  124. {
  125. free(obj);
  126. free(baseCpy);
  127. return NULL;
  128. }
  129. FILE * f = fdopen(id,"rb+");
  130. if (f == NULL)
  131. {
  132. free(obj);
  133. free(baseCpy);
  134. return NULL;
  135. }
  136. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  137. obj->descriptor = id;
  138. obj->file = f;
  139. obj->path = baseCpy;
  140. obj->size = size;
  141. return (void *) obj;
  142. }
  143. /* free memory without delete it */
  144. static void starpu_stdio_close(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  145. {
  146. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  147. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  148. fclose(tmp->file);
  149. close(tmp->descriptor);
  150. free(tmp->path);
  151. free(tmp);
  152. }
  153. /* read the memory disk */
  154. static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
  155. {
  156. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  157. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  158. int res = fseek(tmp->file, offset, SEEK_SET);
  159. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  160. starpu_ssize_t nb = fread (buf, 1, size, tmp->file);
  161. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  162. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  163. return 0;
  164. }
  165. static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
  166. {
  167. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  168. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  169. int res = fseek(tmp->file, 0, SEEK_END);
  170. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  171. *size = ftell(tmp->file);
  172. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  173. /* Alloc aligned buffer */
  174. starpu_malloc_flags(ptr, *size, 0);
  175. return starpu_stdio_read(base, obj, *ptr, 0, *size);
  176. }
  177. /* write on the memory disk */
  178. static int starpu_stdio_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
  179. {
  180. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  181. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  182. int res = fseek(tmp->file, offset, SEEK_SET);
  183. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  184. fwrite (buf, 1, size, tmp->file);
  185. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  186. return 0;
  187. }
  188. static int starpu_stdio_full_write(void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
  189. {
  190. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  191. /* update file size to realise the next good full_read */
  192. if(size != tmp->size)
  193. {
  194. #ifdef STARPU_HAVE_WINDOWS
  195. int val = _chsize(tmp->descriptor, size);
  196. #else
  197. int val = ftruncate(tmp->descriptor,size);
  198. #endif
  199. STARPU_ASSERT(val == 0);
  200. tmp->size = size;
  201. }
  202. starpu_stdio_write(base, obj, ptr, 0, size);
  203. return 0;
  204. }
  205. /* create a new copy of parameter == base */
  206. static void *starpu_stdio_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
  207. {
  208. char * tmp = malloc(sizeof(char)*(strlen(parameter)+1));
  209. STARPU_ASSERT(tmp != NULL);
  210. strcpy(tmp,(char *) parameter);
  211. return (void *) tmp;
  212. }
  213. /* free memory allocated for the base */
  214. static void starpu_stdio_unplug(void *base)
  215. {
  216. free(base);
  217. }
  218. static int get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
  219. {
  220. unsigned iter;
  221. double timing_slowness, timing_latency;
  222. double start;
  223. double end;
  224. char *buf;
  225. srand (time (NULL));
  226. starpu_malloc_flags((void **) &buf, SIZE_DISK_MIN, 0);
  227. STARPU_ASSERT(buf != NULL);
  228. /* allocate memory */
  229. void * mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
  230. /* fail to alloc */
  231. if (mem == NULL)
  232. return 0;
  233. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) mem;
  234. memset(buf, 0, SIZE_DISK_MIN);
  235. /* Measure upload slowness */
  236. start = starpu_timing_now();
  237. for (iter = 0; iter < NITER; ++iter)
  238. {
  239. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
  240. /* clean cache memory */
  241. int res = fflush (tmp->file);
  242. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  243. #ifdef STARPU_HAVE_WINDOWS
  244. res = _commit(tmp->descriptor);
  245. #else
  246. res = fsync(tmp->descriptor);
  247. #endif
  248. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  249. }
  250. end = starpu_timing_now();
  251. timing_slowness = end - start;
  252. /* free memory */
  253. starpu_free_flags(buf, SIZE_DISK_MIN, 0);
  254. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  255. STARPU_ASSERT(buf != NULL);
  256. *buf = 0;
  257. /* Measure latency */
  258. start = starpu_timing_now();
  259. for (iter = 0; iter < NITER; ++iter)
  260. {
  261. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
  262. int res = fflush (tmp->file);
  263. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  264. #ifdef STARPU_HAVE_WINDOWS
  265. res = _commit(tmp->descriptor);
  266. #else
  267. res = fsync(tmp->descriptor);
  268. #endif
  269. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  270. }
  271. end = starpu_timing_now();
  272. timing_latency = end - start;
  273. _starpu_disk_free(node, mem, SIZE_DISK_MIN);
  274. starpu_free_flags(buf, sizeof(char), 0);
  275. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*1000000, (NITER/timing_slowness)*1000000,
  276. timing_latency/NITER, timing_latency/NITER, node);
  277. return 1;
  278. }
  279. struct starpu_disk_ops starpu_disk_stdio_ops =
  280. {
  281. .alloc = starpu_stdio_alloc,
  282. .free = starpu_stdio_free,
  283. .open = starpu_stdio_open,
  284. .close = starpu_stdio_close,
  285. .read = starpu_stdio_read,
  286. .write = starpu_stdio_write,
  287. .plug = starpu_stdio_plug,
  288. .unplug = starpu_stdio_unplug,
  289. .copy = NULL,
  290. .bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
  291. .full_read = starpu_stdio_full_read,
  292. .full_write = starpu_stdio_full_write
  293. };