/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2015  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
 * Copyright (C) 2015  Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/dependencies/data_concurrency.h>
#include <datawizard/coherency.h>
#include <core/sched_policy.h>
#include <common/starpu_spinlock.h>
#include <datawizard/sort_data_handles.h>
#include <stdlib.h> /* malloc()/free() are used below */

//#define NO_LOCK_OR_DELEGATE
/* Here are the high-level algorithms that were discussed in order
 * to manage the commutes.
 *
 * For each handle h released in commute mode:
 *     mutex_lock(&arbiter)
 *     release h
 *     For each task Tc waiting on the handle:
 *         // Just test whether we could take everything:
 *         For each piece of data Tc_h that Tc is waiting for:
 *             If Tc_h is busy, goto fail
 *         // Really take everything
 *         For each piece of data Tc_h that Tc is waiting for:
 *             lock(Tc_h)
 *             take(h) (it should still be available if everybody else
 *                      properly uses the arbiter mutex)
 *             unlock(Tc_h)
 *         // We found somebody, we are done!
 *         _starpu_push_task(Tc);
 *         break;
 *     fail:
 *         // Bad luck, try another task
 *         continue;
 *     // Release the arbiter mutex from time to time
 *     mutex_unlock(&arbiter)
 *
 * mutex_lock(&arbiter)
 * For each handle h taken in commute mode:
 *     lock(h)
 *     try to take h; on failure goto fail;
 *     unlock(h)
 * mutex_unlock(&arbiter)
 * return 0
 * fail:
 *     // register on the request list of h
 *     For each handle already taken:
 *         lock(handle)
 *         release handle
 *         unlock(handle)
 *     mutex_unlock(&arbiter)
 */
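/* A minimal, self-contained sketch of the "try to take everything, roll
 * back on failure" pattern described above (illustrative only: it uses
 * plain pthread mutexes instead of StarPU handles and spinlocks):
 *
 *     #include <pthread.h>
 *
 *     // Try to acquire all n locks; on failure release the ones taken
 *     // and report failure so the caller can register a request instead.
 *     static int try_take_all(pthread_mutex_t *locks[], int n)
 *     {
 *         int i;
 *         for (i = 0; i < n; i++)
 *             if (pthread_mutex_trylock(locks[i]) != 0)
 *                 break;
 *         if (i == n)
 *             return 1; // all taken
 *         while (i-- > 0)
 *             pthread_mutex_unlock(locks[i]); // roll back
 *         return 0;
 *     }
 */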
/* Here are the LockOrDelegate functions.
 * There are two versions, depending on whether the compiler provides
 * support for C11 atomic compare-and-exchange.
 */
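/* Usage sketch (hypothetical caller, not code from this file): the
 * critical operation is wrapped in an int(void*) function and is either
 * performed directly by the calling thread or delegated to the thread
 * currently draining the list:
 *
 *     static int increment(void* arg)
 *     {
 *         int* counter = arg;  // hypothetical shared state
 *         (*counter)++;        // serialized with all other posted work
 *         return 0;
 *     }
 *
 *     // done == 1 if this thread performed the work, 0 if it delegated
 *     int done = _starpu_LockOrDelegatePostOrPerform(&increment, &counter);
 */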
#ifndef NO_LOCK_OR_DELEGATE

/* A LockOrDelegate task list */
struct LockOrDelegateListNode
{
    int (*func)(void*);
    void* data;
    struct LockOrDelegateListNode* next;
};
/* If the compiler supports C11 and atomic functions are available */
#if (201112L <= __STDC_VERSION__) && !(defined(__STDC_NO_ATOMICS__))

#include <stdatomic.h>

/* Number of tasks still to perform; also used to assign the tickets */
static atomic_int dlAtomicCounter;
/* The list of tasks to perform (an atomic pointer, so that it can be
 * updated with compare-and-exchange) */
static struct LockOrDelegateListNode* _Atomic dlListHead;
/* Post a task to perform if possible, otherwise put it in the list.
 * If we can perform this task, we may also perform all the tasks in the list.
 * This function returns 1 if the task (and maybe some others) has been done
 * by the calling thread, and 0 otherwise (if the task has just been put in the list).
 */
int _starpu_LockOrDelegatePostOrPerform(int (*func)(void*), void* data)
{
    /* Get our ticket */
    int insertionPosition = atomic_load(&dlAtomicCounter);
    while (!atomic_compare_exchange_weak(&dlAtomicCounter, &insertionPosition, insertionPosition+1))
        ;

    /* If we obtained 0, we are responsible for performing all the tasks */
    if (insertionPosition == 0)
    {
        /* Start with our own task */
        (*func)(data);

        /* Perform the tasks posted by the others and manage the tickets */
        while (1)
        {
            STARPU_ASSERT(atomic_load(&dlAtomicCounter) > 0);

            /* Decrement the ticket counter and see whether anything else has to be done */
            int removedPosition = atomic_load(&dlAtomicCounter);
            while (!atomic_compare_exchange_weak(&dlAtomicCounter, &removedPosition, removedPosition-1))
                ;
            if (removedPosition-1 == 0)
                break;

            /* Get the next task; it may not have been pushed yet
             * (atomic_load(&dlListHead) == NULL), so spin until it appears */
            struct LockOrDelegateListNode* removedNode;
            while ((removedNode = atomic_load(&dlListHead)) == NULL
                   || !atomic_compare_exchange_weak(&dlListHead, &removedNode, removedNode->next))
                ;
            STARPU_ASSERT(removedNode);

            /* Perform the task */
            (*removedNode->func)(removedNode->data);

            /* Delete the node */
            free(removedNode);
        }
        return 1;
    }

    /* Somebody else will do the work: push our task in the list */
    struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode));
    STARPU_ASSERT(newNode);
    newNode->data = data;
    newNode->func = func;
    newNode->next = atomic_load(&dlListHead);
    while (!atomic_compare_exchange_weak(&dlListHead, &newNode->next, newNode))
        ;
    return 0;
}
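/* Worked example of the ticket scheme above (illustrative): thread A
 * increments the counter from 0 to 1, so it performs its own task and
 * becomes the combiner. Thread B then increments the counter from 1 to 2
 * and pushes its node onto dlListHead. A decrements the counter back to 1,
 * sees that it is non-zero, pops B's node (spinning until the push becomes
 * visible), runs it, decrements the counter to 0 and exits. B never blocked. */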
#else

/* We cannot rely on the C11 atomics */
#warning Lock-based version of Lock or Delegate

#include <pthread.h>
#include <errno.h>

/* The list of tasks to perform */
static struct LockOrDelegateListNode* dlTaskListHead = NULL;
/* To protect the list of tasks */
static pthread_mutex_t dlListLock = PTHREAD_MUTEX_INITIALIZER;
/* To know who is responsible for performing all the tasks */
static pthread_mutex_t dlWorkLock = PTHREAD_MUTEX_INITIALIZER;

/* Post a task to perform if possible, otherwise put it in the list.
 * If we can perform this task, we may also perform all the tasks in the list.
 * This function returns 1 if the task (and maybe some others) has been done
 * by the calling thread, and 0 otherwise (if the task has just been put in the list).
 */
int _starpu_LockOrDelegatePostOrPerform(int (*func)(void*), void* data)
{
    /* We could avoid allocating when we end up being responsible,
     * but for simplicity we always push the task in the list */
    struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode));
    STARPU_ASSERT(newNode);
    newNode->data = data;
    newNode->func = func;

    /* Insert the node */
    int ret = pthread_mutex_lock(&dlListLock);
    STARPU_ASSERT(ret == 0);
    newNode->next = dlTaskListHead;
    dlTaskListHead = newNode;
    ret = pthread_mutex_unlock(&dlListLock);
    STARPU_ASSERT(ret == 0);

    /* See whether we can perform all the tasks */
    if ((ret = pthread_mutex_trylock(&dlWorkLock)) == 0)
    {
        ret = pthread_mutex_lock(&dlListLock);
        STARPU_ASSERT(ret == 0);
        while (dlTaskListHead != NULL)
        {
            struct LockOrDelegateListNode* iter = dlTaskListHead;
            dlTaskListHead = dlTaskListHead->next;
            ret = pthread_mutex_unlock(&dlListLock);
            STARPU_ASSERT(ret == 0);

            (*iter->func)(iter->data);
            free(iter);

            ret = pthread_mutex_lock(&dlListLock);
            STARPU_ASSERT(ret == 0);
        }
        /* Release the work lock before the list lock! This is important:
         * a poster that inserts a node after we release the list lock
         * must be able to take the work lock and perform it */
        ret = pthread_mutex_unlock(&dlWorkLock);
        STARPU_ASSERT(ret == 0);
        ret = pthread_mutex_unlock(&dlListLock);
        STARPU_ASSERT(ret == 0);
        return 1;
    }
    STARPU_ASSERT(ret == EBUSY);
    return 0;
}

#endif
#else // NO_LOCK_OR_DELEGATE

pthread_mutex_t commute_global_mutex = PTHREAD_MUTEX_INITIALIZER;

#endif
/* This function finds the node that contains the parameter j as its job and removes it from the list.
 * It returns 0 if a node was found and deleted, 1 otherwise.
 */
static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job* j)
{
    struct _starpu_data_requester* iter = _starpu_data_requester_list_begin(req_list);
    while (iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
    {
        iter = _starpu_data_requester_list_next(iter);
    }
    if (iter != _starpu_data_requester_list_end(req_list))
    {
        _starpu_data_requester_list_erase(req_list, iter);
        return 0;
    }
    return 1;
}
#ifndef NO_LOCK_OR_DELEGATE
int _starpu_submit_job_enforce_commute_deps(void* inData)
{
    struct starpu_enforce_commute_args* args = (struct starpu_enforce_commute_args*)inData;
    struct _starpu_job* j = args->j;
    unsigned buf = args->buf;
    unsigned nbuffers = args->nbuffers;
    /* we are in charge of freeing the args */
    free(args);
    args = NULL;
    inData = NULL;
#else // NO_LOCK_OR_DELEGATE
int _submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
{
    int ret = pthread_mutex_lock(&commute_global_mutex);
    STARPU_ASSERT(ret == 0);
#endif
    const unsigned nb_non_commute_buff = buf;
    unsigned idx_buf_commute;
    unsigned all_commutes_available = 1;

    for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
    {
        /* skip duplicates: the ordered buffers make duplicate handles adjacent */
        if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
            continue;
        /* we try to take all the commute buffers */
        starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
        enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
        STARPU_ASSERT(mode & STARPU_COMMUTE);

        _starpu_spin_lock(&handle->header_lock);
        if (handle->refcnt == 0)
        {
            handle->refcnt += 1;
            handle->busy_count += 1;
            handle->current_mode = mode;
            _starpu_spin_unlock(&handle->header_lock);
        }
        else
        {
            /* stop as soon as a handle does not have refcnt == 0 */
            _starpu_spin_unlock(&handle->header_lock);
            all_commutes_available = 0;
            break;
        }
    }

    if (all_commutes_available == 0)
    {
        /* Oops, cancel all the handles already taken and put a request in each commute list */
        unsigned idx_buf_cancel;
        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute; idx_buf_cancel++)
        {
            if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
                continue;
            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);

            _starpu_spin_lock(&cancel_handle->header_lock);
            /* reset the counter because we finally do not take the data */
            STARPU_ASSERT(cancel_handle->refcnt == 1);
            cancel_handle->refcnt -= 1;
            _starpu_spin_unlock(&cancel_handle->header_lock);
        }

        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < nbuffers; idx_buf_cancel++)
        {
            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
            enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
            STARPU_ASSERT(cancel_mode & STARPU_COMMUTE);

            struct _starpu_data_requester* r = _starpu_data_requester_new();
            r->mode = cancel_mode;
            r->is_requested_by_codelet = 1;
            r->j = j;
            r->buffer_index = idx_buf_cancel;
            r->ready_data_callback = NULL;
            r->argcb = NULL;

            _starpu_spin_lock(&cancel_handle->header_lock);
            /* create the list if needed */
            if (cancel_handle->commute_req_list == NULL)
                cancel_handle->commute_req_list = _starpu_data_requester_list_new();
            /* store the node in the list */
            _starpu_data_requester_list_push_front(cancel_handle->commute_req_list, r);
            /* increase the busy count if it has not already been increased in the previous loop */
            if (idx_buf_commute <= idx_buf_cancel)
                cancel_handle->busy_count += 1;
            _starpu_spin_unlock(&cancel_handle->header_lock);
        }
#ifdef NO_LOCK_OR_DELEGATE
        ret = pthread_mutex_unlock(&commute_global_mutex);
        STARPU_ASSERT(ret == 0);
#endif
        return 1;
    }

    /* all_commutes_available is true: all the commute handles have been taken */
    _starpu_push_task(j);
#ifdef NO_LOCK_OR_DELEGATE
    ret = pthread_mutex_unlock(&commute_global_mutex);
    STARPU_ASSERT(ret == 0);
#endif
    return 0;
}
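/* Caller-side sketch (illustrative, not code from this file): the path
 * above is reached when a submitted task accesses buffers with the
 * STARPU_COMMUTE flag, e.g. accumulations that may run in any order:
 *
 *     // assuming a codelet accumulate_cl with
 *     //   .nbuffers = 1, .modes = { STARPU_RW | STARPU_COMMUTE }
 *     // and a registered data handle sum_handle
 *     struct starpu_task* task = starpu_task_create();
 *     task->cl = &accumulate_cl;
 *     task->handles[0] = sum_handle;
 *     starpu_task_submit(task);
 */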
#ifndef NO_LOCK_OR_DELEGATE
int _starpu_notify_commute_dependencies(void* inData)
{
    starpu_data_handle_t handle = (starpu_data_handle_t)inData;
#else // NO_LOCK_OR_DELEGATE
int _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
{
    int ret = pthread_mutex_lock(&commute_global_mutex);
    STARPU_ASSERT(ret == 0);
#endif
    /* Since the request was posted, the handle may already have been processed and released */
    if (handle->commute_req_list == NULL)
    {
#ifdef NO_LOCK_OR_DELEGATE
        ret = pthread_mutex_unlock(&commute_global_mutex);
        STARPU_ASSERT(ret == 0);
#endif
        return 1;
    }

    /* Nobody is allowed to work on commute_req_list without holding
     * commute_global_mutex, so we do not need to lock the handle for safety */
    struct _starpu_data_requester* r;
    r = _starpu_data_requester_list_begin(handle->commute_req_list);
    while (r)
    {
        struct _starpu_job* j = r->j;
        STARPU_ASSERT(r->mode & STARPU_COMMUTE);
        unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
        unsigned nb_non_commute_buff;
        /* find the position of the first commute buffer */
        for (nb_non_commute_buff = 0; nb_non_commute_buff < nbuffers; nb_non_commute_buff++)
        {
            if (nb_non_commute_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff)))
                continue;
            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
            if (mode & STARPU_COMMUTE)
                break;
        }
        unsigned idx_buf_commute;
        unsigned all_commutes_available = 1;
        for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
        {
            if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
                continue;
            /* we try to take all the commute buffers */
            starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
            STARPU_ASSERT(mode & STARPU_COMMUTE);

            _starpu_spin_lock(&handle_commute->header_lock);
            if (handle_commute->refcnt != 0)
            {
                /* the handle is not available */
                _starpu_spin_unlock(&handle_commute->header_lock);
                all_commutes_available = 0;
                break;
            }
            /* mark the handle as taken */
            handle_commute->refcnt += 1;
            handle_commute->current_mode = mode;
            _starpu_spin_unlock(&handle_commute->header_lock);
        }
        if (all_commutes_available)
        {
            for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
            {
                if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
                    continue;
                /* remove the job from the request list of each commute buffer */
                starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
                enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
                STARPU_ASSERT(mode & STARPU_COMMUTE);

                _starpu_spin_lock(&handle_commute->header_lock);
                STARPU_ASSERT(handle_commute->refcnt == 1);
                STARPU_ASSERT(handle_commute->busy_count >= 1);
                STARPU_ASSERT(handle_commute->current_mode == mode);
                const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->commute_req_list, j);
                STARPU_ASSERT(correctly_deleted == 0);
                if (_starpu_data_requester_list_empty(handle_commute->commute_req_list))
                {
                    _starpu_data_requester_list_delete(handle_commute->commute_req_list);
                    handle_commute->commute_req_list = NULL;
                }
                _starpu_spin_unlock(&handle_commute->header_lock);
            }
            /* delete the list node */
            _starpu_data_requester_delete(r);
            /* push the task */
            _starpu_push_task(j);
            /* release the global mutex */
#ifdef NO_LOCK_OR_DELEGATE
            ret = pthread_mutex_unlock(&commute_global_mutex);
            STARPU_ASSERT(ret == 0);
#endif
            /* return 0: a task has been pushed */
            return 0;
        }
        else
        {
            /* not all handles were available: revert the marks */
            unsigned idx_buf_cancel;
            for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute; idx_buf_cancel++)
            {
                if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
                    continue;
                starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
                _starpu_spin_lock(&cancel_handle->header_lock);
                STARPU_ASSERT(cancel_handle->refcnt == 1);
                cancel_handle->refcnt -= 1;
                _starpu_spin_unlock(&cancel_handle->header_lock);
            }
        }
        r = _starpu_data_requester_list_next(r);
    }
    /* no task has been pushed */
#ifdef NO_LOCK_OR_DELEGATE
    ret = pthread_mutex_unlock(&commute_global_mutex);
    STARPU_ASSERT(ret == 0);
#endif
    return 1;
}
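/* End-to-end example of the two functions above (illustrative): task A
 * holds handle h in STARPU_RW|STARPU_COMMUTE mode, so when task B is
 * submitted, _starpu_submit_job_enforce_commute_deps() fails to take h
 * (refcnt != 0) and queues B's requester on h->commute_req_list. When A
 * releases h, _starpu_notify_commute_dependencies(h) is invoked (through
 * _starpu_LockOrDelegatePostOrPerform() in the default build), finds B's
 * request, takes all of B's commute handles and pushes B's task. */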