simgrid.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2015 Université de Bordeaux
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <datawizard/memory_nodes.h>
  18. #include <common/config.h>
  19. #ifdef HAVE_UNISTD_H
  20. #include <unistd.h>
  21. #endif
  22. #include <core/perfmodel/perfmodel.h>
  23. #include <core/workers.h>
  24. #include <core/simgrid.h>
  25. #ifdef STARPU_SIMGRID
  26. #include <sys/resource.h>
  27. #pragma weak starpu_main
  28. extern int starpu_main(int argc, char *argv[]);
  29. #pragma weak smpi_main
  30. extern int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[]);
  31. #pragma weak _starpu_mpi_simgrid_init
  32. extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
  33. starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
  34. starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
  35. struct main_args
  36. {
  37. int argc;
  38. char **argv;
  39. };
  40. int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  41. {
  42. struct main_args *args = (void*) argv;
  43. return starpu_main(args->argc, args->argv);
  44. }
  45. /* In case the MPI application didn't use smpicc to build the file containing
  46. * main(), try to cope by calling starpu_main */
  47. #pragma weak smpi_simulated_main_
  48. int smpi_simulated_main_(int argc, char *argv[])
  49. {
  50. if (!starpu_main)
  51. {
  52. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
  53. }
  54. return starpu_main(argc, argv);
  55. }
  #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  #ifdef HAVE_MSG_GET_AS_BY_NAME
  /* Return the AS called `name', using the lookup provided by simgrid. */
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      return MSG_get_as_by_name(name);
  }
  #else /* HAVE_MSG_GET_AS_BY_NAME */
  /* Depth-first search below `root' for an AS called `name'.
   * Each son is checked before its own descendants are searched.
   * Returns NULL when no AS below `root' has that name. */
  static msg_as_t find_as_below(msg_as_t root, const char *name)
  {
      xbt_dict_t sons = MSG_environment_as_get_routing_sons(root);
      xbt_dict_cursor_t cursor;
      const char *key;
      msg_as_t son;

      xbt_dict_foreach(sons, cursor, key, son)
      {
          msg_as_t found;

          if (strcmp(MSG_environment_as_get_name(son), name) == 0)
              return son;

          found = find_as_below(son, name);
          if (found)
              return found;
      }

      return NULL;
  }

  /* Return the AS called `name', searching the whole routing tree from its
   * root; NULL when there is none. */
  msg_as_t _starpu_simgrid_get_as_by_name(const char *name)
  {
      msg_as_t root = MSG_environment_get_routing_root();

      return find_as_below(root, name);
  }
  #endif /* HAVE_MSG_GET_AS_BY_NAME */
  #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  86. int _starpu_simgrid_get_nbhosts(const char *prefix)
  87. {
  88. int ret;
  89. xbt_dynar_t hosts;
  90. unsigned i, nb;
  91. unsigned len = strlen(prefix);
  92. #ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
  93. char new_prefix[32];
  94. if (_starpu_simgrid_running_smpi())
  95. {
  96. char name[32];
  97. STARPU_ASSERT(starpu_mpi_world_rank);
  98. snprintf(name, sizeof(name), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
  99. hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(name));
  100. len = snprintf(new_prefix, sizeof(new_prefix), "%s-%s", name, prefix);
  101. prefix = new_prefix;
  102. len = strlen(prefix);
  103. }
  104. else
  105. #endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
  106. hosts = MSG_hosts_as_dynar();
  107. nb = xbt_dynar_length(hosts);
  108. ret = 0;
  109. for (i = 0; i < nb; i++)
  110. {
  111. const char *name;
  112. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  113. if (!strncmp(name, prefix, len))
  114. ret++;
  115. }
  116. xbt_dynar_free(&hosts);
  117. return ret;
  118. }
  119. unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid)
  120. {
  121. char name[32];
  122. msg_host_t host;
  123. const char *memsize;
  124. snprintf(name, sizeof(name), "%s%u", prefix, devid);
  125. host = _starpu_simgrid_get_host_by_name(name);
  126. if (!host)
  127. return 0;
  128. if (!MSG_host_get_properties(host))
  129. return 0;
  130. memsize = MSG_host_get_property_value(host, "memsize");
  131. if (!memsize)
  132. return 0;
  133. return atoll(memsize);
  134. }
  135. msg_host_t _starpu_simgrid_get_host_by_name(const char *name)
  136. {
  137. if (_starpu_simgrid_running_smpi())
  138. {
  139. char mpiname[32];
  140. STARPU_ASSERT(starpu_mpi_world_rank);
  141. snprintf(mpiname, sizeof(mpiname), STARPU_MPI_AS_PREFIX"%d-%s", starpu_mpi_world_rank(), name);
  142. return MSG_get_host_by_name(mpiname);
  143. }
  144. else
  145. return MSG_get_host_by_name(name);
  146. }
  147. msg_host_t _starpu_simgrid_get_host_by_worker(struct _starpu_worker *worker)
  148. {
  149. char *prefix;
  150. char name[16];
  151. msg_host_t host;
  152. switch (worker->arch)
  153. {
  154. case STARPU_CPU_WORKER:
  155. prefix = "CPU";
  156. break;
  157. case STARPU_CUDA_WORKER:
  158. prefix = "CUDA";
  159. break;
  160. case STARPU_OPENCL_WORKER:
  161. prefix = "OpenCL";
  162. break;
  163. default:
  164. STARPU_ASSERT(0);
  165. }
  166. snprintf(name, sizeof(name), "%s%d", prefix, worker->devid);
  167. host = _starpu_simgrid_get_host_by_name(name);
  168. STARPU_ASSERT_MSG(host, "Could not find host %s!", name);
  169. return host;
  170. }
  171. #ifdef STARPU_DEVEL
  172. #warning TODO: use another way to start main, when simgrid provides it, and then include the application-provided configuration for platform numbers
  173. #endif
  174. #undef main
  175. int main(int argc, char **argv)
  176. {
  177. char path[256];
  178. if (!starpu_main && !(smpi_main && smpi_simulated_main_))
  179. {
  180. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h included, to properly rename it into starpu_main\n");
  181. }
  182. if (_starpu_simgrid_running_smpi())
  183. {
  184. /* Oops, we are running SMPI, let it start Simgrid, and we'll
  185. * take back hand in _starpu_simgrid_init from starpu_init() */
  186. return smpi_main(_starpu_mpi_simgrid_init, argc, argv);
  187. }
  188. MSG_init(&argc, argv);
  189. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  190. /* Versions earlier than 3.9 didn't support our communication tasks */
  191. MSG_config("workstation/model", "ptask_L07");
  192. #endif
  193. /* Simgrid uses tiny stacks by default. This comes unexpected to our users. */
  194. extern xbt_cfg_t _sg_cfg_set;
  195. unsigned stack_size = 8192;
  196. struct rlimit rlim;
  197. if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != 0 && rlim.rlim_cur != RLIM_INFINITY)
  198. stack_size = rlim.rlim_cur / 1024;
  199. xbt_cfg_set_int(_sg_cfg_set, "contexts/stack_size", stack_size);
  200. /* Load XML platform */
  201. _starpu_simgrid_get_platform_path(path, sizeof(path));
  202. MSG_create_environment(path);
  203. struct main_args *args = malloc(sizeof(*args));
  204. args->argc = argc;
  205. args->argv = argv;
  206. MSG_process_create_with_arguments("main", &do_starpu_main, calloc(MAX_TSD, sizeof(void*)), MSG_get_host_by_name("MAIN"), 0, (char**) args);
  207. MSG_main();
  208. return 0;
  209. }
  210. void _starpu_simgrid_init()
  211. {
  212. unsigned i;
  213. if (!starpu_main && !(smpi_main && smpi_simulated_main_))
  214. {
  215. _STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h included, to properly rename it into starpu_main\n");
  216. }
  217. if (_starpu_simgrid_running_smpi())
  218. {
  219. MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
  220. }
  221. for (i = 0; i < STARPU_MAXNODES; i++)
  222. starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
  223. for (i = 0; i < STARPU_NMAXWORKERS; i++)
  224. starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
  225. }
  226. /*
  227. * Tasks
  228. */
  229. struct task
  230. {
  231. msg_task_t task;
  232. int workerid;
  233. /* communication termination signalization */
  234. unsigned *finished;
  235. starpu_pthread_mutex_t *mutex;
  236. starpu_pthread_cond_t *cond;
  237. /* Task which waits for this task */
  238. struct task *next;
  239. };
  240. static struct task *last_task[STARPU_NMAXWORKERS];
  241. /* Actually execute the task. */
  242. static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  243. {
  244. struct task *task = (void*) argv;
  245. _STARPU_DEBUG("task %p started\n", task);
  246. MSG_task_execute(task->task);
  247. MSG_task_destroy(task->task);
  248. _STARPU_DEBUG("task %p finished\n", task);
  249. STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
  250. *task->finished = 1;
  251. STARPU_PTHREAD_COND_BROADCAST(task->cond);
  252. STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
  253. /* The worker which started this task may be sleeping out of tasks, wake it */
  254. starpu_wake_worker(task->workerid);
  255. if (last_task[task->workerid] == task)
  256. last_task[task->workerid] = NULL;
  257. if (task->next)
  258. MSG_process_create_with_arguments("task", task_execute, calloc(MAX_TSD, sizeof(void*)), MSG_host_self(), 0, (char**) task->next);
  259. /* Task is freed with process context */
  260. return 0;
  261. }
  262. /* Wait for completion of all asynchronous tasks for this worker */
  263. void _starpu_simgrid_wait_tasks(int workerid)
  264. {
  265. struct task *task = last_task[workerid];
  266. if (!task)
  267. return;
  268. unsigned *finished = task->finished;
  269. starpu_pthread_mutex_t *mutex = task->mutex;
  270. starpu_pthread_cond_t *cond = task->cond;
  271. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  272. while (!*finished)
  273. STARPU_PTHREAD_COND_WAIT(cond, mutex);
  274. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  275. }
  276. /* Task execution submitted by StarPU */
  277. void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond)
  278. {
  279. struct starpu_task *starpu_task = j->task;
  280. msg_task_t simgrid_task;
  281. if (j->internal)
  282. /* This is not useful to include in simulation (and probably
  283. * doesn't have a perfmodel anyway) */
  284. return;
  285. if (isnan(length))
  286. {
  287. length = starpu_task_expected_length(starpu_task, perf_arch, j->nimpl);
  288. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  289. "Codelet %s does not have a perfmodel, or is not calibrated enough, please re-run in non-simgrid mode until it is calibrated",
  290. _starpu_job_get_model_name(j));
  291. }
  292. simgrid_task = MSG_task_create(_starpu_job_get_task_name(j),
  293. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  294. 0, NULL);
  295. if (finished == NULL)
  296. {
  297. /* Synchronous execution */
  298. /* First wait for previous tasks */
  299. _starpu_simgrid_wait_tasks(workerid);
  300. MSG_task_execute(simgrid_task);
  301. MSG_task_destroy(simgrid_task);
  302. }
  303. else
  304. {
  305. /* Asynchronous execution */
  306. struct task *task = malloc(sizeof(*task));
  307. task->task = simgrid_task;
  308. task->workerid = workerid;
  309. task->finished = finished;
  310. *finished = 0;
  311. task->mutex = mutex;
  312. task->cond = cond;
  313. task->next = NULL;
  314. /* Sleep 10µs for the GPU task queueing */
  315. if (_starpu_simgrid_queue_malloc_cost())
  316. MSG_process_sleep(0.000010);
  317. if (last_task[workerid])
  318. {
  319. /* Make this task depend on the previous */
  320. last_task[workerid]->next = task;
  321. last_task[workerid] = task;
  322. }
  323. else
  324. {
  325. last_task[workerid] = task;
  326. MSG_process_create_with_arguments("task", task_execute, calloc(MAX_TSD, sizeof(void*)), MSG_host_self(), 0, (char**) task);
  327. }
  328. }
  329. }
  330. /*
  331. * Transfers
  332. */
  333. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  334. LIST_TYPE(transfer,
  335. msg_task_t task;
  336. int src_node;
  337. int dst_node;
  338. int run_node;
  339. /* communication termination signalization */
  340. unsigned *finished;
  341. starpu_pthread_mutex_t *mutex;
  342. starpu_pthread_cond_t *cond;
  343. /* transfers which wait for this transfer */
  344. struct transfer **wake;
  345. unsigned nwake;
  346. /* Number of transfers that this transfer waits for */
  347. unsigned nwait;
  348. )
  349. struct transfer_list pending;
  350. /* Tell for two transfers whether they should be handled in sequence */
  351. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  352. {
  353. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  354. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  355. int new_is_gpu_gpu, old_is_gpu_gpu;
  356. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  357. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  358. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  359. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  360. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  361. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  362. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  363. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  364. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  365. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  366. /* We ignore cuda-opencl transfers, they can not happen */
  367. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  368. /* The following constraints have been observed with CUDA alone */
  369. /* Same source/destination, sequential */
  370. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  371. return 1;
  372. /* Crossed GPU-GPU, sequential */
  373. if (new_is_gpu_gpu
  374. && new_transfer->src_node == old_transfer->dst_node
  375. && old_transfer->src_node == new_transfer->dst_node)
  376. return 1;
  377. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  378. if (new_is_gpu_gpu
  379. && old_transfer->dst_node == new_transfer->src_node
  380. && old_transfer->dst_node == new_transfer->dst_node)
  381. return 1;
  382. if (old_is_gpu_gpu
  383. && new_transfer->dst_node == old_transfer->src_node
  384. && new_transfer->dst_node == old_transfer->dst_node)
  385. return 1;
  386. /* StarPU's constraint on CUDA transfers is using one stream per
  387. * source/destination pair, which is already handled above */
  388. return 0;
  389. }
  390. /* Actually execute the transfer, and then start transfers waiting for this one. */
  391. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  392. {
  393. struct transfer *transfer = (void*) argv;
  394. unsigned i;
  395. _STARPU_DEBUG("transfer %p started\n", transfer);
  396. MSG_task_execute(transfer->task);
  397. MSG_task_destroy(transfer->task);
  398. _STARPU_DEBUG("transfer %p finished\n", transfer);
  399. STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
  400. *transfer->finished = 1;
  401. STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
  402. STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
  403. /* The workers which started this request may be sleeping out of tasks, wake it */
  404. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  405. /* Wake transfers waiting for my termination */
  406. /* Note: due to possible preemption inside process_create, the array
  407. * may grow while doing this */
  408. for (i = 0; i < transfer->nwake; i++)
  409. {
  410. struct transfer *wake = transfer->wake[i];
  411. STARPU_ASSERT(wake->nwait > 0);
  412. wake->nwait--;
  413. if (!wake->nwait)
  414. {
  415. _STARPU_DEBUG("triggering transfer %p\n", wake);
  416. MSG_process_create_with_arguments("transfer task", transfer_execute, calloc(MAX_TSD, sizeof(void*)), _starpu_simgrid_get_host_by_name("MAIN"), 0, (char**) wake);
  417. }
  418. }
  419. free(transfer->wake);
  420. transfer_list_erase(&pending, transfer);
  421. /* transfer is freed with process context */
  422. return 0;
  423. }
  424. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  425. static void transfer_submit(struct transfer *transfer)
  426. {
  427. struct transfer *old;
  428. for (old = transfer_list_begin(&pending);
  429. old != transfer_list_end(&pending);
  430. old = transfer_list_next(old))
  431. {
  432. if (transfers_are_sequential(transfer, old))
  433. {
  434. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  435. transfer, transfer->src_node, transfer->dst_node,
  436. old, old->src_node, old->dst_node);
  437. /* Make new wait for the old */
  438. transfer->nwait++;
  439. /* Make old wake the new */
  440. old->wake = realloc(old->wake, (old->nwake + 1) * sizeof(old->wake));
  441. old->wake[old->nwake] = transfer;
  442. old->nwake++;
  443. }
  444. }
  445. transfer_list_push_front(&pending, transfer);
  446. if (!transfer->nwait)
  447. {
  448. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  449. MSG_process_create_with_arguments("transfer task", transfer_execute, calloc(MAX_TSD, sizeof(void*)), _starpu_simgrid_get_host_by_name("MAIN"), 0, (char**) transfer);
  450. }
  451. }
  452. /* Data transfer issued by StarPU */
  453. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  454. {
  455. msg_task_t task;
  456. msg_host_t *hosts = calloc(2, sizeof(*hosts));
  457. double *computation = calloc(2, sizeof(*computation));
  458. double *communication = calloc(4, sizeof(*communication));
  459. starpu_pthread_mutex_t mutex;
  460. starpu_pthread_cond_t cond;
  461. unsigned finished;
  462. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  463. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  464. STARPU_ASSERT(hosts[0] != hosts[1]);
  465. communication[1] = size;
  466. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  467. struct transfer *transfer = transfer_new();
  468. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  469. transfer->task = task;
  470. transfer->src_node = src_node;
  471. transfer->dst_node = dst_node;
  472. transfer->run_node = _starpu_memory_node_get_local_key();
  473. if (req)
  474. {
  475. transfer->finished = &req->async_channel.event.finished;
  476. transfer->mutex = &req->async_channel.event.mutex;
  477. transfer->cond = &req->async_channel.event.cond;
  478. }
  479. else
  480. {
  481. transfer->finished = &finished;
  482. transfer->mutex = &mutex;
  483. transfer->cond = &cond;
  484. }
  485. *transfer->finished = 0;
  486. STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
  487. STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
  488. transfer->wake = NULL;
  489. transfer->nwake = 0;
  490. transfer->nwait = 0;
  491. if (req)
  492. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  493. /* Sleep 10µs for the GPU transfer queueing */
  494. if (_starpu_simgrid_queue_malloc_cost())
  495. MSG_process_sleep(0.000010);
  496. transfer_submit(transfer);
  497. /* Note: from here, transfer might be already freed */
  498. if (req)
  499. {
  500. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  501. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  502. return -EAGAIN;
  503. }
  504. else
  505. {
  506. /* this is not associated to a request so it's synchronous */
  507. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  508. while (!finished)
  509. STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
  510. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  511. return 0;
  512. }
  513. }
  514. int
  515. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
  516. {
  517. struct _starpu_pthread_args *_args = (void*) argv;
  518. struct _starpu_pthread_args args = *_args;
  519. /* _args is freed with process context */
  520. args.f(args.arg);
  521. return 0;
  522. }
  523. #endif