fifo_queues.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2015 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2013 CNRS
  5. * Copyright (C) 2011 Télécom-SudParis
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /* FIFO queues, ready for use by schedulers */
  19. #include <starpu_scheduler.h>
  20. #include <sched_policies/fifo_queues.h>
  21. #include <common/fxt.h>
  22. /*
  23. static int is_sorted_task_list(struct starpu_task * task)
  24. {
  25. if(!task)
  26. return 1;
  27. struct starpu_task * next = task->next;
  28. if(!next)
  29. return 1;
  30. while(next)
  31. {
  32. if(task->priority < next->priority)
  33. return 0;
  34. task = next;
  35. next = next->next;
  36. }
  37. return 1;
  38. }
  39. */
  40. struct _starpu_fifo_taskq *_starpu_create_fifo(void)
  41. {
  42. struct _starpu_fifo_taskq *fifo;
  43. fifo = (struct _starpu_fifo_taskq *) malloc(sizeof(struct _starpu_fifo_taskq));
  44. /* note that not all mechanisms (eg. the semaphore) have to be used */
  45. starpu_task_list_init(&fifo->taskq);
  46. fifo->ntasks = 0;
  47. STARPU_HG_DISABLE_CHECKING(fifo->ntasks);
  48. fifo->nprocessed = 0;
  49. fifo->exp_start = starpu_timing_now();
  50. fifo->exp_len = 0.0;
  51. fifo->exp_end = fifo->exp_start;
  52. fifo->exp_len_per_priority = NULL;
  53. return fifo;
  54. }
  55. void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo)
  56. {
  57. free(fifo);
  58. }
  59. int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
  60. {
  61. return fifo->ntasks == 0;
  62. }
  63. double
  64. _starpu_fifo_get_exp_len_prev_task_list(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task, int workerid, int nimpl, int *fifo_ntasks)
  65. {
  66. struct starpu_task_list *list = &fifo_queue->taskq;
  67. struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, task->sched_ctx);
  68. double exp_len = 0.0;
  69. if (list->head != NULL)
  70. {
  71. struct starpu_task *current = list->head;
  72. struct starpu_task *prev = NULL;
  73. while (current)
  74. {
  75. if (current->priority < task->priority)
  76. break;
  77. prev = current;
  78. current = current->next;
  79. }
  80. if (prev != NULL)
  81. {
  82. if (current)
  83. {
  84. /* the task's place is between prev and current */
  85. struct starpu_task *it;
  86. for(it = list->head; it != current; it = it->next)
  87. {
  88. exp_len += starpu_task_expected_length(it, perf_arch, nimpl);
  89. (*fifo_ntasks) ++;
  90. }
  91. }
  92. else
  93. {
  94. /* the task's place is at the tail of the list */
  95. exp_len = fifo_queue->exp_len;
  96. *fifo_ntasks = fifo_queue->ntasks;
  97. }
  98. }
  99. }
  100. return exp_len;
  101. }
  102. int
  103. _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  104. {
  105. struct starpu_task_list *list = &fifo_queue->taskq;
  106. if (list->head == NULL)
  107. {
  108. list->head = task;
  109. list->tail = task;
  110. task->prev = NULL;
  111. task->next = NULL;
  112. }
  113. else
  114. {
  115. struct starpu_task *current = list->head;
  116. struct starpu_task *prev = NULL;
  117. while (current)
  118. {
  119. if (current->priority < task->priority)
  120. break;
  121. prev = current;
  122. current = current->next;
  123. }
  124. if (prev == NULL)
  125. {
  126. /* Insert at the front of the list */
  127. list->head->prev = task;
  128. task->prev = NULL;
  129. task->next = list->head;
  130. list->head = task;
  131. }
  132. else
  133. {
  134. if (current)
  135. {
  136. /* Insert between prev and current */
  137. task->prev = prev;
  138. prev->next = task;
  139. task->next = current;
  140. current->prev = task;
  141. }
  142. else
  143. {
  144. /* Insert at the tail of the list */
  145. list->tail->next = task;
  146. task->next = NULL;
  147. task->prev = list->tail;
  148. list->tail = task;
  149. }
  150. }
  151. }
  152. fifo_queue->ntasks++;
  153. fifo_queue->nprocessed++;
  154. return 0;
  155. }
  156. int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  157. {
  158. if (task->priority > 0)
  159. {
  160. _starpu_fifo_push_sorted_task(fifo_queue, task);
  161. }
  162. else
  163. {
  164. starpu_task_list_push_back(&fifo_queue->taskq, task);
  165. fifo_queue->ntasks++;
  166. fifo_queue->nprocessed++;
  167. }
  168. return 0;
  169. }
  170. int _starpu_fifo_push_back_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  171. {
  172. if (task->priority > 0)
  173. {
  174. _starpu_fifo_push_sorted_task(fifo_queue, task);
  175. }
  176. else
  177. {
  178. starpu_task_list_push_front(&fifo_queue->taskq, task);
  179. fifo_queue->ntasks++;
  180. }
  181. return 0;
  182. }
  183. struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  184. {
  185. struct starpu_task *task;
  186. for (task = starpu_task_list_begin(&fifo_queue->taskq);
  187. task != starpu_task_list_end(&fifo_queue->taskq);
  188. task = starpu_task_list_next(task))
  189. {
  190. unsigned nimpl;
  191. STARPU_ASSERT(task);
  192. if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
  193. {
  194. starpu_task_set_implementation(task, nimpl);
  195. starpu_task_list_erase(&fifo_queue->taskq, task);
  196. fifo_queue->ntasks--;
  197. return task;
  198. }
  199. }
  200. return NULL;
  201. }
  202. /* This is the same as _starpu_fifo_pop_task, but without checking that the
  203. * worker will be able to execute this task. This is useful when the scheduler
  204. * has already checked it. */
  205. struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_queue)
  206. {
  207. struct starpu_task *task = NULL;
  208. if (!starpu_task_list_empty(&fifo_queue->taskq))
  209. {
  210. task = starpu_task_list_pop_front(&fifo_queue->taskq);
  211. fifo_queue->ntasks--;
  212. }
  213. return task;
  214. }
  215. /* pop every task that can be executed on the calling driver */
  216. struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  217. {
  218. struct starpu_task_list *old_list;
  219. unsigned size;
  220. struct starpu_task *new_list = NULL;
  221. struct starpu_task *new_list_tail = NULL;
  222. size = fifo_queue->ntasks;
  223. if (size > 0)
  224. {
  225. old_list = &fifo_queue->taskq;
  226. unsigned new_list_size = 0;
  227. struct starpu_task *task, *next_task;
  228. /* note that this starts at the _head_ of the list, so we put
  229. * elements at the back of the new list */
  230. task = starpu_task_list_front(old_list);
  231. while (task)
  232. {
  233. unsigned nimpl;
  234. next_task = task->next;
  235. if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
  236. {
  237. /* this elements can be moved into the new list */
  238. new_list_size++;
  239. starpu_task_list_erase(old_list, task);
  240. if (new_list_tail)
  241. {
  242. new_list_tail->next = task;
  243. task->prev = new_list_tail;
  244. task->next = NULL;
  245. new_list_tail = task;
  246. }
  247. else
  248. {
  249. new_list = task;
  250. new_list_tail = task;
  251. task->prev = NULL;
  252. task->next = NULL;
  253. }
  254. starpu_task_set_implementation(task, nimpl);
  255. }
  256. task = next_task;
  257. }
  258. fifo_queue->ntasks -= new_list_size;
  259. }
  260. return new_list;
  261. }