jobs.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2011 Télécom-SudParis
  5. * Copyright (C) 2013 Thibaut Lambert
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #ifndef __JOBS_H__
  19. #define __JOBS_H__
  20. /** @file */
  21. #include <starpu.h>
  22. #include <semaphore.h>
  23. #include <stdio.h>
  24. #include <stdlib.h>
  25. #include <stdint.h>
  26. #include <string.h>
  27. #include <stdarg.h>
  28. #include <common/config.h>
  29. #ifdef HAVE_UNISTD_H
  30. #include <unistd.h>
  31. #endif
  32. #include <common/timing.h>
  33. #include <common/list.h>
  34. #include <common/fxt.h>
  35. #include <core/dependencies/tags.h>
  36. #include <datawizard/datawizard.h>
  37. #include <core/perfmodel/perfmodel.h>
  38. #include <core/errorcheck.h>
  39. #include <common/barrier.h>
  40. #include <common/utils.h>
  41. #include <common/list.h>
  42. #ifdef STARPU_USE_CUDA
  43. #include <cuda.h>
  44. #endif
  45. struct _starpu_worker;
  46. /** codelet function */
  47. typedef void (*_starpu_cl_func_t)(void **, void *);
  48. #define _STARPU_CPU_MAY_PERFORM(j) ((j)->task->where & STARPU_CPU)
  49. #define _STARPU_CUDA_MAY_PERFORM(j) ((j)->task->where & STARPU_CUDA)
  50. #define _STARPU_OPENCL_MAY_PERFORM(j) ((j)->task->where & STARPU_OPENCL)
  51. #define _STARPU_MIC_MAY_PERFORM(j) ((j)->task->where & STARPU_MIC)
  52. struct _starpu_data_descr
  53. {
  54. starpu_data_handle_t handle;
  55. enum starpu_data_access_mode mode;
  56. int node; /** This is the value actually chosen, only set by
  57. _starpu_fetch_task_input for coherency with
  58. __starpu_push_task_output */
  59. int index;
  60. int orderedindex; /** For this field the array is actually indexed by
  61. parameter order, and this provides the ordered
  62. index */
  63. };
  64. #ifdef STARPU_DEBUG
  65. MULTILIST_CREATE_TYPE(_starpu_job, all_submitted)
  66. #endif
  67. /** A job is the internal representation of a task. */
  68. struct _starpu_job
  69. {
  70. /** Each job is attributed a unique id. */
  71. unsigned long job_id;
  72. /** The task associated to that job */
  73. struct starpu_task *task;
  74. /** A task that this will unlock quickly, e.g. we are the pre_sync part
  75. * of a data acquisition, and the caller promised that data release will
  76. * happen immediately, so that the post_sync task will be started
  77. * immediately after. */
  78. struct _starpu_job *quick_next;
  79. /** These synchronization structures are used to wait for the job to be
  80. * available or terminated for instance. */
  81. starpu_pthread_mutex_t sync_mutex;
  82. starpu_pthread_cond_t sync_cond;
  83. /** To avoid deadlocks, we reorder the different buffers accessed to by
  84. * the task so that we always grab the rw-lock associated to the
  85. * handles in the same order. */
  86. struct _starpu_data_descr ordered_buffers[STARPU_NMAXBUFS];
  87. struct _starpu_task_wrapper_dlist dep_slots[STARPU_NMAXBUFS];
  88. struct _starpu_data_descr *dyn_ordered_buffers;
  89. struct _starpu_task_wrapper_dlist *dyn_dep_slots;
  90. /** If a tag is associated to the job, this points to the internal data
  91. * structure that describes the tag status. */
  92. struct _starpu_tag *tag;
  93. /** Maintain a list of all the completion groups that depend on the job.
  94. * */
  95. struct _starpu_cg_list job_successors;
  96. /** Task whose termination depends on this task */
  97. struct starpu_task *end_rdep;
  98. /** For tasks with cl==NULL but submitted with explicit data dependency,
  99. * the handle for this dependency, so as to remove the task from the
  100. * last_writer/readers */
  101. starpu_data_handle_t implicit_dep_handle;
  102. struct _starpu_task_wrapper_dlist implicit_dep_slot;
  103. /** Indicates whether the task associated to that job has already been
  104. * submitted to StarPU (1) or not (0) (using starpu_task_submit).
  105. * Becomes and stays 2 when the task is submitted several times.
  106. *
  107. * Protected by j->sync_mutex.
  108. */
  109. unsigned submitted:2;
  110. /** Indicates whether the task associated to this job is terminated or
  111. * not.
  112. *
  113. * Protected by j->sync_mutex.
  114. */
  115. unsigned terminated:2;
  116. #ifdef STARPU_OPENMP
  117. /** Job is a continuation or a regular task. */
  118. unsigned continuation;
  119. /** If 0, the prepared continuation is not resubmitted automatically
  120. * when going to sleep, if 1, the prepared continuation is immediately
  121. * resubmitted when going to sleep. */
  122. unsigned continuation_resubmit;
  123. /** Callback function called when:
  124. * - The continuation starpu task is ready to be submitted again if
  125. * continuation_resubmit = 0;
  126. * - The continuation starpu task has just been re-submitted if
  127. * continuation_resubmit = 1. */
  128. void (*continuation_callback_on_sleep)(void *arg);
  129. void *continuation_callback_on_sleep_arg;
  130. void (*omp_cleanup_callback)(void *arg);
  131. void *omp_cleanup_callback_arg;
  132. /** Job has been stopped at least once. */
  133. unsigned discontinuous;
  134. /** Cumulated execution time for discontinuous jobs */
  135. struct timespec cumulated_ts;
  136. /** Cumulated energy consumption for discontinuous jobs */
  137. double cumulated_energy_consumed;
  138. #endif
  139. /** The value of the footprint that identifies the job may be stored in
  140. * this structure. */
  141. uint32_t footprint;
  142. unsigned footprint_is_computed:1;
  143. /** Should that task appear in the debug tools ? (eg. the DAG generated
  144. * with dot) */
  145. unsigned exclude_from_dag:1;
  146. /** Is that task internal to StarPU? */
  147. unsigned internal:1;
  148. /** Did that task use sequential consistency for its data? */
  149. unsigned sequential_consistency:1;
  150. /** During the reduction of a handle, StarPU may have to submit tasks to
  151. * perform the reduction itself: those task should not be stalled while
  152. * other tasks are blocked until the handle has been properly reduced,
  153. * so we need a flag to differentiate them from "normal" tasks. */
  154. unsigned reduction_task:1;
  155. /** The implementation associated to the job */
  156. unsigned nimpl;
  157. /** Number of workers executing that task (>1 if the task is parallel)
  158. * */
  159. int task_size;
  160. /** In case we have assigned this job to a combined workerid */
  161. int combined_workerid;
  162. /** How many workers are currently running an alias of that job (for
  163. * parallel tasks only). */
  164. int active_task_alias_count;
  165. struct bound_task *bound_task;
  166. /** Parallel workers may have to synchronize before/after the execution of a parallel task. */
  167. starpu_pthread_barrier_t before_work_barrier;
  168. starpu_pthread_barrier_t after_work_barrier;
  169. unsigned after_work_busy_barrier;
  170. struct _starpu_graph_node *graph_node;
  171. #ifdef STARPU_DEBUG
  172. /** Linked-list of all jobs, for debugging */
  173. struct _starpu_job_multilist_all_submitted all_submitted;
  174. #endif
  175. };
  176. #ifdef STARPU_DEBUG
  177. MULTILIST_CREATE_INLINES(struct _starpu_job, _starpu_job, all_submitted)
  178. #endif
  179. void _starpu_job_init(void);
  180. void _starpu_job_fini(void);
  181. /** Create an internal struct _starpu_job *structure to encapsulate the task. */
  182. struct _starpu_job* _starpu_job_create(struct starpu_task *task) STARPU_ATTRIBUTE_MALLOC;
  183. /** Destroy the data structure associated to the job structure */
  184. void _starpu_job_destroy(struct _starpu_job *j);
  185. /** Test for the termination of the job */
  186. int _starpu_job_finished(struct _starpu_job *j);
  187. /** Wait for the termination of the job */
  188. void _starpu_wait_job(struct _starpu_job *j);
  189. #ifdef STARPU_OPENMP
  190. /** Test for the termination of the job */
  191. int _starpu_test_job_termination(struct _starpu_job *j);
  192. /** Prepare the job for accepting new dependencies before becoming a continuation. */
  193. void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
  194. void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
  195. void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
  196. void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
  197. void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg);
  198. #endif
  199. /** Specify that the task should not appear in the DAG generated by debug tools. */
  200. void _starpu_exclude_task_from_dag(struct starpu_task *task);
  201. /** try to submit job j, enqueue it if it's not schedulable yet. The job's sync mutex is supposed to be held already */
  202. unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j);
  203. unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
  204. #ifdef STARPU_OPENMP
  205. /** When waking up a continuation, we only enforce new task dependencies */
  206. unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
  207. #endif
  208. unsigned _starpu_take_deps_and_schedule(struct _starpu_job *j);
  209. void _starpu_enforce_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data, int tag);
  210. /** Called at the submission of the job */
  211. void _starpu_handle_job_submission(struct _starpu_job *j);
  212. /** This function must be called after the execution of a job, this triggers all
  213. * job's dependencies and perform the callback function if any. */
  214. void _starpu_handle_job_termination(struct _starpu_job *j);
  215. /** Get the sum of the size of the data accessed by the job. */
  216. size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, struct starpu_perfmodel_arch* arch, unsigned nimpl, struct _starpu_job *j);
  217. /** Get a task from the local pool of tasks that were explicitly attributed to
  218. * that worker. */
  219. struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker);
  220. /** Put a task into the pool of tasks that are explicitly attributed to the
  221. * specified worker. If "back" is set, the task is put at the back of the list.
  222. * Considering the tasks are popped from the back, this value should be 0 to
  223. * enforce a FIFO ordering. */
  224. int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio);
  225. #define _STARPU_JOB_GET_ORDERED_BUFFER_INDEX(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].index : job->ordered_buffers[i].index)
  226. #define _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].handle : job->ordered_buffers[i].handle)
  227. #define _STARPU_JOB_GET_ORDERED_BUFFER_MODE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].mode : job->ordered_buffers[i].mode)
  228. #define _STARPU_JOB_GET_ORDERED_BUFFER_NODE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].node : job->ordered_buffers[i].node)
  229. #define _STARPU_JOB_SET_ORDERED_BUFFER_INDEX(job, __index, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].index = (__index); else job->ordered_buffers[i].index = (__index);} while(0)
  230. #define _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(job, __handle, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].handle = (__handle); else job->ordered_buffers[i].handle = (__handle);} while(0)
  231. #define _STARPU_JOB_SET_ORDERED_BUFFER_MODE(job, __mode, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].mode = __mode; else job->ordered_buffers[i].mode = __mode;} while(0)
  232. #define _STARPU_JOB_SET_ORDERED_BUFFER_NODE(job, __node, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i].node = __node; else job->ordered_buffers[i].node = __node;} while(0)
  233. #define _STARPU_JOB_SET_ORDERED_BUFFER(job, buffer, i) do { if (job->dyn_ordered_buffers) job->dyn_ordered_buffers[i] = buffer; else job->ordered_buffers[i] = buffer;} while(0)
  234. #define _STARPU_JOB_GET_ORDERED_BUFFERS(job) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers : &job->ordered_buffers[0])
  235. #define _STARPU_JOB_GET_DEP_SLOTS(job) (((job)->dyn_dep_slots) ? (job)->dyn_dep_slots : (job)->dep_slots)
  236. #endif // __JOBS_H__