simgrid.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2017 Université de Bordeaux
  4. * Copyright (C) 2016 Inria
  5. * Copyright (C) 2016, 2017 CNRS
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <datawizard/memory_nodes.h>
  20. #include <common/config.h>
  21. #ifdef HAVE_UNISTD_H
  22. #include <unistd.h>
  23. #endif
  24. #include <core/perfmodel/perfmodel.h>
  25. #include <core/workers.h>
  26. #include <core/simgrid.h>
  27. #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  28. #include <simgrid/simdag.h>
  29. #endif
  30. #ifdef STARPU_SIMGRID
  31. #ifdef HAVE_GETRLIMIT
  32. #include <sys/resource.h>
  33. #endif
  34. #include <simgrid/simix.h>
/* Application entry point, i.e. the application's main() renamed by starpu.h
 * or starpu_simgrid_wrap.h.  Declared weak so that its absence can be
 * detected at run time by testing the symbol against NULL. */
#pragma weak starpu_main
extern int starpu_main(int argc, char *argv[]);
/* SMPI entry point.  Weak: tested against NULL when deciding whether SMPI
 * provides the application's main. */
#pragma weak smpi_main
extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
/* Handed to smpi_main() as the simulated main when running under SMPI. */
#pragma weak _starpu_mpi_simgrid_init
extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
/* 0: simgrid not started from this file; 1: started by start_simgrid();
 * 2: started and the calling thread was attached with MSG_process_attach(). */
static int simgrid_started;
/* Non-zero while the runner processes must keep serving their queues;
 * cleared by _starpu_simgrid_deinit() to make them leave their loop. */
static int runners_running;
/* One wait queue per memory node; transfer events point to the entry of the
 * node which issued the transfer. */
starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
/* One runner per (source node, destination node) pair, started lazily by
 * transfer_queue(). */
static struct transfer_runner
{
	/* Head and tail of the FIFO of transfers ready to be executed */
	struct transfer *first_transfer, *last_transfer;
	/* Released once for each queued transfer, and once more at shutdown */
	msg_sem_t sem;
	/* simgrid process executing transfer_execute(), NULL while not started */
	msg_process_t runner;
} transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
/* One wait queue per worker, used to wait for asynchronous task completion */
starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
/* One runner per worker, started by _starpu_simgrid_init(). */
static struct worker_runner
{
	/* Head and tail of the FIFO of asynchronous tasks to be executed */
	struct task *first_task, *last_task;
	/* Released once for each queued task, and once more at shutdown */
	msg_sem_t sem;
	/* simgrid process executing task_execute() */
	msg_process_t runner;
} worker_runner[STARPU_NMAXWORKERS];
static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
  59. /* In case the MPI application didn't use smpicc to build the file containing
  60. * main(), try to cope by calling starpu_main */
  61. int _starpu_smpi_simulated_main_(int argc, char *argv[])
  62. {
  63. if (!starpu_main)
  64. {
  65. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  66. }
  67. return starpu_main(argc, argv);
  68. }
  69. int smpi_simulated_main_(int argc, char *argv[]) __attribute__((weak, alias("_starpu_smpi_simulated_main_")));
  70. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  71. #ifdef HAVE_MSG_GET_AS_BY_NAME
  72. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  73. {
  74. return MSG_get_as_by_name(name);
  75. }
  76. #else /* HAVE_MSG_GET_AS_BY_NAME */
  77. static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
  78. {
  79. xbt_dict_t dict;
  80. xbt_dict_cursor_t cursor;
  81. const char *key;
  82. msg_as_t as, ret;
  83. dict = MSG_environment_as_get_routing_sons(root);
  84. xbt_dict_foreach(dict, cursor, key, as)
  85. {
  86. if (!strcmp(MSG_environment_as_get_name(as), name))
  87. return as;
  88. ret = __starpu_simgrid_get_as_by_name(as, name);
  89. if (ret)
  90. return ret;
  91. }
  92. return NULL;
  93. }
  94. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  95. {
  96. return __starpu_simgrid_get_as_by_name(MSG_environment_get_routing_root(), name);
  97. }
  98. #endif /* HAVE_MSG_GET_AS_BY_NAME */
  99. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  100. int _starpu_simgrid_get_nbhosts(const char *prefix)
  101. {
  102. int ret;
  103. xbt_dynar_t hosts;
  104. unsigned i, nb;
  105. unsigned len = strlen(prefix);
  106. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  107. char new_prefix[32];
  108. if (_starpu_simgrid_running_smpi())
  109. {
  110. char name[32];
  111. STARPU_ASSERT(starpu_mpi_world_rank);
  112. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  113. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  114. snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  115. prefix = new_prefix;
  116. len = strlen(prefix);
  117. }
  118. else
  119. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  120. hosts = MSG_hosts_as_dynar();
  121. nb = xbt_dynar_length(hosts);
  122. ret = 0;
  123. for (i = 0; i < nb; i++)
  124. {
  125. const char *name;
  126. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  127. if (!strncmp(name, prefix, len))
  128. ret++;
  129. }
  130. xbt_dynar_free(&hosts);
  131. return ret;
  132. }
  133. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  134. {
  135. char name[32];
  136. msg_host_t host;
  137. const char *memsize;
  138. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  139. host = _starpu_simgrid_get_host_by_name(name);
  140. if (!host)
  141. return 0;
  142. if (!MSG_host_get_properties(host))
  143. return 0;
  144. memsize = MSG_host_get_property_value(host, "memsize");
  145. if (!memsize)
  146. return 0;
  147. return atoll(memsize);
  148. }
  149. msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  150. {
  151. if (_starpu_simgrid_running_smpi())
  152. {
  153. char mpiname[32];
  154. STARPU_ASSERT(starpu_mpi_world_rank);
  155. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  156. return MSG_get_host_by_name(mpiname);
  157. }
  158. else
  159. return MSG_get_host_by_name(name);
  160. }
  161. msg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  162. {
  163. char *prefix;
  164. char name[16];
  165. msg_host_t host;
  166. switch (worker->arch)
  167. {
  168. case STARPU_CPU_WORKER:
  169. prefix = "CPU";
  170. break;
  171. case STARPU_CUDA_WORKER:
  172. prefix = "CUDA";
  173. break;
  174. case STARPU_OPENCL_WORKER:
  175. prefix = "OpenCL";
  176. break;
  177. default:
  178. STARPU_ASSERT(0);
  179. }
  180. snprintf(name, sizeof(name), "%s%d", prefix, worker->devid);
  181. host = _starpu_simgrid_get_host_by_name(name);
  182. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  183. return host;
  184. }
  185. static void start_simgrid(int *argc, char **argv)
  186. {
  187. char path[256];
  188. simgrid_started = 1;
  189. if (!starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  190. {
  191. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  192. }
  193. MSG_init(argc, argv);
  194. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  195. /* Versions earlier than 3.9 didn't support our communication tasks */
  196. MSG_config("workstation/model", "ptask_L07");
  197. #endif
  198. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  199. unsigned stack_size = 8192;
  200. #ifdef HAVE_GETRLIMIT
  201. struct rlimit rlim;
  202. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  203. stack_size = rlim.rlim_cur / 1024;
  204. #endif
  205. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  206. extern xbt_cfg_t _sg_cfg_set;
  207. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  208. #else
  209. xbt_cfg_set_int("contexts/stack-size", stack_size);
  210. #endif
  211. /* Load XML platform */
  212. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  213. _starpu_simgrid_get_platform_path(3, path, sizeof(path));
  214. #else
  215. _starpu_simgrid_get_platform_path(4, path, sizeof(path));
  216. #endif
  217. MSG_create_environment(path);
  218. }
/* Command line of the application (argument count and vector).
 * NOTE(review): not referenced in the visible part of this file -- confirm
 * whether it is still needed. */
struct main_args
{
	int argc;
	char **argv;
};
/* Value returned by starpu_main(), stored by do_starpu_main() so that main()
 * can return it once MSG_main() is over. */
static int main_ret;
  225. int do_starpu_main(int argc, char *argv[])
  226. {
  227. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  228. MSG_process_sleep(0.000001);
  229. main_ret = starpu_main(argc, argv);
  230. return main_ret;
  231. }
  232. #undef main
  233. #pragma weak main
  234. int main(int argc, char **argv)
  235. {
  236. if (_starpu_simgrid_running_smpi())
  237. {
  238. /* Oops, we are running SMPI, let it start Simgrid, and we'll
  239. * take back hand in _starpu_simgrid_init from starpu_init() */
  240. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  241. }
  242. /* Managed to catch application's main, initialize simgrid first */
  243. start_simgrid(&argc, argv);
  244. /* Create a simgrid process for main */
  245. char **argv_cpy;
  246. _STARPU_MALLOC(argv_cpy, argc * sizeof(char*));
  247. int i;
  248. for (i = 0; i < argc; i++)
  249. argv_cpy[i] = strdup(argv[i]);
  250. void **tsd;
  251. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  252. MSG_process_create_with_arguments("main", &do_starpu_main, tsd, MSG_get_host_by_name("MAIN"), argc, argv_cpy);
  253. /* And run maestro in main thread */
  254. MSG_main();
  255. return main_ret;
  256. }
  257. #ifdef HAVE_MSG_PROCESS_ATTACH
/* Maestro body registered with SIMIX_set_maestro() when the application's
 * main() could not be caught: it only runs the simgrid main loop. */
static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
{
	MSG_main();
}
  262. #endif
/* Early simgrid initialization.
 * - When MSG_process_attach is available, simgrid was not started yet and
 *   SMPI does not provide the application's main: start simgrid from here
 *   with maestro in a separate thread, and attach the calling thread as the
 *   "main" simgrid process on the "MAIN" host.
 * - Otherwise, report an error when simgrid is not started and no
 *   application entry point is reachable.
 * - Under SMPI, give the current process its own thread-specific data array.
 * - In all cases, initialize the transfer and task wait queues.
 * ARGC and ARGV point to the command line, which is passed to start_simgrid(). */
void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
{
#ifdef HAVE_MSG_PROCESS_ATTACH
	if (!simgrid_started && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
	{
		_STARPU_DISP("Warning: In simgrid mode, the file containing the main() function of this application should to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main to avoid having to use --cfg=contexts/factory:thread which reduces performance\n");
#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14) /* Only recent versions of simgrid support setting xbt_cfg_set_string before starting simgrid */
		/* "Cannot create_maestro with this ContextFactory.
		 * Try using --cfg=contexts/factory:thread instead."
		 * See https://github.com/simgrid/simgrid/issues/141 */
		xbt_cfg_set_string("contexts/factory", "thread");
#endif
		/* We didn't catch application's main. */
		/* Start maestro as a separate thread */
		SIMIX_set_maestro(maestro, NULL);
		/* Initialize simgrid */
		start_simgrid(argc, *argv);
		/* And attach the main thread to the main simgrid process */
		void **tsd;
		_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
		MSG_process_attach("main", tsd, MSG_get_host_by_name("MAIN"), NULL);
		/* start_simgrid() set the flag to 1; 2 makes _starpu_simgrid_deinit() detach */
		simgrid_started = 2;
	}
#endif
	/* Still not started, and nothing which could start the application */
	if (!simgrid_started && !starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
	{
		_STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
	}
	if (_starpu_simgrid_running_smpi())
	{
#ifndef STARPU_STATIC_ONLY
		_STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
#endif
		/* The process was created by SMPI: attach a thread-specific data array to it */
		void **tsd;
		_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
		MSG_process_set_data(MSG_process_self(), tsd);
	}
	/* Wait queues used to wait for transfer and task completion */
	unsigned i;
	for (i = 0; i < STARPU_MAXNODES; i++)
		starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
}
  306. void _starpu_simgrid_init(void)
  307. {
  308. unsigned i;
  309. runners_running = 1;
  310. for (i = 0; i < starpu_worker_get_count(); i++)
  311. {
  312. char s[32];
  313. snprintf(s, sizeof(s), "worker %u runner", i);
  314. void **tsd;
  315. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  316. worker_runner[i].sem = MSG_sem_init(0);
  317. tsd[0] = (void*)(uintptr_t) i;
  318. worker_runner[i].runner = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
  319. }
  320. }
  321. void _starpu_simgrid_deinit(void)
  322. {
  323. unsigned i, j;
  324. runners_running = 0;
  325. for (i = 0; i < STARPU_MAXNODES; i++)
  326. {
  327. for (j = 0; j < STARPU_MAXNODES; j++)
  328. {
  329. struct transfer_runner *t = &transfer_runner[i][j];
  330. if (t->runner)
  331. {
  332. MSG_sem_release(t->sem);
  333. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  334. MSG_process_join(t->runner, 1000000);
  335. #else
  336. MSG_process_sleep(1);
  337. #endif
  338. STARPU_ASSERT(t->first_transfer == NULL);
  339. STARPU_ASSERT(t->last_transfer == NULL);
  340. MSG_sem_destroy(t->sem);
  341. }
  342. }
  343. /* FIXME: queue not empty at this point, needs proper unregistration */
  344. /* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
  345. }
  346. for (i = 0; i < starpu_worker_get_count(); i++)
  347. {
  348. struct worker_runner *w = &worker_runner[i];
  349. MSG_sem_release(w->sem);
  350. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  351. MSG_process_join(w->runner, 1000000);
  352. #else
  353. MSG_process_sleep(1);
  354. #endif
  355. STARPU_ASSERT(w->first_task == NULL);
  356. STARPU_ASSERT(w->last_task == NULL);
  357. MSG_sem_destroy(w->sem);
  358. starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
  359. }
  360. #ifdef HAVE_MSG_PROCESS_ATTACH
  361. if (simgrid_started == 2)
  362. {
  363. /* Started with MSG_process_attach, now detach */
  364. MSG_process_detach();
  365. simgrid_started = 0;
  366. }
  367. #endif
  368. }
  369. /*
  370. * Tasks
  371. */
/* An asynchronous task queued on a worker runner */
struct task
{
	/* simgrid task simulating the execution */
	msg_task_t task;
	/* Task termination signalization: set to 1 by the runner once the
	 * simgrid task has been executed */
	unsigned *finished;
	/* Next task on this worker */
	struct task *next;
};
  380. /* Actually execute the task. */
  381. static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  382. {
  383. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  384. MSG_process_sleep(0.000001);
  385. unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
  386. struct worker_runner *w = &worker_runner[workerid];
  387. _STARPU_DEBUG("worker runner %u started\n", workerid);
  388. while (1)
  389. {
  390. struct task *task;
  391. MSG_sem_acquire(w->sem);
  392. if (!runners_running)
  393. break;
  394. task = w->first_task;
  395. w->first_task = task->next;
  396. if (w->last_task == task)
  397. w->last_task = NULL;
  398. _STARPU_DEBUG("task %p started\n", task);
  399. MSG_task_execute(task->task);
  400. MSG_task_destroy(task->task);
  401. _STARPU_DEBUG("task %p finished\n", task);
  402. *task->finished = 1;
  403. /* The worker which started this task may be sleeping out of tasks, wake it */
  404. starpu_wake_worker(workerid);
  405. free(task);
  406. }
  407. _STARPU_DEBUG("worker %u stopped\n", workerid);
  408. return 0;
  409. }
  410. /* Wait for completion of all asynchronous tasks for this worker */
  411. void _starpu_simgrid_wait_tasks(int workerid)
  412. {
  413. struct task *task = worker_runner[workerid].last_task;
  414. if (!task)
  415. return;
  416. unsigned *finished = task->finished;
  417. starpu_pthread_wait_t wait;
  418. starpu_pthread_wait_init(&wait);
  419. starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
  420. while(1)
  421. {
  422. starpu_pthread_wait_reset(&wait);
  423. if (*finished)
  424. break;
  425. starpu_pthread_wait_wait(&wait);
  426. }
  427. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
  428. starpu_pthread_wait_destroy(&wait);
  429. }
  430. /* Task execution submitted by StarPU */
  431. void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished)
  432. {
  433. struct starpu_task *starpu_task = j->task;
  434. msg_task_t simgrid_task;
  435. if (j->internal)
  436. /* This is not useful to include in simulation (and probably
  437. * doesn't have a perfmodel anyway) */
  438. return;
  439. if (isnan(length))
  440. {
  441. length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
  442. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  443. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  444. _starpu_job_get_model_name(j));
  445. }
  446. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j),
  447. #ifdef HAVE_MSG_HOST_GET_SPEED
  448. length/1000000.0*MSG_host_get_speed(MSG_host_self()),
  449. #else
  450. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  451. #endif
  452. 0, NULL);
  453. if (finished == NULL)
  454. {
  455. /* Synchronous execution */
  456. /* First wait for previous tasks */
  457. _starpu_simgrid_wait_tasks(workerid);
  458. MSG_task_execute(simgrid_task);
  459. MSG_task_destroy(simgrid_task);
  460. }
  461. else
  462. {
  463. /* Asynchronous execution */
  464. struct task *task;
  465. struct worker_runner *w = &worker_runner[workerid];
  466. _STARPU_MALLOC(task, sizeof(*task));
  467. task->task = simgrid_task;
  468. task->finished = finished;
  469. *finished = 0;
  470. task->next = NULL;
  471. /* Sleep 10µs for the GPU task queueing */
  472. if (_starpu_simgrid_queue_malloc_cost())
  473. MSG_process_sleep(0.000010);
  474. if (w->last_task)
  475. {
  476. /* Already running a task, queue */
  477. w->last_task->next = task;
  478. w->last_task = task;
  479. }
  480. else
  481. {
  482. STARPU_ASSERT(!w->first_task);
  483. w->first_task = task;
  484. w->last_task = task;
  485. }
  486. MSG_sem_release(w->sem);
  487. }
  488. }
  489. /*
  490. * Transfers
  491. */
  492. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
/* A data transfer, or a synchronization pseudo-transfer when task is NULL */
LIST_TYPE(transfer,
	/* simgrid task simulating the transfer, NULL for a pseudo-transfer */
	msg_task_t task;
	/* Source and destination memory nodes */
	int src_node;
	int dst_node;
	/* Memory node whose blocked workers are woken up on completion */
	int run_node;
	/* communication termination signalization */
	unsigned *finished;
	/* transfers which wait for this transfer */
	struct transfer **wake;
	unsigned nwake;
	/* Number of transfers that this transfer waits for */
	unsigned nwait;
	/* Next transfer on this stream */
	struct transfer *next;
)
/* Transfers which have been submitted and are not completed yet */
struct transfer_list pending;
  509. /* Tell for two transfers whether they should be handled in sequence */
  510. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  511. {
  512. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  513. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  514. int new_is_gpu_gpu, old_is_gpu_gpu;
  515. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  516. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  517. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  518. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  519. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  520. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  521. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  522. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  523. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  524. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  525. /* We ignore cuda-opencl transfers, they can not happen */
  526. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  527. /* The following constraints have been observed with CUDA alone */
  528. /* Same source/destination, sequential */
  529. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  530. return 1;
  531. /* Crossed GPU-GPU, sequential */
  532. if (new_is_gpu_gpu
  533. && new_transfer->src_node == old_transfer->dst_node
  534. && old_transfer->src_node == new_transfer->dst_node)
  535. return 1;
  536. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  537. if (new_is_gpu_gpu
  538. && old_transfer->dst_node == new_transfer->src_node
  539. && old_transfer->dst_node == new_transfer->dst_node)
  540. return 1;
  541. if (old_is_gpu_gpu
  542. && new_transfer->dst_node == old_transfer->src_node
  543. && new_transfer->dst_node == old_transfer->dst_node)
  544. return 1;
  545. /* StarPU's constraint on CUDA transfers is using one stream per
  546. * source/destination pair, which is already handled above */
  547. return 0;
  548. }
  549. static void transfer_queue(struct transfer *transfer)
  550. {
  551. unsigned src = transfer->src_node;
  552. unsigned dst = transfer->dst_node;
  553. struct transfer_runner *t = &transfer_runner[src][dst];
  554. if (!t->runner)
  555. {
  556. /* No runner yet, start it */
  557. static starpu_pthread_mutex_t mutex; /* process_create may yield */
  558. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  559. if (!t->runner)
  560. {
  561. char s[64];
  562. snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
  563. void **tsd;
  564. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  565. tsd[0] = (void*)(uintptr_t)((src<<16) + dst);
  566. t->runner = MSG_process_create_with_arguments(s, transfer_execute, tsd, _starpu_simgrid_get_memnode_host(src), 0, NULL);
  567. t->sem = MSG_sem_init(0);
  568. }
  569. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  570. }
  571. if (t->last_transfer)
  572. {
  573. /* Already running a transfer, queue */
  574. t->last_transfer->next = transfer;
  575. t->last_transfer = transfer;
  576. }
  577. else
  578. {
  579. STARPU_ASSERT(!t->first_transfer);
  580. t->first_transfer = transfer;
  581. t->last_transfer = transfer;
  582. }
  583. MSG_sem_release(t->sem);
  584. }
  585. /* Actually execute the transfer, and then start transfers waiting for this one. */
  586. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  587. {
  588. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  589. MSG_process_sleep(0.000001);
  590. unsigned src_dst = (uintptr_t) starpu_pthread_getspecific(0);
  591. unsigned src = src_dst >> 16;
  592. unsigned dst = src_dst & 0xffff;
  593. struct transfer_runner *t = &transfer_runner[src][dst];
  594. _STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
  595. while (1)
  596. {
  597. struct transfer *transfer;
  598. MSG_sem_acquire(t->sem);
  599. if (!runners_running)
  600. break;
  601. transfer = t->first_transfer;
  602. t->first_transfer = transfer->next;
  603. if (t->last_transfer == transfer)
  604. t->last_transfer = NULL;
  605. if (transfer->task)
  606. {
  607. _STARPU_DEBUG("transfer %p started\n", transfer);
  608. MSG_task_execute(transfer->task);
  609. MSG_task_destroy(transfer->task);
  610. _STARPU_DEBUG("transfer %p finished\n", transfer);
  611. }
  612. *transfer->finished = 1;
  613. transfer_list_erase(&pending, transfer);
  614. /* The workers which started this request may be sleeping out of tasks, wake it */
  615. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  616. unsigned i;
  617. /* Wake transfers waiting for my termination */
  618. /* Note: due to possible preemption inside process_create, the array
  619. * may grow while doing this */
  620. for (i = 0; i < transfer->nwake; i++)
  621. {
  622. struct transfer *wake = transfer->wake[i];
  623. STARPU_ASSERT(wake->nwait > 0);
  624. wake->nwait--;
  625. if (!wake->nwait)
  626. {
  627. _STARPU_DEBUG("triggering transfer %p\n", wake);
  628. transfer_queue(wake);
  629. }
  630. }
  631. free(transfer->wake);
  632. free(transfer);
  633. }
  634. return 0;
  635. }
  636. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  637. static void transfer_submit(struct transfer *transfer)
  638. {
  639. struct transfer *old;
  640. for (old = transfer_list_begin(&pending);
  641. old != transfer_list_end(&pending);
  642. old = transfer_list_next(old))
  643. {
  644. if (transfers_are_sequential(transfer, old))
  645. {
  646. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  647. transfer, transfer->src_node, transfer->dst_node,
  648. old, old->src_node, old->dst_node);
  649. /* Make new wait for the old */
  650. transfer->nwait++;
  651. /* Make old wake the new */
  652. _STARPU_REALLOC(old->wake, (old->nwake + 1) * sizeof(old->wake));
  653. old->wake[old->nwake] = transfer;
  654. old->nwake++;
  655. }
  656. }
  657. transfer_list_push_front(&pending, transfer);
  658. if (!transfer->nwait)
  659. {
  660. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  661. transfer_queue(transfer);
  662. }
  663. }
  664. int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
  665. {
  666. /* this is not associated to a request so it's synchronous */
  667. starpu_pthread_wait_t wait;
  668. starpu_pthread_wait_init(&wait);
  669. starpu_pthread_queue_register(&wait, event->queue);
  670. while(1)
  671. {
  672. starpu_pthread_wait_reset(&wait);
  673. if (event->finished)
  674. break;
  675. starpu_pthread_wait_wait(&wait);
  676. }
  677. starpu_pthread_queue_unregister(&wait, event->queue);
  678. starpu_pthread_wait_destroy(&wait);
  679. return 0;
  680. }
  681. int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
  682. {
  683. return event->finished;
  684. }
  685. /* Wait for completion of all transfers */
  686. static void _starpu_simgrid_wait_transfers(void)
  687. {
  688. unsigned finished = 0;
  689. struct transfer *sync = transfer_new();
  690. struct transfer *cur;
  691. sync->task = NULL;
  692. sync->finished = &finished;
  693. sync->src_node = STARPU_MAIN_RAM;
  694. sync->dst_node = STARPU_MAIN_RAM;
  695. sync->run_node = STARPU_MAIN_RAM;
  696. sync->wake = NULL;
  697. sync->nwake = 0;
  698. sync->nwait = 0;
  699. sync->next = NULL;
  700. for (cur = transfer_list_begin(&pending);
  701. cur != transfer_list_end(&pending);
  702. cur = transfer_list_next(cur))
  703. {
  704. sync->nwait++;
  705. _STARPU_REALLOC(cur->wake, (cur->nwake + 1) * sizeof(cur->wake));
  706. cur->wake[cur->nwake] = sync;
  707. cur->nwake++;
  708. }
  709. if (sync->nwait == 0)
  710. {
  711. /* No transfer to wait for */
  712. free(sync);
  713. return;
  714. }
  715. /* Push synchronization pseudo-transfer */
  716. transfer_list_push_front(&pending, sync);
  717. /* And wait for it */
  718. starpu_pthread_wait_t wait;
  719. starpu_pthread_wait_init(&wait);
  720. starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  721. while(1)
  722. {
  723. starpu_pthread_wait_reset(&wait);
  724. if (finished)
  725. break;
  726. starpu_pthread_wait_wait(&wait);
  727. }
  728. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  729. starpu_pthread_wait_destroy(&wait);
  730. }
  731. /* Data transfer issued by StarPU */
  732. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  733. {
  734. /* Simgrid does not like 0-bytes transfers */
  735. if (!size)
  736. return 0;
  737. msg_task_t task;
  738. msg_host_t *hosts;
  739. double *computation;
  740. double *communication;
  741. union _starpu_async_channel_event *event, myevent;
  742. _STARPU_CALLOC(hosts, 2, sizeof(*hosts));
  743. _STARPU_CALLOC(computation, 2, sizeof(*computation));
  744. _STARPU_CALLOC(communication, 4, sizeof(*communication));
  745. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  746. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  747. STARPU_ASSERT(hosts[0] != hosts[1]);
  748. communication[1] = size;
  749. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  750. struct transfer *transfer = transfer_new();
  751. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  752. transfer->task = task;
  753. transfer->src_node = src_node;
  754. transfer->dst_node = dst_node;
  755. transfer->run_node = _starpu_memory_node_get_local_key();
  756. if (req)
  757. event = &req->async_channel.event;
  758. else
  759. event = &myevent;
  760. event->finished = 0;
  761. transfer->finished = &event->finished;
  762. event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
  763. transfer->wake = NULL;
  764. transfer->nwake = 0;
  765. transfer->nwait = 0;
  766. transfer->next = NULL;
  767. if (req)
  768. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  769. /* Sleep 10µs for the GPU transfer queueing */
  770. if (_starpu_simgrid_queue_malloc_cost())
  771. MSG_process_sleep(0.000010);
  772. transfer_submit(transfer);
  773. /* Note: from here, transfer might be already freed */
  774. if (req)
  775. {
  776. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  777. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  778. return -EAGAIN;
  779. }
  780. else
  781. {
  782. /* this is not associated to a request so it's synchronous */
  783. _starpu_simgrid_wait_transfer_event(event);
  784. return 0;
  785. }
  786. }
  787. /* Sync all GPUs (used on CUDA Free, typically) */
  788. void _starpu_simgrid_sync_gpus(void)
  789. {
  790. _starpu_simgrid_wait_transfers();
  791. }
  792. int
  793. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  794. {
  795. void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
  796. void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
  797. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  798. MSG_process_sleep(0.000001);
  799. /* _args is freed with process context */
  800. f(arg);
  801. return 0;
  802. }
  803. msg_host_t
  804. _starpu_simgrid_get_memnode_host(unsigned node)
  805. {
  806. const char *fmt;
  807. char name[16];
  808. switch (starpu_node_get_kind(node))
  809. {
  810. case STARPU_CPU_RAM:
  811. fmt = "RAM";
  812. break;
  813. case STARPU_CUDA_RAM:
  814. fmt = "CUDA%u";
  815. break;
  816. case STARPU_OPENCL_RAM:
  817. fmt = "OpenCL%u";
  818. break;
  819. default:
  820. STARPU_ABORT();
  821. break;
  822. }
  823. snprintf(name, sizeof(name), fmt, _starpu_memory_node_get_devid(node));
  824. return _starpu_simgrid_get_host_by_name(name);
  825. }
  826. void _starpu_simgrid_count_ngpus(void)
  827. {
  828. #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  829. unsigned src, dst;
  830. msg_host_t ramhost = _starpu_simgrid_get_host_by_name("RAM");
  831. /* For each pair of memory nodes, get the route */
  832. for (src = 1; src < STARPU_MAXNODES; src++)
  833. for (dst = 1; dst < STARPU_MAXNODES; dst++)
  834. {
  835. int busid;
  836. msg_host_t srchost, dsthost;
  837. #ifdef HAVE_SG_HOST_ROUTE
  838. xbt_dynar_t route_dynar = xbt_dynar_new(sizeof(SD_link_t), NULL);
  839. SD_link_t *route;
  840. #else
  841. const SD_link_t *route;
  842. #endif
  843. int i, routesize;
  844. int through;
  845. unsigned src2;
  846. unsigned ngpus;
  847. const char *name;
  848. if (dst == src)
  849. continue;
  850. busid = starpu_bus_get_id(src, dst);
  851. if (busid == -1)
  852. continue;
  853. srchost = _starpu_simgrid_get_memnode_host(src);
  854. dsthost = _starpu_simgrid_get_memnode_host(dst);
  855. #ifdef HAVE_SG_HOST_ROUTE
  856. sg_host_route(srchost, dsthost, route_dynar);
  857. routesize = xbt_dynar_length(route_dynar);
  858. route = xbt_dynar_to_array(route_dynar);
  859. #else
  860. routesize = SD_route_get_size(srchost, dsthost);
  861. route = SD_route_get_list(srchost, dsthost);
  862. #endif
  863. /* If it goes through "Host", do not care, there is no
  864. * direct transfer support */
  865. for (i = 0; i < routesize; i++)
  866. if (!strcmp(sg_link_name(route[i]), "Host"))
  867. break;
  868. if (i < routesize)
  869. continue;
  870. /* Get the PCI bridge between down and up links */
  871. through = -1;
  872. for (i = 0; i < routesize; i++)
  873. {
  874. name = sg_link_name(route[i]);
  875. size_t len = strlen(name);
  876. if (!strcmp(" through", name+len-8))
  877. through = i;
  878. else if (!strcmp(" up", name+len-3))
  879. break;
  880. }
  881. /* Didn't find it ?! */
  882. if (through == -1)
  883. {
  884. _STARPU_DEBUG("Didn't find through-link for %d->%d\n", src, dst);
  885. continue;
  886. }
  887. name = sg_link_name(route[through]);
  888. /*
  889. * count how many direct routes go through it between
  890. * GPUs and RAM
  891. */
  892. ngpus = 0;
  893. for (src2 = 1; src2 < STARPU_MAXNODES; src2++)
  894. {
  895. if (starpu_bus_get_id(src2, STARPU_MAIN_RAM) == -1)
  896. continue;
  897. msg_host_t srchost2 = _starpu_simgrid_get_memnode_host(src2);
  898. int routesize2;
  899. #ifdef HAVE_SG_HOST_ROUTE
  900. xbt_dynar_t route_dynar2 = xbt_dynar_new(sizeof(SD_link_t), NULL);
  901. SD_link_t *route2;
  902. sg_host_route(srchost2, ramhost, route_dynar2);
  903. routesize2 = xbt_dynar_length(route_dynar2);
  904. route2 = xbt_dynar_to_array(route_dynar2);
  905. #else
  906. const SD_link_t *route2 = SD_route_get_list(srchost2, ramhost);
  907. routesize2 = SD_route_get_size(srchost2, ramhost);
  908. #endif
  909. for (i = 0; i < routesize2; i++)
  910. if (!strcmp(name, sg_link_name(route2[i])))
  911. {
  912. /* This GPU goes through this PCI bridge to access RAM */
  913. ngpus++;
  914. break;
  915. }
  916. #ifdef HAVE_SG_HOST_ROUTE
  917. free(route2);
  918. #endif
  919. }
  920. _STARPU_DEBUG("%d->%d through %s, %u GPUs\n", src, dst, name, ngpus);
  921. starpu_bus_set_ngpus(busid, ngpus);
  922. #ifdef HAVE_SG_HOST_ROUTE
  923. free(route);
  924. #endif
  925. }
  926. #endif
  927. }
  928. typedef struct
  929. {
  930. void_f_pvoid_t code;
  931. void *userparam;
  932. void *father_data;
  933. } thread_data_t;
  934. static int _starpu_simgrid_xbt_thread_create_wrapper(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  935. {
  936. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  937. MSG_process_sleep(0.000001);
  938. #ifdef HAVE_SMX_ACTOR_T
  939. smx_actor_t
  940. #else
  941. smx_process_t
  942. #endif
  943. self = SIMIX_process_self();
  944. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  945. thread_data_t *t = SIMIX_process_self_get_data(self);
  946. #else
  947. thread_data_t *t = SIMIX_process_self_get_data();
  948. #endif
  949. simcall_process_set_data(self, t->father_data);
  950. t->code(t->userparam);
  951. simcall_process_set_data(self, NULL);
  952. free(t);
  953. return 0;
  954. }
  955. void _starpu_simgrid_xbt_thread_create(const char *name, void_f_pvoid_t code, void *param)
  956. {
  957. #ifdef HAVE_SMX_ACTOR_T
  958. smx_actor_t process STARPU_ATTRIBUTE_UNUSED;
  959. #else
  960. smx_process_t process STARPU_ATTRIBUTE_UNUSED;
  961. #endif
  962. thread_data_t *res;
  963. _STARPU_MALLOC(res, sizeof(thread_data_t));
  964. res->userparam = param;
  965. res->code = code;
  966. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  967. res->father_data = SIMIX_process_self_get_data(SIMIX_process_self());
  968. #else
  969. res->father_data = SIMIX_process_self_get_data();
  970. #endif
  971. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 12)
  972. simcall_process_create(&process,
  973. #else
  974. process = simcall_process_create(
  975. #endif
  976. name,
  977. _starpu_simgrid_xbt_thread_create_wrapper, res,
  978. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 14)
  979. SIMIX_host_self_get_name(),
  980. #else
  981. SIMIX_host_self(),
  982. #endif
  983. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 15)
  984. -1.0,
  985. #endif
  986. 0, NULL,
  987. /*props */ NULL
  988. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 15)
  989. , 0
  990. #endif
  991. );
  992. }
  993. #endif