eager_central_priority_policy.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2014 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
/*
 * This is a policy where every worker uses the same job queue, while taking
 * task priorities into account.
 */
#include <stdlib.h>

#include <starpu.h>
#include <starpu_scheduler.h>
#include <starpu_bitmap.h>
#include <common/fxt.h>
  26. #define DEFAULT_MIN_LEVEL (-5)
  27. #define DEFAULT_MAX_LEVEL (+5)
/* A set of task lists, one per priority level, together with per-level
 * and global task counters. */
struct _starpu_priority_taskq
{
	/* bounds of the priority range this queue was created for */
	int min_prio;
	int max_prio;

	/* the actual lists: an array of (max_prio - min_prio + 1) lists.
	 * The pop code scans them from index (max_prio - min_prio) down to
	 * index 0. */
	struct starpu_task_list *taskq;

	/* ntasks[p] counts the tasks currently stored in taskq[p] */
	unsigned *ntasks;

	/* number of tasks stored in the whole queue, all levels together */
	unsigned total_ntasks;
};
/* State attached by this policy to a scheduling context (stored and
 * retrieved through the context's policy data pointer). */
struct _starpu_eager_central_prio_data
{
	/* the single priority queue used by the push and pop methods */
	struct _starpu_priority_taskq *taskq;

	/* held by the push and pop methods while they modify taskq and
	 * waiters */
	starpu_pthread_mutex_t policy_mutex;

	/* indexed by worker id: a worker sets its bit when its pop found
	 * nothing to run; the bit is cleared again by the push / pop
	 * notification code */
	struct starpu_bitmap *waiters;
};
  44. /*
  45. * Centralized queue with priorities
  46. */
  47. static struct _starpu_priority_taskq *_starpu_create_priority_taskq(int min_prio, int max_prio)
  48. {
  49. struct _starpu_priority_taskq *central_queue;
  50. central_queue = (struct _starpu_priority_taskq *) malloc(sizeof(struct _starpu_priority_taskq));
  51. central_queue->min_prio = min_prio;
  52. central_queue->max_prio = max_prio;
  53. central_queue->total_ntasks = 0;
  54. central_queue->taskq = malloc((max_prio-min_prio+1) * sizeof(struct starpu_task_list));
  55. central_queue->ntasks = malloc((max_prio-min_prio+1) * sizeof(unsigned));
  56. int prio;
  57. for (prio = 0; prio < (max_prio-min_prio+1); prio++)
  58. {
  59. starpu_task_list_init(&central_queue->taskq[prio]);
  60. central_queue->ntasks[prio] = 0;
  61. }
  62. return central_queue;
  63. }
  64. static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priority_queue)
  65. {
  66. free(priority_queue->ntasks);
  67. free(priority_queue->taskq);
  68. free(priority_queue);
  69. }
  70. static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
  71. {
  72. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  73. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)malloc(sizeof(struct _starpu_eager_central_prio_data));
  74. /* In this policy, we support more than two levels of priority. */
  75. if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
  76. starpu_sched_ctx_set_min_priority(sched_ctx_id, DEFAULT_MIN_LEVEL);
  77. if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
  78. starpu_sched_ctx_set_max_priority(sched_ctx_id, DEFAULT_MAX_LEVEL);
  79. /* only a single queue (even though there are several internaly) */
  80. data->taskq = _starpu_create_priority_taskq(starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
  81. data->waiters = starpu_bitmap_create();
  82. /* Tell helgrind that it's fine to check for empty fifo in
  83. * _starpu_priority_pop_task without actual mutex (it's just an
  84. * integer) */
  85. STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
  86. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  87. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  88. }
  89. static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
  90. {
  91. /* TODO check that there is no task left in the queue */
  92. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  93. /* deallocate the task queue */
  94. _starpu_destroy_priority_taskq(data->taskq);
  95. starpu_bitmap_destroy(data->waiters);
  96. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  97. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  98. free(data);
  99. }
  100. static int _starpu_priority_push_task(struct starpu_task *task)
  101. {
  102. unsigned sched_ctx_id = task->sched_ctx;
  103. struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  104. struct _starpu_priority_taskq *taskq = data->taskq;
  105. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  106. unsigned priolevel = task->priority - STARPU_MIN_PRIO;
  107. starpu_task_list_push_back(&taskq->taskq[priolevel], task);
  108. taskq->ntasks[priolevel]++;
  109. taskq->total_ntasks++;
  110. starpu_push_task_end(task);
  111. /*if there are no tasks block */
  112. /* wake people waiting for a task */
  113. unsigned worker = 0;
  114. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  115. struct starpu_sched_ctx_iterator it;
  116. #ifndef STARPU_NON_BLOCKING_DRIVERS
  117. char dowake[STARPU_NMAXWORKERS] = { 0 };
  118. #endif
  119. if(workers->init_iterator)
  120. workers->init_iterator(workers, &it);
  121. while(workers->has_next(workers, &it))
  122. {
  123. worker = workers->get_next(workers, &it);
  124. #ifdef STARPU_NON_BLOCKING_DRIVERS
  125. if (!starpu_bitmap_get(data->waiters, worker))
  126. /* This worker is not waiting for a task */
  127. continue;
  128. #endif
  129. unsigned nimpl;
  130. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  131. if (starpu_worker_can_execute_task(worker, task, nimpl))
  132. {
  133. /* It can execute this one, tell him! */
  134. #ifdef STARPU_NON_BLOCKING_DRIVERS
  135. starpu_bitmap_unset(data->waiters, worker);
  136. /* We really woke at least somebody, no need to wake somebody else */
  137. break;
  138. #else
  139. dowake[worker] = 1;
  140. #endif
  141. }
  142. }
  143. /* Let the task free */
  144. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  145. #ifndef STARPU_NON_BLOCKING_DRIVERS
  146. /* Now that we have a list of potential workers, try to wake one */
  147. if(workers->init_iterator)
  148. workers->init_iterator(workers, &it);
  149. while(workers->has_next(workers, &it))
  150. {
  151. worker = workers->get_next(workers, &it);
  152. if (dowake[worker])
  153. {
  154. starpu_pthread_mutex_t *sched_mutex;
  155. starpu_pthread_cond_t *sched_cond;
  156. starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
  157. if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
  158. break; // wake up a single worker
  159. }
  160. }
  161. #endif
  162. return 0;
  163. }
/*
 * pop_task method: pick a task for the calling worker out of the queue of
 * the scheduling context sched_ctx_id.
 *
 * The levels are scanned from the highest index down to index 0, and
 * within a level the list is walked from its beginning; the first task
 * for which the calling worker has a usable implementation is removed
 * from the queue, gets that implementation selected, and is returned.
 *
 * Returns NULL when the queue is empty, when (non-blocking drivers only)
 * the worker is still flagged as a waiter, or when no stored task can be
 * executed by this worker.  In the last case the worker is flagged in the
 * waiters bitmap, and if at least one task was skipped the other workers
 * of the context are notified.
 *
 * Locking: past the two early returns, the scheduling mutex of the
 * calling worker is unlocked, the work is done under the policy mutex,
 * and the scheduling mutex is locked again before returning (the code
 * therefore expects the caller to hold it on entry).
 */
static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
{
	struct starpu_task *chosen_task = NULL, *task, *nexttask;
	unsigned workerid = starpu_worker_get_id();
	/* set as soon as one (task, implementation) pair inspected below
	 * cannot be executed by this worker */
	int skipped = 0;

	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	struct _starpu_priority_taskq *taskq = data->taskq;

	/* Quick exit when the queue is empty (nothing blocks here). */
	/* Here helgrind would shout that this is unprotected, this is just an
	 * integer access, and we hold the sched mutex, so we can not miss any
	 * wake up. */
	if (taskq->total_ntasks == 0)
		return NULL;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	if (starpu_bitmap_get(data->waiters, workerid))
		/* Nobody woke us, avoid bothering the mutex */
		return NULL;
#endif

	/* release this mutex before trying to wake up other workers */
	starpu_pthread_mutex_t *curr_sched_mutex;
	starpu_pthread_cond_t *curr_sched_cond;
	starpu_worker_get_sched_condition(workerid, &curr_sched_mutex, &curr_sched_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(curr_sched_mutex);

	/* all workers will block on this mutex anyway so
	   there's no need for their own mutex to be locked */
	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);

	/* start from the last level of the arrays */
	unsigned priolevel = taskq->max_prio - taskq->min_prio;
	do
	{
		if (taskq->ntasks[priolevel] > 0)
		{
			/* nexttask is fetched before the task may get erased
			 * from the list; the walk stops as soon as a task is
			 * chosen */
			for (task = starpu_task_list_begin(&taskq->taskq[priolevel]);
			     task != starpu_task_list_end(&taskq->taskq[priolevel]) && !chosen_task;
			     task = nexttask)
			{
				unsigned nimpl;
				nexttask = starpu_task_list_next(task);
				for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
				{
					if (starpu_worker_can_execute_task(workerid, task, nimpl))
					{
						/* there is some task that we can grab */
						starpu_task_set_implementation(task, nimpl);
						starpu_task_list_erase(&taskq->taskq[priolevel], task);
						chosen_task = task;
						taskq->ntasks[priolevel]--;
						taskq->total_ntasks--;
						_STARPU_TRACE_JOB_POP(task, 0);
						break;
					} else skipped = 1;
				}
			}
		}
	}
	/* priolevel is unsigned: it is tested before being decremented, so
	 * that the loop ends after level 0 has been inspected */
	while (!chosen_task && priolevel-- > 0);

	if (!chosen_task && skipped)
	{
		/* Notify another worker to do that task */
		unsigned worker = 0;
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

		struct starpu_sched_ctx_iterator it;
		if(workers->init_iterator)
			workers->init_iterator(workers, &it);
		while(workers->has_next(workers, &it))
		{
			worker = workers->get_next(workers, &it);
			/* every worker of the context except ourselves */
			if(worker != workerid)
			{
#ifdef STARPU_NON_BLOCKING_DRIVERS
				starpu_bitmap_unset(data->waiters, worker);
#else
				/* the condition is signalled without taking
				 * sched_mutex, which is only fetched here */
				starpu_pthread_mutex_t *sched_mutex;
				starpu_pthread_cond_t *sched_cond;
				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
#endif
			}
		}
	}

	if (!chosen_task)
		/* Tell pushers that we are waiting for tasks for us */
		starpu_bitmap_set(data->waiters, workerid);

	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);

	/* leave the mutex how it was found before this */
	STARPU_PTHREAD_MUTEX_LOCK(curr_sched_mutex);

	return chosen_task;
}
  251. static void eager_center_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  252. {
  253. int workerid;
  254. unsigned i;
  255. for (i = 0; i < nworkers; i++)
  256. {
  257. workerid = workerids[i];
  258. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  259. }
  260. }
  261. struct starpu_sched_policy _starpu_sched_prio_policy =
  262. {
  263. .add_workers = eager_center_priority_add_workers,
  264. .init_sched = initialize_eager_center_priority_policy,
  265. .deinit_sched = deinitialize_eager_center_priority_policy,
  266. /* we always use priorities in that policy */
  267. .push_task = _starpu_priority_push_task,
  268. .pop_task = _starpu_priority_pop_task,
  269. .pre_exec_hook = NULL,
  270. .post_exec_hook = NULL,
  271. .pop_every_task = NULL,
  272. .policy_name = "prio",
  273. .policy_description = "eager (with priorities)"
  274. };