data_arbiter_concurrency.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2015 Université de Bordeaux
 * Copyright (C) 2015 Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/dependencies/data_concurrency.h>
#include <datawizard/coherency.h>
#include <core/sched_policy.h>
#include <common/starpu_spinlock.h>
#include <datawizard/sort_data_handles.h>
#include <datawizard/memalloc.h>
#include <datawizard/memory_nodes.h>

/* TODO: factorize with data_concurrency.c, and also support REDUX */
/* TODO: fine-grain R/W access */

//#define LOCK_OR_DELEGATE
/*
 * This implements a solution for the dining philosophers problem (see
 * data_concurrency.c for the rationale) based on a centralized arbiter.
 * This allows for a more parallel solution than the Dijkstra one, by
 * avoiding strictly serialized executions, and instead opportunistically
 * finding which tasks can take the data.
 *
 * These are the algorithms implemented below:
 *
 *
 * at termination of task T:
 *
 * - for each handle h of T:
 *   - mutex_lock(&arbiter)
 *   - release reference on h
 *   - for each task Tc waiting for h:
 *     - for each data Tc_h it is waiting for:
 *       - if Tc_h is busy, goto fail
 *     // Ok, now really take them
 *     - for each data Tc_h it is waiting for:
 *       - lock(Tc_h)
 *       - take reference on Tc_h (it should still be available since we hold the arbiter)
 *       - unlock(Tc_h)
 *     // Ok, we managed to find somebody, we're finished!
 *     _starpu_push_task(Tc);
 *     break;
 *   fail:
 *     - unrecord Tc as waiting on h
 *     - record Tc as waiting on Tc_h
 *     // No luck, let's try another task
 *     continue;
 *   // Release the arbiter mutex a bit from time to time
 *   - mutex_unlock(&arbiter)
 *
 *
 * at submission of task T:
 *
 * - mutex_lock(&arbiter)
 * - for each handle h of T:
 *   - lock(h)
 *   - try to take a reference on h, goto fail on failure
 *   - unlock(h)
 * // Success!
 * - mutex_unlock(&arbiter);
 * - return 0;
 *
 * fail:
 * // couldn't take everything, record task T and abort
 * - record T as waiting on h
 * // drop spurious references
 * - for each handle h of T already taken:
 *   - lock(h)
 *   - release reference on h
 *   - unlock(h)
 * - mutex_unlock(&arbiter)
 * - return 1;
 */
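
/*
 * For orientation, a minimal application-level usage sketch. The three
 * arbiter calls are the public API implemented at the end of this file;
 * "h1" and "h2" are hypothetical, already-registered data handles, and the
 * task submissions are elided:
 *
 *   starpu_arbiter_t arb = starpu_arbiter_create();
 *   starpu_data_assign_arbiter(h1, arb);
 *   starpu_data_assign_arbiter(h2, arb);
 *   // ... submit tasks accessing h1 and h2, wait for their completion ...
 *   starpu_arbiter_destroy(arb);
 */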
struct starpu_arbiter
{
#ifdef LOCK_OR_DELEGATE
	/* The list of tasks to perform */
	struct LockOrDelegateListNode* dlTaskListHead;
	/* To protect the list of tasks */
	struct _starpu_spinlock dlListLock;
	/* Whether somebody is working on the list */
	int working;
#else /* LOCK_OR_DELEGATE */
	starpu_pthread_mutex_t mutex;
#endif /* LOCK_OR_DELEGATE */
};
#ifdef LOCK_OR_DELEGATE

/* In case of congestion, we don't want to needlessly wait for the arbiter lock
 * while we can just delegate the work to the worker already managing some
 * dependencies.
 *
 * So we push work onto the dlTaskListHead queue, and only one worker will
 * process the list.
 */
/* A LockOrDelegate task list */
struct LockOrDelegateListNode
{
	void (*func)(void*);
	void* data;
	struct LockOrDelegateListNode* next;
};
/* Post a task to perform if possible, otherwise put it in the list.
 * If we can perform this task, we may also perform all the tasks in the list.
 * This function returns 1 if the task (and maybe some others) has been done
 * by the calling thread, and 0 otherwise (if the task has just been put in
 * the list). */
static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*func)(void*), void* data)
{
	struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode)), *iter, *next;
	int did = 0;
	STARPU_ASSERT(newNode);
	newNode->data = data;
	newNode->func = func;

	_starpu_spin_lock(&arbiter->dlListLock);
	if (arbiter->working)
	{
		/* Somebody is working on it, insert the node */
		newNode->next = arbiter->dlTaskListHead;
		arbiter->dlTaskListHead = newNode;
	}
	else
	{
		/* Nobody is working on the list, we'll work */
		arbiter->working = 1;

		/* work on what was pushed so far first */
		iter = arbiter->dlTaskListHead;
		arbiter->dlTaskListHead = NULL;
		_starpu_spin_unlock(&arbiter->dlListLock);
		while (iter != NULL)
		{
			(*iter->func)(iter->data);
			/* save the next pointer before freeing the node */
			next = iter->next;
			free(iter);
			iter = next;
		}

		/* And then do our job */
		(*func)(data);
		free(newNode);
		did = 1;

		_starpu_spin_lock(&arbiter->dlListLock);
		/* And finish working on anything that could have been pushed
		 * in the meanwhile */
		while (arbiter->dlTaskListHead != NULL)
		{
			iter = arbiter->dlTaskListHead;
			arbiter->dlTaskListHead = arbiter->dlTaskListHead->next;
			_starpu_spin_unlock(&arbiter->dlListLock);

			(*iter->func)(iter->data);
			free(iter);
			_starpu_spin_lock(&arbiter->dlListLock);
		}
		arbiter->working = 0;
	}
	_starpu_spin_unlock(&arbiter->dlListLock);
	return did;
}
#endif
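
/*
 * For illustration, a hedged sketch of how the helper above gets used:
 * "do_notify" is a hypothetical callback. The caller either performs the
 * work right away (plus any backlog), or hands it to the thread currently
 * draining the list; either way it returns without blocking on a mutex.
 *
 *   static void do_notify(void *data)
 *   {
 *           starpu_data_handle_t h = data;
 *           // ... process the requests waiting on h ...
 *   }
 *
 *   // Runs do_notify(h) now, or queues it for the current worker:
 *   _starpu_LockOrDelegatePostOrPerform(h->arbiter, &do_notify, h);
 */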
/* Try to submit a data request. In case the request can be processed
 * immediately, return 0. If there is still a dependency that is not compatible
 * with the current mode, the request is put in the per-handle list of
 * "requesters", and this function returns 1. */
#ifdef LOCK_OR_DELEGATE

/* These are the arguments passed to ___starpu_attempt_to_submit_arbitered_data_request */
struct starpu_submit_arbitered_args
{
	unsigned request_from_codelet;
	starpu_data_handle_t handle;
	enum starpu_data_access_mode mode;
	void (*callback)(void *);
	void *argcb;
	struct _starpu_job *j;
	unsigned buffer_index;
};

static unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
			starpu_data_handle_t handle, enum starpu_data_access_mode mode,
			void (*callback)(void *), void *argcb,
			struct _starpu_job *j, unsigned buffer_index);

static void __starpu_attempt_to_submit_arbitered_data_request(void *inData)
{
	struct starpu_submit_arbitered_args* args = inData;
	unsigned request_from_codelet = args->request_from_codelet;
	starpu_data_handle_t handle = args->handle;
	enum starpu_data_access_mode mode = args->mode;
	void (*callback)(void*) = args->callback;
	void *argcb = args->argcb;
	struct _starpu_job *j = args->j;
	unsigned buffer_index = args->buffer_index;
	free(args);
	if (!___starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index))
		/* Success, but we have no way to report it to the original
		 * caller, so call the callback ourselves */
		callback(argcb);
}

unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
			starpu_data_handle_t handle, enum starpu_data_access_mode mode,
			void (*callback)(void *), void *argcb,
			struct _starpu_job *j, unsigned buffer_index)
{
	struct starpu_submit_arbitered_args* args = malloc(sizeof(*args));
	STARPU_ASSERT(args);
	args->request_from_codelet = request_from_codelet;
	args->handle = handle;
	args->mode = mode;
	args->callback = callback;
	args->argcb = argcb;
	args->j = j;
	args->buffer_index = buffer_index;
	/* The function will free args */
	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_attempt_to_submit_arbitered_data_request, args);
	return 1;
}

static unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
			starpu_data_handle_t handle, enum starpu_data_access_mode mode,
			void (*callback)(void *), void *argcb,
			struct _starpu_job *j, unsigned buffer_index)
{
	STARPU_ASSERT(handle->arbiter);
#else // LOCK_OR_DELEGATE
unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
			starpu_data_handle_t handle, enum starpu_data_access_mode mode,
			void (*callback)(void *), void *argcb,
			struct _starpu_job *j, unsigned buffer_index)
{
	starpu_arbiter_t arbiter = handle->arbiter;
	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
#endif // LOCK_OR_DELEGATE
	if (mode == STARPU_RW)
		mode = STARPU_W;

	STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");

	/* Take the lock protecting the header. We try to make some progress
	 * in case this is called from a worker, otherwise we just wait for the
	 * lock to be available. */
	if (request_from_codelet)
	{
		int cpt = 0;
		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
		{
			cpt++;
			_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
		}
		if (cpt == STARPU_SPIN_MAXTRY)
			_starpu_spin_lock(&handle->header_lock);
	}
	else
	{
		_starpu_spin_lock(&handle->header_lock);
	}

	/* If there is currently nobody accessing the piece of data, or if it's
	 * not another writer and this is the same type of access as the
	 * current one, we can proceed. */
	unsigned put_in_list;
	if (handle->refcnt)
	{
		/* there cannot be multiple writers, or a new writer
		 * while the data is in read mode */
		handle->busy_count++;
		/* enqueue the request */
		struct _starpu_data_requester *r = _starpu_data_requester_new();
		r->mode = mode;
		r->is_requested_by_codelet = request_from_codelet;
		r->j = j;
		r->buffer_index = buffer_index;
		r->ready_data_callback = callback;
		r->argcb = argcb;
		_starpu_data_requester_list_push_back(&handle->arbitered_req_list, r);
		/* failed */
		put_in_list = 1;
	}
	else
	{
		handle->refcnt++;
		handle->busy_count++;
		/* Do not write to handle->current_mode if it is already
		 * R. This avoids a spurious warning from helgrind when
		 * the following happens:
		 * acquire(R) in thread A
		 * acquire(R) in thread B
		 * release_data_on_node() in thread A
		 * helgrind would shout that the latter reads current_mode
		 * unsafely.
		 *
		 * This basically explains to helgrind that this is a
		 * shared R acquisition.
		 */
		if (mode != STARPU_R || handle->current_mode != mode)
			handle->current_mode = mode;
		/* success */
		put_in_list = 0;
	}
	_starpu_spin_unlock(&handle->header_lock);
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif // LOCK_OR_DELEGATE
	return put_in_list;
}
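
/*
 * Caller-side sketch (hedged: "acquire_cb" and "arg" are hypothetical).
 * In the plain-mutex build, a return value of 0 means the reference was
 * taken and the caller may proceed at once; in the LOCK_OR_DELEGATE build
 * the function always returns 1 and the callback is invoked once the data
 * becomes available:
 *
 *   if (!_starpu_attempt_to_submit_arbitered_data_request(0, handle, STARPU_R,
 *                                                         acquire_cb, arg, NULL, 0))
 *           acquire_cb(arg); // no conflicting access, proceed immediately
 */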
#ifdef LOCK_OR_DELEGATE
/* These are the arguments passed to ___starpu_submit_job_enforce_arbitered_deps */
struct starpu_enforce_arbitered_args
{
	struct _starpu_job *j;
	unsigned buf;
	unsigned nbuffers;
};

static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);

static void __starpu_submit_job_enforce_arbitered_deps(void* inData)
{
	struct starpu_enforce_arbitered_args* args = inData;
	struct _starpu_job *j = args->j;
	unsigned buf = args->buf;
	unsigned nbuffers = args->nbuffers;
	/* we are in charge of freeing the args */
	free(args);
	___starpu_submit_job_enforce_arbitered_deps(j, buf, nbuffers);
}

void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
	struct starpu_enforce_arbitered_args* args = malloc(sizeof(*args));
	starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
	STARPU_ASSERT(args);
	args->j = j;
	args->buf = buf;
	args->nbuffers = nbuffers;
	/* The function will free args */
	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_submit_job_enforce_arbitered_deps, args);
}

static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
#else // LOCK_OR_DELEGATE
void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
#endif
	STARPU_ASSERT(arbiter);

	const unsigned start_buf_arbiter = buf;
	unsigned idx_buf_arbiter;
	unsigned all_arbiter_available = 1;
	starpu_data_handle_t handle;
	enum starpu_data_access_mode mode;

	for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
	{
		handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
		mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);

		if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1) == handle))
			/* We have already requested this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles. */
			continue;

		if (handle->arbiter != arbiter)
		{
			/* another arbiter */
			break;
		}

		/* Try to take the handle */
		_starpu_spin_lock(&handle->header_lock);
		if (handle->refcnt == 0)
		{
			/* Got it */
			handle->refcnt++;
			handle->busy_count++;
			handle->current_mode = mode;
			_starpu_spin_unlock(&handle->header_lock);
		}
		else
		{
			/* this handle is already taken (refcnt != 0), stop */
			_starpu_spin_unlock(&handle->header_lock);
			all_arbiter_available = 0;
			break;
		}
	}
	if (all_arbiter_available == 0)
	{
		/* Oops, record ourselves as waiting for this data */
		struct _starpu_data_requester *r = _starpu_data_requester_new();
		r->mode = mode;
		r->is_requested_by_codelet = 1;
		r->j = j;
		r->buffer_index = start_buf_arbiter;
		r->ready_data_callback = NULL;
		r->argcb = NULL;

		/* store the node in the list */
		_starpu_data_requester_list_push_front(&handle->arbitered_req_list, r);
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count++;
		_starpu_spin_unlock(&handle->header_lock);

		/* and cancel all the references already taken */
		unsigned idx_buf_cancel;
		for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter; idx_buf_cancel++)
		{
			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
			if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == cancel_handle))
				continue;
			if (cancel_handle->arbiter != arbiter)
				/* Will have to process another arbiter, will do that later */
				break;
			_starpu_spin_lock(&cancel_handle->header_lock);
			/* reset the counter because we do not take the data after all */
			STARPU_ASSERT(cancel_handle->refcnt == 1);
			cancel_handle->refcnt--;
			STARPU_ASSERT(cancel_handle->busy_count > 0);
			cancel_handle->busy_count--;
			if (!_starpu_data_check_not_busy(cancel_handle))
				_starpu_spin_unlock(&cancel_handle->header_lock);
		}
#ifndef LOCK_OR_DELEGATE
		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif
		return;
	}
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif

	/* all_arbiter_available is true */
	if (idx_buf_arbiter < nbuffers)
		/* Other arbitered data, process them */
		_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
	else
		/* Finished with all data, we can finally push the task! */
		_starpu_push_task(j);
}
#ifdef LOCK_OR_DELEGATE
void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);

void __starpu_notify_arbitered_dependencies(void* inData)
{
	starpu_data_handle_t handle = inData;
	___starpu_notify_arbitered_dependencies(handle);
}

void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
{
	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_notify_arbitered_dependencies, handle);
}

void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
#else // LOCK_OR_DELEGATE
void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
#endif
{
	starpu_arbiter_t arbiter = handle->arbiter;
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
#endif

	/* Since the request was posted, the handle may have been processed and
	 * released already */
	if (_starpu_data_requester_list_empty(&handle->arbitered_req_list))
	{
#ifndef LOCK_OR_DELEGATE
		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif
		return;
	}
	/* Note: we may be putting back our own requests, so avoid looping by
	 * extracting the list */
	struct _starpu_data_requester_list l = handle->arbitered_req_list;
	_starpu_data_requester_list_init(&handle->arbitered_req_list);

	while (!_starpu_data_requester_list_empty(&l))
	{
		struct _starpu_data_requester *r = _starpu_data_requester_list_pop_front(&l);
		if (!r->is_requested_by_codelet)
		{
			/* data_acquire_cb, process it */
			enum starpu_data_access_mode r_mode = r->mode;
			if (r_mode == STARPU_RW)
				r_mode = STARPU_W;

			_starpu_spin_lock(&handle->header_lock);
			handle->refcnt++;
			handle->busy_count++;
			handle->current_mode = r_mode;
			_starpu_spin_unlock(&handle->header_lock);

			/* Put back the remaining requests */
			_starpu_data_requester_list_push_list_back(&handle->arbitered_req_list, &l);
#ifndef LOCK_OR_DELEGATE
			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif
			r->ready_data_callback(r->argcb);
			_starpu_data_requester_delete(r);

			_starpu_spin_lock(&handle->header_lock);
			STARPU_ASSERT(handle->busy_count > 0);
			handle->busy_count--;
			if (!_starpu_data_check_not_busy(handle))
				_starpu_spin_unlock(&handle->header_lock);
			return;
		}
		/* A task waiting for a set of data, try to acquire them all */
		struct _starpu_job* j = r->j;
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
		unsigned idx_buf_arbiter;
		unsigned all_arbiter_available = 1;
		starpu_data_handle_t handle_arbiter;
		enum starpu_data_access_mode mode;
		unsigned start_buf_arbiter = r->buffer_index;

		for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
		{
			handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
			if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1) == handle_arbiter))
				/* We have already requested this data, skip it. This
				 * depends on ordering putting writes before reads, see
				 * _starpu_compar_handles. */
				continue;
			if (handle_arbiter->arbiter != arbiter)
				/* Will have to process another arbiter, will do that later */
				break;

			mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);

			/* Try to take this handle */
			_starpu_spin_lock(&handle_arbiter->header_lock);
			if (handle_arbiter->refcnt != 0)
			{
				/* the handle is not available, record ourselves */
				_starpu_spin_unlock(&handle_arbiter->header_lock);
				all_arbiter_available = 0;
				break;
			}
			/* mark the handle as taken */
			handle_arbiter->refcnt++;
			handle_arbiter->busy_count++;
			handle_arbiter->current_mode = mode;
			_starpu_spin_unlock(&handle_arbiter->header_lock);
		}

		if (all_arbiter_available)
		{
			/* Success! Drop the request */
			_starpu_data_requester_delete(r);

			_starpu_spin_lock(&handle->header_lock);
			STARPU_ASSERT(handle->busy_count > 0);
			handle->busy_count--;
			if (!_starpu_data_check_not_busy(handle))
				_starpu_spin_unlock(&handle->header_lock);

			/* Put back the remaining requests */
			_starpu_data_requester_list_push_list_back(&handle->arbitered_req_list, &l);
#ifndef LOCK_OR_DELEGATE
			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif
			if (idx_buf_arbiter < nbuffers)
				/* Other arbitered data, process them */
				_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
			else
				/* Finished with all data, we can finally push the task! */
				_starpu_push_task(j);
			return;
		}
		else
		{
			/* Not all handles are available: record the task on the
			 * first unavailable handle */
			/* store the node in the list */
			r->mode = mode;
			_starpu_data_requester_list_push_front(&handle_arbiter->arbitered_req_list, r);

			/* Move the check_busy reference too */
			_starpu_spin_lock(&handle->header_lock);
			STARPU_ASSERT(handle->busy_count > 0);
			handle->busy_count--;
			if (!_starpu_data_check_not_busy(handle))
				_starpu_spin_unlock(&handle->header_lock);
			_starpu_spin_lock(&handle_arbiter->header_lock);
			handle_arbiter->busy_count++;
			_starpu_spin_unlock(&handle_arbiter->header_lock);

			/* and revert the references already taken */
			unsigned idx_buf_cancel;
			for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter; idx_buf_cancel++)
			{
				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
				if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == cancel_handle))
					continue;
				if (cancel_handle->arbiter != arbiter)
					break;
				_starpu_spin_lock(&cancel_handle->header_lock);
				STARPU_ASSERT(cancel_handle->refcnt == 1);
				cancel_handle->refcnt--;
				STARPU_ASSERT(cancel_handle->busy_count > 0);
				cancel_handle->busy_count--;
				if (!_starpu_data_check_not_busy(cancel_handle))
					_starpu_spin_unlock(&cancel_handle->header_lock);
			}
		}
	}
	/* no task has been pushed */
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
#endif
	return;
}
starpu_arbiter_t starpu_arbiter_create(void)
{
	starpu_arbiter_t res = malloc(sizeof(*res));
	STARPU_ASSERT(res);
#ifdef LOCK_OR_DELEGATE
	res->dlTaskListHead = NULL;
	_starpu_spin_init(&res->dlListLock);
	res->working = 0;
#else /* LOCK_OR_DELEGATE */
	STARPU_PTHREAD_MUTEX_INIT(&res->mutex, NULL);
#endif /* LOCK_OR_DELEGATE */
	return res;
}

void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
{
	if (handle->arbiter && handle->arbiter == _starpu_global_arbiter)
		/* Just for testing purposes */
		return;
	STARPU_ASSERT_MSG(!handle->arbiter, "a handle can only be assigned one arbiter");
	STARPU_ASSERT_MSG(!handle->refcnt, "an arbiter can be assigned to a handle only right after initialization");
	STARPU_ASSERT_MSG(!handle->busy_count, "an arbiter can be assigned to a handle only right after initialization");
	handle->arbiter = arbiter;
}

void starpu_arbiter_destroy(starpu_arbiter_t arbiter)
{
#ifdef LOCK_OR_DELEGATE
	_starpu_spin_lock(&arbiter->dlListLock);
	STARPU_ASSERT(!arbiter->dlTaskListHead);
	STARPU_ASSERT(!arbiter->working);
	_starpu_spin_unlock(&arbiter->dlListLock);
	_starpu_spin_destroy(&arbiter->dlListLock);
#else /* LOCK_OR_DELEGATE */
	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
	STARPU_PTHREAD_MUTEX_DESTROY(&arbiter->mutex);
#endif /* LOCK_OR_DELEGATE */
	free(arbiter);
}