node_fifo.c 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #include <starpu_sched_node.h>
  2. #include "prio_deque.h"
  3. #include <starpu_scheduler.h>
  4. struct _starpu_fifo_data
  5. {
  6. struct _starpu_prio_deque fifo;
  7. starpu_pthread_mutex_t mutex;
  8. };
  9. static double fifo_estimated_end(struct starpu_sched_node * node)
  10. {
  11. struct _starpu_fifo_data * data = node->data;
  12. struct _starpu_prio_deque * fifo = &data->fifo;
  13. starpu_pthread_mutex_t * mutex = &data->mutex;
  14. int card = starpu_bitmap_cardinal(node->workers_in_ctx);
  15. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  16. double estimated_end = fifo->exp_start + fifo->exp_len / card;
  17. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  18. return estimated_end;
  19. }
  20. static double estimated_load(struct starpu_sched_node * node)
  21. {
  22. struct _starpu_fifo_data * data = node->data;
  23. struct _starpu_prio_deque * fifo = &data->fifo;
  24. starpu_pthread_mutex_t * mutex = &data->mutex;
  25. double relative_speedup = 0.0;
  26. double load;
  27. if(node->is_homogeneous)
  28. {
  29. relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(starpu_bitmap_first(node->workers)));
  30. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  31. load = fifo->ntasks / relative_speedup;
  32. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  33. return load;
  34. }
  35. else
  36. {
  37. int i;
  38. for(i = starpu_bitmap_first(node->workers);
  39. i != -1;
  40. i = starpu_bitmap_next(node->workers, i))
  41. relative_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(i));
  42. relative_speedup /= starpu_bitmap_cardinal(node->workers);
  43. STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
  44. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  45. load = fifo->ntasks / relative_speedup;
  46. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  47. }
  48. int i;
  49. for(i = 0; i < node->nchilds; i++)
  50. {
  51. struct starpu_sched_node * c = node->childs[i];
  52. load += c->estimated_load(c);
  53. }
  54. return load;
  55. }
  56. static int push_task(struct starpu_sched_node * node, struct starpu_task * task)
  57. {
  58. STARPU_ASSERT(starpu_sched_node_can_execute_task(node,task));
  59. struct _starpu_fifo_data * data = node->data;
  60. struct _starpu_prio_deque * fifo = &data->fifo;
  61. starpu_pthread_mutex_t * mutex = &data->mutex;
  62. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  63. STARPU_ASSERT(!isnan(fifo->exp_end));
  64. STARPU_ASSERT(!isnan(fifo->exp_len));
  65. STARPU_ASSERT(!isnan(fifo->exp_start));
  66. int ret = _starpu_prio_deque_push_task(fifo, task);
  67. if(!isnan(task->predicted))
  68. {
  69. fifo->exp_len += task->predicted;
  70. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  71. }
  72. STARPU_ASSERT(!isnan(fifo->exp_end));
  73. STARPU_ASSERT(!isnan(fifo->exp_len));
  74. STARPU_ASSERT(!isnan(fifo->exp_start));
  75. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  76. node->available(node);
  77. return ret;
  78. }
  79. static struct starpu_task * pop_task(struct starpu_sched_node * node, unsigned sched_ctx_id)
  80. {
  81. struct _starpu_fifo_data * data = node->data;
  82. struct _starpu_prio_deque * fifo = &data->fifo;
  83. starpu_pthread_mutex_t * mutex = &data->mutex;
  84. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  85. STARPU_ASSERT(!isnan(fifo->exp_end));
  86. STARPU_ASSERT(!isnan(fifo->exp_len));
  87. STARPU_ASSERT(!isnan(fifo->exp_start));
  88. struct starpu_task * task = node->is_homogeneous ?
  89. _starpu_prio_deque_pop_task(fifo):
  90. _starpu_prio_deque_pop_task_for_worker(fifo, starpu_worker_get_id());
  91. if(task)
  92. {
  93. if(!isnan(task->predicted))
  94. {
  95. fifo->exp_start = starpu_timing_now() + task->predicted;
  96. fifo->exp_len -= task->predicted;
  97. }
  98. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  99. if(fifo->ntasks == 0)
  100. fifo->exp_len = 0.0;
  101. }
  102. STARPU_ASSERT(!isnan(fifo->exp_end));
  103. STARPU_ASSERT(!isnan(fifo->exp_len));
  104. STARPU_ASSERT(!isnan(fifo->exp_start));
  105. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  106. if(task)
  107. return task;
  108. struct starpu_sched_node * father = node->fathers[sched_ctx_id];
  109. if(father)
  110. return father->pop_task(father,sched_ctx_id);
  111. return NULL;
  112. }
  113. /*
  114. struct starpu_task_list starpu_sched_node_fifo_get_non_executable_tasks(struct starpu_sched_node * node)
  115. {
  116. struct starpu_task_list list;
  117. starpu_task_list_init(&list);
  118. struct _starpu_fifo_data * data = node->data;
  119. struct _starpu_prio_deque * fifo = data->fifo;
  120. struct starpu_task * task;
  121. for (task = starpu_task_list_begin(&fifo->taskq);
  122. task != starpu_task_list_end(&fifo->taskq);
  123. task = starpu_task_list_next(task))
  124. {
  125. STARPU_ASSERT(task);
  126. if(!starpu_sched_node_can_execute_task(node, task))
  127. {
  128. starpu_task_list_erase(&fifo->taskq, task);
  129. starpu_task_list_push_front(&list, task);
  130. fifo->ntasks--;
  131. }
  132. }
  133. return list;
  134. }
  135. */
  136. void init_fifo_data(struct starpu_sched_node * node)
  137. {
  138. STARPU_ASSERT(starpu_sched_node_is_fifo(node));
  139. struct _starpu_fifo_data * data = malloc(sizeof(*data));
  140. _starpu_prio_deque_init(&data->fifo);
  141. STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
  142. node->data = data;
  143. }
  144. void deinit_fifo_data(struct starpu_sched_node * node)
  145. {
  146. struct _starpu_fifo_data * data = node->data;
  147. STARPU_PTHREAD_MUTEX_DESTROY(&data->mutex);
  148. _starpu_prio_deque_destroy(&data->fifo);
  149. free(data);
  150. }
  151. int starpu_sched_node_is_fifo(struct starpu_sched_node * node)
  152. {
  153. return node->init_data == init_fifo_data;
  154. }
  155. struct starpu_sched_node * starpu_sched_node_fifo_create(void * arg STARPU_ATTRIBUTE_UNUSED)
  156. {
  157. struct starpu_sched_node * node = starpu_sched_node_create();
  158. node->estimated_end = fifo_estimated_end;
  159. node->estimated_load = estimated_load;
  160. node->init_data = init_fifo_data;
  161. node->deinit_data = deinit_fifo_data;
  162. node->push_task = push_task;
  163. node->pop_task = pop_task;
  164. return node;
  165. }