data_arbiter_concurrency.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013,2015-2017 Inria
  4. * Copyright (C) 2010-2013,2016,2017 CNRS
  5. * Copyright (C) 2009-2018 Université de Bordeaux
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <core/dependencies/data_concurrency.h>
  19. #include <datawizard/coherency.h>
  20. #include <core/sched_policy.h>
  21. #include <common/starpu_spinlock.h>
  22. #include <datawizard/sort_data_handles.h>
  23. #include <datawizard/memalloc.h>
  24. #include <datawizard/memory_nodes.h>
  25. /* TODO factorize with data_concurrency.c and btw support redux */
  26. //#define LOCK_OR_DELEGATE
  27. /*
  28. * This implements a solution for the dining philosophers problem (see
  29. * data_concurrency.c for the rationale) based on a centralized arbiter. This
  30. * allows to get a more parallel solution than the Dijkstra solution, by
  31. * avoiding strictly serialized executions, and instead opportunistically find
  32. * which tasks can take data.
  33. *
  34. * These are the algorithms implemented below:
  35. *
  36. *
  37. * at termination of task T:
  38. *
  39. * - for each handle h of T:
  40. * - mutex_lock(&arbiter)
  41. * - release reference on h
  42. * - call _starpu_notify_arbitered_dependencies which does the following
  43. * - for each task Tc waiting for h:
  44. * - for each data Tc_h it is waiting for:
  45. * - if Tc_h is busy, goto fail
  46. * // Ok, now really take them
  47. * - For each data Tc_h it is waiting:
  48. * - lock(Tc_h)
  49. * - take reference on h (it should be still available since we hold the arbiter)
  50. * - unlock(Tc_h)
  51. * // Ok, we managed to find somebody, we're finished!
  52. * _starpu_push_task(Tc);
  53. * break;
  54. * fail:
  55. * - unrecord Tc as waiting on h
  56. * - record Tc as waiting on Tc_h
  57. * // No luck, let's try another task
  58. * continue;
  59. * // Release the arbiter mutex a bit from time to time
  60. * - mutex_unlock(&arbiter)
  61. *
  62. *
  63. * at submission of task T (_starpu_submit_job_enforce_arbitered_deps):
  64. *
  65. * - mutex_lock(&arbiter)
  66. * - for each handle h of T:
  67. * - lock(h)
  68. * - try to take a reference on h, goto fail on failure
  69. * - unlock(h)
  70. * // Success!
  71. * - mutex_unlock(&arbiter);
  72. * - return 0;
  73. *
  74. * fail:
  75. * // couldn't take everything, record task T and abort
  76. * - record T as waiting on h
  77. * // drop spurious references
  78. * - for each handle h of T already taken:
  79. * - lock(h)
  80. * - release reference on h
  81. * - unlock(h)
  82. * - mutex_unlock(&arbiter)
  83. * - return 1;
  84. *
  85. * at acquire (_starpu_attempt_to_submit_arbitered_data_request):
  86. * - mutex_lock(&arbiter)
  87. * - try to take a reference on h
  88. * - on failure, record as waiting on h
  89. * - mutex_unlock(&arbiter);
  90. * - return 0 if succeeded, 1 if failed;
  91. */
  92. static int _starpu_arbiter_filter_modes(int mode)
  93. {
  94. /* Do not care about some flags */
  95. mode &= ~STARPU_COMMUTE;
  96. mode &= ~STARPU_SSEND;
  97. mode &= ~STARPU_LOCALITY;
  98. if (mode == STARPU_RW)
  99. mode = STARPU_W;
  100. return mode;
  101. }
  102. struct starpu_arbiter
  103. {
  104. #ifdef LOCK_OR_DELEGATE
  105. /* The list of task to perform */
  106. struct LockOrDelegateListNode* dlTaskListHead;
  107. /* To protect the list of tasks */
  108. struct _starpu_spinlock dlListLock;
  109. /* Whether somebody is working on the list */
  110. int working;
  111. #else /* LOCK_OR_DELEGATE */
  112. starpu_pthread_mutex_t mutex;
  113. #endif /* LOCK_OR_DELEGATE */
  114. };
  115. #ifdef LOCK_OR_DELEGATE
  116. /* In case of congestion, we don't want to needlessly wait for the arbiter lock
  117. * while we can just delegate the work to the worker already managing some
  118. * dependencies.
  119. *
  120. * So we push work on the dlTaskListHead queue and only one worker will process
  121. * the list.
  122. */
  123. /* A LockOrDelegate task list */
  124. struct LockOrDelegateListNode
  125. {
  126. void (*func)(void*);
  127. void* data;
  128. struct LockOrDelegateListNode* next;
  129. };
  130. /* Post a task to perfom if possible, otherwise put it in the list
  131. * If we can perfom this task, we may also perfom all the tasks in the list
  132. * This function return 1 if the task (and maybe some others) has been done
  133. * by the calling thread and 0 otherwise (if the task has just been put in the list)
  134. */
  135. static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*func)(void*), void* data)
  136. {
  137. struct LockOrDelegateListNode *newNode, *iter, *next;
  138. int did = 0;
  139. _STARPU_MALLOC(newNode, sizeof(*newNode));
  140. newNode->data = data;
  141. newNode->func = func;
  142. _starpu_spin_lock(&arbiter->dlListLock);
  143. if (arbiter->working)
  144. {
  145. /* Somebody working on it, insert the node */
  146. newNode->next = arbiter->dlTaskListHead;
  147. arbiter->dlTaskListHead = newNode;
  148. }
  149. else
  150. {
  151. /* Nobody working on the list, we'll work */
  152. arbiter->working = 1;
  153. /* work on what was pushed so far first */
  154. iter = arbiter->dlTaskListHead;
  155. arbiter->dlTaskListHead = NULL;
  156. _starpu_spin_unlock(&arbiter->dlListLock);
  157. while (iter != NULL)
  158. {
  159. (*iter->func)(iter->data);
  160. next = iter->next;
  161. free(iter);
  162. iter = next;
  163. }
  164. /* And then do our job */
  165. (*func)(data);
  166. free(newNode);
  167. did = 1;
  168. _starpu_spin_lock(&arbiter->dlListLock);
  169. /* And finish working on anything that could have been pushed
  170. * in the meanwhile */
  171. while (arbiter->dlTaskListHead != 0)
  172. {
  173. iter = arbiter->dlTaskListHead;
  174. arbiter->dlTaskListHead = arbiter->dlTaskListHead->next;
  175. _starpu_spin_unlock(&arbiter->dlListLock);
  176. (*iter->func)(iter->data);
  177. free(iter);
  178. _starpu_spin_lock(&arbiter->dlListLock);
  179. }
  180. arbiter->working = 0;
  181. }
  182. _starpu_spin_unlock(&arbiter->dlListLock);
  183. return did;
  184. }
  185. #endif
  186. /* Try to submit just one data request, in case the request can be processed
  187. * immediately, return 0, if there is still a dependency that is not compatible
  188. * with the current mode, the request is put in the per-handle list of
  189. * "requesters", and this function returns 1. */
  190. #ifdef LOCK_OR_DELEGATE
  191. struct starpu_submit_arbitered_args
  192. {
  193. unsigned request_from_codelet;
  194. starpu_data_handle_t handle;
  195. enum starpu_data_access_mode mode;
  196. void (*callback)(void *);
  197. void *argcb;
  198. struct _starpu_job *j;
  199. unsigned buffer_index;
  200. };
  201. static unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
  202. starpu_data_handle_t handle, enum starpu_data_access_mode mode,
  203. void (*callback)(void *), void *argcb,
  204. struct _starpu_job *j, unsigned buffer_index);
  205. static void __starpu_attempt_to_submit_arbitered_data_request(void *inData)
  206. {
  207. struct starpu_submit_arbitered_args* args = inData;
  208. unsigned request_from_codelet = args->request_from_codelet;
  209. starpu_data_handle_t handle = args->handle;
  210. enum starpu_data_access_mode mode = args->mode;
  211. void (*callback)(void*) = args->callback;
  212. void *argcb = args->argcb;
  213. struct _starpu_job *j = args->j;
  214. unsigned buffer_index = args->buffer_index;
  215. free(args);
  216. if (!___starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index))
  217. /* Success, but we have no way to report it to original caller,
  218. * so call callback ourself */
  219. callback(argcb);
  220. }
  221. unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
  222. starpu_data_handle_t handle, enum starpu_data_access_mode mode,
  223. void (*callback)(void *), void *argcb,
  224. struct _starpu_job *j, unsigned buffer_index)
  225. {
  226. struct starpu_submit_arbitered_args* args;
  227. _STARPU_MALLOC(args, sizeof(*args));
  228. args->request_from_codelet = request_from_codelet;
  229. args->handle = handle;
  230. args->mode = mode;
  231. args->callback = callback;
  232. args->argcb = argcb;
  233. args->j = j;
  234. args->buffer_index = buffer_index;
  235. /* The function will delete args */
  236. _starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_attempt_to_submit_arbitered_data_request, args);
  237. return 1;
  238. }
  239. unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
  240. starpu_data_handle_t handle, enum starpu_data_access_mode mode,
  241. void (*callback)(void *), void *argcb,
  242. struct _starpu_job *j, unsigned buffer_index)
  243. {
  244. STARPU_ASSERT(handle->arbiter);
  245. #else // LOCK_OR_DELEGATE
  246. unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
  247. starpu_data_handle_t handle, enum starpu_data_access_mode mode,
  248. void (*callback)(void *), void *argcb,
  249. struct _starpu_job *j, unsigned buffer_index)
  250. {
  251. starpu_arbiter_t arbiter = handle->arbiter;
  252. STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
  253. #endif // LOCK_OR_DELEGATE
  254. mode = _starpu_arbiter_filter_modes(mode);
  255. STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
  256. /* Take the lock protecting the header. We try to do some progression
  257. * in case this is called from a worker, otherwise we just wait for the
  258. * lock to be available. */
  259. if (request_from_codelet)
  260. {
  261. int cpt = 0;
  262. while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
  263. {
  264. cpt++;
  265. _starpu_datawizard_progress(0);
  266. }
  267. if (cpt == STARPU_SPIN_MAXTRY)
  268. _starpu_spin_lock(&handle->header_lock);
  269. }
  270. else
  271. {
  272. _starpu_spin_lock(&handle->header_lock);
  273. }
  274. /* If there is currently nobody accessing the piece of data, or it's
  275. * not another writter and if this is the same type of access as the
  276. * current one, we can proceed. */
  277. unsigned put_in_list = 1;
  278. if ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode)))
  279. {
  280. /* TODO: Detect whether this is the end of a reduction phase etc. like in data_concurrency.c */
  281. if (0)
  282. {
  283. }
  284. else
  285. {
  286. put_in_list = 0;
  287. }
  288. }
  289. if (put_in_list)
  290. {
  291. /* there cannot be multiple writers or a new writer
  292. * while the data is in read mode */
  293. handle->busy_count++;
  294. /* enqueue the request */
  295. struct _starpu_data_requester *r = _starpu_data_requester_new();
  296. r->mode = mode;
  297. r->is_requested_by_codelet = request_from_codelet;
  298. r->j = j;
  299. r->buffer_index = buffer_index;
  300. r->prio = j ? j->task->priority : 0;
  301. r->ready_data_callback = callback;
  302. r->argcb = argcb;
  303. _starpu_data_requester_prio_list_push_back(&handle->arbitered_req_list, r);
  304. /* failed */
  305. put_in_list = 1;
  306. }
  307. else
  308. {
  309. handle->refcnt++;
  310. handle->busy_count++;
  311. /* Do not write to handle->current_mode if it is already
  312. * R. This avoids a spurious warning from helgrind when
  313. * the following happens:
  314. * acquire(R) in thread A
  315. * acquire(R) in thread B
  316. * release_data_on_node() in thread A
  317. * helgrind would shout that the latter reads current_mode
  318. * unsafely.
  319. *
  320. * This actually basically explains helgrind that it is a
  321. * shared R acquisition.
  322. */
  323. if (mode != STARPU_R || handle->current_mode != mode)
  324. handle->current_mode = mode;
  325. /* success */
  326. put_in_list = 0;
  327. }
  328. _starpu_spin_unlock(&handle->header_lock);
  329. #ifndef LOCK_OR_DELEGATE
  330. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  331. #endif // LOCK_OR_DELEGATE
  332. return put_in_list;
  333. }
  334. #ifdef LOCK_OR_DELEGATE
  335. /* These are the arguments passed to _submit_job_enforce_arbitered_deps */
  336. struct starpu_enforce_arbitered_args
  337. {
  338. struct _starpu_job *j;
  339. unsigned buf;
  340. unsigned nbuffers;
  341. };
  342. static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
  343. static void __starpu_submit_job_enforce_arbitered_deps(void* inData)
  344. {
  345. struct starpu_enforce_arbitered_args* args = inData;
  346. struct _starpu_job *j = args->j;
  347. unsigned buf = args->buf;
  348. unsigned nbuffers = args->nbuffers;
  349. /* we are in charge of freeing the args */
  350. free(args);
  351. ___starpu_submit_job_enforce_arbitered_deps(j, buf, nbuffers);
  352. }
  353. void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
  354. {
  355. struct starpu_enforce_arbitered_args* args;
  356. _STARPU_MALLOC(args, sizeof(*args));
  357. starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
  358. args->j = j;
  359. args->buf = buf;
  360. args->nbuffers = nbuffers;
  361. /* The function will delete args */
  362. _starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_submit_job_enforce_arbitered_deps, args);
  363. }
  364. static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
  365. {
  366. starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
  367. #else // LOCK_OR_DELEGATE
  368. void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
  369. {
  370. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  371. starpu_arbiter_t arbiter = descrs[buf].handle->arbiter;
  372. STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
  373. #endif
  374. STARPU_ASSERT(arbiter);
  375. const unsigned start_buf_arbiter = buf;
  376. unsigned idx_buf_arbiter;
  377. unsigned all_arbiter_available = 1;
  378. starpu_data_handle_t handle;
  379. enum starpu_data_access_mode mode;
  380. for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
  381. {
  382. handle = descrs[idx_buf_arbiter].handle;
  383. mode = descrs[idx_buf_arbiter].mode & ~STARPU_COMMUTE;
  384. mode = _starpu_arbiter_filter_modes(mode);
  385. STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
  386. if (idx_buf_arbiter && (descrs[idx_buf_arbiter-1].handle == handle))
  387. /* We have already requested this data, skip it. This
  388. * depends on ordering putting writes before reads, see
  389. * _starpu_compar_handles. */
  390. continue;
  391. if (handle->arbiter != arbiter)
  392. {
  393. /* another arbiter */
  394. break;
  395. }
  396. /* Try to take handle */
  397. _starpu_spin_lock(&handle->header_lock);
  398. if ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode)))
  399. {
  400. /* Got it */
  401. handle->refcnt++;
  402. handle->busy_count++;
  403. if (mode != STARPU_R || handle->current_mode != mode)
  404. handle->current_mode = mode;
  405. _starpu_spin_unlock(&handle->header_lock);
  406. }
  407. else
  408. {
  409. /* a handle does not have a refcnt == 0, stop */
  410. _starpu_spin_unlock(&handle->header_lock);
  411. all_arbiter_available = 0;
  412. break;
  413. }
  414. }
  415. if (all_arbiter_available == 0)
  416. {
  417. /* Oups, record ourself as waiting for this data */
  418. struct _starpu_data_requester *r = _starpu_data_requester_new();
  419. r->mode = mode;
  420. r->is_requested_by_codelet = 1;
  421. r->j = j;
  422. r->buffer_index = start_buf_arbiter;
  423. r->prio = j->task->priority;
  424. r->ready_data_callback = NULL;
  425. r->argcb = NULL;
  426. /* store node in list */
  427. _starpu_data_requester_prio_list_push_front(&handle->arbitered_req_list, r);
  428. _starpu_spin_lock(&handle->header_lock);
  429. handle->busy_count++;
  430. _starpu_spin_unlock(&handle->header_lock);
  431. /* and cancel all taken */
  432. unsigned idx_buf_cancel;
  433. for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
  434. {
  435. starpu_data_handle_t cancel_handle = descrs[idx_buf_cancel].handle;
  436. if (idx_buf_cancel && (descrs[idx_buf_cancel-1].handle == cancel_handle))
  437. continue;
  438. if (cancel_handle->arbiter != arbiter)
  439. /* Will have to process another arbiter, will do that later */
  440. break;
  441. _starpu_spin_lock(&cancel_handle->header_lock);
  442. /* reset the counter because finally we do not take the data */
  443. STARPU_ASSERT(cancel_handle->refcnt >= 1);
  444. cancel_handle->refcnt--;
  445. STARPU_ASSERT(cancel_handle->busy_count > 0);
  446. cancel_handle->busy_count--;
  447. if (!_starpu_data_check_not_busy(cancel_handle))
  448. _starpu_spin_unlock(&cancel_handle->header_lock);
  449. }
  450. #ifndef LOCK_OR_DELEGATE
  451. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  452. #endif
  453. return;
  454. }
  455. #ifndef LOCK_OR_DELEGATE
  456. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  457. #endif
  458. // all_arbiter_available is true
  459. if (idx_buf_arbiter < nbuffers)
  460. /* Other arbitered data, process them */
  461. _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
  462. else
  463. /* Finished with all data, can eventually push! */
  464. _starpu_push_task(j);
  465. }
  466. #ifdef LOCK_OR_DELEGATE
  467. void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);
  468. void __starpu_notify_arbitered_dependencies(void* inData)
  469. {
  470. starpu_data_handle_t handle = inData;
  471. ___starpu_notify_arbitered_dependencies(handle);
  472. }
  473. void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
  474. {
  475. _starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_notify_arbitered_dependencies, handle);
  476. }
  477. void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
  478. #else // LOCK_OR_DELEGATE
  479. void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
  480. #endif
  481. {
  482. starpu_arbiter_t arbiter = handle->arbiter;
  483. #ifndef LOCK_OR_DELEGATE
  484. STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
  485. #endif
  486. /* Since the request has been posted the handle may have been proceed and released */
  487. if (_starpu_data_requester_prio_list_empty(&handle->arbitered_req_list))
  488. {
  489. /* No waiter, just remove our reference */
  490. _starpu_spin_lock(&handle->header_lock);
  491. STARPU_ASSERT(handle->refcnt > 0);
  492. handle->refcnt--;
  493. STARPU_ASSERT(handle->busy_count > 0);
  494. handle->busy_count--;
  495. #ifndef LOCK_OR_DELEGATE
  496. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  497. #endif
  498. if (_starpu_data_check_not_busy(handle))
  499. /* Handle was even destroyed, don't unlock it. */
  500. return;
  501. _starpu_spin_unlock(&handle->header_lock);
  502. return;
  503. }
  504. /* There is a waiter, remove our reference */
  505. _starpu_spin_lock(&handle->header_lock);
  506. STARPU_ASSERT(handle->refcnt > 0);
  507. handle->refcnt--;
  508. STARPU_ASSERT(handle->busy_count > 0);
  509. handle->busy_count--;
  510. /* There should be at least one busy_count reference for the waiter
  511. * (thus we don't risk to see the handle disappear below) */
  512. STARPU_ASSERT(handle->busy_count > 0);
  513. _starpu_spin_unlock(&handle->header_lock);
  514. /* Note: we may be putting back our own requests, so avoid looping by
  515. * extracting the list */
  516. struct _starpu_data_requester_prio_list l = handle->arbitered_req_list;
  517. _starpu_data_requester_prio_list_init(&handle->arbitered_req_list);
  518. while (!_starpu_data_requester_prio_list_empty(&l))
  519. {
  520. struct _starpu_data_requester *r = _starpu_data_requester_prio_list_pop_front_highest(&l);
  521. if (!r->is_requested_by_codelet)
  522. {
  523. /* data_acquire_cb, process it */
  524. enum starpu_data_access_mode r_mode = r->mode;
  525. int put_in_list = 1;
  526. r_mode = _starpu_arbiter_filter_modes(r_mode);
  527. _starpu_spin_lock(&handle->header_lock);
  528. handle->busy_count++;
  529. if ((handle->refcnt == 0) || (!(r_mode == STARPU_W) && (handle->current_mode == r_mode)))
  530. {
  531. handle->refcnt++;
  532. handle->current_mode = r_mode;
  533. put_in_list = 0;
  534. }
  535. _starpu_spin_unlock(&handle->header_lock);
  536. if (put_in_list)
  537. _starpu_data_requester_prio_list_push_front(&l, r);
  538. /* Put back remaining requests */
  539. _starpu_data_requester_prio_list_push_prio_list_back(&handle->arbitered_req_list, &l);
  540. #ifndef LOCK_OR_DELEGATE
  541. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  542. #endif
  543. if (!put_in_list)
  544. {
  545. r->ready_data_callback(r->argcb);
  546. _starpu_data_requester_delete(r);
  547. }
  548. _starpu_spin_lock(&handle->header_lock);
  549. STARPU_ASSERT(handle->busy_count > 0);
  550. handle->busy_count--;
  551. if (!_starpu_data_check_not_busy(handle))
  552. _starpu_spin_unlock(&handle->header_lock);
  553. return;
  554. }
  555. /* A task waiting for a set of data, try to acquire them */
  556. struct _starpu_job* j = r->j;
  557. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
  558. unsigned idx_buf_arbiter;
  559. unsigned all_arbiter_available = 1;
  560. starpu_data_handle_t handle_arbiter;
  561. enum starpu_data_access_mode mode;
  562. unsigned start_buf_arbiter = r->buffer_index;
  563. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  564. for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
  565. {
  566. handle_arbiter = descrs[idx_buf_arbiter].handle;
  567. if (idx_buf_arbiter && (descrs[idx_buf_arbiter-1].handle == handle_arbiter))
  568. /* We have already requested this data, skip it. This
  569. * depends on ordering putting writes before reads, see
  570. * _starpu_compar_handles. */
  571. continue;
  572. if (handle_arbiter->arbiter != arbiter)
  573. /* Will have to process another arbiter, will do that later */
  574. break;
  575. mode = descrs[idx_buf_arbiter].mode;
  576. mode = _starpu_arbiter_filter_modes(mode);
  577. /* we post all arbiter */
  578. _starpu_spin_lock(&handle_arbiter->header_lock);
  579. if (!((handle_arbiter->refcnt == 0) || (!(mode == STARPU_W) && (handle_arbiter->current_mode == mode))))
  580. {
  581. /* handle is not available, record ourself */
  582. _starpu_spin_unlock(&handle_arbiter->header_lock);
  583. all_arbiter_available = 0;
  584. break;
  585. }
  586. /* mark the handle as taken */
  587. handle_arbiter->refcnt++;
  588. handle_arbiter->busy_count++;
  589. handle_arbiter->current_mode = mode;
  590. _starpu_spin_unlock(&handle_arbiter->header_lock);
  591. }
  592. if (all_arbiter_available)
  593. {
  594. /* Success! Drop request */
  595. _starpu_data_requester_delete(r);
  596. _starpu_spin_lock(&handle->header_lock);
  597. STARPU_ASSERT(handle->busy_count > 0);
  598. handle->busy_count--;
  599. if (!_starpu_data_check_not_busy(handle))
  600. _starpu_spin_unlock(&handle->header_lock);
  601. /* Put back remaining requests */
  602. _starpu_data_requester_prio_list_push_prio_list_back(&handle->arbitered_req_list, &l);
  603. #ifndef LOCK_OR_DELEGATE
  604. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  605. #endif
  606. if (idx_buf_arbiter < nbuffers)
  607. /* Other arbitered data, process them */
  608. _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
  609. else
  610. /* Finished with all data, can eventually push! */
  611. _starpu_push_task(j);
  612. return;
  613. }
  614. else
  615. {
  616. /* all handles are not available - record that task on the first unavailable handle */
  617. /* store node in list */
  618. r->mode = mode;
  619. _starpu_data_requester_prio_list_push_front(&handle_arbiter->arbitered_req_list, r);
  620. /* Move check_busy reference too */
  621. _starpu_spin_lock(&handle->header_lock);
  622. STARPU_ASSERT(handle->busy_count > 0);
  623. handle->busy_count--;
  624. if (!_starpu_data_check_not_busy(handle))
  625. _starpu_spin_unlock(&handle->header_lock);
  626. _starpu_spin_lock(&handle_arbiter->header_lock);
  627. handle_arbiter->busy_count++;
  628. _starpu_spin_unlock(&handle_arbiter->header_lock);
  629. /* and revert the mark */
  630. unsigned idx_buf_cancel;
  631. for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
  632. {
  633. starpu_data_handle_t cancel_handle = descrs[idx_buf_cancel].handle;
  634. if (idx_buf_cancel && (descrs[idx_buf_cancel-1].handle == cancel_handle))
  635. continue;
  636. if (cancel_handle->arbiter != arbiter)
  637. break;
  638. _starpu_spin_lock(&cancel_handle->header_lock);
  639. STARPU_ASSERT(cancel_handle->refcnt >= 1);
  640. cancel_handle->refcnt--;
  641. STARPU_ASSERT(cancel_handle->busy_count > 0);
  642. cancel_handle->busy_count--;
  643. if (!_starpu_data_check_not_busy(cancel_handle))
  644. _starpu_spin_unlock(&cancel_handle->header_lock);
  645. }
  646. }
  647. }
  648. /* no task has been pushed */
  649. #ifndef LOCK_OR_DELEGATE
  650. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  651. #endif
  652. return;
  653. }
  654. starpu_arbiter_t starpu_arbiter_create(void)
  655. {
  656. starpu_arbiter_t res;
  657. _STARPU_MALLOC(res, sizeof(*res));
  658. #ifdef LOCK_OR_DELEGATE
  659. res->dlTaskListHead = NULL;
  660. _starpu_spin_init(&res->dlListLock);
  661. res->working = 0;
  662. #else /* LOCK_OR_DELEGATE */
  663. STARPU_PTHREAD_MUTEX_INIT(&res->mutex, NULL);
  664. #endif /* LOCK_OR_DELEGATE */
  665. return res;
  666. }
  667. void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
  668. {
  669. if (handle->arbiter && handle->arbiter == _starpu_global_arbiter)
  670. /* Just for testing purpose */
  671. return;
  672. STARPU_ASSERT_MSG(!handle->arbiter, "handle can only be assigned one arbiter");
  673. STARPU_ASSERT_MSG(!handle->refcnt, "arbiter can be assigned to handle only right after initialization");
  674. STARPU_ASSERT_MSG(!handle->busy_count, "arbiter can be assigned to handle only right after initialization");
  675. handle->arbiter = arbiter;
  676. }
  677. void starpu_arbiter_destroy(starpu_arbiter_t arbiter)
  678. {
  679. #ifdef LOCK_OR_DELEGATE
  680. _starpu_spin_lock(&arbiter->dlListLock);
  681. STARPU_ASSERT(!arbiter->dlTaskListHead);
  682. STARPU_ASSERT(!arbiter->working);
  683. _starpu_spin_unlock(&arbiter->dlListLock);
  684. _starpu_spin_destroy(&arbiter->dlListLock);
  685. #else /* LOCK_OR_DELEGATE */
  686. STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
  687. STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
  688. STARPU_PTHREAD_MUTEX_DESTROY(&arbiter->mutex);
  689. #endif /* LOCK_OR_DELEGATE */
  690. free(arbiter);
  691. }