driver_mic_source.c 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdio.h>
  17. #include <scif.h>
  18. #include <starpu.h>
  19. #include <starpu_profiling.h>
  20. #include <core/sched_policy.h>
  21. #include <common/uthash.h>
  22. #include <drivers/driver_common/driver_common.h>
  23. #include <drivers/mp_common/source_common.h>
  24. #include "driver_mic_common.h"
  25. #include "driver_mic_source.h"
/* Array of structures containing all the information needed to send
 * and receive data to and from the devices.
 * It is indexed by MP node id: see the lookups through worker->mp_nodeid
 * and through the devid of a memory node in this file. */
struct _starpu_mp_node *mic_nodes[STARPU_MAXMICDEVS];
/* COI engine handles, indexed by device id. They are read by
 * _starpu_mic_get_engine_info(); nothing in the visible part of this file
 * assigns them. */
static COIENGINE handles[STARPU_MAXMICDEVS];
/* Structure used by the host to store information about a kernel executable
 * on a MIC device: its name, and its address on each device.
 * If a kernel has been initialized for a device, a lookup has already been
 * done and func[] holds the device-side address; otherwise the slot is NULL
 * and the host still needs to do a lookup.
 */
struct _starpu_mic_kernel
{
	/* uthash bookkeeping; makes the structure hashable. */
	UT_hash_handle hh;
	/* Heap-allocated copy of the function name; this is the hash key. */
	char *name;
	/* Device-side address of the kernel, one slot per MP node id. */
	starpu_mic_kernel_t func[STARPU_MAXMICDEVS];
} *kernels; /* Head of the hash table of registered kernels. */
/* Mutex for concurrent access to the kernel hash table; taken by
 * _starpu_mic_src_register_kernel().
 */
starpu_pthread_mutex_t htbl_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Number of MIC workers initialized.
 * NOTE(review): neither this counter nor its mutex is used in the visible
 * part of this file; presumably the mutex guards the counter -- confirm.
 */
unsigned int nb_mic_worker_init = 0;
starpu_pthread_mutex_t nb_mic_worker_init_mutex = PTHREAD_MUTEX_INITIALIZER;
  48. /* Returns the ID of the MIC device controlled by the caller.
  49. * if the worker doesn't control a MIC device -ENODEV is returned
  50. */
  51. //static int _starpu_mic_get_devid(void)
  52. //{
  53. // struct _starpu_machine_config *config = _starpu_get_machine_config();
  54. // int workerid = starpu_worker_get_id();
  55. //
  56. // if (config->workers[workerid].arch != STARPU_MIC_WORKER)
  57. // return -ENODEV;
  58. //
  59. // return config->workers[workerid].devid;
  60. //}
  61. const struct _starpu_mp_node *_starpu_mic_src_get_actual_thread_mp_node()
  62. {
  63. struct _starpu_worker *actual_worker = _starpu_get_local_worker_key();
  64. STARPU_ASSERT(actual_worker);
  65. int nodeid = actual_worker->mp_nodeid;
  66. STARPU_ASSERT(nodeid >= 0 && nodeid < STARPU_MAXMICDEVS);
  67. return mic_nodes[nodeid];
  68. }
  69. const struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node)
  70. {
  71. int nodeid = _starpu_memory_node_get_devid(memory_node);
  72. STARPU_ASSERT_MSG(nodeid >= 0 && nodeid < STARPU_MAXMICDEVS, "bogus nodeid %d for memory node %d\n", nodeid, memory_node);
  73. return mic_nodes[nodeid];
  74. }
  75. static void _starpu_mic_src_free_kernel(void *kernel)
  76. {
  77. struct _starpu_mic_kernel *k = kernel;
  78. free(k->name);
  79. free(kernel);
  80. }
  81. void _starpu_mic_clear_kernels(void)
  82. {
  83. struct _starpu_mic_kernel *kernel, *tmp;
  84. HASH_ITER(hh, kernels, kernel, tmp)
  85. {
  86. HASH_DEL(kernels, kernel);
  87. free(kernel);
  88. }
  89. }
  90. static int
  91. _starpu_mic_src_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
  92. {
  93. uint32_t mask = 0;
  94. int profiling = starpu_profiling_status_get();
  95. struct timespec codelet_end;
  96. _starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
  97. profiling);
  98. _starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
  99. &j->cl_start, &codelet_end,
  100. profiling);
  101. _starpu_push_task_output (j, mask);
  102. _starpu_handle_job_termination(j);
  103. return 0;
  104. }
  105. static int
  106. _starpu_mic_src_process_completed_job (struct _starpu_worker_set *workerset)
  107. {
  108. struct _starpu_mp_node *node = mic_nodes[workerset->workers[0].mp_nodeid];
  109. enum _starpu_mp_command answer;
  110. void *arg;
  111. int arg_size;
  112. answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
  113. STARPU_ASSERT (answer == STARPU_EXECUTION_COMPLETED);
  114. void *arg_ptr = arg;
  115. int coreid;
  116. coreid = *(int *) arg_ptr;
  117. arg_ptr += sizeof (coreid); // Useless.
  118. struct _starpu_worker *worker = &workerset->workers[coreid];
  119. struct starpu_task *task = worker->current_task;
  120. struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
  121. _starpu_mic_src_finalize_job (j, worker);
  122. worker->current_task = NULL;
  123. return 0;
  124. }
  125. static int _starpu_mic_src_execute_job(struct _starpu_job *j, struct _starpu_worker *args)
  126. {
  127. int ret;
  128. uint32_t mask = 0;
  129. STARPU_ASSERT(j);
  130. struct starpu_task *task = j->task;
  131. //struct timespec codelet_end;
  132. int profiling = starpu_profiling_status_get();
  133. unsigned calibrate_model = 0;
  134. STARPU_ASSERT(task);
  135. struct starpu_codelet *cl = task->cl;
  136. STARPU_ASSERT(cl);
  137. if (cl->model && cl->model->benchmarking)
  138. calibrate_model = 1;
  139. ret = _starpu_fetch_task_input(j, mask);
  140. if (ret != 0)
  141. {
  142. /* there was not enough memory, so the input of
  143. * the codelet cannot be fetched ... put the
  144. * codelet back, and try it later */
  145. return -EAGAIN;
  146. }
  147. starpu_mic_kernel_t kernel = _starpu_mic_src_get_kernel_from_codelet(j->task->cl, j->nimpl);
  148. _starpu_driver_start_job (args, j, &j->cl_start, 0, profiling);
  149. _starpu_src_common_execute_kernel_from_task(mic_nodes[args->mp_nodeid],
  150. (void (*)(void)) kernel, args->devid, task);
  151. return 0;
  152. }
  153. int _starpu_mic_src_register_kernel(starpu_mic_func_symbol_t *symbol, const char *func_name)
  154. {
  155. unsigned int func_name_size = (strlen(func_name) + 1) * sizeof(char);
  156. STARPU_PTHREAD_MUTEX_LOCK(&htbl_mutex);
  157. struct _starpu_mic_kernel *kernel;
  158. HASH_FIND_STR(kernels, func_name, kernel);
  159. if (kernel != NULL)
  160. {
  161. STARPU_PTHREAD_MUTEX_UNLOCK(&htbl_mutex);
  162. // Function already in the table.
  163. *symbol = kernel;
  164. return 0;
  165. }
  166. kernel = malloc(sizeof(*kernel));
  167. if (kernel == NULL)
  168. {
  169. STARPU_PTHREAD_MUTEX_UNLOCK(&htbl_mutex);
  170. return -ENOMEM;
  171. }
  172. kernel->name = malloc(func_name_size);
  173. if (kernel->name == NULL)
  174. {
  175. STARPU_PTHREAD_MUTEX_UNLOCK(&htbl_mutex);
  176. free(kernel);
  177. return -ENOMEM;
  178. }
  179. memcpy(kernel->name, func_name, func_name_size);
  180. HASH_ADD_STR(kernels, name, kernel);
  181. unsigned int nb_mic_devices = _starpu_mic_src_get_device_count();
  182. unsigned int i;
  183. for (i = 0; i < nb_mic_devices; ++i)
  184. kernel->func[i] = NULL;
  185. STARPU_PTHREAD_MUTEX_UNLOCK(&htbl_mutex);
  186. *symbol = kernel;
  187. return 0;
  188. }
  189. starpu_mic_kernel_t _starpu_mic_src_get_kernel(starpu_mic_func_symbol_t symbol)
  190. {
  191. int workerid = starpu_worker_get_id();
  192. /* This function has to be called in the codelet only, by the thread
  193. * which will handle the task */
  194. if (workerid < 0)
  195. return NULL;
  196. int nodeid = starpu_worker_get_mp_nodeid(workerid);
  197. struct _starpu_mic_kernel *kernel = symbol;
  198. if (kernel->func[nodeid] == NULL)
  199. {
  200. struct _starpu_mp_node *node = mic_nodes[nodeid];
  201. int ret = _starpu_src_common_lookup(node, (void (**)(void))&kernel->func[nodeid], kernel->name);
  202. if (ret)
  203. return NULL;
  204. }
  205. return kernel->func[nodeid];
  206. }
  207. /* Report an error which occured when using a MIC device
  208. * and print this error in a human-readable style.
  209. * It hanbles errors occuring when using COI.
  210. */
  211. void _starpu_mic_src_report_coi_error(const char *func, const char *file,
  212. const int line, const COIRESULT status)
  213. {
  214. const char *errormsg = COIResultGetName(status);
  215. printf("SRC: oops in %s (%s:%u)... %d: %s \n", func, file, line, status, errormsg);
  216. STARPU_ASSERT(0);
  217. }
  218. /* Report an error which occured when using a MIC device
  219. * and print this error in a human-readable style.
  220. * It hanbles errors occuring when using SCIF.
  221. */
  222. void _starpu_mic_src_report_scif_error(const char *func, const char *file, const int line, const int status)
  223. {
  224. const char *errormsg = strerror(status);
  225. printf("SRC: oops in %s (%s:%u)... %d: %s \n", func, file, line, status, errormsg);
  226. STARPU_ASSERT(0);
  227. }
  228. /* Return the number of MIC devices in the system.
  229. * If the number of devices is already known, we use the cached value
  230. * without calling again COI. */
  231. unsigned _starpu_mic_src_get_device_count(void)
  232. {
  233. static unsigned short cached = 0;
  234. static unsigned nb_devices = 0;
  235. /* We don't need to call the COI API again if we already
  236. * have the result in cache */
  237. if (!cached)
  238. {
  239. COIRESULT res;
  240. res = COIEngineGetCount(COI_ISA_MIC, &nb_devices);
  241. /* If something is wrong with the COI engine, we shouldn't
  242. * use MIC devices (if there is any...) */
  243. if (res != COI_SUCCESS)
  244. nb_devices = 0;
  245. cached = 1;
  246. }
  247. return nb_devices;
  248. }
  249. unsigned starpu_mic_device_get_count(void)
  250. {
  251. // Return the number of configured MIC devices.
  252. struct _starpu_machine_config *config = _starpu_get_machine_config ();
  253. struct starpu_machine_topology *topology = &config->topology;
  254. return topology->nmicdevices;
  255. }
  256. starpu_mic_kernel_t _starpu_mic_src_get_kernel_from_codelet(struct starpu_codelet *cl, unsigned nimpl)
  257. {
  258. starpu_mic_kernel_t kernel = NULL;
  259. starpu_mic_func_t func = _starpu_task_get_mic_nth_implementation(cl, nimpl);
  260. if (func)
  261. {
  262. /* We execute the function contained in the codelet, it must return a
  263. * pointer to the function to execute on the device, either specified
  264. * directly by the user or by a call to starpu_mic_get_func().
  265. */
  266. kernel = func();
  267. }
  268. else
  269. {
  270. /* If user dont define any starpu_mic_fun_t in cl->mic_func we try to use
  271. * cpu_func_name.
  272. */
  273. char *func_name = _starpu_task_get_cpu_name_nth_implementation(cl, nimpl);
  274. if (func_name)
  275. {
  276. starpu_mic_func_symbol_t symbol;
  277. _starpu_mic_src_register_kernel(&symbol, func_name);
  278. kernel = _starpu_mic_src_get_kernel(symbol);
  279. }
  280. }
  281. STARPU_ASSERT(kernel);
  282. return kernel;
  283. }
  284. /* Initialize the node structure describing the MIC source.
  285. */
  286. void _starpu_mic_src_init(struct _starpu_mp_node *node)
  287. {
  288. /* Let's initialize the connection with the peered sink device */
  289. _starpu_mic_common_connect(&node->mp_connection.mic_endpoint,
  290. STARPU_TO_MIC_ID(node->peer_id),
  291. STARPU_MIC_SINK_PORT_NUMBER(node->peer_id),
  292. STARPU_MIC_SOURCE_PORT_NUMBER);
  293. _starpu_mic_common_connect(&node->host_sink_dt_connection.mic_endpoint,
  294. STARPU_TO_MIC_ID(node->peer_id),
  295. STARPU_MIC_SINK_DT_PORT_NUMBER(node->peer_id),
  296. STARPU_MIC_SOURCE_DT_PORT_NUMBER);
  297. }
  298. /* Deinitialize the MIC sink, close all the connections.
  299. */
  300. void _starpu_mic_src_deinit(struct _starpu_mp_node *node)
  301. {
  302. scif_close(node->host_sink_dt_connection.mic_endpoint);
  303. scif_close(node->mp_connection.mic_endpoint);
  304. }
  305. /* Get infos of the MIC associed to memory_node */
  306. static void _starpu_mic_get_engine_info(COI_ENGINE_INFO *info, int devid)
  307. {
  308. STARPU_ASSERT(devid >= 0 && devid < STARPU_MAXMICDEVS);
  309. if (COIEngineGetInfo(handles[devid], sizeof(*info), info) != COI_SUCCESS)
  310. STARPU_MIC_SRC_REPORT_COI_ERROR(errno);
  311. }
  312. /* TODO: call _starpu_memory_manager_set_global_memory_size instead */
  313. /* Return the size of the memory on the MIC associed to memory_node */
  314. size_t _starpu_mic_get_global_mem_size(int devid)
  315. {
  316. COI_ENGINE_INFO infos;
  317. _starpu_mic_get_engine_info(&infos, devid);
  318. return infos.PhysicalMemory;
  319. }
  320. /* Return the size of the free memory on the MIC associed to memory_node */
  321. size_t _starpu_mic_get_free_mem_size(int devid)
  322. {
  323. COI_ENGINE_INFO infos;
  324. _starpu_mic_get_engine_info(&infos, devid);
  325. return infos.PhysicalMemoryFree;
  326. }
  327. /* Allocate memory on MIC.
  328. * Return 0 if OK or 1 if not.
  329. */
  330. int _starpu_mic_allocate_memory(void **addr, size_t size, unsigned memory_node)
  331. {
  332. /* We check we have (1.25 * size) free space in the MIC because
  333. * transfert with scif is not possible when the MIC
  334. * doesn't have enought free memory.
  335. * In this cas we can't tell any things to the host. */
  336. //int devid = _starpu_memory_node_get_devid(memory_node);
  337. //if (_starpu_mic_get_free_mem_size(devid) < size * 1.25)
  338. // return 1;
  339. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(memory_node);
  340. return _starpu_src_common_allocate(mp_node, addr, size);
  341. }
  342. /* Free memory on MIC.
  343. * Mic need size to free memory for use the function scif_unregister.
  344. */
  345. void _starpu_mic_free_memory(void *addr, size_t size, unsigned memory_node)
  346. {
  347. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(memory_node);
  348. struct _starpu_mic_free_command cmd = {addr, size};
  349. return _starpu_mp_common_send_command(mp_node, STARPU_FREE, &cmd, sizeof(cmd));
  350. }
  351. /* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
  352. * node to the address pointed by DST in the DST_NODE memory node
  353. */
  354. int _starpu_mic_copy_ram_to_mic(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
  355. {
  356. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(dst_node);
  357. return _starpu_src_common_copy_host_to_sink(mp_node, src, dst, size);
  358. }
  359. /* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
  360. * node to the address pointed by DST in the DST_NODE memory node
  361. */
  362. int _starpu_mic_copy_mic_to_ram(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
  363. {
  364. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(src_node);
  365. return _starpu_src_common_copy_sink_to_host(mp_node, src, dst, size);
  366. }
  367. /* Asynchronous transfers */
  368. int _starpu_mic_copy_ram_to_mic_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
  369. {
  370. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(dst_node);
  371. if (scif_vwriteto(mp_node->host_sink_dt_connection.mic_endpoint, src, size, (off_t)dst, 0) < 0)
  372. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  373. return 0;
  374. }
  375. int _starpu_mic_copy_mic_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
  376. {
  377. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(src_node);
  378. if (scif_vreadfrom(mp_node->host_sink_dt_connection.mic_endpoint, dst, size, (off_t)src, 0) < 0)
  379. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  380. return 0;
  381. }
  382. /* Initialize a _starpu_mic_async_event. */
  383. int _starpu_mic_init_event(struct _starpu_mic_async_event *event, unsigned memory_node)
  384. {
  385. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(memory_node);
  386. scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
  387. event->memory_node = memory_node;
  388. /* Address of allocation must be multiple of the page size. */
  389. if (posix_memalign((void **)&(event->signal), 0x1000, sizeof(*(event->signal))) != 0)
  390. return -ENOMEM;
  391. *(event->signal) = 0;
  392. /* The size pass to scif_register is 0x1000 because it should be a multiple of the page size. */
  393. if (scif_register(epd, event->signal, 0x1000, (off_t)(event->signal), SCIF_PROT_WRITE, SCIF_MAP_FIXED) < 0)
  394. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  395. /* Mark for a futur wait. */
  396. if (scif_fence_mark(epd, SCIF_FENCE_INIT_SELF, &(event->mark)) < 0)
  397. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  398. /* Tell to scif to write STARPU_MIC_REQUEST_COMPLETE in event->signal when the transfer is complete.
  399. * We use this for test the end of a transfer. */
  400. if (scif_fence_signal(epd, (off_t)event->signal, STARPU_MIC_REQUEST_COMPLETE, 0, 0, SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL) < 0)
  401. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  402. return 0;
  403. }
  404. /* Wait the end of the asynchronous request */
  405. void _starpu_mic_wait_request_completion(struct _starpu_mic_async_event *event)
  406. {
  407. if (event->signal != NULL)
  408. {
  409. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(event->memory_node);
  410. scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
  411. if (scif_fence_wait(epd, event->mark) < 0)
  412. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  413. if (scif_unregister(epd, (off_t)(event->signal), 0x1000) < 0)
  414. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  415. free(event->signal);
  416. event->signal = NULL;
  417. }
  418. }
  419. /* Test if a asynchronous request is end.
  420. * Return 1 if is end, 0 else. */
  421. int _starpu_mic_request_is_complete(struct _starpu_mic_async_event *event)
  422. {
  423. if (event->signal != NULL && *(event->signal) != STARPU_MIC_REQUEST_COMPLETE)
  424. return 0;
  425. const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(event->memory_node);
  426. scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
  427. if (scif_unregister(epd, (off_t)(event->signal), 0x1000) < 0)
  428. STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
  429. free(event->signal);
  430. event->signal = NULL;
  431. return 1;
  432. }
  433. void *_starpu_mic_src_worker(void *arg)
  434. {
  435. struct _starpu_worker_set *args = arg;
  436. /* As all workers of a set share common data, we just use the first
  437. * one for intializing the following stuffs. */
  438. struct _starpu_worker *baseworker = &args->workers[0];
  439. struct _starpu_machine_config *config = baseworker->config;
  440. unsigned baseworkerid = baseworker - config->workers;
  441. unsigned mp_nodeid = baseworker->mp_nodeid;
  442. unsigned i;
  443. unsigned memnode = baseworker->memory_node;
  444. _starpu_worker_init(baseworker, _STARPU_FUT_MIC_KEY);
  445. // Current task for a thread managing a worker set has no sense.
  446. _starpu_set_current_task(NULL);
  447. for (i = 0; i < config->topology.nmiccores[mp_nodeid]; i++)
  448. {
  449. struct _starpu_worker *worker = &config->workers[baseworkerid+i];
  450. snprintf(worker->name, sizeof(worker->name), "MIC %d core %u", mp_nodeid, i);
  451. }
  452. baseworker->status = STATUS_UNKNOWN;
  453. _STARPU_TRACE_WORKER_INIT_END;
  454. /* tell the main thread that this one is ready */
  455. STARPU_PTHREAD_MUTEX_LOCK(&args->mutex);
  456. args->set_is_initialized = 1;
  457. STARPU_PTHREAD_COND_SIGNAL(&args->ready_cond);
  458. STARPU_PTHREAD_MUTEX_UNLOCK(&args->mutex);
  459. while (_starpu_machine_is_running())
  460. {
  461. int res;
  462. struct starpu_task *task = NULL;
  463. struct _starpu_job * j;
  464. unsigned micworkerid = 0;
  465. _STARPU_TRACE_START_PROGRESS(memnode);
  466. _starpu_datawizard_progress(memnode, 1);
  467. _STARPU_TRACE_END_PROGRESS(memnode);
  468. STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
  469. /* We pop tasklists of each worker in the set and process the
  470. * first non-empty list. */
  471. for (micworkerid = 0 ; (micworkerid < args->nworkers) && (task == NULL); micworkerid++)
  472. task = _starpu_pop_task (&args->workers[micworkerid]);
  473. if (task != NULL) {
  474. micworkerid--;
  475. goto task_found;
  476. }
  477. #if 0 // XXX: synchronous execution for now
  478. /* No task to submit, so we can poll the MIC device for
  479. * completed jobs. */
  480. struct pollfd fd = {
  481. .fd = mic_nodes[baseworker->mp_nodeid]->mp_connection.mic_endpoint,
  482. .events = POLLIN
  483. };
  484. if (0 < poll (&fd, 1, 0)) {
  485. _starpu_mic_src_process_completed_job (args);
  486. goto restart_loop;
  487. }
  488. #endif
  489. /* At this point, there is really nothing to do for the thread
  490. * so we can block.
  491. * XXX: blocking drivers is in fact broken. DO NOT USE IT ! */
  492. if (_starpu_worker_get_status(baseworkerid) != STATUS_SLEEPING)
  493. {
  494. _STARPU_TRACE_WORKER_SLEEP_START;
  495. _starpu_worker_restart_sleeping(baseworkerid);
  496. _starpu_worker_set_status(baseworkerid, STATUS_SLEEPING);
  497. }
  498. if (_starpu_worker_can_block(memnode))
  499. STARPU_PTHREAD_COND_WAIT(&baseworker->sched_cond, &baseworker->sched_mutex);
  500. else
  501. {
  502. if (_starpu_machine_is_running())
  503. STARPU_UYIELD();
  504. }
  505. if (_starpu_worker_get_status(baseworkerid) == STATUS_SLEEPING)
  506. {
  507. _STARPU_TRACE_WORKER_SLEEP_END;
  508. _starpu_worker_stop_sleeping(baseworkerid);
  509. _starpu_worker_set_status(baseworkerid, STATUS_UNKNOWN);
  510. }
  511. restart_loop:
  512. STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
  513. continue;
  514. task_found:
  515. /* If the MIC core associated to `micworkerid' is already
  516. * processing a job, we push back this one in the worker task
  517. * list. */
  518. STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
  519. if (args->workers[micworkerid].current_task) {
  520. _starpu_push_task_to_workers(task);
  521. continue;
  522. }
  523. STARPU_ASSERT(task);
  524. j = _starpu_get_job_associated_to_task(task);
  525. /* can a MIC device do that task ? */
  526. if (!_STARPU_MIC_MAY_PERFORM(j))
  527. {
  528. /* this isn't a mic task */
  529. _starpu_push_task_to_workers(task);
  530. continue;
  531. }
  532. args->workers[micworkerid].current_task = j->task;
  533. res = _starpu_mic_src_execute_job (j, &args->workers[micworkerid]);
  534. if (res)
  535. {
  536. switch (res)
  537. {
  538. case -EAGAIN:
  539. _STARPU_DISP("ouch, put the codelet %p back ... \n", j);
  540. _starpu_push_task_to_workers(task);
  541. STARPU_ABORT();
  542. continue;
  543. default:
  544. STARPU_ASSERT(0);
  545. }
  546. }
  547. /* XXX: synchronous execution for now */
  548. _starpu_mic_src_process_completed_job (args);
  549. }
  550. _STARPU_TRACE_WORKER_DEINIT_START;
  551. _starpu_handle_all_pending_node_data_requests(memnode);
  552. /* In case there remains some memory that was automatically
  553. * allocated by StarPU, we release it now. Note that data
  554. * coherency is not maintained anymore at that point ! */
  555. _starpu_free_all_automatically_allocated_buffers(memnode);
  556. _STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CUDA_KEY);
  557. return NULL;
  558. }