starpu_mpi_cache.c 10 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/uthash.h>
  18. #include <datawizard/coherency.h>
  19. #include <starpu_mpi_cache.h>
  20. #include <starpu_mpi_cache_stats.h>
  21. #include <starpu_mpi_private.h>
  22. /* Whether we are allowed to keep copies of remote data. */
  23. struct _starpu_data_entry
  24. {
  25. UT_hash_handle hh;
  26. starpu_data_handle_t data_handle;
  27. };
  28. static starpu_pthread_mutex_t _cache_mutex;
  29. static struct _starpu_data_entry *_cache_data = NULL;
  30. int _starpu_cache_enabled=1;
  31. static MPI_Comm _starpu_cache_comm;
  32. static int _starpu_cache_comm_size;
  33. static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle);
  34. int starpu_mpi_cache_is_enabled()
  35. {
  36. return _starpu_cache_enabled==1;
  37. }
  38. int starpu_mpi_cache_set(int enabled)
  39. {
  40. if (enabled == 1)
  41. {
  42. _starpu_cache_enabled = 1;
  43. }
  44. else
  45. {
  46. if (_starpu_cache_enabled)
  47. {
  48. // We need to clean the cache
  49. starpu_mpi_cache_flush_all_data(_starpu_cache_comm);
  50. _starpu_mpi_cache_shutdown();
  51. }
  52. _starpu_cache_enabled = 0;
  53. }
  54. return 0;
  55. }
  56. void _starpu_mpi_cache_init(MPI_Comm comm)
  57. {
  58. _starpu_cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
  59. if (_starpu_cache_enabled == -1)
  60. {
  61. _starpu_cache_enabled = 1;
  62. }
  63. if (_starpu_cache_enabled == 0)
  64. {
  65. _STARPU_DISP("Warning: StarPU MPI Communication cache is disabled\n");
  66. return;
  67. }
  68. _starpu_cache_comm = comm;
  69. starpu_mpi_comm_size(comm, &_starpu_cache_comm_size);
  70. _starpu_mpi_cache_stats_init();
  71. STARPU_PTHREAD_MUTEX_INIT(&_cache_mutex, NULL);
  72. }
  73. void _starpu_mpi_cache_shutdown()
  74. {
  75. if (_starpu_cache_enabled == 0)
  76. return;
  77. struct _starpu_data_entry *entry=NULL, *tmp=NULL;
  78. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  79. HASH_ITER(hh, _cache_data, entry, tmp)
  80. {
  81. HASH_DEL(_cache_data, entry);
  82. free(entry);
  83. }
  84. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  85. STARPU_PTHREAD_MUTEX_DESTROY(&_cache_mutex);
  86. _starpu_mpi_cache_stats_shutdown();
  87. }
  88. void _starpu_mpi_cache_data_clear(starpu_data_handle_t data_handle)
  89. {
  90. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  91. if (_starpu_cache_enabled == 1)
  92. {
  93. struct _starpu_data_entry *entry;
  94. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  95. _starpu_mpi_cache_flush_nolock(data_handle);
  96. HASH_FIND_PTR(_cache_data, &data_handle, entry);
  97. if (entry != NULL)
  98. {
  99. HASH_DEL(_cache_data, entry);
  100. free(entry);
  101. }
  102. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  103. }
  104. free(mpi_data->cache_sent);
  105. }
  106. void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
  107. {
  108. int i;
  109. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  110. if (_starpu_cache_enabled == 0)
  111. return;
  112. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  113. mpi_data->cache_received = 0;
  114. _STARPU_MALLOC(mpi_data->cache_sent, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent[0]));
  115. for(i=0 ; i<_starpu_cache_comm_size ; i++)
  116. {
  117. mpi_data->cache_sent[i] = 0;
  118. }
  119. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  120. }
  121. static void _starpu_mpi_cache_data_add_nolock(starpu_data_handle_t data_handle)
  122. {
  123. struct _starpu_data_entry *entry;
  124. if (_starpu_cache_enabled == 0)
  125. return;
  126. HASH_FIND_PTR(_cache_data, &data_handle, entry);
  127. if (entry == NULL)
  128. {
  129. _STARPU_MPI_MALLOC(entry, sizeof(*entry));
  130. entry->data_handle = data_handle;
  131. HASH_ADD_PTR(_cache_data, data_handle, entry);
  132. }
  133. }
  134. static void _starpu_mpi_cache_data_remove_nolock(starpu_data_handle_t data_handle)
  135. {
  136. struct _starpu_data_entry *entry;
  137. if (_starpu_cache_enabled == 0)
  138. return;
  139. HASH_FIND_PTR(_cache_data, &data_handle, entry);
  140. if (entry)
  141. {
  142. HASH_DEL(_cache_data, entry);
  143. free(entry);
  144. }
  145. }
  146. /**************************************
  147. * Received cache
  148. **************************************/
  149. void starpu_mpi_cached_receive_clear(starpu_data_handle_t data_handle)
  150. {
  151. int mpi_rank = starpu_mpi_data_get_rank(data_handle);
  152. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  153. if (_starpu_cache_enabled == 0)
  154. return;
  155. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  156. STARPU_ASSERT(mpi_data->magic == 42);
  157. STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
  158. if (mpi_data->cache_received == 1)
  159. {
  160. #ifdef STARPU_DEVEL
  161. # warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
  162. #endif
  163. _STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data_handle);
  164. mpi_data->cache_received = 0;
  165. starpu_data_invalidate_submit(data_handle);
  166. _starpu_mpi_cache_data_remove_nolock(data_handle);
  167. _starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
  168. }
  169. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  170. }
  171. int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
  172. {
  173. int mpi_rank = starpu_mpi_data_get_rank(data_handle);
  174. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  175. if (_starpu_cache_enabled == 0)
  176. return 0;
  177. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  178. STARPU_ASSERT(mpi_data->magic == 42);
  179. STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
  180. int already_received = mpi_data->cache_received;
  181. if (already_received == 0)
  182. {
  183. _STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data_handle, mpi_rank);
  184. mpi_data->cache_received = 1;
  185. _starpu_mpi_cache_data_add_nolock(data_handle);
  186. _starpu_mpi_cache_stats_inc(mpi_rank, data_handle);
  187. }
  188. else
  189. {
  190. _STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
  191. }
  192. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  193. return already_received;
  194. }
  195. int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
  196. {
  197. int already_received;
  198. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  199. if (_starpu_cache_enabled == 0)
  200. return 0;
  201. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  202. STARPU_ASSERT(mpi_data->magic == 42);
  203. already_received = mpi_data->cache_received;
  204. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  205. return already_received;
  206. }
  207. /**************************************
  208. * Send cache
  209. **************************************/
  210. void starpu_mpi_cached_send_clear(starpu_data_handle_t data_handle)
  211. {
  212. int n, size;
  213. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  214. if (_starpu_cache_enabled == 0)
  215. return;
  216. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  217. starpu_mpi_comm_size(mpi_data->node_tag.node.comm, &size);
  218. for(n=0 ; n<size ; n++)
  219. {
  220. if (mpi_data->cache_sent[n] == 1)
  221. {
  222. _STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
  223. mpi_data->cache_sent[n] = 0;
  224. _starpu_mpi_cache_data_remove_nolock(data_handle);
  225. }
  226. }
  227. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  228. }
  229. int starpu_mpi_cached_send_set(starpu_data_handle_t data_handle, int dest)
  230. {
  231. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  232. if (_starpu_cache_enabled == 0)
  233. return 0;
  234. STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
  235. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  236. int already_sent = mpi_data->cache_sent[dest];
  237. if (mpi_data->cache_sent[dest] == 0)
  238. {
  239. mpi_data->cache_sent[dest] = 1;
  240. _starpu_mpi_cache_data_add_nolock(data_handle);
  241. _STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data_handle, dest);
  242. }
  243. else
  244. {
  245. _STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data_handle, dest);
  246. }
  247. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  248. return already_sent;
  249. }
  250. int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
  251. {
  252. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  253. int already_sent;
  254. if (_starpu_cache_enabled == 0)
  255. return 0;
  256. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  257. STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
  258. already_sent = mpi_data->cache_sent[dest];
  259. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  260. return already_sent;
  261. }
  262. static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
  263. {
  264. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  265. int i, nb_nodes;
  266. if (_starpu_cache_enabled == 0)
  267. return;
  268. starpu_mpi_comm_size(mpi_data->node_tag.node.comm, &nb_nodes);
  269. for(i=0 ; i<nb_nodes ; i++)
  270. {
  271. if (mpi_data->cache_sent[i] == 1)
  272. {
  273. _STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
  274. mpi_data->cache_sent[i] = 0;
  275. _starpu_mpi_cache_stats_dec(i, data_handle);
  276. }
  277. }
  278. if (mpi_data->cache_received == 1)
  279. {
  280. int mpi_rank = starpu_mpi_data_get_rank(data_handle);
  281. _STARPU_MPI_DEBUG(2, "Clearing received cache for data %p\n", data_handle);
  282. mpi_data->cache_received = 0;
  283. _starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
  284. }
  285. }
  286. void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
  287. {
  288. if (_starpu_cache_enabled == 0)
  289. return;
  290. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  291. _starpu_mpi_cache_flush_nolock(data_handle);
  292. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  293. }
  294. static void _starpu_mpi_cache_flush_and_invalidate_nolock(MPI_Comm comm, starpu_data_handle_t data_handle)
  295. {
  296. int my_rank, mpi_rank;
  297. _starpu_mpi_cache_flush_nolock(data_handle);
  298. starpu_mpi_comm_rank(comm, &my_rank);
  299. mpi_rank = starpu_mpi_data_get_rank(data_handle);
  300. if (mpi_rank != my_rank && mpi_rank != -1)
  301. starpu_data_invalidate_submit(data_handle);
  302. }
  303. void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
  304. {
  305. _starpu_mpi_data_flush(data_handle);
  306. if (_starpu_cache_enabled == 0)
  307. return;
  308. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  309. _starpu_mpi_cache_flush_and_invalidate_nolock(comm, data_handle);
  310. _starpu_mpi_cache_data_remove_nolock(data_handle);
  311. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  312. }
  313. void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
  314. {
  315. struct _starpu_data_entry *entry=NULL, *tmp=NULL;
  316. if (_starpu_cache_enabled == 0)
  317. return;
  318. STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
  319. HASH_ITER(hh, _cache_data, entry, tmp)
  320. {
  321. _starpu_mpi_cache_flush_and_invalidate_nolock(comm, entry->data_handle);
  322. HASH_DEL(_cache_data, entry);
  323. free(entry);
  324. }
  325. STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
  326. }