fifo_queues.c 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /* FIFO queues, ready for use by schedulers */
  17. #include <pthread.h>
  18. #include <sched_policies/fifo_queues.h>
  19. #include <errno.h>
  20. #include <common/utils.h>
  21. #include <core/task.h>
  22. struct starpu_fifo_taskq_s *_starpu_create_fifo(void)
  23. {
  24. struct starpu_fifo_taskq_s *fifo;
  25. fifo = malloc(sizeof(struct starpu_fifo_taskq_s));
  26. /* note that not all mechanisms (eg. the semaphore) have to be used */
  27. starpu_task_list_init(&fifo->taskq);
  28. fifo->ntasks = 0;
  29. fifo->nprocessed = 0;
  30. fifo->exp_start = _starpu_timing_now();
  31. fifo->exp_len = 0.0;
  32. fifo->exp_end = fifo->exp_start;
  33. return fifo;
  34. }
  35. void _starpu_destroy_fifo(struct starpu_fifo_taskq_s *fifo)
  36. {
  37. free(fifo);
  38. }
  39. int _starpu_fifo_push_prio_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
  40. {
  41. PTHREAD_MUTEX_LOCK(sched_mutex);
  42. STARPU_TRACE_JOB_PUSH(task, 0);
  43. starpu_task_list_push_back(&fifo_queue->taskq, task);
  44. fifo_queue->ntasks++;
  45. fifo_queue->nprocessed++;
  46. pthread_cond_signal(sched_cond);
  47. PTHREAD_MUTEX_UNLOCK(sched_mutex);
  48. return 0;
  49. }
  50. int _starpu_fifo_push_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
  51. {
  52. PTHREAD_MUTEX_LOCK(sched_mutex);
  53. STARPU_TRACE_JOB_PUSH(task, 0);
  54. starpu_task_list_push_front(&fifo_queue->taskq, task);
  55. fifo_queue->ntasks++;
  56. fifo_queue->nprocessed++;
  57. pthread_cond_signal(sched_cond);
  58. PTHREAD_MUTEX_UNLOCK(sched_mutex);
  59. return 0;
  60. }
  61. struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue)
  62. {
  63. struct starpu_task *task = NULL;
  64. if (fifo_queue->ntasks == 0)
  65. return NULL;
  66. if (fifo_queue->ntasks > 0)
  67. {
  68. /* there is a task */
  69. task = starpu_task_list_pop_back(&fifo_queue->taskq);
  70. STARPU_ASSERT(task);
  71. fifo_queue->ntasks--;
  72. STARPU_TRACE_JOB_POP(task, 0);
  73. }
  74. return task;
  75. }
  76. /* pop every task that can be executed on the calling driver */
  77. struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
  78. {
  79. struct starpu_task_list *old_list;
  80. unsigned size;
  81. struct starpu_task *new_list = NULL;
  82. struct starpu_task *new_list_tail = NULL;
  83. PTHREAD_MUTEX_LOCK(sched_mutex);
  84. size = fifo_queue->ntasks;
  85. if (size > 0) {
  86. old_list = &fifo_queue->taskq;
  87. unsigned new_list_size = 0;
  88. struct starpu_task *task, *next_task;
  89. /* note that this starts at the _head_ of the list, so we put
  90. * elements at the back of the new list */
  91. task = starpu_task_list_front(old_list);
  92. while (task)
  93. {
  94. next_task = task->next;
  95. if (task->cl->where & where)
  96. {
  97. /* this elements can be moved into the new list */
  98. new_list_size++;
  99. starpu_task_list_erase(old_list, task);
  100. if (new_list_tail)
  101. {
  102. new_list_tail->next = task;
  103. task->prev = new_list_tail;
  104. task->next = NULL;
  105. new_list_tail = task;
  106. }
  107. else {
  108. new_list = task;
  109. new_list_tail = task;
  110. task->prev = NULL;
  111. task->next = NULL;
  112. }
  113. }
  114. task = next_task;
  115. }
  116. fifo_queue->ntasks -= new_list_size;
  117. }
  118. PTHREAD_MUTEX_UNLOCK(sched_mutex);
  119. return new_list;
  120. }