source_common.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2012, 2016  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <string.h>

#include <starpu.h>
#include <core/task.h>
#include <core/sched_policy.h>
#include <drivers/driver_common/driver_common.h>
#include <datawizard/coherency.h>
#include <datawizard/memory_nodes.h>
#include <datawizard/interfaces/data_interface.h>
#include <drivers/mp_common/mp_common.h>

#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
struct starpu_save_thread_env
{
	struct starpu_task *current_task;
	struct _starpu_worker *current_worker;
	struct _starpu_worker_set *current_worker_set;
	unsigned *current_mem_node;
#ifdef STARPU_OPENMP
	struct starpu_omp_thread *current_omp_thread;
	struct starpu_omp_task *current_omp_task;
#endif
};

struct starpu_save_thread_env save_thread_env[STARPU_MAXMPIDEVS];
#endif
/* Finalize the execution of a task by a worker */
static int _starpu_src_common_finalize_job(struct _starpu_job *j, struct _starpu_worker *worker)
{
	int profiling = starpu_profiling_status_get();
	struct timespec codelet_end;

	_starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0, profiling);

	int count = worker->current_rank;

	/* If this is a combined worker, check whether we are the last member of the combination */
	if (j->task_size > 1)
	{
		struct _starpu_combined_worker *cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid);

		STARPU_PTHREAD_MUTEX_LOCK(&cb_worker->count_mutex);
		count = cb_worker->count--;
		if (count == 0)
			cb_worker->count = cb_worker->worker_size - 1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&cb_worker->count_mutex);
	}

	/* Finalize the execution */
	if (count == 0)
	{
		_starpu_driver_update_job_feedback(j, worker, &worker->perf_arch,
						   &j->cl_start, &codelet_end,
						   profiling);

		_starpu_push_task_output(j);

		_starpu_handle_job_termination(j);
	}
	return 0;
}
/* Complete the execution of the job */
static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node, struct _starpu_worker_set *workerset, void *arg, int arg_size, int stored)
{
	int coreid;

	STARPU_ASSERT(sizeof(coreid) == arg_size);
	coreid = *(int *) arg;

	struct _starpu_worker *worker = &workerset->workers[coreid];
	struct _starpu_job *j = _starpu_get_job_associated_to_task(worker->current_task);

	struct _starpu_worker *old_worker = _starpu_get_local_worker_key();

	/* if arg is not copied we release the mutex */
	if (!stored)
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

	_starpu_set_local_worker_key(worker);
	_starpu_src_common_finalize_job(j, worker);
	_starpu_set_local_worker_key(old_worker);

	worker->current_task = NULL;

	return 0;
}
/* Tell the scheduler that the execution has begun */
static void _starpu_src_common_pre_exec(struct _starpu_mp_node *node, void *arg, int arg_size, int stored)
{
	int cb_workerid, i;
	STARPU_ASSERT(sizeof(cb_workerid) == arg_size);
	cb_workerid = *(int *) arg;

	struct _starpu_combined_worker *combined_worker = _starpu_get_combined_worker_struct(cb_workerid);

	/* if arg is not copied we release the mutex */
	if (!stored)
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

	for (i = 0; i < combined_worker->worker_size; i++)
	{
		struct _starpu_worker *worker = _starpu_get_worker_struct(combined_worker->combined_workerid[i]);
		_starpu_set_local_worker_key(worker);
		_starpu_sched_pre_exec_hook(worker->current_task);
	}
}
/* Receive a message and handle it if it is asynchronous.
 * Return 0 if the message has not been handled (which most likely means it is a synchronous message),
 * return 1 if the message has been handled.
 */
static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
					   void *arg, int arg_size,
					   enum _starpu_mp_command answer, int stored)
{
	struct _starpu_worker_set *worker_set = NULL;
	switch (answer)
	{
		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
			worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
			_starpu_src_common_process_completed_job(node, worker_set, arg, arg_size, stored);
			break;
		case STARPU_MP_COMMAND_PRE_EXECUTION:
			_starpu_src_common_pre_exec(node, arg, arg_size, stored);
			break;
		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_receiver--;
			if (!stored)
				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
			break;
		}
		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_sender--;
			if (!stored)
				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
			break;
		}
		default:
			return 0;
			break;
	}
	return 1;
}
/* Handle all messages which have been stored in the message_queue */
static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
{
	STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
	/* while the list is not empty */
	while (!mp_message_list_empty(&node->message_queue))
	{
		/* Pop a message and handle it */
		struct mp_message *message = mp_message_list_pop_back(&node->message_queue);
		/* Release the mutex while the message is being handled */
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
		_starpu_src_common_handle_async(node, message->buffer,
						message->size, message->type, 1);
		free(message->buffer);
		mp_message_delete(message);
		/* Take the mutex again */
		STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
}
/* Store a message if it is asynchronous.
 * Return 1 if the message has been stored,
 * return 0 if the message is unknown or synchronous. */
int _starpu_src_common_store_message(struct _starpu_mp_node *node,
				     void *arg, int arg_size, enum _starpu_mp_command answer)
{
	struct mp_message *message = NULL;
	switch (answer)
	{
		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
		case STARPU_MP_COMMAND_PRE_EXECUTION:
			message = mp_message_new();
			message->type = answer;
			_STARPU_MALLOC(message->buffer, arg_size);
			memcpy(message->buffer, arg, arg_size);
			message->size = arg_size;

			STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
			mp_message_list_push_front(&node->message_queue, message);
			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
			return 1;
			break;

		/* For ASYNC completion commands, do not store them, just update the event */
		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_receiver--;
			return 1;
			break;
		}
		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
		{
			struct _starpu_async_channel *event = *((struct _starpu_async_channel **) arg);
			event->starpu_mp_common_finished_sender--;
			return 1;
			break;
		}
		default:
			return 0;
			break;
	}
}
/* Store all asynchronous messages and return when a synchronous message is received */
static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
								    void **arg, int *arg_size)
{
	enum _starpu_mp_command answer;
	int is_sync = 0;
	while (!is_sync)
	{
		answer = _starpu_mp_common_recv_command(node, arg, arg_size);
		if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
			is_sync = 1;
	}
	return answer;
}
/* Handle an asynchronous message and raise an error if a synchronous message is received instead */
static void _starpu_src_common_recv_async(struct _starpu_mp_node *node)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
	if (!_starpu_src_common_handle_async(node, arg, arg_size, answer, 0))
	{
		printf("incorrect command: unknown command or sync command");
		STARPU_ASSERT(0);
	}
}
/* Handle all asynchronous messages until the completed-execution message from a specific worker has been received */
enum _starpu_mp_command _starpu_src_common_wait_completed_execution(struct _starpu_mp_node *node, int devid, void **arg, int *arg_size)
{
	enum _starpu_mp_command answer;

	int completed = 0;
	/* While the expected completed-execution message has not been received */
	while (!completed)
	{
		answer = _starpu_mp_common_recv_command(node, arg, arg_size);

		if (answer == STARPU_MP_COMMAND_EXECUTION_COMPLETED)
		{
			int coreid;
			STARPU_ASSERT(sizeof(coreid) == *arg_size);
			coreid = *(int *) *arg;
			if (devid == coreid)
				completed = 1;
			else if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
				/* We received an unknown or synchronous message */
				STARPU_ASSERT(0);
		}
		else
		{
			if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
				/* We received an unknown or synchronous message */
				STARPU_ASSERT(0);
		}
	}
	return answer;
}
/* Send a request to the sink NODE for the number of cores on it. */
int _starpu_src_common_sink_nbcores(struct _starpu_mp_node *node, int *buf)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size = sizeof(int);

	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);

	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SINK_NBCORES, NULL, 0);

	answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_SINK_NBCORES && arg_size == sizeof(int));

	memcpy(buf, arg, arg_size);

	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

	return 0;
}
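
/* A minimal usage sketch (not part of the original file): a driver
 * initialization path could query the sink core count as below. The helper
 * name and the my_node parameter are hypothetical. */
#if 0
static unsigned example_query_sink_cores(struct _starpu_mp_node *my_node)
{
	int ncores = 0;
	/* Synchronous round trip: ask the sink how many cores it exposes */
	_starpu_src_common_sink_nbcores(my_node, &ncores);
	return (unsigned) ncores;
}
#endif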
/* Send a request to the sink linked to NODE for the pointer to the
 * function named FUNC_NAME.
 * In case of success, it returns 0 and FUNC_PTR contains the pointer;
 * otherwise it returns -ESPIPE if the function was not found.
 */
int _starpu_src_common_lookup(struct _starpu_mp_node *node,
			      void (**func_ptr)(void), const char *func_name)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;

	/* strlen() does not count the terminating '\0', hence the +1 */
	arg_size = (strlen(func_name) + 1) * sizeof(char);

	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);

	//_STARPU_DEBUG("Looking up %s\n", func_name);
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_LOOKUP, (void *) func_name,
				       arg_size);

	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
						      &arg_size);

	if (answer == STARPU_MP_COMMAND_ERROR_LOOKUP)
	{
		_STARPU_DISP("Error looking up symbol %s\n", func_name);
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
		return -ESPIPE;
	}

	/* We have to be sure the device answered the right question and the
	 * answer has the right size */
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_LOOKUP);
	STARPU_ASSERT(arg_size == sizeof(*func_ptr));

	memcpy(func_ptr, arg, arg_size);

	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

	//_STARPU_DEBUG("got %p\n", *func_ptr);

	return 0;
}
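
/* A hedged usage sketch (not part of the original file): how a caller might
 * resolve a kernel symbol on the sink before launching it. The helper name
 * and the symbol "my_codelet_cpu_func" are purely illustrative. */
#if 0
static void (*example_resolve_kernel(struct _starpu_mp_node *my_node))(void)
{
	void (*kernel)(void) = NULL;
	if (_starpu_src_common_lookup(my_node, &kernel, "my_codelet_cpu_func") != 0)
	{
		/* -ESPIPE: the symbol is not known on the sink side */
		return NULL;
	}
	return kernel;
}
#endif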
/* Send a message to the sink to execute a kernel.
 * The message sent has the form below:
 * [Function pointer on the sink, codelet type, is_parallel_task flag,
 *  (combined workerid if parallel), core id, number of interfaces,
 *  interfaces (union _starpu_interface), cl_arg]
 */
/* Launch the execution of the function KERNEL points to on the sink linked
 * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
 * pointer.
 * The data interfaces of the task are sent to the sink.
 */
int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
				      void (*kernel)(void), unsigned coreid,
				      enum starpu_codelet_type type,
				      int is_parallel_task, int cb_workerid,
				      starpu_data_handle_t *handles,
				      void **interfaces,
				      unsigned nb_interfaces,
				      void *cl_arg, size_t cl_arg_size)
{
	void *buffer, *arg = NULL;
	uintptr_t buffer_ptr;
	int buffer_size = 0, arg_size = 0;
	unsigned i;

	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);

	/* If the task is parallel */
	if (is_parallel_task)
	{
		buffer_size += sizeof(cb_workerid);
	}

	/* If the user didn't give any cl_arg, there is no need to send it */
	if (cl_arg)
	{
		STARPU_ASSERT(cl_arg_size);
		buffer_size += cl_arg_size;
	}

	/* We give send_command a buffer we just allocated, which contains
	 * a pointer to the function (sink-side), the core on which to execute
	 * this function (sink-side), the number of interfaces we send,
	 * an array of generic (union) interfaces and the value of cl_arg */
	_STARPU_MALLOC(buffer, buffer_size);

	buffer_ptr = (uintptr_t) buffer;
	*(void(**)(void)) buffer = kernel;
	buffer_ptr += sizeof(kernel);

	*(enum starpu_codelet_type *) buffer_ptr = type;
	buffer_ptr += sizeof(type);

	*(int *) buffer_ptr = is_parallel_task;
	buffer_ptr += sizeof(is_parallel_task);

	if (is_parallel_task)
	{
		*(int *) buffer_ptr = cb_workerid;
		buffer_ptr += sizeof(cb_workerid);
	}

	*(unsigned *) buffer_ptr = coreid;
	buffer_ptr += sizeof(coreid);

	*(unsigned *) buffer_ptr = nb_interfaces;
	buffer_ptr += sizeof(nb_interfaces);

	/* Message-passing execution is a particular case: the codelet is
	 * executed on a sink with a separate memory space, whereas for the
	 * other accelerators the codelet runs on the host side.
	 * Thus we need to send a copy of each interface to the MP device */
	for (i = 0; i < nb_interfaces; i++)
	{
		starpu_data_handle_t handle = handles[i];

		memcpy((void *) buffer_ptr, interfaces[i],
		       handle->ops->interface_size);
		/* The sink side has no means to get the type of each
		 * interface, so we use a union to make it generic and let the
		 * sink walk through the array */
		buffer_ptr += sizeof(union _starpu_interface);
	}

	if (cl_arg)
		memcpy((void *) buffer_ptr, cl_arg, cl_arg_size);

	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);

	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE, buffer, buffer_size);

	enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);

	if (answer == STARPU_MP_COMMAND_ERROR_EXECUTE)
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
		return -EINVAL;
	}

	STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_SUBMITTED);

	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

	free(buffer);

	return 0;
}
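
/* The sink side has to parse the very same layout in the same order. Below is
 * a minimal, hedged sketch of such a parser, mirroring the packing code above;
 * the struct and function names are hypothetical and this is not the actual
 * sink implementation. */
#if 0
struct example_execute_msg
{
	void (*kernel)(void);
	enum starpu_codelet_type type;
	int is_parallel_task;
	int cb_workerid;
	unsigned coreid;
	unsigned nb_interfaces;
	void *interfaces;	/* points into the received buffer */
	void *cl_arg;		/* NULL if no cl_arg was sent */
};

static void example_parse_execute_msg(void *buffer, int buffer_size, struct example_execute_msg *msg)
{
	uintptr_t p = (uintptr_t) buffer;
	msg->kernel = *(void(**)(void)) p;
	p += sizeof(msg->kernel);
	msg->type = *(enum starpu_codelet_type *) p;
	p += sizeof(msg->type);
	msg->is_parallel_task = *(int *) p;
	p += sizeof(msg->is_parallel_task);
	if (msg->is_parallel_task)
	{
		msg->cb_workerid = *(int *) p;
		p += sizeof(msg->cb_workerid);
	}
	msg->coreid = *(unsigned *) p;
	p += sizeof(msg->coreid);
	msg->nb_interfaces = *(unsigned *) p;
	p += sizeof(msg->nb_interfaces);
	msg->interfaces = (void *) p;
	p += msg->nb_interfaces * sizeof(union _starpu_interface);
	/* Whatever remains, if anything, is the cl_arg blob */
	msg->cl_arg = (p < (uintptr_t) buffer + buffer_size) ? (void *) p : NULL;
}
#endif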
/* Gather the information and call the function that sends the sink a message to execute the task */
static int _starpu_src_common_execute(struct _starpu_job *j,
				      struct _starpu_worker *worker,
				      struct _starpu_mp_node *node)
{
	STARPU_ASSERT(j);
	struct starpu_task *task = j->task;

	int profiling = starpu_profiling_status_get();

	STARPU_ASSERT(task);

	if (worker->current_rank == 0)
	{
		int ret = _starpu_fetch_task_input(task, j, 0);
		if (ret != 0)
		{
			/* there was not enough memory, so the input of
			 * the codelet cannot be fetched ... put the
			 * codelet back, and try it later */
			return -EAGAIN;
		}
	}

	void (*kernel)(void) = node->get_kernel_from_job(node, j);

	_starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);

	//_STARPU_DEBUG("\nworkerid:%d, rank:%d, type:%d, cb_workerid:%d, task_size:%d\n\n", worker->devid, worker->current_rank, task->cl->type, j->combined_workerid, j->task_size);
	_starpu_src_common_execute_kernel(node, kernel, worker->subworkerid, task->cl->type,
					  (j->task_size > 1),
					  j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
					  _STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
					  task->cl_arg, task->cl_arg_size);

	return 0;
}
/* Send a request to the sink linked to MP_NODE to allocate SIZE bytes on
 * the sink.
 * In case of success, it returns 0 and *ADDR contains the address of the
 * allocated area;
 * otherwise it returns 1 if the allocation failed.
 */
int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
				void **addr, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);

	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ALLOCATE, &size,
				       sizeof(size));

	answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);

	if (answer == STARPU_MP_COMMAND_ERROR_ALLOCATE)
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
		return 1;
	}

	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_ALLOCATE &&
		      arg_size == sizeof(*addr));

	memcpy(addr, arg, arg_size);

	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	return 0;
}
/* Send a request to the sink linked to MP_NODE to deallocate the memory
 * area pointed to by ADDR.
 */
void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
			     void *addr)
{
	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_FREE, &addr, sizeof(addr));
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
}
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE,
 * in synchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
					      void *src, void *dst, size_t size)
{
	struct _starpu_mp_transfer_command cmd = {size, dst, NULL};

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);

	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST, &cmd, sizeof(cmd));
	mp_node->dt_send(mp_node, src, size, NULL);

	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	return 0;
}
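
/* A hedged usage sketch (not part of the original file): a typical round trip
 * combining the allocation and synchronous transfer primitives above. The
 * helper name and the host_buf/len parameters are illustrative only; failure
 * is reported with 1, mirroring _starpu_src_common_allocate(). */
#if 0
static int example_push_buffer_to_sink(struct _starpu_mp_node *my_node,
					void *host_buf, size_t len, void **sink_addr)
{
	/* Reserve memory on the sink */
	if (_starpu_src_common_allocate(my_node, sink_addr, len))
		return 1;
	/* Push the host data into the freshly allocated sink area */
	_starpu_src_common_copy_host_to_sink_sync(my_node, host_buf, *sink_addr, len);
	/* Later, _starpu_src_common_free(my_node, *sink_addr) releases it */
	return 0;
}
#endif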
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE,
 * in asynchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
					       void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command cmd = {size, dst, event};

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);

	/* For asynchronous transfers, we save the information needed
	 * to test whether they are finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_receiver = mp_node;

	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC, &cmd, sizeof(cmd));
	mp_node->dt_send(mp_node, src, size, event);

	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	return -EAGAIN;
}
/* Receive SIZE bytes pointed to by SRC on the sink linked to MP_NODE and store them in DST,
 * in synchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
					      void *src, void *dst, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;
	struct _starpu_mp_transfer_command cmd = {size, src, NULL};

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);

	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST, &cmd, sizeof(cmd));

	answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_SEND_TO_HOST);

	mp_node->dt_recv(mp_node, dst, size, NULL);

	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	return 0;
}
/* Receive SIZE bytes pointed to by SRC on the sink linked to MP_NODE and store them in DST,
 * in asynchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
					       void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command cmd = {size, src, event};

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);

	/* For asynchronous transfers, we save the information needed
	 * to test whether they are finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_sender = mp_node;

	_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC, &cmd, sizeof(cmd));
	mp_node->dt_recv(mp_node, dst, size, event);

	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	return -EAGAIN;
}
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in synchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
					      struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
{
	enum _starpu_mp_command answer;
	void *arg;
	int arg_size;

	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, NULL};

	/* Lock the node with the lower peer_id first to prevent deadlock */
	if (src_node->peer_id > dst_node->peer_id)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
	}
	else
	{
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
	}

	/* Tell the source to send data to the destination. */
	_starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK, &cmd, sizeof(cmd));

	/* Release the source as fast as possible */
	STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);

	cmd.devid = src_node->peer_id;
	cmd.size = size;
	cmd.addr = dst;

	/* Tell the destination to receive data from the source. */
	_starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK, &cmd, sizeof(cmd));

	/* Wait for the answer from the destination to know whether the transfer is finished. */
	answer = _starpu_src_common_wait_command_sync(dst_node, &arg, &arg_size);
	STARPU_ASSERT(answer == STARPU_MP_COMMAND_TRANSFER_COMPLETE);

	/* Release the receiver once we have received the acknowledgment */
	STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);

	return 0;
}
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in asynchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
					       struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void *event)
{
	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, event};

	/* Lock the node with the lower peer_id first to prevent deadlock */
	if (src_node->peer_id > dst_node->peer_id)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
	}
	else
	{
		STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
		STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
	}

	/* For asynchronous transfers, we save the information needed
	 * to test whether they are finished
	 */
	struct _starpu_async_channel *async_channel = event;
	async_channel->polling_node_sender = src_node;
	async_channel->polling_node_receiver = dst_node;
	/* Increase the number of expected acknowledgments */
	async_channel->starpu_mp_common_finished_receiver++;
	async_channel->starpu_mp_common_finished_sender++;

	/* Tell the source to send data to the destination. */
	_starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC, &cmd, sizeof(cmd));

	STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);

	cmd.devid = src_node->peer_id;
	cmd.size = size;
	cmd.addr = dst;

	/* Tell the destination to receive data from the source. */
	_starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC, &cmd, sizeof(cmd));

	STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);

	return -EAGAIN;
}
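
/* A hedged sketch (not part of the original file) of how completion of such an
 * asynchronous sink-to-sink transfer can be detected: the two counters
 * incremented above are decremented by _starpu_src_common_handle_async() /
 * _starpu_src_common_store_message() when the *_ASYNC_COMPLETED notifications
 * arrive, so the transfer is done once both reach zero. The helper name is
 * hypothetical and the actual driver-side test lives elsewhere in StarPU. */
#if 0
static int example_sink_to_sink_transfer_done(struct _starpu_async_channel *async_channel)
{
	return async_channel->starpu_mp_common_finished_sender <= 0
		&& async_channel->starpu_mp_common_finished_receiver <= 0;
}
#endif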
/* 5 functions to determine the executable to run on the device (MIC, SCC,
 * MPI).
 */
static void _starpu_src_common_cat_3(char *final, const char *first,
				     const char *second, const char *third)
{
	strcpy(final, first);
	strcat(final, second);
	strcat(final, third);
}

static void _starpu_src_common_cat_2(char *final, const char *first, const char *second)
{
	_starpu_src_common_cat_3(final, first, second, "");
}

static void _starpu_src_common_dir_cat(char *final, const char *dir, const char *file)
{
	if (file[0] == '/')
		++file;

	size_t size = strlen(dir);
	if (dir[size - 1] == '/')
		_starpu_src_common_cat_2(final, dir, file);
	else
		_starpu_src_common_cat_3(final, dir, "/", file);
}

static int _starpu_src_common_test_suffixes(char *located_file_name, const char *base, const char **suffixes)
{
	unsigned int i;
	for (i = 0; suffixes[i] != NULL; ++i)
	{
		_starpu_src_common_cat_2(located_file_name, base, suffixes[i]);
		if (access(located_file_name, R_OK) == 0)
			return 0;
	}
	return 1;
}
int _starpu_src_common_locate_file(char *located_file_name,
				   const char *env_file_name, const char *env_mic_path,
				   const char *config_file_name, const char *actual_file_name,
				   const char **suffixes)
{
	if (env_file_name != NULL)
	{
		if (access(env_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, env_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, env_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (config_file_name != NULL)
	{
		if (access(config_file_name, R_OK) == 0)
		{
			strcpy(located_file_name, config_file_name);
			return 0;
		}
		else if (env_mic_path != NULL)
		{
			_starpu_src_common_dir_cat(located_file_name, env_mic_path, config_file_name);
			return access(located_file_name, R_OK);
		}
	}
	else if (actual_file_name != NULL)
	{
		if (_starpu_src_common_test_suffixes(located_file_name, actual_file_name, suffixes) == 0)
			return 0;

		if (env_mic_path != NULL)
		{
			char actual_cpy[1024];
			strcpy(actual_cpy, actual_file_name);

			char *last = strrchr(actual_cpy, '/');
			while (last != NULL)
			{
				char tmp[1024];
				_starpu_src_common_dir_cat(tmp, env_mic_path, last);
				if (access(tmp, R_OK) == 0)
				{
					strcpy(located_file_name, tmp);
					return 0;
				}

				if (_starpu_src_common_test_suffixes(located_file_name, tmp, suffixes) == 0)
					return 0;

				*last = '\0';
				char *last_tmp = strrchr(actual_cpy, '/');
				*last = '/';
				last = last_tmp;
			}
		}
	}
	return 1;
}
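
/* A hedged usage sketch (not part of the original file): how a driver could
 * look for the sink-side executable with the helper above. The environment
 * variable names, the suffix list, the basename and the buffer size are
 * illustrative assumptions only (getenv() would need <stdlib.h>). */
#if 0
static int example_find_sink_program(char *located_file_name /* at least 1024 bytes */)
{
	static const char *suffixes[] = { "-mic", ".mic", "", NULL };
	return _starpu_src_common_locate_file(located_file_name,
					      getenv("SINK_PROGRAM_NAME"),	/* hypothetical env variable */
					      getenv("SINK_PROGRAM_PATH"),	/* hypothetical env variable */
					      NULL /* no config-time name */,
					      "my_application" /* hypothetical basename */,
					      suffixes);
}
#endif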
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
void _starpu_src_common_init_switch_env(unsigned this)
{
	save_thread_env[this].current_task = starpu_task_get_current();
	save_thread_env[this].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
	save_thread_env[this].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
	save_thread_env[this].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
	save_thread_env[this].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
	save_thread_env[this].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif
}

static void _starpu_src_common_switch_env(unsigned old, unsigned new)
{
	save_thread_env[old].current_task = starpu_task_get_current();
	save_thread_env[old].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
	save_thread_env[old].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
	save_thread_env[old].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
	save_thread_env[old].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
	save_thread_env[old].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif

	_starpu_set_current_task(save_thread_env[new].current_task);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, save_thread_env[new].current_worker);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, save_thread_env[new].current_worker_set);
	STARPU_PTHREAD_SETSPECIFIC(_starpu_memory_node_key, save_thread_env[new].current_mem_node);
#ifdef STARPU_OPENMP
	STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, save_thread_env[new].current_omp_thread);
	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, save_thread_env[new].current_omp_task);
#endif
}
#endif
/* Send the workers to the sink node
 */
static void _starpu_src_common_send_workers(struct _starpu_mp_node *node, int baseworkerid, int nworkers)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int worker_size = sizeof(struct _starpu_worker) * nworkers;
	int combined_worker_size = STARPU_NMAX_COMBINEDWORKERS * sizeof(struct _starpu_combined_worker);
	int msg[5];
	msg[0] = nworkers;
	msg[1] = worker_size;
	msg[2] = combined_worker_size;
	msg[3] = baseworkerid;
	msg[4] = starpu_worker_get_count();

	STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);

	/* Tell the sink node that we are about to send it all the workers */
	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SYNC_WORKERS,
				       &msg, sizeof(msg));

	/* Send all the workers to the sink node */
	node->dt_send(node, &config->workers[baseworkerid], worker_size, NULL);

	/* Send all the combined workers to the sink node */
	node->dt_send(node, &config->combined_workers, combined_worker_size, NULL);

	STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
}
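
/* For reference, a hedged sketch (not part of the original file) of what the
 * sink side of the SYNC_WORKERS handshake has to read back, mirroring the
 * msg[5] layout built above; the struct name is hypothetical. */
#if 0
struct example_sync_workers_msg
{
	int nworkers;			/* msg[0]: number of workers driven through this node */
	int worker_size;		/* msg[1]: byte size of the worker array that follows */
	int combined_worker_size;	/* msg[2]: byte size of the combined-worker array */
	int baseworkerid;		/* msg[3]: index of the first worker in the global worker table */
	int total_workers;		/* msg[4]: starpu_worker_get_count() on the host */
};
#endif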
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *worker_set, struct _starpu_mp_node *mp_node, struct starpu_task **tasks, unsigned memnode)
{
	int res = 0;

	_starpu_may_pause();

#ifdef STARPU_SIMGRID
	starpu_pthread_wait_reset(&worker_set->workers[0].wait);
#endif

	/* Test whether asynchronous transfers are completed */
	for (unsigned i = 0; i < worker_set->nworkers; i++)
	{
		struct starpu_task *task = worker_set->workers[i].task_transferring;
		/* All the buffers of the task have been transferred, we can launch it */
		if (task != NULL && worker_set->workers[i].nb_buffers_transferred == worker_set->workers[i].nb_buffers_totransfer)
		{
			struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

			_starpu_set_local_worker_key(&worker_set->workers[i]);
			_starpu_release_fetch_task_input_async(j, worker_set->workers[i].workerid, worker_set->workers[i].nb_buffers_totransfer);
			/* Execute the task */
			res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
			switch (res)
			{
				case 0:
					/* The task has been launched with no error */
					break;
				case -EAGAIN:
					_STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
					_starpu_push_task_to_workers(worker_set->workers[i].task_transferring);
					STARPU_ABORT();
					continue;
					break;
				default:
					STARPU_ASSERT(0);
			}

			/* Reset it */
			worker_set->workers[i].task_transferring = NULL;
		}
	}

	_STARPU_TRACE_START_PROGRESS(memnode);
	res |= __starpu_datawizard_progress(1, 1);
	_STARPU_TRACE_END_PROGRESS(memnode);

	/* Handle the messages which have been stored */
	_starpu_src_common_handle_stored_async(mp_node);

	STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	/* Poll the device for completed jobs. */
	while (mp_node->mp_recv_is_ready(mp_node))
	{
		_starpu_src_common_recv_async(mp_node);
		/* The mutex is unlocked in _starpu_src_common_recv_async() */
		STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);

	/* Get a task for each worker */
	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);

#ifdef STARPU_SIMGRID
	if (!res)
		starpu_pthread_wait_wait(&worker_set->workers[0].wait);
#endif

	/* If at least one worker has popped a task */
	if (res != 0)
	{
		unsigned i;
		for (i = 0; i < worker_set->nworkers; i++)
		{
			if (tasks[i] != NULL)
			{
				_starpu_set_local_worker_key(&worker_set->workers[i]);
				_starpu_fetch_task_input(tasks[i], _starpu_get_job_associated_to_task(tasks[i]), 1);
			}
		}
		/* Handle the messages which have been stored */
		_starpu_src_common_handle_stored_async(mp_node);
	}
}
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
/* Function looping on the source node */
void _starpu_src_common_workers_set(struct _starpu_worker_set *worker_set,
				    int ndevices, struct _starpu_mp_node **mp_node)
{
	unsigned memnode[ndevices];
	unsigned offsetmemnode[ndevices];
	memset(offsetmemnode, 0, ndevices * sizeof(unsigned));

	int device;
	int nbworkers = 0;
	for (device = 0; device < ndevices; device++)
	{
		memnode[device] = worker_set[device].workers[0].memory_node;
		nbworkers += worker_set[device].nworkers;
		if (device != 0)
			offsetmemnode[device] += offsetmemnode[device-1];
		if (device != ndevices - 1)
			offsetmemnode[device+1] += worker_set[device].nworkers;
	}

	struct starpu_task **tasks;
	_STARPU_MALLOC(tasks, sizeof(struct starpu_task *) * nbworkers);

	for (device = 0; device < ndevices; device++)
	{
		struct _starpu_worker *baseworker = &worker_set[device].workers[0];
		struct _starpu_machine_config *config = baseworker->config;
		unsigned baseworkerid = baseworker - config->workers;
		_starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
	}

	/* Main loop */
	while (_starpu_machine_is_running())
	{
		for (device = 0; device < ndevices; device++)
		{
			_starpu_src_common_switch_env(((device - 1) + ndevices) % ndevices, device);
			_starpu_src_common_worker_internal_work(&worker_set[device], mp_node[device], tasks + offsetmemnode[device], memnode[device]);
		}
	}

	free(tasks);

	for (device = 0; device < ndevices; device++)
		_starpu_handle_all_pending_node_data_requests(memnode[device]);

	/* In case there remains some memory that was automatically
	 * allocated by StarPU, we release it now. Note that data
	 * coherency is not maintained anymore at that point! */
	for (device = 0; device < ndevices; device++)
		_starpu_free_all_automatically_allocated_buffers(memnode[device]);
}
#endif
/* Function looping on the source node */
void _starpu_src_common_worker(struct _starpu_worker_set *worker_set,
			       unsigned baseworkerid,
			       struct _starpu_mp_node *mp_node)
{
	unsigned memnode = worker_set->workers[0].memory_node;
	struct starpu_task **tasks;
	_STARPU_MALLOC(tasks, sizeof(struct starpu_task *) * worker_set->nworkers);

	_starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);

	/* Main loop */
	while (_starpu_machine_is_running())
	{
		_starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
	}

	free(tasks);

	_starpu_handle_all_pending_node_data_requests(memnode);

	/* In case there remains some memory that was automatically
	 * allocated by StarPU, we release it now. Note that data
	 * coherency is not maintained anymore at that point! */
	_starpu_free_all_automatically_allocated_buffers(memnode);
}