source_common.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <string.h>
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <core/task.h>
  20. #include <core/sched_policy.h>
  21. #include <drivers/driver_common/driver_common.h>
  22. #include <datawizard/coherency.h>
  23. #include <datawizard/interfaces/data_interface.h>
  24. #include <drivers/mp_common/mp_common.h>
  25. static int
  26. _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
  27. {
  28. uint32_t mask = 0;
  29. int profiling = starpu_profiling_status_get();
  30. struct timespec codelet_end;
  31. _starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
  32. profiling);
  33. _starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
  34. &j->cl_start, &codelet_end,
  35. profiling);
  36. _starpu_push_task_output (j, mask);
  37. _starpu_handle_job_termination(j);
  38. return 0;
  39. }
  40. static int
  41. _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset, void * arg, int arg_size STARPU_ATTRIBUTE_UNUSED)
  42. {
  43. void *arg_ptr = arg;
  44. int coreid;
  45. coreid = *(int *) arg_ptr;
  46. arg_ptr += sizeof (coreid); // Useless.
  47. struct _starpu_worker *worker = &workerset->workers[coreid];
  48. struct starpu_task *task = worker->current_task;
  49. struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
  50. _starpu_src_common_finalize_job (j, worker);
  51. worker->current_task = NULL;
  52. return 0;
  53. }
  54. /* recv a message and handle asynchrone message
  55. * return 0 if the message has not been handle (it's certainly mean that it's a synchrone message)
  56. * return 1 if the message has been handle
  57. */
  58. static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node,
  59. void ** arg, int* arg_size,
  60. enum _starpu_mp_command *answer)
  61. {
  62. struct _starpu_worker_set * worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
  63. *answer = _starpu_mp_common_recv_command(node, arg, arg_size);
  64. switch(*answer)
  65. {
  66. case STARPU_EXECUTION_COMPLETED:
  67. _starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);
  68. break;
  69. default:
  70. return 0;
  71. break;
  72. }
  73. return 1;
  74. }
  75. enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starpu_mp_node *node,
  76. void ** arg, int* arg_size)
  77. {
  78. enum _starpu_mp_command answer;
  79. while(_starpu_src_common_handle_async(node,arg,arg_size,&answer));
  80. return answer;
  81. }
  82. void _starpu_src_common_recv_async(struct _starpu_mp_node * baseworker_node)
  83. {
  84. enum _starpu_mp_command answer;
  85. void *arg;
  86. int arg_size;
  87. if(!_starpu_src_common_handle_async(baseworker_node,&arg,&arg_size,&answer))
  88. {
  89. printf("incorrect commande: unknown command or sync command");
  90. STARPU_ASSERT(0);
  91. }
  92. }
  93. int
  94. _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf)
  95. {
  96. // Send a request to the sink NODE for the number of cores on it.
  97. enum _starpu_mp_command answer;
  98. void *arg;
  99. int arg_size = sizeof (int);
  100. _starpu_mp_common_send_command (node, STARPU_SINK_NBCORES, NULL, 0);
  101. answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
  102. STARPU_ASSERT (answer == STARPU_ANSWER_SINK_NBCORES && arg_size == sizeof (int));
  103. memcpy (buf, arg, arg_size);
  104. return 0;
  105. }
  106. /* Send a request to the sink linked to NODE for the pointer to the
  107. * function defined by FUNC_NAME.
  108. * In case of success, it returns 0 and FUNC_PTR contains the pointer ;
  109. * else it returns -ESPIPE if the function was not found.
  110. */
  111. int _starpu_src_common_lookup(struct _starpu_mp_node *node,
  112. void (**func_ptr)(void), const char *func_name)
  113. {
  114. enum _starpu_mp_command answer;
  115. void *arg;
  116. int arg_size;
  117. /* strlen ignore the terminating '\0' */
  118. arg_size = (strlen(func_name) + 1) * sizeof(char);
  119. //_STARPU_DEBUG("Looking up %s\n", func_name);
  120. _starpu_mp_common_send_command(node, STARPU_LOOKUP, (void *) func_name,
  121. arg_size);
  122. answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
  123. &arg_size);
  124. if (answer == STARPU_ERROR_LOOKUP)
  125. {
  126. _STARPU_DISP("Error looking up symbol %s\n", func_name);
  127. return -ESPIPE;
  128. }
  129. /* We have to be sure the device answered the right question and the
  130. * answer has the right size */
  131. STARPU_ASSERT(answer == STARPU_ANSWER_LOOKUP);
  132. STARPU_ASSERT(arg_size == sizeof(*func_ptr));
  133. memcpy(func_ptr, arg, arg_size);
  134. //_STARPU_DEBUG("got %p\n", *func_ptr);
  135. return 0;
  136. }
  137. /* Send a message to the sink to execute a kernel.
  138. * The message sent has the form below :
  139. * [Function pointer on sink, number of interfaces, interfaces
  140. * (union _starpu_interface), cl_arg]
  141. */
  142. /* Launch the execution of the function KERNEL points to on the sink linked
  143. * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
  144. * pointer.
  145. * Data interfaces in task are send to the sink.
  146. */
  147. int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
  148. void (*kernel)(void), unsigned coreid,
  149. enum starpu_codelet_type type,
  150. int is_parallel_task, int cb_workerid,
  151. starpu_data_handle_t *handles,
  152. void **interfaces,
  153. unsigned nb_interfaces,
  154. void *cl_arg, size_t cl_arg_size)
  155. {
  156. void *buffer, *buffer_ptr, *arg =NULL;
  157. int i, buffer_size = 0, cb_worker_size = 0, arg_size =0;
  158. struct _starpu_combined_worker * cb_worker;
  159. unsigned devid;
  160. buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
  161. + sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
  162. /*if the task is paralle*/
  163. if(type == STARPU_FORKJOIN && is_parallel_task)
  164. {
  165. _STARPU_DEBUG("\n Parallele\n");
  166. _STARPU_DEBUG("type:%d\n",type);
  167. _STARPU_DEBUG("cb_workerid:%d\n",cb_workerid);
  168. cb_worker = _starpu_get_combined_worker_struct(cb_workerid);
  169. cb_worker_size = cb_worker->worker_size;
  170. buffer_size = sizeof(cb_worker_size) + cb_worker_size * sizeof(devid);
  171. }
  172. /* If the user didn't give any cl_arg, there is no need to send it */
  173. if (cl_arg)
  174. {
  175. STARPU_ASSERT(cl_arg_size);
  176. buffer_size += cl_arg_size;
  177. }
  178. /* We give to send_command a buffer we just allocated, which contains
  179. * a pointer to the function (sink-side), core on which execute this
  180. * function (sink-side), number of interfaces we send,
  181. * an array of generic (union) interfaces and the value of cl_arg */
  182. buffer_ptr = buffer = (void *) malloc(buffer_size);
  183. *(void(**)(void)) buffer = kernel;
  184. buffer_ptr += sizeof(kernel);
  185. *(enum starpu_codelet_type *) buffer_ptr = type;
  186. buffer_ptr += sizeof(type);
  187. *(int *) buffer_ptr = is_parallel_task;
  188. buffer_ptr += sizeof(is_parallel_task);
  189. if(type == STARPU_FORKJOIN && is_parallel_task)
  190. {
  191. *(int *) buffer_ptr = cb_worker_size;
  192. buffer_ptr += sizeof(cb_worker_size);
  193. for (i = 0; i < cb_worker_size; i++)
  194. {
  195. int devid = _starpu_get_worker_struct(cb_worker->combined_workerid[i])->devid;
  196. *(int *) buffer_ptr = devid;
  197. buffer_ptr += sizeof(devid);
  198. }
  199. }
  200. *(unsigned *) buffer_ptr = coreid;
  201. buffer_ptr += sizeof(coreid);
  202. *(unsigned *) buffer_ptr = nb_interfaces;
  203. buffer_ptr += sizeof(nb_interfaces);
  204. /* Message-passing execution is a particular case as the codelet is
  205. * executed on a sink with a different memory, whereas a codelet is
  206. * executed on the host part for the other accelerators.
  207. * Thus we need to send a copy of each interface on the MP device */
  208. for (i = 0; i < nb_interfaces; i++)
  209. {
  210. starpu_data_handle_t handle = handles[i];
  211. memcpy (buffer_ptr, interfaces[i],
  212. handle->ops->interface_size);
  213. /* The sink side has no mean to get the type of each
  214. * interface, we use a union to make it generic and permit the
  215. * sink to go through the array */
  216. buffer_ptr += sizeof(union _starpu_interface);
  217. }
  218. if (cl_arg)
  219. memcpy(buffer_ptr, cl_arg, cl_arg_size);
  220. _starpu_mp_common_send_command(node, STARPU_EXECUTE, buffer, buffer_size);
  221. enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);
  222. if (answer == STARPU_ERROR_EXECUTE)
  223. return -EINVAL;
  224. STARPU_ASSERT(answer == STARPU_EXECUTION_SUBMITTED);
  225. free(buffer);
  226. return 0;
  227. }
  228. static int _starpu_src_common_execute(struct _starpu_job *j,
  229. struct _starpu_worker *worker,
  230. struct _starpu_mp_node * node)
  231. {
  232. int ret;
  233. uint32_t mask = 0;
  234. STARPU_ASSERT(j);
  235. struct starpu_task *task = j->task;
  236. int profiling = starpu_profiling_status_get();
  237. STARPU_ASSERT(task);
  238. ret = _starpu_fetch_task_input(j, mask);
  239. if (ret != 0)
  240. {
  241. /* there was not enough memory, so the input of
  242. * the codelet cannot be fetched ... put the
  243. * codelet back, and try it later */
  244. return -EAGAIN;
  245. }
  246. void (*kernel)(void) = node->get_kernel_from_job(node,j);
  247. _starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
  248. _STARPU_DEBUG("j->task_size:%d\n",j->task_size);
  249. _STARPU_DEBUG("j->cb_workerid:%d\n",j->combined_workerid);
  250. _STARPU_DEBUG("cb_worker_count:%d\n",starpu_combined_worker_get_count());
  251. _starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
  252. (j->task_size > 1),
  253. j->combined_workerid, task->handles,
  254. task->interfaces, task->cl->nbuffers,
  255. task->cl_arg, task->cl_arg_size);
  256. return 0;
  257. }
  258. /* Send a request to the sink linked to the MP_NODE to allocate SIZE bytes on
  259. * the sink.
  260. * In case of success, it returns 0 and *ADDR contains the address of the
  261. * allocated area ;
  262. * else it returns 1 if the allocation fail.
  263. */
  264. int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
  265. void **addr, size_t size)
  266. {
  267. enum _starpu_mp_command answer;
  268. void *arg;
  269. int arg_size;
  270. _starpu_mp_common_send_command(mp_node, STARPU_ALLOCATE, &size,
  271. sizeof(size));
  272. answer = _starpu_mp_common_recv_command(mp_node, &arg, &arg_size);
  273. if (answer == STARPU_ERROR_ALLOCATE)
  274. return 1;
  275. STARPU_ASSERT(answer == STARPU_ANSWER_ALLOCATE &&
  276. arg_size == sizeof(*addr));
  277. memcpy(addr, arg, arg_size);
  278. return 0;
  279. }
  280. /* Send a request to the sink linked to the MP_NODE to deallocate the memory
  281. * area pointed by ADDR.
  282. */
  283. void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
  284. void *addr)
  285. {
  286. _starpu_mp_common_send_command(mp_node, STARPU_FREE, &addr, sizeof(addr));
  287. }
  288. /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE.
  289. */
  290. int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
  291. void *src, void *dst, size_t size)
  292. {
  293. struct _starpu_mp_transfer_command cmd = {size, dst};
  294. _starpu_mp_common_send_command(mp_node, STARPU_RECV_FROM_HOST, &cmd, sizeof(cmd));
  295. mp_node->dt_send(mp_node, src, size);
  296. return 0;
  297. }
  298. /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST.
  299. */
  300. int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
  301. void *src, void *dst, size_t size)
  302. {
  303. struct _starpu_mp_transfer_command cmd = {size, src};
  304. _starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST, &cmd, sizeof(cmd));
  305. mp_node->dt_recv(mp_node, dst, size);
  306. return 0;
  307. }
  308. /* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed by SRC
  309. * to the sink linked to DST_NODE. The latter store them in DST.
  310. */
  311. int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
  312. const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
  313. {
  314. enum _starpu_mp_command answer;
  315. void *arg;
  316. int arg_size;
  317. struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src};
  318. /* Tell source to send data to dest. */
  319. _starpu_mp_common_send_command(src_node, STARPU_SEND_TO_SINK, &cmd, sizeof(cmd));
  320. cmd.devid = src_node->peer_id;
  321. cmd.size = size;
  322. cmd.addr = dst;
  323. /* Tell dest to receive data from source. */
  324. _starpu_mp_common_send_command(dst_node, STARPU_RECV_FROM_SINK, &cmd, sizeof(cmd));
  325. /* Wait for answer from dest to know wether transfer is finished. */
  326. answer = _starpu_mp_common_recv_command(dst_node, &arg, &arg_size);
  327. STARPU_ASSERT(answer == STARPU_TRANSFER_COMPLETE);
  328. return 0;
  329. }
  /* The five functions below determine the executable to run on the device
 * (MIC, SCC, MPI).
 */

/* Write FIRST, SECOND and THIRD concatenated into FINAL, which must be large
 * enough for the result and its terminating '\0' and must not overlap the
 * inputs. */
static void _starpu_src_common_cat_3(char *final, const char *first,
				     const char *second, const char *third)
{
	size_t len_first = strlen(first);
	size_t len_second = strlen(second);
	size_t len_third = strlen(third);

	memcpy(final, first, len_first);
	memcpy(final + len_first, second, len_second);
	/* + 1 copies THIRD's terminating '\0'. */
	memcpy(final + len_first + len_second, third, len_third + 1);
}

/* Write FIRST followed by SECOND into FINAL (same requirements as cat_3). */
static void _starpu_src_common_cat_2(char *final, const char *first, const char *second)
{
	strcpy(final, first);
	strcat(final, second);
}
  /* Join directory DIR and file name FILE into FINAL.
 * A single leading '/' of FILE is dropped so FILE is taken relative to DIR,
 * and a '/' is inserted only when DIR is non-empty and does not already end
 * with one. An empty DIR therefore yields FILE without its leading '/'.
 * FINAL must hold at least strlen(DIR) + strlen(FILE) + 2 bytes. */
static void _starpu_src_common_dir_cat(char *final, const char *dir, const char *file)
{
	if (file[0] == '/')
		++file;

	size_t size = strlen(dir);
	strcpy(final, dir);
	/* Check size first: for an empty DIR, dir[size - 1] would read before
	 * the start of the string. */
	if (size > 0 && dir[size - 1] != '/')
		strcat(final, "/");
	strcat(final, file);
}
  /* Try BASE followed by each entry of the NULL-terminated array SUFFIXES, in
 * order. Returns 0 as soon as a candidate is readable (access R_OK); that
 * candidate is then left in LOCATED_FILE_NAME. Returns 1 if none is. */
static int _starpu_src_common_test_suffixes(char *located_file_name, const char *base, const char **suffixes)
{
	const char **suffix;

	for (suffix = suffixes; *suffix != NULL; suffix++)
	{
		_starpu_src_common_cat_2(located_file_name, base, *suffix);
		if (access(located_file_name, R_OK) == 0)
			return 0;
	}

	return 1;
}
  /* Locate the file to run on the device and store its path in
 * LOCATED_FILE_NAME. Only the first non-NULL name among ENV_FILE_NAME,
 * CONFIG_FILE_NAME and ACTUAL_FILE_NAME is considered:
 *   - ENV_FILE_NAME or CONFIG_FILE_NAME: tried as is, then (if ENV_MIC_PATH is
 *     set) relative to ENV_MIC_PATH;
 *   - ACTUAL_FILE_NAME: tried followed by each of SUFFIXES; then, if
 *     ENV_MIC_PATH is set, each trailing part of ACTUAL_FILE_NAME starting at
 *     a '/' (shortest first) is tried relative to ENV_MIC_PATH, alone and
 *     followed by each suffix.
 * Returns 0 when a readable file was found, non-zero otherwise.
 */
int _starpu_src_common_locate_file(char *located_file_name,
				   const char *env_file_name, const char *env_mic_path,
				   const char *config_file_name, const char *actual_file_name,
				   const char **suffixes)
{
	if (env_file_name != NULL)
	{
		if (access(env_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, env_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, env_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (config_file_name != NULL)
	{
		if (access(config_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, config_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, config_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (actual_file_name != NULL)
	{
		if (_starpu_src_common_test_suffixes(located_file_name, actual_file_name, suffixes) == 0)
			return 0;

		if (env_mic_path != NULL)
		{
			char actual_cpy[1024];
			size_t actual_len = strlen(actual_file_name);
			size_t mic_path_len = strlen(env_mic_path);

			/* actual_cpy is fixed-size: an unchecked strcpy of a long
			 * name would overflow it. */
			if (actual_len >= sizeof(actual_cpy))
				return 1;
			memcpy(actual_cpy, actual_file_name, actual_len + 1);

			char *last = strrchr(actual_cpy, '/');
			while (last != NULL)
			{
				char tmp[1024];

				/* dir_cat writes at most mic_path_len + strlen(last) + 1
				 * bytes (it drops LAST's leading '/' and may add one after
				 * the directory). LAST only grows on each iteration, so
				 * once a candidate does not fit, no later one will. */
				if (mic_path_len + strlen(last) >= sizeof(tmp))
					break;

				_starpu_src_common_dir_cat(tmp, env_mic_path, last);
				if (access(tmp, R_OK) == 0)
				{
					strcpy(located_file_name, tmp);
					return 0;
				}
				if (_starpu_src_common_test_suffixes(located_file_name, tmp, suffixes) == 0)
					return 0;

				/* Move LAST to the previous '/' of the path, if any. */
				*last = '\0';
				char *previous = strrchr(actual_cpy, '/');
				*last = '/';
				last = previous;
			}
		}
	}
	return 1;
}
  425. void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
  426. unsigned baseworkerid,
  427. struct _starpu_mp_node * mp_node)
  428. {
  429. struct _starpu_worker * baseworker = &worker_set->workers[baseworkerid];
  430. unsigned memnode = baseworker->memory_node;
  431. struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
  432. /*main loop*/
  433. while (_starpu_machine_is_running())
  434. {
  435. int res;
  436. struct _starpu_job * j;
  437. _STARPU_TRACE_START_PROGRESS(memnode);
  438. _starpu_datawizard_progress(memnode, 1);
  439. _STARPU_TRACE_END_PROGRESS(memnode);
  440. /* poll the device for completed jobs.*/
  441. if (mp_node->mp_recv_is_ready(mp_node))
  442. _starpu_src_common_recv_async(mp_node);
  443. /* get task for each worker*/
  444. res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
  445. /*if at least one worker have pop a task*/
  446. if(res != 0)
  447. {
  448. unsigned i;
  449. //_STARPU_DEBUG(" nb_tasks:%d\n", res);
  450. for(i=1; i<worker_set->nworkers; i++)
  451. {
  452. if(tasks[i] != NULL)
  453. {
  454. j = _starpu_get_job_associated_to_task(tasks[i]);
  455. worker_set->workers[i].current_task = j->task;
  456. res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
  457. if (res)
  458. {
  459. switch (res)
  460. {
  461. case -EAGAIN:
  462. _STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
  463. _starpu_push_task_to_workers(tasks[i]);
  464. STARPU_ABORT();
  465. continue;
  466. break;
  467. default:
  468. STARPU_ASSERT(0);
  469. }
  470. }
  471. //_STARPU_DEBUG(" exec fin\n");
  472. }
  473. }
  474. }
  475. }
  476. free(tasks);
  477. _starpu_handle_all_pending_node_data_requests(memnode);
  478. /* In case there remains some memory that was automatically
  479. * allocated by StarPU, we release it now. Note that data
  480. * coherency is not maintained anymore at that point ! */
  481. _starpu_free_all_automatically_allocated_buffers(memnode);
  482. }