starpu_mpi_sync_data.c 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015, 2016, 2017 CNRS
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <starpu_mpi.h>
  18. #include <mpi/starpu_mpi_sync_data.h>
  19. #include <starpu_mpi_private.h>
  20. #include <common/uthash.h>
  21. #ifdef STARPU_USE_MPI_MPI
  /**
   * One entry of the sync-data hashmap: the list of requests that share the
   * same node tag.
   */
  struct _starpu_mpi_sync_data_handle_hashlist
  {
      /* Requests queued for this key: appended by _starpu_mpi_sync_data_add(),
       * removed from the front by _starpu_mpi_sync_data_find(). */
      struct _starpu_mpi_req_list list;
      /* uthash bookkeeping; the field name "hh" is what the HASH_* macros are given. */
      UT_hash_handle hh;
      /* Hash key; lookups hash sizeof(struct _starpu_mpi_node_tag) bytes of it. */
      struct _starpu_mpi_node_tag node_tag;
  };
  /** Mutex taken by _starpu_mpi_sync_data_add() and _starpu_mpi_sync_data_find()
   *  around every access to the hashmap and to the counter below. */
  static starpu_pthread_mutex_t _starpu_mpi_sync_data_handle_mutex;
  /** Head of the uthash map; stores data which have been received by MPI but
   *  have not been requested by the application. NULL when the map is empty. */
  static struct _starpu_mpi_sync_data_handle_hashlist *_starpu_mpi_sync_data_handle_hashmap = NULL;
  /** Number of requests currently queued across all lists of the map:
   *  incremented by _starpu_mpi_sync_data_add(), decremented when
   *  _starpu_mpi_sync_data_find() hands a request out. */
  static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
  32. void _starpu_mpi_sync_data_init(void)
  33. {
  34. _starpu_mpi_sync_data_handle_hashmap = NULL;
  35. STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex, NULL);
  36. _starpu_mpi_sync_data_handle_hashmap_count = 0;
  37. }
  38. void _starpu_mpi_sync_data_shutdown(void)
  39. {
  40. struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
  41. HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap, current, tmp)
  42. {
  43. STARPU_ASSERT(_starpu_mpi_req_list_empty(&current->list));
  44. HASH_DEL(_starpu_mpi_sync_data_handle_hashmap, current);
  45. free(current);
  46. }
  47. STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex);
  48. }
  #ifdef STARPU_VERBOSE
  /**
   * Debug helper: log (at debug level 60) the state of the hashmap entry
   * matching node_tag -- missing, empty, or one line per queued request.
   *
   * This function does not take _starpu_mpi_sync_data_handle_mutex itself.
   */
  static
  void _starpu_mpi_sync_data_handle_display_hash(struct _starpu_mpi_node_tag *node_tag)
  {
      struct _starpu_mpi_sync_data_handle_hashlist *entry;
      struct _starpu_mpi_req *req;

      HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), entry);

      if (entry == NULL)
      {
          _STARPU_MPI_DEBUG(60, "Hashlist for comm %ld source %d and tag %d does not exist\n", (long int)node_tag->comm, node_tag->rank, node_tag->data_tag);
          return;
      }

      if (_starpu_mpi_req_list_empty(&entry->list))
      {
          _STARPU_MPI_DEBUG(60, "Hashlist for comm %ld source %d and tag %d is empty\n", (long int)node_tag->comm, node_tag->rank, node_tag->data_tag);
          return;
      }

      /* Non-empty list: print the address of every queued request. */
      req = _starpu_mpi_req_list_begin(&entry->list);
      while (req != _starpu_mpi_req_list_end(&entry->list))
      {
          _STARPU_MPI_DEBUG(60, "Element for comm %ld source %d and tag %d: %p\n", (long int)node_tag->comm, node_tag->rank, node_tag->data_tag, req);
          req = _starpu_mpi_req_list_next(req);
      }
  }
  #endif
  /**
   * Assert that no request is left queued in the sync-data hashmap, i.e.
   * that the counter maintained by _starpu_mpi_sync_data_add() and
   * _starpu_mpi_sync_data_find() is back to zero.
   *
   * NOTE(review): the counter is read without taking
   * _starpu_mpi_sync_data_handle_mutex; presumably this is only called once
   * no other thread touches the map -- confirm against the caller.
   */
  void _starpu_mpi_sync_data_check_termination(void)
  {
      STARPU_ASSERT_MSG(_starpu_mpi_sync_data_handle_hashmap_count == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
  }
  79. int _starpu_mpi_sync_data_count(void)
  80. {
  81. return _starpu_mpi_sync_data_handle_hashmap_count;
  82. }
  83. struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm)
  84. {
  85. struct _starpu_mpi_req *req;
  86. struct _starpu_mpi_node_tag node_tag;
  87. struct _starpu_mpi_sync_data_handle_hashlist *found;
  88. memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));
  89. node_tag.comm = comm;
  90. node_tag.rank = source;
  91. node_tag.data_tag = data_tag;
  92. _STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with comm %ld source %d tag %d in the hashmap\n", (long int)comm, source, data_tag);
  93. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  94. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
  95. if (found == NULL)
  96. {
  97. req = NULL;
  98. }
  99. else
  100. {
  101. if (_starpu_mpi_req_list_empty(&found->list))
  102. {
  103. req = NULL;
  104. }
  105. else
  106. {
  107. req = _starpu_mpi_req_list_pop_front(&found->list);
  108. _starpu_mpi_sync_data_handle_hashmap_count --;
  109. }
  110. }
  111. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  112. _STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with comm %ld source %d tag %d in the hashmap\n", req, (long int)comm, source, data_tag);
  113. return req;
  114. }
  115. void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *sync_req)
  116. {
  117. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  118. _STARPU_MPI_DEBUG(2000, "Adding sync_req %p with comm %ld source %d tag %d in the hashmap\n", sync_req, (long int)sync_req->node_tag.comm, sync_req->node_tag.rank, sync_req->node_tag.data_tag);
  119. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  120. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &sync_req->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
  121. if (hashlist == NULL)
  122. {
  123. _STARPU_MPI_MALLOC(hashlist, sizeof(struct _starpu_mpi_sync_data_handle_hashlist));
  124. _starpu_mpi_req_list_init(&hashlist->list);
  125. hashlist->node_tag = sync_req->node_tag;
  126. HASH_ADD(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(hashlist->node_tag), hashlist);
  127. }
  128. _starpu_mpi_req_list_push_back(&hashlist->list, sync_req);
  129. _starpu_mpi_sync_data_handle_hashmap_count ++;
  130. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  131. #ifdef STARPU_VERBOSE
  132. _starpu_mpi_sync_data_handle_display_hash(&sync_req->node_tag);
  133. #endif
  134. }
  135. #endif // STARPU_USE_MPI_MPI