
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2012, 2016 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <string.h>
#include <starpu.h>
#include <core/task.h>
#include <core/sched_policy.h>
#include <drivers/driver_common/driver_common.h>
#include <datawizard/coherency.h>
#include <datawizard/memory_nodes.h>
#include <datawizard/interfaces/data_interface.h>
#include <drivers/mp_common/mp_common.h>
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
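/* When the MPI master-slave driver uses a single host thread to drive all the
 * slave devices, the thread-local state of each device is saved here so it can
 * be restored when switching from one device to another. */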
struct starpu_save_thread_env
{
	struct starpu_task * current_task;
	struct _starpu_worker * current_worker;
	struct _starpu_worker_set * current_worker_set;
	unsigned * current_mem_node;
#ifdef STARPU_OPENMP
	struct starpu_omp_thread * current_omp_thread;
	struct starpu_omp_task * current_omp_task;
#endif
};
struct starpu_save_thread_env save_thread_env[STARPU_MAXMPIDEVS];
#endif
/* Finalize the execution of a task by a worker */
static int _starpu_src_common_finalize_job(struct _starpu_job *j, struct _starpu_worker *worker)
{
	int profiling = starpu_profiling_status_get();
	struct timespec codelet_end;
	_starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0,
			       profiling);
	int count = worker->current_rank;
	/* If this is a combined worker, check whether this worker is the last one of the combination */
	if (j->task_size > 1)
	{
		struct _starpu_combined_worker *cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid);
		STARPU_PTHREAD_MUTEX_LOCK(&cb_worker->count_mutex);
		count = cb_worker->count--;
		if (count == 0)
			cb_worker->count = cb_worker->worker_size - 1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&cb_worker->count_mutex);
	}
	/* Finalize the execution */
	if (count == 0)
	{
		_starpu_driver_update_job_feedback(j, worker, &worker->perf_arch,
						   &j->cl_start, &codelet_end,
						   profiling);
		_starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
	}
	return 0;
}
/* Complete the execution of the job */
static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node, struct _starpu_worker_set *workerset, void *arg, int arg_size, int stored)
{
	int coreid;
	STARPU_ASSERT(sizeof(coreid) == arg_size);
	coreid = *(int *) arg;
	struct _starpu_worker *worker = &workerset->workers[coreid];
	struct _starpu_job *j = _starpu_get_job_associated_to_task(worker->current_task);
	struct _starpu_worker *old_worker = _starpu_get_local_worker_key();
	/* if arg was not stored we release the connection mutex */
	if (!stored)
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
	_starpu_set_local_worker_key(worker);
	_starpu_src_common_finalize_job(j, worker);
	_starpu_set_local_worker_key(old_worker);
	worker->current_task = NULL;
	return 0;
}
/* Tell the scheduler that the execution has begun */
static void _starpu_src_common_pre_exec(struct _starpu_mp_node *node, void *arg, int arg_size, int stored)
{
	int cb_workerid, i;
	STARPU_ASSERT(sizeof(cb_workerid) == arg_size);
	cb_workerid = *(int *) arg;
	struct _starpu_combined_worker *combined_worker = _starpu_get_combined_worker_struct(cb_workerid);
	/* if arg was not stored we release the connection mutex */
	if (!stored)
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
	for (i = 0; i < combined_worker->worker_size; i++)
	{
		struct _starpu_worker *worker = _starpu_get_worker_struct(combined_worker->combined_workerid[i]);
		_starpu_set_local_worker_key(worker);
		_starpu_sched_pre_exec_hook(worker->current_task);
	}
}
/* Receive a message and handle it if it is asynchronous.
 * Return 0 if the message has not been handled (it most likely is a synchronous message),
 * return 1 if the message has been handled.
 */
static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
					   void *arg, int arg_size,
					   enum _starpu_mp_command answer, int stored)
{
	struct _starpu_worker_set *worker_set = NULL;
	switch(answer)
	{
		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
			worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
			_starpu_src_common_process_completed_job(node, worker_set, arg, arg_size, stored);
			break;
		case STARPU_MP_COMMAND_PRE_EXECUTION:
			_starpu_src_common_pre_exec(node, arg, arg_size, stored);
			break;
		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_receiver--;
			if (!stored)
				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
			break;
		}
		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_sender--;
			if (!stored)
				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
			break;
		}
		default:
			return 0;
			break;
	}
	return 1;
}
/* Handle all messages which have been stored in the message_queue */
static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
{
	STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
	/* while the list is not empty */
	while (!mp_message_list_empty(&node->message_queue))
	{
		/* Pop a message and handle it */
		struct mp_message *message = mp_message_list_pop_back(&node->message_queue);
		/* Release the mutex while the message is being handled */
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
		_starpu_src_common_handle_async(node, message->buffer,
						message->size, message->type, 1);
		free(message->buffer);
		mp_message_delete(message);
		/* Take the mutex again */
		STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
}
/* Store a message if it is asynchronous.
 * Return 1 if the message has been stored,
 * return 0 if the message is unknown or synchronous. */
int _starpu_src_common_store_message(struct _starpu_mp_node *node,
				     void *arg, int arg_size, enum _starpu_mp_command answer)
{
	struct mp_message *message = NULL;
	switch(answer)
	{
		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
		case STARPU_MP_COMMAND_PRE_EXECUTION:
			message = mp_message_new();
			message->type = answer;
			_STARPU_MALLOC(message->buffer, arg_size);
			memcpy(message->buffer, arg, arg_size);
			message->size = arg_size;
			STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
			mp_message_list_push_front(&node->message_queue, message);
			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
			return 1;
			break;
		/* ASYNC completion commands are not stored, just update the event */
		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_receiver--;
			return 1;
			break;
		}
		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_sender--;
			return 1;
			break;
		}
		default:
			return 0;
			break;
	}
}
/* Store all asynchronous messages and return when a synchronous message is received */
static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
								    void **arg, int *arg_size)
{
	enum _starpu_mp_command answer;
	int is_sync = 0;
	while (!is_sync)
	{
		answer = _starpu_mp_common_recv_command(node, arg, arg_size);
		if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
			is_sync = 1;
	}
	return answer;
}
/* Handle an asynchronous message and raise an error if a synchronous message is received */
static void _starpu_src_common_recv_async(struct _starpu_mp_node *node)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
	if (!_starpu_src_common_handle_async(node, arg, arg_size, answer, 0))
	{
		printf("incorrect command: unknown command or sync command\n");
		STARPU_ASSERT(0);
	}
}
/* Handle all asynchronous messages until the completed-execution message from a specific worker has been received */
enum _starpu_mp_command _starpu_src_common_wait_completed_execution(struct _starpu_mp_node *node, int devid, void **arg, int *arg_size)
{
	enum _starpu_mp_command answer;
	int completed = 0;
	/* While the expected completed-execution message has not been received */
	while (!completed)
	{
		answer = _starpu_mp_common_recv_command(node, arg, arg_size);
		if (answer == STARPU_MP_COMMAND_EXECUTION_COMPLETED)
		{
			int coreid;
			STARPU_ASSERT(sizeof(coreid) == *arg_size);
			coreid = *(int *) *arg;
			if (devid == coreid)
				completed = 1;
			else
				if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
					/* We received an unknown or synchronous message */
					STARPU_ASSERT(0);
		}
		else
		{
			if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
				/* We received an unknown or synchronous message */
				STARPU_ASSERT(0);
		}
	}
	return answer;
}
/* Send a request to the sink NODE for the number of cores on it. */
int _starpu_src_common_sink_nbcores(struct _starpu_mp_node *node, int *buf)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size = sizeof(int);
	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SINK_NBCORES, NULL, 0);
	answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_SINK_NBCORES && arg_size == sizeof(int));
	memcpy(buf, arg, arg_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
	return 0;
}
/* Send a request to the sink linked to NODE for the pointer to the
 * function defined by FUNC_NAME.
 * In case of success, it returns 0 and FUNC_PTR contains the pointer;
 * else it returns -ESPIPE if the function was not found.
 */
int _starpu_src_common_lookup(struct _starpu_mp_node *node,
			      void (**func_ptr)(void), const char *func_name)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	/* strlen ignores the terminating '\0' */
	arg_size = (strlen(func_name) + 1) * sizeof(char);
	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
	//_STARPU_DEBUG("Looking up %s\n", func_name);
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_LOOKUP, (void *) func_name,
				       arg_size);
	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
						      &arg_size);
	if (answer == STARPU_MP_COMMAND_ERROR_LOOKUP)
	{
		_STARPU_DISP("Error looking up symbol %s\n", func_name);
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
		return -ESPIPE;
	}
	/* We have to be sure the device answered the right question and the
	 * answer has the right size */
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_LOOKUP);
	STARPU_ASSERT(arg_size == sizeof(*func_ptr));
	memcpy(func_ptr, arg, arg_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
	//_STARPU_DEBUG("got %p\n", *func_ptr);
	return 0;
}
/* Send a message to the sink to execute a kernel.
 * The message sent has the form below:
 * [Function pointer on sink, number of interfaces, interfaces
 * (union _starpu_interface), cl_arg]
 */
/* Launch the execution of the function KERNEL points to on the sink linked
 * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
 * pointer.
 * Data interfaces of the task are sent to the sink.
 */
int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
				      void (*kernel)(void), unsigned coreid,
				      enum starpu_codelet_type type,
				      int is_parallel_task, int cb_workerid,
				      starpu_data_handle_t *handles,
				      void **interfaces,
				      unsigned nb_interfaces,
				      void *cl_arg, size_t cl_arg_size)
{
	void *buffer, *arg = NULL;
	uintptr_t buffer_ptr;
	int buffer_size = 0, arg_size = 0;
	unsigned i;
	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
	/* if the task is parallel */
	if (is_parallel_task)
	{
		buffer_size += sizeof(cb_workerid);
	}
	/* If the user didn't give any cl_arg, there is no need to send it */
	if (cl_arg)
	{
		STARPU_ASSERT(cl_arg_size);
		buffer_size += cl_arg_size;
	}
	/* We give send_command a buffer we just allocated, which contains
	 * a pointer to the function (sink-side), the core on which to execute this
	 * function (sink-side), the number of interfaces we send,
	 * an array of generic (union) interfaces and the value of cl_arg */
	_STARPU_MALLOC(buffer, buffer_size);
	buffer_ptr = (uintptr_t) buffer;
	*(void(**)(void)) buffer = kernel;
	buffer_ptr += sizeof(kernel);
	*(enum starpu_codelet_type *) buffer_ptr = type;
	buffer_ptr += sizeof(type);
	*(int *) buffer_ptr = is_parallel_task;
	buffer_ptr += sizeof(is_parallel_task);
	if (is_parallel_task)
	{
		*(int *) buffer_ptr = cb_workerid;
		buffer_ptr += sizeof(cb_workerid);
	}
	*(unsigned *) buffer_ptr = coreid;
	buffer_ptr += sizeof(coreid);
	*(unsigned *) buffer_ptr = nb_interfaces;
	buffer_ptr += sizeof(nb_interfaces);
	/* Message-passing execution is a particular case: the codelet is
	 * executed on a sink with a separate memory, whereas for the other
	 * accelerators the codelet is executed on the host side.
	 * Thus we need to send a copy of each interface to the MP device */
	for (i = 0; i < nb_interfaces; i++)
	{
		starpu_data_handle_t handle = handles[i];
		memcpy((void*) buffer_ptr, interfaces[i],
		       handle->ops->interface_size);
		/* The sink side has no means to know the type of each
		 * interface, we use a union to make it generic and let the
		 * sink go through the array */
		buffer_ptr += sizeof(union _starpu_interface);
	}
	if (cl_arg)
		memcpy((void*) buffer_ptr, cl_arg, cl_arg_size);
	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE, buffer, buffer_size);
	enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);
	if (answer == STARPU_MP_COMMAND_ERROR_EXECUTE)
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
		/* do not leak the buffer on the error path */
		free(buffer);
		return -EINVAL;
	}
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_SUBMITTED);
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
	free(buffer);
	return 0;
}
/* Gather the information and call the function which sends the sink a message to execute the task */
static int _starpu_src_common_execute(struct _starpu_job *j,
				      struct _starpu_worker *worker,
				      struct _starpu_mp_node *node)
{
	STARPU_ASSERT(j);
	struct starpu_task *task = j->task;
	int profiling = starpu_profiling_status_get();
	STARPU_ASSERT(task);
	if (worker->current_rank == 0)
	{
		int ret = _starpu_fetch_task_input(j);
		if (ret != 0)
		{
			/* there was not enough memory, so the input of
			 * the codelet cannot be fetched ... put the
			 * codelet back, and try it later */
			return -EAGAIN;
		}
	}
	void (*kernel)(void) = node->get_kernel_from_job(node, j);
	_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
	//_STARPU_DEBUG("\nworkerid:%d, rank:%d, type:%d, cb_workerid:%d, task_size:%d\n\n",worker->devid,worker->current_rank,task->cl->type,j->combined_workerid,j->task_size);
	_starpu_src_common_execute_kernel(node, kernel, worker->subworkerid, task->cl->type,
					  (j->task_size > 1),
					  j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
					  _STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
					  task->cl_arg, task->cl_arg_size);
	return 0;
}
/* Send a request to the sink linked to MP_NODE to allocate SIZE bytes on
 * the sink.
 * In case of success, it returns 0 and *ADDR contains the address of the
 * allocated area;
 * else it returns 1 if the allocation fails.
 */
int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
				void **addr, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ALLOCATE, &size,
				       sizeof(size));
	answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);
	if (answer == STARPU_MP_COMMAND_ERROR_ALLOCATE)
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
		return 1;
	}
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_ALLOCATE &&
		      arg_size == sizeof(*addr));
	memcpy(addr, arg, arg_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	return 0;
}
/* Send a request to the sink linked to MP_NODE to deallocate the memory
 * area pointed to by ADDR.
 */
void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
			     void *addr)
{
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_FREE, &addr, sizeof(addr));
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
}
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE, in
 * synchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
					      void *src, void *dst, size_t size)
{
	struct _starpu_mp_transfer_command cmd = {size, dst, NULL};
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST, &cmd, sizeof(cmd));
	mp_node->dt_send(mp_node, src, size, NULL);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	return 0;
}
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE, in
 * asynchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
					       void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command cmd = {size, dst, event};
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	/* For asynchronous transfers, we save the information needed
	 * to test whether they have finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_receiver = mp_node;
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC, &cmd, sizeof(cmd));
	mp_node->dt_send(mp_node, src, size, event);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	return -EAGAIN;
}
/* Receive SIZE bytes pointed to by SRC on the sink linked to MP_NODE and store them in DST,
 * in synchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
					      void *src, void *dst, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	struct _starpu_mp_transfer_command cmd = {size, src, NULL};
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST, &cmd, sizeof(cmd));
	answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_SEND_TO_HOST);
	mp_node->dt_recv(mp_node, dst, size, NULL);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	return 0;
}
/* Receive SIZE bytes pointed to by SRC on the sink linked to MP_NODE and store them in DST,
 * in asynchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
					       void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command cmd = {size, src, event};
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	/* For asynchronous transfers, we save the information needed
	 * to test whether they have finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_sender = mp_node;
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC, &cmd, sizeof(cmd));
	mp_node->dt_recv(mp_node, dst, size, event);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	return -EAGAIN;
}
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in synchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
					      struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, NULL};
	/* lock the node with the lower peer_id first to prevent deadlocks */
	if (src_node->peer_id > dst_node->peer_id)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
	}
	else
	{
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
	}
	/* Tell the source to send data to the destination. */
	_starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK, &cmd, sizeof(cmd));
	/* Release the source as fast as possible */
	STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);
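	/* Reuse the same command structure for the receiver: it needs the id
	 * of the sending sink and the destination address */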
	cmd.devid = src_node->peer_id;
	cmd.size = size;
	cmd.addr = dst;
	/* Tell the destination to receive data from the source. */
	_starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK, &cmd, sizeof(cmd));
	/* Wait for the answer from the destination to know whether the transfer is finished. */
	answer = _starpu_src_common_wait_command_sync(dst_node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_TRANSFER_COMPLETE);
	/* Release the receiver once we have received the acknowledgement */
	STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);
	return 0;
}
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in asynchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
					       struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, event};
	/* lock the node with the lower peer_id first to prevent deadlocks */
	if (src_node->peer_id > dst_node->peer_id)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
	}
	else
	{
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
	}
	/* For asynchronous transfers, we save the information needed
	 * to test whether they have finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_sender = src_node;
	async_channel->polling_node_receiver = dst_node;
	/* Increase the number of acknowledgements to wait for */
	async_channel->starpu_mp_common_finished_receiver++;
	async_channel->starpu_mp_common_finished_sender++;
	/* Tell the source to send data to the destination. */
	_starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC, &cmd, sizeof(cmd));
	STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);
	cmd.devid = src_node->peer_id;
	cmd.size = size;
	cmd.addr = dst;
	/* Tell the destination to receive data from the source. */
	_starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC, &cmd, sizeof(cmd));
	STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);
	return -EAGAIN;
}
/* 5 functions to determine the executable to run on the device (MIC, SCC,
 * MPI).
 */
static void _starpu_src_common_cat_3(char *final, const char *first,
				     const char *second, const char *third)
{
	strcpy(final, first);
	strcat(final, second);
	strcat(final, third);
}
static void _starpu_src_common_cat_2(char *final, const char *first, const char *second)
{
	_starpu_src_common_cat_3(final, first, second, "");
}
static void _starpu_src_common_dir_cat(char *final, const char *dir, const char *file)
{
	if (file[0] == '/')
		++file;
	size_t size = strlen(dir);
	if (dir[size - 1] == '/')
		_starpu_src_common_cat_2(final, dir, file);
	else
		_starpu_src_common_cat_3(final, dir, "/", file);
}
static int _starpu_src_common_test_suffixes(char *located_file_name, const char *base, const char **suffixes)
{
	unsigned int i;
	for (i = 0; suffixes[i] != NULL; ++i)
	{
		_starpu_src_common_cat_2(located_file_name, base, suffixes[i]);
		if (access(located_file_name, R_OK) == 0)
			return 0;
	}
	return 1;
}
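/* Locate the binary to load on the device. The name given by the environment
 * (env_file_name) is used if set, otherwise the one from the configuration
 * (config_file_name), otherwise the actual file name tried with the known
 * suffixes; each candidate may also be searched under env_mic_path.
 * On success, fill LOCATED_FILE_NAME and return 0; return non-zero otherwise. */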
int _starpu_src_common_locate_file(char *located_file_name,
				   const char *env_file_name, const char *env_mic_path,
				   const char *config_file_name, const char *actual_file_name,
				   const char **suffixes)
{
	if (env_file_name != NULL)
	{
		if (access(env_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, env_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, env_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (config_file_name != NULL)
	{
		if (access(config_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, config_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, config_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (actual_file_name != NULL)
	{
		if (_starpu_src_common_test_suffixes(located_file_name, actual_file_name, suffixes) == 0)
			return 0;
		if (env_mic_path != NULL)
		{
			char actual_cpy[1024];
			strcpy(actual_cpy, actual_file_name);
			char *last = strrchr(actual_cpy, '/');
			while (last != NULL)
			{
				char tmp[1024];
				_starpu_src_common_dir_cat(tmp, env_mic_path, last);
				if (access(tmp, R_OK) == 0)
				{
					strcpy(located_file_name, tmp);
					return 0;
				}
				if (_starpu_src_common_test_suffixes(located_file_name, tmp, suffixes) == 0)
					return 0;
				*last = '\0';
				char *last_tmp = strrchr(actual_cpy, '/');
				*last = '/';
				last = last_tmp;
			}
		}
	}
	return 1;
}
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
void _starpu_src_common_init_switch_env(unsigned this)
{
	save_thread_env[this].current_task = starpu_task_get_current();
	save_thread_env[this].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
	save_thread_env[this].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
	save_thread_env[this].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
	save_thread_env[this].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
	save_thread_env[this].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif
}
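/* Save the thread-local state (current task, worker, worker set, memory node
 * and OpenMP state) of device OLD and restore the one of device NEW, so that a
 * single host thread can drive several slave devices in turn */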
static void _starpu_src_common_switch_env(unsigned old, unsigned new)
{
	save_thread_env[old].current_task = starpu_task_get_current();
	save_thread_env[old].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
	save_thread_env[old].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
	save_thread_env[old].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
	save_thread_env[old].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
	save_thread_env[old].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif
	_starpu_set_current_task(save_thread_env[new].current_task);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, save_thread_env[new].current_worker);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, save_thread_env[new].current_worker_set);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_memory_node_key, save_thread_env[new].current_mem_node);
#ifdef STARPU_OPENMP
	STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, save_thread_env[new].current_omp_thread);
	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, save_thread_env[new].current_omp_task);
#endif
}
#endif
/* Send the workers to the sink node
 */
static void _starpu_src_common_send_workers(struct _starpu_mp_node *node, int baseworkerid, int nworkers)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int worker_size = sizeof(struct _starpu_worker)*nworkers;
	int combined_worker_size = STARPU_NMAX_COMBINEDWORKERS*sizeof(struct _starpu_combined_worker);
	int msg[5];
	msg[0] = nworkers;
	msg[1] = worker_size;
	msg[2] = combined_worker_size;
	msg[3] = baseworkerid;
	msg[4] = starpu_worker_get_count();
	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
	/* tell the sink node that we will send it all the workers */
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SYNC_WORKERS,
				       &msg, sizeof(msg));
	/* Send all the workers to the sink node */
	node->dt_send(node, &config->workers[baseworkerid], worker_size, NULL);
	/* Send all the combined workers to the sink node */
	node->dt_send(node, &config->combined_workers, combined_worker_size, NULL);
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
}
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *worker_set, struct _starpu_mp_node *mp_node, struct starpu_task **tasks, unsigned memnode)
{
	int res = 0;
	_starpu_may_pause();
#ifdef STARPU_SIMGRID
	starpu_pthread_wait_reset(&worker_set->workers[0].wait);
#endif
	_STARPU_TRACE_START_PROGRESS(memnode);
	res |= __starpu_datawizard_progress(1, 1);
	_STARPU_TRACE_END_PROGRESS(memnode);
	/* Handle the messages which have been stored */
	_starpu_src_common_handle_stored_async(mp_node);
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	/* poll the device for completed jobs */
	while (mp_node->mp_recv_is_ready(mp_node))
	{
		_starpu_src_common_recv_async(mp_node);
		/* The mutex is unlocked in _starpu_src_common_recv_async */
		STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
	/* get a task for each worker */
	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
#ifdef STARPU_SIMGRID
	if (!res)
		starpu_pthread_wait_wait(&worker_set->workers[0].wait);
#endif
	/* if at least one worker has popped a task */
	if (res != 0)
	{
		unsigned i;
		for (i = 0; i < worker_set->nworkers; i++)
		{
			if (tasks[i] != NULL)
			{
				struct _starpu_job *j = _starpu_get_job_associated_to_task(tasks[i]);
				_starpu_set_local_worker_key(&worker_set->workers[i]);
				res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
				switch (res)
				{
					case 0:
						/* The task has been launched without error */
						break;
					case -EAGAIN:
						_STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
						_starpu_push_task_to_workers(tasks[i]);
						STARPU_ABORT();
						continue;
						break;
					default:
						STARPU_ASSERT(0);
				}
			}
		}
	}
	/* Handle the messages which have been stored */
	_starpu_src_common_handle_stored_async(mp_node);
}
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
/* Function looping on the source node */
void _starpu_src_common_workers_set(struct _starpu_worker_set *worker_set,
				    int ndevices,
				    struct _starpu_mp_node **mp_node)
{
	unsigned memnode[ndevices];
	unsigned offsetmemnode[ndevices];
	memset(offsetmemnode, 0, ndevices*sizeof(unsigned));
	int device;
	int nbworkers = 0;
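	/* memnode[d] is the memory node of device d; offsetmemnode[d] is the
	 * sum of the worker counts of the previous devices, i.e. the offset of
	 * the first task slot of device d in the shared tasks array */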
	for (device = 0; device < ndevices; device++)
	{
		memnode[device] = worker_set[device].workers[0].memory_node;
		nbworkers += worker_set[device].nworkers;
		if (device != 0)
			offsetmemnode[device] += offsetmemnode[device-1];
		if (device != ndevices - 1)
			offsetmemnode[device+1] += worker_set[device].nworkers;
	}
	struct starpu_task **tasks;
	_STARPU_MALLOC(tasks, sizeof(struct starpu_task *)*nbworkers);
	for (device = 0; device < ndevices; device++)
	{
		struct _starpu_worker *baseworker = &worker_set[device].workers[0];
		struct _starpu_machine_config *config = baseworker->config;
		unsigned baseworkerid = baseworker - config->workers;
		_starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
	}
	/* main loop */
	while (_starpu_machine_is_running())
	{
		for (device = 0; device < ndevices; device++)
		{
			_starpu_src_common_switch_env(((device-1)+ndevices)%ndevices, device);
			_starpu_src_common_worker_internal_work(&worker_set[device], mp_node[device], tasks+offsetmemnode[device], memnode[device]);
		}
	}
	free(tasks);
	for (device = 0; device < ndevices; device++)
		_starpu_handle_all_pending_node_data_requests(memnode[device]);
	/* In case there remains some memory that was automatically
	 * allocated by StarPU, we release it now. Note that data
	 * coherency is not maintained anymore at that point! */
	for (device = 0; device < ndevices; device++)
		_starpu_free_all_automatically_allocated_buffers(memnode[device]);
}
#endif
/* Function looping on the source node */
void _starpu_src_common_worker(struct _starpu_worker_set *worker_set,
			       unsigned baseworkerid,
			       struct _starpu_mp_node *mp_node)
{
	unsigned memnode = worker_set->workers[0].memory_node;
	struct starpu_task **tasks;
	_STARPU_MALLOC(tasks, sizeof(struct starpu_task *)*worker_set->nworkers);
	_starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
	/* main loop */
	while (_starpu_machine_is_running())
	{
		_starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
	}
	free(tasks);
	_starpu_handle_all_pending_node_data_requests(memnode);
	/* In case there remains some memory that was automatically
	 * allocated by StarPU, we release it now. Note that data
	 * coherency is not maintained anymore at that point! */
	_starpu_free_all_automatically_allocated_buffers(memnode);
}