simgrid.c 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2017 Université de Bordeaux
  4. * Copyright (C) 2016 Inria
  5. * Copyright (C) 2016, 2017 CNRS
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <datawizard/memory_nodes.h>
  20. #include <common/config.h>
  21. #ifdef HAVE_UNISTD_H
  22. #include <unistd.h>
  23. #endif
  24. #include <core/perfmodel/perfmodel.h>
  25. #include <core/workers.h>
  26. #include <core/simgrid.h>
  27. #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  28. #include <simgrid/simdag.h>
  29. #endif
  30. #ifdef STARPU_SIMGRID
  31. #include <sys/resource.h>
  32. #include <simgrid/simix.h>
  33. #pragma weak starpu_main
  34. extern int starpu_main(int argc, char *argv[]);
  35. #pragma weak smpi_main
  36. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  37. #pragma weak _starpu_mpi_simgrid_init
  38. extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
  39. static int simgrid_started;
  40. static int runners_running;
  41. starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
  42. static struct transfer_runner {
  43. struct transfer *first_transfer, *last_transfer;
  44. msg_sem_t sem;
  45. msg_process_t runner;
  46. } transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
  47. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
  48. starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
  49. static struct worker_runner {
  50. struct task *first_task, *last_task;
  51. msg_sem_t sem;
  52. msg_process_t runner;
  53. } worker_runner[STARPU_NMAXWORKERS];
  54. static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
  55. /* In case the MPI application didn't use smpicc to build the file containing
  56. * main(), try to cope by calling starpu_main */
  57. int _starpu_smpi_simulated_main_(int argc, char *argv[])
  58. {
  59. if (!starpu_main)
  60. {
  61. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  62. }
  63. return starpu_main(argc, argv);
  64. }
  65. int smpi_simulated_main_(int argc, char *argv[]) __attribute__((weak, alias("_starpu_smpi_simulated_main_")));
  66. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  67. #ifdef HAVE_MSG_GET_AS_BY_NAME
  68. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  69. {
  70. return MSG_get_as_by_name(name);
  71. }
  72. #else /* HAVE_MSG_GET_AS_BY_NAME */
  73. static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
  74. {
  75. xbt_dict_t dict;
  76. xbt_dict_cursor_t cursor;
  77. const char *key;
  78. msg_as_t as, ret;
  79. dict = MSG_environment_as_get_routing_sons(root);
  80. xbt_dict_foreach(dict, cursor, key, as)
  81. {
  82. if (!strcmp(MSG_environment_as_get_name(as), name))
  83. return as;
  84. ret = __starpu_simgrid_get_as_by_name(as, name);
  85. if (ret)
  86. return ret;
  87. }
  88. return NULL;
  89. }
  90. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  91. {
  92. return __starpu_simgrid_get_as_by_name(MSG_environment_get_routing_root(), name);
  93. }
  94. #endif /* HAVE_MSG_GET_AS_BY_NAME */
  95. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  96. int _starpu_simgrid_get_nbhosts(const char *prefix)
  97. {
  98. int ret;
  99. xbt_dynar_t hosts;
  100. unsigned i, nb;
  101. unsigned len = strlen(prefix);
  102. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  103. char new_prefix[32];
  104. if (_starpu_simgrid_running_smpi())
  105. {
  106. char name[32];
  107. STARPU_ASSERT(starpu_mpi_world_rank);
  108. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  109. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  110. snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  111. prefix = new_prefix;
  112. len = strlen(prefix);
  113. }
  114. else
  115. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  116. hosts = MSG_hosts_as_dynar();
  117. nb = xbt_dynar_length(hosts);
  118. ret = 0;
  119. for (i = 0; i < nb; i++)
  120. {
  121. const char *name;
  122. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  123. if (!strncmp(name, prefix, len))
  124. ret++;
  125. }
  126. xbt_dynar_free(&hosts);
  127. return ret;
  128. }
  129. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  130. {
  131. char name[32];
  132. msg_host_t host;
  133. const char *memsize;
  134. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  135. host = _starpu_simgrid_get_host_by_name(name);
  136. if (!host)
  137. return 0;
  138. if (!MSG_host_get_properties(host))
  139. return 0;
  140. memsize = MSG_host_get_property_value(host, "memsize");
  141. if (!memsize)
  142. return 0;
  143. return atoll(memsize);
  144. }
  145. msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  146. {
  147. if (_starpu_simgrid_running_smpi())
  148. {
  149. char mpiname[32];
  150. STARPU_ASSERT(starpu_mpi_world_rank);
  151. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  152. return MSG_get_host_by_name(mpiname);
  153. }
  154. else
  155. return MSG_get_host_by_name(name);
  156. }
  157. msg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  158. {
  159. char *prefix;
  160. char name[16];
  161. msg_host_t host;
  162. switch (worker->arch)
  163. {
  164. case STARPU_CPU_WORKER:
  165. prefix = "CPU";
  166. break;
  167. case STARPU_CUDA_WORKER:
  168. prefix = "CUDA";
  169. break;
  170. case STARPU_OPENCL_WORKER:
  171. prefix = "OpenCL";
  172. break;
  173. default:
  174. STARPU_ASSERT(0);
  175. }
  176. snprintf(name, sizeof(name), "%s%d", prefix, worker->devid);
  177. host = _starpu_simgrid_get_host_by_name(name);
  178. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  179. return host;
  180. }
  181. static void start_simgrid(int *argc, char **argv)
  182. {
  183. char path[256];
  184. simgrid_started = 1;
  185. if (!starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  186. {
  187. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  188. }
  189. MSG_init(argc, argv);
  190. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  191. /* Versions earlier than 3.9 didn't support our communication tasks */
  192. MSG_config("workstation/model", "ptask_L07");
  193. #endif
  194. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  195. unsigned stack_size = 8192;
  196. struct rlimit rlim;
  197. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  198. stack_size = rlim.rlim_cur / 1024;
  199. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  200. extern xbt_cfg_t _sg_cfg_set;
  201. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  202. #else
  203. xbt_cfg_set_int("contexts/stack-size", stack_size);
  204. #endif
  205. /* Load XML platform */
  206. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  207. _starpu_simgrid_get_platform_path(3, path, sizeof(path));
  208. #else
  209. _starpu_simgrid_get_platform_path(4, path, sizeof(path));
  210. #endif
  211. MSG_create_environment(path);
  212. }
  213. struct main_args
  214. {
  215. int argc;
  216. char **argv;
  217. };
  218. static int main_ret;
  219. int do_starpu_main(int argc, char *argv[])
  220. {
  221. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  222. MSG_process_sleep(0.000001);
  223. main_ret = starpu_main(argc, argv);
  224. return main_ret;
  225. }
  226. #undef main
  227. #pragma weak main
  228. int main(int argc, char **argv)
  229. {
  230. if (_starpu_simgrid_running_smpi())
  231. {
  232. /* Oops, we are running SMPI, let it start Simgrid, and we'll
  233. * take back hand in _starpu_simgrid_init from starpu_init() */
  234. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  235. }
  236. /* Managed to catch application's main, initialize simgrid first */
  237. start_simgrid(&argc, argv);
  238. /* Create a simgrid process for main */
  239. char **argv_cpy;
  240. _STARPU_MALLOC(argv_cpy, argc * sizeof(char*));
  241. int i;
  242. for (i = 0; i < argc; i++)
  243. argv_cpy[i] = strdup(argv[i]);
  244. MSG_process_create_with_arguments("main", &do_starpu_main, calloc(MAX_TSD+1, sizeof(void*)), MSG_get_host_by_name("MAIN"), argc, argv_cpy);
  245. /* And run maestro in main thread */
  246. MSG_main();
  247. return main_ret;
  248. }
  249. #ifdef HAVE_MSG_PROCESS_ATTACH
  250. static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
  251. {
  252. MSG_main();
  253. }
  254. #endif
  255. void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
  256. {
  257. #ifdef HAVE_MSG_PROCESS_ATTACH
  258. if (!simgrid_started && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  259. {
  260. _STARPU_DISP("Warning: In simgrid mode, the file containing the main() function of this application should to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main to avoid having to use --cfg=contexts/factory:thread which reduces performance\n");
  261. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  262. xbt_cfg_set_string("contexts/factory", "thread");
  263. #endif
  264. /* We didn't catch application's main. */
  265. /* Start maestro as a separate thread */
  266. SIMIX_set_maestro(maestro, NULL);
  267. /* Initialize simgrid */
  268. start_simgrid(argc, *argv);
  269. /* And attach the main thread to the main simgrid process */
  270. MSG_process_attach("main", calloc(MAX_TSD, sizeof(void*)), MSG_get_host_by_name("MAIN"), NULL);
  271. simgrid_started = 2;
  272. }
  273. #endif
  274. if (!simgrid_started && !starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  275. {
  276. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  277. }
  278. if (_starpu_simgrid_running_smpi())
  279. {
  280. #ifndef STARPU_STATIC_ONLY
  281. _STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
  282. #endif
  283. MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
  284. }
  285. unsigned i;
  286. for (i = 0; i < STARPU_MAXNODES; i++)
  287. starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
  288. for (i = 0; i < STARPU_NMAXWORKERS; i++)
  289. starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
  290. }
  291. void _starpu_simgrid_init(void)
  292. {
  293. unsigned i;
  294. runners_running = 1;
  295. for (i = 0; i < starpu_worker_get_count(); i++)
  296. {
  297. char s[32];
  298. snprintf(s, sizeof(s), "worker %u runner", i);
  299. void **tsd = calloc(MAX_TSD+1, sizeof(void*));
  300. worker_runner[i].sem = MSG_sem_init(0);
  301. tsd[0] = (void*)(uintptr_t) i;
  302. worker_runner[i].runner = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
  303. }
  304. }
  305. void _starpu_simgrid_deinit(void)
  306. {
  307. unsigned i, j;
  308. runners_running = 0;
  309. for (i = 0; i < STARPU_MAXNODES; i++)
  310. {
  311. for (j = 0; j < STARPU_MAXNODES; j++)
  312. {
  313. struct transfer_runner *t = &transfer_runner[i][j];
  314. if (t->runner)
  315. {
  316. MSG_sem_release(t->sem);
  317. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  318. MSG_process_join(t->runner, 1000000);
  319. #else
  320. MSG_process_sleep(1);
  321. #endif
  322. STARPU_ASSERT(t->first_transfer == NULL);
  323. STARPU_ASSERT(t->last_transfer == NULL);
  324. MSG_sem_destroy(t->sem);
  325. }
  326. }
  327. /* FIXME: queue not empty at this point, needs proper unregistration */
  328. /* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
  329. }
  330. for (i = 0; i < starpu_worker_get_count(); i++)
  331. {
  332. struct worker_runner *w = &worker_runner[i];
  333. MSG_sem_release(w->sem);
  334. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  335. MSG_process_join(w->runner, 1000000);
  336. #else
  337. MSG_process_sleep(1);
  338. #endif
  339. STARPU_ASSERT(w->first_task == NULL);
  340. STARPU_ASSERT(w->last_task == NULL);
  341. MSG_sem_destroy(w->sem);
  342. starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
  343. }
  344. #ifdef HAVE_MSG_PROCESS_ATTACH
  345. if (simgrid_started == 2)
  346. {
  347. /* Started with MSG_process_attach, now detach */
  348. MSG_process_detach();
  349. simgrid_started = 0;
  350. }
  351. #endif
  352. }
  353. /*
  354. * Tasks
  355. */
  356. struct task
  357. {
  358. msg_task_t task;
  359. /* communication termination signalization */
  360. unsigned *finished;
  361. /* Next task on this worker */
  362. struct task *next;
  363. };
  364. /* Actually execute the task. */
  365. static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  366. {
  367. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  368. MSG_process_sleep(0.000001);
  369. unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
  370. struct worker_runner *w = &worker_runner[workerid];
  371. _STARPU_DEBUG("worker runner %u started\n", workerid);
  372. while (1) {
  373. struct task *task;
  374. MSG_sem_acquire(w->sem);
  375. if (!runners_running)
  376. break;
  377. task = w->first_task;
  378. w->first_task = task->next;
  379. if (w->last_task == task)
  380. w->last_task = NULL;
  381. _STARPU_DEBUG("task %p started\n", task);
  382. MSG_task_execute(task->task);
  383. MSG_task_destroy(task->task);
  384. _STARPU_DEBUG("task %p finished\n", task);
  385. *task->finished = 1;
  386. /* The worker which started this task may be sleeping out of tasks, wake it */
  387. starpu_wake_worker(workerid);
  388. free(task);
  389. }
  390. _STARPU_DEBUG("worker %u stopped\n", workerid);
  391. return 0;
  392. }
  393. /* Wait for completion of all asynchronous tasks for this worker */
  394. void _starpu_simgrid_wait_tasks(int workerid)
  395. {
  396. struct task *task = worker_runner[workerid].last_task;
  397. if (!task)
  398. return;
  399. unsigned *finished = task->finished;
  400. starpu_pthread_wait_t wait;
  401. starpu_pthread_wait_init(&wait);
  402. starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
  403. while(1)
  404. {
  405. starpu_pthread_wait_reset(&wait);
  406. if (*finished)
  407. break;
  408. starpu_pthread_wait_wait(&wait);
  409. }
  410. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
  411. starpu_pthread_wait_destroy(&wait);
  412. }
  413. /* Task execution submitted by StarPU */
  414. void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished)
  415. {
  416. struct starpu_task *starpu_task = j->task;
  417. msg_task_t simgrid_task;
  418. if (j->internal)
  419. /* This is not useful to include in simulation (and probably
  420. * doesn't have a perfmodel anyway) */
  421. return;
  422. if (isnan(length))
  423. {
  424. length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
  425. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  426. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  427. _starpu_job_get_model_name(j));
  428. }
  429. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j),
  430. #ifdef HAVE_MSG_HOST_GET_SPEED
  431. length/1000000.0*MSG_host_get_speed(MSG_host_self()),
  432. #else
  433. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  434. #endif
  435. 0, NULL);
  436. if (finished == NULL)
  437. {
  438. /* Synchronous execution */
  439. /* First wait for previous tasks */
  440. _starpu_simgrid_wait_tasks(workerid);
  441. MSG_task_execute(simgrid_task);
  442. MSG_task_destroy(simgrid_task);
  443. }
  444. else
  445. {
  446. /* Asynchronous execution */
  447. struct task *task;
  448. struct worker_runner *w = &worker_runner[workerid];
  449. _STARPU_MALLOC(task, sizeof(*task));
  450. task->task = simgrid_task;
  451. task->finished = finished;
  452. *finished = 0;
  453. task->next = NULL;
  454. /* Sleep 10µs for the GPU task queueing */
  455. if (_starpu_simgrid_queue_malloc_cost())
  456. MSG_process_sleep(0.000010);
  457. if (w->last_task)
  458. {
  459. /* Already running a task, queue */
  460. w->last_task->next = task;
  461. w->last_task = task;
  462. }
  463. else
  464. {
  465. STARPU_ASSERT(!w->first_task);
  466. w->first_task = task;
  467. w->last_task = task;
  468. }
  469. MSG_sem_release(w->sem);
  470. }
  471. }
  472. /*
  473. * Transfers
  474. */
  475. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  476. LIST_TYPE(transfer,
  477. msg_task_t task;
  478. int src_node;
  479. int dst_node;
  480. int run_node;
  481. /* communication termination signalization */
  482. unsigned *finished;
  483. /* transfers which wait for this transfer */
  484. struct transfer **wake;
  485. unsigned nwake;
  486. /* Number of transfers that this transfer waits for */
  487. unsigned nwait;
  488. /* Next transfer on this stream */
  489. struct transfer *next;
  490. )
  491. struct transfer_list pending;
  492. /* Tell for two transfers whether they should be handled in sequence */
  493. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  494. {
  495. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  496. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  497. int new_is_gpu_gpu, old_is_gpu_gpu;
  498. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  499. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  500. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  501. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  502. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  503. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  504. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  505. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  506. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  507. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  508. /* We ignore cuda-opencl transfers, they can not happen */
  509. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  510. /* The following constraints have been observed with CUDA alone */
  511. /* Same source/destination, sequential */
  512. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  513. return 1;
  514. /* Crossed GPU-GPU, sequential */
  515. if (new_is_gpu_gpu
  516. && new_transfer->src_node == old_transfer->dst_node
  517. && old_transfer->src_node == new_transfer->dst_node)
  518. return 1;
  519. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  520. if (new_is_gpu_gpu
  521. && old_transfer->dst_node == new_transfer->src_node
  522. && old_transfer->dst_node == new_transfer->dst_node)
  523. return 1;
  524. if (old_is_gpu_gpu
  525. && new_transfer->dst_node == old_transfer->src_node
  526. && new_transfer->dst_node == old_transfer->dst_node)
  527. return 1;
  528. /* StarPU's constraint on CUDA transfers is using one stream per
  529. * source/destination pair, which is already handled above */
  530. return 0;
  531. }
  532. static void transfer_queue(struct transfer *transfer)
  533. {
  534. unsigned src = transfer->src_node;
  535. unsigned dst = transfer->dst_node;
  536. struct transfer_runner *t = &transfer_runner[src][dst];
  537. if (!t->runner)
  538. {
  539. /* No runner yet, start it */
  540. static starpu_pthread_mutex_t mutex; /* process_create may yield */
  541. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  542. if (!t->runner)
  543. {
  544. char s[64];
  545. snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
  546. void **tsd = calloc(MAX_TSD+1, sizeof(void*));
  547. tsd[0] = (void*)(uintptr_t)((src<<16) + dst);
  548. t->runner = MSG_process_create_with_arguments(s, transfer_execute, tsd, _starpu_simgrid_get_memnode_host(src), 0, NULL);
  549. t->sem = MSG_sem_init(0);
  550. }
  551. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  552. }
  553. if (t->last_transfer)
  554. {
  555. /* Already running a transfer, queue */
  556. t->last_transfer->next = transfer;
  557. t->last_transfer = transfer;
  558. }
  559. else
  560. {
  561. STARPU_ASSERT(!t->first_transfer);
  562. t->first_transfer = transfer;
  563. t->last_transfer = transfer;
  564. }
  565. MSG_sem_release(t->sem);
  566. }
  567. /* Actually execute the transfer, and then start transfers waiting for this one. */
  568. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  569. {
  570. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  571. MSG_process_sleep(0.000001);
  572. unsigned src_dst = (uintptr_t) starpu_pthread_getspecific(0);
  573. unsigned src = src_dst >> 16;
  574. unsigned dst = src_dst & 0xffff;
  575. struct transfer_runner *t = &transfer_runner[src][dst];
  576. _STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
  577. while (1) {
  578. struct transfer *transfer;
  579. MSG_sem_acquire(t->sem);
  580. if (!runners_running)
  581. break;
  582. transfer = t->first_transfer;
  583. t->first_transfer = transfer->next;
  584. if (t->last_transfer == transfer)
  585. t->last_transfer = NULL;
  586. _STARPU_DEBUG("transfer %p started\n", transfer);
  587. MSG_task_execute(transfer->task);
  588. MSG_task_destroy(transfer->task);
  589. _STARPU_DEBUG("transfer %p finished\n", transfer);
  590. *transfer->finished = 1;
  591. transfer_list_erase(&pending, transfer);
  592. /* The workers which started this request may be sleeping out of tasks, wake it */
  593. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  594. unsigned i;
  595. /* Wake transfers waiting for my termination */
  596. /* Note: due to possible preemption inside process_create, the array
  597. * may grow while doing this */
  598. for (i = 0; i < transfer->nwake; i++)
  599. {
  600. struct transfer *wake = transfer->wake[i];
  601. STARPU_ASSERT(wake->nwait > 0);
  602. wake->nwait--;
  603. if (!wake->nwait)
  604. {
  605. _STARPU_DEBUG("triggering transfer %p\n", wake);
  606. transfer_queue(wake);
  607. }
  608. }
  609. free(transfer->wake);
  610. free(transfer);
  611. }
  612. return 0;
  613. }
  614. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  615. static void transfer_submit(struct transfer *transfer)
  616. {
  617. struct transfer *old;
  618. for (old = transfer_list_begin(&pending);
  619. old != transfer_list_end(&pending);
  620. old = transfer_list_next(old))
  621. {
  622. if (transfers_are_sequential(transfer, old))
  623. {
  624. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  625. transfer, transfer->src_node, transfer->dst_node,
  626. old, old->src_node, old->dst_node);
  627. /* Make new wait for the old */
  628. transfer->nwait++;
  629. /* Make old wake the new */
  630. _STARPU_REALLOC(old->wake, (old->nwake + 1) * sizeof(old->wake));
  631. old->wake[old->nwake] = transfer;
  632. old->nwake++;
  633. }
  634. }
  635. transfer_list_push_front(&pending, transfer);
  636. if (!transfer->nwait)
  637. {
  638. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  639. transfer_queue(transfer);
  640. }
  641. }
  642. int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
  643. {
  644. /* this is not associated to a request so it's synchronous */
  645. starpu_pthread_wait_t wait;
  646. starpu_pthread_wait_init(&wait);
  647. starpu_pthread_queue_register(&wait, event->queue);
  648. while(1)
  649. {
  650. starpu_pthread_wait_reset(&wait);
  651. if (event->finished)
  652. break;
  653. starpu_pthread_wait_wait(&wait);
  654. }
  655. starpu_pthread_queue_unregister(&wait, event->queue);
  656. starpu_pthread_wait_destroy(&wait);
  657. return 0;
  658. }
  659. int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
  660. {
  661. return event->finished;
  662. }
  663. /* Data transfer issued by StarPU */
  664. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  665. {
  666. /* Simgrid does not like 0-bytes transfers */
  667. if (!size)
  668. return 0;
  669. msg_task_t task;
  670. msg_host_t *hosts;
  671. double *computation;
  672. double *communication;
  673. union _starpu_async_channel_event *event, myevent;
  674. _STARPU_CALLOC(hosts, 2, sizeof(*hosts));
  675. _STARPU_CALLOC(computation, 2, sizeof(*computation));
  676. _STARPU_CALLOC(communication, 4, sizeof(*communication));
  677. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  678. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  679. STARPU_ASSERT(hosts[0] != hosts[1]);
  680. communication[1] = size;
  681. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  682. struct transfer *transfer = transfer_new();
  683. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  684. transfer->task = task;
  685. transfer->src_node = src_node;
  686. transfer->dst_node = dst_node;
  687. transfer->run_node = _starpu_memory_node_get_local_key();
  688. if (req)
  689. event = &req->async_channel.event;
  690. else
  691. event = &myevent;
  692. event->finished = 0;
  693. transfer->finished = &event->finished;
  694. event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
  695. transfer->wake = NULL;
  696. transfer->nwake = 0;
  697. transfer->nwait = 0;
  698. transfer->next = NULL;
  699. if (req)
  700. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  701. /* Sleep 10µs for the GPU transfer queueing */
  702. if (_starpu_simgrid_queue_malloc_cost())
  703. MSG_process_sleep(0.000010);
  704. transfer_submit(transfer);
  705. /* Note: from here, transfer might be already freed */
  706. if (req)
  707. {
  708. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  709. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  710. return -EAGAIN;
  711. }
  712. else
  713. {
  714. /* this is not associated to a request so it's synchronous */
  715. _starpu_simgrid_wait_transfer_event(event);
  716. return 0;
  717. }
  718. }
  719. int
  720. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  721. {
  722. void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
  723. void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
  724. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  725. MSG_process_sleep(0.000001);
  726. /* _args is freed with process context */
  727. f(arg);
  728. return 0;
  729. }
  730. msg_host_t
  731. _starpu_simgrid_get_memnode_host(unsigned node)
  732. {
  733. const char *fmt;
  734. char name[16];
  735. switch (starpu_node_get_kind(node))
  736. {
  737. case STARPU_CPU_RAM:
  738. fmt = "RAM";
  739. break;
  740. case STARPU_CUDA_RAM:
  741. fmt = "CUDA%u";
  742. break;
  743. case STARPU_OPENCL_RAM:
  744. fmt = "OpenCL%u";
  745. break;
  746. default:
  747. STARPU_ABORT();
  748. break;
  749. }
  750. snprintf(name, sizeof(name), fmt, _starpu_memory_node_get_devid(node));
  751. return _starpu_simgrid_get_host_by_name(name);
  752. }
  #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  /*
   * Return non-zero when link NAME ends with SUFFIX.
   *
   * NAME may be shorter than SUFFIX.  The length check runs first, so no
   * pointer before the start of NAME is ever formed; forming one would be
   * undefined behavior.
   */
  static int _starpu_simgrid_link_name_ends_with(const char *name, const char *suffix)
  {
      size_t len = strlen(name);
      size_t suffix_len = strlen(suffix);

      return len >= suffix_len && strcmp(name + len - suffix_len, suffix) == 0;
  }
  #endif

  /*
   * Record, for each bus between memory nodes, how many GPUs share the PCI
   * bridge it crosses.
   *
   * The pairs considered are every ordered pair (src, dst) of distinct
   * memory nodes numbered 1 and above that have a bus
   * (starpu_bus_get_id() != -1).  For each such pair, the SimGrid route
   * between their hosts is inspected:
   *   - routes going through a link named "Host" are skipped;
   *   - otherwise, the bridge is the last link whose name ends in " through"
   *     before the first link whose name ends in " up";
   *   - the number of memory nodes whose route to the "RAM" host uses that
   *     same link is stored with starpu_bus_set_ngpus().
   *
   * The function compiles to a no-op unless HAVE_SG_LINK_NAME is defined
   * and SimGrid is at least 3.13.
   */
  void _starpu_simgrid_count_ngpus(void)
  {
  #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
      unsigned src, dst;
      msg_host_t ramhost = _starpu_simgrid_get_host_by_name("RAM");

      /* For each pair of memory nodes, get the route */
      for (src = 1; src < STARPU_MAXNODES; src++)
          for (dst = 1; dst < STARPU_MAXNODES; dst++)
          {
              int busid;
              msg_host_t srchost, dsthost;
              /* NOTE(review): depending on the SimGrid release, SD_route_get_list()
               * may hand back a freshly allocated array that the caller owns; it
               * is not freed here -- TODO confirm against the SimGrid version used. */
              const SD_link_t *route;
              int i, routesize;
              int through;
              unsigned src2;
              unsigned ngpus;
              const char *name;

              if (dst == src)
                  continue;
              busid = starpu_bus_get_id(src, dst);
              if (busid == -1)
                  continue;

              srchost = _starpu_simgrid_get_memnode_host(src);
              dsthost = _starpu_simgrid_get_memnode_host(dst);
              routesize = SD_route_get_size(srchost, dsthost);
              route = SD_route_get_list(srchost, dsthost);

              /* If it goes through "Host", do not care, there is no
               * direct transfer support */
              for (i = 0; i < routesize; i++)
                  if (!strcmp(sg_link_name(route[i]), "Host"))
                      break;
              if (i < routesize)
                  continue;

              /* Get the PCI bridge between down and up links: the last
               * " through" link seen before the first " up" link */
              through = -1;
              for (i = 0; i < routesize; i++)
              {
                  name = sg_link_name(route[i]);
                  if (_starpu_simgrid_link_name_ends_with(name, " through"))
                      through = i;
                  else if (_starpu_simgrid_link_name_ends_with(name, " up"))
                      break;
              }
              /* Didn't find it ?! */
              if (through == -1)
              {
                  _STARPU_DEBUG("Didn't find through-link for %u->%u\n", src, dst);
                  continue;
              }
              name = sg_link_name(route[through]);

              /*
               * count how many direct routes go through it between
               * GPUs and RAM
               */
              ngpus = 0;
              for (src2 = 1; src2 < STARPU_MAXNODES; src2++)
              {
                  msg_host_t srchost2;
                  int routesize2;
                  const SD_link_t *route2;

                  if (starpu_bus_get_id(src2, STARPU_MAIN_RAM) == -1)
                      continue;
                  srchost2 = _starpu_simgrid_get_memnode_host(src2);
                  routesize2 = SD_route_get_size(srchost2, ramhost);
                  route2 = SD_route_get_list(srchost2, ramhost);
                  for (i = 0; i < routesize2; i++)
                      if (!strcmp(name, sg_link_name(route2[i])))
                      {
                          /* This GPU goes through this PCI bridge to access RAM */
                          ngpus++;
                          break;
                      }
              }
              _STARPU_DEBUG("%u->%u through %s, %u GPUs\n", src, dst, name, ngpus);
              starpu_bus_set_ngpus(busid, ngpus);
          }
  #endif
  }
  829. typedef struct{
  830. void_f_pvoid_t code;
  831. void *userparam;
  832. void *father_data;
  833. } thread_data_t;
  834. static int _starpu_simgrid_xbt_thread_create_wrapper(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  835. {
  836. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  837. MSG_process_sleep(0.000001);
  838. #ifdef HAVE_SMX_ACTOR_T
  839. smx_actor_t
  840. #else
  841. smx_process_t
  842. #endif
  843. self = SIMIX_process_self();
  844. thread_data_t *t = SIMIX_process_self_get_data(self);
  845. simcall_process_set_data(self, t->father_data);
  846. t->code(t->userparam);
  847. simcall_process_set_data(self, NULL);
  848. free(t);
  849. return 0;
  850. }
  851. void _starpu_simgrid_xbt_thread_create(const char *name, void_f_pvoid_t code, void *param)
  852. {
  853. #ifdef HAVE_SMX_ACTOR_T
  854. smx_actor_t process STARPU_ATTRIBUTE_UNUSED;
  855. #else
  856. smx_process_t process STARPU_ATTRIBUTE_UNUSED;
  857. #endif
  858. thread_data_t *res;
  859. _STARPU_MALLOC(res, sizeof(thread_data_t));
  860. res->userparam = param;
  861. res->code = code;
  862. res->father_data = SIMIX_process_self_get_data(SIMIX_process_self());
  863. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 12)
  864. simcall_process_create(&process,
  865. #else
  866. process = simcall_process_create(
  867. #endif
  868. name,
  869. _starpu_simgrid_xbt_thread_create_wrapper, res,
  870. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 14)
  871. SIMIX_host_self_get_name(),
  872. #else
  873. SIMIX_host_self(),
  874. #endif
  875. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 15)
  876. -1.0,
  877. #endif
  878. 0, NULL,
  879. /*props */ NULL
  880. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 15)
  881. , 0
  882. #endif
  883. );
  884. }
  885. #endif