eager_central_priority_policy.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2013,2015-2017 Inria
  4. * Copyright (C) 2008-2017 Université de Bordeaux
  5. * Copyright (C) 2010-2013,2015-2017 CNRS
  6. * Copyright (C) 2016 Uppsala University
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. /*
  20. * This is a policy where every worker uses the same job queue, while taking
  21. * task priorities into account
  22. *
  23. * TODO: merge with eager, after checking the scalability
  24. */
  25. #include <starpu.h>
  26. #include <starpu_scheduler.h>
  27. #include <starpu_bitmap.h>
  28. #include "prio_deque.h"
  29. #include <limits.h>
  30. #include <common/fxt.h>
  31. #include <core/workers.h>
  32. struct _starpu_eager_central_prio_data
  33. {
  34. struct _starpu_prio_deque taskq;
  35. starpu_pthread_mutex_t policy_mutex;
  36. struct starpu_bitmap *waiters;
  37. };
  38. /*
  39. * Centralized queue with priorities
  40. */
  41. static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
  42. {
  43. struct _starpu_eager_central_prio_data *data;
  44. _STARPU_MALLOC(data, sizeof(struct _starpu_eager_central_prio_data));
  45. /* only a single queue (even though there are several internaly) */
  46. _starpu_prio_deque_init(&data->taskq);
  47. data->waiters = starpu_bitmap_create();
  48. /* Tell helgrind that it's fine to check for empty fifo in
  49. * _starpu_priority_pop_task without actual mutex (it's just an
  50. * integer) */
  51. STARPU_HG_DISABLE_CHECKING(data->taskq.ntasks);
  52. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  53. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  54. /* The application may use any integer */
  55. if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
  56. starpu_sched_ctx_set_min_priority(sched_ctx_id, INT_MIN);
  57. if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
  58. starpu_sched_ctx_set_max_priority(sched_ctx_id, INT_MAX);
  59. }
  60. static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
  61. {
  62. /* TODO check that there is no task left in the queue */
  63. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  64. /* deallocate the job queue */
  65. _starpu_prio_deque_destroy(&data->taskq);
  66. starpu_bitmap_destroy(data->waiters);
  67. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  68. free(data);
  69. }
  70. static int _starpu_priority_push_task(struct starpu_task *task)
  71. {
  72. unsigned sched_ctx_id = task->sched_ctx;
  73. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  74. struct _starpu_prio_deque *taskq = &data->taskq;
  75. _starpu_worker_relax_on();
  76. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  77. _starpu_worker_relax_off();
  78. _starpu_prio_deque_push_back_task(taskq, task);
  79. if (_starpu_get_nsched_ctxs() > 1)
  80. {
  81. _starpu_worker_relax_on();
  82. _starpu_sched_ctx_lock_write(sched_ctx_id);
  83. _starpu_worker_relax_off();
  84. starpu_sched_ctx_list_task_counters_increment_all_ctx_locked(task, sched_ctx_id);
  85. _starpu_sched_ctx_unlock_write(sched_ctx_id);
  86. }
  87. starpu_push_task_end(task);
  88. /*if there are no tasks block */
  89. /* wake people waiting for a task */
  90. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  91. struct starpu_sched_ctx_iterator it;
  92. #ifndef STARPU_NON_BLOCKING_DRIVERS
  93. char dowake[STARPU_NMAXWORKERS] = { 0 };
  94. #endif
  95. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  96. while(workers->has_next(workers, &it))
  97. {
  98. unsigned worker = workers->get_next(workers, &it);
  99. #ifdef STARPU_NON_BLOCKING_DRIVERS
  100. if (!starpu_bitmap_get(data->waiters, worker))
  101. /* This worker is not waiting for a task */
  102. continue;
  103. #endif
  104. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  105. {
  106. /* It can execute this one, tell him! */
  107. #ifdef STARPU_NON_BLOCKING_DRIVERS
  108. starpu_bitmap_unset(data->waiters, worker);
  109. /* We really woke at least somebody, no need to wake somebody else */
  110. break;
  111. #else
  112. dowake[worker] = 1;
  113. #endif
  114. }
  115. }
  116. /* Let the task free */
  117. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  118. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  119. /* Now that we have a list of potential workers, try to wake one */
  120. workers->init_iterator(workers, &it);
  121. while(workers->has_next(workers, &it))
  122. {
  123. unsigned worker = workers->get_next(workers, &it);
  124. if (dowake[worker])
  125. if (_starpu_wake_worker_relax_light(worker))
  126. break; // wake up a single worker
  127. }
  128. #endif
  129. return 0;
  130. }
  131. static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
  132. {
  133. struct starpu_task *chosen_task;
  134. unsigned workerid = starpu_worker_get_id_check();
  135. int skipped = 0;
  136. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  137. struct _starpu_prio_deque *taskq = &data->taskq;
  138. /* Here helgrind would shout that this is unprotected, this is just an
  139. * integer access, and we hold the sched mutex, so we can not miss any
  140. * wake up. */
  141. if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(taskq))
  142. {
  143. return NULL;
  144. }
  145. #ifdef STARPU_NON_BLOCKING_DRIVERS
  146. if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
  147. /* Nobody woke us, avoid bothering the mutex */
  148. {
  149. return NULL;
  150. }
  151. #endif
  152. /* block until some event happens */
  153. _starpu_worker_relax_on();
  154. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  155. _starpu_worker_relax_off();
  156. chosen_task = _starpu_prio_deque_pop_task_for_worker(taskq, workerid, &skipped);
  157. if (!chosen_task && skipped)
  158. {
  159. /* Notify another worker to do that task */
  160. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  161. struct starpu_sched_ctx_iterator it;
  162. workers->init_iterator_for_parallel_tasks(workers, &it, chosen_task);
  163. while(workers->has_next(workers, &it))
  164. {
  165. unsigned worker = workers->get_next(workers, &it);
  166. if(worker != workerid)
  167. {
  168. #ifdef STARPU_NON_BLOCKING_DRIVERS
  169. starpu_bitmap_unset(data->waiters, worker);
  170. #else
  171. _starpu_wake_worker_relax_light(worker);
  172. #endif
  173. }
  174. }
  175. }
  176. if (!chosen_task)
  177. /* Tell pushers that we are waiting for tasks for us */
  178. starpu_bitmap_set(data->waiters, workerid);
  179. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  180. if(chosen_task)
  181. {
  182. _starpu_worker_relax_on();
  183. _starpu_sched_ctx_lock_write(sched_ctx_id);
  184. _starpu_worker_relax_off();
  185. starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(chosen_task, sched_ctx_id);
  186. unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
  187. if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
  188. {
  189. starpu_sched_ctx_move_task_to_ctx_locked(chosen_task, child_sched_ctx, 1);
  190. starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, chosen_task->flops);
  191. chosen_task = NULL;
  192. }
  193. _starpu_sched_ctx_unlock_write(sched_ctx_id);
  194. }
  195. return chosen_task;
  196. }
  /*
   * Called when nworkers workers, listed in workerids, are added to
   * scheduling context sched_ctx_id: wake each of them (except the calling
   * worker itself) and declare that it shares the task lists of the context.
   */
  static void eager_center_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  {
      unsigned idx = 0;

      while (idx < nworkers)
      {
          int added = workerids[idx++];

          /* No need to wake ourselves up */
          if (added != _starpu_worker_get_id())
          {
              starpu_wake_worker_locked(added);
          }

          starpu_sched_ctx_worker_shares_tasks_lists(added, sched_ctx_id);
      }
  }
  209. struct starpu_sched_policy _starpu_sched_prio_policy =
  210. {
  211. .add_workers = eager_center_priority_add_workers,
  212. .init_sched = initialize_eager_center_priority_policy,
  213. .deinit_sched = deinitialize_eager_center_priority_policy,
  214. /* we always use priorities in that policy */
  215. .push_task = _starpu_priority_push_task,
  216. .pop_task = _starpu_priority_pop_task,
  217. .pre_exec_hook = NULL,
  218. .post_exec_hook = NULL,
  219. .pop_every_task = NULL,
  220. .policy_name = "prio",
  221. .policy_description = "eager (with priorities)",
  222. .worker_type = STARPU_WORKER_LIST,
  223. };