eager_central_priority_policy.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2016 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2015, 2016 CNRS
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. /*
 * This is a policy where every worker uses the same job queue, but which takes
 * task priorities into account.
  21. */
  22. #include <starpu.h>
  23. #include <starpu_scheduler.h>
  24. #include <starpu_bitmap.h>
  25. #include <common/fxt.h>
  26. #include <core/workers.h>
  27. #define DEFAULT_MIN_LEVEL (-5)
  28. #define DEFAULT_MAX_LEVEL (+5)
  29. struct _starpu_priority_taskq
  30. {
  31. int min_prio;
  32. int max_prio;
  33. /* the actual lists
  34. * taskq[p] is for priority [p - STARPU_MIN_PRIO] */
  35. struct starpu_task_list *taskq;
  36. unsigned *ntasks;
  37. unsigned total_ntasks;
  38. };
/* Per-scheduling-context state of the "prio" policy, stored with
 * starpu_sched_ctx_set_policy_data(). */
struct _starpu_eager_central_prio_data
{
	/* the single central priority queue shared by the context's workers */
	struct _starpu_priority_taskq *taskq;
	/* taken by push and pop while they modify taskq and waiters (pop reads
	 * taskq->total_ntasks and waiters without it on its fast path) */
	starpu_pthread_mutex_t policy_mutex;
	/* workers whose last pop found no task for them; a push clears the bit
	 * of a worker it wants to notify */
	struct starpu_bitmap *waiters;
};
  45. /*
  46. * Centralized queue with priorities
  47. */
  48. static struct _starpu_priority_taskq *_starpu_create_priority_taskq(int min_prio, int max_prio)
  49. {
  50. struct _starpu_priority_taskq *central_queue;
  51. central_queue = (struct _starpu_priority_taskq *) malloc(sizeof(struct _starpu_priority_taskq));
  52. central_queue->min_prio = min_prio;
  53. central_queue->max_prio = max_prio;
  54. central_queue->total_ntasks = 0;
  55. central_queue->taskq = malloc((max_prio-min_prio+1) * sizeof(struct starpu_task_list));
  56. central_queue->ntasks = malloc((max_prio-min_prio+1) * sizeof(unsigned));
  57. int prio;
  58. for (prio = 0; prio < (max_prio-min_prio+1); prio++)
  59. {
  60. starpu_task_list_init(&central_queue->taskq[prio]);
  61. central_queue->ntasks[prio] = 0;
  62. }
  63. return central_queue;
  64. }
  65. static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priority_queue)
  66. {
  67. free(priority_queue->ntasks);
  68. free(priority_queue->taskq);
  69. free(priority_queue);
  70. }
  71. static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
  72. {
  73. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)malloc(sizeof(struct _starpu_eager_central_prio_data));
  74. /* In this policy, we support more than two levels of priority. */
  75. if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
  76. starpu_sched_ctx_set_min_priority(sched_ctx_id, DEFAULT_MIN_LEVEL);
  77. if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
  78. starpu_sched_ctx_set_max_priority(sched_ctx_id, DEFAULT_MAX_LEVEL);
  79. /* only a single queue (even though there are several internaly) */
  80. data->taskq = _starpu_create_priority_taskq(starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
  81. data->waiters = starpu_bitmap_create();
  82. /* Tell helgrind that it's fine to check for empty fifo in
  83. * _starpu_priority_pop_task without actual mutex (it's just an
  84. * integer) */
  85. STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
  86. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  87. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  88. }
  89. static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
  90. {
  91. /* TODO check that there is no task left in the queue */
  92. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  93. /* deallocate the job queue */
  94. _starpu_destroy_priority_taskq(data->taskq);
  95. starpu_bitmap_destroy(data->waiters);
  96. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  97. free(data);
  98. }
  99. static int _starpu_priority_push_task(struct starpu_task *task)
  100. {
  101. unsigned sched_ctx_id = task->sched_ctx;
  102. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  103. struct _starpu_priority_taskq *taskq = data->taskq;
  104. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  105. unsigned priolevel = task->priority - STARPU_MIN_PRIO;
  106. starpu_task_list_push_back(&taskq->taskq[priolevel], task);
  107. taskq->ntasks[priolevel]++;
  108. taskq->total_ntasks++;
  109. starpu_push_task_end(task);
  110. /*if there are no tasks block */
  111. /* wake people waiting for a task */
  112. unsigned worker = 0;
  113. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  114. struct starpu_sched_ctx_iterator it;
  115. #ifndef STARPU_NON_BLOCKING_DRIVERS
  116. char dowake[STARPU_NMAXWORKERS] = { 0 };
  117. #endif
  118. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  119. while(workers->has_next(workers, &it))
  120. {
  121. worker = workers->get_next(workers, &it);
  122. #ifdef STARPU_NON_BLOCKING_DRIVERS
  123. if (!starpu_bitmap_get(data->waiters, worker))
  124. /* This worker is not waiting for a task */
  125. continue;
  126. #endif
  127. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  128. {
  129. /* It can execute this one, tell him! */
  130. #ifdef STARPU_NON_BLOCKING_DRIVERS
  131. starpu_bitmap_unset(data->waiters, worker);
  132. /* We really woke at least somebody, no need to wake somebody else */
  133. break;
  134. #else
  135. dowake[worker] = 1;
  136. #endif
  137. }
  138. }
  139. /* Let the task free */
  140. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  141. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  142. /* Now that we have a list of potential workers, try to wake one */
  143. workers->init_iterator(workers, &it);
  144. while(workers->has_next(workers, &it))
  145. {
  146. worker = workers->get_next(workers, &it);
  147. if (dowake[worker])
  148. if (starpu_wake_worker(worker))
  149. break; // wake up a single worker
  150. }
  151. #endif
  152. starpu_sched_ctx_list_task_counters_increment_all(task, sched_ctx_id);
  153. return 0;
  154. }
  155. static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
  156. {
  157. struct starpu_task *chosen_task = NULL, *task, *nexttask;
  158. unsigned workerid = _starpu_worker_get_id_check();
  159. int skipped = 0;
  160. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  161. struct _starpu_priority_taskq *taskq = data->taskq;
  162. /* block until some event happens */
  163. /* Here helgrind would shout that this is unprotected, this is just an
  164. * integer access, and we hold the sched mutex, so we can not miss any
  165. * wake up. */
  166. if (!STARPU_RUNNING_ON_VALGRIND && taskq->total_ntasks == 0)
  167. return NULL;
  168. #ifdef STARPU_NON_BLOCKING_DRIVERS
  169. if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
  170. /* Nobody woke us, avoid bothering the mutex */
  171. return NULL;
  172. #endif
  173. /* release this mutex before trying to wake up other workers */
  174. starpu_pthread_mutex_t *curr_sched_mutex;
  175. starpu_pthread_cond_t *curr_sched_cond;
  176. starpu_worker_get_sched_condition(workerid, &curr_sched_mutex, &curr_sched_cond);
  177. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(curr_sched_mutex);
  178. /* all workers will block on this mutex anyway so
  179. there's no need for their own mutex to be locked */
  180. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  181. unsigned priolevel = taskq->max_prio - taskq->min_prio;
  182. do
  183. {
  184. if (taskq->ntasks[priolevel] > 0)
  185. {
  186. for (task = starpu_task_list_begin(&taskq->taskq[priolevel]);
  187. task != starpu_task_list_end(&taskq->taskq[priolevel]) && !chosen_task;
  188. task = nexttask)
  189. {
  190. unsigned nimpl;
  191. nexttask = starpu_task_list_next(task);
  192. if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
  193. {
  194. /* there is some task that we can grab */
  195. starpu_task_set_implementation(task, nimpl);
  196. starpu_task_list_erase(&taskq->taskq[priolevel], task);
  197. chosen_task = task;
  198. taskq->ntasks[priolevel]--;
  199. taskq->total_ntasks--;
  200. break;
  201. }
  202. else
  203. skipped = 1;
  204. }
  205. }
  206. }
  207. while (!chosen_task && priolevel-- > 0);
  208. if (!chosen_task && skipped)
  209. {
  210. /* Notify another worker to do that task */
  211. unsigned worker = 0;
  212. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  213. struct starpu_sched_ctx_iterator it;
  214. workers->init_iterator_for_parallel_tasks(workers, &it, chosen_task);
  215. while(workers->has_next(workers, &it))
  216. {
  217. worker = workers->get_next(workers, &it);
  218. if(worker != workerid)
  219. {
  220. #ifdef STARPU_NON_BLOCKING_DRIVERS
  221. starpu_bitmap_unset(data->waiters, worker);
  222. #else
  223. starpu_wake_worker_locked(worker);
  224. #endif
  225. }
  226. }
  227. }
  228. if (!chosen_task)
  229. /* Tell pushers that we are waiting for tasks for us */
  230. starpu_bitmap_set(data->waiters, workerid);
  231. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  232. /* leave the mutex how it was found before this */
  233. STARPU_PTHREAD_MUTEX_LOCK_SCHED(curr_sched_mutex);
  234. if(chosen_task)
  235. {
  236. starpu_sched_ctx_list_task_counters_decrement_all(chosen_task, sched_ctx_id);
  237. unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
  238. if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
  239. {
  240. starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 1);
  241. starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
  242. return NULL;
  243. }
  244. }
  245. return chosen_task;
  246. }
  247. static void eager_center_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  248. {
  249. int workerid;
  250. unsigned i;
  251. for (i = 0; i < nworkers; i++)
  252. {
  253. workerid = workerids[i];
  254. int curr_workerid = _starpu_worker_get_id();
  255. if(workerid != curr_workerid)
  256. starpu_wake_worker(workerid);
  257. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  258. }
  259. }
  260. struct starpu_sched_policy _starpu_sched_prio_policy =
  261. {
  262. .add_workers = eager_center_priority_add_workers,
  263. .init_sched = initialize_eager_center_priority_policy,
  264. .deinit_sched = deinitialize_eager_center_priority_policy,
  265. /* we always use priorities in that policy */
  266. .push_task = _starpu_priority_push_task,
  267. .pop_task = _starpu_priority_pop_task,
  268. .pre_exec_hook = NULL,
  269. .post_exec_hook = NULL,
  270. .pop_every_task = NULL,
  271. .policy_name = "prio",
  272. .policy_description = "eager (with priorities)",
  273. .worker_type = STARPU_WORKER_LIST,
  274. };