starpu_mpi_sync_data.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015 Centre National de la Recherche Scientifique
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <starpu_mpi.h>
  18. #include <starpu_mpi_sync_data.h>
  19. #include <starpu_mpi_private.h>
  20. #include <common/uthash.h>
  21. struct _starpu_mpi_sync_data_handle_hashlist
  22. {
  23. struct _starpu_mpi_sync_data_handle_list *list;
  24. UT_hash_handle hh;
  25. int data_tag;
  26. };
  27. /** stores data which have been received by MPI but have not been requested by the application */
  28. static starpu_pthread_mutex_t *_starpu_mpi_sync_data_handle_mutex;
  29. static struct _starpu_mpi_sync_data_handle_hashlist **_starpu_mpi_sync_data_handle_hashmap = NULL;
  30. static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
  31. #ifdef STARPU_VERBOSE
  32. static
  33. void _starpu_mpi_sync_data_handle_display_hash(int source, int tag)
  34. {
  35. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  36. HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &tag, hashlist);
  37. if (hashlist == NULL)
  38. {
  39. _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
  40. }
  41. else if (_starpu_mpi_sync_data_handle_list_empty(hashlist->list))
  42. {
  43. _STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
  44. }
  45. else
  46. {
  47. struct _starpu_mpi_sync_data_handle *cur;
  48. for (cur = _starpu_mpi_sync_data_handle_list_begin(hashlist->list) ;
  49. cur != _starpu_mpi_sync_data_handle_list_end(hashlist->list);
  50. cur = _starpu_mpi_sync_data_handle_list_next(cur))
  51. {
  52. _STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
  53. }
  54. }
  55. }
  56. #endif
  57. void _starpu_mpi_sync_data_init(int world_size)
  58. {
  59. int k;
  60. _starpu_mpi_sync_data_handle_hashmap = malloc(world_size * sizeof(struct _starpu_mpi_sync_data_handle_hash_list *));
  61. _starpu_mpi_sync_data_handle_mutex = malloc(world_size * sizeof(starpu_pthread_mutex_t));
  62. for(k=0 ; k<world_size ; k++)
  63. {
  64. _starpu_mpi_sync_data_handle_hashmap[k] = NULL;
  65. STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex[k], NULL);
  66. }
  67. }
  68. void _starpu_mpi_sync_data_check_termination()
  69. {
  70. STARPU_ASSERT_MSG(_starpu_mpi_sync_data_handle_hashmap_count == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
  71. }
  72. void _starpu_mpi_sync_data_free(int world_size)
  73. {
  74. int n;
  75. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  76. for(n=0 ; n<world_size; n++)
  77. {
  78. for(hashlist=_starpu_mpi_sync_data_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
  79. {
  80. _starpu_mpi_sync_data_handle_list_delete(hashlist->list);
  81. }
  82. struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
  83. HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap[n], current, tmp)
  84. {
  85. HASH_DEL(_starpu_mpi_sync_data_handle_hashmap[n], current);
  86. free(current);
  87. }
  88. STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex[n]);
  89. }
  90. free(_starpu_mpi_sync_data_handle_hashmap);
  91. free(_starpu_mpi_sync_data_handle_mutex);
  92. }
  93. int _starpu_mpi_sync_data_count()
  94. {
  95. return _starpu_mpi_sync_data_handle_hashmap_count;
  96. }
  97. struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_create(struct _starpu_mpi_req *req)
  98. {
  99. struct _starpu_mpi_sync_data_handle* sync_data_handle = calloc(1, sizeof(struct _starpu_mpi_sync_data_handle));
  100. STARPU_ASSERT(sync_data_handle);
  101. sync_data_handle->data_tag = req->data_tag;
  102. sync_data_handle->source = req->srcdst;
  103. sync_data_handle->req = req;
  104. return sync_data_handle;
  105. }
  106. struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_find(int data_tag, int source)
  107. {
  108. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  109. struct _starpu_mpi_sync_data_handle *sync_data_handle;
  110. _STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with tag %d in the hashmap[%d]\n", data_tag, source);
  111. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
  112. HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &data_tag, hashlist);
  113. if (hashlist == NULL)
  114. {
  115. sync_data_handle = NULL;
  116. }
  117. else
  118. {
  119. if (_starpu_mpi_sync_data_handle_list_empty(hashlist->list))
  120. {
  121. sync_data_handle = NULL;
  122. }
  123. else
  124. {
  125. sync_data_handle = _starpu_mpi_sync_data_handle_list_pop_front(hashlist->list);
  126. _starpu_mpi_sync_data_handle_hashmap_count --;
  127. }
  128. }
  129. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
  130. _STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with tag %d in the hashmap[%d]\n", sync_data_handle, data_tag, source);
  131. return sync_data_handle;
  132. }
  133. void _starpu_mpi_sync_data_add(struct _starpu_mpi_sync_data_handle *sync_data_handle)
  134. {
  135. _STARPU_MPI_DEBUG(2000, "Adding sync_data_handle %p with tag %d in the hashmap[%d]\n", sync_data_handle, sync_data_handle->data_tag, sync_data_handle->source);
  136. struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
  137. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[sync_data_handle->source]);
  138. HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[sync_data_handle->source], &sync_data_handle->data_tag, hashlist);
  139. if (hashlist == NULL)
  140. {
  141. hashlist = malloc(sizeof(struct _starpu_mpi_sync_data_handle_hashlist));
  142. hashlist->list = _starpu_mpi_sync_data_handle_list_new();
  143. hashlist->data_tag = sync_data_handle->data_tag;
  144. HASH_ADD_INT(_starpu_mpi_sync_data_handle_hashmap[sync_data_handle->source], data_tag, hashlist);
  145. }
  146. _starpu_mpi_sync_data_handle_list_push_back(hashlist->list, sync_data_handle);
  147. _starpu_mpi_sync_data_handle_hashmap_count ++;
  148. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[sync_data_handle->source]);
  149. #ifdef STARPU_VERBOSE
  150. _starpu_mpi_sync_data_handle_display_hash(sync_data_handle->source, sync_data_handle->data_tag);
  151. #endif
  152. }