reduction.c 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/utils.h>
  18. #include <datawizard/datawizard.h>
  19. void starpu_data_set_reduction_methods(starpu_data_handle handle,
  20. struct starpu_codelet_t *redux_cl,
  21. struct starpu_codelet_t *init_cl)
  22. {
  23. _starpu_spin_lock(&handle->header_lock);
  24. unsigned child;
  25. for (child = 0; child < handle->nchildren; child++)
  26. {
  27. /* make sure that the flags are applied to the children as well */
  28. struct starpu_data_state_t *child_handle = &handle->children[child];
  29. if (child_handle->nchildren > 0)
  30. starpu_data_set_reduction_methods(child_handle, redux_cl, init_cl);
  31. }
  32. handle->redux_cl = redux_cl;
  33. handle->init_cl = init_cl;
  34. _starpu_spin_unlock(&handle->header_lock);
  35. }
  36. void _starpu_redux_init_data_replicate(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, int workerid)
  37. {
  38. STARPU_ASSERT(replicate);
  39. STARPU_ASSERT(replicate->allocated);
  40. struct starpu_codelet_t *init_cl = handle->init_cl;
  41. STARPU_ASSERT(init_cl);
  42. cl_func init_func = NULL;
  43. /* TODO Check that worker may execute the codelet */
  44. switch (starpu_worker_get_type(workerid)) {
  45. case STARPU_CPU_WORKER:
  46. init_func = init_cl->cpu_func;
  47. break;
  48. case STARPU_CUDA_WORKER:
  49. init_func = init_cl->cuda_func;
  50. break;
  51. case STARPU_OPENCL_WORKER:
  52. init_func = init_cl->opencl_func;
  53. break;
  54. default:
  55. STARPU_ABORT();
  56. break;
  57. }
  58. STARPU_ASSERT(init_func);
  59. init_func(&replicate->interface, NULL);
  60. replicate->initialized = 1;
  61. }
  62. /* Enable reduction mode */
  63. void starpu_data_start_reduction_mode(starpu_data_handle handle)
  64. {
  65. unsigned worker;
  66. _starpu_spin_lock(&handle->header_lock);
  67. for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
  68. {
  69. struct starpu_data_replicate_s *replicate;
  70. replicate = &handle->per_worker[worker];
  71. replicate->initialized = 0;
  72. }
  73. _starpu_spin_unlock(&handle->header_lock);
  74. }
  75. /* Force reduction */
  76. void starpu_data_end_reduction_mode(starpu_data_handle handle)
  77. {
  78. unsigned worker;
  79. _starpu_spin_lock(&handle->header_lock);
  80. /* Register all valid per-worker replicates */
  81. starpu_data_handle tmp_handles[STARPU_NMAXWORKERS];
  82. for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
  83. {
  84. if (handle->per_worker[worker].initialized)
  85. {
  86. /* Make sure the replicate is not removed */
  87. handle->per_worker[worker].refcnt++;
  88. uint32_t home_node = starpu_worker_get_memory_node(worker);
  89. starpu_data_register(&tmp_handles[worker], home_node, handle->per_worker[worker].interface, handle->ops);
  90. }
  91. else {
  92. tmp_handles[worker] = NULL;
  93. }
  94. }
  95. _starpu_spin_unlock(&handle->header_lock);
  96. /* Create a set of tasks to perform the reduction */
  97. for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
  98. {
  99. if (tmp_handles[worker])
  100. {
  101. struct starpu_task *redux_task = starpu_task_create();
  102. redux_task->cl = handle->redux_cl;
  103. STARPU_ASSERT(redux_task->cl);
  104. redux_task->buffers[0].handle = handle;
  105. redux_task->buffers[0].mode = STARPU_RW;
  106. redux_task->buffers[1].handle = tmp_handles[worker];
  107. redux_task->buffers[1].mode = STARPU_R;
  108. int ret = starpu_task_submit(redux_task);
  109. STARPU_ASSERT(!ret);
  110. }
  111. }
  112. /* TODO have a better way to synchronize */
  113. starpu_task_wait_for_all();
  114. _starpu_spin_lock(&handle->header_lock);
  115. for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
  116. {
  117. struct starpu_data_replicate_s *replicate;
  118. replicate = &handle->per_worker[worker];
  119. replicate->initialized = 0;
  120. if (tmp_handles[worker])
  121. {
  122. starpu_data_unregister_no_coherency(tmp_handles[worker]);
  123. handle->per_worker[worker].refcnt--;
  124. /* TODO put in cache */
  125. }
  126. }
  127. _starpu_spin_unlock(&handle->header_lock);
  128. }