simgrid.c 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2013 Thibaut Lambert
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <datawizard/memory_nodes.h>
  19. #include <common/config.h>
  20. #ifdef HAVE_UNISTD_H
  21. #include <unistd.h>
  22. #endif
  23. #include <core/perfmodel/perfmodel.h>
  24. #include <core/workers.h>
  25. #include <core/simgrid.h>
  26. #if defined(HAVE_SIMGRID_SIMDAG_H) && (SIMGRID_VERSION >= 31300)
  27. #include <simgrid/simdag.h>
  28. #endif
  29. #ifdef STARPU_SIMGRID
  30. #ifdef HAVE_GETRLIMIT
  31. #include <sys/resource.h>
  32. #endif
  33. #include <simgrid/simix.h>
  34. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  35. #include <simgrid/host.h>
  36. #endif
  37. #ifdef STARPU_HAVE_SIMGRID_ENGINE_H
  38. #include <simgrid/engine.h>
  39. #endif
  40. #ifdef STARPU_HAVE_XBT_CONFIG_H
  41. #include <xbt/config.h>
  42. #endif
  43. #include <smpi/smpi.h>
/* Optional external entry points. They are declared weak so that this file
 * links even when the symbol is not provided; where a missing symbol is
 * expected, the code tests its address before use (e.g. `if (!starpu_main)`). */
  44. #pragma weak starpu_main
  45. extern int starpu_main(int argc, char *argv[]);
/* Old SMPI (< 3.16) entry point, used by main() to let SMPI start simgrid. */
  46. #if SIMGRID_VERSION < 31600
  47. #pragma weak smpi_main
  48. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  49. #endif
/* Presumably provided by StarPU-MPI; called from main() when running under SMPI. */
  50. #pragma weak _starpu_mpi_simgrid_init
  51. extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
/* Only declared here when simgrid's own headers do not declare it. */
  52. #pragma weak smpi_process_set_user_data
  53. #if !HAVE_DECL_SMPI_PROCESS_SET_USER_DATA && !defined(smpi_process_set_user_data)
  54. extern void smpi_process_set_user_data(void *);
  55. #endif
/* NOTE(review): not referenced in the visible part of this file. */
  56. static double _starpu_simgrid_dynamic_energy = 0.0;
  57. /* 1 when MSG_init was done, 2 when initialized through redirected main, 3 when
  58. * initialized through MSG_process_attach */
  59. static int simgrid_started;
/* Read from STARPU_SIMGRID_TRANSFER_COST by _starpu_start_simgrid() (default 1);
 * consumed outside the visible part of this file. */
  60. static int simgrid_transfer_cost = 1;
/* Set by _starpu_simgrid_init(), cleared by _starpu_simgrid_deinit(); runners
 * check it after each wakeup to know whether they must stop. */
  61. static int runners_running;
/* One wait queue per memory node, initialized in _starpu_simgrid_init_early();
 * presumably used to wait for transfer completion — confirm. */
  62. starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
/* One transfer runner per pair of memory nodes (presumably indexed
 * [src][dst] — confirm): a FIFO of transfers, the semaphore waking the runner,
 * and the runner actor itself (_starpu_simgrid_deinit() only handles runners
 * whose handle is non-zero). */
  63. static struct transfer_runner
  64. {
  65. struct transfer *first_transfer, *last_transfer;
  66. starpu_sem_t sem;
  67. starpu_pthread_t runner;
  68. } transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
  69. static void *transfer_execute(void *arg);
/* One wait queue per worker; _starpu_simgrid_wait_tasks() waits on it for the
 * completion of asynchronous tasks. */
  70. starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
/* One task runner per worker: the FIFO of asynchronous tasks filled by
 * _starpu_simgrid_submit_job(), a semaphore posted once per queued task, and
 * the runner actor (running task_execute()) created by _starpu_simgrid_init(). */
  71. static struct worker_runner
  72. {
  73. struct task *first_task, *last_task;
  74. starpu_sem_t sem;
  75. starpu_pthread_t runner;
  76. } worker_runner[STARPU_NMAXWORKERS];
  77. static void *task_execute(void *arg);
/* Stack size for simgrid actors, in KiB (simgrid's "contexts/stack-size"
 * unit); replaced by RLIMIT_STACK / 1024 in _starpu_start_simgrid() when that
 * limit is finite and non-zero. */
  78. size_t _starpu_default_stack_size = 8192;
/* Configure simgrid's actor stack size through whichever configuration API
 * this simgrid version provides. STACK_SIZE is in KiB, the unit of simgrid's
 * "contexts/stack-size" option (spelled "contexts/stack_size" and set through
 * the global _sg_cfg_set before simgrid 3.13). */
  79. void _starpu_simgrid_set_stack_size(size_t stack_size)
  80. {
  81. #ifdef HAVE_SG_CFG_SET_INT
  82. sg_cfg_set_int("contexts/stack-size", stack_size);
  83. #elif SIMGRID_VERSION >= 31300
  84. xbt_cfg_set_int("contexts/stack-size", stack_size);
  85. #else
  86. extern xbt_cfg_t _sg_cfg_set;
  87. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  88. #endif
  89. }
  90. #ifdef HAVE_SG_ACTOR_ON_EXIT
  91. static void on_exit_backtrace(int failed, void *data STARPU_ATTRIBUTE_UNUSED)
  92. {
  93. if (failed)
  94. xbt_backtrace_display_current();
  95. }
  96. #endif
  97. void _starpu_simgrid_actor_setup(void)
  98. {
  99. #ifdef HAVE_SG_ACTOR_ON_EXIT
  100. sg_actor_on_exit(on_exit_backtrace, NULL);
  101. #endif
  102. }
  103. #if defined(HAVE_SG_ZONE_GET_BY_NAME) || defined(sg_zone_get_by_name)
  104. #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  105. sg_netzone_t _starpu_simgrid_get_as_by_name(const char *name)
  106. {
  107. return sg_zone_get_by_name(name);
  108. }
  109. #elif defined(HAVE_MSG_ZONE_GET_BY_NAME) || defined(MSG_zone_get_by_name)
  110. #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  111. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  112. {
  113. return MSG_zone_get_by_name(name);
  114. }
  115. #elif defined(HAVE_MSG_GET_AS_BY_NAME) || defined(MSG_get_as_by_name)
  116. #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  117. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  118. {
  119. return MSG_get_as_by_name(name);
  120. }
  121. #elif defined(HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT) || defined(MSG_environment_as_get_routing_sons)
  122. #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  123. static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
  124. {
  125. xbt_dict_t dict;
  126. xbt_dict_cursor_t cursor;
  127. const char *key;
  128. msg_as_t as, ret;
  129. dict = MSG_environment_as_get_routing_sons(root);
  130. xbt_dict_foreach(dict, cursor, key, as)
  131. {
  132. if (!strcmp(MSG_environment_as_get_name(as), name))
  133. return as;
  134. ret = __starpu_simgrid_get_as_by_name(as, name);
  135. if (ret)
  136. return ret;
  137. }
  138. return NULL;
  139. }
  140. msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  141. {
  142. return __starpu_simgrid_get_as_by_name(MSG_environment_get_routing_root(), name);
  143. }
  144. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  145. int _starpu_simgrid_get_nbhosts(const char *prefix)
  146. {
  147. int ret;
  148. #ifdef HAVE_SG_HOST_LIST
  149. sg_host_t *hosts_list = NULL;
  150. #endif
  151. xbt_dynar_t hosts = NULL;
  152. unsigned i, nb = 0;
  153. unsigned len = strlen(prefix);
  154. if (_starpu_simgrid_running_smpi())
  155. {
  156. #ifdef HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  157. char new_prefix[32];
  158. char name[32];
  159. STARPU_ASSERT(starpu_mpi_world_rank);
  160. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%d", starpu_mpi_world_rank());
  161. #if defined(HAVE_MSG_ZONE_GET_HOSTS) || defined(HAVE_SG_ZONE_GET_HOSTS) || defined(MSG_zone_get_hosts) || defined(sg_zone_get_hosts)
  162. hosts = xbt_dynar_new(sizeof(sg_host_t), NULL);
  163. # if defined(HAVE_SG_ZONE_GET_HOSTS) || defined(sg_zone_get_hosts)
  164. sg_zone_get_hosts(_starpu_simgrid_get_as_by_name(name), hosts);
  165. # else
  166. MSG_zone_get_hosts(_starpu_simgrid_get_as_by_name(name), hosts);
  167. # endif
  168. #else
  169. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  170. #endif
  171. snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  172. prefix = new_prefix;
  173. len = strlen(prefix);
  174. #else
  175. STARPU_ABORT_MSG("can not continue without an implementation for _starpu_simgrid_get_as_by_name");
  176. #endif /* HAVE_STARPU_SIMGRID_GET_AS_BY_NAME */
  177. }
  178. else
  179. {
  180. #ifdef HAVE_SG_HOST_LIST
  181. hosts_list = sg_host_list();
  182. nb = sg_host_count();
  183. #elif defined(STARPU_HAVE_SIMGRID_HOST_H)
  184. hosts = sg_hosts_as_dynar();
  185. #else
  186. hosts = MSG_hosts_as_dynar();
  187. #endif
  188. }
  189. if (hosts)
  190. nb = xbt_dynar_length(hosts);
  191. ret = 0;
  192. for (i = 0; i < nb; i++)
  193. {
  194. const char *name;
  195. #ifdef HAVE_SG_HOST_LIST
  196. if (hosts_list)
  197. name = sg_host_get_name(hosts_list[i]);
  198. else
  199. #endif
  200. #if defined(STARPU_HAVE_SIMGRID_HOST_H)
  201. name = sg_host_get_name(xbt_dynar_get_as(hosts, i, sg_host_t));
  202. #else
  203. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  204. #endif
  205. if (!strncmp(name, prefix, len))
  206. ret++;
  207. }
  208. if (hosts)
  209. xbt_dynar_free(&hosts);
  210. return ret;
  211. }
  212. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  213. {
  214. char name[32];
  215. starpu_sg_host_t host;
  216. const char *memsize;
  217. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  218. host = _starpu_simgrid_get_host_by_name(name);
  219. if (!host)
  220. return 0;
  221. #ifdef HAVE_SG_HOST_GET_PROPERTIES
  222. if (!sg_host_get_properties(host))
  223. #else
  224. if (!MSG_host_get_properties(host))
  225. #endif
  226. return 0;
  227. #ifdef HAVE_SG_HOST_GET_PROPERTIES
  228. memsize = sg_host_get_property_value(host, "memsize");
  229. #else
  230. memsize = MSG_host_get_property_value(host, "memsize");
  231. #endif
  232. if (!memsize)
  233. return 0;
  234. return atoll(memsize);
  235. }
  236. starpu_sg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  237. {
  238. if (_starpu_simgrid_running_smpi())
  239. {
  240. char mpiname[32];
  241. STARPU_ASSERT(starpu_mpi_world_rank);
  242. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  243. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  244. return sg_host_by_name(mpiname);
  245. #else
  246. return MSG_get_host_by_name(mpiname);
  247. #endif
  248. }
  249. else
  250. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  251. return sg_host_by_name(name);
  252. #else
  253. return MSG_get_host_by_name(name);
  254. #endif
  255. }
  256. starpu_sg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  257. {
  258. char *prefix;
  259. char name[16];
  260. starpu_sg_host_t host;
  261. switch (worker->arch)
  262. {
  263. case STARPU_CPU_WORKER:
  264. prefix = "CPU";
  265. break;
  266. case STARPU_CUDA_WORKER:
  267. prefix = "CUDA";
  268. break;
  269. case STARPU_OPENCL_WORKER:
  270. prefix = "OpenCL";
  271. break;
  272. default:
  273. STARPU_ASSERT(0);
  274. }
  275. snprintf(name, sizeof(name), "%s%u", prefix, worker->devid);
  276. host = _starpu_simgrid_get_host_by_name(name);
  277. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  278. return host;
  279. }
  280. /* Simgrid up to 3.15 would rename main into smpi_simulated_main_, and call that
  281. * from SMPI initialization
  282. * In case the MPI application didn't use smpicc to build the file containing
  283. * main(), but included our #define main starpu_main, try to cope by calling
  284. * starpu_main */
  285. int _starpu_smpi_simulated_main_(int argc, char *argv[])
  286. {
  287. if (!starpu_main)
  288. {
  289. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  290. }
  291. return starpu_main(argc, argv);
  292. }
  293. int smpi_simulated_main_(int argc, char *argv[]) __attribute__((weak, alias("_starpu_smpi_simulated_main_")));
  294. /* This is used to start a non-MPI simgrid environment */
  295. void _starpu_start_simgrid(int *argc, char **argv)
  296. {
  297. char path[256];
  298. if (simgrid_started)
  299. return;
  300. simgrid_started = 1;
  301. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  302. simgrid_init(argc, argv);
  303. #else
  304. MSG_init(argc, argv);
  305. #endif
  306. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  307. #ifdef HAVE_GETRLIMIT
  308. struct rlimit rlim;
  309. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  310. _starpu_default_stack_size = rlim.rlim_cur / 1024;
  311. #endif
  312. _starpu_simgrid_set_stack_size(_starpu_default_stack_size);
  313. /* Load XML platform */
  314. #if SIMGRID_VERSION < 31300
  315. _starpu_simgrid_get_platform_path(3, path, sizeof(path));
  316. #else
  317. _starpu_simgrid_get_platform_path(4, path, sizeof(path));
  318. #endif
  319. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  320. simgrid_load_platform(path);
  321. #else
  322. MSG_create_environment(path);
  323. #endif
  324. int limit_bandwidth = starpu_get_env_number("STARPU_LIMIT_BANDWIDTH");
  325. if (limit_bandwidth >= 0)
  326. {
  327. #ifdef HAVE_SG_LINK_BANDWIDTH_SET
  328. sg_link_t *links = sg_link_list();
  329. int count = sg_link_count(), i;
  330. for (i = 0; i < count; i++) {
  331. sg_link_bandwidth_set(links[i], limit_bandwidth * 1000000.);
  332. }
  333. #else
  334. _STARPU_DISP("Warning: STARPU_LIMIT_BANDWIDTH set to %d but this requires simgrid 3.26, thus ignored\n", limit_bandwidth);
  335. #endif
  336. }
  337. simgrid_transfer_cost = starpu_get_env_number_default("STARPU_SIMGRID_TRANSFER_COST", 1);
  338. }
  339. static int main_ret;
  340. int do_starpu_main(int argc, char *argv[])
  341. {
  342. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  343. starpu_sleep(0.000001);
  344. _starpu_simgrid_actor_setup();
  345. if (!starpu_main)
  346. {
  347. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  348. }
  349. main_ret = starpu_main(argc, argv);
  350. return main_ret;
  351. }
  352. /* We need it only when using smpi */
  353. #pragma weak smpi_process_get_user_data
  354. extern void *smpi_process_get_user_data();
  355. /* This is hopefully called before the application and simgrid */
  356. #undef main
  357. #pragma weak main
  358. int main(int argc, char **argv)
  359. {
  360. #ifdef HAVE_SG_CONFIG_CONTINUE_AFTER_HELP
  361. sg_config_continue_after_help();
  362. #endif
  363. if (_starpu_simgrid_running_smpi())
  364. {
  365. if (!smpi_process_get_user_data)
  366. {
  367. _STARPU_ERROR("Your version of simgrid does not provide smpi_process_get_user_data, we can not continue without it\n");
  368. }
  369. #if SIMGRID_VERSION >= 31600
  370. /* Recent versions of simgrid dlopen() us, so we don't need to
  371. * do circumvolutions, just init MPI early and run the application's main */
  372. return _starpu_mpi_simgrid_init(argc, argv);
  373. #else
  374. /* Oops, we are running old SMPI, let it start Simgrid, and we'll
  375. * take back hand in _starpu_simgrid_init from starpu_init() */
  376. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  377. #endif
  378. }
  379. /* Already initialized? It probably has been done through a
  380. * constructor and MSG_process_attach, directly jump to real main */
  381. if (simgrid_started == 3)
  382. {
  383. return do_starpu_main(argc, argv);
  384. }
  385. /* Managed to catch application's main, initialize simgrid first */
  386. _starpu_start_simgrid(&argc, argv);
  387. simgrid_started = 2;
  388. /* Create a simgrid process for main */
  389. char **argv_cpy;
  390. _STARPU_MALLOC(argv_cpy, argc * sizeof(char*));
  391. int i;
  392. for (i = 0; i < argc; i++)
  393. argv_cpy[i] = strdup(argv[i]);
  394. /* Run the application in a separate thread */
  395. _starpu_simgrid_actor_create("main", &do_starpu_main, _starpu_simgrid_get_host_by_name("MAIN"), argc, argv_cpy);
  396. /* And run maestro in the main thread */
  397. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  398. simgrid_run();
  399. #else
  400. MSG_main();
  401. #endif
  402. return main_ret;
  403. }
  404. #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
/* Maestro entry point, used when the application's main() was not caught:
 * _starpu_simgrid_init_early() passes it to SIMIX_set_maestro() so that the
 * simulation loop runs in a separate thread while the main thread is attached
 * to simgrid as an actor. */
  405. static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
  406. {
  407. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  408. simgrid_run();
  409. #else
  410. MSG_main();
  411. #endif
  412. }
  413. #endif
  414. /* This is called early from starpu_init, so thread functions etc. can work */
  415. void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
  416. {
  417. #ifdef HAVE_SG_CONFIG_CONTINUE_AFTER_HELP
  418. sg_config_continue_after_help();
  419. #endif
  420. #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
  421. if (simgrid_started < 2 && !_starpu_simgrid_running_smpi())
  422. {
  423. /* "Cannot create_maestro with this ContextFactory.
  424. * Try using --cfg=contexts/factory:thread instead."
  425. * See https://github.com/simgrid/simgrid/issues/141 */
  426. _STARPU_DISP("Warning: In simgrid mode, the file containing the main() function of this application should to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main to avoid having to use --cfg=contexts/factory:thread which reduces performance\n");
  427. #if SIMGRID_VERSION >= 31400 /* Only recent versions of simgrid support setting sg_cfg_set_string before starting simgrid */
  428. # ifdef HAVE_SG_CFG_SET_INT
  429. sg_cfg_set_string("contexts/factory", "thread");
  430. # else
  431. xbt_cfg_set_string("contexts/factory", "thread");
  432. # endif
  433. #endif
  434. /* We didn't catch application's main. */
  435. /* Start maestro as a separate thread */
  436. SIMIX_set_maestro(maestro, NULL);
  437. /* Initialize simgrid */
  438. _starpu_start_simgrid(argc, *argv);
  439. /* And attach the main thread to the main simgrid process */
  440. void **tsd;
  441. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  442. #if defined(HAVE_SG_ACTOR_ATTACH) && defined (HAVE_SG_ACTOR_DATA)
  443. sg_actor_t actor = sg_actor_attach("main", NULL, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
  444. sg_actor_data_set(actor, tsd);
  445. #else
  446. MSG_process_attach("main", tsd, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
  447. #endif
  448. /* We initialized through MSG_process_attach */
  449. simgrid_started = 3;
  450. }
  451. #endif
  452. if (!simgrid_started && !starpu_main && !_starpu_simgrid_running_smpi())
  453. {
  454. /* Oops, we don't have MSG_process_attach and didn't catch the
  455. * 'main' symbol, there is no way for us */
  456. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  457. }
  458. if (_starpu_simgrid_running_smpi())
  459. {
  460. #ifndef STARPU_STATIC_ONLY
  461. _STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
  462. #endif
  463. #if defined(HAVE_MSG_PROCESS_USERDATA_INIT) && !defined(HAVE_SG_ACTOR_DATA)
  464. MSG_process_userdata_init();
  465. #endif
  466. void **tsd;
  467. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  468. #ifdef HAVE_SG_ACTOR_DATA
  469. sg_actor_data_set(sg_actor_self(), tsd);
  470. #else
  471. smpi_process_set_user_data(tsd);
  472. #endif
  473. }
  474. unsigned i;
  475. for (i = 0; i < STARPU_MAXNODES; i++)
  476. starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
  477. for (i = 0; i < STARPU_NMAXWORKERS; i++)
  478. starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
  479. }
  480. /* This is called late from starpu_init, to start task executors */
  481. void _starpu_simgrid_init(void)
  482. {
  483. unsigned i;
  484. runners_running = 1;
  485. for (i = 0; i < starpu_worker_get_count(); i++)
  486. {
  487. char s[32];
  488. snprintf(s, sizeof(s), "worker %u runner", i);
  489. starpu_sem_init(&worker_runner[i].sem, 0, 0);
  490. starpu_pthread_create_on(s, &worker_runner[i].runner, NULL, task_execute, (void*)(uintptr_t) i, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)));
  491. }
  492. }
  493. void _starpu_simgrid_deinit_late(void)
  494. {
  495. #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
  496. if (simgrid_started == 3)
  497. {
  498. /* Started with MSG_process_attach, now detach */
  499. #ifdef HAVE_SG_ACTOR_ATTACH
  500. sg_actor_detach();
  501. #else
  502. MSG_process_detach();
  503. #endif
  504. simgrid_started = 0;
  505. }
  506. #endif
  507. }
/* Shut down the simgrid runner actors started for transfers and tasks.
 *
 * runners_running is cleared first. Each runner's semaphore is then posted
 * once, so that the runner wakes up, sees the flag and leaves its loop (see
 * task_execute() for worker runners). For each runner:
 * - join it with a large timeout, or starpu_sleep(1) when simgrid provides no
 *   join primitive;
 * - assert that its queue is empty;
 * - destroy its semaphore.
 * Finally, when simgrid's clean-atexit option is enabled,
 * _starpu_simgrid_deinit_late() detaches an attached main thread.
 *
 * NOTE(review): runner handles are not reset after joining. Transfer wait
 * queues, and task wait queues beyond the worker count, are never destroyed
 * here. */
  508. void _starpu_simgrid_deinit(void)
  509. {
  510. unsigned i, j;
/* Must be cleared before posting the semaphores below */
  511. runners_running = 0;
  512. for (i = 0; i < STARPU_MAXNODES; i++)
  513. {
  514. for (j = 0; j < STARPU_MAXNODES; j++)
  515. {
  516. struct transfer_runner *t = &transfer_runner[i][j];
/* Only transfer runners that were actually started */
  517. if (t->runner)
  518. {
  519. starpu_sem_post(&t->sem);
  520. #ifdef STARPU_HAVE_SIMGRID_ACTOR_H
  521. sg_actor_join(t->runner, 1000000);
  522. #elif SIMGRID_VERSION >= 31400
  523. MSG_process_join(t->runner, 1000000);
  524. #else
  525. starpu_sleep(1);
  526. #endif
  527. STARPU_ASSERT(t->first_transfer == NULL);
  528. STARPU_ASSERT(t->last_transfer == NULL);
  529. starpu_sem_destroy(&t->sem);
  530. }
  531. }
  532. /* FIXME: queue not empty at this point, needs proper unregistration */
  533. /* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
  534. }
/* Worker runners exist for every worker (created in _starpu_simgrid_init()) */
  535. for (i = 0; i < starpu_worker_get_count(); i++)
  536. {
  537. struct worker_runner *w = &worker_runner[i];
  538. starpu_sem_post(&w->sem);
  539. #ifdef STARPU_HAVE_SIMGRID_ACTOR_H
  540. sg_actor_join(w->runner, 1000000);
  541. #elif SIMGRID_VERSION >= 31400
  542. MSG_process_join(w->runner, 1000000);
  543. #else
  544. starpu_sleep(1);
  545. #endif
  546. STARPU_ASSERT(w->first_task == NULL);
  547. STARPU_ASSERT(w->last_task == NULL);
  548. starpu_sem_destroy(&w->sem);
  549. starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
  550. }
  551. #if SIMGRID_VERSION >= 31300
  552. /* clean-atexit introduced in simgrid 3.13 */
  553. # ifdef HAVE_SG_CFG_SET_INT
  554. if ( sg_cfg_get_boolean("debug/clean-atexit"))
  555. # elif SIMGRID_VERSION >= 32300
  556. if ( xbt_cfg_get_boolean("debug/clean-atexit"))
  557. # else
  558. if ( xbt_cfg_get_boolean("clean-atexit"))
  559. # endif
  560. {
  561. _starpu_simgrid_deinit_late();
  562. }
  563. #endif
  564. }
  565. /*
  566. * Tasks
  567. */
/* An asynchronous task queued on a worker runner by
 * _starpu_simgrid_submit_job() and simulated by task_execute(), which frees it. */
  568. struct task
  569. {
/* Work to simulate: a flop count when the actor can execute directly,
 * otherwise a prepared MSG task (executed then destroyed by the runner). */
  570. #ifdef HAVE_SG_ACTOR_SELF_EXECUTE
  571. double flops;
  572. #else
  573. msg_task_t task;
  574. #endif
/* Energy passed to starpu_energy_use() once the task has been simulated. */
  575. double energy;
  576. /* Completion flag provided by the submitter, set to 1 by task_execute() once the task has been simulated */
  577. unsigned *finished;
  578. /* Next task on this worker */
  579. struct task *next;
  580. };
  581. /* Actually execute the task. */
/* Body of the per-worker runner actor started by _starpu_simgrid_init(); ARG
 * carries the worker id. Every post of the worker's semaphore means one of
 * two things: a task was queued by _starpu_simgrid_submit_job(), or
 * _starpu_simgrid_deinit() cleared runners_running and the runner must stop.
 * Tasks are handled in FIFO order: the work is simulated on this actor, the
 * energy is accounted, *finished is set to 1, the worker is woken, and the
 * task is freed. Returns NULL. */
  582. static void *task_execute(void *arg)
  583. {
  584. unsigned workerid = (uintptr_t) arg;
  585. struct worker_runner *w = &worker_runner[workerid];
  586. _STARPU_DEBUG("worker runner %u started\n", workerid);
  587. while (1)
  588. {
  589. struct task *task;
  590. starpu_sem_wait(&w->sem);
/* Woken up by _starpu_simgrid_deinit(): stop */
  591. if (!runners_running)
  592. break;
/* Pop the head of the worker's FIFO; the queue becomes empty when it was the last one */
  593. task = w->first_task;
  594. w->first_task = task->next;
  595. if (w->last_task == task)
  596. w->last_task = NULL;
  597. _STARPU_DEBUG("task %p started\n", task);
  598. #ifdef HAVE_SG_ACTOR_EXECUTE
  599. sg_actor_execute(task->flops);
  600. #elif defined(HAVE_SG_ACTOR_SELF_EXECUTE)
  601. sg_actor_self_execute(task->flops);
  602. #else
  603. MSG_task_execute(task->task);
  604. MSG_task_destroy(task->task);
  605. #endif
  606. starpu_energy_use(task->energy);
  607. _STARPU_DEBUG("task %p finished\n", task);
  608. *task->finished = 1;
  609. /* The worker which started this task may be sleeping out of tasks, wake it */
  610. _starpu_wake_worker_relax(workerid);
  611. free(task);
  612. }
  613. _STARPU_DEBUG("worker %u stopped\n", workerid);
  614. return 0;
  615. }
  616. /* Wait for completion of all asynchronous tasks for this worker */
  617. void _starpu_simgrid_wait_tasks(int workerid)
  618. {
  619. struct task *task = worker_runner[workerid].last_task;
  620. if (!task)
  621. return;
  622. unsigned *finished = task->finished;
  623. starpu_pthread_wait_t wait;
  624. starpu_pthread_wait_init(&wait);
  625. starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
  626. while(1)
  627. {
  628. starpu_pthread_wait_reset(&wait);
  629. if (*finished)
  630. break;
  631. starpu_pthread_wait_wait(&wait);
  632. }
  633. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
  634. starpu_pthread_wait_destroy(&wait);
  635. }
  636. /* Task execution submitted by StarPU */
/* Simulate the execution of job J on worker WORKERID.
 *
 * LENGTH is the expected duration in microseconds; it is converted to flops
 * as length/1e6 * speed of the calling actor's host. ENERGY is the energy to
 * account. Either may be NaN, in which case it is taken from the task's
 * performance model for this worker, scheduling context and implementation.
 * Internal jobs are not simulated.
 *
 * If FINISHED is NULL, the task is executed synchronously by the calling
 * actor, after all previously queued asynchronous tasks of the worker have
 * completed. Otherwise, *FINISHED is reset to 0 and the task is appended to
 * the worker runner's FIFO; task_execute() then sets *FINISHED to 1 once
 * done.
 *
 * NOTE(review): PERF_ARCH is not used in this function body. */
  637. void _starpu_simgrid_submit_job(int workerid, int sched_ctx_id, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, double energy, unsigned *finished)
  638. {
  639. struct starpu_task *starpu_task = j->task;
  640. double flops;
  641. #ifndef HAVE_SG_ACTOR_SELF_EXECUTE
  642. msg_task_t simgrid_task;
  643. #endif
  644. if (j->internal)
  645. /* This is not useful to include in simulation (and probably
  646. * doesn't have a perfmodel anyway) */
  647. return;
/* No explicit duration: ask the performance model */
  648. if (isnan(length))
  649. {
  650. length = starpu_task_worker_expected_length(starpu_task, workerid, sched_ctx_id, j->nimpl);
  651. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  652. "Codelet %s does not have a perfmodel (in directory %s), or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated, or fix the STARPU_HOSTNAME and STARPU_PERF_MODEL_DIR environment variables",
  653. _starpu_job_get_model_name(j), _starpu_get_perf_model_dir_codelet());
  654. /* TODO: option to add variance according to performance model,
  655. * to be able to easily check scheduling robustness */
  656. }
  657. if (isnan(energy))
  658. {
  659. energy = starpu_task_worker_expected_energy(starpu_task, workerid, sched_ctx_id, j->nimpl);
  660. /* TODO: option to add variance according to performance model,
  661. * to be able to easily check scheduling robustness */
  662. }
/* Convert the duration (microseconds) into flops on the calling actor's host */
  663. #if defined(HAVE_SG_HOST_SPEED) || defined(sg_host_speed)
  664. # if defined(HAVE_SG_HOST_SELF) || defined(sg_host_self)
  665. flops = length/1000000.0*sg_host_speed(sg_host_self());
  666. # else
  667. flops = length/1000000.0*sg_host_speed(MSG_host_self());
  668. # endif
  669. #elif defined HAVE_MSG_HOST_GET_SPEED || defined(MSG_host_get_speed)
  670. flops = length/1000000.0*MSG_host_get_speed(MSG_host_self());
  671. #else
  672. flops = length/1000000.0*MSG_get_host_speed(MSG_host_self());
  673. #endif
/* Old simgrid: the MSG task is consumed by exactly one of the branches below */
  674. #ifndef HAVE_SG_ACTOR_SELF_EXECUTE
  675. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j), flops, 0, NULL);
  676. #endif
  677. if (finished == NULL)
  678. {
  679. /* Synchronous execution */
  680. /* First wait for previous tasks */
  681. _starpu_simgrid_wait_tasks(workerid);
  682. #ifdef HAVE_SG_ACTOR_EXECUTE
  683. sg_actor_execute(flops);
  684. #elif defined(HAVE_SG_ACTOR_SELF_EXECUTE)
  685. sg_actor_self_execute(flops);
  686. #else
  687. MSG_task_execute(simgrid_task);
  688. MSG_task_destroy(simgrid_task);
  689. #endif
  690. starpu_energy_use(energy);
  691. }
  692. else
  693. {
  694. /* Asynchronous execution */
  695. struct task *task;
  696. struct worker_runner *w = &worker_runner[workerid];
  697. _STARPU_MALLOC(task, sizeof(*task));
  698. #ifdef HAVE_SG_ACTOR_SELF_EXECUTE
  699. task->flops = flops;
  700. #else
  701. task->task = simgrid_task;
  702. #endif
  703. task->energy = energy;
  704. task->finished = finished;
  705. *finished = 0;
  706. task->next = NULL;
  707. /* Sleep 10µs for the GPU task queueing */
  708. if (_starpu_simgrid_queue_malloc_cost())
  709. starpu_sleep(0.000010);
/* Append to the worker's FIFO, then wake its runner once for this task */
  710. if (w->last_task)
  711. {
  712. /* Already running a task, queue */
  713. w->last_task->next = task;
  714. w->last_task = task;
  715. }
  716. else
  717. {
  718. STARPU_ASSERT(!w->first_task);
  719. w->first_task = task;
  720. w->last_task = task;
  721. }
  722. starpu_sem_post(&w->sem);
  723. }
  724. }
  725. /*
  726. * Transfers
  727. */
  728. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  729. LIST_TYPE(transfer,
  730. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  731. size_t size;
  732. #else
  733. msg_task_t task;
  734. #endif
  735. int src_node;
  736. int dst_node;
  737. int run_node;
  738. /* communication termination signalization */
  739. unsigned *finished;
  740. /* transfers which wait for this transfer */
  741. struct transfer **wake;
  742. unsigned nwake;
  743. /* Number of transfers that this transfer waits for */
  744. unsigned nwait;
  745. /* Next transfer on this stream */
  746. struct transfer *next;
  747. )
  748. struct transfer_list pending;
  749. /* Tell for two transfers whether they should be handled in sequence */
  750. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  751. {
  752. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  753. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  754. int new_is_gpu_gpu, old_is_gpu_gpu;
  755. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  756. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  757. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  758. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  759. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  760. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  761. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  762. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  763. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  764. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  765. /* We ignore cuda-opencl transfers, they can not happen */
  766. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  767. /* The following constraints have been observed with CUDA alone */
  768. /* Same source/destination, sequential */
  769. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  770. return 1;
  771. /* Crossed GPU-GPU, sequential */
  772. if (new_is_gpu_gpu
  773. && new_transfer->src_node == old_transfer->dst_node
  774. && old_transfer->src_node == new_transfer->dst_node)
  775. return 1;
  776. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  777. if (new_is_gpu_gpu
  778. && (old_transfer->dst_node == new_transfer->src_node
  779. || old_transfer->dst_node == new_transfer->dst_node))
  780. return 1;
  781. if (old_is_gpu_gpu
  782. && (new_transfer->dst_node == old_transfer->src_node
  783. || new_transfer->dst_node == old_transfer->dst_node))
  784. return 1;
  785. /* StarPU's constraint on CUDA transfers is using one stream per
  786. * source/destination pair, which is already handled above */
  787. return 0;
  788. }
  789. static void transfer_queue(struct transfer *transfer)
  790. {
  791. unsigned src = transfer->src_node;
  792. unsigned dst = transfer->dst_node;
  793. struct transfer_runner *t = &transfer_runner[src][dst];
  794. if (!t->runner)
  795. {
  796. /* No runner yet, start it */
  797. static starpu_pthread_mutex_t mutex; /* process_create may yield */
  798. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  799. if (!t->runner)
  800. {
  801. char s[64];
  802. snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
  803. starpu_pthread_create_on(s, &t->runner, NULL, transfer_execute, (void*)(uintptr_t)((src<<16) + dst), _starpu_simgrid_get_memnode_host(src));
  804. starpu_sem_init(&t->sem, 0, 0);
  805. }
  806. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  807. }
  808. if (t->last_transfer)
  809. {
  810. /* Already running a transfer, queue */
  811. t->last_transfer->next = transfer;
  812. t->last_transfer = transfer;
  813. }
  814. else
  815. {
  816. STARPU_ASSERT(!t->first_transfer);
  817. t->first_transfer = transfer;
  818. t->last_transfer = transfer;
  819. }
  820. starpu_sem_post(&t->sem);
  821. }
/* Actually execute the transfer, and then start transfers waiting for this one.
 *
 * Body of the runner actor of one (src, dst) memory node pair.  `arg` packs
 * the pair as (src << 16) + dst, as built by transfer_queue.  Each iteration
 * waits on the runner semaphore, pops the first queued transfer, simulates it
 * (skipped for payload-less pseudo-transfers), marks it finished, wakes the
 * blocked workers of its run_node, queues the transfers that were only waiting
 * for it, and frees it.  The loop exits when woken while runners_running is 0.
 * Always returns 0. */
static void *transfer_execute(void *arg)
{
	unsigned src_dst = (uintptr_t) arg;
	/* Unpack the (src, dst) pair */
	unsigned src = src_dst >> 16;
	unsigned dst = src_dst & 0xffff;
	struct transfer_runner *t = &transfer_runner[src][dst];
	_STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
	while (1)
	{
		struct transfer *transfer;
		/* One post per queued transfer (or a shutdown post) */
		starpu_sem_wait(&t->sem);
		if (!runners_running)
			break;
		/* Dequeue the head of this runner's FIFO */
		transfer = t->first_transfer;
		t->first_transfer = transfer->next;
		if (t->last_transfer == transfer)
			t->last_transfer = NULL;
		/* A zero size / NULL task is a pseudo-transfer with nothing to
		 * simulate, e.g. the synchronization one of
		 * _starpu_simgrid_wait_transfers */
#if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
		if (transfer->size)
#else
		if (transfer->task)
#endif
		{
			_STARPU_DEBUG("transfer %p started\n", transfer);
#if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
#ifdef HAVE_SG_HOST_SENDTO
			sg_host_sendto
#else
			sg_host_send_to
#endif
				(_starpu_simgrid_memory_node_get_host(transfer->src_node),
				 _starpu_simgrid_memory_node_get_host(transfer->dst_node),
				 transfer->size);
#else
			MSG_task_execute(transfer->task);
			MSG_task_destroy(transfer->task);
#endif
			_STARPU_DEBUG("transfer %p finished\n", transfer);
		}
		*transfer->finished = 1;
		transfer_list_erase(&pending, transfer);
		/* The workers which started this request may be sleeping out of tasks, wake it */
		_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
		unsigned i;
		/* Wake transfers waiting for my termination */
		/* Note: due to possible preemption inside process_create, the array
		 * may grow while doing this, hence nwake and wake are re-read on
		 * every iteration */
		for (i = 0; i < transfer->nwake; i++)
		{
			struct transfer *wake = transfer->wake[i];
			STARPU_ASSERT(wake->nwait > 0);
			wake->nwait--;
			/* Last dependency gone: it can now be queued on its runner */
			if (!wake->nwait)
			{
				_STARPU_DEBUG("triggering transfer %p\n", wake);
				transfer_queue(wake);
			}
		}
		free(transfer->wake);
		free(transfer);
	}
	return 0;
}
  886. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  887. static void transfer_submit(struct transfer *transfer)
  888. {
  889. struct transfer *old;
  890. for (old = transfer_list_begin(&pending);
  891. old != transfer_list_end(&pending);
  892. old = transfer_list_next(old))
  893. {
  894. if (transfers_are_sequential(transfer, old))
  895. {
  896. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  897. transfer, transfer->src_node, transfer->dst_node,
  898. old, old->src_node, old->dst_node);
  899. /* Make new wait for the old */
  900. transfer->nwait++;
  901. /* Make old wake the new */
  902. _STARPU_REALLOC(old->wake, (old->nwake + 1) * sizeof(old->wake));
  903. old->wake[old->nwake] = transfer;
  904. old->nwake++;
  905. }
  906. }
  907. transfer_list_push_front(&pending, transfer);
  908. if (!transfer->nwait)
  909. {
  910. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  911. transfer_queue(transfer);
  912. }
  913. }
  914. int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
  915. {
  916. /* this is not associated to a request so it's synchronous */
  917. starpu_pthread_wait_t wait;
  918. starpu_pthread_wait_init(&wait);
  919. starpu_pthread_queue_register(&wait, event->queue);
  920. while(1)
  921. {
  922. starpu_pthread_wait_reset(&wait);
  923. if (event->finished)
  924. break;
  925. starpu_pthread_wait_wait(&wait);
  926. }
  927. starpu_pthread_queue_unregister(&wait, event->queue);
  928. starpu_pthread_wait_destroy(&wait);
  929. return 0;
  930. }
  931. int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
  932. {
  933. return event->finished;
  934. }
  935. /* Wait for completion of all transfers */
  936. static void _starpu_simgrid_wait_transfers(void)
  937. {
  938. unsigned finished = 0;
  939. struct transfer *sync = transfer_new();
  940. struct transfer *cur;
  941. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  942. sync->size = 0;
  943. #else
  944. sync->task = NULL;
  945. #endif
  946. sync->finished = &finished;
  947. sync->src_node = STARPU_MAIN_RAM;
  948. sync->dst_node = STARPU_MAIN_RAM;
  949. sync->run_node = STARPU_MAIN_RAM;
  950. sync->wake = NULL;
  951. sync->nwake = 0;
  952. sync->nwait = 0;
  953. sync->next = NULL;
  954. for (cur = transfer_list_begin(&pending);
  955. cur != transfer_list_end(&pending);
  956. cur = transfer_list_next(cur))
  957. {
  958. sync->nwait++;
  959. _STARPU_REALLOC(cur->wake, (cur->nwake + 1) * sizeof(cur->wake));
  960. cur->wake[cur->nwake] = sync;
  961. cur->nwake++;
  962. }
  963. if (sync->nwait == 0)
  964. {
  965. /* No transfer to wait for */
  966. free(sync);
  967. return;
  968. }
  969. /* Push synchronization pseudo-transfer */
  970. transfer_list_push_front(&pending, sync);
  971. /* And wait for it */
  972. starpu_pthread_wait_t wait;
  973. starpu_pthread_wait_init(&wait);
  974. starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  975. while(1)
  976. {
  977. starpu_pthread_wait_reset(&wait);
  978. if (finished)
  979. break;
  980. starpu_pthread_wait_wait(&wait);
  981. }
  982. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  983. starpu_pthread_wait_destroy(&wait);
  984. }
/* Data transfer issued by StarPU.
 *
 * Simulates copying `size` bytes from src_node to dst_node.
 * Returns 0 without simulating anything for 0-byte transfers, or when
 * simgrid_transfer_cost is 0 (transfer simulation disabled).
 * With a non-NULL req the transfer is asynchronous: completion is reported
 * through req->async_channel.event and -EAGAIN is returned.
 * With req == NULL the call blocks until the transfer has completed and then
 * returns 0. */
int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
{
	/* Simgrid does not like 0-bytes transfers */
	if (!size)
		return 0;
	/* Explicitly disabled by user? */
	if (!simgrid_transfer_cost)
		return 0;
	union _starpu_async_channel_event *event, myevent;
	double start = 0.;
	struct transfer *transfer = transfer_new();
	_STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
#if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
	/* The runner simulates the copy from the byte count alone */
	transfer->size = size;
#else
	/* Older API: model the copy as a 2-host MSG parallel task with no
	 * computation.  communication[] has 4 entries for 2 hosts; entry [1] is
	 * presumably the hosts[0] -> hosts[1] cell of a row-major 2x2 matrix. */
	msg_task_t task;
	starpu_sg_host_t *hosts;
	double *computation;
	double *communication;
	_STARPU_CALLOC(hosts, 2, sizeof(*hosts));
	_STARPU_CALLOC(computation, 2, sizeof(*computation));
	_STARPU_CALLOC(communication, 4, sizeof(*communication));
	hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
	hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
	STARPU_ASSERT(hosts[0] != hosts[1]);
	communication[1] = size;
	task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
	transfer->task = task;
#endif
	transfer->src_node = src_node;
	transfer->dst_node = dst_node;
	/* Node whose blocked workers the runner wakes on completion; the
	 * completion queue below is also picked from it */
	transfer->run_node = starpu_worker_get_local_memory_node();
	/* Asynchronous transfers report through the request's event, synchronous
	 * ones through a stack event that lives until the wait below returns */
	if (req)
		event = &req->async_channel.event;
	else
		event = &myevent;
	event->finished = 0;
	/* The runner sets *finished to 1 once the transfer is simulated */
	transfer->finished = &event->finished;
	event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
	transfer->wake = NULL;
	transfer->nwake = 0;
	transfer->nwait = 0;
	transfer->next = NULL;
	if (req)
		starpu_interface_start_driver_copy_async(src_node, dst_node, &start);
	/* Sleep 10µs for the GPU transfer queueing */
	if (_starpu_simgrid_queue_malloc_cost())
		starpu_sleep(0.000010);
	transfer_submit(transfer);
	/* Note: from here, transfer might be already freed */
	if (req)
	{
		starpu_interface_end_driver_copy_async(src_node, dst_node, start);
		starpu_interface_data_copy(src_node, dst_node, size);
		return -EAGAIN;
	}
	else
	{
		/* this is not associated to a request so it's synchronous */
		_starpu_simgrid_wait_transfer_event(event);
		return 0;
	}
}
/* Sync all GPUs (used on CUDA Free, typically).
 *
 * Blocks until every transfer currently pending in the simulation has
 * completed, by delegating to _starpu_simgrid_wait_transfers. */
void _starpu_simgrid_sync_gpus(void)
{
	_starpu_simgrid_wait_transfers();
}
  1054. int
  1055. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  1056. {
  1057. void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
  1058. void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
  1059. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  1060. starpu_sleep(0.000001);
  1061. _starpu_simgrid_actor_setup();
  1062. /* _args is freed with process context */
  1063. f(arg);
  1064. return 0;
  1065. }
  1066. starpu_pthread_t _starpu_simgrid_actor_create(const char *name, xbt_main_func_t code, starpu_sg_host_t host, int argc, char *argv[])
  1067. {
  1068. void **tsd;
  1069. starpu_pthread_t actor;
  1070. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  1071. #ifdef HAVE_SG_ACTOR_INIT
  1072. actor = sg_actor_init(name, host);
  1073. sg_actor_data_set(actor, tsd);
  1074. sg_actor_start(actor, code, argc, argv);
  1075. #else
  1076. actor = MSG_process_create_with_arguments(name, code, tsd, host, argc, argv);
  1077. #ifdef HAVE_SG_ACTOR_DATA
  1078. sg_actor_data_set(actor, tsd);
  1079. #endif
  1080. #endif
  1081. return actor;
  1082. }
  1083. starpu_sg_host_t _starpu_simgrid_get_memnode_host(unsigned node)
  1084. {
  1085. const char *fmt;
  1086. char name[16];
  1087. switch (starpu_node_get_kind(node))
  1088. {
  1089. case STARPU_CPU_RAM:
  1090. fmt = "RAM";
  1091. break;
  1092. case STARPU_CUDA_RAM:
  1093. fmt = "CUDA%u";
  1094. break;
  1095. case STARPU_OPENCL_RAM:
  1096. fmt = "OpenCL%u";
  1097. break;
  1098. case STARPU_DISK_RAM:
  1099. fmt = "DISK%u";
  1100. break;
  1101. default:
  1102. STARPU_ABORT();
  1103. break;
  1104. }
  1105. snprintf(name, sizeof(name), fmt, starpu_memory_node_get_devid(node));
  1106. return _starpu_simgrid_get_host_by_name(name);
  1107. }
  1108. void _starpu_simgrid_count_ngpus(void)
  1109. {
  1110. #if (defined(HAVE_SG_LINK_NAME) || defined sg_link_name) && (SIMGRID_VERSION >= 31300)
  1111. unsigned src, dst;
  1112. starpu_sg_host_t ramhost = _starpu_simgrid_get_host_by_name("RAM");
  1113. /* For each pair of memory nodes, get the route */
  1114. for (src = 1; src < STARPU_MAXNODES; src++)
  1115. for (dst = 1; dst < STARPU_MAXNODES; dst++)
  1116. {
  1117. int busid;
  1118. starpu_sg_host_t srchost, dsthost;
  1119. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1120. xbt_dynar_t route_dynar = xbt_dynar_new(sizeof(SD_link_t), NULL);
  1121. SD_link_t *route;
  1122. #else
  1123. const SD_link_t *route;
  1124. #endif
  1125. int i, routesize;
  1126. int through;
  1127. unsigned src2;
  1128. unsigned ngpus;
  1129. const char *name;
  1130. if (dst == src)
  1131. continue;
  1132. busid = starpu_bus_get_id(src, dst);
  1133. if (busid == -1)
  1134. continue;
  1135. srchost = _starpu_simgrid_get_memnode_host(src);
  1136. dsthost = _starpu_simgrid_get_memnode_host(dst);
  1137. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1138. sg_host_route(srchost, dsthost, route_dynar);
  1139. routesize = xbt_dynar_length(route_dynar);
  1140. route = xbt_dynar_to_array(route_dynar);
  1141. #else
  1142. routesize = SD_route_get_size(srchost, dsthost);
  1143. route = SD_route_get_list(srchost, dsthost);
  1144. #endif
  1145. /* If it goes through "Host", do not care, there is no
  1146. * direct transfer support */
  1147. for (i = 0; i < routesize; i++)
  1148. if (!strcmp(sg_link_name(route[i]), "Host"))
  1149. break;
  1150. if (i < routesize)
  1151. continue;
  1152. /* Get the PCI bridge between down and up links */
  1153. through = -1;
  1154. for (i = 0; i < routesize; i++)
  1155. {
  1156. name = sg_link_name(route[i]);
  1157. size_t len = strlen(name);
  1158. if (!strcmp(" through", name+len-8))
  1159. through = i;
  1160. else if (!strcmp(" up", name+len-3))
  1161. break;
  1162. }
  1163. /* Didn't find it ?! */
  1164. if (through == -1)
  1165. {
  1166. _STARPU_DEBUG("Didn't find through-link for %d->%d\n", src, dst);
  1167. continue;
  1168. }
  1169. name = sg_link_name(route[through]);
  1170. /*
  1171. * count how many direct routes go through it between
  1172. * GPUs and RAM
  1173. */
  1174. ngpus = 0;
  1175. for (src2 = 1; src2 < STARPU_MAXNODES; src2++)
  1176. {
  1177. int numa;
  1178. int nnumas = starpu_memory_nodes_get_numa_count();
  1179. int found = 0;
  1180. for (numa = 0; numa < nnumas; numa++)
  1181. if (starpu_bus_get_id(src2, numa) != -1)
  1182. {
  1183. found = 1;
  1184. break;
  1185. }
  1186. if (!found)
  1187. continue;
  1188. starpu_sg_host_t srchost2 = _starpu_simgrid_get_memnode_host(src2);
  1189. int routesize2;
  1190. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1191. xbt_dynar_t route_dynar2 = xbt_dynar_new(sizeof(SD_link_t), NULL);
  1192. SD_link_t *route2;
  1193. sg_host_route(srchost2, ramhost, route_dynar2);
  1194. routesize2 = xbt_dynar_length(route_dynar2);
  1195. route2 = xbt_dynar_to_array(route_dynar2);
  1196. #else
  1197. const SD_link_t *route2 = SD_route_get_list(srchost2, ramhost);
  1198. routesize2 = SD_route_get_size(srchost2, ramhost);
  1199. #endif
  1200. for (i = 0; i < routesize2; i++)
  1201. if (!strcmp(name, sg_link_name(route2[i])))
  1202. {
  1203. /* This GPU goes through this PCI bridge to access RAM */
  1204. ngpus++;
  1205. break;
  1206. }
  1207. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1208. free(route2);
  1209. #endif
  1210. }
  1211. _STARPU_DEBUG("%d->%d through %s, %u GPUs\n", src, dst, name, ngpus);
  1212. starpu_bus_set_ngpus(busid, ngpus);
  1213. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1214. free(route);
  1215. #endif
  1216. }
  1217. #endif
  1218. }
  1219. #if 0
  1220. static size_t used;
  1221. void _starpu_simgrid_data_new(size_t size)
  1222. {
  1223. // Note: this is just declarative
  1224. //_STARPU_DISP("data new: %zd, now %zd\n", size, used);
  1225. }
  1226. void _starpu_simgrid_data_increase(size_t size)
  1227. {
  1228. used += size;
  1229. _STARPU_DISP("data increase: %zd, now %zd\n", size, used);
  1230. }
  1231. void _starpu_simgrid_data_alloc(size_t size)
  1232. {
  1233. used += size;
  1234. _STARPU_DISP("data alloc: %zd, now %zd\n", size, used);
  1235. }
  1236. void _starpu_simgrid_data_free(size_t size)
  1237. {
  1238. used -= size;
  1239. _STARPU_DISP("data free: %zd, now %zd\n", size, used);
  1240. }
  1241. void _starpu_simgrid_data_transfer(size_t size, unsigned src_node, unsigned dst_node)
  1242. {
  1243. _STARPU_DISP("data transfer %zd from %u to %u\n", size, src_node, dst_node);
  1244. }
  1245. #endif
  1246. void starpu_energy_use(float joules)
  1247. {
  1248. _starpu_simgrid_dynamic_energy += joules;
  1249. }
  1250. double starpu_energy_used(void)
  1251. {
  1252. float idle_power = starpu_get_env_float_default("STARPU_IDLE_POWER", 0.0);
  1253. return _starpu_simgrid_dynamic_energy + idle_power * starpu_timing_now() / 1000000;
  1254. }
  1255. #endif