fifo_queues.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /* FIFO queues, ready for use by schedulers */
  19. #include <starpu_scheduler.h>
  20. #include <sched_policies/fifo_queues.h>
  21. #include <common/fxt.h>
  22. /*
  23. static int is_sorted_task_list(struct starpu_task * task)
  24. {
  25. if(!task)
  26. return 1;
  27. struct starpu_task * next = task->next;
  28. if(!next)
  29. return 1;
  30. while(next)
  31. {
  32. if(task->priority < next->priority)
  33. return 0;
  34. task = next;
  35. next = next->next;
  36. }
  37. return 1;
  38. }
  39. */
  40. struct _starpu_fifo_taskq *_starpu_create_fifo(void)
  41. {
  42. struct _starpu_fifo_taskq *fifo;
  43. fifo = (struct _starpu_fifo_taskq *) malloc(sizeof(struct _starpu_fifo_taskq));
  44. /* note that not all mechanisms (eg. the semaphore) have to be used */
  45. starpu_task_list_init(&fifo->taskq);
  46. fifo->ntasks = 0;
  47. fifo->nprocessed = 0;
  48. fifo->exp_start = starpu_timing_now();
  49. fifo->exp_len = 0.0;
  50. fifo->exp_end = fifo->exp_start;
  51. return fifo;
  52. }
  53. void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo)
  54. {
  55. free(fifo);
  56. }
  57. int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
  58. {
  59. return fifo->ntasks == 0;
  60. }
  61. int
  62. _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  63. {
  64. struct starpu_task_list *list = &fifo_queue->taskq;
  65. if (list->head == NULL)
  66. {
  67. list->head = task;
  68. list->tail = task;
  69. task->prev = NULL;
  70. task->next = NULL;
  71. }
  72. else
  73. {
  74. struct starpu_task *current = list->head;
  75. struct starpu_task *prev = NULL;
  76. while (current)
  77. {
  78. if (current->priority < task->priority)
  79. break;
  80. prev = current;
  81. current = current->next;
  82. }
  83. if (prev == NULL)
  84. {
  85. /* Insert at the front of the list */
  86. list->head->prev = task;
  87. task->prev = NULL;
  88. task->next = list->head;
  89. list->head = task;
  90. }
  91. else
  92. {
  93. if (current)
  94. {
  95. /* Insert between prev and current */
  96. task->prev = prev;
  97. prev->next = task;
  98. task->next = current;
  99. current->prev = task;
  100. }
  101. else
  102. {
  103. /* Insert at the tail of the list */
  104. list->tail->next = task;
  105. task->next = NULL;
  106. task->prev = list->tail;
  107. list->tail = task;
  108. }
  109. }
  110. }
  111. fifo_queue->ntasks++;
  112. fifo_queue->nprocessed++;
  113. return 0;
  114. }
  115. int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  116. {
  117. if (task->priority > 0)
  118. {
  119. _starpu_fifo_push_sorted_task(fifo_queue, task);
  120. }
  121. else
  122. {
  123. starpu_task_list_push_back(&fifo_queue->taskq, task);
  124. fifo_queue->ntasks++;
  125. fifo_queue->nprocessed++;
  126. }
  127. return 0;
  128. }
  129. struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  130. {
  131. struct starpu_task *task;
  132. for (task = starpu_task_list_begin(&fifo_queue->taskq);
  133. task != starpu_task_list_end(&fifo_queue->taskq);
  134. task = starpu_task_list_next(task))
  135. {
  136. unsigned nimpl;
  137. STARPU_ASSERT(task);
  138. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  139. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  140. {
  141. starpu_task_set_implementation(task, nimpl);
  142. starpu_task_list_erase(&fifo_queue->taskq, task);
  143. fifo_queue->ntasks--;
  144. _STARPU_TRACE_JOB_POP(task, 0);
  145. return task;
  146. }
  147. }
  148. return NULL;
  149. }
  150. /* This is the same as _starpu_fifo_pop_task, but without checking that the
  151. * worker will be able to execute this task. This is useful when the scheduler
  152. * has already checked it. */
  153. struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_queue)
  154. {
  155. struct starpu_task *task = NULL;
  156. if (!starpu_task_list_empty(&fifo_queue->taskq))
  157. {
  158. task = starpu_task_list_pop_front(&fifo_queue->taskq);
  159. fifo_queue->ntasks--;
  160. _STARPU_TRACE_JOB_POP(task, 0);
  161. }
  162. return task;
  163. }
  164. /* pop every task that can be executed on the calling driver */
  165. struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  166. {
  167. struct starpu_task_list *old_list;
  168. unsigned size;
  169. struct starpu_task *new_list = NULL;
  170. struct starpu_task *new_list_tail = NULL;
  171. size = fifo_queue->ntasks;
  172. if (size > 0)
  173. {
  174. old_list = &fifo_queue->taskq;
  175. unsigned new_list_size = 0;
  176. struct starpu_task *task, *next_task;
  177. /* note that this starts at the _head_ of the list, so we put
  178. * elements at the back of the new list */
  179. task = starpu_task_list_front(old_list);
  180. while (task)
  181. {
  182. unsigned nimpl;
  183. next_task = task->next;
  184. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  185. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  186. {
  187. /* this elements can be moved into the new list */
  188. new_list_size++;
  189. starpu_task_list_erase(old_list, task);
  190. if (new_list_tail)
  191. {
  192. new_list_tail->next = task;
  193. task->prev = new_list_tail;
  194. task->next = NULL;
  195. new_list_tail = task;
  196. }
  197. else
  198. {
  199. new_list = task;
  200. new_list_tail = task;
  201. task->prev = NULL;
  202. task->next = NULL;
  203. }
  204. starpu_task_set_implementation(task, nimpl);
  205. break;
  206. }
  207. task = next_task;
  208. }
  209. fifo_queue->ntasks -= new_list_size;
  210. }
  211. return new_list;
  212. }