disk_stdio.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 Corentin Salingue
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <fcntl.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <sys/stat.h>
  20. #include <errno.h>
  21. #include <time.h>
  22. #include <starpu.h>
  23. #include <core/disk.h>
  24. #include <core/perfmodel/perfmodel.h>
  25. #include <datawizard/copy_driver.h>
  26. #include <datawizard/memory_manager.h>
  27. #ifdef STARPU_HAVE_WINDOWS
  28. #include <io.h>
  29. #endif
  30. #define NITER 64
  31. /* ------------------- use STDIO to write on disk ------------------- */
  32. struct starpu_stdio_obj
  33. {
  34. int descriptor;
  35. FILE * file;
  36. char * path;
  37. size_t size;
  38. starpu_pthread_mutex_t mutex;
  39. };
  40. /* allocation memory on disk */
  41. static void *starpu_stdio_alloc (void *base, size_t size)
  42. {
  43. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  44. STARPU_ASSERT(obj != NULL);
  45. int id = -1;
  46. /* create template for mkstemp */
  47. char * tmp = "STARPU_XXXXXX";
  48. char * baseCpy = malloc(strlen(base)+1+strlen(tmp)+1);
  49. STARPU_ASSERT(baseCpy != NULL);
  50. strcpy(baseCpy, (char *) base);
  51. strcat(baseCpy,"/");
  52. strcat(baseCpy,tmp);
  53. #ifdef STARPU_HAVE_WINDOWS
  54. _mktemp(baseCpy);
  55. id = open(baseCpy, O_RDWR | O_BINARY);
  56. #else
  57. id = mkstemp(baseCpy);
  58. #endif
  59. /* fail */
  60. if (id < 0)
  61. {
  62. free(obj);
  63. free(baseCpy);
  64. return NULL;
  65. }
  66. FILE * f = fdopen(id, "rb+");
  67. /* fail */
  68. if (f == NULL)
  69. {
  70. /* delete fic */
  71. close(id);
  72. unlink(baseCpy);
  73. free(baseCpy);
  74. free(obj);
  75. return NULL;
  76. }
  77. #ifdef STARPU_HAVE_WINDOWS
  78. int val = _chsize(id, size);
  79. #else
  80. int val = ftruncate(id,size);
  81. #endif
  82. /* fail */
  83. if (val < 0)
  84. {
  85. fclose(f);
  86. close(id);
  87. unlink(baseCpy);
  88. free(baseCpy);
  89. free(obj);
  90. return NULL;
  91. }
  92. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  93. obj->descriptor = id;
  94. obj->file = f;
  95. obj->path = baseCpy;
  96. obj->size = size;
  97. return (void *) obj;
  98. }
  99. /* free memory on disk */
  100. static void starpu_stdio_free(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  101. {
  102. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  103. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  104. fclose(tmp->file);
  105. close(tmp->descriptor);
  106. unlink(tmp->path);
  107. free(tmp->path);
  108. free(tmp);
  109. }
  110. /* open an existing memory on disk */
  111. static void *starpu_stdio_open (void *base, void *pos, size_t size)
  112. {
  113. struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
  114. STARPU_ASSERT(obj != NULL);
  115. /* create template */
  116. char * baseCpy = malloc(strlen(base)+1+strlen(pos)+1);
  117. STARPU_ASSERT(baseCpy != NULL);
  118. strcpy(baseCpy,(char *) base);
  119. strcat(baseCpy,(char *) "/");
  120. strcat(baseCpy,(char *) pos);
  121. int id = open(baseCpy, O_RDWR);
  122. if (id < 0)
  123. {
  124. free(obj);
  125. free(baseCpy);
  126. return NULL;
  127. }
  128. FILE * f = fdopen(id,"rb+");
  129. if (f == NULL)
  130. {
  131. free(obj);
  132. free(baseCpy);
  133. return NULL;
  134. }
  135. STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
  136. obj->descriptor = id;
  137. obj->file = f;
  138. obj->path = baseCpy;
  139. obj->size = size;
  140. return (void *) obj;
  141. }
  142. /* free memory without delete it */
  143. static void starpu_stdio_close(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
  144. {
  145. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  146. STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
  147. fclose(tmp->file);
  148. close(tmp->descriptor);
  149. free(tmp->path);
  150. free(tmp);
  151. }
  152. /* read the memory disk */
  153. static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
  154. {
  155. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  156. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  157. int res = fseek(tmp->file, offset, SEEK_SET);
  158. STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
  159. ssize_t nb = fread (buf, 1, size, tmp->file);
  160. STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
  161. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  162. return 0;
  163. }
  164. static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
  165. {
  166. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  167. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  168. int res = fseek(tmp->file, 0, SEEK_END);
  169. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  170. *size = ftell(tmp->file);
  171. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  172. /* Alloc aligned buffer */
  173. starpu_malloc_flags(ptr, *size, 0);
  174. return starpu_stdio_read(base, obj, *ptr, 0, *size);
  175. }
  176. /* write on the memory disk */
  177. static int starpu_stdio_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
  178. {
  179. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  180. STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
  181. int res = fseek(tmp->file, offset, SEEK_SET);
  182. STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
  183. fwrite (buf, 1, size, tmp->file);
  184. STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
  185. return 0;
  186. }
  187. static int starpu_stdio_full_write(void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
  188. {
  189. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
  190. /* update file size to realise the next good full_read */
  191. if(size != tmp->size)
  192. {
  193. #ifdef STARPU_HAVE_WINDOWS
  194. int val = _chsize(tmp->descriptor, size);
  195. #else
  196. int val = ftruncate(tmp->descriptor,size);
  197. #endif
  198. STARPU_ASSERT(val == 0);
  199. tmp->size = size;
  200. }
  201. starpu_stdio_write(base, obj, ptr, 0, size);
  202. return 0;
  203. }
  204. /* create a new copy of parameter == base */
  205. static void *starpu_stdio_plug(void *parameter, ssize_t size STARPU_ATTRIBUTE_UNUSED)
  206. {
  207. char * tmp = malloc(sizeof(char)*(strlen(parameter)+1));
  208. STARPU_ASSERT(tmp != NULL);
  209. strcpy(tmp,(char *) parameter);
  210. return (void *) tmp;
  211. }
  212. /* free memory allocated for the base */
  213. static void starpu_stdio_unplug(void *base)
  214. {
  215. free(base);
  216. }
  217. static int get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
  218. {
  219. unsigned iter;
  220. double timing_slowness, timing_latency;
  221. double start;
  222. double end;
  223. char *buf;
  224. srand (time (NULL));
  225. starpu_malloc_flags((void **) &buf, SIZE_DISK_MIN, 0);
  226. STARPU_ASSERT(buf != NULL);
  227. /* allocate memory */
  228. void * mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
  229. /* fail to alloc */
  230. if (mem == NULL)
  231. return 0;
  232. struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) mem;
  233. memset(buf, 0, SIZE_DISK_MIN);
  234. /* Measure upload slowness */
  235. start = starpu_timing_now();
  236. for (iter = 0; iter < NITER; ++iter)
  237. {
  238. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
  239. /* clean cache memory */
  240. int res = fflush (tmp->file);
  241. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  242. #ifdef STARPU_HAVE_WINDOWS
  243. res = _commit(tmp->descriptor);
  244. #else
  245. res = fsync(tmp->descriptor);
  246. #endif
  247. STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
  248. }
  249. end = starpu_timing_now();
  250. timing_slowness = end - start;
  251. /* free memory */
  252. starpu_free_flags(buf, SIZE_DISK_MIN, 0);
  253. starpu_malloc_flags((void**) &buf, sizeof(char), 0);
  254. STARPU_ASSERT(buf != NULL);
  255. *buf = 0;
  256. /* Measure latency */
  257. start = starpu_timing_now();
  258. for (iter = 0; iter < NITER; ++iter)
  259. {
  260. _starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
  261. int res = fflush (tmp->file);
  262. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  263. #ifdef STARPU_HAVE_WINDOWS
  264. res = _commit(tmp->descriptor);
  265. #else
  266. res = fsync(tmp->descriptor);
  267. #endif
  268. STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
  269. }
  270. end = starpu_timing_now();
  271. timing_latency = end - start;
  272. _starpu_disk_free(node, mem, SIZE_DISK_MIN);
  273. starpu_free_flags(buf, sizeof(char), 0);
  274. _starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*1000000, (NITER/timing_slowness)*1000000,
  275. timing_latency/NITER, timing_latency/NITER, node);
  276. return 1;
  277. }
  278. struct starpu_disk_ops starpu_disk_stdio_ops =
  279. {
  280. .alloc = starpu_stdio_alloc,
  281. .free = starpu_stdio_free,
  282. .open = starpu_stdio_open,
  283. .close = starpu_stdio_close,
  284. .read = starpu_stdio_read,
  285. .write = starpu_stdio_write,
  286. .plug = starpu_stdio_plug,
  287. .unplug = starpu_stdio_unplug,
  288. .copy = NULL,
  289. .bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
  290. .full_read = starpu_stdio_full_read,
  291. .full_write = starpu_stdio_full_write
  292. };