simgrid.c 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2017 Université de Bordeaux
  4. * Copyright (C) 2016 Inria
  5. * Copyright (C) 2016, 2017 CNRS
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <datawizard/memory_nodes.h>
  20. #include <common/config.h>
  21. #ifdef HAVE_UNISTD_H
  22. #include <unistd.h>
  23. #endif
  24. #include <core/perfmodel/perfmodel.h>
  25. #include <core/workers.h>
  26. #include <core/simgrid.h>
  27. #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  28. #include <simgrid/simdag.h>
  29. #endif
  30. #ifdef STARPU_SIMGRID
  31. #include <sys/resource.h>
  32. #include <simgrid/simix.h>
  33. #pragma weak starpu_main
  34. extern int starpu_main(int argc, char *argv[]);
  35. #pragma weak smpi_main
  36. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  37. #pragma weak _starpu_mpi_simgrid_init
  38. extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
  39. static int simgrid_started;
  40. starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
  41. starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
  42. /* In case the MPI application didn't use smpicc to build the file containing
  43. * main(), try to cope by calling starpu_main */
  44. int _starpu_smpi_simulated_main_(int argc, char *argv[])
  45. {
  46. if (!starpu_main)
  47. {
  48. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  49. }
  50. return starpu_main(argc, argv);
  51. }
  52. int smpi_simulated_main_(int argc, char *argv[]) __attribute__((weak, alias("_starpu_smpi_simulated_main_")));
  #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  #ifdef HAVE_MSG_GET_AS_BY_NAME
  /* Look up a SimGrid AS by name; SimGrid provides the lookup directly. */
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return MSG_get_as_by_name(name);
  }
  #else /* HAVE_MSG_GET_AS_BY_NAME */
  /* Depth-first search for an AS called NAME among the descendants of ROOT
   * (ROOT itself is not compared). Returns NULL when not found. */
  static msg_as_t find_as_by_name_under(msg_as_t root, const char *name)
  {
      xbt_dict_t dict;
      xbt_dict_cursor_t cursor;
      const char *key;
      msg_as_t as, ret;
      dict = MSG_environment_as_get_routing_sons(root);
      xbt_dict_foreach(dict, cursor, key, as)
      {
          if (!strcmp(MSG_environment_as_get_name(as), name))
              ret = as;
          else
              ret = find_as_by_name_under(as, name);
          if (ret)
          {
              /* xbt_dict_foreach only releases its cursor when the loop
               * runs to completion: free it when leaving early. */
              xbt_dict_cursor_free(&cursor);
              return ret;
          }
      }
      return NULL;
  }

  /* Look up a SimGrid AS by name, starting from the routing root. */
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return find_as_by_name_under(MSG_environment_get_routing_root(), name);
  }
  #endif /* HAVE_MSG_GET_AS_BY_NAME */
  #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  83. int _starpu_simgrid_get_nbhosts(const char *prefix)
  84. {
  85. int ret;
  86. xbt_dynar_t hosts;
  87. unsigned i, nb;
  88. unsigned len = strlen(prefix);
  89. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  90. char new_prefix[32];
  91. if (_starpu_simgrid_running_smpi())
  92. {
  93. char name[32];
  94. STARPU_ASSERT(starpu_mpi_world_rank);
  95. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  96. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  97. snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  98. prefix = new_prefix;
  99. len = strlen(prefix);
  100. }
  101. else
  102. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  103. hosts = MSG_hosts_as_dynar();
  104. nb = xbt_dynar_length(hosts);
  105. ret = 0;
  106. for (i = 0; i < nb; i++)
  107. {
  108. const char *name;
  109. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  110. if (!strncmp(name, prefix, len))
  111. ret++;
  112. }
  113. xbt_dynar_free(&hosts);
  114. return ret;
  115. }
  116. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  117. {
  118. char name[32];
  119. msg_host_t host;
  120. const char *memsize;
  121. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  122. host = _starpu_simgrid_get_host_by_name(name);
  123. if (!host)
  124. return 0;
  125. if (!MSG_host_get_properties(host))
  126. return 0;
  127. memsize = MSG_host_get_property_value(host, "memsize");
  128. if (!memsize)
  129. return 0;
  130. return atoll(memsize);
  131. }
  132. msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  133. {
  134. if (_starpu_simgrid_running_smpi())
  135. {
  136. char mpiname[32];
  137. STARPU_ASSERT(starpu_mpi_world_rank);
  138. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  139. return MSG_get_host_by_name(mpiname);
  140. }
  141. else
  142. return MSG_get_host_by_name(name);
  143. }
  144. msg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  145. {
  146. char *prefix;
  147. char name[16];
  148. msg_host_t host;
  149. switch (worker->arch)
  150. {
  151. case STARPU_CPU_WORKER:
  152. prefix = "CPU";
  153. break;
  154. case STARPU_CUDA_WORKER:
  155. prefix = "CUDA";
  156. break;
  157. case STARPU_OPENCL_WORKER:
  158. prefix = "OpenCL";
  159. break;
  160. default:
  161. STARPU_ASSERT(0);
  162. }
  163. snprintf(name, sizeof(name), "%s%d", prefix, worker->devid);
  164. host = _starpu_simgrid_get_host_by_name(name);
  165. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  166. return host;
  167. }
  168. static void start_simgrid(int *argc, char **argv)
  169. {
  170. char path[256];
  171. simgrid_started = 1;
  172. if (!starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  173. {
  174. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  175. }
  176. MSG_init(argc, argv);
  177. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  178. /* Versions earlier than 3.9 didn't support our communication tasks */
  179. MSG_config("workstation/model", "ptask_L07");
  180. #endif
  181. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  182. unsigned stack_size = 8192;
  183. struct rlimit rlim;
  184. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  185. stack_size = rlim.rlim_cur / 1024;
  186. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  187. extern xbt_cfg_t _sg_cfg_set;
  188. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  189. #else
  190. xbt_cfg_set_int("contexts/stack-size", stack_size);
  191. #endif
  192. /* Load XML platform */
  193. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 13)
  194. _starpu_simgrid_get_platform_path(3, path, sizeof(path));
  195. #else
  196. _starpu_simgrid_get_platform_path(4, path, sizeof(path));
  197. #endif
  198. MSG_create_environment(path);
  199. }
  /* Bundle of command-line arguments */
  struct main_args
  {
      int argc;      /* argument count */
      char **argv;   /* argument vector */
  };

  /* Return value of the application's starpu_main, set by do_starpu_main()
   * and returned by main() */
  static int main_ret;
  206. int do_starpu_main(int argc, char *argv[])
  207. {
  208. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  209. MSG_process_sleep(0.000001);
  210. main_ret = starpu_main(argc, argv);
  211. return main_ret;
  212. }
  213. #undef main
  214. #pragma weak main
  215. int main(int argc, char **argv)
  216. {
  217. if (_starpu_simgrid_running_smpi())
  218. {
  219. /* Oops, we are running SMPI, let it start Simgrid, and we'll
  220. * take back hand in _starpu_simgrid_init from starpu_init() */
  221. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  222. }
  223. /* Managed to catch application's main, initialize simgrid first */
  224. start_simgrid(&argc, argv);
  225. /* Create a simgrid process for main */
  226. char **argv_cpy;
  227. _STARPU_MALLOC(argv_cpy, argc * sizeof(char*));
  228. int i;
  229. for (i = 0; i < argc; i++)
  230. argv_cpy[i] = strdup(argv[i]);
  231. MSG_process_create_with_arguments("main", &do_starpu_main, calloc(MAX_TSD+1, sizeof(void*)), MSG_get_host_by_name("MAIN"), argc, argv_cpy);
  232. /* And run maestro in main thread */
  233. MSG_main();
  234. return main_ret;
  235. }
  236. static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
  237. {
  238. MSG_main();
  239. }
  240. void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
  241. {
  242. #ifdef HAVE_MSG_PROCESS_ATTACH
  243. if (!simgrid_started && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  244. {
  245. _STARPU_DISP("Warning: In simgrid mode, the file containing the main() function of this application should to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main to avoid having to use --cfg=contexts/factory:thread which reduces performance\n");
  246. #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
  247. xbt_cfg_set_string("contexts/factory", "thread");
  248. #endif
  249. /* We didn't catch application's main. */
  250. /* Start maestro as a separate thread */
  251. SIMIX_set_maestro(maestro, NULL);
  252. /* Initialize simgrid */
  253. start_simgrid(argc, *argv);
  254. /* And attach the main thread to the main simgrid process */
  255. MSG_process_attach("main", calloc(MAX_TSD, sizeof(void*)), MSG_get_host_by_name("MAIN"), NULL);
  256. simgrid_started = 2;
  257. }
  258. #endif
  259. unsigned i;
  260. if (!simgrid_started && !starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
  261. {
  262. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  263. }
  264. if (_starpu_simgrid_running_smpi())
  265. {
  266. #ifdef __PIC__
  267. _STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
  268. #endif
  269. MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
  270. }
  271. for (i = 0; i < STARPU_MAXNODES; i++)
  272. starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
  273. for (i = 0; i < STARPU_NMAXWORKERS; i++)
  274. starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
  275. }
  /* Counterpart of _starpu_simgrid_init(): when the main thread was attached
   * with MSG_process_attach (simgrid_started == 2), detach it again. */
  void _starpu_simgrid_deinit(void)
  {
  #ifdef HAVE_MSG_PROCESS_ATTACH
      if (simgrid_started != 2)
          return;
      /* Started with MSG_process_attach, now detach */
      MSG_process_detach();
      simgrid_started = 0;
  #endif
  }
  287. /*
  288. * Tasks
  289. */
  290. struct task
  291. {
  292. msg_task_t task;
  293. int workerid;
  294. /* communication termination signalization */
  295. unsigned *finished;
  296. starpu_pthread_mutex_t *mutex;
  297. starpu_pthread_cond_t *cond;
  298. /* Task which waits for this task */
  299. struct task *next;
  300. };
  301. static struct task *last_task[STARPU_NMAXWORKERS];
  302. /* Actually execute the task. */
  303. static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  304. {
  305. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  306. MSG_process_sleep(0.000001);
  307. struct task *task = starpu_pthread_getspecific(0);
  308. _STARPU_DEBUG("task %p started\n", task);
  309. MSG_task_execute(task->task);
  310. MSG_task_destroy(task->task);
  311. _STARPU_DEBUG("task %p finished\n", task);
  312. STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
  313. *task->finished = 1;
  314. STARPU_PTHREAD_COND_BROADCAST(task->cond);
  315. STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
  316. /* The worker which started this task may be sleeping out of tasks, wake it */
  317. starpu_wake_worker(task->workerid);
  318. if (last_task[task->workerid] == task)
  319. last_task[task->workerid] = NULL;
  320. if (task->next)
  321. {
  322. void **tsd = calloc(MAX_TSD+1, sizeof(void*));
  323. tsd[0] = task->next;
  324. MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
  325. }
  326. /* Task is freed with process context */
  327. return 0;
  328. }
  329. /* Wait for completion of all asynchronous tasks for this worker */
  330. void _starpu_simgrid_wait_tasks(int workerid)
  331. {
  332. struct task *task = last_task[workerid];
  333. if (!task)
  334. return;
  335. unsigned *finished = task->finished;
  336. starpu_pthread_mutex_t *mutex = task->mutex;
  337. starpu_pthread_cond_t *cond = task->cond;
  338. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  339. while (!*finished)
  340. STARPU_PTHREAD_COND_WAIT(cond, mutex);
  341. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  342. }
  343. /* Task execution submitted by StarPU */
  344. void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond)
  345. {
  346. struct starpu_task *starpu_task = j->task;
  347. msg_task_t simgrid_task;
  348. if (j->internal)
  349. /* This is not useful to include in simulation (and probably
  350. * doesn't have a perfmodel anyway) */
  351. return;
  352. if (isnan(length))
  353. {
  354. length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
  355. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  356. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  357. _starpu_job_get_model_name(j));
  358. }
  359. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j),
  360. #ifdef HAVE_MSG_HOST_GET_SPEED
  361. length/1000000.0*MSG_host_get_speed(MSG_host_self()),
  362. #else
  363. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  364. #endif
  365. 0, NULL);
  366. if (finished == NULL)
  367. {
  368. /* Synchronous execution */
  369. /* First wait for previous tasks */
  370. _starpu_simgrid_wait_tasks(workerid);
  371. MSG_task_execute(simgrid_task);
  372. MSG_task_destroy(simgrid_task);
  373. }
  374. else
  375. {
  376. /* Asynchronous execution */
  377. struct task *task;
  378. _STARPU_MALLOC(task, sizeof(*task));
  379. task->task = simgrid_task;
  380. task->workerid = workerid;
  381. task->finished = finished;
  382. *finished = 0;
  383. task->mutex = mutex;
  384. task->cond = cond;
  385. task->next = NULL;
  386. /* Sleep 10µs for the GPU task queueing */
  387. if (_starpu_simgrid_queue_malloc_cost())
  388. MSG_process_sleep(0.000010);
  389. if (last_task[workerid])
  390. {
  391. /* Make this task depend on the previous */
  392. last_task[workerid]->next = task;
  393. last_task[workerid] = task;
  394. }
  395. else
  396. {
  397. void **tsd;
  398. last_task[workerid] = task;
  399. tsd = calloc(MAX_TSD+1, sizeof(void*));
  400. tsd[0] = task;
  401. MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
  402. }
  403. }
  404. }
  405. /*
  406. * Transfers
  407. */
  408. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  409. LIST_TYPE(transfer,
  410. msg_task_t task;
  411. int src_node;
  412. int dst_node;
  413. int run_node;
  414. /* communication termination signalization */
  415. unsigned *finished;
  416. starpu_pthread_mutex_t *mutex;
  417. starpu_pthread_cond_t *cond;
  418. /* transfers which wait for this transfer */
  419. struct transfer **wake;
  420. unsigned nwake;
  421. /* Number of transfers that this transfer waits for */
  422. unsigned nwait;
  423. )
  424. struct transfer_list pending;
  425. /* Tell for two transfers whether they should be handled in sequence */
  426. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  427. {
  428. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  429. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  430. int new_is_gpu_gpu, old_is_gpu_gpu;
  431. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  432. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  433. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  434. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  435. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  436. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  437. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  438. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  439. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  440. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  441. /* We ignore cuda-opencl transfers, they can not happen */
  442. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  443. /* The following constraints have been observed with CUDA alone */
  444. /* Same source/destination, sequential */
  445. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  446. return 1;
  447. /* Crossed GPU-GPU, sequential */
  448. if (new_is_gpu_gpu
  449. && new_transfer->src_node == old_transfer->dst_node
  450. && old_transfer->src_node == new_transfer->dst_node)
  451. return 1;
  452. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  453. if (new_is_gpu_gpu
  454. && old_transfer->dst_node == new_transfer->src_node
  455. && old_transfer->dst_node == new_transfer->dst_node)
  456. return 1;
  457. if (old_is_gpu_gpu
  458. && new_transfer->dst_node == old_transfer->src_node
  459. && new_transfer->dst_node == old_transfer->dst_node)
  460. return 1;
  461. /* StarPU's constraint on CUDA transfers is using one stream per
  462. * source/destination pair, which is already handled above */
  463. return 0;
  464. }
  465. /* Actually execute the transfer, and then start transfers waiting for this one. */
  466. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  467. {
  468. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  469. MSG_process_sleep(0.000001);
  470. struct transfer *transfer = starpu_pthread_getspecific(0);
  471. unsigned i;
  472. _STARPU_DEBUG("transfer %p started\n", transfer);
  473. MSG_task_execute(transfer->task);
  474. MSG_task_destroy(transfer->task);
  475. _STARPU_DEBUG("transfer %p finished\n", transfer);
  476. STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
  477. *transfer->finished = 1;
  478. STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
  479. STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
  480. /* The workers which started this request may be sleeping out of tasks, wake it */
  481. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  482. /* Wake transfers waiting for my termination */
  483. /* Note: due to possible preemption inside process_create, the array
  484. * may grow while doing this */
  485. for (i = 0; i < transfer->nwake; i++)
  486. {
  487. struct transfer *wake = transfer->wake[i];
  488. STARPU_ASSERT(wake->nwait > 0);
  489. wake->nwait--;
  490. if (!wake->nwait)
  491. {
  492. void **tsd;
  493. _STARPU_DEBUG("triggering transfer %p\n", wake);
  494. tsd = calloc(MAX_TSD+1, sizeof(void*));
  495. tsd[0] = wake;
  496. MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
  497. }
  498. }
  499. free(transfer->wake);
  500. transfer_list_erase(&pending, transfer);
  501. /* transfer is freed with process context */
  502. return 0;
  503. }
  504. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  505. static void transfer_submit(struct transfer *transfer)
  506. {
  507. struct transfer *old;
  508. for (old = transfer_list_begin(&pending);
  509. old != transfer_list_end(&pending);
  510. old = transfer_list_next(old))
  511. {
  512. if (transfers_are_sequential(transfer, old))
  513. {
  514. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  515. transfer, transfer->src_node, transfer->dst_node,
  516. old, old->src_node, old->dst_node);
  517. /* Make new wait for the old */
  518. transfer->nwait++;
  519. /* Make old wake the new */
  520. _STARPU_REALLOC(old->wake, (old->nwake + 1) * sizeof(old->wake));
  521. old->wake[old->nwake] = transfer;
  522. old->nwake++;
  523. }
  524. }
  525. transfer_list_push_front(&pending, transfer);
  526. if (!transfer->nwait)
  527. {
  528. void **tsd;
  529. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  530. tsd = calloc(MAX_TSD+1, sizeof(void*));
  531. tsd[0] = transfer;
  532. MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
  533. }
  534. }
  535. /* Data transfer issued by StarPU */
  536. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  537. {
  538. /* Simgrid does not like 0-bytes transfers */
  539. if (!size)
  540. return 0;
  541. msg_task_t task;
  542. msg_host_t *hosts;
  543. double *computation;
  544. double *communication;
  545. starpu_pthread_mutex_t mutex;
  546. starpu_pthread_cond_t cond;
  547. unsigned finished;
  548. _STARPU_CALLOC(hosts, 2, sizeof(*hosts));
  549. _STARPU_CALLOC(computation, 2, sizeof(*computation));
  550. _STARPU_CALLOC(communication, 4, sizeof(*communication));
  551. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  552. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  553. STARPU_ASSERT(hosts[0] != hosts[1]);
  554. communication[1] = size;
  555. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  556. struct transfer *transfer = transfer_new();
  557. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  558. transfer->task = task;
  559. transfer->src_node = src_node;
  560. transfer->dst_node = dst_node;
  561. transfer->run_node = _starpu_memory_node_get_local_key();
  562. if (req)
  563. {
  564. transfer->finished = &req->async_channel.event.finished;
  565. transfer->mutex = &req->async_channel.event.mutex;
  566. transfer->cond = &req->async_channel.event.cond;
  567. }
  568. else
  569. {
  570. transfer->finished = &finished;
  571. transfer->mutex = &mutex;
  572. transfer->cond = &cond;
  573. }
  574. *transfer->finished = 0;
  575. STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
  576. STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
  577. transfer->wake = NULL;
  578. transfer->nwake = 0;
  579. transfer->nwait = 0;
  580. if (req)
  581. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  582. /* Sleep 10µs for the GPU transfer queueing */
  583. if (_starpu_simgrid_queue_malloc_cost())
  584. MSG_process_sleep(0.000010);
  585. transfer_submit(transfer);
  586. /* Note: from here, transfer might be already freed */
  587. if (req)
  588. {
  589. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  590. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  591. return -EAGAIN;
  592. }
  593. else
  594. {
  595. /* this is not associated to a request so it's synchronous */
  596. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  597. while (!finished)
  598. STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
  599. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  600. return 0;
  601. }
  602. }
  603. int
  604. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  605. {
  606. void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
  607. void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
  608. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  609. MSG_process_sleep(0.000001);
  610. /* _args is freed with process context */
  611. f(arg);
  612. return 0;
  613. }
  614. msg_host_t
  615. _starpu_simgrid_get_memnode_host(unsigned node)
  616. {
  617. const char *fmt;
  618. char name[16];
  619. switch (starpu_node_get_kind(node))
  620. {
  621. case STARPU_CPU_RAM:
  622. fmt = "RAM";
  623. break;
  624. case STARPU_CUDA_RAM:
  625. fmt = "CUDA%u";
  626. break;
  627. case STARPU_OPENCL_RAM:
  628. fmt = "OpenCL%u";
  629. break;
  630. default:
  631. STARPU_ABORT();
  632. break;
  633. }
  634. snprintf(name, sizeof(name), fmt, _starpu_memory_node_get_devid(node));
  635. return _starpu_simgrid_get_host_by_name(name);
  636. }
  #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
  /* Return non-zero when NAME ends with SUFFIX. Names shorter than SUFFIX
   * are rejected instead of computing a pointer before the start of NAME. */
  static int link_name_ends_with(const char *name, const char *suffix)
  {
      size_t len = strlen(name);
      size_t slen = strlen(suffix);
      return len >= slen && !strcmp(name + len - slen, suffix);
  }
  #endif

  /* For each pair of non-zero memory nodes with a bus and a route that does
   * not use the "Host" link:
   * - find the last "... through" link before the first "... up" link (the
   *   PCI bridge);
   * - count the nodes whose route to host "RAM" uses that link;
   * - record the count with starpu_bus_set_ngpus().
   * Only implemented with sg_link_name() and SimGrid >= 3.13. */
  void _starpu_simgrid_count_ngpus(void)
  {
  #if defined(HAVE_SG_LINK_NAME) && (SIMGRID_VERSION_MAJOR >= 4 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 13))
      unsigned src, dst;
      msg_host_t ramhost = _starpu_simgrid_get_host_by_name("RAM");

      /* For each pair of memory nodes, get the route */
      for (src = 1; src < STARPU_MAXNODES; src++)
          for (dst = 1; dst < STARPU_MAXNODES; dst++)
          {
              int busid;
              msg_host_t srchost, dsthost;
              const SD_link_t *route;
              int i, routesize;
              int through;
              unsigned src2;
              unsigned ngpus;
              const char *name;

              if (dst == src)
                  continue;
              busid = starpu_bus_get_id(src, dst);
              if (busid == -1)
                  continue;

              srchost = _starpu_simgrid_get_memnode_host(src);
              dsthost = _starpu_simgrid_get_memnode_host(dst);
              routesize = SD_route_get_size(srchost, dsthost);
              route = SD_route_get_list(srchost, dsthost);

              /* If it goes through "Host", do not care, there is no
               * direct transfer support */
              for (i = 0; i < routesize; i++)
                  if (!strcmp(sg_link_name(route[i]), "Host"))
                      break;
              if (i < routesize)
                  continue;

              /* Get the PCI bridge between down and up links */
              through = -1;
              for (i = 0; i < routesize; i++)
              {
                  name = sg_link_name(route[i]);
                  if (link_name_ends_with(name, " through"))
                      through = i;
                  else if (link_name_ends_with(name, " up"))
                      break;
              }
              /* Didn't find it ?! */
              if (through == -1)
              {
                  _STARPU_DEBUG("Didn't find through-link for %u->%u\n", src, dst);
                  continue;
              }

              name = sg_link_name(route[through]);

              /*
               * count how many direct routes go through it between
               * GPUs and RAM
               */
              ngpus = 0;
              for (src2 = 1; src2 < STARPU_MAXNODES; src2++)
              {
                  if (starpu_bus_get_id(src2, STARPU_MAIN_RAM) == -1)
                      continue;
                  msg_host_t srchost2 = _starpu_simgrid_get_memnode_host(src2);
                  int routesize2 = SD_route_get_size(srchost2, ramhost);
                  const SD_link_t *route2 = SD_route_get_list(srchost2, ramhost);
                  for (i = 0; i < routesize2; i++)
                      if (!strcmp(name, sg_link_name(route2[i])))
                      {
                          /* This GPU goes through this PCI bridge to access RAM */
                          ngpus++;
                          break;
                      }
              }
              _STARPU_DEBUG("%u->%u through %s, %u GPUs\n", src, dst, name, ngpus);
              starpu_bus_set_ngpus(busid, ngpus);
          }
  #endif
  }
  713. typedef struct{
  714. void_f_pvoid_t code;
  715. void *userparam;
  716. void *father_data;
  717. } thread_data_t;
  718. static int _starpu_simgrid_xbt_thread_create_wrapper(int argc, char *argv[])
  719. {
  720. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  721. MSG_process_sleep(0.000001);
  722. #ifdef HAVE_SMX_ACTOR_T
  723. smx_actor_t
  724. #else
  725. smx_process_t
  726. #endif
  727. self = SIMIX_process_self();
  728. thread_data_t *t = SIMIX_process_self_get_data(self);
  729. simcall_process_set_data(self, t->father_data);
  730. t->code(t->userparam);
  731. simcall_process_set_data(self, NULL);
  732. free(t);
  733. return 0;
  734. }
  735. void _starpu_simgrid_xbt_thread_create(const char *name, void_f_pvoid_t code, void *param)
  736. {
  737. #ifdef HAVE_SMX_ACTOR_T
  738. smx_actor_t process;
  739. #else
  740. smx_process_t process;
  741. #endif
  742. thread_data_t *res = malloc(sizeof(thread_data_t));
  743. res->userparam = param;
  744. res->code = code;
  745. res->father_data = SIMIX_process_self_get_data(SIMIX_process_self());
  746. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 12)
  747. simcall_process_create(&process
  748. #else
  749. process = simcall_process_create(
  750. #endif
  751. name,
  752. _starpu_simgrid_xbt_thread_create_wrapper, res,
  753. SIMIX_host_self_get_name(), -1.0, 0, NULL,
  754. /*props */ NULL,0);
  755. }
  756. #endif