  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2013 Université de Bordeaux 1
  4. * Copyright (C) 2011 Télécom-SudParis
  5. * Copyright (C) 2011-2013 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
#include <assert.h>

#include <starpu_scheduler.h>
#include <core/detect_combined_workers.h>
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
/* Per-scheduling-context state of the parallel eager ("peager") policy. */
struct _starpu_peager_data
{
	/* Central queue that masters (and non-CPU workers) pop tasks from. */
	struct _starpu_fifo_taskq *fifo;
	/* Per-worker local queues: a master pushes task aliases here for the
	 * slaves of its combined worker. */
	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
	/* master_id[w] is the id of the master worker that worker w follows;
	 * a master is its own master. */
	int master_id[STARPU_NMAXWORKERS];
	/* Protects accesses to the central fifo. */
	starpu_pthread_mutex_t policy_mutex;
};
#define STARPU_NMAXCOMBINED_WORKERS 10
/* XXX instead of 10, we should use some "MAX combination .."*/
/* Number of worker combinations each basic worker takes part in. */
static int possible_combinations_cnt[STARPU_NMAXWORKERS];
/* possible_combinations[w][c]: id of the c-th (combined) worker containing w. */
static int possible_combinations[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
/* possible_combinations_size[w][c]: number of workers in that combination. */
static int possible_combinations_size[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
/*!!!!!!! It doesn't work with several contexts because the combined workers are constructed
from the workers available to the program, and not to the context !!!!!!!!!!!!!!!!!!!!!!!
*/
  37. static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  38. {
  39. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  40. unsigned nbasic_workers = starpu_worker_get_count();
  41. unsigned ncombined_workers = starpu_combined_worker_get_count();
  42. unsigned ntotal_workers = nbasic_workers + ncombined_workers;
  43. // _starpu_sched_find_worker_combinations(workerids, nworkers);
  44. unsigned workerid, i;
  45. unsigned ncombinedworkers;
  46. ncombinedworkers = starpu_combined_worker_get_count();
  47. /* Find the master of each worker. We first assign the worker as its
  48. * own master, and then iterate over the different worker combinations
  49. * to find the biggest combination containing this worker. */
  50. for(i = 0; i < nworkers; i++)
  51. {
  52. workerid = workerids[i];
  53. int cnt = possible_combinations_cnt[workerid]++;
  54. possible_combinations[workerid][cnt] = workerid;
  55. possible_combinations_size[workerid][cnt] = 1;
  56. data->master_id[workerid] = workerid;
  57. }
  58. for (i = 0; i < ncombinedworkers; i++)
  59. {
  60. workerid = ntotal_workers + i;
  61. /* Note that we ASSUME that the workers are sorted by size ! */
  62. int *workers;
  63. int size;
  64. starpu_combined_worker_get_description(workerid, &size, &workers);
  65. int master = workers[0];
  66. int j;
  67. for (j = 0; j < size; j++)
  68. {
  69. if (data->master_id[workers[j]] > master)
  70. data->master_id[workers[j]] = master;
  71. int cnt = possible_combinations_cnt[workers[j]]++;
  72. possible_combinations[workers[j]][cnt] = workerid;
  73. possible_combinations_size[workers[j]][cnt] = size;
  74. }
  75. }
  76. for(i = 0; i < nworkers; i++)
  77. {
  78. workerid = workerids[i];
  79. /* slaves pick up tasks from their local queue, their master
  80. * will put tasks directly in that local list when a parallel
  81. * tasks comes. */
  82. data->local_fifo[workerid] = _starpu_create_fifo();
  83. }
  84. #if 0
  85. for(i = 0; i < nworkers; i++)
  86. {
  87. workerid = workerids[i];
  88. fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
  89. }
  90. #endif
  91. }
  92. static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  93. {
  94. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  95. int workerid;
  96. unsigned i;
  97. for(i = 0; i < nworkers; i++)
  98. {
  99. workerid = workerids[i];
  100. if(!starpu_worker_is_combined_worker(workerid))
  101. _starpu_destroy_fifo(data->local_fifo[workerid]);
  102. }
  103. }
  104. static void initialize_peager_policy(unsigned sched_ctx_id)
  105. {
  106. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  107. struct _starpu_peager_data *data = (struct _starpu_peager_data*)malloc(sizeof(struct _starpu_peager_data));
  108. /* masters pick tasks from that queue */
  109. data->fifo = _starpu_create_fifo();
  110. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  111. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  112. }
  113. static void deinitialize_peager_policy(unsigned sched_ctx_id)
  114. {
  115. /* TODO check that there is no task left in the queue */
  116. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  117. /* deallocate the job queue */
  118. _starpu_destroy_fifo(data->fifo);
  119. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  120. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  121. free(data);
  122. }
  123. static int push_task_peager_policy(struct starpu_task *task)
  124. {
  125. unsigned sched_ctx_id = task->sched_ctx;
  126. int ret_val = -1;
  127. struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  128. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  129. ret_val = _starpu_fifo_push_task(data->fifo, task);
  130. starpu_push_task_end(task);
  131. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  132. /*if there are no tasks block */
  133. /* wake people waiting for a task */
  134. int worker = -1;
  135. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  136. struct starpu_sched_ctx_iterator it;
  137. if(workers->init_iterator)
  138. workers->init_iterator(workers, &it);
  139. while(workers->has_next(workers, &it))
  140. {
  141. worker = workers->get_next(workers, &it);
  142. int master = data->master_id[worker];
  143. /* If this is not a CPU, then the worker simply grabs tasks from the fifo */
  144. if (!starpu_worker_is_combined_worker(worker) &&
  145. starpu_worker_get_type(worker) != STARPU_CPU_WORKER || master == worker)
  146. {
  147. starpu_pthread_mutex_t *sched_mutex;
  148. starpu_pthread_cond_t *sched_cond;
  149. starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
  150. STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  151. STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  152. STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  153. }
  154. }
  155. return ret_val;
  156. }
/* Pop entry point, called by the worker thread itself.
 *
 * Non-CPU workers and master CPU workers pop from the central queue; a
 * master that obtains a task then looks for the largest combined worker
 * able to run it and dispatches task aliases to the slaves' local queues.
 * Slave CPU workers only ever pop from their own local queue.
 *
 * NOTE(review): the local-fifo pushes/pops are done under the per-worker
 * sched mutex/condition -- presumably the caller guarantees the needed
 * locking for the slave-side pop; confirm against the scheduler core. */
static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
{
	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	int workerid = starpu_worker_get_id();

	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
	{
		struct starpu_task *task = NULL;
		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
		task = _starpu_fifo_pop_task(data->fifo, workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
		return task;
	}

	int master = data->master_id[workerid];

	if (master == workerid)
	{
		/* The worker is a master */
		struct starpu_task *task = NULL;
		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
		task = _starpu_fifo_pop_task(data->fifo, workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);

		if (!task)
			return NULL;

		/* Find the largest compatible worker combination */
		int best_size = -1;
		int best_workerid = -1;
		int i;
		/* NOTE(review): the loop bound indexes with master while the
		 * body indexes with workerid; this is consistent here only
		 * because master == workerid inside this branch. */
		for (i = 0; i < possible_combinations_cnt[master]; i++)
		{
			if (possible_combinations_size[workerid][i] > best_size)
			{
				int combined_worker = possible_combinations[workerid][i];
				if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
				{
					best_size = possible_combinations_size[workerid][i];
					best_workerid = combined_worker;
				}
			}
		}

		/* In case nobody can execute this task, we let the master
		 * worker take it anyway, so that it can discard it afterward.
		 * */
		if (best_workerid == -1)
			return task;

		/* Is this a basic worker or a combined worker ? */
		int nbasic_workers = (int)starpu_worker_get_count();
		int is_basic_worker = (best_workerid < nbasic_workers);

		if (is_basic_worker)
		{
			/* The master is alone */
			return task;
		}
		else
		{
			/* Parallel execution: set up the barrier, then hand a
			 * duplicated task (alias) to every slave of the
			 * combination, waking each one through its own
			 * sched condition. */
			starpu_parallel_task_barrier_init(task, best_workerid);
			int worker_size = 0;
			int *combined_workerid;
			starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);

			/* Dispatch task aliases to the different slaves */
			for (i = 1; i < worker_size; i++)
			{
				struct starpu_task *alias = starpu_task_dup(task);
				int local_worker = combined_workerid[i];

				starpu_pthread_mutex_t *sched_mutex;
				starpu_pthread_cond_t *sched_cond;
				starpu_worker_get_sched_condition(local_worker, &sched_mutex, &sched_cond);
				STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
				_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
				STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
			}

			/* The master also manipulated an alias */
			struct starpu_task *master_alias = starpu_task_dup(task);
			return master_alias;
		}
	}
	else
	{
		/* The worker is a slave */
		return _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
	}
}
/* Exported descriptor wiring the peager callbacks into the StarPU
 * scheduler framework; unused hooks are left NULL. */
struct starpu_sched_policy _starpu_sched_peager_policy =
{
	.init_sched = initialize_peager_policy,
	.deinit_sched = deinitialize_peager_policy,
	.add_workers = peager_add_workers,
	.remove_workers = peager_remove_workers,
	.push_task = push_task_peager_policy,
	.pop_task = pop_task_peager_policy,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "peager",
	.policy_description = "parallel eager policy"
};