graph_test_policy.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /*
  17. * This is just a test policy for using task graph information
  18. *
  19. * We keep tasks in the fifo queue, and store the graph of tasks, until we
  20. * get the do_schedule call from the application, which tells us all tasks
  21. * were queued, and we can now compute task depths or descendants and let a simple
  22. * central-queue greedy algorithm proceed.
  23. *
  24. * TODO: let workers starting running tasks before the whole graph is submitted?
  25. */
  26. #include <starpu_scheduler.h>
  27. #include <sched_policies/fifo_queues.h>
  28. #include <sched_policies/prio_deque.h>
  29. #include <common/graph.h>
  30. #include <common/thread.h>
  31. #include <starpu_bitmap.h>
  32. #include <core/task.h>
  33. #include <core/workers.h>
  34. struct _starpu_graph_test_policy_data
  35. {
  36. struct _starpu_fifo_taskq fifo; /* Bag of tasks which are ready before do_schedule is called */
  37. struct _starpu_prio_deque prio_cpu;
  38. struct _starpu_prio_deque prio_gpu;
  39. starpu_pthread_mutex_t policy_mutex;
  40. struct starpu_bitmap waiters;
  41. unsigned computed;
  42. unsigned descendants; /* Whether we use descendants, or depths, for priorities */
  43. };
  44. static void initialize_graph_test_policy(unsigned sched_ctx_id)
  45. {
  46. struct _starpu_graph_test_policy_data *data;
  47. _STARPU_MALLOC(data, sizeof(struct _starpu_graph_test_policy_data));
  48. /* there is only a single queue in that trivial design */
  49. _starpu_init_fifo(&data->fifo);
  50. _starpu_prio_deque_init(&data->prio_cpu);
  51. _starpu_prio_deque_init(&data->prio_gpu);
  52. starpu_bitmap_init(&data->waiters);
  53. data->computed = 0;
  54. data->descendants = starpu_get_env_number_default("STARPU_SCHED_GRAPH_TEST_DESCENDANTS", 0);
  55. _starpu_graph_record = 1;
  56. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  57. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  58. }
  59. static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
  60. {
  61. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  62. struct _starpu_fifo_taskq *fifo = &data->fifo;
  63. STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
  64. /* deallocate the job queue */
  65. _starpu_prio_deque_destroy(&data->prio_cpu);
  66. _starpu_prio_deque_destroy(&data->prio_gpu);
  67. _starpu_graph_record = 0;
  68. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  69. free(data);
  70. }
  71. /* Push the given task on CPU or GPU prio list, using a dumb heuristic */
  72. static struct _starpu_prio_deque *select_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
  73. {
  74. int cpu_can = 0, gpu_can = 0;
  75. double cpu_speed = 0.;
  76. double gpu_speed = 0.;
  77. /* Compute how fast CPUs can compute it, and how fast GPUs can compute it */
  78. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  79. struct starpu_sched_ctx_iterator it;
  80. workers->init_iterator(workers, &it);
  81. while(workers->has_next(workers, &it))
  82. {
  83. unsigned worker = workers->get_next(workers, &it);
  84. if (!starpu_worker_can_execute_task(worker, task, 0))
  85. /* This worker can not execute this task, don't count it */
  86. continue;
  87. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  88. /* At least one CPU can run it */
  89. cpu_can = 1;
  90. else
  91. /* At least one GPU can run it */
  92. gpu_can = 1;
  93. /* Get expected task duration for this worker */
  94. struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
  95. double length = starpu_task_expected_length(task, perf_arch, 0);
  96. double power;
  97. if (isnan(length))
  98. /* We don't have an estimation yet */
  99. length = 0.;
  100. if (length == 0.)
  101. {
  102. if (!task->cl || task->cl->model == NULL)
  103. {
  104. static unsigned _warned;
  105. if (STARPU_ATOMIC_ADD(&_warned, 1) == 1)
  106. {
  107. _STARPU_DISP("Warning: graph_test needs performance models for all tasks, including %s\n",
  108. starpu_task_get_name(task));
  109. }
  110. else
  111. {
  112. (void)STARPU_ATOMIC_ADD(&_warned, -1);
  113. }
  114. }
  115. power = 0.;
  116. }
  117. else
  118. power = 1./length;
  119. /* Add the computation power to the CPU or GPU pool */
  120. if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  121. cpu_speed += power;
  122. else
  123. gpu_speed += power;
  124. }
  125. /* Decide to push on CPUs or GPUs depending on the overall computation power */
  126. if (!gpu_can || (cpu_can && cpu_speed > gpu_speed))
  127. return &data->prio_cpu;
  128. else
  129. return &data->prio_gpu;
  130. }
  131. static void set_priority(void *_data, struct _starpu_graph_node *node)
  132. {
  133. struct _starpu_graph_test_policy_data *data = _data;
  134. starpu_worker_relax_on();
  135. STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
  136. starpu_worker_relax_off();
  137. struct _starpu_job *job = node->job;
  138. if (job)
  139. {
  140. if (data->descendants)
  141. job->task->priority = node->descendants;
  142. else
  143. job->task->priority = node->depth;
  144. }
  145. STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
  146. }
  147. static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
  148. {
  149. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  150. starpu_worker_relax_on();
  151. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  152. starpu_worker_relax_off();
  153. if (data->descendants)
  154. _starpu_graph_compute_descendants();
  155. else
  156. _starpu_graph_compute_depths();
  157. if (data->computed == 0)
  158. {
  159. data->computed = 1;
  160. /* FIXME: if data->computed already == 1, some tasks may already have been pushed to priority stage '0' in
  161. * push_task_graph_test_policy, then if we change the priority here, the stage lookup to remove the task
  162. * will get the wrong stage */
  163. _starpu_graph_foreach(set_priority, data);
  164. }
  165. /* Now that we have priorities, move tasks from bag to priority queue */
  166. while(!_starpu_fifo_empty(&data->fifo))
  167. {
  168. struct starpu_task *task = _starpu_fifo_pop_task(&data->fifo, -1);
  169. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  170. _starpu_prio_deque_push_back_task(prio, task);
  171. }
  172. /* And unleash the beast! */
  173. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  174. struct starpu_sched_ctx_iterator it;
  175. #ifdef STARPU_NON_BLOCKING_DRIVERS
  176. workers->init_iterator(workers, &it);
  177. while(workers->has_next(workers, &it))
  178. {
  179. /* Tell each worker is shouldn't sleep any more */
  180. unsigned worker = workers->get_next(workers, &it);
  181. starpu_bitmap_unset(&data->waiters, worker);
  182. }
  183. #endif
  184. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  185. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  186. workers->init_iterator(workers, &it);
  187. while(workers->has_next(workers, &it))
  188. {
  189. /* Wake each worker */
  190. unsigned worker = workers->get_next(workers, &it);
  191. starpu_wake_worker_relax_light(worker);
  192. }
  193. #endif
  194. }
  195. static int push_task_graph_test_policy(struct starpu_task *task)
  196. {
  197. unsigned sched_ctx_id = task->sched_ctx;
  198. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  199. starpu_worker_relax_on();
  200. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  201. starpu_worker_relax_off();
  202. if (!data->computed)
  203. {
  204. /* Priorities are not computed, leave the task in the bag for now */
  205. starpu_task_list_push_back(&data->fifo.taskq,task);
  206. data->fifo.ntasks++;
  207. data->fifo.nprocessed++;
  208. starpu_push_task_end(task);
  209. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  210. return 0;
  211. }
  212. /* Priorities are computed, we can push to execution */
  213. struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
  214. _starpu_prio_deque_push_back_task(prio, task);
  215. starpu_push_task_end(task);
  216. /*if there are no tasks block */
  217. /* wake people waiting for a task */
  218. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  219. struct starpu_sched_ctx_iterator it;
  220. #ifndef STARPU_NON_BLOCKING_DRIVERS
  221. char dowake[STARPU_NMAXWORKERS] = { 0 };
  222. #endif
  223. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  224. while(workers->has_next(workers, &it))
  225. {
  226. unsigned worker = workers->get_next(workers, &it);
  227. #ifdef STARPU_NON_BLOCKING_DRIVERS
  228. if (!starpu_bitmap_get(&data->waiters, worker))
  229. /* This worker is not waiting for a task */
  230. continue;
  231. #endif
  232. if (prio == &data->prio_cpu && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
  233. /* This worker doesn't pop from the queue we have filled */
  234. continue;
  235. if (prio == &data->prio_gpu && starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
  236. /* This worker doesn't pop from the queue we have filled */
  237. continue;
  238. if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
  239. {
  240. /* It can execute this one, tell him! */
  241. #ifdef STARPU_NON_BLOCKING_DRIVERS
  242. starpu_bitmap_unset(&data->waiters, worker);
  243. /* We really woke at least somebody, no need to wake somebody else */
  244. break;
  245. #else
  246. dowake[worker] = 1;
  247. #endif
  248. }
  249. }
  250. /* Let the task free */
  251. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  252. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  253. /* Now that we have a list of potential workers, try to wake one */
  254. workers->init_iterator_for_parallel_tasks(workers, &it, task);
  255. while(workers->has_next(workers, &it))
  256. {
  257. unsigned worker = workers->get_next(workers, &it);
  258. if (dowake[worker])
  259. {
  260. if (starpu_wake_worker_relax_light(worker))
  261. break; // wake up a single worker
  262. }
  263. }
  264. #endif
  265. return 0;
  266. }
  267. static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
  268. {
  269. struct starpu_task *chosen_task = NULL;
  270. unsigned workerid = starpu_worker_get_id_check();
  271. struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  272. struct _starpu_prio_deque *prio;
  273. if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
  274. prio = &data->prio_cpu;
  275. else
  276. prio = &data->prio_gpu;
  277. /* block until some event happens */
  278. /* Here helgrind would shout that this is unprotected, this is just an
  279. * integer access, and we hold the sched mutex, so we can not miss any
  280. * wake up. */
  281. if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
  282. return NULL;
  283. #ifdef STARPU_NON_BLOCKING_DRIVERS
  284. if (!STARPU_RUNNING_ON_VALGRIND && !data->computed)
  285. /* Not computed yet */
  286. return NULL;
  287. if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(&data->waiters, workerid))
  288. /* Nobody woke us, avoid bothering the mutex */
  289. return NULL;
  290. #endif
  291. starpu_worker_relax_on();
  292. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  293. starpu_worker_relax_off();
  294. if (!data->computed)
  295. {
  296. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  297. return NULL;
  298. }
  299. chosen_task = _starpu_prio_deque_pop_task_for_worker(prio, workerid, NULL);
  300. if (!chosen_task)
  301. /* Tell pushers that we are waiting for tasks for us */
  302. starpu_bitmap_set(&data->waiters, workerid);
  303. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  304. return chosen_task;
  305. }
  306. struct starpu_sched_policy _starpu_sched_graph_test_policy =
  307. {
  308. .init_sched = initialize_graph_test_policy,
  309. .deinit_sched = deinitialize_graph_test_policy,
  310. .do_schedule = do_schedule_graph_test_policy,
  311. .push_task = push_task_graph_test_policy,
  312. .pop_task = pop_task_graph_test_policy,
  313. .policy_name = "graph_test",
  314. .policy_description = "test policy for using graphs in scheduling decisions",
  315. .worker_type = STARPU_WORKER_LIST,
  316. };