fifo_queues.c 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2012 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /* FIFO queues, ready for use by schedulers */
  19. #include <pthread.h>
  20. #include <sched_policies/fifo_queues.h>
  21. #include <errno.h>
  22. #include <common/utils.h>
  23. #include <core/task.h>
  24. #include <core/workers.h>
  25. struct _starpu_fifo_taskq *_starpu_create_fifo(void)
  26. {
  27. struct _starpu_fifo_taskq *fifo;
  28. fifo = (struct _starpu_fifo_taskq *) malloc(sizeof(struct _starpu_fifo_taskq));
  29. /* note that not all mechanisms (eg. the semaphore) have to be used */
  30. starpu_task_list_init(&fifo->taskq);
  31. fifo->ntasks = 0;
  32. fifo->nprocessed = 0;
  33. fifo->exp_start = starpu_timing_now();
  34. fifo->exp_len = 0.0;
  35. fifo->exp_end = fifo->exp_start;
  36. return fifo;
  37. }
  /* Release the memory of a FIFO task queue.
   *
   * Only the queue structure itself is freed; nothing is done about tasks that
   * may still be linked in it. */
  void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo)
  {
      free(fifo);
  }
  42. int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
  43. {
  44. return fifo->ntasks == 0;
  45. }
  46. int
  47. _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue,
  48. pthread_mutex_t *sched_mutex,
  49. pthread_cond_t *sched_cond,
  50. struct starpu_task *task)
  51. {
  52. struct starpu_task_list *list = &fifo_queue->taskq;
  53. _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  54. _STARPU_TRACE_JOB_PUSH(task, 0);
  55. if (list->head == NULL)
  56. {
  57. list->head = task;
  58. list->tail = task;
  59. task->prev = NULL;
  60. task->next = NULL;
  61. }
  62. else
  63. {
  64. struct starpu_task *current = list->head;
  65. struct starpu_task *prev = NULL;
  66. while (current)
  67. {
  68. if (current->priority >= task->priority)
  69. break;
  70. prev = current;
  71. current = current->next;
  72. }
  73. if (prev == NULL)
  74. {
  75. /* Insert at the front of the list */
  76. list->head->prev = task;
  77. task->prev = NULL;
  78. task->next = list->head;
  79. list->head = task;
  80. }
  81. else
  82. {
  83. if (current)
  84. {
  85. /* Insert between prev and current */
  86. task->prev = prev;
  87. prev->next = task;
  88. task->next = current;
  89. current->prev = task;
  90. }
  91. else
  92. {
  93. /* Insert at the tail of the list */
  94. list->tail->next = task;
  95. task->next = NULL;
  96. task->prev = list->tail;
  97. list->tail = task;
  98. }
  99. }
  100. }
  101. fifo_queue->ntasks++;
  102. fifo_queue->nprocessed++;
  103. _STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  104. _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  105. return 0;
  106. }
  107. /* TODO: revert front/back? */
  108. int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
  109. {
  110. if (task->priority > 0)
  111. {
  112. _STARPU_TRACE_JOB_PUSH(task, 1);
  113. _starpu_fifo_push_sorted_task(fifo_queue, sched_mutex,
  114. sched_cond, task);
  115. }
  116. else
  117. {
  118. _STARPU_TRACE_JOB_PUSH(task, 0);
  119. _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  120. starpu_task_list_push_front(&fifo_queue->taskq, task);
  121. fifo_queue->ntasks++;
  122. fifo_queue->nprocessed++;
  123. _STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  124. _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  125. }
  126. return 0;
  127. }
  128. struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
  129. {
  130. struct starpu_task *task;
  131. for (task = starpu_task_list_begin(&fifo_queue->taskq);
  132. task != starpu_task_list_end(&fifo_queue->taskq);
  133. task = starpu_task_list_next(task))
  134. {
  135. unsigned nimpl;
  136. STARPU_ASSERT(task);
  137. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  138. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  139. {
  140. _starpu_get_job_associated_to_task(task)->nimpl = nimpl;
  141. starpu_task_list_erase(&fifo_queue->taskq, task);
  142. fifo_queue->ntasks--;
  143. _STARPU_TRACE_JOB_POP(task, 0);
  144. return task;
  145. }
  146. }
  147. return NULL;
  148. }
  149. /* This is the same as _starpu_fifo_pop_task, but without checking that the
  150. * worker will be able to execute this task. This is useful when the scheduler
  151. * has already checked it. */
  152. struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_queue)
  153. {
  154. struct starpu_task *task = NULL;
  155. if (!starpu_task_list_empty(&fifo_queue->taskq))
  156. {
  157. task = starpu_task_list_pop_back(&fifo_queue->taskq);
  158. fifo_queue->ntasks--;
  159. _STARPU_TRACE_JOB_POP(task, 0);
  160. }
  161. return task;
  162. }
  163. /* pop every task that can be executed on the calling driver */
  164. struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, pthread_mutex_t *sched_mutex, int workerid)
  165. {
  166. struct starpu_task_list *old_list;
  167. unsigned size;
  168. struct starpu_task *new_list = NULL;
  169. struct starpu_task *new_list_tail = NULL;
  170. _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  171. size = fifo_queue->ntasks;
  172. if (size > 0)
  173. {
  174. old_list = &fifo_queue->taskq;
  175. unsigned new_list_size = 0;
  176. struct starpu_task *task, *next_task;
  177. /* note that this starts at the _head_ of the list, so we put
  178. * elements at the back of the new list */
  179. task = starpu_task_list_front(old_list);
  180. while (task)
  181. {
  182. unsigned nimpl;
  183. next_task = task->next;
  184. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  185. if (starpu_worker_can_execute_task(workerid, task, nimpl))
  186. {
  187. /* this elements can be moved into the new list */
  188. new_list_size++;
  189. starpu_task_list_erase(old_list, task);
  190. if (new_list_tail)
  191. {
  192. new_list_tail->next = task;
  193. task->prev = new_list_tail;
  194. task->next = NULL;
  195. new_list_tail = task;
  196. }
  197. else
  198. {
  199. new_list = task;
  200. new_list_tail = task;
  201. task->prev = NULL;
  202. task->next = NULL;
  203. }
  204. _starpu_get_job_associated_to_task(task)->nimpl = nimpl;
  205. break;
  206. }
  207. task = next_task;
  208. }
  209. fifo_queue->ntasks -= new_list_size;
  210. }
  211. _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  212. return new_list;
  213. }