starpu_mpi_sync_data.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015 Centre National de la Recherche Scientifique
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <starpu_mpi.h>
  18. #include <starpu_mpi_sync_data.h>
  19. #include <starpu_mpi_private.h>
  20. #include <common/uthash.h>
  21. struct _starpu_mpi_sync_data_handle_hashlist
  22. {
  23. struct _starpu_mpi_req_list *list;
  24. UT_hash_handle hh;
  25. struct _starpu_mpi_node_tag node_tag;
  26. };
  27. /** stores data which have been received by MPI but have not been requested by the application */
  28. static starpu_pthread_mutex_t _starpu_mpi_sync_data_handle_mutex;
  29. static struct _starpu_mpi_sync_data_handle_hashlist *_starpu_mpi_sync_data_handle_hashmap = NULL;
  30. static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
  31. void _starpu_mpi_sync_data_init(void)
  32. {
  33. _starpu_mpi_sync_data_handle_hashmap = NULL;
  34. STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex, NULL);
  35. _starpu_mpi_sync_data_handle_hashmap_count = 0;
  36. }
  37. void _starpu_mpi_sync_data_free(void)
  38. {
  39. struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
  40. HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap, current, tmp)
  41. {
  42. _starpu_mpi_req_list_delete(current->list);
  43. HASH_DEL(_starpu_mpi_sync_data_handle_hashmap, current);
  44. free(current);
  45. }
  46. STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex);
  47. free(_starpu_mpi_sync_data_handle_hashmap);
  48. }
  49. #ifdef STARPU_VERBOSE
  50. static
  51. void _starpu_mpi_sync_data_handle_display_hash(struct _starpu_mpi_node_tag *node_tag)
  52. {
  53. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  54. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
  55. if (hashlist == NULL)
  56. {
  57. _STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d does not exist\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
  58. }
  59. else if (_starpu_mpi_req_list_empty(hashlist->list))
  60. {
  61. _STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d is empty\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
  62. }
  63. else
  64. {
  65. struct _starpu_mpi_req *cur;
  66. for (cur = _starpu_mpi_req_list_begin(hashlist->list) ;
  67. cur != _starpu_mpi_req_list_end(hashlist->list);
  68. cur = _starpu_mpi_req_list_next(cur))
  69. {
  70. _STARPU_MPI_DEBUG(60, "Element for comm %p source %d and tag %d: %p\n", node_tag->comm, node_tag->rank, node_tag->data_tag, cur);
  71. }
  72. }
  73. }
  74. #endif
  75. void _starpu_mpi_sync_data_check_termination(void)
  76. {
  77. STARPU_ASSERT_MSG(_starpu_mpi_sync_data_handle_hashmap_count == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
  78. }
  79. int _starpu_mpi_sync_data_count(void)
  80. {
  81. return _starpu_mpi_sync_data_handle_hashmap_count;
  82. }
  83. struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm)
  84. {
  85. struct _starpu_mpi_req *req;
  86. struct _starpu_mpi_node_tag node_tag;
  87. struct _starpu_mpi_sync_data_handle_hashlist *found;
  88. memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));
  89. node_tag.comm = comm;
  90. node_tag.rank = source;
  91. node_tag.data_tag = data_tag;
  92. _STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with comm %p source %d tag %d in the hashmap\n", comm, source, data_tag);
  93. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  94. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
  95. if (found == NULL)
  96. {
  97. req = NULL;
  98. }
  99. else
  100. {
  101. if (_starpu_mpi_req_list_empty(found->list))
  102. {
  103. req = NULL;
  104. }
  105. else
  106. {
  107. req = _starpu_mpi_req_list_pop_front(found->list);
  108. _starpu_mpi_sync_data_handle_hashmap_count --;
  109. }
  110. }
  111. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  112. _STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with comm %p source %d tag %d in the hashmap\n", req, comm, source, data_tag);
  113. return req;
  114. }
  115. void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *sync_req)
  116. {
  117. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  118. _STARPU_MPI_DEBUG(2000, "Adding sync_req %p with comm %p source %d tag %d in the hashmap\n", sync_req, sync_req->node_tag.comm, sync_req->node_tag.rank, sync_req->node_tag.data_tag);
  119. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
  120. HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &sync_req->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
  121. if (hashlist == NULL)
  122. {
  123. hashlist = malloc(sizeof(struct _starpu_mpi_sync_data_handle_hashlist));
  124. hashlist->list = _starpu_mpi_req_list_new();
  125. hashlist->node_tag = sync_req->node_tag;
  126. HASH_ADD(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(hashlist->node_tag), hashlist);
  127. }
  128. _starpu_mpi_req_list_push_back(hashlist->list, sync_req);
  129. _starpu_mpi_sync_data_handle_hashmap_count ++;
  130. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
  131. #ifdef STARPU_VERBOSE
  132. _starpu_mpi_sync_data_handle_display_hash(&sync_req->node_tag);
  133. #endif
  134. }