simgrid.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2013 Université de Bordeaux 1
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <datawizard/memory_nodes.h>
  18. #include <unistd.h>
  19. #include <core/perfmodel/perfmodel.h>
  20. #include <core/workers.h>
  21. #include <core/simgrid.h>
  22. #ifdef STARPU_SIMGRID
  23. #include <msg/msg.h>
  24. #pragma weak starpu_main
  25. extern int starpu_main(int argc, char *argv[]);
  /* Command line of the real program, handed over to the simulated "main"
   * process so that it can forward it to starpu_main(). */
  struct main_args
  {
      /* Number of entries in argv */
      int argc;
      /* Argument vector, as received by main() */
      char **argv;
  };
  31. int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  32. {
  33. struct main_args *args = MSG_process_get_data(MSG_process_self());
  34. return starpu_main(args->argc, args->argv);
  35. }
  36. int _starpu_simgrid_get_nbhosts(const char *prefix)
  37. {
  38. int ret;
  39. xbt_dynar_t hosts = MSG_hosts_as_dynar();
  40. unsigned i, nb = xbt_dynar_length(hosts);
  41. unsigned len = strlen(prefix);
  42. ret = 0;
  43. for (i = 0; i < nb; i++) {
  44. const char *name;
  45. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  46. if (!strncmp(name, prefix, len))
  47. ret++;
  48. }
  49. xbt_dynar_free(&hosts);
  50. return ret;
  51. }
  52. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  53. {
  54. char name[16];
  55. msg_host_t host;
  56. const char *memsize;
  57. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  58. host = MSG_get_host_by_name(name);
  59. if (!host)
  60. return 0;
  61. memsize = MSG_host_get_property_value(host, "memsize");
  62. if (!memsize)
  63. return 0;
  64. return atoll(memsize);
  65. }
  66. #ifdef STARPU_DEVEL
  67. #warning TODO: use another way to start main, when simgrid provides it, and then include the application-provided configuration for platform numbers
  68. #endif
  69. #undef main
  70. int main(int argc, char **argv)
  71. {
  72. xbt_dynar_t hosts;
  73. int i;
  74. char path[256];
  75. if (!starpu_main)
  76. {
  77. _STARPU_ERROR("The main file of this application needs to be compiled with starpu.h included, to properly define starpu_main\n");
  78. exit(EXIT_FAILURE);
  79. }
  80. MSG_init(&argc, argv);
  81. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  82. /* Versions earlier than 3.9 didn't support our communication tasks */
  83. MSG_config("workstation/model", "ptask_L07");
  84. #endif
  85. /* Load XML platform */
  86. _starpu_simgrid_get_platform_path(path, sizeof(path));
  87. MSG_create_environment(path);
  88. hosts = MSG_hosts_as_dynar();
  89. int nb = xbt_dynar_length(hosts);
  90. for (i = 0; i < nb; i++)
  91. MSG_host_set_data(xbt_dynar_get_as(hosts, i, msg_host_t), calloc(MAX_TSD, sizeof(void*)));
  92. struct main_args args = { .argc = argc, .argv = argv };
  93. MSG_process_create("main", &do_starpu_main, &args, xbt_dynar_get_as(hosts, 0, msg_host_t));
  94. xbt_dynar_free(&hosts);
  95. MSG_main();
  96. return 0;
  97. }
  98. /* Task execution submitted by StarPU */
  99. void _starpu_simgrid_execute_job(struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length)
  100. {
  101. struct starpu_task *task = j->task;
  102. msg_task_t simgrid_task;
  103. if (j->internal)
  104. /* This is not useful to include in simulation (and probably
  105. * doesn't have a perfmodel anyway) */
  106. return;
  107. if (isnan(length))
  108. {
  109. length = starpu_task_expected_length(task, perf_arch, j->nimpl);
  110. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  111. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  112. _starpu_job_get_model_name(j));
  113. }
  114. simgrid_task = MSG_task_create(_starpu_job_get_model_name(j),
  115. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  116. 0, NULL);
  117. MSG_task_execute(simgrid_task);
  118. }
  119. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  120. LIST_TYPE(transfer,
  121. msg_task_t task;
  122. int src_node;
  123. int dst_node;
  124. int run_node;
  125. /* communication termination signalization */
  126. unsigned *finished;
  127. starpu_pthread_mutex_t *mutex;
  128. starpu_pthread_cond_t *cond;
  129. /* transfers which wait for this transfer */
  130. struct transfer **wake;
  131. unsigned nwake;
  132. /* Number of transfers that this transfer waits for */
  133. unsigned nwait;
  134. )
  135. struct transfer_list *pending;
  136. /* Tell for two transfers whether they should be handled in sequence */
  137. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  138. {
  139. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  140. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  141. int new_is_gpu_gpu, old_is_gpu_gpu;
  142. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  143. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  144. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  145. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  146. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  147. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  148. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  149. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  150. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  151. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  152. /* We ignore cuda-opencl transfers, they can not happen */
  153. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  154. /* The following constraints have been observed with CUDA alone */
  155. /* Same source/destination, sequential */
  156. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  157. return 1;
  158. /* Crossed GPU-GPU, sequential */
  159. if (new_is_gpu_gpu
  160. && new_transfer->src_node == old_transfer->dst_node
  161. && old_transfer->src_node == new_transfer->dst_node)
  162. return 1;
  163. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  164. if (new_is_gpu_gpu
  165. && old_transfer->dst_node == new_transfer->src_node
  166. && old_transfer->dst_node == new_transfer->dst_node)
  167. return 1;
  168. if (old_is_gpu_gpu
  169. && new_transfer->dst_node == old_transfer->src_node
  170. && new_transfer->dst_node == old_transfer->dst_node)
  171. return 1;
  172. /* StarPU's constraint on CUDA transfers is using one stream per
  173. * source/destination pair, which is already handled above */
  174. return 0;
  175. }
  176. /* Actually execute the transfer, and then start transfers waiting for this one. */
  177. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  178. {
  179. struct transfer *transfer = MSG_process_get_data(MSG_process_self());
  180. unsigned i;
  181. _STARPU_DEBUG("transfer %p started\n", transfer);
  182. MSG_task_execute(transfer->task);
  183. MSG_task_destroy(transfer->task);
  184. _STARPU_DEBUG("transfer %p finished\n", transfer);
  185. STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
  186. *transfer->finished = 1;
  187. STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
  188. STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
  189. /* The workers which started this request may be sleeping out of tasks, wake it */
  190. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  191. /* Wake transfers waiting for my termination */
  192. /* Note: due to possible preemption inside process_create, the array
  193. * may grow while doing this */
  194. for (i = 0; i < transfer->nwake; i++)
  195. {
  196. struct transfer *wake = transfer->wake[i];
  197. STARPU_ASSERT(wake->nwait > 0);
  198. wake->nwait--;
  199. if (!wake->nwait)
  200. {
  201. _STARPU_DEBUG("triggering transfer %p\n", wake);
  202. MSG_process_create("transfer task", transfer_execute, wake, MSG_get_host_by_name("MAIN"));
  203. }
  204. }
  205. free(transfer->wake);
  206. transfer_list_erase(pending, transfer);
  207. transfer_delete(transfer);
  208. return 0;
  209. }
  210. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  211. static void transfer_submit(struct transfer *transfer)
  212. {
  213. struct transfer *old;
  214. if (!pending)
  215. pending = transfer_list_new();
  216. for (old = transfer_list_begin(pending);
  217. old != transfer_list_end(pending);
  218. old = transfer_list_next(old))
  219. {
  220. if (transfers_are_sequential(transfer, old))
  221. {
  222. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  223. transfer, transfer->src_node, transfer->dst_node,
  224. old, old->src_node, old->dst_node);
  225. /* Make new wait for the old */
  226. transfer->nwait++;
  227. /* Make old wake the new */
  228. old->wake = realloc(old->wake, (old->nwake + 1) * sizeof(old->wake));
  229. old->wake[old->nwake] = transfer;
  230. old->nwake++;
  231. }
  232. }
  233. transfer_list_push_front(pending, transfer);
  234. if (!transfer->nwait)
  235. {
  236. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  237. MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
  238. }
  239. }
  240. /* Data transfer issued by StarPU */
  241. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  242. {
  243. msg_task_t task;
  244. msg_host_t *hosts = calloc(2, sizeof(*hosts));
  245. double *computation = calloc(2, sizeof(*computation));
  246. double *communication = calloc(4, sizeof(*communication));
  247. starpu_pthread_mutex_t mutex;
  248. starpu_pthread_cond_t cond;
  249. unsigned finished;
  250. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  251. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  252. STARPU_ASSERT(hosts[0] != hosts[1]);
  253. communication[1] = size;
  254. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  255. struct transfer *transfer = transfer_new();
  256. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  257. transfer->task = task;
  258. transfer->src_node = src_node;
  259. transfer->dst_node = dst_node;
  260. transfer->run_node = _starpu_memory_node_get_local_key();
  261. if (req)
  262. {
  263. transfer->finished = &req->async_channel.event.finished;
  264. transfer->mutex = &req->async_channel.event.mutex;
  265. transfer->cond = &req->async_channel.event.cond;
  266. }
  267. else
  268. {
  269. transfer->finished = &finished;
  270. transfer->mutex = &mutex;
  271. transfer->cond = &cond;
  272. }
  273. *transfer->finished = 0;
  274. STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
  275. STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
  276. transfer->wake = NULL;
  277. transfer->nwake = 0;
  278. transfer->nwait = 0;
  279. if (req)
  280. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  281. /* Sleep 10µs for the GPU transfer queueing */
  282. MSG_process_sleep(0.000010);
  283. transfer_submit(transfer);
  284. /* Note: from here, transfer might be already freed */
  285. if (req)
  286. {
  287. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  288. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  289. return -EAGAIN;
  290. }
  291. else
  292. {
  293. /* this is not associated to a request so it's synchronous */
  294. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  295. while (!finished)
  296. STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
  297. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  298. return 0;
  299. }
  300. }
  301. int
  302. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  303. {
  304. struct _starpu_pthread_args *args = MSG_process_get_data(MSG_process_self());
  305. args->f(args->arg);
  306. free(args);
  307. return 0;
  308. }
  309. #endif