eager_central_priority_policy.c 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2014 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2015 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /*
  19. * This is policy where every worker use the same JOB QUEUE, but taking
  20. * task priorities into account
  21. */
  22. #include <starpu.h>
  23. #include <starpu_scheduler.h>
  24. #include <starpu_bitmap.h>
  25. #include <common/fxt.h>
  26. #define DEFAULT_MIN_LEVEL (-5)
  27. #define DEFAULT_MAX_LEVEL (+5)
  28. struct _starpu_priority_taskq
  29. {
  30. int min_prio;
  31. int max_prio;
  32. /* the actual lists
  33. * taskq[p] is for priority [p - STARPU_MIN_PRIO] */
  34. struct starpu_task_list *taskq;
  35. unsigned *ntasks;
  36. unsigned total_ntasks;
  37. };
  38. struct _starpu_eager_central_prio_data
  39. {
  40. struct _starpu_priority_taskq *taskq;
  41. starpu_pthread_mutex_t policy_mutex;
  42. struct starpu_bitmap *waiters;
  43. };
  44. /*
  45. * Centralized queue with priorities
  46. */
  47. static struct _starpu_priority_taskq *_starpu_create_priority_taskq(int min_prio, int max_prio)
  48. {
  49. struct _starpu_priority_taskq *central_queue;
  50. central_queue = (struct _starpu_priority_taskq *) malloc(sizeof(struct _starpu_priority_taskq));
  51. central_queue->min_prio = min_prio;
  52. central_queue->max_prio = max_prio;
  53. central_queue->total_ntasks = 0;
  54. central_queue->taskq = malloc((max_prio-min_prio+1) * sizeof(struct starpu_task_list));
  55. central_queue->ntasks = malloc((max_prio-min_prio+1) * sizeof(unsigned));
  56. int prio;
  57. for (prio = 0; prio < (max_prio-min_prio+1); prio++)
  58. {
  59. starpu_task_list_init(&central_queue->taskq[prio]);
  60. central_queue->ntasks[prio] = 0;
  61. }
  62. return central_queue;
  63. }
  64. static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priority_queue)
  65. {
  66. free(priority_queue->ntasks);
  67. free(priority_queue->taskq);
  68. free(priority_queue);
  69. }
  70. static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
  71. {
  72. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  73. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)malloc(sizeof(struct _starpu_eager_central_prio_data));
  74. /* In this policy, we support more than two levels of priority. */
  75. if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
  76. starpu_sched_ctx_set_min_priority(sched_ctx_id, DEFAULT_MIN_LEVEL);
  77. if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
  78. starpu_sched_ctx_set_max_priority(sched_ctx_id, DEFAULT_MAX_LEVEL);
  79. /* only a single queue (even though there are several internaly) */
  80. data->taskq = _starpu_create_priority_taskq(starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
  81. data->waiters = starpu_bitmap_create();
  82. /* Tell helgrind that it's fine to check for empty fifo in
  83. * _starpu_priority_pop_task without actual mutex (it's just an
  84. * integer) */
  85. STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
  86. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  87. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  88. }
  89. static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
  90. {
  91. /* TODO check that there is no task left in the queue */
  92. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  93. /* deallocate the task queue */
  94. _starpu_destroy_priority_taskq(data->taskq);
  95. starpu_bitmap_destroy(data->waiters);
  96. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  97. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  98. free(data);
  99. }
  100. static int _starpu_priority_push_task(struct starpu_task *task)
  101. {
  102. unsigned sched_ctx_id = task->sched_ctx;
  103. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  104. struct _starpu_priority_taskq *taskq = data->taskq;
  105. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  106. unsigned priolevel = task->priority - STARPU_MIN_PRIO;
  107. starpu_task_list_push_back(&taskq->taskq[priolevel], task);
  108. taskq->ntasks[priolevel]++;
  109. taskq->total_ntasks++;
  110. starpu_push_task_end(task);
  111. /*if there are no tasks block */
  112. /* wake people waiting for a task */
  113. unsigned worker = 0;
  114. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  115. struct starpu_sched_ctx_iterator it;
  116. #ifndef STARPU_NON_BLOCKING_DRIVERS
  117. char dowake[STARPU_NMAXWORKERS] = { 0 };
  118. #endif
  119. workers->init_iterator(workers, &it);
  120. while(workers->has_next(workers, &it))
  121. {
  122. worker = workers->get_next(workers, &it);
  123. #ifdef STARPU_NON_BLOCKING_DRIVERS
  124. if (!starpu_bitmap_get(data->waiters, worker))
  125. /* This worker is not waiting for a task */
  126. continue;
  127. #endif
  128. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  129. {
  130. /* It can execute this one, tell him! */
  131. #ifdef STARPU_NON_BLOCKING_DRIVERS
  132. starpu_bitmap_unset(data->waiters, worker);
  133. /* We really woke at least somebody, no need to wake somebody else */
  134. break;
  135. #else
  136. dowake[worker] = 1;
  137. #endif
  138. }
  139. }
  140. /* Let the task free */
  141. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  142. #ifndef STARPU_NON_BLOCKING_DRIVERS
  143. /* Now that we have a list of potential workers, try to wake one */
  144. workers->init_iterator(workers, &it);
  145. while(workers->has_next(workers, &it))
  146. {
  147. worker = workers->get_next(workers, &it);
  148. if (dowake[worker])
  149. if (starpu_wake_worker(worker))
  150. break; // wake up a single worker
  151. }
  152. #endif
  153. return 0;
  154. }
  155. static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
  156. {
  157. struct starpu_task *chosen_task = NULL, *task, *nexttask;
  158. unsigned workerid = starpu_worker_get_id();
  159. int skipped = 0;
  160. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  161. struct _starpu_priority_taskq *taskq = data->taskq;
  162. /* block until some event happens */
  163. /* Here helgrind would shout that this is unprotected, this is just an
  164. * integer access, and we hold the sched mutex, so we can not miss any
  165. * wake up. */
  166. if (taskq->total_ntasks == 0)
  167. return NULL;
  168. #ifdef STARPU_NON_BLOCKING_DRIVERS
  169. if (starpu_bitmap_get(data->waiters, workerid))
  170. /* Nobody woke us, avoid bothering the mutex */
  171. return NULL;
  172. #endif
  173. /* release this mutex before trying to wake up other workers */
  174. starpu_pthread_mutex_t *curr_sched_mutex;
  175. starpu_pthread_cond_t *curr_sched_cond;
  176. starpu_worker_get_sched_condition(workerid, &curr_sched_mutex, &curr_sched_cond);
  177. STARPU_PTHREAD_MUTEX_UNLOCK(curr_sched_mutex);
  178. /* all workers will block on this mutex anyway so
  179. there's no need for their own mutex to be locked */
  180. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  181. unsigned priolevel = taskq->max_prio - taskq->min_prio;
  182. do
  183. {
  184. if (taskq->ntasks[priolevel] > 0)
  185. {
  186. for (task = starpu_task_list_begin(&taskq->taskq[priolevel]);
  187. task != starpu_task_list_end(&taskq->taskq[priolevel]) && !chosen_task;
  188. task = nexttask)
  189. {
  190. unsigned nimpl;
  191. nexttask = starpu_task_list_next(task);
  192. if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
  193. {
  194. /* there is some task that we can grab */
  195. starpu_task_set_implementation(task, nimpl);
  196. starpu_task_list_erase(&taskq->taskq[priolevel], task);
  197. chosen_task = task;
  198. taskq->ntasks[priolevel]--;
  199. taskq->total_ntasks--;
  200. _STARPU_TRACE_JOB_POP(task, 0);
  201. break;
  202. }
  203. else
  204. skipped = 1;
  205. }
  206. }
  207. }
  208. while (!chosen_task && priolevel-- > 0);
  209. if (!chosen_task && skipped)
  210. {
  211. /* Notify another worker to do that task */
  212. unsigned worker = 0;
  213. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  214. struct starpu_sched_ctx_iterator it;
  215. workers->init_iterator(workers, &it);
  216. while(workers->has_next(workers, &it))
  217. {
  218. worker = workers->get_next(workers, &it);
  219. if(worker != workerid)
  220. {
  221. #ifdef STARPU_NON_BLOCKING_DRIVERS
  222. starpu_bitmap_unset(data->waiters, worker);
  223. #else
  224. starpu_pthread_mutex_t *sched_mutex;
  225. starpu_pthread_cond_t *sched_cond;
  226. starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
  227. STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  228. #endif
  229. }
  230. }
  231. }
  232. if (!chosen_task)
  233. /* Tell pushers that we are waiting for tasks for us */
  234. starpu_bitmap_set(data->waiters, workerid);
  235. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  236. /* leave the mutex how it was found before this */
  237. STARPU_PTHREAD_MUTEX_LOCK(curr_sched_mutex);
  238. if(chosen_task)
  239. {
  240. unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
  241. if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
  242. {
  243. starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx);
  244. starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
  245. return NULL;
  246. }
  247. }
  248. return chosen_task;
  249. }
  250. static void eager_center_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  251. {
  252. int workerid;
  253. unsigned i;
  254. for (i = 0; i < nworkers; i++)
  255. {
  256. workerid = workerids[i];
  257. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  258. }
  259. }
  260. struct starpu_sched_policy _starpu_sched_prio_policy =
  261. {
  262. .add_workers = eager_center_priority_add_workers,
  263. .init_sched = initialize_eager_center_priority_policy,
  264. .deinit_sched = deinitialize_eager_center_priority_policy,
  265. /* we always use priorities in that policy */
  266. .push_task = _starpu_priority_push_task,
  267. .pop_task = _starpu_priority_pop_task,
  268. .pre_exec_hook = NULL,
  269. .post_exec_hook = NULL,
  270. .pop_every_task = NULL,
  271. .policy_name = "prio",
  272. .policy_description = "eager (with priorities)"
  273. };