simgrid.c 12 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2013 Université de Bordeaux 1
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <datawizard/memory_nodes.h>
  18. #include <unistd.h>
  19. #include <core/perfmodel/perfmodel.h>
  20. #include <core/workers.h>
  21. #include <core/simgrid.h>
  22. #ifdef STARPU_SIMGRID
  23. #include <msg/msg.h>
  24. #define MAX_TSD 16
  25. #pragma weak starpu_main
  26. extern int starpu_main(int argc, char *argv[]);
  /* Command line of the real program, bundled so that it can be attached as
   * data to the simulated "main" process and read back by do_starpu_main(). */
  struct main_args
  {
      /* Number of entries in argv */
      int argc;
      /* Argument vector, as received by main() */
      char **argv;
  };
  32. int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  33. {
  34. struct main_args *args = MSG_process_get_data(MSG_process_self());
  35. return starpu_main(args->argc, args->argv);
  36. }
  37. int _starpu_simgrid_get_nbhosts(const char *prefix)
  38. {
  39. int ret;
  40. xbt_dynar_t hosts = MSG_hosts_as_dynar();
  41. unsigned i, nb = xbt_dynar_length(hosts);
  42. unsigned len = strlen(prefix);
  43. ret = 0;
  44. for (i = 0; i < nb; i++) {
  45. const char *name;
  46. name = MSG_host_get_name(xbt_dynar_get_as(hosts, i, msg_host_t));
  47. if (!strncmp(name, prefix, len))
  48. ret++;
  49. }
  50. xbt_dynar_free(&hosts);
  51. return ret;
  52. }
  53. #ifdef STARPU_DEVEL
  54. #warning TODO: use another way to start main, when simgrid provides it, and then include the application-provided configuration for platform numbers
  55. #endif
  56. #undef main
  57. int main(int argc, char **argv)
  58. {
  59. xbt_dynar_t hosts;
  60. int i;
  61. char path[256];
  62. if (!starpu_main)
  63. {
  64. _STARPU_ERROR("The main file of this application needs to be compiled with starpu.h included, to properly define starpu_main\n");
  65. exit(EXIT_FAILURE);
  66. }
  67. MSG_init(&argc, argv);
  68. #if SIMGRID_VERSION_MAJOR < 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR < 9)
  69. /* Versions earlier than 3.9 didn't support our communication tasks */
  70. MSG_config("workstation/model", "ptask_L07");
  71. #endif
  72. /* Load XML platform */
  73. _starpu_simgrid_get_platform_path(path, sizeof(path));
  74. MSG_create_environment(path);
  75. hosts = MSG_hosts_as_dynar();
  76. int nb = xbt_dynar_length(hosts);
  77. for (i = 0; i < nb; i++)
  78. MSG_host_set_data(xbt_dynar_get_as(hosts, i, msg_host_t), calloc(MAX_TSD, sizeof(void*)));
  79. struct main_args args = { .argc = argc, .argv = argv };
  80. MSG_process_create("main", &do_starpu_main, &args, xbt_dynar_get_as(hosts, 0, msg_host_t));
  81. xbt_dynar_free(&hosts);
  82. MSG_main();
  83. return 0;
  84. }
  85. /* Task execution submitted by StarPU */
  86. void _starpu_simgrid_execute_job(struct _starpu_job *j, enum starpu_perf_archtype perf_arch, double length)
  87. {
  88. struct starpu_task *task = j->task;
  89. msg_task_t simgrid_task;
  90. if (j->exclude_from_dag)
  91. /* This is not useful to include in simulation (and probably
  92. * doesn't have a perfmodel anyway) */
  93. return;
  94. if (isnan(length))
  95. {
  96. length = starpu_task_expected_length(task, perf_arch, j->nimpl);
  97. STARPU_ASSERT_MSG(!_STARPU_IS_ZERO(length) && !isnan(length),
  98. "Codelet %s does not have a perfmodel, or is not calibrated enough",
  99. _starpu_job_get_model_name(j));
  100. }
  101. simgrid_task = MSG_task_create(_starpu_job_get_model_name(j),
  102. length/1000000.0*MSG_get_host_speed(MSG_host_self()),
  103. 0, NULL);
  104. MSG_task_execute(simgrid_task);
  105. }
  106. /* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers. */
  107. LIST_TYPE(transfer,
  108. msg_task_t task;
  109. int src_node;
  110. int dst_node;
  111. int run_node;
  112. /* communication termination signalization */
  113. unsigned *finished;
  114. _starpu_pthread_mutex_t *mutex;
  115. _starpu_pthread_cond_t *cond;
  116. /* transfers which wait for this transfer */
  117. struct transfer **wake;
  118. unsigned nwake;
  119. /* Number of transfers that this transfer waits for */
  120. unsigned nwait;
  121. )
  122. struct transfer_list *pending;
  123. /* Tell for two transfers whether they should be handled in sequence */
  124. static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
  125. {
  126. int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
  127. int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
  128. int new_is_gpu_gpu, old_is_gpu_gpu;
  129. new_is_cuda = starpu_node_get_kind(new_transfer->src_node) == STARPU_CUDA_RAM;
  130. new_is_cuda |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_CUDA_RAM;
  131. old_is_cuda = starpu_node_get_kind(old_transfer->src_node) == STARPU_CUDA_RAM;
  132. old_is_cuda |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_CUDA_RAM;
  133. new_is_opencl = starpu_node_get_kind(new_transfer->src_node) == STARPU_OPENCL_RAM;
  134. new_is_opencl |= starpu_node_get_kind(new_transfer->dst_node) == STARPU_OPENCL_RAM;
  135. old_is_opencl = starpu_node_get_kind(old_transfer->src_node) == STARPU_OPENCL_RAM;
  136. old_is_opencl |= starpu_node_get_kind(old_transfer->dst_node) == STARPU_OPENCL_RAM;
  137. new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
  138. old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
  139. /* We ignore cuda-opencl transfers, they can not happen */
  140. STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
  141. /* The following constraints have been observed with CUDA alone */
  142. /* Same source/destination, sequential */
  143. if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
  144. return 1;
  145. /* Crossed GPU-GPU, sequential */
  146. if (new_is_gpu_gpu
  147. && new_transfer->src_node == old_transfer->dst_node
  148. && old_transfer->src_node == new_transfer->dst_node)
  149. return 1;
  150. /* GPU-GPU transfers are sequential with any RAM->GPU transfer */
  151. if (new_is_gpu_gpu
  152. && old_transfer->dst_node == new_transfer->src_node
  153. && old_transfer->dst_node == new_transfer->dst_node)
  154. return 1;
  155. if (old_is_gpu_gpu
  156. && new_transfer->dst_node == old_transfer->src_node
  157. && new_transfer->dst_node == old_transfer->dst_node)
  158. return 1;
  159. /* These constraints come from StarPU */
  160. /* StarPU uses one stream per direction */
  161. /* RAM->GPU and GPU->RAM are already handled by "same source/destination" */
  162. /* StarPU uses one stream per running GPU for GPU-GPU transfers */
  163. if (new_is_gpu_gpu && old_is_gpu_gpu && new_transfer->run_node == old_transfer->run_node)
  164. return 1;
  165. return 0;
  166. }
  167. /* Actually execute the transfer, and then start transfers waiting for this one. */
  168. static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  169. {
  170. struct transfer *transfer = MSG_process_get_data(MSG_process_self());
  171. unsigned i;
  172. _STARPU_DEBUG("transfer %p started\n", transfer);
  173. MSG_task_execute(transfer->task);
  174. MSG_task_destroy(transfer->task);
  175. _STARPU_DEBUG("transfer %p finished\n", transfer);
  176. _STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
  177. *transfer->finished = 1;
  178. _STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
  179. _STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
  180. /* The workers which started this request may be sleeping out of tasks, wake it */
  181. _starpu_wake_all_blocked_workers_on_node(transfer->run_node);
  182. /* Wake transfers waiting for my termination */
  183. /* Note: due to possible preemption inside process_create, the array
  184. * may grow while doing this */
  185. for (i = 0; i < transfer->nwake; i++)
  186. {
  187. struct transfer *wake = transfer->wake[i];
  188. STARPU_ASSERT(wake->nwait > 0);
  189. wake->nwait--;
  190. if (!wake->nwait)
  191. {
  192. _STARPU_DEBUG("triggering transfer %p\n", wake);
  193. MSG_process_create("transfer task", transfer_execute, wake, MSG_get_host_by_name("MAIN"));
  194. }
  195. }
  196. free(transfer->wake);
  197. transfer_list_erase(pending, transfer);
  198. transfer_delete(transfer);
  199. return 0;
  200. }
  201. /* Look for sequentialization between this transfer and pending transfers, and submit this one */
  202. static void transfer_submit(struct transfer *transfer)
  203. {
  204. struct transfer *old;
  205. if (!pending)
  206. pending = transfer_list_new();
  207. for (old = transfer_list_begin(pending);
  208. old != transfer_list_end(pending);
  209. old = transfer_list_next(old))
  210. {
  211. if (transfers_are_sequential(transfer, old))
  212. {
  213. _STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
  214. transfer, transfer->src_node, transfer->dst_node,
  215. old, old->src_node, old->dst_node);
  216. /* Make new wait for the old */
  217. transfer->nwait++;
  218. /* Make old wake the new */
  219. old->wake = realloc(old->wake, (old->nwake + 1) * sizeof(old->wake));
  220. old->wake[old->nwake] = transfer;
  221. old->nwake++;
  222. }
  223. }
  224. transfer_list_push_front(pending, transfer);
  225. if (!transfer->nwait)
  226. {
  227. _STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
  228. MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
  229. }
  230. }
  231. /* Data transfer issued by StarPU */
  232. int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
  233. {
  234. msg_task_t task;
  235. msg_host_t *hosts = calloc(2, sizeof(*hosts));
  236. double *computation = calloc(2, sizeof(*computation));
  237. double *communication = calloc(4, sizeof(*communication));
  238. _starpu_pthread_mutex_t mutex;
  239. _starpu_pthread_cond_t cond;
  240. unsigned finished;
  241. hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
  242. hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
  243. STARPU_ASSERT(hosts[0] != hosts[1]);
  244. communication[1] = size;
  245. task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
  246. struct transfer *transfer = transfer_new();
  247. _STARPU_DEBUG("creating transfer %p for %lu bytes\n", transfer, (unsigned long) size);
  248. transfer->task = task;
  249. transfer->src_node = src_node;
  250. transfer->dst_node = dst_node;
  251. transfer->run_node = _starpu_memory_node_get_local_key();
  252. if (req)
  253. {
  254. transfer->finished = &req->async_channel.event.finished;
  255. transfer->mutex = &req->async_channel.event.mutex;
  256. transfer->cond = &req->async_channel.event.cond;
  257. }
  258. else
  259. {
  260. transfer->finished = &finished;
  261. transfer->mutex = &mutex;
  262. transfer->cond = &cond;
  263. }
  264. *transfer->finished = 0;
  265. _STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
  266. _STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
  267. transfer->wake = NULL;
  268. transfer->nwake = 0;
  269. transfer->nwait = 0;
  270. if (req)
  271. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  272. transfer_submit(transfer);
  273. /* Note: from here, transfer might be already freed */
  274. if (req)
  275. {
  276. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  277. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  278. return -EAGAIN;
  279. }
  280. else
  281. {
  282. /* this is not associated to a request so it's synchronous */
  283. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  284. while (!finished)
  285. _STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
  286. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  287. return 0;
  288. }
  289. }
  290. static int used_key[MAX_TSD];
  291. int _starpu_pthread_key_create(_starpu_pthread_key_t *key)
  292. {
  293. unsigned i;
  294. /* Note: no synchronization here, we are actually monothreaded anyway. */
  295. for (i = 0; i < MAX_TSD; i++)
  296. if (!used_key[i])
  297. {
  298. used_key[i] = 1;
  299. break;
  300. }
  301. STARPU_ASSERT(i < MAX_TSD);
  302. *key = i;
  303. return 0;
  304. }
  305. int _starpu_pthread_key_delete(_starpu_pthread_key_t key)
  306. {
  307. used_key[key] = 0;
  308. return 0;
  309. }
  310. int _starpu_pthread_setspecific(_starpu_pthread_key_t key, void *ptr)
  311. {
  312. void **array = MSG_host_get_data(MSG_host_self());
  313. array[key] = ptr;
  314. return 0;
  315. }
  316. void* _starpu_pthread_getspecific(_starpu_pthread_key_t key)
  317. {
  318. void **array = MSG_host_get_data(MSG_host_self());
  319. return array[key];
  320. }
  321. int
  322. _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
  323. {
  324. struct _starpu_pthread_args *args = MSG_process_get_data(MSG_process_self());
  325. args->f(args->arg);
  326. free(args);
  327. return 0;
  328. }
  329. #endif