parallel_eager.c 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2016 Université de Bordeaux
  4. * Copyright (C) 2011 Télécom-SudParis
  5. * Copyright (C) 2011-2013 INRIA
  6. * Copyright (C) 2016, 2017 CNRS
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <sched_policies/fifo_queues.h>
  20. #include <core/detect_combined_workers.h>
  21. #include <starpu_scheduler.h>
  22. #include <core/workers.h>
  23. struct _starpu_peager_data
  24. {
  25. struct _starpu_fifo_taskq *fifo;
  26. struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
  27. int master_id[STARPU_NMAXWORKERS];
  28. starpu_pthread_mutex_t policy_mutex;
  29. };
  30. #define STARPU_NMAXCOMBINED_WORKERS 520
  31. /* instead of STARPU_NMAXCOMBINED_WORKERS, we should use some "MAX combination .."*/
  32. static int possible_combinations_cnt[STARPU_NMAXWORKERS];
  33. static int possible_combinations[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
  34. static int possible_combinations_size[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
/* WARNING: this policy does not work with several scheduling contexts,
 * because the combined workers are built from the workers available to the
 * whole program, and not from the workers that belong to the context. */
  38. static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  39. {
  40. _starpu_sched_find_worker_combinations(workerids, nworkers);
  41. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  42. unsigned nbasic_workers = starpu_worker_get_count();
  43. unsigned ncombined_workers= starpu_combined_worker_get_count();
  44. unsigned workerid, i;
  45. /* Find the master of each worker. We first assign the worker as its
  46. * own master, and then iterate over the different worker combinations
  47. * to find the biggest combination containing this worker. */
  48. for(i = 0; i < nworkers; i++)
  49. {
  50. workerid = workerids[i];
  51. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  52. int cnt = possible_combinations_cnt[workerid]++;
  53. possible_combinations[workerid][cnt] = workerid;
  54. possible_combinations_size[workerid][cnt] = 1;
  55. data->master_id[workerid] = workerid;
  56. }
  57. for (i = 0; i < ncombined_workers; i++)
  58. {
  59. workerid = nbasic_workers + i;
  60. /* Note that we ASSUME that the workers are sorted by size ! */
  61. int *workers;
  62. int size;
  63. starpu_combined_worker_get_description(workerid, &size, &workers);
  64. int master = workers[0];
  65. int j;
  66. for (j = 0; j < size; j++)
  67. {
  68. if (data->master_id[workers[j]] > master)
  69. data->master_id[workers[j]] = master;
  70. int cnt = possible_combinations_cnt[workers[j]]++;
  71. possible_combinations[workers[j]][cnt] = workerid;
  72. possible_combinations_size[workers[j]][cnt] = size;
  73. }
  74. }
  75. for(i = 0; i < nworkers; i++)
  76. {
  77. workerid = workerids[i];
  78. /* slaves pick up tasks from their local queue, their master
  79. * will put tasks directly in that local list when a parallel
  80. * tasks comes. */
  81. data->local_fifo[workerid] = _starpu_create_fifo();
  82. }
  83. #if 0
  84. for(i = 0; i < nworkers; i++)
  85. {
  86. workerid = workerids[i];
  87. _STARPU_MSG("MASTER of %d = %d\n", workerid, master_id[workerid]);
  88. }
  89. #endif
  90. }
  91. static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  92. {
  93. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  94. unsigned i;
  95. for(i = 0; i < nworkers; i++)
  96. {
  97. int workerid = workerids[i];
  98. if(!starpu_worker_is_combined_worker(workerid))
  99. _starpu_destroy_fifo(data->local_fifo[workerid]);
  100. }
  101. }
  102. static void initialize_peager_policy(unsigned sched_ctx_id)
  103. {
  104. struct _starpu_peager_data *data;
  105. _STARPU_MALLOC(data, sizeof(struct _starpu_peager_data));
  106. /* masters pick tasks from that queue */
  107. data->fifo = _starpu_create_fifo();
  108. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  109. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  110. }
  111. static void deinitialize_peager_policy(unsigned sched_ctx_id)
  112. {
  113. /* TODO check that there is no task left in the queue */
  114. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  115. /* deallocate the job queue */
  116. _starpu_destroy_fifo(data->fifo);
  117. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  118. free(data);
  119. }
  120. static int push_task_peager_policy(struct starpu_task *task)
  121. {
  122. unsigned sched_ctx_id = task->sched_ctx;
  123. int ret_val;
  124. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  125. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  126. ret_val = _starpu_fifo_push_task(data->fifo, task);
  127. starpu_push_task_end(task);
  128. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  129. #ifndef STARPU_NON_BLOCKING_DRIVERS
  130. /* if there are no tasks block */
  131. /* wake people waiting for a task */
  132. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  133. struct starpu_sched_ctx_iterator it;
  134. workers->init_iterator(workers, &it);
  135. while(workers->has_next(workers, &it))
  136. {
  137. int worker = workers->get_next(workers, &it);
  138. int master = data->master_id[worker];
  139. /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
  140. if ((!starpu_worker_is_combined_worker(worker) &&
  141. starpu_worker_get_type(worker) != STARPU_MIC_WORKER &&
  142. starpu_worker_get_type(worker) != STARPU_CPU_WORKER)
  143. || (master == worker))
  144. starpu_wake_worker(worker);
  145. }
  146. #endif
  147. return ret_val;
  148. }
  149. static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
  150. {
  151. _starpu_worker_enter_section_safe_for_observation();
  152. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  153. int workerid = starpu_worker_get_id_check();
  154. /* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
  155. if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER && starpu_worker_get_type(workerid) != STARPU_MIC_WORKER)
  156. {
  157. struct starpu_task *task = NULL;
  158. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  159. _starpu_worker_leave_section_safe_for_observation();
  160. task = _starpu_fifo_pop_task(data->fifo, workerid);
  161. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  162. return task;
  163. }
  164. int master = data->master_id[workerid];
  165. //_STARPU_DEBUG("workerid:%d, master:%d\n",workerid,master);
  166. struct starpu_task *task = NULL;
  167. if (master == workerid)
  168. {
  169. /* The worker is a master */
  170. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  171. _starpu_worker_leave_section_safe_for_observation();
  172. task = _starpu_fifo_pop_task(data->fifo, workerid);
  173. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  174. if (!task)
  175. goto ret;
  176. /* Find the largest compatible worker combination */
  177. int best_size = -1;
  178. int best_workerid = -1;
  179. int i;
  180. for (i = 0; i < possible_combinations_cnt[master]; i++)
  181. {
  182. if (possible_combinations_size[workerid][i] > best_size)
  183. {
  184. int combined_worker = possible_combinations[workerid][i];
  185. if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
  186. {
  187. best_size = possible_combinations_size[workerid][i];
  188. best_workerid = combined_worker;
  189. }
  190. }
  191. }
  192. /* In case nobody can execute this task, we let the master
  193. * worker take it anyway, so that it can discard it afterward.
  194. * */
  195. if (best_workerid == -1)
  196. goto ret;
  197. /* Is this a basic worker or a combined worker ? */
  198. int nbasic_workers = (int)starpu_worker_get_count();
  199. int is_basic_worker = (best_workerid < nbasic_workers);
  200. if (is_basic_worker)
  201. {
  202. /* The master is alone */
  203. goto ret;
  204. }
  205. else
  206. {
  207. starpu_parallel_task_barrier_init(task, best_workerid);
  208. int worker_size = 0;
  209. int *combined_workerid;
  210. starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
  211. /* Dispatch task aliases to the different slaves */
  212. for (i = 1; i < worker_size; i++)
  213. {
  214. struct starpu_task *alias = starpu_task_dup(task);
  215. int local_worker = combined_workerid[i];
  216. alias->destroy = 1;
  217. starpu_pthread_mutex_t *sched_mutex;
  218. starpu_pthread_cond_t *sched_cond;
  219. starpu_worker_get_sched_condition(local_worker, &sched_mutex, &sched_cond);
  220. STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
  221. _starpu_fifo_push_task(data->local_fifo[local_worker], alias);
  222. #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
  223. starpu_wake_worker_locked(local_worker);
  224. #endif
  225. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
  226. }
  227. /* The master also manipulated an alias */
  228. struct starpu_task *master_alias = starpu_task_dup(task);
  229. master_alias->destroy = 1;
  230. task = master_alias;
  231. goto ret;
  232. }
  233. }
  234. else
  235. {
  236. /* The worker is a slave */
  237. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  238. _starpu_worker_leave_section_safe_for_observation();
  239. task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
  240. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  241. }
  242. ret:
  243. return task;
  244. }
  245. struct starpu_sched_policy _starpu_sched_peager_policy =
  246. {
  247. .init_sched = initialize_peager_policy,
  248. .deinit_sched = deinitialize_peager_policy,
  249. .add_workers = peager_add_workers,
  250. .remove_workers = peager_remove_workers,
  251. .push_task = push_task_peager_policy,
  252. .pop_task = pop_task_peager_policy,
  253. .pre_exec_hook = NULL,
  254. .post_exec_hook = NULL,
  255. .pop_every_task = NULL,
  256. .policy_name = "peager",
  257. .policy_description = "parallel eager policy",
  258. .worker_type = STARPU_WORKER_LIST,
  259. };