parallel_eager.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2013,2015,2017,2018 Inria
  4. * Copyright (C) 2011-2016 Université de Bordeaux
  5. * Copyright (C) 2011-2014,2016-2018 CNRS
  6. * Copyright (C) 2013 Thibaut Lambert
  7. * Copyright (C) 2011 Télécom-SudParis
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <sched_policies/fifo_queues.h>
  21. #include <core/detect_combined_workers.h>
  22. #include <starpu_scheduler.h>
  23. #include <core/workers.h>
  24. struct _starpu_peager_data
  25. {
  26. struct _starpu_fifo_taskq *fifo;
  27. struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
  28. starpu_pthread_mutex_t policy_mutex;
  29. int possible_combinations_cnt[STARPU_NMAXWORKERS];
  30. int *possible_combinations[STARPU_NMAXWORKERS];
  31. int *possible_combinations_size[STARPU_NMAXWORKERS];
  32. int max_combination_size[STARPU_NMAXWORKERS];
  33. };
  34. static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  35. {
  36. _starpu_sched_find_worker_combinations(workerids, nworkers);
  37. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  38. const unsigned nbasic_workers = starpu_worker_get_count();
  39. const unsigned ncombined_workers = starpu_combined_worker_get_count();
  40. unsigned i;
  41. for(i = 0; i < nworkers; i++)
  42. {
  43. unsigned workerid = workerids[i];
  44. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  45. data->possible_combinations_cnt[workerid] = 0;
  46. int cnt = data->possible_combinations_cnt[workerid]++;
  47. _STARPU_CALLOC(data->possible_combinations[workerid], ncombined_workers, sizeof(int));
  48. _STARPU_CALLOC(data->possible_combinations_size[workerid], ncombined_workers, sizeof(int));
  49. data->possible_combinations[workerid][cnt] = workerid;
  50. data->possible_combinations_size[workerid][cnt] = 1;
  51. data->max_combination_size[workerid] = 1;
  52. }
  53. for (i = 0; i < ncombined_workers; i++)
  54. {
  55. unsigned combined_workerid = nbasic_workers + i;
  56. int *workers;
  57. int size;
  58. starpu_combined_worker_get_description(combined_workerid, &size, &workers);
  59. int master = workers[0];
  60. if (size > data->max_combination_size[master])
  61. {
  62. data->max_combination_size[master] = size;
  63. }
  64. int cnt = data->possible_combinations_cnt[master]++;
  65. data->possible_combinations[master][cnt] = combined_workerid;
  66. data->possible_combinations_size[master][cnt] = size;
  67. }
  68. for(i = 0; i < nworkers; i++)
  69. {
  70. unsigned workerid = workerids[i];
  71. /* slaves pick up tasks from their local queue, their master
  72. * will put tasks directly in that local list when a parallel
  73. * tasks comes. */
  74. data->local_fifo[workerid] = _starpu_create_fifo();
  75. }
  76. }
  77. static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  78. {
  79. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  80. unsigned i;
  81. for(i = 0; i < nworkers; i++)
  82. {
  83. int workerid = workerids[i];
  84. if(!starpu_worker_is_combined_worker(workerid))
  85. {
  86. _starpu_destroy_fifo(data->local_fifo[workerid]);
  87. free(data->possible_combinations[workerid]);
  88. data->possible_combinations[workerid] = NULL;
  89. free(data->possible_combinations_size[workerid]);
  90. data->possible_combinations_size[workerid] = NULL;
  91. }
  92. }
  93. }
  94. static void initialize_peager_policy(unsigned sched_ctx_id)
  95. {
  96. struct _starpu_peager_data *data;
  97. _STARPU_MALLOC(data, sizeof(struct _starpu_peager_data));
  98. /* masters pick tasks from that queue */
  99. data->fifo = _starpu_create_fifo();
  100. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  101. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  102. }
  103. static void deinitialize_peager_policy(unsigned sched_ctx_id)
  104. {
  105. /* TODO check that there is no task left in the queue */
  106. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  107. /* deallocate the job queue */
  108. _starpu_destroy_fifo(data->fifo);
  109. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  110. free(data);
  111. }
  112. static int push_task_peager_policy(struct starpu_task *task)
  113. {
  114. unsigned sched_ctx_id = task->sched_ctx;
  115. int ret_val;
  116. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  117. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  118. ret_val = _starpu_fifo_push_task(data->fifo, task);
  119. #ifndef STARPU_NON_BLOCKING_DRIVERS
  120. int is_parallel_task = task->cl && task->cl->max_parallelism > 1;
  121. #endif
  122. starpu_push_task_end(task);
  123. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  124. #ifndef STARPU_NON_BLOCKING_DRIVERS
  125. /* if there are no tasks block */
  126. /* wake people waiting for a task */
  127. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  128. struct starpu_sched_ctx_iterator it;
  129. const unsigned ncombined_workers = starpu_combined_worker_get_count();
  130. workers->init_iterator(workers, &it);
  131. while(workers->has_next(workers, &it))
  132. {
  133. int worker = workers->get_next(workers, &it);
  134. /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
  135. if (starpu_worker_is_combined_worker(worker))
  136. {
  137. continue;
  138. }
  139. if (starpu_worker_get_type(worker) != STARPU_MIC_WORKER
  140. && starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
  141. {
  142. _starpu_wake_worker_relax_light(worker);
  143. continue;
  144. }
  145. if ((!is_parallel_task) /* This is not a parallel task, can wake any worker */
  146. || (ncombined_workers == 0) /* There is no combined worker */
  147. || (data->max_combination_size[worker] > 1) /* This is a combined worker master and the task is parallel */
  148. )
  149. {
  150. _starpu_wake_worker_relax_light(worker);
  151. }
  152. }
  153. #endif
  154. return ret_val;
  155. }
  156. static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
  157. {
  158. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  159. int workerid = starpu_worker_get_id_check();
  160. /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
  161. if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER && starpu_worker_get_type(workerid) != STARPU_MIC_WORKER)
  162. {
  163. struct starpu_task *task = NULL;
  164. _starpu_worker_relax_on();
  165. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  166. _starpu_worker_relax_off();
  167. task = _starpu_fifo_pop_task(data->fifo, workerid);
  168. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  169. return task;
  170. }
  171. const unsigned ncombined_workers = starpu_combined_worker_get_count();
  172. struct starpu_task *task = NULL;
  173. int slave_task = 0;
  174. _starpu_worker_relax_on();
  175. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  176. _starpu_worker_relax_off();
  177. /* check if a slave task is available in the local queue */
  178. task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
  179. if (!task)
  180. {
  181. /* no slave task, try to pop a task as master */
  182. task = _starpu_fifo_pop_task(data->fifo, workerid);
  183. if (task)
  184. {
  185. _STARPU_DEBUG("poping master task %p\n", task);
  186. }
  187. #if 1
  188. /* Optional heuristic to filter out purely slave workers for parallel tasks */
  189. if (task && task->cl && task->cl->max_parallelism > 1 && data->max_combination_size[workerid] == 1 && ncombined_workers > 0)
  190. {
  191. /* task is potentially parallel, leave it for a combined worker master */
  192. _starpu_fifo_push_back_task(data->fifo, task);
  193. task = NULL;
  194. }
  195. #endif
  196. }
  197. else
  198. {
  199. slave_task = 1;
  200. _STARPU_DEBUG("poping slave task %p\n", task);
  201. }
  202. if (!task || slave_task)
  203. {
  204. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  205. goto ret;
  206. }
  207. /* Find the largest compatible worker combination */
  208. int best_size = -1;
  209. int best_workerid = -1;
  210. int i;
  211. for (i = 0; i < data->possible_combinations_cnt[workerid]; i++)
  212. {
  213. if (data->possible_combinations_size[workerid][i] > best_size)
  214. {
  215. int combined_worker = data->possible_combinations[workerid][i];
  216. if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
  217. {
  218. best_size = data->possible_combinations_size[workerid][i];
  219. best_workerid = combined_worker;
  220. }
  221. }
  222. }
  223. _STARPU_DEBUG("task %p, best_workerid=%d, best_size=%d\n", task, best_workerid, best_size);
  224. /* In case nobody can execute this task, we let the master
  225. * worker take it anyway, so that it can discard it afterward.
  226. * */
  227. if (best_workerid == -1)
  228. {
  229. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  230. goto ret;
  231. }
  232. /* Is this a basic worker or a combined worker ? */
  233. if (best_workerid < starpu_worker_get_count())
  234. {
  235. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  236. /* The master is alone */
  237. goto ret;
  238. }
  239. starpu_parallel_task_barrier_init(task, best_workerid);
  240. int worker_size = 0;
  241. int *combined_workerid;
  242. starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
  243. _STARPU_DEBUG("dispatching task %p on combined worker %d of size %d\n", task, best_workerid, worker_size);
  244. /* Dispatch task aliases to the different slaves */
  245. for (i = 1; i < worker_size; i++)
  246. {
  247. struct starpu_task *alias = starpu_task_dup(task);
  248. int local_worker = combined_workerid[i];
  249. alias->destroy = 1;
  250. _starpu_fifo_push_task(data->local_fifo[local_worker], alias);
  251. }
  252. /* The master also manipulated an alias */
  253. struct starpu_task *master_alias = starpu_task_dup(task);
  254. master_alias->destroy = 1;
  255. task = master_alias;
  256. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  257. for (i = 1; i < worker_size; i++)
  258. {
  259. int local_worker = combined_workerid[i];
  260. _starpu_worker_lock(local_worker);
  261. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  262. starpu_wake_worker_locked(local_worker);
  263. #endif
  264. _starpu_worker_unlock(local_worker);
  265. }
  266. ret:
  267. return task;
  268. }
  269. struct starpu_sched_policy _starpu_sched_peager_policy =
  270. {
  271. .init_sched = initialize_peager_policy,
  272. .deinit_sched = deinitialize_peager_policy,
  273. .add_workers = peager_add_workers,
  274. .remove_workers = peager_remove_workers,
  275. .push_task = push_task_peager_policy,
  276. .pop_task = pop_task_peager_policy,
  277. .pre_exec_hook = NULL,
  278. .post_exec_hook = NULL,
  279. .pop_every_task = NULL,
  280. .policy_name = "peager",
  281. .policy_description = "parallel eager policy",
  282. .worker_type = STARPU_WORKER_LIST,
  283. };