sink_common.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012 INRIA
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <drivers/mp_common/mp_common.h>
  20. #include <datawizard/interfaces/data_interface.h>
  21. #include <common/barrier.h>
  22. #include <core/workers.h>
  23. #include <common/barrier_counter.h>
  24. #ifdef STARPU_USE_MIC
  25. #include <common/COISysInfo_common.h>
  26. #endif
  27. #include "sink_common.h"
  28. /* Return the sink kind of the running process, based on the value of the
  29. * STARPU_SINK environment variable.
  30. * If there is no valid value retrieved, return STARPU_INVALID_KIND
  31. */
  32. static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
  33. {
  34. /* Environment varible STARPU_SINK must be defined when running on sink
  35. * side : let's use it to get the kind of node we're running on */
  36. char *node_kind = starpu_getenv("STARPU_SINK");
  37. STARPU_ASSERT(node_kind);
  38. if (!strcmp(node_kind, "STARPU_MIC"))
  39. return STARPU_MIC_SINK;
  40. else if (!strcmp(node_kind, "STARPU_SCC"))
  41. return STARPU_SCC_SINK;
  42. else if (!strcmp(node_kind, "STARPU_MPI"))
  43. return STARPU_MPI_SINK;
  44. else
  45. return STARPU_INVALID_KIND;
  46. }
  47. /* Send to host the number of cores of the sink device
  48. */
  49. static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
  50. {
  51. // Process packet received from `_starpu_src_common_sink_cores'.
  52. _starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
  53. &node->nb_cores, sizeof (int));
  54. }
  55. /* Send to host the address of the function given in parameter
  56. */
  57. static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
  58. char *func_name)
  59. {
  60. void (*func)(void);
  61. func = node->lookup(node,func_name);
  62. //_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
  63. /* If we couldn't find the function, let's send an error to the host.
  64. * The user probably made a mistake in the name */
  65. if (func)
  66. _starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
  67. &func, sizeof(func));
  68. else
  69. _starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
  70. NULL, 0);
  71. }
  72. /* Allocate a memory space and send the address of this space to the host
  73. */
  74. void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
  75. void *arg, int arg_size)
  76. {
  77. STARPU_ASSERT(arg_size == sizeof(size_t));
  78. void *addr = malloc(*(size_t *)(arg));
  79. /* If the allocation fail, let's send an error to the host.
  80. */
  81. if (addr)
  82. _starpu_mp_common_send_command(mp_node, STARPU_ANSWER_ALLOCATE,
  83. &addr, sizeof(addr));
  84. else
  85. _starpu_mp_common_send_command(mp_node, STARPU_ERROR_ALLOCATE,
  86. NULL, 0);
  87. }
  88. void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED,
  89. void *arg, int arg_size)
  90. {
  91. STARPU_ASSERT(arg_size == sizeof(void *));
  92. free(*(void **)(arg));
  93. }
  94. static void _starpu_sink_common_copy_from_host(const struct _starpu_mp_node *mp_node,
  95. void *arg, int arg_size)
  96. {
  97. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  98. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  99. mp_node->dt_recv(mp_node, cmd->addr, cmd->size);
  100. }
  101. static void _starpu_sink_common_copy_to_host(const struct _starpu_mp_node *mp_node,
  102. void *arg, int arg_size)
  103. {
  104. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  105. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  106. mp_node->dt_send(mp_node, cmd->addr, cmd->size);
  107. }
  108. static void _starpu_sink_common_copy_from_sink(const struct _starpu_mp_node *mp_node,
  109. void *arg, int arg_size)
  110. {
  111. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  112. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  113. mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size);
  114. _starpu_mp_common_send_command(mp_node, STARPU_TRANSFER_COMPLETE, NULL, 0);
  115. }
  116. static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_node,
  117. void *arg, int arg_size)
  118. {
  119. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  120. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  121. mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
  122. }
  123. /* Receive workers and combined workers and store them into the struct config
  124. */
  125. static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void *arg, int arg_size)
  126. {
  127. /* Retrieve information from the message */
  128. STARPU_ASSERT(arg_size == (sizeof(int)*5));
  129. uintptr_t arg_ptr = (uintptr_t) arg;
  130. int i;
  131. int nworkers = *(int *)arg_ptr;
  132. arg_ptr += sizeof(nworkers);
  133. int worker_size = *(int *)arg_ptr;
  134. arg_ptr += sizeof(worker_size);
  135. int combined_worker_size = *(int *)arg_ptr;
  136. arg_ptr += sizeof(combined_worker_size);
  137. int baseworkerid = *(int *)arg_ptr;
  138. arg_ptr += sizeof(baseworkerid);
  139. struct _starpu_machine_config *config = _starpu_get_machine_config();
  140. config->topology.nworkers = *(int *)arg_ptr;
  141. /* Retrieve workers */
  142. struct _starpu_worker * workers = &config->workers[baseworkerid];
  143. node->dt_recv(node,workers,worker_size);
  144. /* Update workers to have coherent field */
  145. for(i=0; i<nworkers; i++)
  146. {
  147. workers[i].config = config;
  148. starpu_pthread_mutex_init(&workers[i].mutex,NULL);
  149. starpu_pthread_mutex_destroy(&workers[i].mutex);
  150. starpu_pthread_cond_init(&workers[i].started_cond,NULL);
  151. starpu_pthread_cond_destroy(&workers[i].started_cond);
  152. starpu_pthread_cond_init(&workers[i].ready_cond,NULL);
  153. starpu_pthread_cond_destroy(&workers[i].ready_cond);
  154. starpu_pthread_mutex_init(&workers[i].sched_mutex,NULL);
  155. starpu_pthread_mutex_destroy(&workers[i].sched_mutex);
  156. starpu_pthread_cond_init(&workers[i].sched_cond,NULL);
  157. starpu_pthread_cond_destroy(&workers[i].sched_cond);
  158. workers[i].current_task = NULL;
  159. workers[i].set = NULL;
  160. }
  161. /* Retrieve combined workers */
  162. struct _starpu_combined_worker * combined_workers = config->combined_workers;
  163. node->dt_recv(node, combined_workers, combined_worker_size);
  164. node->baseworkerid = baseworkerid;
  165. STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
  166. }
  167. /* Function looping on the sink, waiting for tasks to execute.
  168. * If the caller is the host, don't do anything.
  169. */
  170. void _starpu_sink_common_worker(void)
  171. {
  172. struct _starpu_mp_node *node = NULL;
  173. enum _starpu_mp_command command = STARPU_EXIT;
  174. int arg_size = 0;
  175. void *arg = NULL;
  176. int exit_starpu = 0;
  177. enum _starpu_mp_node_kind node_kind = _starpu_sink_common_get_kind();
  178. if (node_kind == STARPU_INVALID_KIND)
  179. _STARPU_ERROR("No valid sink kind retrieved, use the"
  180. "STARPU_SINK environment variable to specify"
  181. "this\n");
  182. /* Create and initialize the node */
  183. node = _starpu_mp_common_node_create(node_kind, -1);
  184. starpu_pthread_key_t worker_key;
  185. STARPU_PTHREAD_KEY_CREATE(&worker_key, NULL);
  186. while (!exit_starpu)
  187. {
  188. /* If we have received a message */
  189. if(node->mp_recv_is_ready(node))
  190. {
  191. command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
  192. switch(command)
  193. {
  194. case STARPU_EXIT:
  195. exit_starpu = 1;
  196. break;
  197. case STARPU_EXECUTE:
  198. node->execute(node, arg, arg_size);
  199. break;
  200. case STARPU_SINK_NBCORES:
  201. _starpu_sink_common_get_nb_cores(node);
  202. break;
  203. case STARPU_LOOKUP:
  204. _starpu_sink_common_lookup(node, (char *) arg);
  205. break;
  206. case STARPU_ALLOCATE:
  207. node->allocate(node, arg, arg_size);
  208. break;
  209. case STARPU_FREE:
  210. node->free(node, arg, arg_size);
  211. break;
  212. case STARPU_RECV_FROM_HOST:
  213. _starpu_sink_common_copy_from_host(node, arg, arg_size);
  214. break;
  215. case STARPU_SEND_TO_HOST:
  216. _starpu_sink_common_copy_to_host(node, arg, arg_size);
  217. break;
  218. case STARPU_RECV_FROM_SINK:
  219. _starpu_sink_common_copy_from_sink(node, arg, arg_size);
  220. break;
  221. case STARPU_SEND_TO_SINK:
  222. _starpu_sink_common_copy_to_sink(node, arg, arg_size);
  223. break;
  224. case STARPU_SYNC_WORKERS:
  225. _starpu_sink_common_recv_workers(node, arg, arg_size);
  226. break;
  227. default:
  228. printf("Oops, command %x unrecognized\n", command);
  229. }
  230. }
  231. STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
  232. /* If the list is not empty */
  233. if(!mp_message_list_empty(&node->message_queue))
  234. {
  235. /* We pop a message and send it to the host */
  236. struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
  237. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  238. //_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
  239. _starpu_mp_common_send_command(node, message->type,
  240. message->buffer, message->size);
  241. free(message->buffer);
  242. mp_message_delete(message);
  243. }
  244. else
  245. {
  246. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  247. }
  248. }
  249. STARPU_PTHREAD_KEY_DELETE(worker_key);
  250. /* Deinitialize the node and release it */
  251. _starpu_mp_common_node_destroy(node);
  252. exit(0);
  253. }
  254. /* Search for the mp_barrier correspondind to the specified combined worker
  255. * and create it if it doesn't exist
  256. */
  257. static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
  258. {
  259. struct mp_barrier * b = NULL;
  260. STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
  261. /* Search if the barrier already exist */
  262. for(b = mp_barrier_list_begin(&node->barrier_list);
  263. b != mp_barrier_list_end(&node->barrier_list) && b->id != cb_workerid;
  264. b = mp_barrier_list_next(b));
  265. /* If we found the barrier */
  266. if(b != NULL)
  267. {
  268. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  269. return b;
  270. }
  271. else
  272. {
  273. /* Else we create, initialize and add it to the list*/
  274. b = mp_barrier_new();
  275. b->id = cb_workerid;
  276. STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
  277. STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
  278. mp_barrier_list_push_back(&node->barrier_list,b);
  279. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  280. return b;
  281. }
  282. }
  283. /* Erase for the mp_barrier correspondind to the specified combined worker
  284. */
  285. static void _starpu_sink_common_erase_barrier(struct _starpu_mp_node * node, struct mp_barrier *barrier)
  286. {
  287. STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
  288. mp_barrier_list_erase(&node->barrier_list,barrier);
  289. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  290. }
  291. /* Append the message given in parameter to the message list
  292. */
  293. static void _starpu_sink_common_append_message(struct _starpu_mp_node *node, struct mp_message * message)
  294. {
  295. STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
  296. mp_message_list_push_front(&node->message_queue,message);
  297. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  298. }
  299. /* Append to the message list a "STARPU_PRE_EXECUTION" message
  300. */
  301. static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *node, struct mp_task *task)
  302. {
  303. /* Init message to tell the sink that the execution has begun */
  304. struct mp_message * message = mp_message_new();
  305. message->type = STARPU_PRE_EXECUTION;
  306. message->buffer = malloc(sizeof(int));
  307. *(int *) message->buffer = task->combined_workerid;
  308. message->size = sizeof(int);
  309. /* Append the message to the queue */
  310. _starpu_sink_common_append_message(node, message);
  311. }
  312. /* Append to the message list a "STARPU_EXECUTION_COMPLETED" message
  313. */
  314. static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_node *node, struct mp_task *task)
  315. {
  316. /* Init message to tell the sink that the execution is completed */
  317. struct mp_message * message = mp_message_new();
  318. message->type = STARPU_EXECUTION_COMPLETED;
  319. message->buffer = malloc(sizeof(int));
  320. *(int*) message->buffer = task->coreid;
  321. message->size = sizeof(int);
  322. /* Append the message to the queue */
  323. _starpu_sink_common_append_message(node, message);
  324. }
  325. /* Bind the thread which is running on the specified core to the combined worker */
  326. static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *node, int coreid, struct _starpu_combined_worker * combined_worker)
  327. {
  328. int i;
  329. int * bind_set = malloc(sizeof(int)*combined_worker->worker_size);
  330. for(i=0;i<combined_worker->worker_size;i++)
  331. bind_set[i] = combined_worker->combined_workerid[i] - node->baseworkerid;
  332. node->bind_thread(node, coreid, bind_set, combined_worker->worker_size);
  333. }
  334. /* Get the current rank of the worker in the combined worker
  335. */
  336. static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_combined_worker * combined_worker)
  337. {
  338. int i;
  339. for(i=0; i<combined_worker->worker_size; i++)
  340. if(workerid == combined_worker->combined_workerid[i])
  341. return i;
  342. STARPU_ASSERT(0);
  343. return -1;
  344. }
  345. /* Execute the task
  346. */
  347. static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct _starpu_worker * worker)
  348. {
  349. struct _starpu_combined_worker * combined_worker = NULL;
  350. struct mp_task* task = node->run_table[coreid];
  351. /* If it's a parallel task */
  352. if(task->is_parallel_task)
  353. {
  354. combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
  355. worker->current_rank = _starpu_sink_common_get_current_rank(worker->workerid, combined_worker);
  356. worker->combined_workerid = task->combined_workerid;
  357. worker->worker_size = combined_worker->worker_size;
  358. /* Synchronize with others threads of the combined worker*/
  359. STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
  360. /* The first thread of the combined worker */
  361. if(worker->current_rank == 0)
  362. {
  363. /* tell the sink that the execution has begun */
  364. _starpu_sink_common_pre_execution_message(node,task);
  365. /* If the mode is FORKJOIN,
  366. * the first thread binds himself
  367. * on all core of the combined worker*/
  368. if(task->type == STARPU_FORKJOIN)
  369. {
  370. _starpu_sink_common_bind_to_combined_worker(node, coreid, combined_worker);
  371. }
  372. }
  373. }
  374. else
  375. {
  376. worker->current_rank = 0;
  377. worker->combined_workerid = 0;
  378. worker->worker_size = 1;
  379. }
  380. if(task->type != STARPU_FORKJOIN || worker->current_rank == 0)
  381. {
  382. if (_starpu_get_disable_kernels() <= 0)
  383. {
  384. /* execute the task */
  385. task->kernel(task->interfaces,task->cl_arg);
  386. }
  387. }
  388. /* If it's a parallel task */
  389. if(task->is_parallel_task)
  390. {
  391. /* Synchronize with others threads of the combined worker*/
  392. STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->after_work_barrier);
  393. /* The fisrt thread of the combined */
  394. if(worker->current_rank == 0)
  395. {
  396. /* Erase the barrier from the list */
  397. _starpu_sink_common_erase_barrier(node,task->mp_barrier);
  398. /* If the mode is FORKJOIN,
  399. * the first thread rebinds himself on his own core */
  400. if(task->type == STARPU_FORKJOIN)
  401. node->bind_thread(node, coreid, &coreid, 1);
  402. }
  403. }
  404. node->run_table[coreid] = NULL;
  405. /* tell the sink that the execution is completed */
  406. _starpu_sink_common_execution_completed_message(node,task);
  407. /*free the task*/
  408. unsigned i;
  409. for (i = 0; i < task->nb_interfaces; i++)
  410. free(task->interfaces[i]);
  411. free(task->interfaces);
  412. free(task);
  413. }
  414. /* The main function executed by the thread
  415. * thread_arg is a structure containing the information needed by the thread
  416. */
  417. void* _starpu_sink_thread(void * thread_arg)
  418. {
  419. /* Retrieve the information from the structure */
  420. struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
  421. int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
  422. /* free the structure */
  423. free(thread_arg);
  424. STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
  425. struct _starpu_worker *worker = &_starpu_get_machine_config()->workers[node->baseworkerid + coreid];
  426. node->bind_thread(node, coreid, &coreid, 1);
  427. _starpu_set_local_worker_key(worker);
  428. while(node->is_running)
  429. {
  430. /*Wait there is a task available */
  431. sem_wait(&node->sem_run_table[coreid]);
  432. if(node->run_table[coreid] != NULL)
  433. _starpu_sink_common_execute_kernel(node,coreid,worker);
  434. }
  435. starpu_pthread_exit(NULL);
  436. }
  437. /* Add the task to the specific thread and wake him up
  438. */
  439. static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
  440. {
  441. /* Add the task to the specific thread */
  442. node->run_table[task->coreid] = task;
  443. /* Unlock the mutex to wake up the thread which will execute the task */
  444. sem_post(&node->sem_run_table[task->coreid]);
  445. }
  446. /* Receive paquet from _starpu_src_common_execute_kernel in the form below :
  447. * [Function pointer on sink, number of interfaces, interfaces
  448. * (union _starpu_interface), cl_arg]
  449. * Then call the function given, passing as argument an array containing the
  450. * addresses of the received interfaces
  451. */
  452. void _starpu_sink_common_execute(struct _starpu_mp_node *node,
  453. void *arg, int arg_size)
  454. {
  455. unsigned i;
  456. uintptr_t arg_ptr = (uintptr_t) arg;
  457. struct mp_task *task = malloc(sizeof(struct mp_task));
  458. task->kernel = *(void(**)(void **, void *)) arg_ptr;
  459. arg_ptr += sizeof(task->kernel);
  460. task->type = *(enum starpu_codelet_type *) arg_ptr;
  461. arg_ptr += sizeof(task->type);
  462. task->is_parallel_task = *(int *) arg_ptr;
  463. arg_ptr += sizeof(task->is_parallel_task);
  464. if(task->is_parallel_task)
  465. {
  466. task->combined_workerid= *(int *) arg_ptr;
  467. arg_ptr += sizeof(task->combined_workerid);
  468. task->mp_barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,_starpu_get_combined_worker_struct(task->combined_workerid)->worker_size);
  469. }
  470. task->coreid = *(unsigned *) arg_ptr;
  471. arg_ptr += sizeof(task->coreid);
  472. task->nb_interfaces = *(unsigned *) arg_ptr;
  473. arg_ptr += sizeof(task->nb_interfaces);
  474. task->interfaces = malloc(task->nb_interfaces * sizeof(*task->interfaces));
  475. /* The function needs an array pointing to each interface it needs
  476. * during execution. As in sink-side there is no mean to know which
  477. * kind of interface to expect, the array is composed of unions of
  478. * interfaces, thus we expect the same size anyway */
  479. for (i = 0; i < task->nb_interfaces; i++)
  480. {
  481. union _starpu_interface * interface = malloc(sizeof(union _starpu_interface));
  482. memcpy(interface, (void*) arg_ptr,
  483. sizeof(union _starpu_interface));
  484. task->interfaces[i] = interface;
  485. arg_ptr += sizeof(union _starpu_interface);
  486. }
  487. /* Was cl_arg sent ? */
  488. if (arg_size > arg_ptr - (uintptr_t) arg)
  489. task->cl_arg = (void*) arg_ptr;
  490. else
  491. task->cl_arg = NULL;
  492. //_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
  493. _starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
  494. NULL, 0);
  495. //_STARPU_DEBUG("executing the task %p\n", task->kernel);
  496. _starpu_sink_common_execute_thread(node, task);
  497. }