graph_test_policy.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /*
  17. * This is just a test policy for using task graph information
  18. *
  19. * We keep tasks in the fifo queue, and store the graph of tasks, until we
  20. * get the do_schedule call from the application, which tells us all tasks
  21. * were queued, and we can now compute task depths or descendants and let a simple
  22. * central-queue greedy algorithm proceed.
  23. *
  24. * TODO: let workers start running tasks before the whole graph is submitted?
  25. */
  26. #include <starpu_scheduler.h>
  27. #include <sched_policies/fifo_queues.h>
  28. #include <sched_policies/prio_deque.h>
  29. #include <common/graph.h>
  30. #include <common/thread.h>
  31. #include <common/starpu_bitmap.h>
  32. #include <core/task.h>
  33. #include <core/workers.h>
  34. struct _starpu_graph_test_policy_data
  35. {
  36. struct _starpu_fifo_taskq *fifo; /* Bag of tasks which are ready before do_schedule is called */
  37. struct _starpu_prio_deque prio_cpu;
  38. struct _starpu_prio_deque prio_gpu;
  39. starpu_pthread_mutex_t policy_mutex;
  40. struct starpu_bitmap *waiters;
  41. unsigned computed;
  42. unsigned descendants; /* Whether we use descendants, or depths, for priorities */
  43. };
  44. static void initialize_graph_test_policy(unsigned sched_ctx_id)
  45. {
  46. struct _starpu_graph_test_policy_data *data;
  47. _STARPU_MALLOC(data, sizeof(struct _starpu_graph_test_policy_data));
  48. /* there is only a single queue in that trivial design */
  49. data->fifo = _starpu_create_fifo();
  50. _starpu_prio_deque_init(&data->prio_cpu);
  51. _starpu_prio_deque_init(&data->prio_gpu);
  52. data->waiters = starpu_bitmap_create();
  53. data->computed = 0;
  54. data->descendants = starpu_get_env_number_default("STARPU_SCHED_GRAPH_TEST_DESCENDANTS", 0);
  55. _starpu_graph_record = 1;
  56. /* Tell helgrind that it's fine to check for empty fifo in
  57. * pop_task_graph_test_policy without actual mutex (it's just an integer)
  58. */
  59. STARPU_HG_DISABLE_CHECKING(data->fifo->ntasks);
  60. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  61. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  62. }
  63. static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
  64. {
  65. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  66. struct _starpu_fifo_taskq *fifo = data->fifo;
  67. STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
  68. /* deallocate the job queue */
  69. _starpu_destroy_fifo(fifo);
  70. _starpu_prio_deque_destroy(&data->prio_cpu);
  71. _starpu_prio_deque_destroy(&data->prio_gpu);
  72. starpu_bitmap_destroy(data->waiters);
  73. _starpu_graph_record = 0;
  74. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  75. free(data);
  76. }
  77. /* Push the given task on CPU or GPU prio list, using a dumb heuristic */
  78. static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
  79. {
  80. int cpu_can = 0, gpu_can = 0;
  81. double cpu_speed = 0.;
  82. double gpu_speed = 0.;
  83. /* Compute how fast CPUs can compute it, and how fast GPUs can compute it */
  84. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  85. struct starpu_sched_ctx_iterator it;
  86. workers->init_iterator(workers, &it);
  87. while(workers->has_next(workers, &it))
  88. {
  89. unsigned worker = workers->get_next(workers, &it);
  90. if (!starpu_worker_can_execute_task(worker, task, 0))
  91. /* This worker can not execute this task, don't count it */
  92. continue;
  93. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  94. /* At least one CPU can run it */
  95. cpu_can = 1;
  96. else
  97. /* At least one GPU can run it */
  98. gpu_can = 1;
  99. /* Get expected task duration for this worker */
  100. struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
  101. double length = starpu_task_expected_length(task, perf_arch, 0);
  102. double power;
  103. if (isnan(length))
  104. /* We don't have an estimation yet */
  105. length = 0.;
  106. if (length == 0.)
  107. {
  108. if (!task->cl || task->cl->model == NULL)
  109. {
  110. static unsigned _warned;
  111. if (STARPU_ATOMIC_ADD(&_warned, 1) == 1)
  112. {
  113. _STARPU_DISP("Warning: graph_test needs performance models for all tasks, including %s\n",
  114. starpu_task_get_name(task));
  115. }
  116. else
  117. {
  118. (void)STARPU_ATOMIC_ADD(&_warned, -1);
  119. }
  120. }
  121. power = 0.;
  122. }
  123. else
  124. power = 1./length;
  125. /* Add the computation power to the CPU or GPU pool */
  126. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  127. cpu_speed += power;
  128. else
  129. gpu_speed += power;
  130. }
  131. /* Decide to push on CPUs or GPUs depending on the overall computation power */
  132. if (!gpu_can || (cpu_can && cpu_speed > gpu_speed))
  133. return &data->prio_cpu;
  134. else
  135. return &data->prio_gpu;
  136. }
  137. static void set_priority(void *_data, struct _starpu_graph_node *node)
  138. {
  139. struct _starpu_graph_test_policy_data *data = _data;
  140. starpu_worker_relax_on();
  141. STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
  142. starpu_worker_relax_off();
  143. struct _starpu_job *job = node->job;
  144. if (job)
  145. {
  146. if (data->descendants)
  147. job->task->priority = node->descendants;
  148. else
  149. job->task->priority = node->depth;
  150. }
  151. STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
  152. }
  153. static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
  154. {
  155. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  156. starpu_worker_relax_on();
  157. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  158. starpu_worker_relax_off();
  159. if (data->descendants)
  160. _starpu_graph_compute_descendants();
  161. else
  162. _starpu_graph_compute_depths();
  163. if (data->computed == 0)
  164. {
  165. data->computed = 1;
  166. /* FIXME: if data->computed already == 1, some tasks may already have been pushed to priority stage '0' in
  167. * push_task_graph_test_policy, then if we change the priority here, the stage lookup to remove the task
  168. * will get the wrong stage */
  169. _starpu_graph_foreach(set_priority, data);
  170. }
  171. /* Now that we have priorities, move tasks from bag to priority queue */
  172. while(!_starpu_fifo_empty(data->fifo))
  173. {
  174. struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, -1);
  175. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  176. _starpu_prio_deque_push_back_task(prio, task);
  177. }
  178. /* And unleash the beast! */
  179. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  180. struct starpu_sched_ctx_iterator it;
  181. #ifdef STARPU_NON_BLOCKING_DRIVERS
  182. workers->init_iterator(workers, &it);
  183. while(workers->has_next(workers, &it))
  184. {
  185. /* Tell each worker is shouldn't sleep any more */
  186. unsigned worker = workers->get_next(workers, &it);
  187. starpu_bitmap_unset(data->waiters, worker);
  188. }
  189. #endif
  190. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  191. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  192. workers->init_iterator(workers, &it);
  193. while(workers->has_next(workers, &it))
  194. {
  195. /* Wake each worker */
  196. unsigned worker = workers->get_next(workers, &it);
  197. starpu_wake_worker_relax_light(worker);
  198. }
  199. #endif
  200. }
  201. static int push_task_graph_test_policy(struct starpu_task *task)
  202. {
  203. unsigned sched_ctx_id = task->sched_ctx;
  204. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  205. starpu_worker_relax_on();
  206. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  207. starpu_worker_relax_off();
  208. if (!data->computed)
  209. {
  210. /* Priorities are not computed, leave the task in the bag for now */
  211. starpu_task_list_push_back(&data->fifo->taskq,task);
  212. data->fifo->ntasks++;
  213. data->fifo->nprocessed++;
  214. starpu_push_task_end(task);
  215. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  216. return 0;
  217. }
  218. /* Priorities are computed, we can push to execution */
  219. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  220. _starpu_prio_deque_push_back_task(prio, task);
  221. starpu_push_task_end(task);
  222. /*if there are no tasks block */
  223. /* wake people waiting for a task */
  224. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  225. struct starpu_sched_ctx_iterator it;
  226. #ifndef STARPU_NON_BLOCKING_DRIVERS
  227. char dowake[STARPU_NMAXWORKERS] = { 0 };
  228. #endif
  229. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  230. while(workers->has_next(workers, &it))
  231. {
  232. unsigned worker = workers->get_next(workers, &it);
  233. #ifdef STARPU_NON_BLOCKING_DRIVERS
  234. if (!starpu_bitmap_get(data->waiters, worker))
  235. /* This worker is not waiting for a task */
  236. continue;
  237. #endif
  238. if (prio == &data->prio_cpu && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
  239. /* This worker doesn't pop from the queue we have filled */
  240. continue;
  241. if (prio == &data->prio_gpu && starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  242. /* This worker doesn't pop from the queue we have filled */
  243. continue;
  244. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  245. {
  246. /* It can execute this one, tell him! */
  247. #ifdef STARPU_NON_BLOCKING_DRIVERS
  248. starpu_bitmap_unset(data->waiters, worker);
  249. /* We really woke at least somebody, no need to wake somebody else */
  250. break;
  251. #else
  252. dowake[worker] = 1;
  253. #endif
  254. }
  255. }
  256. /* Let the task free */
  257. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  258. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  259. /* Now that we have a list of potential workers, try to wake one */
  260. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  261. while(workers->has_next(workers, &it))
  262. {
  263. unsigned worker = workers->get_next(workers, &it);
  264. if (dowake[worker])
  265. {
  266. if (starpu_wake_worker_relax_light(worker))
  267. break; // wake up a single worker
  268. }
  269. }
  270. #endif
  271. return 0;
  272. }
  273. static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
  274. {
  275. struct starpu_task *chosen_task = NULL;
  276. unsigned workerid = starpu_worker_get_id_check();
  277. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  278. struct _starpu_prio_deque *prio;
  279. if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
  280. prio = &data->prio_cpu;
  281. else
  282. prio = &data->prio_gpu;
  283. /* block until some event happens */
  284. /* Here helgrind would shout that this is unprotected, this is just an
  285. * integer access, and we hold the sched mutex, so we can not miss any
  286. * wake up. */
  287. if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
  288. return NULL;
  289. #ifdef STARPU_NON_BLOCKING_DRIVERS
  290. if (!STARPU_RUNNING_ON_VALGRIND && !data->computed)
  291. /* Not computed yet */
  292. return NULL;
  293. if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
  294. /* Nobody woke us, avoid bothering the mutex */
  295. return NULL;
  296. #endif
  297. starpu_worker_relax_on();
  298. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  299. starpu_worker_relax_off();
  300. if (!data->computed)
  301. {
  302. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  303. return NULL;
  304. }
  305. chosen_task = _starpu_prio_deque_pop_task_for_worker(prio, workerid, NULL);
  306. if (!chosen_task)
  307. /* Tell pushers that we are waiting for tasks for us */
  308. starpu_bitmap_set(data->waiters, workerid);
  309. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  310. return chosen_task;
  311. }
  312. struct starpu_sched_policy _starpu_sched_graph_test_policy =
  313. {
  314. .init_sched = initialize_graph_test_policy,
  315. .deinit_sched = deinitialize_graph_test_policy,
  316. .do_schedule = do_schedule_graph_test_policy,
  317. .push_task = push_task_graph_test_policy,
  318. .pop_task = pop_task_graph_test_policy,
  319. .policy_name = "graph_test",
  320. .policy_description = "test policy for using graphs in scheduling decisions",
  321. .worker_type = STARPU_WORKER_LIST,
  322. };