jobs.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2016 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2013, 2014, 2015 CNRS
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2014 INRIA
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #ifndef __JOBS_H__
  20. #define __JOBS_H__
#include <starpu.h>
#include <semaphore.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <stdarg.h>
#include <common/config.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <common/timing.h>
#include <common/list.h>
#include <common/fxt.h>
#include <core/dependencies/tags.h>
#include <datawizard/datawizard.h>
#include <core/perfmodel/perfmodel.h>
#include <core/errorcheck.h>
#include <common/barrier.h>
#include <common/utils.h>
#ifdef STARPU_USE_CUDA
#include <cuda.h>
#endif
  44. struct _starpu_worker;
  45. /* codelet function */
  46. typedef void (*_starpu_cl_func_t)(void **, void *);
  47. #define _STARPU_CPU_MAY_PERFORM(j) ((j)->task->cl->where & STARPU_CPU)
  48. #define _STARPU_CUDA_MAY_PERFORM(j) ((j)->task->cl->where & STARPU_CUDA)
  49. #define _STARPU_OPENCL_MAY_PERFORM(j) ((j)->task->cl->where & STARPU_OPENCL)
  50. #define _STARPU_MIC_MAY_PERFORM(j) ((j)->task->cl->where & STARPU_MIC)
  51. #define _STARPU_SCC_MAY_PERFORM(j) ((j)->task->cl->where & STARPU_SCC)
  52. struct _starpu_data_descr
  53. {
  54. starpu_data_handle_t handle;
  55. enum starpu_data_access_mode mode;
  56. int node;
  57. };
  58. struct _starpu_job_list {
  59. struct _starpu_job_list *next;
  60. struct _starpu_job_list *prev;
  61. };
  62. #ifdef STARPU_DEBUG
  63. #define STARPU_ASSERT_JOB_LIST(expr) STARPU_ASSERT(expr)
  64. #else
  65. #define STARPU_ASSERT_JOB_LIST(expr) ((void) 0)
  66. #endif
  67. #define _starpu_job_of(elt, member) \
  68. ((struct _starpu_job *) ((uintptr_t) (elt) - ((uintptr_t) (&((struct _starpu_job *) 0)->member))))
  69. #define _starpu_job_list_init(head) do { \
  70. struct _starpu_job_list *_head = (head); \
  71. _head->next = _head; \
  72. _head->prev = _head; \
  73. } while (0)
  74. #define _starpu_job_list_push_front(head, j, member) do { \
  75. struct _starpu_job *_j = (j); \
  76. struct _starpu_job_list *_head = (head); \
  77. STARPU_ASSERT_JOB_LIST(_j->member.prev == NULL); \
  78. STARPU_ASSERT_JOB_LIST(_j->member.next == NULL); \
  79. _j->member.next = _head->next; \
  80. _j->member.prev = _head; \
  81. _head->next->prev = &_j->member; \
  82. _head->next = &_j->member; \
  83. } while (0)
  84. #define _starpu_job_list_push_back(head, j, member) do { \
  85. struct _starpu_job *_j = (j); \
  86. struct _starpu_job_list *_head = (head); \
  87. STARPU_ASSERT_JOB_LIST(_j->member.prev == NULL); \
  88. STARPU_ASSERT_JOB_LIST(_j->member.next == NULL); \
  89. _j->member.prev = _head->prev; \
  90. _j->member.next = _head; \
  91. _head->prev->next = &_j->member; \
  92. _head->prev = &_j->member; \
  93. } while (0)
  94. #define _starpu_job_list_erase(head, j, member) do { \
  95. struct _starpu_job *_j = (j); \
  96. STARPU_ASSERT_JOB_LIST(_j->member.next->prev == &_j->member); \
  97. _j->member.next->prev = _j->member.prev; \
  98. STARPU_ASSERT_JOB_LIST(_j->member.prev->next == &_j->member); \
  99. _j->member.prev->next = _j->member.next; \
  100. _j->member.next = NULL; \
  101. _j->member.prev = NULL; \
  102. } while (0)
  103. #define _starpu_job_list_queued(j, member) \
  104. ((j)->member.next != NULL)
  105. #define _starpu_job_list_empty(head) \
  106. ((head)->next != head)
  107. #define _starpu_job_list_begin(head, member) \
  108. _starpu_job_of((head)->next, member)
  109. #define _starpu_job_list_next(head, j, member) \
  110. _starpu_job_of((j)->member.next, member)
  111. #define _starpu_job_list_end(head, member) \
  112. _starpu_job_of(head, member)
  113. /* A job is the internal representation of a task. */
  114. struct _starpu_job {
  115. /* Each job is attributed a unique id. */
  116. unsigned long job_id;
  117. /* The task associated to that job */
  118. struct starpu_task *task;
  119. /* These synchronization structures are used to wait for the job to be
  120. * available or terminated for instance. */
  121. starpu_pthread_mutex_t sync_mutex;
  122. starpu_pthread_cond_t sync_cond;
  123. /* To avoid deadlocks, we reorder the different buffers accessed to by
  124. * the task so that we always grab the rw-lock associated to the
  125. * handles in the same order. */
  126. struct _starpu_data_descr ordered_buffers[STARPU_NMAXBUFS];
  127. struct _starpu_task_wrapper_dlist dep_slots[STARPU_NMAXBUFS];
  128. struct _starpu_data_descr *dyn_ordered_buffers;
  129. struct _starpu_task_wrapper_dlist *dyn_dep_slots;
  130. /* If a tag is associated to the job, this points to the internal data
  131. * structure that describes the tag status. */
  132. struct _starpu_tag *tag;
  133. /* Maintain a list of all the completion groups that depend on the job.
  134. * */
  135. struct _starpu_cg_list job_successors;
  136. /* For tasks with cl==NULL but submitted with explicit data dependency,
  137. * the handle for this dependency, so as to remove the task from the
  138. * last_writer/readers */
  139. starpu_data_handle_t implicit_dep_handle;
  140. struct _starpu_task_wrapper_dlist implicit_dep_slot;
  141. /* Indicates whether the task associated to that job has already been
  142. * submitted to StarPU (1) or not (0) (using starpu_task_submit).
  143. * Becomes and stays 2 when the task is submitted several times.
  144. *
  145. * Protected by j->sync_mutex.
  146. */
  147. unsigned submitted:2;
  148. /* Indicates whether the task associated to this job is terminated or
  149. * not.
  150. *
  151. * Protected by j->sync_mutex.
  152. */
  153. unsigned terminated:2;
  154. #ifdef STARPU_OPENMP
  155. /* Job is a continuation or a regular task. */
  156. unsigned continuation;
  157. /* If 0, the prepared continuation is not resubmitted automatically
  158. * when going to sleep, if 1, the prepared continuation is immediately
  159. * resubmitted when going to sleep. */
  160. unsigned continuation_resubmit;
  161. /* Callback function called when:
  162. * - The continuation starpu task is ready to be submitted again if
  163. * continuation_resubmit = 0;
  164. * - The continuation starpu task has just been re-submitted if
  165. * continuation_resubmit = 1. */
  166. void (*continuation_callback_on_sleep)(void *arg);
  167. void *continuation_callback_on_sleep_arg;
  168. void (*omp_cleanup_callback)(void *arg);
  169. void *omp_cleanup_callback_arg;
  170. /* Job has been stopped at least once. */
  171. unsigned discontinuous;
  172. /* Cumulated execution time for discontinuous jobs */
  173. struct timespec cumulated_ts;
  174. /* Cumulated energy consumption for discontinuous jobs */
  175. double cumulated_energy_consumed;
  176. #endif
  177. /* The value of the footprint that identifies the job may be stored in
  178. * this structure. */
  179. uint32_t footprint;
  180. unsigned footprint_is_computed:1;
  181. /* Should that task appear in the debug tools ? (eg. the DAG generated
  182. * with dot) */
  183. unsigned exclude_from_dag:1;
  184. /* Is that task internal to StarPU? */
  185. unsigned internal:1;
  186. /* During the reduction of a handle, StarPU may have to submit tasks to
  187. * perform the reduction itself: those task should not be stalled while
  188. * other tasks are blocked until the handle has been properly reduced,
  189. * so we need a flag to differentiate them from "normal" tasks. */
  190. unsigned reduction_task:1;
  191. /* The implementation associated to the job */
  192. unsigned nimpl;
  193. /* Number of workers executing that task (>1 if the task is parallel)
  194. * */
  195. int task_size;
  196. /* In case we have assigned this job to a combined workerid */
  197. int combined_workerid;
  198. /* How many workers are currently running an alias of that job (for
  199. * parallel tasks only). */
  200. int active_task_alias_count;
  201. /* Used to record codelet start time instead of using a
  202. * local variable */
  203. struct timespec cl_start;
  204. struct bound_task *bound_task;
  205. /* Parallel workers may have to synchronize before/after the execution of a parallel task. */
  206. starpu_pthread_barrier_t before_work_barrier;
  207. starpu_pthread_barrier_t after_work_barrier;
  208. unsigned after_work_busy_barrier;
  209. /*
  210. * Fields for graph analysis for scheduling heuristics
  211. */
  212. /* Member of list of all jobs without incoming dependency */
  213. struct _starpu_job_list top;
  214. /* Member of list of all jobs without outgoing dependency */
  215. struct _starpu_job_list bottom;
  216. /* Member of list of all jobs */
  217. struct _starpu_job_list all;
  218. /* set of incoming dependencies */
  219. struct _starpu_job **incoming; /* May contain NULLs for terminated jobs */
  220. unsigned n_incoming; /* Number of slots used */
  221. unsigned alloc_incoming; /* Size of incoming */
  222. /* set of outgoing dependencies */
  223. struct _starpu_job **outgoing;
  224. unsigned *outgoing_slot; /* Index within corresponding incoming array */
  225. unsigned n_outgoing; /* Number of slots used */
  226. unsigned alloc_outgoing; /* Size of outgoing */
  227. unsigned depth; /* Rank from bottom, in number of jobs */
  228. /* Only available if _starpu_graph_compute_depths was called */
  229. unsigned descendants; /* Number of children, grand-children, etc. */
  230. /* Only available if _starpu_graph_compute_descendants was called */
  231. int graph_n; /* Variable available for graph flow */
  232. #ifdef STARPU_DEBUG
  233. /* Linked-list of all jobs, for debugging */
  234. struct _starpu_job_list all_submitted;
  235. #endif
  236. };
  237. void _starpu_job_init(void);
  238. void _starpu_job_fini(void);
  239. /* Create an internal struct _starpu_job *structure to encapsulate the task. */
  240. struct _starpu_job* _starpu_job_create(struct starpu_task *task) STARPU_ATTRIBUTE_MALLOC;
  241. /* Destroy the data structure associated to the job structure */
  242. void _starpu_job_destroy(struct _starpu_job *j);
  243. /* Test for the termination of the job */
  244. int _starpu_job_finished(struct _starpu_job *j);
  245. /* Wait for the termination of the job */
  246. void _starpu_wait_job(struct _starpu_job *j);
  247. #ifdef STARPU_OPENMP
  248. /* Test for the termination of the job */
  249. int _starpu_test_job_termination(struct _starpu_job *j);
  250. /* Prepare the job for accepting new dependencies before becoming a continuation. */
  251. void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
  252. void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
  253. void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
  254. void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
  255. void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg);
  256. #endif
  257. /* Specify that the task should not appear in the DAG generated by debug tools. */
  258. void _starpu_exclude_task_from_dag(struct starpu_task *task);
  259. /* try to submit job j, enqueue it if it's not schedulable yet. The job's sync mutex is supposed to be held already */
  260. unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j);
  261. unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
  262. #ifdef STARPU_OPENMP
  263. /* When waking up a continuation, we only enforce new task dependencies */
  264. unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
  265. #endif
  266. /* Called at the submission of the job */
  267. void _starpu_handle_job_submission(struct _starpu_job *j);
  268. /* This function must be called after the execution of a job, this triggers all
  269. * job's dependencies and perform the callback function if any. */
  270. void _starpu_handle_job_termination(struct _starpu_job *j);
  271. /* Get the sum of the size of the data accessed by the job. */
  272. size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, struct starpu_perfmodel_arch* arch, unsigned nimpl, struct _starpu_job *j);
  273. /* Get a task from the local pool of tasks that were explicitly attributed to
  274. * that worker. */
  275. struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker);
  276. /* Put a task into the pool of tasks that are explicitly attributed to the
  277. * specified worker. If "back" is set, the task is put at the back of the list.
  278. * Considering the tasks are popped from the back, this value should be 0 to
  279. * enforce a FIFO ordering. */
  280. int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio);
  281. #define _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].handle : job->ordered_buffers[i].handle)
  282. #define _STARPU_JOB_GET_ORDERED_BUFFER_MODE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].mode : job->ordered_buffers[i].mode)
  283. #define _STARPU_JOB_GET_ORDERED_BUFFER_NODE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].node : job->ordered_buffers[i].node)
  284. #define _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(job, handle, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].handle = (handle); else job->ordered_buffers[i].handle = (handle);} while(0)
  285. #define _STARPU_JOB_SET_ORDERED_BUFFER_MODE(job, __mode, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].mode = __mode; else job->ordered_buffers[i].mode = __mode;} while(0)
  286. #define _STARPU_JOB_SET_ORDERED_BUFFER_NODE(job, __node, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].node = __node; else job->ordered_buffers[i].node = __node;} while(0)
  287. #define _STARPU_JOB_SET_ORDERED_BUFFER(job, buffer, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i] = buffer; else job->ordered_buffers[i] = buffer;} while(0)
  288. #define _STARPU_JOB_GET_ORDERED_BUFFERS(job) (job->dyn_ordered_buffers) ? job->dyn_ordered_buffers : job->ordered_buffers
  289. #define _STARPU_JOB_GET_DEP_SLOTS(job) (((job)->dyn_dep_slots) ? (job)->dyn_dep_slots : (job)->dep_slots)
  290. #endif // __JOBS_H__