data_commute_concurrency.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2015 Université de Bordeaux
 * Copyright (C) 2015 Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <core/dependencies/data_concurrency.h>
#include <datawizard/coherency.h>
#include <core/sched_policy.h>
#include <common/starpu_spinlock.h>
#include <datawizard/sort_data_handles.h>
#include <stdlib.h> /* malloc()/free() for the LockOrDelegate list nodes */
#include <errno.h>  /* EBUSY */

#define LOCK_OR_DELEGATE

/*
 * This implements a solution for the dining philosophers problem (see
 * data_concurrency.c for the rationale) based on a centralized arbiter. This
 * yields more parallelism than the Dijkstra solution by avoiding strictly
 * serialized executions and instead opportunistically finding which tasks can
 * take the data.
 *
 * These are the algorithms implemented below:
 *
 *
 * at termination of task T:
 *
 * - for each handle h of T:
 *   - mutex_lock(&arbiter)
 *   - release reference on h
 *   - for each task Tc waiting for h:
 *     - for each data Tc_h it is waiting for:
 *       - if Tc_h is busy, goto fail
 *     // Ok, now really take them
 *     - for each data Tc_h it is waiting for:
 *       - lock(Tc_h)
 *       - take reference on Tc_h (it should still be available since we hold the arbiter)
 *       - unlock(Tc_h)
 *     // Ok, we managed to find somebody, we're finished!
 *     _starpu_push_task(Tc);
 *     break;
 *   fail:
 *     // No luck, let's try another task
 *     continue;
 *   // Release the arbiter mutex a bit from time to time
 *   - mutex_unlock(&arbiter)
 *
 *
 * at submission of task T:
 *
 * - mutex_lock(&arbiter)
 * - for each handle h of T:
 *   - lock(h)
 *   - try to take a reference on h, goto fail on failure
 *   - unlock(h)
 * // Success!
 * - mutex_unlock(&arbiter);
 * - return 0;
 *
 * fail:
 * // couldn't take everything, abort and record task T
 * // drop spurious references
 * - for each handle h of T already taken:
 *   - lock(h)
 *   - release reference on h
 *   - unlock(h)
 * // record T on the list of requests for h
 * - for each handle h of T:
 *   - record T as waiting on h
 * - mutex_unlock(&arbiter)
 * - return 1;
 */
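
/*
 * For context, a hedged sketch (not part of this file) of the application-level
 * pattern this arbiter serves: several tasks accessing the same handle in
 * STARPU_RW|STARPU_COMMUTE mode may run in any order, but not concurrently, so
 * the arbiter opportunistically decides which task takes the data next.  The
 * codelet accumulate_cl and the handle h below are illustrative assumptions:
 *
 *    starpu_data_handle_t h;               // registered elsewhere
 *    struct starpu_codelet accumulate_cl;  // some accumulation kernel
 *    int i;
 *    for (i = 0; i < n; i++)
 *        starpu_task_insert(&accumulate_cl,
 *                           STARPU_RW | STARPU_COMMUTE, h,
 *                           0);
 */
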
/* Here are the LockOrDelegate functions.
 * There are two versions, depending on whether the compiler provides C11
 * atomic compare-and-exchange support.
 */
#ifdef LOCK_OR_DELEGATE

/* A LockOrDelegate task list */
struct LockOrDelegateListNode
{
	void (*func)(void*);
	void* data;
	struct LockOrDelegateListNode* next;
};

/* If the compiler supports C11 and atomic functions can be used */
#if (201112L <= __STDC_VERSION__) && !(defined(__STDC_NO_ATOMICS__))
#include <stdatomic.h>

/* Counts the number of tasks to perform and assigns the tickets */
static atomic_int dlAtomicCounter;
/* The list of tasks to perform (the head pointer itself is the atomic object,
 * hence the placement of the _Atomic qualifier) */
static struct LockOrDelegateListNode* _Atomic dlListHead;

/* Post a task to perform if possible, otherwise put it in the list.
 * If we can perform this task, we may also perform all the tasks in the list.
 * This function returns 1 if the task (and maybe some others) has been done
 * by the calling thread, and 0 otherwise (the task has just been put in the list).
 */
static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
{
	/* Get our ticket */
	int insertionPosition = atomic_load(&dlAtomicCounter);
	while (!atomic_compare_exchange_weak(&dlAtomicCounter, &insertionPosition, insertionPosition+1))
		;

	/* If we obtain 0 we are responsible for computing all the tasks */
	if (insertionPosition == 0)
	{
		/* start with our own task */
		(*func)(data);

		/* Process the tasks posted by the other threads and manage the tickets */
		while (1)
		{
			STARPU_ASSERT(atomic_load(&dlAtomicCounter) > 0);

			/* Decrement the ticket counter and see if something else has to be done */
			int removedPosition = atomic_load(&dlAtomicCounter);
			while (!atomic_compare_exchange_weak(&dlAtomicCounter, &removedPosition, removedPosition-1))
				;
			if (removedPosition-1 == 0)
			{
				break;
			}

			/* Get the next task; it may not have been pushed yet (dlListHead still NULL) */
			struct LockOrDelegateListNode* removedNode;
			while ((removedNode = atomic_load(&dlListHead)) == NULL || !atomic_compare_exchange_weak(&dlListHead, &removedNode, removedNode->next))
				;
			STARPU_ASSERT(removedNode);

			/* call the task */
			(*removedNode->func)(removedNode->data);
			/* delete the node */
			free(removedNode);
		}
		return 1;
	}

	/* Somebody else is in charge: push our task in the list and delegate it */
	struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
	STARPU_ASSERT(newNode);
	newNode->data = data;
	newNode->func = func;
	newNode->next = atomic_load(&dlListHead);
	while (!atomic_compare_exchange_weak(&dlListHead, &newNode->next, newNode))
		;
	return 0;
}
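
/*
 * A hedged sketch of how a caller is expected to use
 * _starpu_LockOrDelegatePostOrPerform(): wrap the work that must be serialized
 * in a void(*)(void*) callback, heap-allocate its arguments (the callback frees
 * them, since it may run on another thread), and post it.  The names
 * do_protected_work and work_args are illustrative assumptions; the wrappers
 * further down in this file follow the same pattern.
 *
 *    struct work_args { int x; };
 *    static void do_protected_work(void *in)
 *    {
 *        struct work_args *args = in;
 *        // ... work serialized with the other delegated tasks ...
 *        free(args);
 *    }
 *    struct work_args *args = malloc(sizeof(*args));
 *    args->x = 42;
 *    _starpu_LockOrDelegatePostOrPerform(&do_protected_work, args);
 */
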
#else
/* We cannot rely on the C11 atomics */
#warning Using the lock-based version of Lock or Delegate

/* The list of tasks to perform */
static struct LockOrDelegateListNode* dlTaskListHead = NULL;
/* To protect the list of tasks */
static starpu_pthread_mutex_t dlListLock = STARPU_PTHREAD_MUTEX_INITIALIZER;
/* To know who is responsible for computing all the tasks */
static starpu_pthread_mutex_t dlWorkLock = STARPU_PTHREAD_MUTEX_INITIALIZER;

/* Post a task to perform if possible, otherwise put it in the list.
 * If we can perform this task, we may also perform all the tasks in the list.
 * This function returns 1 if the task (and maybe some others) has been done
 * by the calling thread, and 0 otherwise (the task has just been put in the list).
 */
static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
{
	/* We could avoid allocating when we end up doing the work ourselves,
	 * but for simplicity we always push the task in the list */
	struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
	STARPU_ASSERT(newNode);
	newNode->data = data;
	newNode->func = func;
	int ret;

	/* insert the node */
	STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
	newNode->next = dlTaskListHead;
	dlTaskListHead = newNode;
	STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);

	/* See if we can compute all the tasks */
	if ((ret = STARPU_PTHREAD_MUTEX_TRYLOCK(&dlWorkLock)) == 0)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
		while (dlTaskListHead != NULL)
		{
			struct LockOrDelegateListNode* iter = dlTaskListHead;
			dlTaskListHead = dlTaskListHead->next;
			STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);

			(*iter->func)(iter->data);
			free(iter);
			STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
		}
		/* Release the work lock before the list lock: otherwise another
		 * thread could push a node, fail the trylock and rely on us while
		 * we have already made our last (empty) check of the list. */
		STARPU_PTHREAD_MUTEX_UNLOCK(&dlWorkLock);
		STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
		return 1;
	}
	STARPU_ASSERT(ret == EBUSY);
	return 0;
}
#endif

#else // LOCK_OR_DELEGATE
/* A single global mutex serializes all the commute-dependency bookkeeping */
starpu_pthread_mutex_t commute_global_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
#endif

/* This function finds the node whose job is j and removes it from the list.
 * It returns 0 if a node was found and deleted, and 1 otherwise.
 */
static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
{
	struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);
	while (iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
	{
		iter = _starpu_data_requester_list_next(iter);
	}
	if (iter != _starpu_data_requester_list_end(req_list))
	{
		_starpu_data_requester_list_erase(req_list, iter);
		return 0;
	}
	return 1;
}

#ifdef LOCK_OR_DELEGATE

/* These are the arguments passed to ___starpu_submit_job_enforce_commute_deps */
struct starpu_enforce_commute_args
{
	struct _starpu_job *j;
	unsigned buf;
	unsigned nbuffers;
};

static void ___starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);

static void __starpu_submit_job_enforce_commute_deps(void* inData)
{
	struct starpu_enforce_commute_args* args = (struct starpu_enforce_commute_args*)inData;
	struct _starpu_job *j = args->j;
	unsigned buf = args->buf;
	unsigned nbuffers = args->nbuffers;
	/* we are in charge of freeing the args */
	free(args);
	args = NULL;
	inData = NULL;
	___starpu_submit_job_enforce_commute_deps(j, buf, nbuffers);
}

void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
	struct starpu_enforce_commute_args* args = (struct starpu_enforce_commute_args*)malloc(sizeof(struct starpu_enforce_commute_args));
	STARPU_ASSERT(args);
	args->j = j;
	args->buf = buf;
	args->nbuffers = nbuffers;
	/* The delegated function will free args */
	_starpu_LockOrDelegatePostOrPerform(&__starpu_submit_job_enforce_commute_deps, args);
}

static void ___starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
#else // LOCK_OR_DELEGATE
void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
	STARPU_PTHREAD_MUTEX_LOCK(&commute_global_mutex);
#endif
	const unsigned nb_non_commute_buff = buf;
	unsigned idx_buf_commute;
	unsigned all_commutes_available = 1;

	for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
	{
		if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
			continue;
		/* we try to take all the commute buffers */
		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
		STARPU_ASSERT(mode & STARPU_COMMUTE);

		_starpu_spin_lock(&handle->header_lock);
		if (handle->refcnt == 0)
		{
			handle->refcnt += 1;
			handle->busy_count += 1;
			handle->current_mode = mode;
			_starpu_spin_unlock(&handle->header_lock);
		}
		else
		{
			/* stop as soon as a handle does not have refcnt == 0 */
			_starpu_spin_unlock(&handle->header_lock);
			all_commutes_available = 0;
			break;
		}
	}

	if (all_commutes_available == 0)
	{
		/* Oops, cancel everything we took and put a request in each commute list */
		unsigned idx_buf_cancel;
		for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute; idx_buf_cancel++)
		{
			if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
				continue;
			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);

			_starpu_spin_lock(&cancel_handle->header_lock);
			/* reset the counter because we finally do not take the data */
			STARPU_ASSERT(cancel_handle->refcnt == 1);
			cancel_handle->refcnt -= 1;
			_starpu_spin_unlock(&cancel_handle->header_lock);
		}

		for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < nbuffers; idx_buf_cancel++)
		{
			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
			STARPU_ASSERT(cancel_mode & STARPU_COMMUTE);

			struct _starpu_data_requester *r = _starpu_data_requester_new();
			r->mode = cancel_mode;
			r->is_requested_by_codelet = 1;
			r->j = j;
			r->buffer_index = idx_buf_cancel;
			r->ready_data_callback = NULL;
			r->argcb = NULL;

			_starpu_spin_lock(&cancel_handle->header_lock);
			/* create the list if needed */
			if (cancel_handle->commute_req_list == NULL)
				cancel_handle->commute_req_list = _starpu_data_requester_list_new();
			/* store the request in the list */
			_starpu_data_requester_list_push_front(cancel_handle->commute_req_list, r);
			/* increase the busy count if it has not already been done in the previous loop */
			if (idx_buf_commute <= idx_buf_cancel)
				cancel_handle->busy_count += 1;
			_starpu_spin_unlock(&cancel_handle->header_lock);
		}
#ifndef LOCK_OR_DELEGATE
		STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
#endif
		/* the job has been queued on the handles' commute lists */
		return;
	}

	/* all_commutes_available is true: the job can proceed */
	_starpu_push_task(j);
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
#endif
}

#ifdef LOCK_OR_DELEGATE
void ___starpu_notify_commute_dependencies(starpu_data_handle_t handle);

void __starpu_notify_commute_dependencies(void* inData)
{
	starpu_data_handle_t handle = (starpu_data_handle_t)inData;
	___starpu_notify_commute_dependencies(handle);
}

void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
{
	_starpu_LockOrDelegatePostOrPerform(&__starpu_notify_commute_dependencies, handle);
}

void ___starpu_notify_commute_dependencies(starpu_data_handle_t handle)
{
#else // LOCK_OR_DELEGATE
void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
{
	STARPU_PTHREAD_MUTEX_LOCK(&commute_global_mutex);
#endif
	/* Since the request was posted, the handle may already have been processed and released */
	if (handle->commute_req_list == NULL)
	{
#ifndef LOCK_OR_DELEGATE
		STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
#endif
		return;
	}

	/* Nobody is allowed to touch commute_req_list without going through the
	 * arbiter (commute_global_mutex or the LockOrDelegate mechanism), so we
	 * do not need to hold the handle lock here */
	struct _starpu_data_requester *r;
	r = _starpu_data_requester_list_begin(handle->commute_req_list);
	while (r)
	{
		struct _starpu_job* j = r->j;
		STARPU_ASSERT(r->mode & STARPU_COMMUTE);
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
		unsigned nb_non_commute_buff;
		/* find the position of the first commute buffer */
		for (nb_non_commute_buff = 0; nb_non_commute_buff < nbuffers; nb_non_commute_buff++)
		{
			if (nb_non_commute_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff)))
				continue;
			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
			if (mode & STARPU_COMMUTE)
			{
				break;
			}
		}

		unsigned idx_buf_commute;
		unsigned all_commutes_available = 1;

		for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
		{
			if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
				continue;
			/* we try to take all the commute buffers */
			starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
			STARPU_ASSERT(mode & STARPU_COMMUTE);

			_starpu_spin_lock(&handle_commute->header_lock);
			if (handle_commute->refcnt != 0)
			{
				/* the handle is not available */
				_starpu_spin_unlock(&handle_commute->header_lock);
				all_commutes_available = 0;
				break;
			}
			/* mark the handle as taken */
			handle_commute->refcnt += 1;
			handle_commute->current_mode = mode;
			_starpu_spin_unlock(&handle_commute->header_lock);
		}

		if (all_commutes_available)
		{
			for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
			{
				if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
					continue;
				/* remove the request from each commute list */
				starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
				STARPU_ASSERT(mode & STARPU_COMMUTE);

				_starpu_spin_lock(&handle_commute->header_lock);
				STARPU_ASSERT(handle_commute->refcnt == 1);
				STARPU_ASSERT(handle_commute->busy_count >= 1);
				STARPU_ASSERT(handle_commute->current_mode == mode);
				const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->commute_req_list, j);
				STARPU_ASSERT(correctly_deleted == 0);
				if (_starpu_data_requester_list_empty(handle_commute->commute_req_list))
				{
					_starpu_data_requester_list_delete(handle_commute->commute_req_list);
					handle_commute->commute_req_list = NULL;
				}
				_starpu_spin_unlock(&handle_commute->header_lock);
			}
			/* delete the list node */
			_starpu_data_requester_delete(r);
			/* push the task */
			_starpu_push_task(j);
			/* release the global mutex */
#ifndef LOCK_OR_DELEGATE
			STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
#endif
			/* stop here: a task has been pushed */
			return;
		}
		else
		{
			unsigned idx_buf_cancel;
			/* not all handles are available: revert the marks */
			for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute; idx_buf_cancel++)
			{
				/* skip duplicate handles, for which no reference was taken */
				if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
					continue;
				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
				_starpu_spin_lock(&cancel_handle->header_lock);
				STARPU_ASSERT(cancel_handle->refcnt == 1);
				cancel_handle->refcnt -= 1;
				_starpu_spin_unlock(&cancel_handle->header_lock);
			}
		}
		r = _starpu_data_requester_list_next(r);
	}

	/* no task has been pushed */
#ifndef LOCK_OR_DELEGATE
	STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
#endif
}