pipeline.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /*
 * This example shows how to submit a pipeline to StarPU with limited buffer
 * use, while avoiding submitting all the tasks at once.
  19. *
  20. * This is a dumb example pipeline, depicted here:
  21. *
 * x--\
 *     >==axpy-->sum
 * y--/
  25. *
 * x and y produce vectors full of x and y values, axpy adds them (y += x),
 * and sum sums up the result. Each iteration thus needs temporary X and Y
 * buffers, taken from a pool of K iteration buffers.
  28. */
  29. #include <starpu.h>
  30. #include <stdint.h>
  31. #include <semaphore.h>
  32. #include <common/blas.h>
  33. #ifdef STARPU_USE_CUDA
  34. #include <starpu_cublas_v2.h>
  35. #endif
  /* Print to ofile, unless the STARPU_SSILENT environment variable is set. */
  #define FPRINTF(ofile, fmt, ...)                                    \
      do                                                              \
      {                                                               \
          if (getenv("STARPU_SSILENT") == NULL)                       \
          {                                                           \
              fprintf(ofile, fmt, ## __VA_ARGS__);                    \
          }                                                           \
      } while (0)

  /* Vector size (number of floats per buffer); reduced for quick checks. */
  #ifdef STARPU_QUICK_CHECK
  enum { N = 16 };
  #else
  enum { N = 1 << 20 };   /* 1048576 */
  #endif

  enum
  {
      /* Number of iteration buffers, and thus overlapped pipeline iterations */
      K = 16,
      /* Number of concurrently submitted pipeline iterations */
      C = 64,
      /* Number of iterations */
      L = 256
  };
  /* X / Y codelets */

  /* Fill kernel shared by the X and Y stages: writes the float value packed
   * in the codelet arguments into every element of the vector descr[0]. */
  void pipeline_cpu_x(void *descr[], void *args)
  {
      float value;
      starpu_codelet_unpack_args(args, &value);

      float *dst = (float *) STARPU_VECTOR_GET_PTR(descr[0]);
      const int count = STARPU_VECTOR_GET_NX(descr[0]);

      for (int idx = 0; idx < count; idx++)
          dst[idx] = value;
  }
  60. static struct starpu_perfmodel pipeline_model_x =
  61. {
  62. .type = STARPU_HISTORY_BASED,
  63. .symbol = "pipeline_model_x"
  64. };
  65. static struct starpu_codelet pipeline_codelet_x =
  66. {
  67. .cpu_funcs = {pipeline_cpu_x},
  68. .cpu_funcs_name = {"pipeline_cpu_x"},
  69. .nbuffers = 1,
  70. .modes = {STARPU_W},
  71. .model = &pipeline_model_x
  72. };
  /* axpy codelets */

  /* CPU axpy kernel: descr[1] += 1.0 * descr[0], over the length of descr[0]. */
  void pipeline_cpu_axpy(void *descr[], void *arg)
  {
      float *src = (float *) STARPU_VECTOR_GET_PTR(descr[0]);
      float *acc = (float *) STARPU_VECTOR_GET_PTR(descr[1]);
      const int len = STARPU_VECTOR_GET_NX(descr[0]);

      (void) arg;   /* no codelet arguments */
      STARPU_SAXPY(len, 1., src, 1, acc, 1);
  }
  #ifdef STARPU_USE_CUDA
  /* CUDA axpy kernel: descr[1] += 1.0 * descr[0] via cuBLAS, issued on the
   * handle returned by starpu_cublas_get_local_handle(). */
  void pipeline_cublas_axpy(void *descr[], void *arg)
  {
      const float alpha = 1.;
      float *src = (float *) STARPU_VECTOR_GET_PTR(descr[0]);
      float *acc = (float *) STARPU_VECTOR_GET_PTR(descr[1]);
      const int len = STARPU_VECTOR_GET_NX(descr[0]);
      cublasStatus_t status;

      (void) arg;   /* no codelet arguments */
      status = cublasSaxpy(starpu_cublas_get_local_handle(), len, &alpha, src, 1, acc, 1);
      if (status != CUBLAS_STATUS_SUCCESS)
      {
          STARPU_CUBLAS_REPORT_ERROR(status);
      }
  }
  #endif
  95. static struct starpu_perfmodel pipeline_model_axpy =
  96. {
  97. .type = STARPU_HISTORY_BASED,
  98. .symbol = "pipeline_model_axpy"
  99. };
  100. static struct starpu_codelet pipeline_codelet_axpy =
  101. {
  102. .cpu_funcs = {pipeline_cpu_axpy},
  103. .cpu_funcs_name = {"pipeline_cpu_axpy"},
  104. #ifdef STARPU_USE_CUDA
  105. .cuda_funcs = {pipeline_cublas_axpy},
  106. .cuda_flags = {STARPU_CUDA_ASYNC},
  107. #endif
  108. .nbuffers = 2,
  109. .modes = {STARPU_R, STARPU_RW},
  110. .model = &pipeline_model_axpy
  111. };
  112. /* sum codelet */
  113. void pipeline_cpu_sum(void *descr[], void *arg)
  114. {
  115. (void)arg;
  116. float *x = (float *) STARPU_VECTOR_GET_PTR(descr[0]);
  117. int n = STARPU_VECTOR_GET_NX(descr[0]);
  118. float y;
  119. y = STARPU_SASUM(n, x, 1);
  120. FPRINTF(stderr,"CPU finished with %f\n", y);
  121. }
  #ifdef STARPU_USE_CUDA
  /* CUDA reduction kernel: computes with cuBLAS the sum of absolute values of
   * the vector descr[0] into a host variable, then prints it.
   * NOTE(review): printing `total` right after the call presumably relies on
   * cuBLAS' host pointer mode returning the scalar synchronously — confirm
   * given the STARPU_CUDA_ASYNC flag on the codelet. */
  void pipeline_cublas_sum(void *descr[], void *arg)
  {
      float *vec = (float *) STARPU_VECTOR_GET_PTR(descr[0]);
      const int len = STARPU_VECTOR_GET_NX(descr[0]);
      float total;
      cublasStatus_t status;

      (void) arg;   /* no codelet arguments */
      status = cublasSasum(starpu_cublas_get_local_handle(), len, vec, 1, &total);
      if (status != CUBLAS_STATUS_SUCCESS)
      {
          STARPU_CUBLAS_REPORT_ERROR(status);
      }
      FPRINTF(stderr,"CUBLAS finished with %f\n", total);
  }
  #endif
  135. static struct starpu_perfmodel pipeline_model_sum =
  136. {
  137. .type = STARPU_HISTORY_BASED,
  138. .symbol = "pipeline_model_sum"
  139. };
  140. static struct starpu_codelet pipeline_codelet_sum =
  141. {
  142. .cpu_funcs = {pipeline_cpu_sum},
  143. .cpu_funcs_name = {"pipeline_cpu_sum"},
  144. #ifdef STARPU_USE_CUDA
  145. .cuda_funcs = {pipeline_cublas_sum},
  146. .cuda_flags = {STARPU_CUDA_ASYNC},
  147. #endif
  148. .nbuffers = 1,
  149. .modes = {STARPU_R},
  150. .model = &pipeline_model_sum
  151. };
  /* Task callback used by the "sum" tasks: posts the semaphore passed as the
   * callback argument (main() passes &sems[l%C]), releasing one concurrency
   * slot for a later pipeline iteration.
   *
   * arg: pointer to an initialized sem_t. */
  static void release_sem(void *arg)
  {
      sem_t *sem = arg;

      sem_post(sem);
  }
  156. int main(void)
  157. {
  158. int ret = 0;
  159. int k, l, c;
  160. starpu_data_handle_t buffersX[K], buffersY[K], buffersP[K];
  161. sem_t sems[C];
  162. ret = starpu_init(NULL);
  163. if (ret == -ENODEV)
  164. exit(77);
  165. STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
  166. starpu_cublas_init();
  167. /* Initialize the K temporary buffers. No need to allocate it ourselves
  168. * Since it's the X and Y kernels which will fill the initial values. */
  169. for (k = 0; k < K; k++)
  170. {
  171. starpu_vector_data_register(&buffersX[k], -1, 0, N, sizeof(float));
  172. starpu_vector_data_register(&buffersY[k], -1, 0, N, sizeof(float));
  173. starpu_vector_data_register(&buffersP[k], -1, 0, N, sizeof(float));
  174. }
  175. /* Initialize way to wait for the C previous concurrent stages */
  176. for (c = 0; c < C; c++)
  177. sem_init(&sems[c], 0, 0);
  178. /* Submits the l pipeline stages */
  179. for (l = 0; l < L; l++)
  180. {
  181. float x = l;
  182. float y = 2*l;
  183. /* First wait for the C previous concurrent stages */
  184. if (l >= C)
  185. {
  186. starpu_do_schedule();
  187. sem_wait(&sems[l%C]);
  188. }
  189. /* Now submit the next stage */
  190. ret = starpu_task_insert(&pipeline_codelet_x,
  191. STARPU_W, buffersX[l%K],
  192. STARPU_VALUE, &x, sizeof(x),
  193. STARPU_TAG_ONLY, (starpu_tag_t) (100*l),
  194. 0);
  195. if (ret == -ENODEV) goto enodev;
  196. STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert x");
  197. ret = starpu_task_insert(&pipeline_codelet_x,
  198. STARPU_W, buffersY[l%K],
  199. STARPU_VALUE, &y, sizeof(y),
  200. STARPU_TAG_ONLY, (starpu_tag_t) (100*l+1),
  201. 0);
  202. if (ret == -ENODEV) goto enodev;
  203. STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert y");
  204. ret = starpu_task_insert(&pipeline_codelet_axpy,
  205. STARPU_R, buffersX[l%K],
  206. STARPU_RW, buffersY[l%K],
  207. STARPU_TAG_ONLY, (starpu_tag_t) l,
  208. 0);
  209. if (ret == -ENODEV) goto enodev;
  210. STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert axpy");
  211. ret = starpu_task_insert(&pipeline_codelet_sum,
  212. STARPU_R, buffersY[l%K],
  213. STARPU_CALLBACK_WITH_ARG_NFREE, release_sem, &sems[l%C],
  214. STARPU_TAG_ONLY, (starpu_tag_t) l,
  215. 0);
  216. if (ret == -ENODEV) goto enodev;
  217. STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert sum");
  218. }
  219. starpu_task_wait_for_all();
  220. enodev:
  221. for (k = 0; k < K; k++)
  222. {
  223. starpu_data_unregister(buffersX[k]);
  224. starpu_data_unregister(buffersY[k]);
  225. starpu_data_unregister(buffersP[k]);
  226. }
  227. starpu_shutdown();
  228. return (ret == -ENODEV ? 77 : 0);
  229. }