starpu_mpi.c 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010-2014 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <stdlib.h>
  18. #include <starpu_mpi.h>
  19. #include <starpu_mpi_datatype.h>
  20. #include <starpu_mpi_private.h>
  21. #include <starpu_profiling.h>
  22. #include <starpu_mpi_stats.h>
  23. #include <starpu_mpi_task_insert.h>
  24. #include <starpu_mpi_early_data.h>
  25. #include <starpu_mpi_early_request.h>
  26. #include <common/config.h>
  27. #include <common/thread.h>
  28. #include <datawizard/interfaces/data_interface.h>
  29. #include <datawizard/coherency.h>
  30. static void _starpu_mpi_add_sync_point_in_fxt(void);
  31. static void _starpu_mpi_submit_ready_request(void *arg);
  32. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
  33. #ifdef STARPU_VERBOSE
  34. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
  35. #endif
  36. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  37. int dest, int mpi_tag, MPI_Comm comm,
  38. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  39. int sequential_consistency);
  40. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
  41. int source, int mpi_tag, MPI_Comm comm,
  42. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  43. int sequential_consistency, int is_internal_req,
  44. starpu_ssize_t count);
  45. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
  46. /* The list of ready requests */
  47. static struct _starpu_mpi_req_list *ready_requests;
  48. /* The list of detached requests that have already been submitted to MPI */
  49. static struct _starpu_mpi_req_list *detached_requests;
  50. static starpu_pthread_mutex_t detached_requests_mutex;
  51. /* Condition to wake up progression thread */
  52. static starpu_pthread_cond_t cond_progression;
  53. /* Condition to wake up waiting for all current MPI requests to finish */
  54. static starpu_pthread_cond_t cond_finished;
  55. static starpu_pthread_mutex_t mutex;
  56. static starpu_pthread_t progress_thread;
  57. static int running = 0;
  58. /* Count requests posted by the application and not yet submitted to MPI */
  59. static starpu_pthread_mutex_t mutex_posted_requests;
  60. static int posted_requests = 0, newer_requests, barrier_running = 0;
  61. #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
  62. static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
  63. {
  64. *req = malloc(sizeof(struct _starpu_mpi_req));
  65. STARPU_ASSERT_MSG(*req, "Invalid request");
  66. /* Initialize the request structure */
  67. (*req)->data_handle = NULL;
  68. (*req)->datatype = 0;
  69. (*req)->ptr = NULL;
  70. (*req)->count = -1;
  71. (*req)->user_datatype = -1;
  72. (*req)->srcdst = -1;
  73. (*req)->mpi_tag = -1;
  74. (*req)->comm = 0;
  75. (*req)->func = NULL;
  76. (*req)->status = NULL;
  77. (*req)->request = 0;
  78. (*req)->flag = NULL;
  79. (*req)->ret = -1;
  80. STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
  81. STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
  82. STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
  83. STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
  84. (*req)->request_type = UNKNOWN_REQ;
  85. (*req)->submitted = 0;
  86. (*req)->completed = 0;
  87. (*req)->posted = 0;
  88. (*req)->other_request = NULL;
  89. (*req)->sync = 0;
  90. (*req)->detached = -1;
  91. (*req)->callback = NULL;
  92. (*req)->callback_arg = NULL;
  93. (*req)->size_req = 0;
  94. (*req)->internal_req = NULL;
  95. (*req)->is_internal_req = 0;
  96. (*req)->envelope = NULL;
  97. (*req)->sequential_consistency = 1;
  98. }
  99. /********************************************************/
  100. /* */
  101. /* Send/Receive functionalities */
  102. /* */
  103. /********************************************************/
  104. static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
  105. int srcdst, int mpi_tag, MPI_Comm comm,
  106. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  107. enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
  108. enum starpu_data_access_mode mode,
  109. int sequential_consistency,
  110. int is_internal_req,
  111. starpu_ssize_t count)
  112. {
  113. struct _starpu_mpi_req *req;
  114. _STARPU_MPI_LOG_IN();
  115. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  116. /* Initialize the request structure */
  117. _starpu_mpi_request_init(&req);
  118. req->request_type = request_type;
  119. req->data_handle = data_handle;
  120. req->srcdst = srcdst;
  121. req->mpi_tag = mpi_tag;
  122. req->comm = comm;
  123. req->detached = detached;
  124. req->sync = sync;
  125. req->callback = callback;
  126. req->callback_arg = arg;
  127. req->func = func;
  128. req->sequential_consistency = sequential_consistency;
  129. req->is_internal_req = is_internal_req;
  130. req->count = count;
  131. /* Asynchronously request StarPU to fetch the data in main memory: when
  132. * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
  133. * the request is actually submitted */
  134. starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
  135. _STARPU_MPI_LOG_OUT();
  136. return req;
  137. }
  138. /********************************************************/
  139. /* */
  140. /* Send functionalities */
  141. /* */
  142. /********************************************************/
  143. static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
  144. {
  145. _STARPU_MPI_LOG_IN();
  146. _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  147. _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
  148. _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
  149. if (req->sync == 0)
  150. {
  151. req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
  152. STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
  153. }
  154. else
  155. {
  156. req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
  157. STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %d", req->ret);
  158. }
  159. _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
  160. /* somebody is perhaps waiting for the MPI request to be posted */
  161. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  162. req->submitted = 1;
  163. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  164. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  165. _starpu_mpi_handle_detached_request(req);
  166. _STARPU_MPI_LOG_OUT();
  167. }
  168. static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
  169. {
  170. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  171. req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  172. req->envelope->mpi_tag = req->mpi_tag;
  173. if (req->user_datatype == 0)
  174. {
  175. int size;
  176. req->count = 1;
  177. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  178. MPI_Type_size(req->datatype, &size);
  179. req->envelope->size = (starpu_ssize_t)req->count * size;
  180. _STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
  181. MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
  182. }
  183. else
  184. {
  185. int ret;
  186. // Do not pack the data, just try to find out the size
  187. starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
  188. if (req->envelope->size != -1)
  189. {
  190. // We already know the size of the data, let's send it to overlap with the packing of the data
  191. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
  192. req->count = req->envelope->size;
  193. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
  194. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  195. }
  196. // Pack the data
  197. starpu_data_pack(req->data_handle, &req->ptr, &req->count);
  198. if (req->envelope->size == -1)
  199. {
  200. // We know the size now, let's send it
  201. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
  202. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
  203. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  204. }
  205. else
  206. {
  207. // We check the size returned with the 2 calls to pack is the same
  208. STARPU_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
  209. }
  210. // We can send the data now
  211. }
  212. _starpu_mpi_isend_data_func(req);
  213. }
  214. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  215. int dest, int mpi_tag, MPI_Comm comm,
  216. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  217. int sequential_consistency)
  218. {
  219. return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
  220. }
  221. int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
  222. {
  223. _STARPU_MPI_LOG_IN();
  224. STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
  225. struct _starpu_mpi_req *req;
  226. _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, 0);
  227. req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL, 1);
  228. _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, 0);
  229. STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  230. *public_req = req;
  231. _STARPU_MPI_LOG_OUT();
  232. return 0;
  233. }
  234. int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
  235. int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  236. {
  237. _STARPU_MPI_LOG_IN();
  238. _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg, 1);
  239. _STARPU_MPI_LOG_OUT();
  240. return 0;
  241. }
  242. int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
  243. {
  244. starpu_mpi_req req;
  245. MPI_Status status;
  246. _STARPU_MPI_LOG_IN();
  247. memset(&status, 0, sizeof(MPI_Status));
  248. starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
  249. starpu_mpi_wait(&req, &status);
  250. _STARPU_MPI_LOG_OUT();
  251. return 0;
  252. }
  253. int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
  254. {
  255. _STARPU_MPI_LOG_IN();
  256. STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
  257. struct _starpu_mpi_req *req;
  258. req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL, 1);
  259. STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  260. *public_req = req;
  261. _STARPU_MPI_LOG_OUT();
  262. return 0;
  263. }
  264. int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  265. {
  266. _STARPU_MPI_LOG_IN();
  267. _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 1, callback, arg, 1);
  268. _STARPU_MPI_LOG_OUT();
  269. return 0;
  270. }
  271. /********************************************************/
  272. /* */
  273. /* receive functionalities */
  274. /* */
  275. /********************************************************/
  276. static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
  277. {
  278. _STARPU_MPI_LOG_IN();
  279. _STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  280. _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
  281. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
  282. STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
  283. _STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
  284. /* somebody is perhaps waiting for the MPI request to be posted */
  285. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  286. req->submitted = 1;
  287. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  288. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  289. _starpu_mpi_handle_detached_request(req);
  290. _STARPU_MPI_LOG_OUT();
  291. }
  292. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
  293. {
  294. return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
  295. }
  296. int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
  297. {
  298. _STARPU_MPI_LOG_IN();
  299. STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
  300. // // We check if a tag is defined for the data handle, if not,
  301. // // we define the one given for the communication.
  302. // // A tag is necessary for the internal mpi engine.
  303. // int tag = starpu_data_get_tag(data_handle);
  304. // if (tag == -1)
  305. // starpu_data_set_tag(data_handle, mpi_tag);
  306. struct _starpu_mpi_req *req;
  307. _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, mpi_tag);
  308. req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
  309. _STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, mpi_tag);
  310. STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
  311. *public_req = req;
  312. _STARPU_MPI_LOG_OUT();
  313. return 0;
  314. }
  315. int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  316. {
  317. _STARPU_MPI_LOG_IN();
  318. // // We check if a tag is defined for the data handle, if not,
  319. // // we define the one given for the communication.
  320. // // A tag is necessary for the internal mpi engine.
  321. // int tag = starpu_data_get_tag(data_handle);
  322. // if (tag == -1)
  323. // starpu_data_set_tag(data_handle, mpi_tag);
  324. _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, 1, 0, 0);
  325. _STARPU_MPI_LOG_OUT();
  326. return 0;
  327. }
  328. int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
  329. {
  330. _STARPU_MPI_LOG_IN();
  331. // // We check if a tag is defined for the data handle, if not,
  332. // // we define the one given for the communication.
  333. // // A tag is necessary for the internal mpi engine.
  334. // int tag = starpu_data_get_tag(data_handle);
  335. // if (tag == -1)
  336. // starpu_data_set_tag(data_handle, mpi_tag);
  337. _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
  338. _STARPU_MPI_LOG_OUT();
  339. return 0;
  340. }
  341. int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
  342. {
  343. starpu_mpi_req req;
  344. _STARPU_MPI_LOG_IN();
  345. // // We check if a tag is defined for the data handle, if not,
  346. // // we define the one given for the communication.
  347. // // A tag is necessary for the internal mpi engine.
  348. // int tag = starpu_data_get_tag(data_handle);
  349. // if (tag == -1)
  350. // starpu_data_set_tag(data_handle, mpi_tag);
  351. starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
  352. starpu_mpi_wait(&req, status);
  353. _STARPU_MPI_LOG_OUT();
  354. return 0;
  355. }
  356. /********************************************************/
  357. /* */
  358. /* Wait functionalities */
  359. /* */
  360. /********************************************************/
  361. static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
  362. {
  363. _STARPU_MPI_LOG_IN();
  364. /* Which is the mpi request we are waiting for ? */
  365. struct _starpu_mpi_req *req = waiting_req->other_request;
  366. _STARPU_MPI_TRACE_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
  367. req->ret = MPI_Wait(&req->request, waiting_req->status);
  368. STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
  369. _STARPU_MPI_TRACE_UWAIT_END(req->srcdst, req->mpi_tag);
  370. _starpu_mpi_handle_request_termination(req);
  371. _STARPU_MPI_LOG_OUT();
  372. }
  373. int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
  374. {
  375. int ret;
  376. struct _starpu_mpi_req *req = *public_req;
  377. struct _starpu_mpi_req *waiting_req;
  378. _STARPU_MPI_LOG_IN();
  379. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  380. /* We cannot try to complete a MPI request that was not actually posted
  381. * to MPI yet. */
  382. STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
  383. while (!(req->submitted))
  384. STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
  385. STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
  386. /* Initialize the request structure */
  387. _starpu_mpi_request_init(&waiting_req);
  388. waiting_req->status = status;
  389. waiting_req->other_request = req;
  390. waiting_req->func = _starpu_mpi_wait_func;
  391. waiting_req->request_type = WAIT_REQ;
  392. _starpu_mpi_submit_ready_request(waiting_req);
  393. /* We wait for the MPI request to finish */
  394. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  395. while (!req->completed)
  396. STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
  397. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  398. ret = req->ret;
  399. /* The internal request structure was automatically allocated */
  400. *public_req = NULL;
  401. free(req);
  402. free(waiting_req);
  403. _STARPU_MPI_LOG_OUT();
  404. return ret;
  405. }
  406. /********************************************************/
  407. /* */
  408. /* Test functionalities */
  409. /* */
  410. /********************************************************/
  411. static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
  412. {
  413. _STARPU_MPI_LOG_IN();
  414. /* Which is the mpi request we are testing for ? */
  415. struct _starpu_mpi_req *req = testing_req->other_request;
  416. _STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  417. req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  418. _STARPU_MPI_TRACE_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
  419. req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
  420. STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
  421. _STARPU_MPI_TRACE_UTESTING_END(req->srcdst, req->mpi_tag);
  422. if (*testing_req->flag)
  423. {
  424. testing_req->ret = req->ret;
  425. _starpu_mpi_handle_request_termination(req);
  426. }
  427. STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
  428. testing_req->completed = 1;
  429. STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
  430. STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
  431. _STARPU_MPI_LOG_OUT();
  432. }
  433. int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
  434. {
  435. _STARPU_MPI_LOG_IN();
  436. int ret = 0;
  437. STARPU_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
  438. struct _starpu_mpi_req *req = *public_req;
  439. STARPU_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
  440. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  441. unsigned submitted = req->submitted;
  442. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  443. if (submitted)
  444. {
  445. struct _starpu_mpi_req *testing_req;
  446. _starpu_mpi_request_init(&testing_req);
  447. /* Initialize the request structure */
  448. STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
  449. STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
  450. testing_req->flag = flag;
  451. testing_req->status = status;
  452. testing_req->other_request = req;
  453. testing_req->func = _starpu_mpi_test_func;
  454. testing_req->completed = 0;
  455. testing_req->request_type = TEST_REQ;
  456. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  457. _starpu_mpi_submit_ready_request(testing_req);
  458. /* We wait for the test request to finish */
  459. STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
  460. while (!(testing_req->completed))
  461. STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
  462. STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
  463. ret = testing_req->ret;
  464. if (*(testing_req->flag))
  465. {
  466. /* The request was completed so we free the internal
  467. * request structure which was automatically allocated
  468. * */
  469. *public_req = NULL;
  470. free(req);
  471. }
  472. free(testing_req);
  473. }
  474. else
  475. {
  476. *flag = 0;
  477. }
  478. _STARPU_MPI_LOG_OUT();
  479. return ret;
  480. }
  481. /********************************************************/
  482. /* */
  483. /* Barrier functionalities */
  484. /* */
  485. /********************************************************/
  486. static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
  487. {
  488. _STARPU_MPI_LOG_IN();
  489. barrier_req->ret = MPI_Barrier(barrier_req->comm);
  490. STARPU_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %d", barrier_req->ret);
  491. _starpu_mpi_handle_request_termination(barrier_req);
  492. _STARPU_MPI_LOG_OUT();
  493. }
  494. int starpu_mpi_barrier(MPI_Comm comm)
  495. {
  496. int ret;
  497. struct _starpu_mpi_req *barrier_req;
  498. _STARPU_MPI_LOG_IN();
  499. _starpu_mpi_request_init(&barrier_req);
  500. /* First wait for *both* all tasks and MPI requests to finish, in case
  501. * some tasks generate MPI requests, MPI requests generate tasks, etc.
  502. */
  503. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  504. STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
  505. barrier_running = 1;
  506. do
  507. {
  508. while (posted_requests)
  509. /* Wait for all current MPI requests to finish */
  510. STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
  511. /* No current request, clear flag */
  512. newer_requests = 0;
  513. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  514. /* Now wait for all tasks */
  515. starpu_task_wait_for_all();
  516. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  517. /* Check newer_requests again, in case some MPI requests
  518. * triggered by tasks completed and triggered tasks between
  519. * wait_for_all finished and we take the lock */
  520. } while (posted_requests || newer_requests);
  521. barrier_running = 0;
  522. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  523. /* Initialize the request structure */
  524. STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
  525. STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
  526. barrier_req->func = _starpu_mpi_barrier_func;
  527. barrier_req->request_type = BARRIER_REQ;
  528. barrier_req->comm = comm;
  529. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  530. _starpu_mpi_submit_ready_request(barrier_req);
  531. /* We wait for the MPI request to finish */
  532. STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
  533. while (!barrier_req->completed)
  534. STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
  535. STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
  536. ret = barrier_req->ret;
  537. free(barrier_req);
  538. _STARPU_MPI_LOG_OUT();
  539. return ret;
  540. }
  541. /********************************************************/
  542. /* */
  543. /* Progression */
  544. /* */
  545. /********************************************************/
  546. #ifdef STARPU_VERBOSE
  547. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
  548. {
  549. switch (request_type)
  550. {
  551. case SEND_REQ: return "SEND_REQ";
  552. case RECV_REQ: return "RECV_REQ";
  553. case WAIT_REQ: return "WAIT_REQ";
  554. case TEST_REQ: return "TEST_REQ";
  555. case BARRIER_REQ: return "BARRIER_REQ";
  556. case UNKNOWN_REQ: return "UNSET_REQ";
  557. default: return "unknown request type";
  558. }
  559. }
  560. #endif
  561. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
  562. {
  563. int ret;
  564. _STARPU_MPI_LOG_IN();
  565. _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
  566. req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
  567. _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
  568. if (req->internal_req)
  569. {
  570. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
  571. STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
  572. _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
  573. _starpu_mpi_early_data_delete(early_data_handle);
  574. free(early_data_handle);
  575. }
  576. else
  577. {
  578. if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
  579. {
  580. if (req->user_datatype == 1)
  581. {
  582. if (req->request_type == SEND_REQ)
  583. {
  584. // We need to make sure the communication for sending the size
  585. // has completed, as MPI can re-order messages, let's call
  586. // MPI_Wait to make sure data have been sent
  587. ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
  588. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %d", ret);
  589. free(req->ptr);
  590. }
  591. if (req->request_type == RECV_REQ)
  592. {
  593. // req->ptr is freed by starpu_data_unpack
  594. starpu_data_unpack(req->data_handle, req->ptr, req->count);
  595. }
  596. }
  597. else
  598. {
  599. _starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
  600. }
  601. }
  602. }
  603. if (req->data_handle)
  604. starpu_data_release(req->data_handle);
  605. if (req->envelope)
  606. {
  607. free(req->envelope);
  608. req->envelope = NULL;
  609. }
  610. /* Execute the specified callback, if any */
  611. if (req->callback)
  612. req->callback(req->callback_arg);
  613. /* tell anyone potentially waiting on the request that it is
  614. * terminated now */
  615. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  616. req->completed = 1;
  617. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  618. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  619. _STARPU_MPI_LOG_OUT();
  620. }
/* Arguments of _starpu_mpi_early_data_cb. Allocated with malloc in
 * _starpu_mpi_submit_ready_request when an application receive matches data
 * that has already been received, and freed by the callback itself. */
struct _starpu_mpi_early_data_cb_args
{
	/* Handle of the application receive request: destination of the data */
	starpu_data_handle_t data_handle;
	/* Temporary handle into which the early data has been received */
	starpu_data_handle_t early_handle;
	/* Application receive request being satisfied from the early data */
	struct _starpu_mpi_req *req;
	/* Raw receive buffer when the data was received before any handle with
	 * this tag was registered (unpacked then freed by the callback); NULL when
	 * the early handle was registered like an existing handle */
	void *buffer;
};
  628. static void _starpu_mpi_early_data_cb(void* arg)
  629. {
  630. struct _starpu_mpi_early_data_cb_args *args = arg;
  631. // We store in the application request the internal MPI
  632. // request so that it can be used by starpu_mpi_wait
  633. args->req->request = args->req->internal_req->request;
  634. args->req->submitted = 1;
  635. if (args->buffer)
  636. {
  637. /* Data has been received as a raw memory, it has to be unpacked */
  638. struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
  639. struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
  640. STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
  641. itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
  642. free(args->buffer);
  643. }
  644. else
  645. {
  646. struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
  647. void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
  648. void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
  649. if (!itf->copy_methods->ram_to_ram)
  650. {
  651. _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
  652. itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
  653. }
  654. else
  655. {
  656. _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
  657. itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
  658. }
  659. }
  660. _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
  661. starpu_data_release(args->early_handle);
  662. _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
  663. starpu_data_unregister_submit(args->early_handle);
  664. _STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
  665. // If the request is detached, we need to call _starpu_mpi_handle_request_termination
  666. // as it will not be called automatically as the request is not in the list detached_requests
  667. if (args->req->detached)
  668. _starpu_mpi_handle_request_termination(args->req);
  669. // else: If the request is not detached its termination will
  670. // be handled when calling starpu_mpi_wait
  671. free(args);
  672. }
  673. static void _starpu_mpi_submit_ready_request(void *arg)
  674. {
  675. _STARPU_MPI_LOG_IN();
  676. struct _starpu_mpi_req *req = arg;
  677. _STARPU_MPI_INC_POSTED_REQUESTS(-1);
  678. _STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
  679. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  680. if (req->request_type == RECV_REQ)
  681. {
  682. /* Case : the request is the internal receive request submitted
  683. * by StarPU-MPI to receive incoming data without a matching
  684. * early_request from the application. We immediately allocate the
  685. * pointer associated to the data_handle, and push it into the
  686. * ready_requests list, so as the real MPI request can be submitted
  687. * before the next submission of the envelope-catching request. */
  688. if (req->is_internal_req)
  689. {
  690. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  691. if (req->user_datatype == 0)
  692. {
  693. req->count = 1;
  694. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  695. }
  696. else
  697. {
  698. STARPU_ASSERT(req->count);
  699. req->ptr = malloc(req->count);
  700. STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
  701. }
  702. _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  703. req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
  704. _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  705. _starpu_mpi_req_list_push_front(ready_requests, req);
  706. /* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
  707. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  708. STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
  709. req->posted = 1;
  710. STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
  711. STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
  712. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  713. }
  714. else
  715. {
  716. /* test whether the receive request has already been submitted internally by StarPU-MPI*/
  717. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
  718. /* Case: a receive request for a data with the given tag and source has already been
  719. * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
  720. * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
  721. * will be called to bring the data back to the original data handle associated to the request.*/
  722. if (early_data_handle)
  723. {
  724. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  725. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
  726. while (!(early_data_handle->req_ready))
  727. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
  728. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
  729. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  730. _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
  731. STARPU_ASSERT(req->data_handle != early_data_handle->handle);
  732. req->internal_req = early_data_handle->req;
  733. struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
  734. cb_args->data_handle = req->data_handle;
  735. cb_args->early_handle = early_data_handle->handle;
  736. cb_args->buffer = early_data_handle->buffer;
  737. cb_args->req = req;
  738. _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
  739. starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
  740. }
  741. /* Case: no matching data has been received. Store the receive request as an early_request. */
  742. else
  743. {
  744. _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
  745. _starpu_mpi_early_request_add(req);
  746. }
  747. }
  748. }
  749. else
  750. {
  751. _starpu_mpi_req_list_push_front(ready_requests, req);
  752. _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  753. req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  754. }
  755. newer_requests = 1;
  756. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  757. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  758. _STARPU_MPI_LOG_OUT();
  759. }
  760. #ifdef STARPU_MPI_ACTIVITY
  761. static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
  762. {
  763. unsigned may_block = 1;
  764. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  765. if (!_starpu_mpi_req_list_empty(detached_requests))
  766. {
  767. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  768. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  769. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  770. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  771. may_block = 0;
  772. }
  773. else
  774. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  775. return may_block;
  776. }
  777. #endif /* STARPU_MPI_ACTIVITY */
/* Poll every request of the detached_requests list with MPI_Test and
 * terminate the completed ones.
 *
 * detached_requests_mutex is held only while walking or modifying the list.
 * It is released around MPI_Test and _starpu_mpi_handle_request_termination
 * (which may run the user callback); the successor is read before unlocking.
 * Completed requests are erased from the list and freed, except internal
 * requests, which are not freed here (see the FIXME below).
 * Asserts that MPI_Test succeeds. */
static void _starpu_mpi_test_detached_requests(void)
{
	_STARPU_MPI_LOG_IN();
	int flag;
	MPI_Status status;
	struct _starpu_mpi_req *req, *next_req;

	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);

	for (req = _starpu_mpi_req_list_begin(detached_requests);
	     req != _starpu_mpi_req_list_end(detached_requests);
	     req = next_req)
	{
		/* Read the successor while the list is still locked.
		 * NOTE(review): this relies on no other thread erasing next_req while
		 * the lock is released below (elsewhere in this chunk other threads
		 * only push_front) -- confirm. */
		next_req = _starpu_mpi_req_list_next(req);

		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);

		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
		req->ret = MPI_Test(&req->request, &flag, &status);

		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);

		if (flag)
		{
			/* The communication completed: terminate the request, bracketed
			 * by trace events for receive and send requests */
			if (req->request_type == RECV_REQ)
			{
				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
			}
			else if (req->request_type == SEND_REQ)
			{
				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
			}

			_starpu_mpi_handle_request_termination(req);

			if (req->request_type == RECV_REQ)
			{
				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
			}
			else if (req->request_type == SEND_REQ)
			{
				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
			}
		}

		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);

		if (flag)
		{
			/* Drop the completed request from the list (under the lock) */
			_starpu_mpi_req_list_erase(detached_requests, req);
#ifdef STARPU_DEVEL
#warning FIXME: when do we free internal requests
#endif
			if (!req->is_internal_req)
				free(req);
		}

	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);

	_STARPU_MPI_LOG_OUT();
}
  828. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
  829. {
  830. if (req->detached)
  831. {
  832. /* put the submitted request into the list of pending requests
  833. * so that it can be handled by the progression mechanisms */
  834. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  835. _starpu_mpi_req_list_push_front(detached_requests, req);
  836. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  837. starpu_wake_all_blocked_workers();
  838. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  839. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  840. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  841. }
  842. }
  843. static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
  844. {
  845. _STARPU_MPI_LOG_IN();
  846. STARPU_ASSERT_MSG(req, "Invalid request");
  847. /* submit the request to MPI */
  848. _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  849. req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  850. req->func(req);
  851. _STARPU_MPI_LOG_OUT();
  852. }
/* Arguments handed to the progression thread by _starpu_mpi_initialize
 * (malloc'ed there, freed by the progression thread before it exits). */
struct _starpu_mpi_argc_argv
{
	/* Non-zero if the progression thread must call MPI_Init_thread itself
	 * (and MPI_Finalize when it terminates) */
	int initialize_mpi;
	/* Forwarded to MPI_Init_thread; may be NULL */
	int *argc;
	char ***argv;
};
  859. static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
  860. {
  861. switch (thread_level)
  862. {
  863. case MPI_THREAD_SERIALIZED:
  864. {
  865. _STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
  866. break;
  867. }
  868. case MPI_THREAD_FUNNELED:
  869. {
  870. _STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
  871. break;
  872. }
  873. case MPI_THREAD_SINGLE:
  874. {
  875. _STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
  876. break;
  877. }
  878. }
  879. }
  880. static void *_starpu_mpi_progress_thread_func(void *arg)
  881. {
  882. struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
  883. int rank, worldsize;
  884. if (argc_argv->initialize_mpi)
  885. {
  886. int thread_support;
  887. _STARPU_DEBUG("Calling MPI_Init_thread\n");
  888. if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
  889. {
  890. _STARPU_ERROR("MPI_Init_thread failed\n");
  891. }
  892. _starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
  893. }
  894. else
  895. {
  896. int provided;
  897. MPI_Query_thread(&provided);
  898. _starpu_mpi_print_thread_level_support(provided, " has been initialized with");
  899. }
  900. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  901. MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
  902. MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
  903. {
  904. _STARPU_MPI_TRACE_START(rank, worldsize);
  905. #ifdef STARPU_USE_FXT
  906. starpu_profiling_set_id(rank);
  907. #endif //STARPU_USE_FXT
  908. }
  909. _starpu_mpi_add_sync_point_in_fxt();
  910. _starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
  911. _starpu_mpi_cache_init(MPI_COMM_WORLD);
  912. _starpu_mpi_early_request_init(worldsize);
  913. _starpu_mpi_early_data_init(worldsize);
  914. /* notify the main thread that the progression thread is ready */
  915. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  916. running = 1;
  917. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  918. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  919. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  920. struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
  921. int header_req_submitted = 0;
  922. while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
  923. {
  924. /* shall we block ? */
  925. unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0;
  926. #ifndef STARPU_MPI_ACTIVITY
  927. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  928. block = block && _starpu_mpi_req_list_empty(detached_requests);
  929. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  930. #endif /* STARPU_MPI_ACTIVITY */
  931. if (block)
  932. {
  933. _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
  934. _STARPU_MPI_TRACE_SLEEP_BEGIN();
  935. if (barrier_running)
  936. /* Tell mpi_barrier */
  937. STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
  938. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  939. _STARPU_MPI_TRACE_SLEEP_END();
  940. }
  941. /* get one request */
  942. struct _starpu_mpi_req *req;
  943. while (!_starpu_mpi_req_list_empty(ready_requests))
  944. {
  945. req = _starpu_mpi_req_list_pop_back(ready_requests);
  946. /* handling a request is likely to block for a while
  947. * (on a sync_data_with_mem call), we want to let the
  948. * application submit requests in the meantime, so we
  949. * release the lock. */
  950. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  951. _starpu_mpi_handle_ready_request(req);
  952. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  953. }
  954. /* If there is no currently submitted header_req submitted to
  955. * catch envelopes from senders, and there is some pending
  956. * receive requests on our side, we resubmit a header request. */
  957. MPI_Request header_req;
  958. if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
  959. {
  960. _STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
  961. MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
  962. header_req_submitted = 1;
  963. }
  964. /* test whether there are some terminated "detached request" */
  965. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  966. _starpu_mpi_test_detached_requests();
  967. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  968. if (header_req_submitted == 1)
  969. {
  970. int flag,res;
  971. MPI_Status status;
  972. _STARPU_MPI_DEBUG(4, "Test of header_req\n");
  973. /* test whether an envelope has arrived. */
  974. res = MPI_Test(&header_req, &flag, &status);
  975. STARPU_ASSERT(res == MPI_SUCCESS);
  976. if (flag)
  977. {
  978. _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
  979. struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
  980. /* Case: a data will arrive before a matching receive is
  981. * posted by the application. Create a temporary handle to
  982. * store the incoming data, submit a starpu_mpi_irecv_detached
  983. * on this handle, and store it as an early_data
  984. */
  985. if (!found_req)
  986. {
  987. _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
  988. starpu_data_handle_t data_handle = NULL;
  989. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  990. data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
  991. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  992. struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
  993. STARPU_ASSERT(early_data_handle);
  994. STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
  995. STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
  996. early_data_handle->mpi_tag = recv_env->mpi_tag;
  997. early_data_handle->env = recv_env;
  998. early_data_handle->source = status.MPI_SOURCE;
  999. if (data_handle)
  1000. {
  1001. early_data_handle->buffer = NULL;
  1002. starpu_data_register_same(&early_data_handle->handle, data_handle);
  1003. _starpu_mpi_early_data_add(early_data_handle);
  1004. }
  1005. else
  1006. {
  1007. /* The application has not registered yet a data with the tag,
  1008. * we are going to receive the data as a raw memory, and give it
  1009. * to the application when it post a receive for this tag
  1010. */
  1011. _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
  1012. early_data_handle->buffer = malloc(early_data_handle->env->size);
  1013. starpu_vector_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size, 1);
  1014. _starpu_mpi_early_data_add(early_data_handle);
  1015. }
  1016. _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
  1017. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1018. early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
  1019. early_data_handle->mpi_tag, MPI_COMM_WORLD, 1, 0,
  1020. NULL, NULL, 1, 1, recv_env->size);
  1021. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1022. // We wait until the request is pushed in the
  1023. // ready_request list, that ensures that the next loop
  1024. // will call _starpu_mpi_handle_ready_request
  1025. // on the request and post the corresponding mpi_irecv,
  1026. // otherwise, it may lead to read data as envelop
  1027. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1028. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
  1029. while (!(early_data_handle->req->posted))
  1030. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
  1031. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
  1032. STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
  1033. early_data_handle->req_ready = 1;
  1034. STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
  1035. STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
  1036. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1037. }
  1038. /* Case: a matching application request has been found for
  1039. * the incoming data, we handle the correct allocation
  1040. * of the pointer associated to the data handle, then
  1041. * submit the corresponding receive with
  1042. * _starpu_mpi_handle_ready_request. */
  1043. else
  1044. {
  1045. _STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
  1046. _starpu_mpi_early_request_delete(found_req);
  1047. _starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
  1048. if (found_req->user_datatype == 0)
  1049. {
  1050. found_req->count = 1;
  1051. found_req->ptr = starpu_data_get_local_ptr(found_req->data_handle);
  1052. }
  1053. else
  1054. {
  1055. found_req->count = recv_env->size;
  1056. found_req->ptr = malloc(found_req->count);
  1057. STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
  1058. }
  1059. _STARPU_MPI_DEBUG(3, "Handling new request... \n");
  1060. /* handling a request is likely to block for a while
  1061. * (on a sync_data_with_mem call), we want to let the
  1062. * application submit requests in the meantime, so we
  1063. * release the lock. */
  1064. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1065. _starpu_mpi_handle_ready_request(found_req);
  1066. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1067. }
  1068. header_req_submitted = 0;
  1069. }
  1070. else
  1071. {
  1072. _STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
  1073. }
  1074. }
  1075. }
  1076. STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
  1077. STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
  1078. STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
  1079. _starpu_mpi_early_request_check_termination();
  1080. _starpu_mpi_early_data_check_termination();
  1081. if (argc_argv->initialize_mpi)
  1082. {
  1083. _STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
  1084. MPI_Finalize();
  1085. }
  1086. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1087. _starpu_mpi_early_data_free(worldsize);
  1088. _starpu_mpi_early_request_free();
  1089. free(argc_argv);
  1090. free(recv_env);
  1091. return NULL;
  1092. }
  1093. /********************************************************/
  1094. /* */
  1095. /* (De)Initialization methods */
  1096. /* */
  1097. /********************************************************/
#ifdef STARPU_MPI_ACTIVITY
/* Identifier of the progression hook registered by _starpu_mpi_initialize
 * (-1 until it is registered); deregistered by starpu_mpi_shutdown. */
static int hookid = - 1;
#endif /* STARPU_MPI_ACTIVITY */
  1101. static void _starpu_mpi_add_sync_point_in_fxt(void)
  1102. {
  1103. #ifdef STARPU_USE_FXT
  1104. int rank;
  1105. int worldsize;
  1106. int ret;
  1107. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  1108. MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
  1109. ret = MPI_Barrier(MPI_COMM_WORLD);
  1110. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
  1111. /* We generate a "unique" key so that we can make sure that different
  1112. * FxT traces come from the same MPI run. */
  1113. int random_number;
  1114. /* XXX perhaps we don't want to generate a new seed if the application
  1115. * specified some reproductible behaviour ? */
  1116. if (rank == 0)
  1117. {
  1118. srand(time(NULL));
  1119. random_number = rand();
  1120. }
  1121. ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
  1122. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %d", ret);
  1123. _STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);
  1124. _STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
  1125. #endif
  1126. }
  1127. static
  1128. int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
  1129. {
  1130. STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
  1131. STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
  1132. STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
  1133. ready_requests = _starpu_mpi_req_list_new();
  1134. STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
  1135. detached_requests = _starpu_mpi_req_list_new();
  1136. STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
  1137. struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
  1138. argc_argv->initialize_mpi = initialize_mpi;
  1139. argc_argv->argc = argc;
  1140. argc_argv->argv = argv;
  1141. #ifdef STARPU_MPI_ACTIVITY
  1142. hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
  1143. STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
  1144. #endif /* STARPU_MPI_ACTIVITY */
  1145. STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
  1146. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1147. while (!running)
  1148. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  1149. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1150. return 0;
  1151. }
  1152. int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
  1153. {
  1154. return _starpu_mpi_initialize(argc, argv, initialize_mpi);
  1155. }
  1156. int starpu_mpi_initialize(void)
  1157. {
  1158. return _starpu_mpi_initialize(NULL, NULL, 0);
  1159. }
  1160. int starpu_mpi_initialize_extended(int *rank, int *world_size)
  1161. {
  1162. int ret;
  1163. ret = _starpu_mpi_initialize(NULL, NULL, 1);
  1164. if (ret == 0)
  1165. {
  1166. _STARPU_DEBUG("Calling MPI_Comm_rank\n");
  1167. MPI_Comm_rank(MPI_COMM_WORLD, rank);
  1168. MPI_Comm_size(MPI_COMM_WORLD, world_size);
  1169. }
  1170. return ret;
  1171. }
  1172. int starpu_mpi_shutdown(void)
  1173. {
  1174. void *value;
  1175. int rank, world_size;
  1176. /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
  1177. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  1178. MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  1179. /* kill the progression thread */
  1180. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1181. running = 0;
  1182. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  1183. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1184. starpu_pthread_join(progress_thread, &value);
  1185. #ifdef STARPU_MPI_ACTIVITY
  1186. starpu_progression_hook_deregister(hookid);
  1187. #endif /* STARPU_MPI_ACTIVITY */
  1188. _STARPU_MPI_TRACE_STOP(rank, world_size);
  1189. /* free the request queues */
  1190. _starpu_mpi_req_list_delete(detached_requests);
  1191. _starpu_mpi_req_list_delete(ready_requests);
  1192. _starpu_mpi_comm_amounts_display(rank);
  1193. _starpu_mpi_comm_amounts_free();
  1194. _starpu_mpi_cache_free(world_size);
  1195. return 0;
  1196. }
  1197. void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
  1198. {
  1199. starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handle);
  1200. }
/* Attach StarPU-MPI information to data_handle: the MPI rank associated with
 * it (presumably the node owning the data, per the disabled check below) and
 * the MPI tag used to look it up when matching incoming data. Also install
 * _starpu_mpi_clear_cache as its unregister hook. */
void starpu_mpi_data_register(starpu_data_handle_t data_handle, int tag, int rank)
{
#ifdef STARPU_DEVEL
#warning see if the following code is really needed, it deadlocks some applications
#if 0
	int my;
	MPI_Comm_rank(MPI_COMM_WORLD, &my);
	if (my != rank)
		STARPU_ASSERT_MSG(data_handle->home_node == -1, "Data does not belong to node %d, it should be assigned a home node -1", my);
#endif
#endif
	_starpu_data_set_rank(data_handle, rank);
	_starpu_data_set_tag(data_handle, tag);
	/* Flush StarPU-MPI cache entries of the data when it is unregistered */
	_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
}
  1216. int starpu_mpi_world_rank(void)
  1217. {
  1218. int rank;
  1219. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  1220. return rank;
  1221. }