fifo_queues.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /* FIFO queues, ready for use by schedulers */
  19. #include <starpu_scheduler.h>
  20. #include <sched_policies/fifo_queues.h>
  21. #include <common/fxt.h>
  22. struct _starpu_fifo_taskq *_starpu_create_fifo(void)
  23. {
  24. struct _starpu_fifo_taskq *fifo;
  25. fifo = (struct _starpu_fifo_taskq *) malloc(sizeof(struct _starpu_fifo_taskq));
  26. /* note that not all mechanisms (eg. the semaphore) have to be used */
  27. starpu_task_list_init(&fifo->taskq);
  28. fifo->ntasks = 0;
  29. fifo->nprocessed = 0;
  30. fifo->exp_start = starpu_timing_now();
  31. fifo->exp_len = 0.0;
  32. fifo->exp_end = fifo->exp_start;
  33. return fifo;
  34. }
  35. void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo)
  36. {
  37. free(fifo);
  38. }
  39. int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
  40. {
  41. return fifo->ntasks == 0;
  42. }
  43. int
  44. _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  45. {
  46. struct starpu_task_list *list = &fifo_queue->taskq;
  47. if (list->head == NULL)
  48. {
  49. list->head = task;
  50. list->tail = task;
  51. task->prev = NULL;
  52. task->next = NULL;
  53. }
  54. else
  55. {
  56. struct starpu_task *current = list->head;
  57. struct starpu_task *prev = NULL;
  58. while (current)
  59. {
  60. if (current->priority < task->priority)
  61. break;
  62. prev = current;
  63. current = current->next;
  64. }
  65. if (prev == NULL)
  66. {
  67. /* Insert at the front of the list */
  68. list->head->prev = task;
  69. task->prev = NULL;
  70. task->next = list->head;
  71. list->head = task;
  72. }
  73. else
  74. {
  75. if (current)
  76. {
  77. /* Insert between prev and current */
  78. task->prev = prev;
  79. prev->next = task;
  80. task->next = current;
  81. current->prev = task;
  82. }
  83. else
  84. {
  85. /* Insert at the tail of the list */
  86. list->tail->next = task;
  87. task->next = NULL;
  88. task->prev = list->tail;
  89. list->tail = task;
  90. }
  91. }
  92. }
  93. fifo_queue->ntasks++;
  94. fifo_queue->nprocessed++;
  95. return 0;
  96. }
  97. int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
  98. {
  99. if (task->priority > 0)
  100. {
  101. _starpu_fifo_push_sorted_task(fifo_queue, task);
  102. }
  103. else
  104. {
  105. starpu_task_list_push_back(&fifo_queue->taskq, task);
  106. fifo_queue->ntasks++;
  107. fifo_queue->nprocessed++;
  108. }
  109. return 0;
  110. }
  111. struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  112. {
  113. struct starpu_task *task;
  114. for (task = starpu_task_list_begin(&fifo_queue->taskq);
  115. task != starpu_task_list_end(&fifo_queue->taskq);
  116. task = starpu_task_list_next(task))
  117. {
  118. unsigned nimpl;
  119. STARPU_ASSERT(task);
  120. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  121. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  122. {
  123. starpu_task_set_implementation(task, nimpl);
  124. starpu_task_list_erase(&fifo_queue->taskq, task);
  125. fifo_queue->ntasks--;
  126. _STARPU_TRACE_JOB_POP(task, 0);
  127. return task;
  128. }
  129. }
  130. return NULL;
  131. }
  132. /* This is the same as _starpu_fifo_pop_task, but without checking that the
  133. * worker will be able to execute this task. This is useful when the scheduler
  134. * has already checked it. */
  135. struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_queue)
  136. {
  137. struct starpu_task *task = NULL;
  138. if (!starpu_task_list_empty(&fifo_queue->taskq))
  139. {
  140. task = starpu_task_list_pop_front(&fifo_queue->taskq);
  141. fifo_queue->ntasks--;
  142. _STARPU_TRACE_JOB_POP(task, 0);
  143. }
  144. return task;
  145. }
  146. /* pop every task that can be executed on the calling driver */
  147. struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  148. {
  149. struct starpu_task_list *old_list;
  150. unsigned size;
  151. struct starpu_task *new_list = NULL;
  152. struct starpu_task *new_list_tail = NULL;
  153. size = fifo_queue->ntasks;
  154. if (size > 0)
  155. {
  156. old_list = &fifo_queue->taskq;
  157. unsigned new_list_size = 0;
  158. struct starpu_task *task, *next_task;
  159. /* note that this starts at the _head_ of the list, so we put
  160. * elements at the back of the new list */
  161. task = starpu_task_list_front(old_list);
  162. while (task)
  163. {
  164. unsigned nimpl;
  165. next_task = task->next;
  166. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  167. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  168. {
  169. /* this elements can be moved into the new list */
  170. new_list_size++;
  171. starpu_task_list_erase(old_list, task);
  172. if (new_list_tail)
  173. {
  174. new_list_tail->next = task;
  175. task->prev = new_list_tail;
  176. task->next = NULL;
  177. new_list_tail = task;
  178. }
  179. else
  180. {
  181. new_list = task;
  182. new_list_tail = task;
  183. task->prev = NULL;
  184. task->next = NULL;
  185. }
  186. starpu_task_set_implementation(task, nimpl);
  187. break;
  188. }
  189. task = next_task;
  190. }
  191. fifo_queue->ntasks -= new_list_size;
  192. }
  193. return new_list;
  194. }