simgrid.c 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2013 Thibaut Lambert
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <datawizard/memory_nodes.h>
  19. #include <common/config.h>
  20. #ifdef HAVE_UNISTD_H
  21. #include <unistd.h>
  22. #endif
  23. #include <core/perfmodel/perfmodel.h>
  24. #include <core/workers.h>
  25. #include <core/simgrid.h>
  26. #if defined(HAVE_SIMGRID_SIMDAG_H) && (SIMGRID_VERSION >= 31300)
  27. #include <simgrid/simdag.h>
  28. #endif
  29. #ifdef STARPU_SIMGRID
  30. #ifdef HAVE_GETRLIMIT
  31. #include <sys/resource.h>
  32. #endif
  33. #include <simgrid/simix.h>
  34. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  35. #include <simgrid/host.h>
  36. #endif
  37. #ifdef STARPU_HAVE_SIMGRID_ENGINE_H
  38. #include <simgrid/engine.h>
  39. #endif
  40. #ifdef STARPU_HAVE_XBT_CONFIG_H
  41. #include <xbt/config.h>
  42. #endif
  43. #include <smpi/smpi.h>
  44. #pragma weak starpu_main
  45. extern int starpu_main(int argc, char *argv[]);
  46. #if SIMGRID_VERSION < 31600
  47. #pragma weak smpi_main
  48. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  49. #endif
  50. #pragma weak _starpu_mpi_simgrid_init
  51. extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
  52. #pragma weak smpi_process_set_user_data
  53. #if !HAVE_DECL_SMPI_PROCESS_SET_USER_DATA && !defined(smpi_process_set_user_data)
  54. extern void smpi_process_set_user_data(void *);
  55. #endif
  56. /* 1 when MSG_init was done, 2 when initialized through redirected main, 3 when
  57. * initialized through MSG_process_attach */
  58. static int simgrid_started;
  59. static int simgrid_transfer_cost = 1;
  60. static int runners_running;
  61. starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
  62. static struct transfer_runner
  63. {
  64. struct transfer *first_transfer, *last_transfer;
  65. starpu_sem_t sem;
  66. starpu_pthread_t runner;
  67. } transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
  68. static void *transfer_execute(void *arg);
  69. starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
  70. static struct worker_runner
  71. {
  72. struct task *first_task, *last_task;
  73. starpu_sem_t sem;
  74. starpu_pthread_t runner;
  75. } worker_runner[STARPU_NMAXWORKERS];
  76. static void *task_execute(void *arg);
  77. size_t _starpu_default_stack_size = 8192;
  78. void _starpu_simgrid_set_stack_size(size_t stack_size)
  79. {
  80. #ifdef HAVE_SG_CFG_SET_INT
  81. sg_cfg_set_int("contexts/stack-size", stack_size);
  82. #elif SIMGRID_VERSION >= 31300
  83. xbt_cfg_set_int("contexts/stack-size", stack_size);
  84. #else
  85. extern xbt_cfg_t _sg_cfg_set;
  86. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  87. #endif
  88. }
  /* _starpu_simgrid_get_as_by_name() returns the zone (AS) called `name'.
   * The implementation depends on what simgrid provides;
   * HAVE_STARPU_SIMGRID_GET_AS_BY_NAME gets defined whenever one of them is
   * available. */
  #if defined(HAVE_SG_ZONE_GET_BY_NAME) || defined(sg_zone_get_by_name)
  #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  sg_netzone_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return sg_zone_get_by_name(name);
  }
  #elif defined(HAVE_MSG_ZONE_GET_BY_NAME) || defined(MSG_zone_get_by_name)
  #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return MSG_zone_get_by_name(name);
  }
  #elif defined(HAVE_MSG_GET_AS_BY_NAME) || defined(MSG_get_as_by_name)
  #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return MSG_get_as_by_name(name);
  }
  #elif defined(HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT) || defined(MSG_environment_as_get_routing_sons)
  #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  /* No lookup function is available: walk the sons of `root' depth-first and
   * return the first AS called `name', or NULL if there is none. */
  static msg_as_t __starpu_simgrid_get_as_by_name(msg_as_t root, const char *name)
  {
      xbt_dict_t sons = MSG_environment_as_get_routing_sons(root);
      xbt_dict_cursor_t cursor;
      const char *key;
      msg_as_t son;

      xbt_dict_foreach(sons, cursor, key, son)
      {
          msg_as_t found;

          if (strcmp(MSG_environment_as_get_name(son), name) == 0)
              return son;

          /* Not this one, look below it */
          found = __starpu_simgrid_get_as_by_name(son, name);
          if (found)
              return found;
      }
      return NULL;
  }

  /* Start the search from the routing root */
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      msg_as_t root = MSG_environment_get_routing_root();

      return __starpu_simgrid_get_as_by_name(root, name);
  }
  #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  131. int _starpu_simgrid_get_nbhosts(const char *prefix)
  132. {
  133. int ret;
  134. #ifdef HAVE_SG_HOST_LIST
  135. sg_host_t *hosts_list = NULL;
  136. #endif
  137. xbt_dynar_t hosts = NULL;
  138. unsigned i, nb = 0;
  139. unsigned len = strlen(prefix);
  140. if (_starpu_simgrid_running_smpi())
  141. {
  142. #ifdef HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
  143. char new_prefix[32];
  144. char name[32];
  145. STARPU_ASSERT(starpu_mpi_world_rank);
  146. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%d", starpu_mpi_world_rank());
  147. #if defined(HAVE_MSG_ZONE_GET_HOSTS) || defined(HAVE_SG_ZONE_GET_HOSTS) || defined(MSG_zone_get_hosts) || defined(sg_zone_get_hosts)
  148. hosts = xbt_dynar_new(sizeof(sg_host_t), NULL);
  149. # if defined(HAVE_SG_ZONE_GET_HOSTS) || defined(sg_zone_get_hosts)
  150. sg_zone_get_hosts(_starpu_simgrid_get_as_by_name(name), hosts);
  151. # else
  152. MSG_zone_get_hosts(_starpu_simgrid_get_as_by_name(name), hosts);
  153. # endif
  154. #else
  155. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  156. #endif
  157. snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  158. prefix = new_prefix;
  159. len = strlen(prefix);
  160. #else
  161. STARPU_ABORT_MSG("can not continue without an implementation for _starpu_simgrid_get_as_by_name");
  162. #endif /* HAVE_STARPU_SIMGRID_GET_AS_BY_NAME */
  163. }
  164. else
  165. {
  166. #ifdef HAVE_SG_HOST_LIST
  167. hosts_list = sg_host_list();
  168. nb = sg_host_count();
  169. #elif defined(STARPU_HAVE_SIMGRID_HOST_H)
  170. hosts = sg_hosts_as_dynar();
  171. #else
  172. hosts = MSG_hosts_as_dynar();
  173. #endif
  174. }
  175. if (hosts)
  176. nb = xbt_dynar_length(hosts);
  177. ret = 0;
  178. for (i = 0; i < nb; i++)
  179. {
  180. const char *name;
  181. #ifdef HAVE_SG_HOST_LIST
  182. if (hosts_list)
  183. name = sg_host_get_name(hosts_list[i]);
  184. else
  185. #endif
  186. #if defined(STARPU_HAVE_SIMGRID_HOST_H)
  187. name = sg_host_get_name(xbt_dynar_get_as(hosts, i, sg_host_t));
  188. #else
  189. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  190. #endif
  191. if (!strncmp(name, prefix, len))
  192. ret++;
  193. }
  194. if (hosts)
  195. xbt_dynar_free(&hosts);
  196. return ret;
  197. }
  198. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  199. {
  200. char name[32];
  201. starpu_sg_host_t host;
  202. const char *memsize;
  203. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  204. host = _starpu_simgrid_get_host_by_name(name);
  205. if (!host)
  206. return 0;
  207. #ifdef HAVE_SG_HOST_GET_PROPERTIES
  208. if (!sg_host_get_properties(host))
  209. #else
  210. if (!MSG_host_get_properties(host))
  211. #endif
  212. return 0;
  213. #ifdef HAVE_SG_HOST_GET_PROPERTIES
  214. memsize = sg_host_get_property_value(host, "memsize");
  215. #else
  216. memsize = MSG_host_get_property_value(host, "memsize");
  217. #endif
  218. if (!memsize)
  219. return 0;
  220. return atoll(memsize);
  221. }
  222. starpu_sg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  223. {
  224. if (_starpu_simgrid_running_smpi())
  225. {
  226. char mpiname[32];
  227. STARPU_ASSERT(starpu_mpi_world_rank);
  228. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  229. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  230. return sg_host_by_name(mpiname);
  231. #else
  232. return MSG_get_host_by_name(mpiname);
  233. #endif
  234. }
  235. else
  236. #ifdef STARPU_HAVE_SIMGRID_HOST_H
  237. return sg_host_by_name(name);
  238. #else
  239. return MSG_get_host_by_name(name);
  240. #endif
  241. }
  242. starpu_sg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  243. {
  244. char *prefix;
  245. char name[16];
  246. starpu_sg_host_t host;
  247. switch (worker->arch)
  248. {
  249. case STARPU_CPU_WORKER:
  250. prefix = "CPU";
  251. break;
  252. case STARPU_CUDA_WORKER:
  253. prefix = "CUDA";
  254. break;
  255. case STARPU_OPENCL_WORKER:
  256. prefix = "OpenCL";
  257. break;
  258. default:
  259. STARPU_ASSERT(0);
  260. }
  261. snprintf(name, sizeof(name), "%s%u", prefix, worker->devid);
  262. host = _starpu_simgrid_get_host_by_name(name);
  263. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  264. return host;
  265. }
  266. /* Simgrid up to 3.15 would rename main into smpi_simulated_main_, and call that
  267. * from SMPI initialization
  268. * In case the MPI application didn't use smpicc to build the file containing
  269. * main(), but included our #define main starpu_main, try to cope by calling
  270. * starpu_main */
  271. int _starpu_smpi_simulated_main_(int argc, char *argv[])
  272. {
  273. if (!starpu_main)
  274. {
  275. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  276. }
  277. return starpu_main(argc, argv);
  278. }
  279. int smpi_simulated_main_(int argc, char *argv[]) __attribute__((weak, alias("_starpu_smpi_simulated_main_")));
  280. /* This is used to start a non-MPI simgrid environment */
  281. void _starpu_start_simgrid(int *argc, char **argv)
  282. {
  283. char path[256];
  284. if (simgrid_started)
  285. return;
  286. simgrid_started = 1;
  287. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  288. simgrid_init(argc, argv);
  289. #else
  290. MSG_init(argc, argv);
  291. #endif
  292. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  293. #ifdef HAVE_GETRLIMIT
  294. struct rlimit rlim;
  295. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  296. _starpu_default_stack_size = rlim.rlim_cur / 1024;
  297. #endif
  298. _starpu_simgrid_set_stack_size(_starpu_default_stack_size);
  299. /* Load XML platform */
  300. #if SIMGRID_VERSION < 31300
  301. _starpu_simgrid_get_platform_path(3, path, sizeof(path));
  302. #else
  303. _starpu_simgrid_get_platform_path(4, path, sizeof(path));
  304. #endif
  305. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  306. simgrid_load_platform(path);
  307. #else
  308. MSG_create_environment(path);
  309. #endif
  310. int limit_bandwidth = starpu_get_env_number("STARPU_LIMIT_BANDWIDTH");
  311. if (limit_bandwidth >= 0)
  312. {
  313. #ifdef HAVE_SG_LINK_BANDWIDTH_SET
  314. sg_link_t *links = sg_link_list();
  315. int count = sg_link_count(), i;
  316. for (i = 0; i < count; i++) {
  317. sg_link_bandwidth_set(links[i], limit_bandwidth * 1000000.);
  318. }
  319. #else
  320. _STARPU_DISP("Warning: STARPU_LIMIT_BANDWIDTH set to %d but this requires simgrid 3.26, thus ignored\n", limit_bandwidth);
  321. #endif
  322. }
  323. simgrid_transfer_cost = starpu_get_env_number_default("STARPU_SIMGRID_TRANSFER_COST", 1);
  324. }
  325. static int main_ret;
  326. int do_starpu_main(int argc, char *argv[])
  327. {
  328. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  329. starpu_sleep(0.000001);
  330. if (!starpu_main)
  331. {
  332. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  333. }
  334. main_ret = starpu_main(argc, argv);
  335. return main_ret;
  336. }
  337. /* We need it only when using smpi */
  338. #pragma weak smpi_process_get_user_data
  339. extern void *smpi_process_get_user_data();
  340. /* This is hopefully called before the application and simgrid */
  341. #undef main
  342. #pragma weak main
  343. int main(int argc, char **argv)
  344. {
  345. #ifdef HAVE_SG_CONFIG_CONTINUE_AFTER_HELP
  346. sg_config_continue_after_help();
  347. #endif
  348. if (_starpu_simgrid_running_smpi())
  349. {
  350. if (!smpi_process_get_user_data)
  351. {
  352. _STARPU_ERROR("Your version of simgrid does not provide smpi_process_get_user_data, we can not continue without it\n");
  353. }
  354. #if SIMGRID_VERSION >= 31600
  355. /* Recent versions of simgrid dlopen() us, so we don't need to
  356. * do circumvolutions, just init MPI early and run the application's main */
  357. return _starpu_mpi_simgrid_init(argc, argv);
  358. #else
  359. /* Oops, we are running old SMPI, let it start Simgrid, and we'll
  360. * take back hand in _starpu_simgrid_init from starpu_init() */
  361. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  362. #endif
  363. }
  364. /* Already initialized? It probably has been done through a
  365. * constructor and MSG_process_attach, directly jump to real main */
  366. if (simgrid_started == 3)
  367. {
  368. return do_starpu_main(argc, argv);
  369. }
  370. /* Managed to catch application's main, initialize simgrid first */
  371. _starpu_start_simgrid(&argc, argv);
  372. simgrid_started = 2;
  373. /* Create a simgrid process for main */
  374. char **argv_cpy;
  375. _STARPU_MALLOC(argv_cpy, argc * sizeof(char*));
  376. int i;
  377. for (i = 0; i < argc; i++)
  378. argv_cpy[i] = strdup(argv[i]);
  379. /* Run the application in a separate thread */
  380. _starpu_simgrid_actor_create("main", &do_starpu_main, _starpu_simgrid_get_host_by_name("MAIN"), argc, argv_cpy);
  381. /* And run maestro in the main thread */
  382. #if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
  383. simgrid_run();
  384. #else
  385. MSG_main();
  386. #endif
  387. return main_ret;
  388. }
  #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
  /* Body of the maestro thread registered through SIMIX_set_maestro(): run
   * the simgrid main loop. The argument is not used. */
  static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
  {
  # if defined(STARPU_SIMGRID_HAVE_SIMGRID_INIT) && defined(HAVE_SG_ACTOR_INIT)
      simgrid_run();
  # else
      MSG_main();
  # endif
  }
  #endif
  399. /* This is called early from starpu_init, so thread functions etc. can work */
  400. void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
  401. {
  402. #ifdef HAVE_SG_CONFIG_CONTINUE_AFTER_HELP
  403. sg_config_continue_after_help();
  404. #endif
  405. #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
  406. if (simgrid_started < 2 && !_starpu_simgrid_running_smpi())
  407. {
  408. /* "Cannot create_maestro with this ContextFactory.
  409. * Try using --cfg=contexts/factory:thread instead."
  410. * See https://github.com/simgrid/simgrid/issues/141 */
  411. _STARPU_DISP("Warning: In simgrid mode, the file containing the main() function of this application should to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main to avoid having to use --cfg=contexts/factory:thread which reduces performance\n");
  412. #if SIMGRID_VERSION >= 31400 /* Only recent versions of simgrid support setting sg_cfg_set_string before starting simgrid */
  413. # ifdef HAVE_SG_CFG_SET_INT
  414. sg_cfg_set_string("contexts/factory", "thread");
  415. # else
  416. xbt_cfg_set_string("contexts/factory", "thread");
  417. # endif
  418. #endif
  419. /* We didn't catch application's main. */
  420. /* Start maestro as a separate thread */
  421. SIMIX_set_maestro(maestro, NULL);
  422. /* Initialize simgrid */
  423. _starpu_start_simgrid(argc, *argv);
  424. /* And attach the main thread to the main simgrid process */
  425. void **tsd;
  426. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  427. #if defined(HAVE_SG_ACTOR_ATTACH) && defined (HAVE_SG_ACTOR_DATA)
  428. sg_actor_t actor = sg_actor_attach("main", NULL, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
  429. sg_actor_data_set(actor, tsd);
  430. #else
  431. MSG_process_attach("main", tsd, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
  432. #endif
  433. /* We initialized through MSG_process_attach */
  434. simgrid_started = 3;
  435. }
  436. #endif
  437. if (!simgrid_started && !starpu_main && !_starpu_simgrid_running_smpi())
  438. {
  439. /* Oops, we don't have MSG_process_attach and didn't catch the
  440. * 'main' symbol, there is no way for us */
  441. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  442. }
  443. if (_starpu_simgrid_running_smpi())
  444. {
  445. #ifndef STARPU_STATIC_ONLY
  446. _STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
  447. #endif
  448. #if defined(HAVE_MSG_PROCESS_USERDATA_INIT) && !defined(HAVE_SG_ACTOR_DATA)
  449. MSG_process_userdata_init();
  450. #endif
  451. void **tsd;
  452. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  453. #ifdef HAVE_SG_ACTOR_DATA
  454. sg_actor_data_set(sg_actor_self(), tsd);
  455. #else
  456. smpi_process_set_user_data(tsd);
  457. #endif
  458. }
  459. unsigned i;
  460. for (i = 0; i < STARPU_MAXNODES; i++)
  461. starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
  462. for (i = 0; i < STARPU_NMAXWORKERS; i++)
  463. starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
  464. }
  465. /* This is called late from starpu_init, to start task executors */
  466. void _starpu_simgrid_init(void)
  467. {
  468. unsigned i;
  469. runners_running = 1;
  470. for (i = 0; i < starpu_worker_get_count(); i++)
  471. {
  472. char s[32];
  473. snprintf(s, sizeof(s), "worker %u runner", i);
  474. starpu_sem_init(&worker_runner[i].sem, 0, 0);
  475. starpu_pthread_create_on(s, &worker_runner[i].runner, NULL, task_execute, (void*)(uintptr_t) i, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)));
  476. }
  477. }
  /* Undo the attachment of the main thread to simgrid, if that is how simgrid
   * was initialized (simgrid_started == 3), and mark simgrid as not started. */
  void _starpu_simgrid_deinit_late(void)
  {
  #if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
      if (simgrid_started != 3)
          return;

      /* Started with MSG_process_attach, now detach */
  # ifdef HAVE_SG_ACTOR_ATTACH
      sg_actor_detach();
  # else
      MSG_process_detach();
  # endif
      simgrid_started = 0;
  #endif
  }
  493. void _starpu_simgrid_deinit(void)
  494. {
  495. unsigned i, j;
  496. runners_running = 0;
  497. for (i = 0; i < STARPU_MAXNODES; i++)
  498. {
  499. for (j = 0; j < STARPU_MAXNODES; j++)
  500. {
  501. struct transfer_runner *t = &transfer_runner[i][j];
  502. if (t->runner)
  503. {
  504. starpu_sem_post(&t->sem);
  505. #ifdef STARPU_HAVE_SIMGRID_ACTOR_H
  506. sg_actor_join(t->runner, 1000000);
  507. #elif SIMGRID_VERSION >= 31400
  508. MSG_process_join(t->runner, 1000000);
  509. #else
  510. starpu_sleep(1);
  511. #endif
  512. STARPU_ASSERT(t->first_transfer == NULL);
  513. STARPU_ASSERT(t->last_transfer == NULL);
  514. starpu_sem_destroy(&t->sem);
  515. }
  516. }
  517. /* FIXME: queue not empty at this point, needs proper unregistration */
  518. /* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
  519. }
  520. for (i = 0; i < starpu_worker_get_count(); i++)
  521. {
  522. struct worker_runner *w = &worker_runner[i];
  523. starpu_sem_post(&w->sem);
  524. #ifdef STARPU_HAVE_SIMGRID_ACTOR_H
  525. sg_actor_join(w->runner, 1000000);
  526. #elif SIMGRID_VERSION >= 31400
  527. MSG_process_join(w->runner, 1000000);
  528. #else
  529. starpu_sleep(1);
  530. #endif
  531. STARPU_ASSERT(w->first_task == NULL);
  532. STARPU_ASSERT(w->last_task == NULL);
  533. starpu_sem_destroy(&w->sem);
  534. starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
  535. }
  536. #if SIMGRID_VERSION >= 31300
  537. /* clean-atexit introduced in simgrid 3.13 */
  538. # ifdef HAVE_SG_CFG_SET_INT
  539. if ( sg_cfg_get_boolean("debug/clean-atexit"))
  540. # elif SIMGRID_VERSION >= 32300
  541. if ( xbt_cfg_get_boolean("debug/clean-atexit"))
  542. # else
  543. if ( xbt_cfg_get_boolean("clean-atexit"))
  544. # endif
  545. {
  546. _starpu_simgrid_deinit_late();
  547. }
  548. #endif
  549. }
  550. /*
  551. * Tasks
  552. */
  553. struct task
  554. {
  555. #ifdef HAVE_SG_ACTOR_SELF_EXECUTE
  556. double flops;
  557. #else
  558. msg_task_t task;
  559. #endif
  560. /* communication termination signalization */
  561. unsigned *finished;
  562. /* Next task on this worker */
  563. struct task *next;
  564. };
  565. /* Actually execute the task. */
  566. static void *task_execute(void *arg)
  567. {
  568. unsigned workerid = (uintptr_t) arg;
  569. struct worker_runner *w = &worker_runner[workerid];
  570. _STARPU_DEBUG("worker runner %u started\n", workerid);
  571. while (1)
  572. {
  573. struct task *task;
  574. starpu_sem_wait(&w->sem);
  575. if (!runners_running)
  576. break;
  577. task = w->first_task;
  578. w->first_task = task->next;
  579. if (w->last_task == task)
  580. w->last_task = NULL;
  581. _STARPU_DEBUG("task %p started\n", task);
  582. #ifdef HAVE_SG_ACTOR_EXECUTE
  583. sg_actor_execute(task->flops);
  584. #elif defined(HAVE_SG_ACTOR_SELF_EXECUTE)
  585. sg_actor_self_execute(task->flops);
  586. #else
  587. MSG_task_execute(task->task);
  588. MSG_task_destroy(task->task);
  589. #endif
  590. _STARPU_DEBUG("task %p finished\n", task);
  591. *task->finished = 1;
  592. /* The worker which started this task may be sleeping out of tasks, wake it */
  593. _starpu_wake_worker_relax(workerid);
  594. free(task);
  595. }
  596. _STARPU_DEBUG("worker %u stopped\n", workerid);
  597. return 0;
  598. }
  599. /* Wait for completion of all asynchronous tasks for this worker */
  600. void _starpu_simgrid_wait_tasks(int workerid)
  601. {
  602. struct task *task = worker_runner[workerid].last_task;
  603. if (!task)
  604. return;
  605. unsigned *finished = task->finished;
  606. starpu_pthread_wait_t wait;
  607. starpu_pthread_wait_init(&wait);
  608. starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
  609. while(1)
  610. {
  611. starpu_pthread_wait_reset(&wait);
  612. if (*finished)
  613. break;
  614. starpu_pthread_wait_wait(&wait);
  615. }
  616. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
  617. starpu_pthread_wait_destroy(&wait);
  618. }
  619. /* Task execution submitted by StarPU */
  620. void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished)
  621. {
  622. struct starpu_task *starpu_task = j->task;
  623. double flops;
  624. #ifndef HAVE_SG_ACTOR_SELF_EXECUTE
  625. msg_task_t simgrid_task;
  626. #endif
  627. if (j->internal)
  628. /* This is not useful to include in simulation (and probably
  629. * doesn't have a perfmodel anyway) */
  630. return;
  631. if (isnan(length))
  632. {
  633. length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
  634. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  635. "Codelet %s does not have a perfmodel (in directory %s), or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  636. _starpu_job_get_model_name(j), _starpu_get_perf_model_dir_codelet());
  637. /* TODO: option to add variance according to performance model,
  638. * to be able to easily check scheduling robustness */
  639. }
  640. #if defined(HAVE_SG_HOST_SPEED) || defined(sg_host_speed)
  641. # if defined(HAVE_SG_HOST_SELF) || defined(sg_host_self)
  642. flops = length/1000000.0*sg_host_speed(sg_host_self());
  643. # else
  644. flops = length/1000000.0*sg_host_speed(MSG_host_self());
  645. # endif
  646. #elif defined HAVE_MSG_HOST_GET_SPEED || defined(MSG_host_get_speed)
  647. flops = length/1000000.0*MSG_host_get_speed(MSG_host_self());
  648. #else
  649. flops = length/1000000.0*MSG_get_host_speed(MSG_host_self());
  650. #endif
  651. #ifndef HAVE_SG_ACTOR_SELF_EXECUTE
  652. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j), flops, 0, NULL);
  653. #endif
  654. if (finished == NULL)
  655. {
  656. /* Synchronous execution */
  657. /* First wait for previous tasks */
  658. _starpu_simgrid_wait_tasks(workerid);
  659. #ifdef HAVE_SG_ACTOR_EXECUTE
  660. sg_actor_execute(flops);
  661. #elif defined(HAVE_SG_ACTOR_SELF_EXECUTE)
  662. sg_actor_self_execute(flops);
  663. #else
  664. MSG_task_execute(simgrid_task);
  665. MSG_task_destroy(simgrid_task);
  666. #endif
  667. }
  668. else
  669. {
  670. /* Asynchronous execution */
  671. struct task *task;
  672. struct worker_runner *w = &worker_runner[workerid];
  673. _STARPU_MALLOC(task, sizeof(*task));
  674. #ifdef HAVE_SG_ACTOR_SELF_EXECUTE
  675. task->flops = flops;
  676. #else
  677. task->task = simgrid_task;
  678. #endif
  679. task->finished = finished;
  680. *finished = 0;
  681. task->next = NULL;
  682. /* Sleep 10µs for the GPU task queueing */
  683. if (_starpu_simgrid_queue_malloc_cost())
  684. starpu_sleep(0.000010);
  685. if (w->last_task)
  686. {
  687. /* Already running a task, queue */
  688. w->last_task->next = task;
  689. w->last_task = task;
  690. }
  691. else
  692. {
  693. STARPU_ASSERT(!w->first_task);
  694. w->first_task = task;
  695. w->last_task = task;
  696. }
  697. starpu_sem_post(&w->sem);
  698. }
  699. }
  700. /*
  701. * Transfers
  702. */
  703. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  704. LIST_TYPE(transfer,
  705. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  706. size_t size;
  707. #else
  708. msg_task_t task;
  709. #endif
  710. int src_node;
  711. int dst_node;
  712. int run_node;
  713. /* communication termination signalization */
  714. unsigned *finished;
  715. /* transfers which wait for this transfer */
  716. struct transfer **wake;
  717. unsigned nwake;
  718. /* Number of transfers that this transfer waits for */
  719. unsigned nwait;
  720. /* Next transfer on this stream */
  721. struct transfer *next;
  722. )
  723. struct transfer_list pending;
  724. /* Tell for two transfers whether they should be handled in sequence */
  725. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  726. {
  727. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  728. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  729. int new_is_gpu_gpu, old_is_gpu_gpu;
  730. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  731. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  732. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  733. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  734. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  735. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  736. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  737. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  738. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  739. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  740. /* We ignore cuda-opencl transfers, they can not happen */
  741. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  742. /* The following constraints have been observed with CUDA alone */
  743. /* Same source/destination, sequential */
  744. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  745. return 1;
  746. /* Crossed GPU-GPU, sequential */
  747. if (new_is_gpu_gpu
  748. && new_transfer->src_node == old_transfer->dst_node
  749. && old_transfer->src_node == new_transfer->dst_node)
  750. return 1;
  751. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  752. if (new_is_gpu_gpu
  753. && (old_transfer->dst_node == new_transfer->src_node
  754. || old_transfer->dst_node == new_transfer->dst_node))
  755. return 1;
  756. if (old_is_gpu_gpu
  757. && (new_transfer->dst_node == old_transfer->src_node
  758. || new_transfer->dst_node == old_transfer->dst_node))
  759. return 1;
  760. /* StarPU's constraint on CUDA transfers is using one stream per
  761. * source/destination pair, which is already handled above */
  762. return 0;
  763. }
  764. static void transfer_queue(struct transfer *transfer)
  765. {
  766. unsigned src = transfer->src_node;
  767. unsigned dst = transfer->dst_node;
  768. struct transfer_runner *t = &transfer_runner[src][dst];
  769. if (!t->runner)
  770. {
  771. /* No runner yet, start it */
  772. static starpu_pthread_mutex_t mutex; /* process_create may yield */
  773. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  774. if (!t->runner)
  775. {
  776. char s[64];
  777. snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
  778. starpu_pthread_create_on(s, &t->runner, NULL, transfer_execute, (void*)(uintptr_t)((src<<16) + dst), _starpu_simgrid_get_memnode_host(src));
  779. starpu_sem_init(&t->sem, 0, 0);
  780. }
  781. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  782. }
  783. if (t->last_transfer)
  784. {
  785. /* Already running a transfer, queue */
  786. t->last_transfer->next = transfer;
  787. t->last_transfer = transfer;
  788. }
  789. else
  790. {
  791. STARPU_ASSERT(!t->first_transfer);
  792. t->first_transfer = transfer;
  793. t->last_transfer = transfer;
  794. }
  795. starpu_sem_post(&t->sem);
  796. }
  797. /* Actually execute the transfer, and then start transfers waiting for this one. */
  798. static void *transfer_execute(void *arg)
  799. {
  800. unsigned src_dst = (uintptr_t) arg;
  801. unsigned src = src_dst >> 16;
  802. unsigned dst = src_dst & 0xffff;
  803. struct transfer_runner *t = &transfer_runner[src][dst];
  804. _STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
  805. while (1)
  806. {
  807. struct transfer *transfer;
  808. starpu_sem_wait(&t->sem);
  809. if (!runners_running)
  810. break;
  811. transfer = t->first_transfer;
  812. t->first_transfer = transfer->next;
  813. if (t->last_transfer == transfer)
  814. t->last_transfer = NULL;
  815. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  816. if (transfer->size)
  817. #else
  818. if (transfer->task)
  819. #endif
  820. {
  821. _STARPU_DEBUG("transfer %p started\n", transfer);
  822. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  823. #ifdef HAVE_SG_HOST_SENDTO
  824. sg_host_sendto
  825. #else
  826. sg_host_send_to
  827. #endif
  828. (_starpu_simgrid_memory_node_get_host(transfer->src_node),
  829. _starpu_simgrid_memory_node_get_host(transfer->dst_node),
  830. transfer->size);
  831. #else
  832. MSG_task_execute(transfer->task);
  833. MSG_task_destroy(transfer->task);
  834. #endif
  835. _STARPU_DEBUG("transfer %p finished\n", transfer);
  836. }
  837. *transfer->finished = 1;
  838. transfer_list_erase(&pending, transfer);
  839. /* The workers which started this request may be sleeping out of tasks, wake it */
  840. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  841. unsigned i;
  842. /* Wake transfers waiting for my termination */
  843. /* Note: due to possible preemption inside process_create, the array
  844. * may grow while doing this */
  845. for (i = 0; i < transfer->nwake; i++)
  846. {
  847. struct transfer *wake = transfer->wake[i];
  848. STARPU_ASSERT(wake->nwait > 0);
  849. wake->nwait--;
  850. if (!wake->nwait)
  851. {
  852. _STARPU_DEBUG("triggering transfer %p\n", wake);
  853. transfer_queue(wake);
  854. }
  855. }
  856. free(transfer->wake);
  857. free(transfer);
  858. }
  859. return 0;
  860. }
  861. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  862. static void transfer_submit(struct transfer *transfer)
  863. {
  864. struct transfer *old;
  865. for (old = transfer_list_begin(&pending);
  866. old != transfer_list_end(&pending);
  867. old = transfer_list_next(old))
  868. {
  869. if (transfers_are_sequential(transfer, old))
  870. {
  871. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  872. transfer, transfer->src_node, transfer->dst_node,
  873. old, old->src_node, old->dst_node);
  874. /* Make new wait for the old */
  875. transfer->nwait++;
  876. /* Make old wake the new */
  877. _STARPU_REALLOC(old->wake, (old->nwake + 1) * sizeof(old->wake));
  878. old->wake[old->nwake] = transfer;
  879. old->nwake++;
  880. }
  881. }
  882. transfer_list_push_front(&pending, transfer);
  883. if (!transfer->nwait)
  884. {
  885. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  886. transfer_queue(transfer);
  887. }
  888. }
  889. int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
  890. {
  891. /* this is not associated to a request so it's synchronous */
  892. starpu_pthread_wait_t wait;
  893. starpu_pthread_wait_init(&wait);
  894. starpu_pthread_queue_register(&wait, event->queue);
  895. while(1)
  896. {
  897. starpu_pthread_wait_reset(&wait);
  898. if (event->finished)
  899. break;
  900. starpu_pthread_wait_wait(&wait);
  901. }
  902. starpu_pthread_queue_unregister(&wait, event->queue);
  903. starpu_pthread_wait_destroy(&wait);
  904. return 0;
  905. }
  906. int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
  907. {
  908. return event->finished;
  909. }
  910. /* Wait for completion of all transfers */
  911. static void _starpu_simgrid_wait_transfers(void)
  912. {
  913. unsigned finished = 0;
  914. struct transfer *sync = transfer_new();
  915. struct transfer *cur;
  916. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  917. sync->size = 0;
  918. #else
  919. sync->task = NULL;
  920. #endif
  921. sync->finished = &finished;
  922. sync->src_node = STARPU_MAIN_RAM;
  923. sync->dst_node = STARPU_MAIN_RAM;
  924. sync->run_node = STARPU_MAIN_RAM;
  925. sync->wake = NULL;
  926. sync->nwake = 0;
  927. sync->nwait = 0;
  928. sync->next = NULL;
  929. for (cur = transfer_list_begin(&pending);
  930. cur != transfer_list_end(&pending);
  931. cur = transfer_list_next(cur))
  932. {
  933. sync->nwait++;
  934. _STARPU_REALLOC(cur->wake, (cur->nwake + 1) * sizeof(cur->wake));
  935. cur->wake[cur->nwake] = sync;
  936. cur->nwake++;
  937. }
  938. if (sync->nwait == 0)
  939. {
  940. /* No transfer to wait for */
  941. free(sync);
  942. return;
  943. }
  944. /* Push synchronization pseudo-transfer */
  945. transfer_list_push_front(&pending, sync);
  946. /* And wait for it */
  947. starpu_pthread_wait_t wait;
  948. starpu_pthread_wait_init(&wait);
  949. starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  950. while(1)
  951. {
  952. starpu_pthread_wait_reset(&wait);
  953. if (finished)
  954. break;
  955. starpu_pthread_wait_wait(&wait);
  956. }
  957. starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
  958. starpu_pthread_wait_destroy(&wait);
  959. }
  960. /* Data transfer issued by StarPU */
  961. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  962. {
  963. /* Simgrid does not like 0-bytes transfers */
  964. if (!size)
  965. return 0;
  966. /* Explicitly disabled by user? */
  967. if (!simgrid_transfer_cost)
  968. return 0;
  969. union _starpu_async_channel_event *event, myevent;
  970. double start = 0.;
  971. struct transfer *transfer = transfer_new();
  972. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  973. #if defined(HAVE_SG_HOST_SEND_TO) || defined(HAVE_SG_HOST_SENDTO)
  974. transfer->size = size;
  975. #else
  976. msg_task_t task;
  977. starpu_sg_host_t *hosts;
  978. double *computation;
  979. double *communication;
  980. _STARPU_CALLOC(hosts, 2, sizeof(*hosts));
  981. _STARPU_CALLOC(computation, 2, sizeof(*computation));
  982. _STARPU_CALLOC(communication, 4, sizeof(*communication));
  983. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  984. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  985. STARPU_ASSERT(hosts[0] != hosts[1]);
  986. communication[1] = size;
  987. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  988. transfer->task = task;
  989. #endif
  990. transfer->src_node = src_node;
  991. transfer->dst_node = dst_node;
  992. transfer->run_node = starpu_worker_get_local_memory_node();
  993. if (req)
  994. event = &req->async_channel.event;
  995. else
  996. event = &myevent;
  997. event->finished = 0;
  998. transfer->finished = &event->finished;
  999. event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
  1000. transfer->wake = NULL;
  1001. transfer->nwake = 0;
  1002. transfer->nwait = 0;
  1003. transfer->next = NULL;
  1004. if (req)
  1005. starpu_interface_start_driver_copy_async(src_node, dst_node, &start);
  1006. /* Sleep 10µs for the GPU transfer queueing */
  1007. if (_starpu_simgrid_queue_malloc_cost())
  1008. starpu_sleep(0.000010);
  1009. transfer_submit(transfer);
  1010. /* Note: from here, transfer might be already freed */
  1011. if (req)
  1012. {
  1013. starpu_interface_end_driver_copy_async(src_node, dst_node, start);
  1014. starpu_interface_data_copy(src_node, dst_node, size);
  1015. return -EAGAIN;
  1016. }
  1017. else
  1018. {
  1019. /* this is not associated to a request so it's synchronous */
  1020. _starpu_simgrid_wait_transfer_event(event);
  1021. return 0;
  1022. }
  1023. }
  1024. /* Sync all GPUs (used on CUDA Free, typically) */
  1025. void _starpu_simgrid_sync_gpus(void)
  1026. {
  1027. _starpu_simgrid_wait_transfers();
  1028. }
  1029. int
  1030. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  1031. {
  1032. void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
  1033. void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
  1034. /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
  1035. starpu_sleep(0.000001);
  1036. /* _args is freed with process context */
  1037. f(arg);
  1038. return 0;
  1039. }
  1040. starpu_pthread_t _starpu_simgrid_actor_create(const char *name, xbt_main_func_t code, starpu_sg_host_t host, int argc, char *argv[])
  1041. {
  1042. void **tsd;
  1043. starpu_pthread_t actor;
  1044. _STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
  1045. #ifdef HAVE_SG_ACTOR_INIT
  1046. actor = sg_actor_init(name, host);
  1047. sg_actor_data_set(actor, tsd);
  1048. sg_actor_start(actor, code, argc, argv);
  1049. #else
  1050. actor = MSG_process_create_with_arguments(name, code, tsd, host, argc, argv);
  1051. #ifdef HAVE_SG_ACTOR_DATA
  1052. sg_actor_data_set(actor, tsd);
  1053. #endif
  1054. #endif
  1055. return actor;
  1056. }
  1057. starpu_sg_host_t _starpu_simgrid_get_memnode_host(unsigned node)
  1058. {
  1059. const char *fmt;
  1060. char name[16];
  1061. switch (starpu_node_get_kind(node))
  1062. {
  1063. case STARPU_CPU_RAM:
  1064. fmt = "RAM";
  1065. break;
  1066. case STARPU_CUDA_RAM:
  1067. fmt = "CUDA%u";
  1068. break;
  1069. case STARPU_OPENCL_RAM:
  1070. fmt = "OpenCL%u";
  1071. break;
  1072. case STARPU_DISK_RAM:
  1073. fmt = "DISK%u";
  1074. break;
  1075. default:
  1076. STARPU_ABORT();
  1077. break;
  1078. }
  1079. snprintf(name, sizeof(name), fmt, starpu_memory_node_get_devid(node));
  1080. return _starpu_simgrid_get_host_by_name(name);
  1081. }
  1082. void _starpu_simgrid_count_ngpus(void)
  1083. {
  1084. #if (defined(HAVE_SG_LINK_NAME) || defined sg_link_name) && (SIMGRID_VERSION >= 31300)
  1085. unsigned src, dst;
  1086. starpu_sg_host_t ramhost = _starpu_simgrid_get_host_by_name("RAM");
  1087. /* For each pair of memory nodes, get the route */
  1088. for (src = 1; src < STARPU_MAXNODES; src++)
  1089. for (dst = 1; dst < STARPU_MAXNODES; dst++)
  1090. {
  1091. int busid;
  1092. starpu_sg_host_t srchost, dsthost;
  1093. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1094. xbt_dynar_t route_dynar = xbt_dynar_new(sizeof(SD_link_t), NULL);
  1095. SD_link_t *route;
  1096. #else
  1097. const SD_link_t *route;
  1098. #endif
  1099. int i, routesize;
  1100. int through;
  1101. unsigned src2;
  1102. unsigned ngpus;
  1103. const char *name;
  1104. if (dst == src)
  1105. continue;
  1106. busid = starpu_bus_get_id(src, dst);
  1107. if (busid == -1)
  1108. continue;
  1109. srchost = _starpu_simgrid_get_memnode_host(src);
  1110. dsthost = _starpu_simgrid_get_memnode_host(dst);
  1111. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1112. sg_host_route(srchost, dsthost, route_dynar);
  1113. routesize = xbt_dynar_length(route_dynar);
  1114. route = xbt_dynar_to_array(route_dynar);
  1115. #else
  1116. routesize = SD_route_get_size(srchost, dsthost);
  1117. route = SD_route_get_list(srchost, dsthost);
  1118. #endif
  1119. /* If it goes through "Host", do not care, there is no
  1120. * direct transfer support */
  1121. for (i = 0; i < routesize; i++)
  1122. if (!strcmp(sg_link_name(route[i]), "Host"))
  1123. break;
  1124. if (i < routesize)
  1125. continue;
  1126. /* Get the PCI bridge between down and up links */
  1127. through = -1;
  1128. for (i = 0; i < routesize; i++)
  1129. {
  1130. name = sg_link_name(route[i]);
  1131. size_t len = strlen(name);
  1132. if (!strcmp(" through", name+len-8))
  1133. through = i;
  1134. else if (!strcmp(" up", name+len-3))
  1135. break;
  1136. }
  1137. /* Didn't find it ?! */
  1138. if (through == -1)
  1139. {
  1140. _STARPU_DEBUG("Didn't find through-link for %d->%d\n", src, dst);
  1141. continue;
  1142. }
  1143. name = sg_link_name(route[through]);
  1144. /*
  1145. * count how many direct routes go through it between
  1146. * GPUs and RAM
  1147. */
  1148. ngpus = 0;
  1149. for (src2 = 1; src2 < STARPU_MAXNODES; src2++)
  1150. {
  1151. int numa;
  1152. int nnumas = starpu_memory_nodes_get_numa_count();
  1153. int found = 0;
  1154. for (numa = 0; numa < nnumas; numa++)
  1155. if (starpu_bus_get_id(src2, numa) != -1)
  1156. {
  1157. found = 1;
  1158. break;
  1159. }
  1160. if (!found)
  1161. continue;
  1162. starpu_sg_host_t srchost2 = _starpu_simgrid_get_memnode_host(src2);
  1163. int routesize2;
  1164. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1165. xbt_dynar_t route_dynar2 = xbt_dynar_new(sizeof(SD_link_t), NULL);
  1166. SD_link_t *route2;
  1167. sg_host_route(srchost2, ramhost, route_dynar2);
  1168. routesize2 = xbt_dynar_length(route_dynar2);
  1169. route2 = xbt_dynar_to_array(route_dynar2);
  1170. #else
  1171. const SD_link_t *route2 = SD_route_get_list(srchost2, ramhost);
  1172. routesize2 = SD_route_get_size(srchost2, ramhost);
  1173. #endif
  1174. for (i = 0; i < routesize2; i++)
  1175. if (!strcmp(name, sg_link_name(route2[i])))
  1176. {
  1177. /* This GPU goes through this PCI bridge to access RAM */
  1178. ngpus++;
  1179. break;
  1180. }
  1181. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1182. free(route2);
  1183. #endif
  1184. }
  1185. _STARPU_DEBUG("%d->%d through %s, %u GPUs\n", src, dst, name, ngpus);
  1186. starpu_bus_set_ngpus(busid, ngpus);
  1187. #if defined(HAVE_SG_HOST_ROUTE) || defined(sg_host_route)
  1188. free(route);
  1189. #endif
  1190. }
  1191. #endif
  1192. }
  1193. #if 0
  1194. static size_t used;
  1195. void _starpu_simgrid_data_new(size_t size)
  1196. {
  1197. // Note: this is just declarative
  1198. //_STARPU_DISP("data new: %zd, now %zd\n", size, used);
  1199. }
  1200. void _starpu_simgrid_data_increase(size_t size)
  1201. {
  1202. used += size;
  1203. _STARPU_DISP("data increase: %zd, now %zd\n", size, used);
  1204. }
  1205. void _starpu_simgrid_data_alloc(size_t size)
  1206. {
  1207. used += size;
  1208. _STARPU_DISP("data alloc: %zd, now %zd\n", size, used);
  1209. }
  1210. void _starpu_simgrid_data_free(size_t size)
  1211. {
  1212. used -= size;
  1213. _STARPU_DISP("data free: %zd, now %zd\n", size, used);
  1214. }
  1215. void _starpu_simgrid_data_transfer(size_t size, unsigned src_node, unsigned dst_node)
  1216. {
  1217. _STARPU_DISP("data transfer %zd from %u to %u\n", size, src_node, dst_node);
  1218. }
  1219. #endif
  1220. #endif