starpu_mpi_sync_data.c 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015, 2016 CNRS
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <starpu_mpi.h>
  18. #include <starpu_mpi_sync_data.h>
  19. #include <starpu_mpi_private.h>
  20. #include <common/uthash.h>
/* One entry of the sync-data hash map: the requests queued under a given node_tag. */
struct _starpu_mpi_sync_data_handle_hashlist
{
	/* Requests stored under this key (pushed at the back, popped from the front) */
	struct _starpu_mpi_req_list *list;
	/* uthash bookkeeping handle, referenced as "hh" by the HASH_* macros */
	UT_hash_handle hh;
	/* Hash key; lookups use a key length of sizeof(struct _starpu_mpi_node_tag) */
	struct _starpu_mpi_node_tag node_tag;
};
/** stores data which have been received by MPI but have not been requested by the application.
 * Entries are keyed by node_tag; requests are inserted by _starpu_mpi_sync_data_add()
 * and removed by _starpu_mpi_sync_data_find(). */
/* Held around the hash map accesses performed by the find and add functions */
static starpu_pthread_mutex_t _starpu_mpi_sync_data_handle_mutex;
/* Head of the uthash table (NULL when no entry has been added) */
static struct _starpu_mpi_sync_data_handle_hashlist *_starpu_mpi_sync_data_handle_hashmap = NULL;
/* Number of requests currently queued: incremented on add, decremented on a successful find */
static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
  31. void _starpu_mpi_sync_data_init(void)
  32. {
  33. _starpu_mpi_sync_data_handle_hashmap = NULL;
  34. STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex, NULL);
  35. _starpu_mpi_sync_data_handle_hashmap_count = 0;
  36. }
  37. void _starpu_mpi_sync_data_free(void)
  38. {
  39. struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
  40. HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap, current, tmp)
  41. {
  42. _starpu_mpi_req_list_delete(current->list);
  43. HASH_DEL(_starpu_mpi_sync_data_handle_hashmap, current);
  44. free(current);
  45. }
  46. STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex);
  47. }
  48. #ifdef STARPU_VERBOSE
  49. static
  50. void _starpu_mpi_sync_data_handle_display_hash(struct _starpu_mpi_node_tag *node_tag)
  51. {
  52. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  53. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
  54. if (hashlist == NULL)
  55. {
  56. _STARPU_MPI_DEBUG(60, "Hashlist for comm %d source %d and tag %d does not exist\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
  57. }
  58. else if (_starpu_mpi_req_list_empty(hashlist->list))
  59. {
  60. _STARPU_MPI_DEBUG(60, "Hashlist for comm %d source %d and tag %d is empty\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
  61. }
  62. else
  63. {
  64. struct _starpu_mpi_req *cur;
  65. for (cur = _starpu_mpi_req_list_begin(hashlist->list) ;
  66. cur != _starpu_mpi_req_list_end(hashlist->list);
  67. cur = _starpu_mpi_req_list_next(cur))
  68. {
  69. _STARPU_MPI_DEBUG(60, "Element for comm %d source %d and tag %d: %p\n", node_tag->comm, node_tag->rank, node_tag->data_tag, cur);
  70. }
  71. }
  72. }
  73. #endif
/* Assert that no request is left in the hash map, i.e. that every request
 * added with _starpu_mpi_sync_data_add() has been retrieved by
 * _starpu_mpi_sync_data_find(). */
void _starpu_mpi_sync_data_check_termination(void)
{
	STARPU_ASSERT_MSG(_starpu_mpi_sync_data_handle_hashmap_count == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
}
  78. int _starpu_mpi_sync_data_count(void)
  79. {
  80. return _starpu_mpi_sync_data_handle_hashmap_count;
  81. }
  82. struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm)
  83. {
  84. struct _starpu_mpi_req *req;
  85. struct _starpu_mpi_node_tag node_tag;
  86. struct _starpu_mpi_sync_data_handle_hashlist *found;
  87. memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));
  88. node_tag.comm = comm;
  89. node_tag.rank = source;
  90. node_tag.data_tag = data_tag;
  91. _STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with comm %d source %d tag %d in the hashmap\n", comm, source, data_tag);
  92. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  93. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
  94. if (found == NULL)
  95. {
  96. req = NULL;
  97. }
  98. else
  99. {
  100. if (_starpu_mpi_req_list_empty(found->list))
  101. {
  102. req = NULL;
  103. }
  104. else
  105. {
  106. req = _starpu_mpi_req_list_pop_front(found->list);
  107. _starpu_mpi_sync_data_handle_hashmap_count --;
  108. }
  109. }
  110. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  111. _STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with comm %d source %d tag %d in the hashmap\n", req, comm, source, data_tag);
  112. return req;
  113. }
  114. void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *sync_req)
  115. {
  116. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  117. _STARPU_MPI_DEBUG(2000, "Adding sync_req %p with comm %d source %d tag %d in the hashmap\n", sync_req, sync_req->node_tag.comm, sync_req->node_tag.rank, sync_req->node_tag.data_tag);
  118. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  119. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &sync_req->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
  120. if (hashlist == NULL)
  121. {
  122. _STARPU_MPI_MALLOC(hashlist, sizeof(struct _starpu_mpi_sync_data_handle_hashlist));
  123. hashlist->list = _starpu_mpi_req_list_new();
  124. hashlist->node_tag = sync_req->node_tag;
  125. HASH_ADD(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(hashlist->node_tag), hashlist);
  126. }
  127. _starpu_mpi_req_list_push_back(hashlist->list, sync_req);
  128. _starpu_mpi_sync_data_handle_hashmap_count ++;
  129. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  130. #ifdef STARPU_VERBOSE
  131. _starpu_mpi_sync_data_handle_display_hash(&sync_req->node_tag);
  132. #endif
  133. }