graph_test_policy.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /*
  17. * This is just a test policy for using task graph information
  18. *
  19. * We keep tasks in the fifo queue, and store the graph of tasks, until we
  20. * get the do_schedule call from the application, which tells us all tasks
  21. * were queued, and we can now compute task depths or descendants and let a simple
  22. * central-queue greedy algorithm proceed.
  23. *
  24. * TODO: let workers start running tasks before the whole graph is submitted?
  25. */
  26. #include <starpu_scheduler.h>
  27. #include <sched_policies/fifo_queues.h>
  28. #include <sched_policies/prio_deque.h>
  29. #include <common/graph.h>
  30. #include <common/thread.h>
  31. #include <starpu_bitmap.h>
  32. #include <core/task.h>
  33. #include <core/workers.h>
  34. struct _starpu_graph_test_policy_data
  35. {
  36. struct _starpu_fifo_taskq fifo; /* Bag of tasks which are ready before do_schedule is called */
  37. struct _starpu_prio_deque prio_cpu;
  38. struct _starpu_prio_deque prio_gpu;
  39. starpu_pthread_mutex_t policy_mutex;
  40. struct starpu_bitmap waiters;
  41. unsigned computed;
  42. unsigned descendants; /* Whether we use descendants, or depths, for priorities */
  43. };
  44. static void initialize_graph_test_policy(unsigned sched_ctx_id)
  45. {
  46. struct _starpu_graph_test_policy_data *data;
  47. _STARPU_MALLOC(data, sizeof(struct _starpu_graph_test_policy_data));
  48. /* there is only a single queue in that trivial design */
  49. _starpu_init_fifo(&data->fifo);
  50. _starpu_prio_deque_init(&data->prio_cpu);
  51. _starpu_prio_deque_init(&data->prio_gpu);
  52. starpu_bitmap_init(&data->waiters);
  53. data->computed = 0;
  54. data->descendants = starpu_get_env_number_default("STARPU_SCHED_GRAPH_TEST_DESCENDANTS", 0);
  55. _starpu_graph_record = 1;
  56. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  57. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  58. }
  59. static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
  60. {
  61. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  62. struct _starpu_fifo_taskq *fifo = &data->fifo;
  63. STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
  64. /* deallocate the job queue */
  65. _starpu_prio_deque_destroy(&data->prio_cpu);
  66. _starpu_prio_deque_destroy(&data->prio_gpu);
  67. _starpu_graph_record = 0;
  68. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  69. free(data);
  70. }
  71. /* Push the given task on CPU or GPU prio list, using a dumb heuristic */
  72. static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
  73. {
  74. int cpu_can = 0, gpu_can = 0;
  75. double cpu_speed = 0.;
  76. double gpu_speed = 0.;
  77. /* Compute how fast CPUs can compute it, and how fast GPUs can compute it */
  78. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  79. struct starpu_sched_ctx_iterator it;
  80. workers->init_iterator(workers, &it);
  81. while(workers->has_next(workers, &it))
  82. {
  83. unsigned worker = workers->get_next(workers, &it);
  84. if (!starpu_worker_can_execute_task(worker, task, 0))
  85. /* This worker can not execute this task, don't count it */
  86. continue;
  87. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  88. /* At least one CPU can run it */
  89. cpu_can = 1;
  90. else
  91. /* At least one GPU can run it */
  92. gpu_can = 1;
  93. /* Get expected task duration for this worker */
  94. struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
  95. double length = starpu_task_expected_length(task, perf_arch, 0);
  96. double power;
  97. if (isnan(length))
  98. /* We don't have an estimation yet */
  99. length = 0.;
  100. if (length == 0.)
  101. {
  102. if (!task->cl || task->cl->model == NULL)
  103. {
  104. static unsigned _warned;
  105. STARPU_HG_DISABLE_CHECKING(_warned);
  106. if (STARPU_ATOMIC_ADD(&_warned, 1) == 1)
  107. {
  108. _STARPU_DISP("Warning: graph_test needs performance models for all tasks, including %s\n",
  109. starpu_task_get_name(task));
  110. }
  111. else
  112. {
  113. (void)STARPU_ATOMIC_ADD(&_warned, -1);
  114. }
  115. }
  116. power = 0.;
  117. }
  118. else
  119. power = 1./length;
  120. /* Add the computation power to the CPU or GPU pool */
  121. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  122. cpu_speed += power;
  123. else
  124. gpu_speed += power;
  125. }
  126. /* Decide to push on CPUs or GPUs depending on the overall computation power */
  127. if (!gpu_can || (cpu_can && cpu_speed > gpu_speed))
  128. return &data->prio_cpu;
  129. else
  130. return &data->prio_gpu;
  131. }
  132. static void set_priority(void *_data, struct _starpu_graph_node *node)
  133. {
  134. struct _starpu_graph_test_policy_data *data = _data;
  135. starpu_worker_relax_on();
  136. STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
  137. starpu_worker_relax_off();
  138. struct _starpu_job *job = node->job;
  139. if (job)
  140. {
  141. if (data->descendants)
  142. job->task->priority = node->descendants;
  143. else
  144. job->task->priority = node->depth;
  145. }
  146. STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
  147. }
  148. static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
  149. {
  150. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  151. starpu_worker_relax_on();
  152. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  153. starpu_worker_relax_off();
  154. if (data->descendants)
  155. _starpu_graph_compute_descendants();
  156. else
  157. _starpu_graph_compute_depths();
  158. if (data->computed == 0)
  159. {
  160. data->computed = 1;
  161. /* FIXME: if data->computed already == 1, some tasks may already have been pushed to priority stage '0' in
  162. * push_task_graph_test_policy, then if we change the priority here, the stage lookup to remove the task
  163. * will get the wrong stage */
  164. _starpu_graph_foreach(set_priority, data);
  165. }
  166. /* Now that we have priorities, move tasks from bag to priority queue */
  167. while(!_starpu_fifo_empty(&data->fifo))
  168. {
  169. struct starpu_task *task = _starpu_fifo_pop_task(&data->fifo, -1);
  170. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  171. _starpu_prio_deque_push_back_task(prio, task);
  172. }
  173. /* And unleash the beast! */
  174. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  175. struct starpu_sched_ctx_iterator it;
  176. #ifdef STARPU_NON_BLOCKING_DRIVERS
  177. workers->init_iterator(workers, &it);
  178. while(workers->has_next(workers, &it))
  179. {
  180. /* Tell each worker is shouldn't sleep any more */
  181. unsigned worker = workers->get_next(workers, &it);
  182. starpu_bitmap_unset(&data->waiters, worker);
  183. }
  184. #endif
  185. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  186. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  187. workers->init_iterator(workers, &it);
  188. while(workers->has_next(workers, &it))
  189. {
  190. /* Wake each worker */
  191. unsigned worker = workers->get_next(workers, &it);
  192. starpu_wake_worker_relax_light(worker);
  193. }
  194. #endif
  195. }
  196. static int push_task_graph_test_policy(struct starpu_task *task)
  197. {
  198. unsigned sched_ctx_id = task->sched_ctx;
  199. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  200. starpu_worker_relax_on();
  201. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  202. starpu_worker_relax_off();
  203. if (!data->computed)
  204. {
  205. /* Priorities are not computed, leave the task in the bag for now */
  206. starpu_task_list_push_back(&data->fifo.taskq,task);
  207. data->fifo.ntasks++;
  208. data->fifo.nprocessed++;
  209. starpu_push_task_end(task);
  210. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  211. return 0;
  212. }
  213. /* Priorities are computed, we can push to execution */
  214. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  215. _starpu_prio_deque_push_back_task(prio, task);
  216. starpu_push_task_end(task);
  217. /*if there are no tasks block */
  218. /* wake people waiting for a task */
  219. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  220. struct starpu_sched_ctx_iterator it;
  221. #ifndef STARPU_NON_BLOCKING_DRIVERS
  222. char dowake[STARPU_NMAXWORKERS] = { 0 };
  223. #endif
  224. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  225. while(workers->has_next(workers, &it))
  226. {
  227. unsigned worker = workers->get_next(workers, &it);
  228. #ifdef STARPU_NON_BLOCKING_DRIVERS
  229. if (!starpu_bitmap_get(&data->waiters, worker))
  230. /* This worker is not waiting for a task */
  231. continue;
  232. #endif
  233. if (prio == &data->prio_cpu && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
  234. /* This worker doesn't pop from the queue we have filled */
  235. continue;
  236. if (prio == &data->prio_gpu && starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  237. /* This worker doesn't pop from the queue we have filled */
  238. continue;
  239. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  240. {
  241. /* It can execute this one, tell him! */
  242. #ifdef STARPU_NON_BLOCKING_DRIVERS
  243. starpu_bitmap_unset(&data->waiters, worker);
  244. /* We really woke at least somebody, no need to wake somebody else */
  245. break;
  246. #else
  247. dowake[worker] = 1;
  248. #endif
  249. }
  250. }
  251. /* Let the task free */
  252. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  253. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  254. /* Now that we have a list of potential workers, try to wake one */
  255. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  256. while(workers->has_next(workers, &it))
  257. {
  258. unsigned worker = workers->get_next(workers, &it);
  259. if (dowake[worker])
  260. {
  261. if (starpu_wake_worker_relax_light(worker))
  262. break; // wake up a single worker
  263. }
  264. }
  265. #endif
  266. return 0;
  267. }
  268. static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
  269. {
  270. struct starpu_task *chosen_task = NULL;
  271. unsigned workerid = starpu_worker_get_id_check();
  272. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  273. struct _starpu_prio_deque *prio;
  274. if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
  275. prio = &data->prio_cpu;
  276. else
  277. prio = &data->prio_gpu;
  278. /* block until some event happens */
  279. /* Here helgrind would shout that this is unprotected, this is just an
  280. * integer access, and we hold the sched mutex, so we can not miss any
  281. * wake up. */
  282. if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
  283. return NULL;
  284. #ifdef STARPU_NON_BLOCKING_DRIVERS
  285. if (!STARPU_RUNNING_ON_VALGRIND && !data->computed)
  286. /* Not computed yet */
  287. return NULL;
  288. if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(&data->waiters, workerid))
  289. /* Nobody woke us, avoid bothering the mutex */
  290. return NULL;
  291. #endif
  292. starpu_worker_relax_on();
  293. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  294. starpu_worker_relax_off();
  295. if (!data->computed)
  296. {
  297. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  298. return NULL;
  299. }
  300. chosen_task = _starpu_prio_deque_pop_task_for_worker(prio, workerid, NULL);
  301. if (!chosen_task)
  302. /* Tell pushers that we are waiting for tasks for us */
  303. starpu_bitmap_set(&data->waiters, workerid);
  304. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  305. return chosen_task;
  306. }
  307. struct starpu_sched_policy _starpu_sched_graph_test_policy =
  308. {
  309. .init_sched = initialize_graph_test_policy,
  310. .deinit_sched = deinitialize_graph_test_policy,
  311. .do_schedule = do_schedule_graph_test_policy,
  312. .push_task = push_task_graph_test_policy,
  313. .pop_task = pop_task_graph_test_policy,
  314. .policy_name = "graph_test",
  315. .policy_description = "test policy for using graphs in scheduling decisions",
  316. .worker_type = STARPU_WORKER_LIST,
  317. };