source_common.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2012, 2016 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <string.h>
#include <starpu.h>
#include <core/task.h>
#include <core/sched_policy.h>
#include <drivers/driver_common/driver_common.h>
#include <datawizard/coherency.h>
#include <datawizard/memory_nodes.h>
#include <datawizard/interfaces/data_interface.h>
#include <drivers/mp_common/mp_common.h>
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
struct starpu_save_thread_env
{
    struct starpu_task * current_task;
    struct _starpu_worker * current_worker;
    struct _starpu_worker_set * current_worker_set;
    unsigned * current_mem_node;
#ifdef STARPU_OPENMP
    struct starpu_omp_thread * current_omp_thread;
    struct starpu_omp_task * current_omp_task;
#endif
};
struct starpu_save_thread_env save_thread_env[STARPU_MAXMPIDEVS];
#endif
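/* The save_thread_env array above keeps a per-device snapshot of the
 * thread-local StarPU state (current task, worker, worker set, memory node,
 * and the OpenMP keys).  When StarPU is built without
 * STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD, a single host thread drives every
 * MPI sink device in turn (see _starpu_src_common_workers_set below), so this
 * state is saved and restored by _starpu_src_common_switch_env each time the
 * loop moves to the next device. */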
/* Finalize the execution of a task by a worker */
static int _starpu_src_common_finalize_job(struct _starpu_job *j, struct _starpu_worker *worker)
{
    int profiling = starpu_profiling_status_get();
    struct timespec codelet_end;
    _starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0,
                           profiling);
    int count = worker->current_rank;
    /* If it is a combined worker, check whether this is the last worker of the combination */
    if (j->task_size > 1)
    {
        struct _starpu_combined_worker * cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid);
        STARPU_PTHREAD_MUTEX_LOCK(&cb_worker->count_mutex);
        count = cb_worker->count--;
        if (count == 0)
            cb_worker->count = cb_worker->worker_size - 1;
        STARPU_PTHREAD_MUTEX_UNLOCK(&cb_worker->count_mutex);
    }
    /* Finalize the execution */
    if (count == 0)
    {
        _starpu_driver_update_job_feedback(j, worker, &worker->perf_arch,
                                           &j->cl_start, &codelet_end,
                                           profiling);
        _starpu_push_task_output(j);
        _starpu_handle_job_termination(j);
    }
    return 0;
}
/* Complete the execution of the job */
static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node, struct _starpu_worker_set *workerset, void * arg, int arg_size, int stored)
{
    int coreid;
    STARPU_ASSERT(sizeof(coreid) == arg_size);
    coreid = *(int *) arg;
    struct _starpu_worker *worker = &workerset->workers[coreid];
    struct _starpu_job *j = _starpu_get_job_associated_to_task(worker->current_task);
    struct _starpu_worker * old_worker = _starpu_get_local_worker_key();
    /* if arg was not copied, we still hold the connection mutex: release it */
    if (!stored)
        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
    _starpu_set_local_worker_key(worker);
    _starpu_src_common_finalize_job(j, worker);
    _starpu_set_local_worker_key(old_worker);
    worker->current_task = NULL;
    return 0;
}
/* Tell the scheduler that the execution has begun */
static void _starpu_src_common_pre_exec(struct _starpu_mp_node *node, void * arg, int arg_size, int stored)
{
    int cb_workerid, i;
    STARPU_ASSERT(sizeof(cb_workerid) == arg_size);
    cb_workerid = *(int *) arg;
    struct _starpu_combined_worker *combined_worker = _starpu_get_combined_worker_struct(cb_workerid);
    /* if arg was not copied, we still hold the connection mutex: release it */
    if (!stored)
        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
    for (i = 0; i < combined_worker->worker_size; i++)
    {
        struct _starpu_worker * worker = _starpu_get_worker_struct(combined_worker->combined_workerid[i]);
        _starpu_set_local_worker_key(worker);
        _starpu_sched_pre_exec_hook(worker->current_task);
    }
}
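/* Note on the `stored' flag used by the handlers above and below: the
 * asynchronous handlers are reached in two situations.  Either the command was
 * just read from the wire by _starpu_src_common_recv_async (stored == 0), in
 * which case the caller still holds node->connection_mutex and the handler
 * releases it once the argument has been consumed, or the command was queued
 * earlier by _starpu_src_common_store_message and is being replayed from the
 * message queue (stored == 1), in which case the connection mutex is not held
 * and must not be touched. */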
/* Receive a message and handle it if it is asynchronous.
 * Return 0 if the message was not handled (which most likely means it is a synchronous message),
 * return 1 if the message was handled.
 */
static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
                                           void * arg, int arg_size,
                                           enum _starpu_mp_command answer, int stored)
{
    struct _starpu_worker_set * worker_set = NULL;
    switch (answer)
    {
        case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
            worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
            _starpu_src_common_process_completed_job(node, worker_set, arg, arg_size, stored);
            break;
        case STARPU_MP_COMMAND_PRE_EXECUTION:
            _starpu_src_common_pre_exec(node, arg, arg_size, stored);
            break;
        case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
        case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
        {
            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
            event->starpu_mp_common_finished_receiver--;
            if (!stored)
                STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
            break;
        }
        case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
        case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
        {
            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
            event->starpu_mp_common_finished_sender--;
            if (!stored)
                STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
            break;
        }
        default:
            return 0;
    }
    return 1;
}
/* Handle all messages which have been stored in the message_queue */
static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
{
    STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
    /* while the list is not empty */
    while (!mp_message_list_empty(&node->message_queue))
    {
        /* Pop a message and handle it */
        struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
        /* Release the mutex while the message is being handled */
        STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
        _starpu_src_common_handle_async(node, message->buffer,
                                        message->size, message->type, 1);
        free(message->buffer);
        mp_message_delete(message);
        /* Take it again */
        STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
    }
    STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
}
/* Store a message if it is asynchronous.
 * Return 1 if the message has been stored,
 * return 0 if the message is unknown or synchronous. */
int _starpu_src_common_store_message(struct _starpu_mp_node *node,
                                     void * arg, int arg_size, enum _starpu_mp_command answer)
{
    struct mp_message * message = NULL;
    switch (answer)
    {
        case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
        case STARPU_MP_COMMAND_PRE_EXECUTION:
            message = mp_message_new();
            message->type = answer;
            _STARPU_MALLOC(message->buffer, arg_size);
            memcpy(message->buffer, arg, arg_size);
            message->size = arg_size;
            STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
            mp_message_list_push_front(&node->message_queue, message);
            STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
            return 1;
        /* ASYNC completion commands are not stored, only the event is updated */
        case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
        case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
        {
            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
            event->starpu_mp_common_finished_receiver--;
            return 1;
        }
        case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
        case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
        {
            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
            event->starpu_mp_common_finished_sender--;
            return 1;
        }
        default:
            return 0;
    }
}
/* Store all asynchronous messages and return when a synchronous message is received */
static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
                                                                    void ** arg, int* arg_size)
{
    enum _starpu_mp_command answer;
    int is_sync = 0;
    while (!is_sync)
    {
        answer = _starpu_mp_common_recv_command(node, arg, arg_size);
        if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
            is_sync = 1;
    }
    return answer;
}
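/* The two helpers around this point are the two sides of the same protocol:
 * _starpu_src_common_wait_command_sync (above) is used when the caller expects
 * a synchronous answer, so any asynchronous command received in the meantime
 * is stored (or its event updated) and replayed later by
 * _starpu_src_common_handle_stored_async; _starpu_src_common_recv_async
 * (below) is used when polling the device, so only asynchronous commands are
 * expected there and a synchronous one is treated as a protocol error. */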
/* Handle an asynchronous message; it is an error if a synchronous message is received */
static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size;
    answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
    if (!_starpu_src_common_handle_async(node, arg, arg_size, answer, 0))
    {
        printf("incorrect command: unknown command or sync command");
        STARPU_ASSERT(0);
    }
}
/* Handle all asynchronous messages until the completed-execution message from a specific worker is received */
enum _starpu_mp_command _starpu_src_common_wait_completed_execution(struct _starpu_mp_node *node, int devid, void **arg, int * arg_size)
{
    enum _starpu_mp_command answer;
    int completed = 0;
    /* While the awaited completed-execution message has not been received */
    while (!completed)
    {
        answer = _starpu_mp_common_recv_command(node, arg, arg_size);
        if (answer == STARPU_MP_COMMAND_EXECUTION_COMPLETED)
        {
            int coreid;
            STARPU_ASSERT(sizeof(coreid) == *arg_size);
            coreid = *(int *) *arg;
            if (devid == coreid)
                completed = 1;
            else if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
                /* We received an unknown or synchronous message */
                STARPU_ASSERT(0);
        }
        else
        {
            if (!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
                /* We received an unknown or synchronous message */
                STARPU_ASSERT(0);
        }
    }
    return answer;
}
/* Send a request to the sink NODE for the number of cores on it. */
int _starpu_src_common_sink_nbcores(struct _starpu_mp_node *node, int *buf)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size = sizeof(int);
    STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
    _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SINK_NBCORES, NULL, 0);
    answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_SINK_NBCORES && arg_size == sizeof(int));
    memcpy(buf, arg, arg_size);
    STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
    return 0;
}
/* Send a request to the sink linked to NODE for the pointer to the
 * function defined by FUNC_NAME.
 * In case of success, it returns 0 and FUNC_PTR contains the pointer;
 * else it returns -ESPIPE if the function was not found.
 */
int _starpu_src_common_lookup(struct _starpu_mp_node *node,
                              void (**func_ptr)(void), const char *func_name)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size;
    /* strlen ignores the terminating '\0', hence the + 1 */
    arg_size = (strlen(func_name) + 1) * sizeof(char);
    STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
    //_STARPU_DEBUG("Looking up %s\n", func_name);
    _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_LOOKUP, (void *) func_name,
                                   arg_size);
    answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
                                                  &arg_size);
    if (answer == STARPU_MP_COMMAND_ERROR_LOOKUP)
    {
        _STARPU_DISP("Error looking up symbol %s\n", func_name);
        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
        return -ESPIPE;
    }
    /* We have to be sure the device answered the right question and the
     * answer has the right size */
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_LOOKUP);
    STARPU_ASSERT(arg_size == sizeof(*func_ptr));
    memcpy(func_ptr, arg, arg_size);
    STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
    //_STARPU_DEBUG("got %p\n", *func_ptr);
    return 0;
}
/* Send a message to the sink to execute a kernel.
 * The message sent has the form below:
 * [Function pointer on the sink, codelet type, is_parallel_task flag,
 *  (combined workerid if the task is parallel), coreid, number of interfaces,
 *  interfaces (union _starpu_interface), cl_arg]
 */
/* Launch the execution of the function KERNEL points to on the sink linked
 * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
 * pointer.
 * The data interfaces of the task are sent to the sink.
 */
int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
                                      void (*kernel)(void), unsigned coreid,
                                      enum starpu_codelet_type type,
                                      int is_parallel_task, int cb_workerid,
                                      starpu_data_handle_t *handles,
                                      void **interfaces,
                                      unsigned nb_interfaces,
                                      void *cl_arg, size_t cl_arg_size)
{
    void *buffer, *arg = NULL;
    uintptr_t buffer_ptr;
    int buffer_size = 0, arg_size = 0;
    unsigned i;
    buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
        + sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
    /* if the task is parallel */
    if (is_parallel_task)
    {
        buffer_size += sizeof(cb_workerid);
    }
    /* If the user did not give any cl_arg, there is no need to send it */
    if (cl_arg)
    {
        STARPU_ASSERT(cl_arg_size);
        buffer_size += cl_arg_size;
    }
    /* We give to send_command a buffer we just allocated, which contains
     * a pointer to the function (sink-side), the core on which to execute this
     * function (sink-side), the number of interfaces we send,
     * an array of generic (union) interfaces and the value of cl_arg */
    _STARPU_MALLOC(buffer, buffer_size);
    buffer_ptr = (uintptr_t) buffer;
    *(void(**)(void)) buffer = kernel;
    buffer_ptr += sizeof(kernel);
    *(enum starpu_codelet_type *) buffer_ptr = type;
    buffer_ptr += sizeof(type);
    *(int *) buffer_ptr = is_parallel_task;
    buffer_ptr += sizeof(is_parallel_task);
    if (is_parallel_task)
    {
        *(int *) buffer_ptr = cb_workerid;
        buffer_ptr += sizeof(cb_workerid);
    }
    *(unsigned *) buffer_ptr = coreid;
    buffer_ptr += sizeof(coreid);
    *(unsigned *) buffer_ptr = nb_interfaces;
    buffer_ptr += sizeof(nb_interfaces);
    /* Message-passing execution is a particular case: the codelet is
     * executed on a sink with a separate memory, whereas for the other
     * accelerators the codelet is executed on the host side.
     * Thus we need to send a copy of each interface to the MP device */
    for (i = 0; i < nb_interfaces; i++)
    {
        starpu_data_handle_t handle = handles[i];
        memcpy((void*) buffer_ptr, interfaces[i],
               handle->ops->interface_size);
        /* The sink side has no means to get the type of each
         * interface, so we use a union to make it generic and let the
         * sink walk through the array */
        buffer_ptr += sizeof(union _starpu_interface);
    }
    if (cl_arg)
        memcpy((void*) buffer_ptr, cl_arg, cl_arg_size);
    STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
    _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE, buffer, buffer_size);
    enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);
    if (answer == STARPU_MP_COMMAND_ERROR_EXECUTE)
    {
        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
        /* do not leak the message buffer on the error path */
        free(buffer);
        return -EINVAL;
    }
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_SUBMITTED);
    STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
    free(buffer);
    return 0;
}
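/* Note that STARPU_MP_COMMAND_EXECUTION_SUBMITTED only acknowledges that the
 * sink accepted and queued the kernel; the actual completion is reported later
 * by an asynchronous STARPU_MP_COMMAND_EXECUTION_COMPLETED message, which is
 * picked up later (either polled directly or replayed from the message queue)
 * and handled by _starpu_src_common_process_completed_job. */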
/* Gather the information and send the execution message to the sink */
static int _starpu_src_common_execute(struct _starpu_job *j,
                                      struct _starpu_worker *worker,
                                      struct _starpu_mp_node * node)
{
    STARPU_ASSERT(j);
    struct starpu_task *task = j->task;
    int profiling = starpu_profiling_status_get();
    STARPU_ASSERT(task);
    if (worker->current_rank == 0)
    {
        int ret = _starpu_fetch_task_input(j);
        if (ret != 0)
        {
            /* there was not enough memory, so the input of
             * the codelet cannot be fetched ... put the
             * codelet back, and try it later */
            return -EAGAIN;
        }
    }
    void (*kernel)(void) = node->get_kernel_from_job(node, j);
    _starpu_driver_start_job(worker, j, &worker->perf_arch, &j->cl_start, 0, profiling);
    //_STARPU_DEBUG("\nworkerid:%d, rank:%d, type:%d, cb_workerid:%d, task_size:%d\n\n",worker->devid,worker->current_rank,task->cl->type,j->combined_workerid,j->task_size);
    _starpu_src_common_execute_kernel(node, kernel, worker->subworkerid, task->cl->type,
                                      (j->task_size > 1),
                                      j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
                                      _STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
                                      task->cl_arg, task->cl_arg_size);
    return 0;
}
/* Send a request to the sink linked to MP_NODE to allocate SIZE bytes on
 * the sink.
 * In case of success, it returns 0 and *ADDR contains the address of the
 * allocated area;
 * else it returns 1 if the allocation failed.
 */
int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
                                void **addr, size_t size)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size;
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ALLOCATE, &size,
                                   sizeof(size));
    answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);
    if (answer == STARPU_MP_COMMAND_ERROR_ALLOCATE)
    {
        STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
        return 1;
    }
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_ALLOCATE &&
                  arg_size == sizeof(*addr));
    memcpy(addr, arg, arg_size);
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    return 0;
}
/* Send a request to the sink linked to MP_NODE to deallocate the memory
 * area pointed to by ADDR.
 */
void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
                             void *addr)
{
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_FREE, &addr, sizeof(addr));
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
}
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE, in
 * synchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
                                              void *src, void *dst, size_t size)
{
    struct _starpu_mp_transfer_command cmd = {size, dst, NULL};
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST, &cmd, sizeof(cmd));
    mp_node->dt_send(mp_node, src, size, NULL);
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    return 0;
}
/* Send SIZE bytes pointed to by SRC to DST on the sink linked to MP_NODE, in
 * asynchronous mode.
 */
int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
                                               void *src, void *dst, size_t size, void * event)
{
    struct _starpu_mp_transfer_command cmd = {size, dst, event};
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    /* For asynchronous transfers, we record which node to poll
     * to test whether they have finished
     */
    struct _starpu_async_channel * async_channel = event;
    async_channel->polling_node_receiver = mp_node;
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC, &cmd, sizeof(cmd));
    mp_node->dt_send(mp_node, src, size, event);
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    return -EAGAIN;
}
/* Receive SIZE bytes pointed to by SRC from the sink linked to MP_NODE and store them in DST,
 * in synchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
                                              void *src, void *dst, size_t size)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size;
    struct _starpu_mp_transfer_command cmd = {size, src, NULL};
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST, &cmd, sizeof(cmd));
    answer = _starpu_src_common_wait_command_sync(mp_node, &arg, &arg_size);
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_SEND_TO_HOST);
    mp_node->dt_recv(mp_node, dst, size, NULL);
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    return 0;
}
/* Receive SIZE bytes pointed to by SRC from the sink linked to MP_NODE and store them in DST,
 * in asynchronous mode.
 */
int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
                                               void *src, void *dst, size_t size, void * event)
{
    struct _starpu_mp_transfer_command cmd = {size, src, event};
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    /* For asynchronous transfers, we record which node to poll
     * to test whether they have finished
     */
    struct _starpu_async_channel * async_channel = event;
    async_channel->polling_node_sender = mp_node;
    _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC, &cmd, sizeof(cmd));
    mp_node->dt_recv(mp_node, dst, size, event);
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    return -EAGAIN;
}
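/* The two sink-to-sink transfers below involve the connection mutexes of two
 * nodes.  To avoid a deadlock when two transfers run in opposite directions
 * between the same pair of sinks, both functions always lock the node with the
 * smaller peer_id first, whichever direction the data flows in. */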
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in synchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
                                              struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
{
    enum _starpu_mp_command answer;
    void *arg;
    int arg_size;
    struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, NULL};
    /* Lock the node with the smaller peer_id first to prevent deadlock */
    if (src_node->peer_id > dst_node->peer_id)
    {
        STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
        STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
    }
    else
    {
        STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
        STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
    }
    /* Tell the source to send data to the destination. */
    _starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK, &cmd, sizeof(cmd));
    /* Release the source as fast as possible */
    STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);
    cmd.devid = src_node->peer_id;
    cmd.size = size;
    cmd.addr = dst;
    /* Tell the destination to receive data from the source. */
    _starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK, &cmd, sizeof(cmd));
    /* Wait for the answer from the destination to know whether the transfer is finished. */
    answer = _starpu_src_common_wait_command_sync(dst_node, &arg, &arg_size);
    STARPU_ASSERT(answer == STARPU_MP_COMMAND_TRANSFER_COMPLETE);
    /* Release the receiver once we have received the acknowledgment */
    STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);
    return 0;
}
/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed to by SRC
 * to the sink linked to DST_NODE. The latter stores them in DST, in asynchronous
 * mode.
 */
int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
                                               struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void * event)
{
    struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, event};
    /* Lock the node with the smaller peer_id first to prevent deadlock */
    if (src_node->peer_id > dst_node->peer_id)
    {
        STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
        STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
    }
    else
    {
        STARPU_PTHREAD_MUTEX_LOCK(&src_node->connection_mutex);
        STARPU_PTHREAD_MUTEX_LOCK(&dst_node->connection_mutex);
    }
    /* For asynchronous transfers, we record which nodes to poll
     * to test whether they have finished
     */
    struct _starpu_async_channel * async_channel = event;
    async_channel->polling_node_sender = src_node;
    async_channel->polling_node_receiver = dst_node;
    /* Increase the number of acknowledgments to wait for */
    async_channel->starpu_mp_common_finished_receiver++;
    async_channel->starpu_mp_common_finished_sender++;
    /* Tell the source to send data to the destination. */
    _starpu_mp_common_send_command(src_node, STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC, &cmd, sizeof(cmd));
    STARPU_PTHREAD_MUTEX_UNLOCK(&src_node->connection_mutex);
    cmd.devid = src_node->peer_id;
    cmd.size = size;
    cmd.addr = dst;
    /* Tell the destination to receive data from the source. */
    _starpu_mp_common_send_command(dst_node, STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC, &cmd, sizeof(cmd));
    STARPU_PTHREAD_MUTEX_UNLOCK(&dst_node->connection_mutex);
    return -EAGAIN;
}
/* 5 functions to determine the executable to run on the device (MIC, SCC,
 * MPI).
 */
static void _starpu_src_common_cat_3(char *final, const char *first,
                                     const char *second, const char *third)
{
    strcpy(final, first);
    strcat(final, second);
    strcat(final, third);
}
static void _starpu_src_common_cat_2(char *final, const char *first, const char *second)
{
    _starpu_src_common_cat_3(final, first, second, "");
}
static void _starpu_src_common_dir_cat(char *final, const char *dir, const char *file)
{
    if (file[0] == '/')
        ++file;
    size_t size = strlen(dir);
    if (dir[size - 1] == '/')
        _starpu_src_common_cat_2(final, dir, file);
    else
        _starpu_src_common_cat_3(final, dir, "/", file);
}
static int _starpu_src_common_test_suffixes(char *located_file_name, const char *base, const char **suffixes)
{
    unsigned int i;
    for (i = 0; suffixes[i] != NULL; ++i)
    {
        _starpu_src_common_cat_2(located_file_name, base, suffixes[i]);
        if (access(located_file_name, R_OK) == 0)
            return 0;
    }
    return 1;
}
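/* _starpu_src_common_locate_file looks for the sink executable using the first
 * non-NULL candidate among env_file_name, config_file_name and
 * actual_file_name (in that priority order).  The first two are tried as
 * given, then under env_mic_path; actual_file_name is tried with each of the
 * provided suffixes, then its trailing path components are looked up under
 * env_mic_path.  On success the path is written to located_file_name and 0 is
 * returned, otherwise the function returns non-zero. */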
int _starpu_src_common_locate_file(char *located_file_name,
                                   const char *env_file_name, const char *env_mic_path,
                                   const char *config_file_name, const char *actual_file_name,
                                   const char **suffixes)
{
    if (env_file_name != NULL)
    {
        if (access(env_file_name, R_OK) == 0)
        {
            strcpy(located_file_name, env_file_name);
            return 0;
        }
        else if (env_mic_path != NULL)
        {
            _starpu_src_common_dir_cat(located_file_name, env_mic_path, env_file_name);
            return access(located_file_name, R_OK);
        }
    }
    else if (config_file_name != NULL)
    {
        if (access(config_file_name, R_OK) == 0)
        {
            strcpy(located_file_name, config_file_name);
            return 0;
        }
        else if (env_mic_path != NULL)
        {
            _starpu_src_common_dir_cat(located_file_name, env_mic_path, config_file_name);
            return access(located_file_name, R_OK);
        }
    }
    else if (actual_file_name != NULL)
    {
        if (_starpu_src_common_test_suffixes(located_file_name, actual_file_name, suffixes) == 0)
            return 0;
        if (env_mic_path != NULL)
        {
            char actual_cpy[1024];
            strcpy(actual_cpy, actual_file_name);
            char *last = strrchr(actual_cpy, '/');
            while (last != NULL)
            {
                char tmp[1024];
                _starpu_src_common_dir_cat(tmp, env_mic_path, last);
                if (access(tmp, R_OK) == 0)
                {
                    strcpy(located_file_name, tmp);
                    return 0;
                }
                if (_starpu_src_common_test_suffixes(located_file_name, tmp, suffixes) == 0)
                    return 0;
                *last = '\0';
                char *last_tmp = strrchr(actual_cpy, '/');
                *last = '/';
                last = last_tmp;
            }
        }
    }
    return 1;
}
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
void _starpu_src_common_init_switch_env(unsigned this)
{
    save_thread_env[this].current_task = starpu_task_get_current();
    save_thread_env[this].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
    save_thread_env[this].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
    save_thread_env[this].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
    save_thread_env[this].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
    save_thread_env[this].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif
}
/* Save the thread-local state of the OLD device and restore the one of the NEW device */
static void _starpu_src_common_switch_env(unsigned old, unsigned new)
{
    save_thread_env[old].current_task = starpu_task_get_current();
    save_thread_env[old].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
    save_thread_env[old].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
    save_thread_env[old].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
#ifdef STARPU_OPENMP
    save_thread_env[old].current_omp_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
    save_thread_env[old].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
#endif
    _starpu_set_current_task(save_thread_env[new].current_task);
    STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, save_thread_env[new].current_worker);
    STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, save_thread_env[new].current_worker_set);
    STARPU_PTHREAD_SETSPECIFIC(_starpu_memory_node_key, save_thread_env[new].current_mem_node);
#ifdef STARPU_OPENMP
    STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, save_thread_env[new].current_omp_thread);
    STARPU_PTHREAD_SETSPECIFIC(omp_task_key, save_thread_env[new].current_omp_task);
#endif
}
#endif
/* Send the workers to the sink node
 */
static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int baseworkerid, int nworkers)
{
    struct _starpu_machine_config *config = _starpu_get_machine_config();
    int worker_size = sizeof(struct _starpu_worker)*nworkers;
    int combined_worker_size = STARPU_NMAX_COMBINEDWORKERS*sizeof(struct _starpu_combined_worker);
    int msg[5];
    msg[0] = nworkers;
    msg[1] = worker_size;
    msg[2] = combined_worker_size;
    msg[3] = baseworkerid;
    msg[4] = starpu_worker_get_count();
    STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
    /* tell the sink node that we are going to send it all the workers */
    _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SYNC_WORKERS,
                                   &msg, sizeof(msg));
    /* Send all the workers to the sink node */
    node->dt_send(node, &config->workers[baseworkerid], worker_size, NULL);
    /* Send all the combined workers to the sink node */
    node->dt_send(node, &config->combined_workers, combined_worker_size, NULL);
    STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
}
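/* The msg[] header sent with STARPU_MP_COMMAND_SYNC_WORKERS above describes
 * the two raw transfers that follow it: msg[0] is the number of workers of
 * this device, msg[1] and msg[2] are the byte sizes of the worker and combined
 * worker arrays, msg[3] is the id of the first worker sent and msg[4] is the
 * total number of workers in the machine. */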
/* Callback used when a buffer has been sent asynchronously to the sink */
static void _starpu_src_common_send_data_callback(void *arg)
{
    struct _starpu_worker * worker = (struct _starpu_worker *) arg;
    /* increase the number of buffers sent */
    STARPU_WMB();
    (void)STARPU_ATOMIC_ADD(&worker->nb_buffers_sent, 1);
}
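/* One iteration of the source-side driver loop.  Each call:
 *  - launches the tasks whose input buffers have all reached the device
 *    (nb_buffers_sent has caught up with the number of task buffers),
 *  - makes the datawizard progress,
 *  - replays the asynchronous messages stored in the message queue and polls
 *    the device for new ones,
 *  - picks new tasks from the scheduler and starts fetching their data, the
 *    _starpu_src_common_send_data_callback above counting the completed
 *    transfers. */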
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
{
    int res = 0;
    _starpu_may_pause();
#ifdef STARPU_SIMGRID
    starpu_pthread_wait_reset(&worker_set->workers[0].wait);
#endif
    /* Test whether the asynchronous transfers have completed */
    for (unsigned i = 0; i < worker_set->nworkers; i++)
    {
        /* All the buffers of the task have been sent: we can launch it */
        if (worker_set->workers[i].task_sending != NULL && worker_set->workers[i].nb_buffers_sent == STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending))
        {
            int workerid = worker_set->workers[i].workerid;
            STARPU_RMB();
            _STARPU_TRACE_WORKER_END_FETCH_INPUT(NULL, workerid);
            unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending);
            unsigned buf;
            for (buf = 0; buf < nbuffers; buf++)
            {
                starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(worker_set->workers[i].task_sending, buf);
                struct _starpu_data_replicate *replicate = &handle->per_node[memnode];
                /* Release our refcnt */
                _starpu_spin_lock(&handle->header_lock);
                replicate->refcnt--;
                STARPU_ASSERT(replicate->refcnt >= 0);
                STARPU_ASSERT(handle->busy_count > 0);
                handle->busy_count--;
                if (!_starpu_data_check_not_busy(handle))
                    _starpu_spin_unlock(&handle->header_lock);
            }
            /* Execute the task */
            struct _starpu_job * j = _starpu_get_job_associated_to_task(worker_set->workers[i].task_sending);
            _starpu_set_local_worker_key(&worker_set->workers[i]);
            res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
            switch (res)
            {
                case 0:
                    /* The task has been launched with no error */
                    break;
                case -EAGAIN:
                    _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
                    _starpu_push_task_to_workers(worker_set->workers[i].task_sending);
                    STARPU_ABORT();
                    continue;
                default:
                    STARPU_ASSERT(0);
            }
            /* Reset it */
            worker_set->workers[i].task_sending = NULL;
            worker_set->workers[i].nb_buffers_sent = 0;
        }
    }
    _STARPU_TRACE_START_PROGRESS(memnode);
    res |= __starpu_datawizard_progress(1, 1);
    _STARPU_TRACE_END_PROGRESS(memnode);
    /* Handle the messages which have been stored */
    _starpu_src_common_handle_stored_async(mp_node);
    STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    /* Poll the device for completed jobs */
    while (mp_node->mp_recv_is_ready(mp_node))
    {
        _starpu_src_common_recv_async(mp_node);
        /* The mutex is unlocked in _starpu_src_common_recv_async */
        STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
    }
    STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
    /* Get a task for each worker */
    res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
#ifdef STARPU_SIMGRID
    if (!res)
        starpu_pthread_wait_wait(&worker_set->workers[0].wait);
#endif
    /* If at least one worker has popped a task */
    if (res != 0)
    {
        unsigned i, buf;
        for (i = 0; i < worker_set->nworkers; i++)
        {
            if (tasks[i] != NULL)
            {
                int workerid = worker_set->workers[i].workerid;
                _STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
                unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(tasks[i]);
                for (buf = 0; buf < nbuffers; buf++)
                {
                    starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(tasks[i], buf);
                    enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(tasks[i], buf);
                    struct _starpu_data_replicate *local_replicate = get_replicate(handle, mode, workerid, memnode);
                    int ret = _starpu_fetch_data_on_node(handle, memnode, local_replicate, mode, 0, 0, 1,
                                                         _starpu_src_common_send_data_callback, &worker_set->workers[i], 0, "_starpu_src_common_worker_internal_work");
                    STARPU_ASSERT(!ret);
                }
                worker_set->workers[i].task_sending = tasks[i];
            }
        }
    }
    /* Handle the messages which have been stored */
    _starpu_src_common_handle_stored_async(mp_node);
}
#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
/* Function looping on the source node, driving all the devices with a single thread */
void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
                                    int ndevices, struct _starpu_mp_node ** mp_node)
{
    unsigned memnode[ndevices];
    unsigned offsetmemnode[ndevices];
    memset(offsetmemnode, 0, ndevices*sizeof(unsigned));
    int device;
    int nbworkers = 0;
    for (device = 0; device < ndevices; device++)
    {
        memnode[device] = worker_set[device].workers[0].memory_node;
        nbworkers += worker_set[device].nworkers;
        if (device != 0)
            offsetmemnode[device] += offsetmemnode[device-1];
        if (device != ndevices - 1)
            offsetmemnode[device+1] += worker_set[device].nworkers;
    }
    struct starpu_task **tasks;
    _STARPU_MALLOC(tasks, sizeof(struct starpu_task *)*nbworkers);
    for (device = 0; device < ndevices; device++)
    {
        struct _starpu_worker *baseworker = &worker_set[device].workers[0];
        struct _starpu_machine_config *config = baseworker->config;
        unsigned baseworkerid = baseworker - config->workers;
        _starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
    }
    /* main loop */
    while (_starpu_machine_is_running())
    {
        for (device = 0; device < ndevices; device++)
        {
            _starpu_src_common_switch_env(((device-1)+ndevices)%ndevices, device);
            _starpu_src_common_worker_internal_work(&worker_set[device], mp_node[device], tasks+offsetmemnode[device], memnode[device]);
        }
    }
    free(tasks);
    for (device = 0; device < ndevices; device++)
        _starpu_handle_all_pending_node_data_requests(memnode[device]);
    /* In case there remains some memory that was automatically
     * allocated by StarPU, we release it now. Note that data
     * coherency is not maintained anymore at that point! */
    for (device = 0; device < ndevices; device++)
        _starpu_free_all_automatically_allocated_buffers(memnode[device]);
}
#endif
/* Function looping on the source node */
void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
                               unsigned baseworkerid,
                               struct _starpu_mp_node * mp_node)
{
    unsigned memnode = worker_set->workers[0].memory_node;
    struct starpu_task **tasks;
    _STARPU_MALLOC(tasks, sizeof(struct starpu_task *)*worker_set->nworkers);
    _starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
    /* main loop */
    while (_starpu_machine_is_running())
    {
        _starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
    }
    free(tasks);
    _starpu_handle_all_pending_node_data_requests(memnode);
    /* In case there remains some memory that was automatically
     * allocated by StarPU, we release it now. Note that data
     * coherency is not maintained anymore at that point! */
    _starpu_free_all_automatically_allocated_buffers(memnode);
}