simgrid.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2014 Université de Bordeaux 1
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <datawizard/memory_nodes.h>
  18. #include <common/config.h>
  19. #ifdef HAVE_UNISTD_H
  20. #include <unistd.h>
  21. #endif
  22. #include <core/perfmodel/perfmodel.h>
  23. #include <core/workers.h>
  24. #include <core/simgrid.h>
  25. #ifdef STARPU_SIMGRID
  26. #include <msg/msg.h>
  27. #include <smpi/smpif.h>
  28. #define STARPU_MPI_AS_PREFIX "StarPU-MPI"
  29. #pragma weak starpu_main
  30. extern int starpu_main(int argc, char *argv[]);
  31. #pragma weak smpi_main
  32. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  33. #pragma weak smpi_simulated_main_
  34. extern int smpi_simulated_main_(int argc, char *argv[]);
  35. #pragma weak starpu_mpi_world_rank
  36. extern int starpu_mpi_world_rank(void);
  37. #define _starpu_simgrid_running_smpi() (getenv("SMPI_GLOBAL_SIZE") != NULL)
  /* Command-line arguments of the real process, handed over to the simulated
   * "main" MSG process through its process data. */
  struct main_args
  {
      int argc;       /* argument count, as received by main() */
      char **argv;    /* argument vector, as received by main() */
  };
  43. int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  44. {
  45. struct main_args *args = MSG_process_get_data(MSG_process_self());
  46. return starpu_main(args->argc, args->argv);
  47. }
  48. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  49. #ifdef HAVE_MSG_GET_AS_BY_NAME
  50. static msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  51. {
  52. return MSG_get_as_by_name(name);
  53. }
  54. #else /* HAVE_MSG_GET_AS_BY_NAME */
  55. static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
  56. {
  57. xbt_dict_t dict;
  58. xbt_dict_cursor_t cursor;
  59. const char *key;
  60. msg_as_t as, ret;
  61. dict = MSG_environment_as_get_routing_sons(root);
  62. xbt_dict_foreach(dict, cursor, key, as) {
  63. if (!strcmp(MSG_environment_as_get_name(as), name))
  64. return as;
  65. ret = __starpu_simgrid_get_as_by_name(as, name);
  66. if (ret)
  67. return ret;
  68. }
  69. return NULL;
  70. }
  71. static msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  72. {
  73. return __starpu_simgrid_get_as_by_name(MSG_environment_get_routing_root(), name);
  74. }
  75. #endif /* HAVE_MSG_GET_AS_BY_NAME */
  76. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  77. int _starpu_simgrid_get_nbhosts(const char *prefix)
  78. {
  79. int ret;
  80. xbt_dynar_t hosts;
  81. unsigned i, nb;
  82. unsigned len = strlen(prefix);
  83. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  84. if (_starpu_simgrid_running_smpi())
  85. {
  86. char name[16];
  87. STARPU_ASSERT(starpu_mpi_world_rank);
  88. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  89. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  90. }
  91. else
  92. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  93. hosts = MSG_hosts_as_dynar();
  94. nb = xbt_dynar_length(hosts);
  95. ret = 0;
  96. for (i = 0; i < nb; i++) {
  97. const char *name;
  98. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  99. if (!strncmp(name, prefix, len))
  100. ret++;
  101. }
  102. xbt_dynar_free(&hosts);
  103. return ret;
  104. }
  105. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  106. {
  107. char name[16];
  108. msg_host_t host;
  109. const char *memsize;
  110. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  111. host = _starpu_simgrid_get_host_by_name(name);
  112. if (!host)
  113. return 0;
  114. if (!MSG_host_get_properties(host))
  115. return 0;
  116. memsize = MSG_host_get_property_value(host, "memsize");
  117. if (!memsize)
  118. return 0;
  119. return atoll(memsize);
  120. }
  121. msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  122. {
  123. if (_starpu_simgrid_running_smpi())
  124. {
  125. char mpiname[16];
  126. STARPU_ASSERT(starpu_mpi_world_rank);
  127. snprintf(mpiname, sizeof(mpiname), "%d-%s", starpu_mpi_world_rank(), name);
  128. return MSG_get_host_by_name(mpiname);
  129. }
  130. else
  131. return MSG_get_host_by_name(name);
  132. }
  133. #ifdef STARPU_DEVEL
  134. #warning TODO: use another way to start main, when simgrid provides it, and then include the application-provided configuration for platform numbers
  135. #endif
  136. #undef main
  137. int main(int argc, char **argv)
  138. {
  139. char path[256];
  140. if (!starpu_main && !(smpi_main && smpi_simulated_main_))
  141. {
  142. _STARPU_ERROR("The main file of this application needs to be compiled with starpu.h included, to properly define starpu_main\n");
  143. exit(EXIT_FAILURE);
  144. }
  145. if (_starpu_simgrid_running_smpi())
  146. {
  147. /* Oops, we are running SMPI, let it start Simgrid, and we'll
  148. * take back hand in _starpu_simgrid_init from starpu_init() */
  149. return smpi_main(smpi_simulated_main_, argc, argv);
  150. }
  151. MSG_init(&argc, argv);
  152. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  153. /* Versions earlier than 3.9 didn't support our communication tasks */
  154. MSG_config("workstation/model", "ptask_L07");
  155. #endif
  156. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  157. extern xbt_cfg_t _sg_cfg_set;
  158. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", 8192);
  159. /* Load XML platform */
  160. _starpu_simgrid_get_platform_path(path, sizeof(path));
  161. MSG_create_environment(path);
  162. struct main_args args = { .argc = argc, .argv = argv };
  163. MSG_process_create("main", &do_starpu_main, &args, MSG_get_host_by_name("MAIN"));
  164. MSG_main();
  165. return 0;
  166. }
  167. void _starpu_simgrid_init()
  168. {
  169. xbt_dynar_t hosts;
  170. int i;
  171. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  172. if (_starpu_simgrid_running_smpi())
  173. {
  174. /* Take back hand to create the local platform for this MPI
  175. * node */
  176. char asname[16];
  177. char path[256];
  178. char cmdline[1024];
  179. FILE *in;
  180. int out;
  181. #ifdef HAVE_MKSTEMPS
  182. char template[] = "/tmp/"STARPU_MPI_AS_PREFIX"-platform-XXXXXX.xml";
  183. #else
  184. char template[] = "/tmp/"STARPU_MPI_AS_PREFIX"-platform-XXXXXX";
  185. #endif
  186. int ret;
  187. STARPU_ASSERT(starpu_mpi_world_rank);
  188. snprintf(asname, sizeof(asname), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  189. /* Get XML platform */
  190. _starpu_simgrid_get_platform_path(path, sizeof(path));
  191. in = fopen(path, "r");
  192. _starpu_frdlock(in);
  193. STARPU_ASSERT_MSG(in, "Could not open platform file %s", path);
  194. #ifdef HAVE_MKSTEMPS
  195. out = mkstemps(template, strlen(".xml"));
  196. #else
  197. out = mkstemp(template);
  198. #endif
  199. /* Generate modified XML platform */
  200. STARPU_ASSERT_MSG(out >= 0, "Could not create temporary file like %s", template);
  201. close(out);
  202. snprintf(cmdline, sizeof(cmdline), "xsltproc --novalid --stringparam ASname %s -o %s "STARPU_DATADIR"/starpu/starpu_smpi.xslt %s", asname, template, path);
  203. ret = system(cmdline);
  204. STARPU_ASSERT_MSG(ret == 0, "running xsltproc to generate SMPI platforms %s from %s failed", template, path);
  205. _starpu_frdunlock(in);
  206. fclose(in);
  207. /* And create it */
  208. MSG_create_environment(template);
  209. unlink(template);
  210. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(asname));
  211. }
  212. else
  213. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  214. hosts = MSG_hosts_as_dynar();
  215. int nb = xbt_dynar_length(hosts);
  216. for (i = 0; i < nb; i++)
  217. MSG_host_set_data(xbt_dynar_get_as(hosts, i, msg_host_t), calloc(MAX_TSD, sizeof(void*)));
  218. xbt_dynar_free(&hosts);
  219. }
  220. /* Task execution submitted by StarPU */
  221. void _starpu_simgrid_execute_job(struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length)
  222. {
  223. struct starpu_task *task = j->task;
  224. msg_task_t simgrid_task;
  225. if (j->internal)
  226. /* This is not useful to include in simulation (and probably
  227. * doesn't have a perfmodel anyway) */
  228. return;
  229. if (isnan(length))
  230. {
  231. length = starpu_task_expected_length(task, perf_arch, j->nimpl);
  232. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  233. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  234. _starpu_job_get_model_name(j));
  235. }
  236. simgrid_task = MSG_task_create(_starpu_job_get_model_name(j),
  237. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  238. 0, NULL);
  239. MSG_task_execute(simgrid_task);
  240. MSG_task_destroy(simgrid_task);
  241. }
  242. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  243. LIST_TYPE(transfer,
  244. msg_task_t task;
  245. int src_node;
  246. int dst_node;
  247. int run_node;
  248. /* communication termination signalization */
  249. unsigned *finished;
  250. starpu_pthread_mutex_t *mutex;
  251. starpu_pthread_cond_t *cond;
  252. /* transfers which wait for this transfer */
  253. struct transfer **wake;
  254. unsigned nwake;
  255. /* Number of transfers that this transfer waits for */
  256. unsigned nwait;
  257. )
  258. struct transfer_list *pending;
  259. /* Tell for two transfers whether they should be handled in sequence */
  260. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  261. {
  262. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  263. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  264. int new_is_gpu_gpu, old_is_gpu_gpu;
  265. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  266. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  267. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  268. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  269. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  270. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  271. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  272. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  273. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  274. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  275. /* We ignore cuda-opencl transfers, they can not happen */
  276. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  277. /* The following constraints have been observed with CUDA alone */
  278. /* Same source/destination, sequential */
  279. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  280. return 1;
  281. /* Crossed GPU-GPU, sequential */
  282. if (new_is_gpu_gpu
  283. && new_transfer->src_node == old_transfer->dst_node
  284. && old_transfer->src_node == new_transfer->dst_node)
  285. return 1;
  286. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  287. if (new_is_gpu_gpu
  288. && old_transfer->dst_node == new_transfer->src_node
  289. && old_transfer->dst_node == new_transfer->dst_node)
  290. return 1;
  291. if (old_is_gpu_gpu
  292. && new_transfer->dst_node == old_transfer->src_node
  293. && new_transfer->dst_node == old_transfer->dst_node)
  294. return 1;
  295. /* StarPU's constraint on CUDA transfers is using one stream per
  296. * source/destination pair, which is already handled above */
  297. return 0;
  298. }
  299. /* Actually execute the transfer, and then start transfers waiting for this one. */
  300. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  301. {
  302. struct transfer *transfer = MSG_process_get_data(MSG_process_self());
  303. unsigned i;
  304. _STARPU_DEBUG("transfer %p started\n", transfer);
  305. MSG_task_execute(transfer->task);
  306. MSG_task_destroy(transfer->task);
  307. _STARPU_DEBUG("transfer %p finished\n", transfer);
  308. STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
  309. *transfer->finished = 1;
  310. STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
  311. STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
  312. /* The workers which started this request may be sleeping out of tasks, wake it */
  313. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  314. /* Wake transfers waiting for my termination */
  315. /* Note: due to possible preemption inside process_create, the array
  316. * may grow while doing this */
  317. for (i = 0; i < transfer->nwake; i++)
  318. {
  319. struct transfer *wake = transfer->wake[i];
  320. STARPU_ASSERT(wake->nwait > 0);
  321. wake->nwait--;
  322. if (!wake->nwait)
  323. {
  324. _STARPU_DEBUG("triggering transfer %p\n", wake);
  325. MSG_process_create("transfer task", transfer_execute, wake, _starpu_simgrid_get_host_by_name("MAIN"));
  326. }
  327. }
  328. free(transfer->wake);
  329. transfer_list_erase(pending, transfer);
  330. transfer_delete(transfer);
  331. return 0;
  332. }
  333. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  334. static void transfer_submit(struct transfer *transfer)
  335. {
  336. struct transfer *old;
  337. if (!pending)
  338. pending = transfer_list_new();
  339. for (old = transfer_list_begin(pending);
  340. old != transfer_list_end(pending);
  341. old = transfer_list_next(old))
  342. {
  343. if (transfers_are_sequential(transfer, old))
  344. {
  345. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  346. transfer, transfer->src_node, transfer->dst_node,
  347. old, old->src_node, old->dst_node);
  348. /* Make new wait for the old */
  349. transfer->nwait++;
  350. /* Make old wake the new */
  351. old->wake = realloc(old->wake, (old->nwake + 1) * sizeof(old->wake));
  352. old->wake[old->nwake] = transfer;
  353. old->nwake++;
  354. }
  355. }
  356. transfer_list_push_front(pending, transfer);
  357. if (!transfer->nwait)
  358. {
  359. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  360. MSG_process_create("transfer task", transfer_execute, transfer, _starpu_simgrid_get_host_by_name("MAIN"));
  361. }
  362. }
  363. /* Data transfer issued by StarPU */
  364. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  365. {
  366. msg_task_t task;
  367. msg_host_t *hosts = calloc(2, sizeof(*hosts));
  368. double *computation = calloc(2, sizeof(*computation));
  369. double *communication = calloc(4, sizeof(*communication));
  370. starpu_pthread_mutex_t mutex;
  371. starpu_pthread_cond_t cond;
  372. unsigned finished;
  373. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  374. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  375. STARPU_ASSERT(hosts[0] != hosts[1]);
  376. communication[1] = size;
  377. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  378. struct transfer *transfer = transfer_new();
  379. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  380. transfer->task = task;
  381. transfer->src_node = src_node;
  382. transfer->dst_node = dst_node;
  383. transfer->run_node = _starpu_memory_node_get_local_key();
  384. if (req)
  385. {
  386. transfer->finished = &req->async_channel.event.finished;
  387. transfer->mutex = &req->async_channel.event.mutex;
  388. transfer->cond = &req->async_channel.event.cond;
  389. }
  390. else
  391. {
  392. transfer->finished = &finished;
  393. transfer->mutex = &mutex;
  394. transfer->cond = &cond;
  395. }
  396. *transfer->finished = 0;
  397. STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
  398. STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
  399. transfer->wake = NULL;
  400. transfer->nwake = 0;
  401. transfer->nwait = 0;
  402. if (req)
  403. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  404. /* Sleep 10µs for the GPU transfer queueing */
  405. MSG_process_sleep(0.000010);
  406. transfer_submit(transfer);
  407. /* Note: from here, transfer might be already freed */
  408. if (req)
  409. {
  410. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  411. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  412. return -EAGAIN;
  413. }
  414. else
  415. {
  416. /* this is not associated to a request so it's synchronous */
  417. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  418. while (!finished)
  419. STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
  420. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  421. return 0;
  422. }
  423. }
  424. int
  425. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  426. {
  427. struct _starpu_pthread_args *_args = MSG_process_get_data(MSG_process_self());
  428. struct _starpu_pthread_args args = *_args;
  429. free(_args);
  430. args.f(args.arg);
  431. return 0;
  432. }
  433. #endif