parallel_greedy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011  Université de Bordeaux 1
 * Copyright (C) 2011  Télécom-SudParis
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <common/barrier.h>

/* the former is the actual queue, the latter some container */
static struct starpu_fifo_taskq_s *fifo;
static struct starpu_fifo_taskq_s *local_fifo[STARPU_NMAXWORKERS];

static int master_id[STARPU_NMAXWORKERS];

static pthread_cond_t sched_cond;
static pthread_mutex_t sched_mutex;

static pthread_cond_t master_sched_cond[STARPU_NMAXWORKERS];
static pthread_mutex_t master_sched_mutex[STARPU_NMAXWORKERS];

/* XXX instead of 10, we should use some "MAX combination .." */
static int possible_combinations_cnt[STARPU_NMAXWORKERS];
static int possible_combinations[STARPU_NMAXWORKERS][10];
static int possible_combinations_size[STARPU_NMAXWORKERS][10];
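
/* The tables above record, for each basic worker w, the identifiers of the
 * combined workers that contain w (possible_combinations[w]), their sizes
 * (possible_combinations_size[w]) and the number of valid entries
 * (possible_combinations_cnt[w]).  They are filled at initialization time and
 * consulted by a master when it dispatches a parallel task. */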
static void initialize_pgreedy_policy(struct starpu_machine_topology_s *topology,
			__attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
{
	/* masters pick tasks from that queue */
	fifo = _starpu_create_fifo();

	_starpu_sched_find_worker_combinations(topology);

	unsigned workerid;
	unsigned ncombinedworkers, nworkers;

	nworkers = topology->nworkers;
	ncombinedworkers = starpu_combined_worker_get_count();

	/* Find the master of each worker. We first assign the worker as its
	 * own master, and then iterate over the different worker combinations
	 * to find the biggest combination containing this worker. */
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		int cnt = possible_combinations_cnt[workerid]++;
		possible_combinations[workerid][cnt] = workerid;
		possible_combinations_size[workerid][cnt] = 1;

		master_id[workerid] = workerid;
	}

	unsigned i;
	for (i = 0; i < ncombinedworkers; i++)
	{
		workerid = nworkers + i;

		/* Note that we ASSUME that the workers are sorted by size! */
		int *workers;
		int size;
		starpu_combined_worker_get_description(workerid, &size, &workers);

		int master = workers[0];

		int j;
		for (j = 0; j < size; j++)
		{
			if (master_id[workers[j]] > master)
				master_id[workers[j]] = master;

			int cnt = possible_combinations_cnt[workers[j]]++;
			possible_combinations[workers[j]][cnt] = workerid;
			possible_combinations_size[workers[j]][cnt] = size;
		}
	}
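
	/* Illustrative example (not in the original source): with four CPU
	 * workers and combined workers {0,1}, {2,3} and {0,1,2,3}, the loops
	 * above leave master_id[w] == 0 for every w, since worker 0 leads the
	 * largest combination, which contains all of them. */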
	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
	PTHREAD_COND_INIT(&sched_cond, NULL);

	for (workerid = 0; workerid < nworkers; workerid++)
	{
		PTHREAD_MUTEX_INIT(&master_sched_mutex[workerid], NULL);
		PTHREAD_COND_INIT(&master_sched_cond[workerid], NULL);
	}

	for (workerid = 0; workerid < nworkers; workerid++)
	{
		/* slaves pick up tasks from their local queue, their master
		 * will put tasks directly in that local list when a parallel
		 * task comes in. */
		local_fifo[workerid] = _starpu_create_fifo();

		unsigned master = master_id[workerid];

		/* All masters use the same condition/mutex */
		if (master == workerid)
		{
			starpu_worker_set_sched_condition(workerid,
				&sched_cond, &sched_mutex);
		}
		else
		{
			starpu_worker_set_sched_condition(workerid,
				&master_sched_cond[master],
				&master_sched_mutex[master]);
		}
	}

#if 0
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
	}
#endif
}
static void deinitialize_pgreedy_policy(__attribute__ ((unused)) struct starpu_machine_topology_s *topology,
			__attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
{
	/* TODO check that there is no task left in the queue */

	/* deallocate the job queue */
	_starpu_destroy_fifo(fifo);
}
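
/* All submitted tasks go into the single shared queue, protected by the global
 * scheduling mutex/condition; an idle worker waiting on that condition will
 * pick them up. */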
static int push_task_pgreedy_policy(struct starpu_task *task)
{
	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
}
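
/* Popping is asymmetric: non-CPU workers and CPU masters take tasks from the
 * shared queue, whereas CPU slaves only consume the task aliases pushed into
 * their local queue by their master.  When a master pops a task, it selects
 * the largest combined worker it belongs to that can execute the task, sets up
 * the job's barriers, pushes one alias per slave, and runs its own alias. */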
static struct starpu_task *pop_task_pgreedy_policy(void)
{
	int workerid = starpu_worker_get_id();

	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
		return _starpu_fifo_pop_task(fifo, workerid);

	int master = master_id[workerid];

	if (master == workerid)
	{
		/* The worker is a master */
		struct starpu_task *task = _starpu_fifo_pop_task(fifo, workerid);
		if (!task)
			return NULL;

		/* Find the largest compatible worker combination */
		int best_size = -1;
		int best_workerid = -1;
		int i;
		for (i = 0; i < possible_combinations_cnt[workerid]; i++)
		{
			if (possible_combinations_size[workerid][i] > best_size)
			{
				int combined_worker = possible_combinations[workerid][i];
				if (starpu_combined_worker_may_execute_task(combined_worker, task, 0))
				{
					best_size = possible_combinations_size[workerid][i];
					best_workerid = combined_worker;
				}
			}
		}

		/* In case nobody can execute this task, we let the master
		 * worker take it anyway, so that it can discard it afterward. */
		if (best_workerid == -1)
			return task;

		/* Is this a basic worker or a combined worker? */
		int nbasic_workers = (int)starpu_worker_get_count();
		int is_basic_worker = (best_workerid < nbasic_workers);

		if (is_basic_worker)
		{
			/* The master is alone */
			return task;
		}
		else
		{
			/* The master needs to dispatch the task between the
			 * different combined workers */
			struct starpu_combined_worker_s *combined_worker;
			combined_worker = _starpu_get_combined_worker_struct(best_workerid);
			int worker_size = combined_worker->worker_size;
			int *combined_workerid = combined_worker->combined_workerid;

			starpu_job_t j = _starpu_get_job_associated_to_task(task);
			j->task_size = worker_size;
			j->combined_workerid = best_workerid;
			j->active_task_alias_count = 0;

			//fprintf(stderr, "POP -> size %d best_size %d\n", worker_size, best_size);

			/* The members of the combination synchronize on these
			 * barriers before and after executing the parallel task */
			PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
			PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);

			/* Dispatch task aliases to the different slaves */
			for (i = 1; i < worker_size; i++)
			{
				struct starpu_task *alias = _starpu_create_task_alias(task);
				int local_worker = combined_workerid[i];

				_starpu_fifo_push_task(local_fifo[local_worker],
					&master_sched_mutex[master],
					&master_sched_cond[master], alias);
			}

			/* The master also manipulates an alias */
			struct starpu_task *master_alias = _starpu_create_task_alias(task);

			return master_alias;
		}
	}
	else
	{
		/* The worker is a slave */
		return _starpu_fifo_pop_task(local_fifo[workerid], workerid);
	}
}
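
/* Policy descriptor exported to the StarPU core; the policy is selected by its
 * name, "pgreedy". */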
struct starpu_sched_policy_s _starpu_sched_pgreedy_policy =
{
	.init_sched = initialize_pgreedy_policy,
	.deinit_sched = deinitialize_pgreedy_policy,
	.push_task = push_task_pgreedy_policy,
	.pop_task = pop_task_pgreedy_policy,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "pgreedy",
	.policy_description = "parallel greedy policy"
};
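
/* Usage sketch (illustrative, not part of the original file): an application
 * would typically select this policy by setting the STARPU_SCHED environment
 * variable to "pgreedy" before calling starpu_init(), or, assuming the
 * starpu_conf_init() helper and the sched_policy_name field of struct
 * starpu_conf, with something like:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "pgreedy";
 *	starpu_init(&conf);
 */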