deque_queues.c 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2011 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /* Deque queues, ready for use by schedulers */
  19. #include <starpu.h>
  20. #include <starpu_scheduler.h>
  21. #include <sched_policies/deque_queues.h>
  22. #include <core/workers.h>
  23. struct _starpu_deque_jobq *_starpu_create_deque(void)
  24. {
  25. struct _starpu_deque_jobq *deque;
  26. deque = (struct _starpu_deque_jobq *) malloc(sizeof(struct _starpu_deque_jobq));
  27. /* note that not all mechanisms (eg. the semaphore) have to be used */
  28. deque->jobq = _starpu_job_list_new();
  29. deque->njobs = 0;
  30. deque->nprocessed = 0;
  31. deque->exp_start = starpu_timing_now();
  32. deque->exp_len = 0.0;
  33. deque->exp_end = deque->exp_start;
  34. return deque;
  35. }
  36. void _starpu_destroy_deque(struct _starpu_deque_jobq *deque)
  37. {
  38. _starpu_job_list_delete(deque->jobq);
  39. free(deque);
  40. }
  41. unsigned _starpu_get_deque_njobs(struct _starpu_deque_jobq *deque_queue)
  42. {
  43. return deque_queue->njobs;
  44. }
  45. int _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue)
  46. {
  47. return deque_queue->nprocessed;
  48. }
  49. struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queue, int workerid)
  50. {
  51. struct _starpu_job *j = NULL;
  52. if ((deque_queue->njobs == 0) && _starpu_machine_is_running())
  53. {
  54. return NULL;
  55. }
  56. /* TODO find a task that suits workerid */
  57. for (j = _starpu_job_list_begin(deque_queue->jobq);
  58. j != _starpu_job_list_end(deque_queue->jobq);
  59. j = _starpu_job_list_next(j))
  60. {
  61. unsigned nimpl;
  62. STARPU_ASSERT(j);
  63. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  64. if (starpu_worker_can_execute_task(workerid, j->task, nimpl))
  65. {
  66. j->nimpl = nimpl;
  67. j = _starpu_job_list_pop_front(deque_queue->jobq);
  68. _STARPU_TRACE_JOB_POP(j, 0);
  69. return j->task;
  70. }
  71. }
  72. return NULL;
  73. }
  74. struct _starpu_job_list *_starpu_deque_pop_every_task(struct _starpu_deque_jobq *deque_queue, starpu_pthread_mutex_t *sched_mutex, int workerid)
  75. {
  76. struct _starpu_job_list *new_list, *old_list;
  77. /* block until some task is available in that queue */
  78. _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  79. if (deque_queue->njobs == 0)
  80. {
  81. new_list = NULL;
  82. }
  83. else
  84. {
  85. /* there is a task */
  86. old_list = deque_queue->jobq;
  87. new_list = _starpu_job_list_new();
  88. unsigned new_list_size = 0;
  89. struct _starpu_job *i;
  90. struct _starpu_job *next_job;
  91. /* note that this starts at the _head_ of the list, so we put
  92. * elements at the back of the new list */
  93. for(i = _starpu_job_list_begin(old_list);
  94. i != _starpu_job_list_end(old_list);
  95. i = next_job)
  96. {
  97. unsigned nimpl;
  98. next_job = _starpu_job_list_next(i);
  99. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  100. if (starpu_worker_can_execute_task(workerid, i->task, nimpl))
  101. {
  102. /* this elements can be moved into the new list */
  103. new_list_size++;
  104. _starpu_job_list_erase(old_list, i);
  105. _starpu_job_list_push_back(new_list, i);
  106. i->nimpl = nimpl;
  107. }
  108. }
  109. if (new_list_size == 0)
  110. {
  111. /* the new list is empty ... */
  112. _starpu_job_list_delete(new_list);
  113. new_list = NULL;
  114. }
  115. else
  116. {
  117. deque_queue->njobs -= new_list_size;
  118. }
  119. }
  120. _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  121. return new_list;
  122. }