combined_workers.c 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2015,2017 Inria
  4. * Copyright (C) 2010-2015,2018-2019 Université de Bordeaux
  5. * Copyright (C) 2010,2011,2013-2017 CNRS
  6. * Copyright (C) 2013 Simon Archipoff
  7. * Copyright (C) 2013 Thibaut Lambert
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <stdlib.h> // for qsort
  21. #include <starpu.h>
  22. #include <common/config.h>
  23. #include <core/workers.h>
  24. #ifdef __GLIBC__
  25. #include <sched.h>
  26. #endif
  27. #if defined(_WIN32) && !defined(__CYGWIN__)
  28. #include <windows.h>
  29. #endif
  30. static int compar_int(const void *pa, const void *pb)
  31. {
  32. int a = *((int *)pa);
  33. int b = *((int *)pb);
  34. return a - b;
  35. }
  36. static void sort_workerid_array(int nworkers, int workerid_array[])
  37. {
  38. qsort(workerid_array, nworkers, sizeof(int), compar_int);
  39. }
  40. /* Create a new worker id for a combination of workers. This method should
  41. * typically be called at the initialization of the scheduling policy. This
  42. * worker should be the combination of the list of id's contained in the
  43. * workerid_array array which has nworkers entries. This function returns
  44. * the identifier of the combined worker in case of success, a negative value
  45. * is returned otherwise. */
  46. int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
  47. {
  48. int new_workerid;
  49. /* Return the number of actual workers. */
  50. struct _starpu_machine_config *config = _starpu_get_machine_config();
  51. int basic_worker_count = (int)config->topology.nworkers;
  52. int combined_worker_id = (int)config->topology.ncombinedworkers;
  53. /* We sort the ids */
  54. sort_workerid_array(nworkers, workerid_array);
  55. /* Test that all workers are not combined workers already. */
  56. int i;
  57. for (i = 0; i < nworkers; i++)
  58. {
  59. int id = workerid_array[i];
  60. /* We only combine valid "basic" workers */
  61. if ((id < 0) || (id >= basic_worker_count))
  62. return -EINVAL;
  63. #ifdef STARPU_USE_MIC
  64. STARPU_ASSERT(config->workers[id].arch == STARPU_CPU_WORKER || config->workers[id].arch == STARPU_MIC_WORKER);
  65. STARPU_ASSERT(config->workers[id].worker_mask == STARPU_CPU || config->workers[id].worker_mask == STARPU_MIC);
  66. #else/* STARPU_USE_MIC */
  67. /* We only combine CPUs */
  68. STARPU_ASSERT(config->workers[id].arch == STARPU_CPU_WORKER);
  69. STARPU_ASSERT(config->workers[id].worker_mask == STARPU_CPU);
  70. #endif /* STARPU_USE_MIC */
  71. }
  72. /* Get an id for that combined worker. Note that this is not thread
  73. * safe because this method should only be called when the scheduler
  74. * is being initialized. */
  75. new_workerid = basic_worker_count + combined_worker_id;
  76. STARPU_ASSERT_MSG(new_workerid < STARPU_NMAXWORKERS, "Too many combined workers for parallel task execution. Please use configure option --enable-maxcpus to increase it beyond the current value %d", STARPU_MAXCPUS);
  77. config->topology.ncombinedworkers++;
  78. // fprintf(stderr, "COMBINED WORKERS ");
  79. // for (i = 0; i < nworkers; i++)
  80. // {
  81. // fprintf(stderr, "%d ", workerid_array[i]);
  82. // }
  83. // fprintf(stderr, "into worker %d\n", new_workerid);
  84. for(i = 0; i < nworkers; i++)
  85. _starpu_get_worker_struct(workerid_array[i])->combined_workerid = new_workerid;
  86. struct _starpu_combined_worker *combined_worker =
  87. &config->combined_workers[combined_worker_id];
  88. combined_worker->worker_size = nworkers;
  89. _STARPU_MALLOC(combined_worker->perf_arch.devices, sizeof(struct starpu_perfmodel_device));
  90. combined_worker->perf_arch.ndevices = 1;
  91. combined_worker->perf_arch.devices[0].type = config->workers[workerid_array[0]].perf_arch.devices[0].type;
  92. combined_worker->perf_arch.devices[0].devid = config->workers[workerid_array[0]].perf_arch.devices[0].devid;
  93. combined_worker->perf_arch.devices[0].ncores = nworkers;
  94. combined_worker->worker_mask = config->workers[workerid_array[0]].worker_mask;
  95. #ifdef STARPU_USE_MP
  96. combined_worker->count = nworkers -1;
  97. STARPU_PTHREAD_MUTEX_INIT(&combined_worker->count_mutex,NULL);
  98. #endif
  99. /* We assume that the memory node should either be that of the first
  100. * entry, and it is very likely that every worker in the combination
  101. * should be on the same memory node.*/
  102. int first_id = workerid_array[0];
  103. combined_worker->memory_node = config->workers[first_id].memory_node;
  104. /* Save the list of combined workers */
  105. memcpy(&combined_worker->combined_workerid, workerid_array, nworkers*sizeof(int));
  106. /* Note that we maintain both the cpu_set and the hwloc_cpu_set so that
  107. * the application is not forced to use hwloc when it is available. */
  108. #ifdef __GLIBC__
  109. CPU_ZERO(&combined_worker->cpu_set);
  110. #endif /* __GLIBC__ */
  111. #ifdef STARPU_HAVE_HWLOC
  112. combined_worker->hwloc_cpu_set = hwloc_bitmap_alloc();
  113. #endif
  114. for (i = 0; i < nworkers; i++)
  115. {
  116. #if defined(__GLIBC__) || defined(STARPU_HAVE_HWLOC)
  117. int id = workerid_array[i];
  118. #ifdef __GLIBC__
  119. #ifdef CPU_OR
  120. CPU_OR(&combined_worker->cpu_set,
  121. &combined_worker->cpu_set,
  122. &config->workers[id].cpu_set);
  123. #else
  124. int j;
  125. for (j = 0; j < CPU_SETSIZE; j++)
  126. {
  127. if (CPU_ISSET(j, &config->workers[id].cpu_set))
  128. CPU_SET(j, &combined_worker->cpu_set);
  129. }
  130. #endif
  131. #endif /* __GLIBC__ */
  132. #ifdef STARPU_HAVE_HWLOC
  133. hwloc_bitmap_or(combined_worker->hwloc_cpu_set,
  134. combined_worker->hwloc_cpu_set,
  135. config->workers[id].hwloc_cpu_set);
  136. #endif
  137. #endif
  138. }
  139. starpu_sched_ctx_add_combined_workers(&new_workerid, 1, STARPU_GLOBAL_SCHED_CTX);
  140. return new_workerid;
  141. }
  142. int starpu_combined_worker_get_description(int workerid, int *worker_size, int **combined_workerid)
  143. {
  144. /* Check that this is the id of a combined worker */
  145. struct _starpu_combined_worker *worker;
  146. worker = _starpu_get_combined_worker_struct(workerid);
  147. STARPU_ASSERT(worker);
  148. if (worker_size)
  149. *worker_size = worker->worker_size;
  150. if (combined_workerid)
  151. *combined_workerid = worker->combined_workerid;
  152. return 0;
  153. }