eager_central_policy.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2014 Université de Bordeaux 1
  4. * Copyright (C) 2010-2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
/*
 * This is the trivial policy in which every worker uses the same
 * job queue.
 */
  22. #include <starpu_scheduler.h>
  23. #include <sched_policies/fifo_queues.h>
  24. #include <common/thread.h>
  25. #include <starpu_bitmap.h>
  26. struct _starpu_eager_center_policy_data
  27. {
  28. struct _starpu_fifo_taskq *fifo;
  29. starpu_pthread_mutex_t policy_mutex;
  30. struct starpu_bitmap *waiters;
  31. };
  32. static void initialize_eager_center_policy(unsigned sched_ctx_id)
  33. {
  34. #ifdef STARPU_HAVE_HWLOC
  35. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_TREE);
  36. #else
  37. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  38. #endif
  39. struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)malloc(sizeof(struct _starpu_eager_center_policy_data));
  40. _STARPU_DISP("Warning: you are running the default eager scheduler, which is not very smart. Make sure to read the StarPU documentation about adding performance models in order to be able to use the dmda scheduler instead.\n");
  41. /* there is only a single queue in that trivial design */
  42. data->fifo = _starpu_create_fifo();
  43. data->waiters = starpu_bitmap_create();
  44. /* Tell helgrind that it's fine to check for empty fifo in
  45. * pop_task_eager_policy without actual mutex (it's just an integer)
  46. */
  47. STARPU_HG_DISABLE_CHECKING(data->fifo->ntasks);
  48. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  49. STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
  50. }
  51. static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
  52. {
  53. /* TODO check that there is no task left in the queue */
  54. struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  55. /* deallocate the job queue */
  56. _starpu_destroy_fifo(data->fifo);
  57. starpu_bitmap_destroy(data->waiters);
  58. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  59. STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
  60. free(data);
  61. }
  62. static int push_task_eager_policy(struct starpu_task *task)
  63. {
  64. unsigned sched_ctx_id = task->sched_ctx;
  65. struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  66. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  67. starpu_task_list_push_back(&data->fifo->taskq,task);
  68. data->fifo->ntasks++;
  69. data->fifo->nprocessed++;
  70. starpu_push_task_end(task);
  71. /*if there are no tasks block */
  72. /* wake people waiting for a task */
  73. unsigned worker = 0;
  74. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  75. struct starpu_sched_ctx_iterator it;
  76. #ifndef STARPU_NON_BLOCKING_DRIVERS
  77. char dowake[STARPU_NMAXWORKERS] = { 0 };
  78. #endif
  79. if(workers->init_iterator)
  80. workers->init_iterator(workers, &it);
  81. while(workers->has_next_master(workers, &it))
  82. {
  83. worker = workers->get_next_master(workers, &it);
  84. #ifdef STARPU_NON_BLOCKING_DRIVERS
  85. if (!starpu_bitmap_get(data->waiters, worker))
  86. /* This worker is not waiting for a task */
  87. continue;
  88. #endif
  89. unsigned nimpl;
  90. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  91. if (starpu_worker_can_execute_task(worker, task, nimpl))
  92. {
  93. /* It can execute this one, tell him! */
  94. #ifdef STARPU_NON_BLOCKING_DRIVERS
  95. starpu_bitmap_unset(data->waiters, worker);
  96. /* We really woke at least somebody, no need to wake somebody else */
  97. break;
  98. #else
  99. dowake[worker] = 1;
  100. #endif
  101. }
  102. }
  103. /* Let the task free */
  104. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  105. #ifndef STARPU_NON_BLOCKING_DRIVERS
  106. /* Now that we have a list of potential workers, try to wake one */
  107. if(workers->init_iterator)
  108. workers->init_iterator(workers, &it);
  109. while(workers->has_next(workers, &it))
  110. {
  111. worker = workers->get_next(workers, &it);
  112. if (dowake[worker])
  113. {
  114. starpu_pthread_mutex_t *sched_mutex;
  115. starpu_pthread_cond_t *sched_cond;
  116. starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
  117. if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
  118. break; // wake up a single worker
  119. }
  120. }
  121. #endif
  122. return 0;
  123. }
  124. static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
  125. {
  126. struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  127. int workerid = starpu_worker_get_id();
  128. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  129. struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
  130. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  131. return task;
  132. }
  133. static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
  134. {
  135. unsigned workerid = starpu_worker_get_id();
  136. struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  137. struct starpu_task *task = NULL;
  138. /* block until some event happens */
  139. /* Here helgrind would shout that this is unprotected, this is just an
  140. * integer access, and we hold the sched mutex, so we can not miss any
  141. * wake up. */
  142. if (_starpu_fifo_empty(data->fifo))
  143. return NULL;
  144. #ifdef STARPU_NON_BLOCKING_DRIVERS
  145. if (starpu_bitmap_get(data->waiters, workerid))
  146. /* Nobody woke us, avoid bothering the mutex */
  147. return NULL;
  148. #endif
  149. STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
  150. task = _starpu_fifo_pop_task(data->fifo, workerid);
  151. if (!task)
  152. /* Tell pushers that we are waiting for tasks for us */
  153. starpu_bitmap_set(data->waiters, workerid);
  154. STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
  155. if(task)
  156. {
  157. unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
  158. if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
  159. {
  160. starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
  161. starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx);
  162. return NULL;
  163. }
  164. }
  165. return task;
  166. }
  167. static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  168. {
  169. int workerid;
  170. unsigned i;
  171. for (i = 0; i < nworkers; i++)
  172. {
  173. workerid = workerids[i];
  174. int curr_workerid = starpu_worker_get_id();
  175. if(workerid != curr_workerid)
  176. {
  177. starpu_pthread_mutex_t *sched_mutex;
  178. starpu_pthread_cond_t *sched_cond;
  179. starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
  180. starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
  181. }
  182. starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
  183. }
  184. }
  185. struct starpu_sched_policy _starpu_sched_eager_policy =
  186. {
  187. .init_sched = initialize_eager_center_policy,
  188. .deinit_sched = deinitialize_eager_center_policy,
  189. .add_workers = eager_add_workers,
  190. .remove_workers = NULL,
  191. .push_task = push_task_eager_policy,
  192. .pop_task = pop_task_eager_policy,
  193. .pre_exec_hook = NULL,
  194. .post_exec_hook = NULL,
  195. .pop_every_task = pop_every_task_eager_policy,
  196. .policy_name = "eager",
  197. .policy_description = "eager policy with a central queue"
  198. };