parallel_greedy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011  Université de Bordeaux 1
 * Copyright (C) 2011  Télécom-SudParis
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <common/barrier.h>
#include <sched_policies/detect_combined_workers.h>

/* the former is the actual queue, the latter some container */
static struct _starpu_fifo_taskq *fifo;
static struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];

static int master_id[STARPU_NMAXWORKERS];

static pthread_cond_t sched_cond;
static pthread_mutex_t sched_mutex;

static pthread_cond_t master_sched_cond[STARPU_NMAXWORKERS];
static pthread_mutex_t master_sched_mutex[STARPU_NMAXWORKERS];

/* XXX instead of 10, we should use some "MAX combination .." */
static int possible_combinations_cnt[STARPU_NMAXWORKERS];
static int possible_combinations[STARPU_NMAXWORKERS][10];
static int possible_combinations_size[STARPU_NMAXWORKERS][10];
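
/* Overall scheme of this "parallel greedy" policy, as implemented below:
 * every CPU worker is attached to a master, which is either its own id or
 * the smallest leading worker (workers[0]) of any combination containing
 * it, whichever is lowest.  Masters pop tasks from the shared "fifo" queue;
 * when a task can run on a combined worker, the master pushes task aliases
 * into the "local_fifo" queues of its slaves, and slaves only ever pop from
 * their own local queue. */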
static void initialize_pgreedy_policy(struct starpu_machine_topology *topology,
			__attribute__ ((unused)) struct starpu_sched_policy *_policy)
{
	/* masters pick tasks from that queue */
	fifo = _starpu_create_fifo();

	_starpu_sched_find_worker_combinations(topology);

	unsigned workerid;
	unsigned ncombinedworkers, nworkers;

	nworkers = topology->nworkers;
	ncombinedworkers = starpu_combined_worker_get_count();

	/* Find the master of each worker. We first assign the worker as its
	 * own master, and then iterate over the different worker combinations
	 * to find the biggest combination containing this worker. */
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		int cnt = possible_combinations_cnt[workerid]++;
		possible_combinations[workerid][cnt] = workerid;
		possible_combinations_size[workerid][cnt] = 1;

		master_id[workerid] = workerid;
	}

	unsigned i;
	for (i = 0; i < ncombinedworkers; i++)
	{
		workerid = nworkers + i;

		/* Note that we ASSUME that the workers are sorted by size ! */
		int *workers;
		int size;
		starpu_combined_worker_get_description(workerid, &size, &workers);

		int master = workers[0];

		int j;
		for (j = 0; j < size; j++)
		{
			if (master_id[workers[j]] > master)
				master_id[workers[j]] = master;

			int cnt = possible_combinations_cnt[workers[j]]++;
			possible_combinations[workers[j]][cnt] = workerid;
			possible_combinations_size[workers[j]][cnt] = size;
		}
	}
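
	/* Example (hypothetical topology): with CPU workers 0-3 and combined
	 * workers {0,1}, {2,3} and {0,1,2,3}, the loops above leave every
	 * worker with master 0 (the smallest workers[0] seen), and e.g.
	 * possible_combinations[2] holds worker 2 itself plus the combined
	 * worker ids of {2,3} and {0,1,2,3}. */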
	_STARPU_PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
	_STARPU_PTHREAD_COND_INIT(&sched_cond, NULL);

	for (workerid = 0; workerid < nworkers; workerid++)
	{
		_STARPU_PTHREAD_MUTEX_INIT(&master_sched_mutex[workerid], NULL);
		_STARPU_PTHREAD_COND_INIT(&master_sched_cond[workerid], NULL);
	}
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		/* slaves pick up tasks from their local queue, their master
		 * will put tasks directly in that local list when a parallel
		 * task comes. */
		local_fifo[workerid] = _starpu_create_fifo();

		unsigned master = master_id[workerid];

		/* All masters use the same condition/mutex */
		if (master == workerid)
		{
			starpu_worker_set_sched_condition(workerid,
				&sched_cond, &sched_mutex);
		}
		else
		{
			starpu_worker_set_sched_condition(workerid,
				&master_sched_cond[master],
				&master_sched_mutex[master]);
		}
	}

#if 0
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
	}
#endif
}
static void deinitialize_pgreedy_policy(__attribute__ ((unused)) struct starpu_machine_topology *topology,
			__attribute__ ((unused)) struct starpu_sched_policy *_policy)
{
	/* TODO check that there is no task left in the queue */

	/* deallocate the job queue */
	_starpu_destroy_fifo(fifo);
}
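
/* Every submitted task goes into the single shared queue; CPU slaves never
 * pop from it directly, they are fed task aliases through their local queues
 * by their master (see pop_task_pgreedy_policy() below). */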
static int push_task_pgreedy_policy(struct starpu_task *task)
{
	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
}
static struct starpu_task *pop_task_pgreedy_policy(void)
{
	int workerid = starpu_worker_get_id();

	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
		return _starpu_fifo_pop_task(fifo, workerid);

	int master = master_id[workerid];

	if (master == workerid)
	{
		/* The worker is a master */
		struct starpu_task *task = _starpu_fifo_pop_task(fifo, workerid);

		if (!task)
			return NULL;

		/* Find the largest compatible worker combination */
		int best_size = -1;
		int best_workerid = -1;
		int i;
		for (i = 0; i < possible_combinations_cnt[master]; i++)
		{
			if (possible_combinations_size[workerid][i] > best_size)
			{
				int combined_worker = possible_combinations[workerid][i];
				if (starpu_combined_worker_can_execute_task(combined_worker, task, 0))
				{
					best_size = possible_combinations_size[workerid][i];
					best_workerid = combined_worker;
				}
			}
		}

		/* In case nobody can execute this task, we let the master
		 * worker take it anyway, so that it can discard it afterward. */
		if (best_workerid == -1)
			return task;

		/* Is this a basic worker or a combined worker ? */
		int nbasic_workers = (int)starpu_worker_get_count();
		int is_basic_worker = (best_workerid < nbasic_workers);

		if (is_basic_worker)
		{
			/* The master is alone */
			return task;
		}
		else
		{
			/* The master needs to dispatch the task between the
			 * different combined workers */
			struct _starpu_combined_worker *combined_worker;
			combined_worker = _starpu_get_combined_worker_struct(best_workerid);
			int worker_size = combined_worker->worker_size;
			int *combined_workerid = combined_worker->combined_workerid;

			starpu_job_t j = _starpu_get_job_associated_to_task(task);
			j->task_size = worker_size;
			j->combined_workerid = best_workerid;
			j->active_task_alias_count = 0;

			//fprintf(stderr, "POP -> size %d best_size %d\n", worker_size, best_size);

			_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
			_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);

			/* Dispatch task aliases to the different slaves */
			for (i = 1; i < worker_size; i++)
			{
				struct starpu_task *alias = _starpu_create_task_alias(task);
				int local_worker = combined_workerid[i];

				_starpu_fifo_push_task(local_fifo[local_worker],
					&master_sched_mutex[master],
					&master_sched_cond[master], alias);
			}
			/* The master also manipulates an alias */
			struct starpu_task *master_alias = _starpu_create_task_alias(task);

			return master_alias;
		}
	}
	else
	{
		/* The worker is a slave */
		return _starpu_fifo_pop_task(local_fifo[workerid], workerid);
	}
}
struct starpu_sched_policy _starpu_sched_pgreedy_policy = {
	.init_sched = initialize_pgreedy_policy,
	.deinit_sched = deinitialize_pgreedy_policy,
	.push_task = push_task_pgreedy_policy,
	.pop_task = pop_task_pgreedy_policy,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "pgreedy",
	.policy_description = "parallel greedy policy"
};
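
/*
 * Usage sketch (not part of the policy itself): an application would select
 * this policy by name before initializing StarPU, either through the
 * STARPU_SCHED environment variable (STARPU_SCHED=pgreedy ./app) or
 * programmatically.  The snippet below is a minimal illustration, assuming
 * the usual starpu_conf_init()/starpu_init() interface:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "pgreedy";
 *	starpu_init(&conf);
 *	// submit tasks with starpu_task_submit(), then:
 *	starpu_shutdown();
 */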