node_fifo.c 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #include "node_sched.h"
  2. #include "fifo_queues.h"
  3. #include <starpu_scheduler.h>
  4. struct _starpu_fifo_data
  5. {
  6. struct _starpu_fifo_taskq * fifo;
  7. starpu_pthread_mutex_t mutex;
  8. };
  9. static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node,
  10. struct starpu_task * task)
  11. {
  12. if(node->nchilds == 0)
  13. {
  14. struct _starpu_task_execute_preds p = { CANNOT_EXECUTE };
  15. return p;
  16. }
  17. struct _starpu_task_execute_preds preds = node->childs[0]->estimated_execute_preds(node->childs[0],task);
  18. if(preds.state == PERF_MODEL)
  19. {
  20. struct _starpu_fifo_data * data = node->data;
  21. struct _starpu_fifo_taskq * fifo = data->fifo;
  22. starpu_pthread_mutex_t * mutex = &data->mutex;
  23. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  24. preds.expected_finish_time = _starpu_compute_expected_time(fifo->exp_start,
  25. preds.expected_finish_time + fifo->exp_end,
  26. preds.expected_length + fifo->exp_len,
  27. preds.expected_transfer_length);
  28. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  29. }
  30. return preds;
  31. }
  32. static double estimated_load(struct _starpu_sched_node * node)
  33. {
  34. double relative_speedup = 0.0;
  35. int i;
  36. STARPU_ASSERT(node->nworkers > 0);
  37. int nworkers = node->is_homogeneous ? 1 : node->nworkers;
  38. for(i = 0; i < nworkers; i++)
  39. relative_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(node->workerids[i]));
  40. relative_speedup /= nworkers;
  41. struct _starpu_fifo_taskq * fifo = node->data;
  42. STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
  43. double load = fifo->ntasks / relative_speedup;
  44. for(i = 0; i < node->nchilds; i++)
  45. {
  46. struct _starpu_sched_node * c = node->childs[i];
  47. load += c->estimated_load(c);
  48. }
  49. return load;
  50. }
  51. static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
  52. {
  53. STARPU_ASSERT(node->nworkers > 0);
  54. struct _starpu_fifo_data * data = node->data;
  55. struct _starpu_fifo_taskq * fifo = data->fifo;
  56. starpu_pthread_mutex_t * mutex = &data->mutex;
  57. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  58. STARPU_ASSERT(!isnan(fifo->exp_end));
  59. STARPU_ASSERT(!isnan(fifo->exp_len));
  60. STARPU_ASSERT(!isnan(fifo->exp_start));
  61. int ret = _starpu_fifo_push_sorted_task(fifo, task);
  62. if(!isnan(task->predicted))
  63. {
  64. fifo->exp_len += task->predicted/node->nworkers;
  65. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  66. }
  67. STARPU_ASSERT(!isnan(fifo->exp_end));
  68. STARPU_ASSERT(!isnan(fifo->exp_len));
  69. STARPU_ASSERT(!isnan(fifo->exp_start));
  70. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  71. node->available(node);
  72. return ret;
  73. }
  74. static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
  75. {
  76. struct _starpu_fifo_data * data = node->data;
  77. struct _starpu_fifo_taskq * fifo = data->fifo;
  78. starpu_pthread_mutex_t * mutex = &data->mutex;
  79. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  80. STARPU_ASSERT(!isnan(fifo->exp_end));
  81. STARPU_ASSERT(!isnan(fifo->exp_len));
  82. STARPU_ASSERT(!isnan(fifo->exp_start));
  83. struct starpu_task * task = _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
  84. if(task)
  85. {
  86. fifo->exp_start = starpu_timing_now();
  87. STARPU_ASSERT(node->nworkers > 0);
  88. if(!isnan(task->predicted))
  89. fifo->exp_len -= task->predicted/node->nworkers;
  90. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  91. }
  92. STARPU_ASSERT(!isnan(fifo->exp_end));
  93. STARPU_ASSERT(!isnan(fifo->exp_len));
  94. STARPU_ASSERT(!isnan(fifo->exp_start));
  95. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  96. if(task)
  97. return task;
  98. struct _starpu_sched_node * father = node->fathers[sched_ctx_id];
  99. if(father)
  100. return father->pop_task(father,sched_ctx_id);
  101. return NULL;
  102. }
  103. struct starpu_task_list _starpu_sched_node_fifo_get_non_executable_tasks(struct _starpu_sched_node * node)
  104. {
  105. struct starpu_task_list list;
  106. starpu_task_list_init(&list);
  107. struct _starpu_fifo_data * data = node->data;
  108. struct _starpu_fifo_taskq * fifo = data->fifo;
  109. struct starpu_task * task;
  110. for (task = starpu_task_list_begin(&fifo->taskq);
  111. task != starpu_task_list_end(&fifo->taskq);
  112. task = starpu_task_list_next(task))
  113. {
  114. STARPU_ASSERT(task);
  115. if(!_starpu_sched_node_can_execute_task(node, task))
  116. {
  117. starpu_task_list_erase(&fifo->taskq, task);
  118. starpu_task_list_push_front(&list, task);
  119. fifo->ntasks--;
  120. }
  121. }
  122. return list;
  123. }
  124. int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node)
  125. {
  126. return node->estimated_execute_preds == estimated_execute_preds
  127. || node->estimated_load == estimated_load
  128. || node->push_task == push_task
  129. || node->pop_task == pop_task;
  130. }
  131. struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
  132. {
  133. struct _starpu_sched_node * node = _starpu_sched_node_create();
  134. struct _starpu_fifo_data * data = malloc(sizeof(*data));
  135. data->fifo = _starpu_create_fifo();
  136. STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
  137. node->data = data;
  138. node->estimated_execute_preds = estimated_execute_preds;
  139. node->estimated_load = estimated_load;
  140. node->push_task = push_task;
  141. node->pop_task = pop_task;
  142. return node;
  143. }