starpu_mpi.c 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010-2016 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016 CNRS
  5. * Copyright (C) 2016 Inria
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <stdlib.h>
  19. #include <starpu_mpi.h>
  20. #include <starpu_mpi_datatype.h>
  21. #include <starpu_mpi_private.h>
  22. #include <starpu_mpi_cache.h>
  23. #include <starpu_profiling.h>
  24. #include <starpu_mpi_stats.h>
  25. #include <starpu_mpi_cache.h>
  26. #include <starpu_mpi_sync_data.h>
  27. #include <starpu_mpi_early_data.h>
  28. #include <starpu_mpi_early_request.h>
  29. #include <starpu_mpi_select_node.h>
  30. #include <starpu_mpi_tag.h>
  31. #include <starpu_mpi_comm.h>
  32. #include <common/config.h>
  33. #include <common/thread.h>
  34. #include <datawizard/interfaces/data_interface.h>
  35. #include <datawizard/coherency.h>
  36. #include <core/simgrid.h>
  37. #include <core/task.h>
  38. static void _starpu_mpi_add_sync_point_in_fxt(void);
  39. static void _starpu_mpi_submit_ready_request(void *arg);
  40. static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
  41. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
  42. #ifdef STARPU_VERBOSE
  43. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
  44. #endif
  45. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  46. int dest, int data_tag, MPI_Comm comm,
  47. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  48. int sequential_consistency);
  49. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
  50. int source, int data_tag, MPI_Comm comm,
  51. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  52. int sequential_consistency, int is_internal_req,
  53. starpu_ssize_t count);
  54. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
  55. static void _starpu_mpi_early_data_cb(void* arg);
  56. /* The list of ready requests */
  57. static struct _starpu_mpi_req_list *ready_requests;
  58. /* The list of detached requests that have already been submitted to MPI */
  59. static struct _starpu_mpi_req_list *detached_requests;
  60. static starpu_pthread_mutex_t detached_requests_mutex;
  61. /* Condition to wake up progression thread */
  62. static starpu_pthread_cond_t cond_progression;
  63. /* Condition to wake up waiting for all current MPI requests to finish */
  64. static starpu_pthread_cond_t cond_finished;
  65. static starpu_pthread_mutex_t mutex;
  66. #ifndef STARPU_SIMGRID
  67. static starpu_pthread_t progress_thread;
  68. #endif
  69. static int running = 0;
  70. #ifdef STARPU_SIMGRID
  71. static int _mpi_world_size;
  72. static int _mpi_world_rank;
  73. #endif
  74. /* Count requests posted by the application and not yet submitted to MPI */
  75. static starpu_pthread_mutex_t mutex_posted_requests;
  76. static int posted_requests = 0, newer_requests, barrier_running = 0;
  77. #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
  78. #pragma weak smpi_simulated_main_
  79. extern int smpi_simulated_main_(int argc, char *argv[]);
  80. #ifdef HAVE_SMPI_PROCESS_SET_USER_DATA
  81. #if !HAVE_DECL_SMPI_PROCESS_SET_USER_DATA
  82. extern void smpi_process_set_user_data(void *);
  83. #endif
  84. #endif
  85. static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
  86. {
  87. *req = calloc(1, sizeof(struct _starpu_mpi_req));
  88. STARPU_MPI_ASSERT_MSG(*req, "Invalid request");
  89. /* Initialize the request structure */
  90. (*req)->data_handle = NULL;
  91. (*req)->datatype = 0;
  92. (*req)->datatype_name = NULL;
  93. (*req)->ptr = NULL;
  94. (*req)->count = -1;
  95. (*req)->registered_datatype = -1;
  96. (*req)->node_tag.rank = -1;
  97. (*req)->node_tag.data_tag = -1;
  98. (*req)->node_tag.comm = 0;
  99. (*req)->func = NULL;
  100. (*req)->status = NULL;
  101. (*req)->data_request = 0;
  102. (*req)->flag = NULL;
  103. (*req)->ret = -1;
  104. STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
  105. STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
  106. STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
  107. STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
  108. (*req)->request_type = UNKNOWN_REQ;
  109. (*req)->submitted = 0;
  110. (*req)->completed = 0;
  111. (*req)->posted = 0;
  112. (*req)->other_request = NULL;
  113. (*req)->sync = 0;
  114. (*req)->detached = -1;
  115. (*req)->callback = NULL;
  116. (*req)->callback_arg = NULL;
  117. (*req)->size_req = 0;
  118. (*req)->internal_req = NULL;
  119. (*req)->is_internal_req = 0;
  120. (*req)->envelope = NULL;
  121. (*req)->sequential_consistency = 1;
  122. }
  123. static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
  124. {
  125. STARPU_PTHREAD_MUTEX_DESTROY(&req->req_mutex);
  126. STARPU_PTHREAD_COND_DESTROY(&req->req_cond);
  127. STARPU_PTHREAD_MUTEX_DESTROY(&req->posted_mutex);
  128. STARPU_PTHREAD_COND_DESTROY(&req->posted_cond);
  129. free(req->datatype_name);
  130. req->datatype_name = NULL;
  131. free(req);
  132. req = NULL;
  133. }
  134. /********************************************************/
  135. /* */
  136. /* Send/Receive functionalities */
  137. /* */
  138. /********************************************************/
  139. struct _starpu_mpi_early_data_cb_args
  140. {
  141. starpu_data_handle_t data_handle;
  142. starpu_data_handle_t early_handle;
  143. struct _starpu_mpi_req *req;
  144. void *buffer;
  145. };
  146. static void _starpu_mpi_submit_ready_request(void *arg)
  147. {
  148. _STARPU_MPI_LOG_IN();
  149. struct _starpu_mpi_req *req = arg;
  150. _STARPU_MPI_INC_POSTED_REQUESTS(-1);
  151. _STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
  152. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  153. if (req->request_type == RECV_REQ)
  154. {
  155. /* Case : the request is the internal receive request submitted
  156. * by StarPU-MPI to receive incoming data without a matching
  157. * early_request from the application. We immediately allocate the
  158. * pointer associated to the data_handle, and push it into the
  159. * ready_requests list, so as the real MPI request can be submitted
  160. * before the next submission of the envelope-catching request. */
  161. if (req->is_internal_req)
  162. {
  163. _starpu_mpi_handle_allocate_datatype(req->data_handle, req);
  164. if (req->registered_datatype == 1)
  165. {
  166. req->count = 1;
  167. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  168. }
  169. else
  170. {
  171. STARPU_ASSERT(req->count);
  172. req->ptr = malloc(req->count);
  173. STARPU_MPI_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
  174. }
  175. _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
  176. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
  177. req->datatype_name, (int)req->count, req->registered_datatype);
  178. _starpu_mpi_req_list_push_front(ready_requests, req);
  179. /* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
  180. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  181. STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
  182. req->posted = 1;
  183. STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
  184. STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
  185. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  186. }
  187. else
  188. {
  189. /* test whether some data with the given tag and source have already been received by StarPU-MPI*/
  190. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
  191. /* Case: a receive request for a data with the given tag and source has already been
  192. * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
  193. * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
  194. * will be called to bring the data back to the original data handle associated to the request.*/
  195. if (early_data_handle)
  196. {
  197. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  198. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
  199. while (!(early_data_handle->req_ready))
  200. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
  201. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
  202. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  203. _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
  204. STARPU_ASSERT(req->data_handle != early_data_handle->handle);
  205. req->internal_req = early_data_handle->req;
  206. struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
  207. cb_args->data_handle = req->data_handle;
  208. cb_args->early_handle = early_data_handle->handle;
  209. cb_args->buffer = early_data_handle->buffer;
  210. cb_args->req = req;
  211. _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
  212. starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
  213. }
  214. /* Case: no matching data has been received. Store the receive request as an early_request. */
  215. else
  216. {
  217. struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
  218. _STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %d and src %d = %p\n", req->node_tag.data_tag, req->node_tag.rank, sync_req);
  219. if (sync_req)
  220. {
  221. req->sync = 1;
  222. _starpu_mpi_handle_allocate_datatype(req->data_handle, req);
  223. if (req->registered_datatype == 1)
  224. {
  225. req->count = 1;
  226. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  227. }
  228. else
  229. {
  230. req->count = sync_req->count;
  231. STARPU_ASSERT(req->count);
  232. req->ptr = malloc(req->count);
  233. STARPU_MPI_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
  234. }
  235. _starpu_mpi_req_list_push_front(ready_requests, req);
  236. _starpu_mpi_request_destroy(sync_req);
  237. }
  238. else
  239. {
  240. _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->node_tag.rank, req->node_tag.data_tag);
  241. _starpu_mpi_early_request_enqueue(req);
  242. }
  243. }
  244. }
  245. }
  246. else
  247. {
  248. _starpu_mpi_req_list_push_front(ready_requests, req);
  249. _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
  250. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
  251. req->datatype_name, (int)req->count, req->registered_datatype);
  252. }
  253. newer_requests = 1;
  254. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  255. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  256. _STARPU_MPI_LOG_OUT();
  257. }
  258. static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
  259. int srcdst, int data_tag, MPI_Comm comm,
  260. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  261. enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
  262. enum starpu_data_access_mode mode,
  263. int sequential_consistency,
  264. int is_internal_req,
  265. starpu_ssize_t count)
  266. {
  267. struct _starpu_mpi_req *req;
  268. _STARPU_MPI_LOG_IN();
  269. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  270. _starpu_mpi_comm_register(comm);
  271. /* Initialize the request structure */
  272. _starpu_mpi_request_init(&req);
  273. req->request_type = request_type;
  274. req->data_handle = data_handle;
  275. req->node_tag.rank = srcdst;
  276. req->node_tag.data_tag = data_tag;
  277. req->node_tag.comm = comm;
  278. req->detached = detached;
  279. req->sync = sync;
  280. req->callback = callback;
  281. req->callback_arg = arg;
  282. req->func = func;
  283. req->sequential_consistency = sequential_consistency;
  284. req->is_internal_req = is_internal_req;
  285. req->count = count;
  286. /* Asynchronously request StarPU to fetch the data in main memory: when
  287. * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
  288. * the request is actually submitted */
  289. starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
  290. _STARPU_MPI_LOG_OUT();
  291. return req;
  292. }
  293. /********************************************************/
  294. /* */
  295. /* Send functionalities */
  296. /* */
  297. /********************************************************/
  298. static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
  299. {
  300. _STARPU_MPI_LOG_IN();
  301. _STARPU_MPI_DEBUG(30, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
  302. _starpu_mpi_comm_amounts_inc(req->node_tag.comm, req->node_tag.rank, req->datatype, req->count);
  303. _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
  304. if (req->sync == 0)
  305. {
  306. _STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
  307. req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
  308. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  309. }
  310. else
  311. {
  312. _STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
  313. req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
  314. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  315. }
  316. _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, 0);
  317. /* somebody is perhaps waiting for the MPI request to be posted */
  318. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  319. req->submitted = 1;
  320. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  321. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  322. _starpu_mpi_handle_detached_request(req);
  323. _STARPU_MPI_LOG_OUT();
  324. }
  325. static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
  326. {
  327. _starpu_mpi_handle_allocate_datatype(req->data_handle, req);
  328. req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  329. req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
  330. req->envelope->data_tag = req->node_tag.data_tag;
  331. req->envelope->sync = req->sync;
  332. if (req->registered_datatype == 1)
  333. {
  334. int size;
  335. req->count = 1;
  336. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  337. MPI_Type_size(req->datatype, &size);
  338. req->envelope->size = (starpu_ssize_t)req->count * size;
  339. _STARPU_MPI_DEBUG(20, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle), req->node_tag.rank);
  340. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
  341. MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
  342. }
  343. else
  344. {
  345. int ret;
  346. // Do not pack the data, just try to find out the size
  347. starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
  348. if (req->envelope->size != -1)
  349. {
  350. // We already know the size of the data, let's send it to overlap with the packing of the data
  351. _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
  352. req->count = req->envelope->size;
  353. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
  354. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
  355. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
  356. }
  357. // Pack the data
  358. starpu_data_pack(req->data_handle, &req->ptr, &req->count);
  359. if (req->envelope->size == -1)
  360. {
  361. // We know the size now, let's send it
  362. _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
  363. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
  364. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
  365. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
  366. }
  367. else
  368. {
  369. // We check the size returned with the 2 calls to pack is the same
  370. STARPU_MPI_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
  371. }
  372. // We can send the data now
  373. }
  374. if (req->sync)
  375. {
  376. // If the data is to be sent in synchronous mode, we need to wait for the receiver ready message
  377. _starpu_mpi_sync_data_add(req);
  378. }
  379. else
  380. {
  381. // Otherwise we can send the data
  382. _starpu_mpi_isend_data_func(req);
  383. }
  384. }
  385. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  386. int dest, int data_tag, MPI_Comm comm,
  387. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  388. int sequential_consistency)
  389. {
  390. return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
  391. }
  392. int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
  393. {
  394. _STARPU_MPI_LOG_IN();
  395. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
  396. struct _starpu_mpi_req *req;
  397. _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
  398. req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, NULL, NULL, 1);
  399. _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);
  400. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  401. *public_req = req;
  402. _STARPU_MPI_LOG_OUT();
  403. return 0;
  404. }
  405. int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
  406. int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  407. {
  408. _STARPU_MPI_LOG_IN();
  409. _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, callback, arg, 1);
  410. _STARPU_MPI_LOG_OUT();
  411. return 0;
  412. }
  413. int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
  414. {
  415. starpu_mpi_req req;
  416. MPI_Status status;
  417. _STARPU_MPI_LOG_IN();
  418. memset(&status, 0, sizeof(MPI_Status));
  419. starpu_mpi_isend(data_handle, &req, dest, data_tag, comm);
  420. starpu_mpi_wait(&req, &status);
  421. _STARPU_MPI_LOG_OUT();
  422. return 0;
  423. }
  424. int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
  425. {
  426. _STARPU_MPI_LOG_IN();
  427. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
  428. struct _starpu_mpi_req *req;
  429. req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, NULL, NULL, 1);
  430. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  431. *public_req = req;
  432. _STARPU_MPI_LOG_OUT();
  433. return 0;
  434. }
  435. int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  436. {
  437. _STARPU_MPI_LOG_IN();
  438. _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, callback, arg, 1);
  439. _STARPU_MPI_LOG_OUT();
  440. return 0;
  441. }
  442. /********************************************************/
  443. /* */
  444. /* receive functionalities */
  445. /* */
  446. /********************************************************/
  447. static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
  448. {
  449. _STARPU_MPI_LOG_IN();
  450. _STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
  451. _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
  452. if (req->sync)
  453. {
  454. struct _starpu_mpi_envelope *_envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  455. _envelope->mode = _STARPU_MPI_ENVELOPE_SYNC_READY;
  456. _envelope->data_tag = req->node_tag.data_tag;
  457. _STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->node_tag.rank);
  458. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
  459. req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
  460. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Send returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  461. free(_envelope);
  462. _envelope = NULL;
  463. }
  464. if (req->sync)
  465. {
  466. _STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
  467. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
  468. }
  469. else
  470. {
  471. _STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
  472. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
  473. }
  474. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  475. _STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
  476. /* somebody is perhaps waiting for the MPI request to be posted */
  477. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  478. req->submitted = 1;
  479. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  480. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  481. _starpu_mpi_handle_detached_request(req);
  482. _STARPU_MPI_LOG_OUT();
  483. }
  484. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
  485. {
  486. return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
  487. }
  488. int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)
  489. {
  490. _STARPU_MPI_LOG_IN();
  491. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
  492. // // We check if a tag is defined for the data handle, if not,
  493. // // we define the one given for the communication.
  494. // // A tag is necessary for the internal mpi engine.
  495. // int tag = starpu_data_get_tag(data_handle);
  496. // if (tag == -1)
  497. // starpu_data_set_tag(data_handle, data_tag);
  498. struct _starpu_mpi_req *req;
  499. _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, data_tag);
  500. req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
  501. _STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, data_tag);
  502. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
  503. *public_req = req;
  504. _STARPU_MPI_LOG_OUT();
  505. return 0;
  506. }
  507. int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  508. {
  509. _STARPU_MPI_LOG_IN();
  510. // // We check if a tag is defined for the data handle, if not,
  511. // // we define the one given for the communication.
  512. // // A tag is necessary for the internal mpi engine.
  513. // int tag = starpu_data_get_tag(data_handle);
  514. // if (tag == -1)
  515. // starpu_data_set_tag(data_handle, data_tag);
  516. _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0);
  517. _STARPU_MPI_LOG_OUT();
  518. return 0;
  519. }
  520. int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
  521. {
  522. _STARPU_MPI_LOG_IN();
  523. // // We check if a tag is defined for the data handle, if not,
  524. // // we define the one given for the communication.
  525. // // A tag is necessary for the internal mpi engine.
  526. // int tag = starpu_data_get_tag(data_handle);
  527. // if (tag == -1)
  528. // starpu_data_set_tag(data_handle, data_tag);
  529. _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
  530. _STARPU_MPI_LOG_OUT();
  531. return 0;
  532. }
  533. int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, MPI_Status *status)
  534. {
  535. starpu_mpi_req req;
  536. _STARPU_MPI_LOG_IN();
  537. // // We check if a tag is defined for the data handle, if not,
  538. // // we define the one given for the communication.
  539. // // A tag is necessary for the internal mpi engine.
  540. // int tag = starpu_data_get_tag(data_handle);
  541. // if (tag == -1)
  542. // starpu_data_set_tag(data_handle, data_tag);
  543. starpu_mpi_irecv(data_handle, &req, source, data_tag, comm);
  544. starpu_mpi_wait(&req, status);
  545. _STARPU_MPI_LOG_OUT();
  546. return 0;
  547. }
  548. /********************************************************/
  549. /* */
  550. /* Wait functionalities */
  551. /* */
  552. /********************************************************/
  553. static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
  554. {
  555. _STARPU_MPI_LOG_IN();
  556. /* Which is the mpi request we are waiting for ? */
  557. struct _starpu_mpi_req *req = waiting_req->other_request;
  558. _STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
  559. if (req->data_request != MPI_REQUEST_NULL)
  560. {
  561. req->ret = MPI_Wait(&req->data_request, waiting_req->status);
  562. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  563. }
  564. _STARPU_MPI_TRACE_UWAIT_END(req->node_tag.rank, req->node_tag.data_tag);
  565. _starpu_mpi_handle_request_termination(req);
  566. _STARPU_MPI_LOG_OUT();
  567. }
  568. int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
  569. {
  570. int ret;
  571. struct _starpu_mpi_req *req = *public_req;
  572. struct _starpu_mpi_req *waiting_req;
  573. _STARPU_MPI_LOG_IN();
  574. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  575. /* We cannot try to complete a MPI request that was not actually posted
  576. * to MPI yet. */
  577. STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
  578. while (!(req->submitted))
  579. STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
  580. STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
  581. /* Initialize the request structure */
  582. _starpu_mpi_request_init(&waiting_req);
  583. waiting_req->status = status;
  584. waiting_req->other_request = req;
  585. waiting_req->func = _starpu_mpi_wait_func;
  586. waiting_req->request_type = WAIT_REQ;
  587. _starpu_mpi_submit_ready_request(waiting_req);
  588. /* We wait for the MPI request to finish */
  589. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  590. while (!req->completed)
  591. STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
  592. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  593. ret = req->ret;
  594. /* The internal request structure was automatically allocated */
  595. *public_req = NULL;
  596. if (req->internal_req)
  597. {
  598. _starpu_mpi_request_destroy(req->internal_req);
  599. }
  600. _starpu_mpi_request_destroy(req);
  601. _starpu_mpi_request_destroy(waiting_req);
  602. _STARPU_MPI_LOG_OUT();
  603. return ret;
  604. }
  605. /********************************************************/
  606. /* */
  607. /* Test functionalities */
  608. /* */
  609. /********************************************************/
  610. static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
  611. {
  612. _STARPU_MPI_LOG_IN();
  613. /* Which is the mpi request we are testing for ? */
  614. struct _starpu_mpi_req *req = testing_req->other_request;
  615. _STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
  616. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
  617. _STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
  618. req->ret = MPI_Test(&req->data_request, testing_req->flag, testing_req->status);
  619. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  620. _STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
  621. if (*testing_req->flag)
  622. {
  623. testing_req->ret = req->ret;
  624. _starpu_mpi_handle_request_termination(req);
  625. }
  626. STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
  627. testing_req->completed = 1;
  628. STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
  629. STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
  630. _STARPU_MPI_LOG_OUT();
  631. }
  632. int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
  633. {
  634. _STARPU_MPI_LOG_IN();
  635. int ret = 0;
  636. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
  637. struct _starpu_mpi_req *req = *public_req;
  638. STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
  639. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  640. unsigned submitted = req->submitted;
  641. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  642. if (submitted)
  643. {
  644. struct _starpu_mpi_req *testing_req;
  645. _starpu_mpi_request_init(&testing_req);
  646. /* Initialize the request structure */
  647. STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
  648. STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
  649. testing_req->flag = flag;
  650. testing_req->status = status;
  651. testing_req->other_request = req;
  652. testing_req->func = _starpu_mpi_test_func;
  653. testing_req->completed = 0;
  654. testing_req->request_type = TEST_REQ;
  655. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  656. _starpu_mpi_submit_ready_request(testing_req);
  657. /* We wait for the test request to finish */
  658. STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
  659. while (!(testing_req->completed))
  660. STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
  661. STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
  662. ret = testing_req->ret;
  663. if (*(testing_req->flag))
  664. {
  665. /* The request was completed so we free the internal
  666. * request structure which was automatically allocated
  667. * */
  668. *public_req = NULL;
  669. if (req->internal_req)
  670. {
  671. _starpu_mpi_request_destroy(req->internal_req);
  672. }
  673. _starpu_mpi_request_destroy(req);
  674. }
  675. _starpu_mpi_request_destroy(testing_req);
  676. }
  677. else
  678. {
  679. *flag = 0;
  680. }
  681. _STARPU_MPI_LOG_OUT();
  682. return ret;
  683. }
  684. /********************************************************/
  685. /* */
  686. /* Barrier functionalities */
  687. /* */
  688. /********************************************************/
  689. static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
  690. {
  691. _STARPU_MPI_LOG_IN();
  692. barrier_req->ret = MPI_Barrier(barrier_req->node_tag.comm);
  693. STARPU_MPI_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(barrier_req->ret));
  694. _starpu_mpi_handle_request_termination(barrier_req);
  695. _STARPU_MPI_LOG_OUT();
  696. }
  697. int _starpu_mpi_barrier(MPI_Comm comm)
  698. {
  699. _STARPU_MPI_LOG_IN();
  700. int ret = posted_requests;
  701. struct _starpu_mpi_req *barrier_req;
  702. _starpu_mpi_request_init(&barrier_req);
  703. /* First wait for *both* all tasks and MPI requests to finish, in case
  704. * some tasks generate MPI requests, MPI requests generate tasks, etc.
  705. */
  706. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  707. STARPU_MPI_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
  708. barrier_running = 1;
  709. do
  710. {
  711. while (posted_requests)
  712. /* Wait for all current MPI requests to finish */
  713. STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
  714. /* No current request, clear flag */
  715. newer_requests = 0;
  716. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  717. /* Now wait for all tasks */
  718. starpu_task_wait_for_all();
  719. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  720. /* Check newer_requests again, in case some MPI requests
  721. * triggered by tasks completed and triggered tasks between
  722. * wait_for_all finished and we take the lock */
  723. } while (posted_requests || newer_requests);
  724. barrier_running = 0;
  725. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  726. /* Initialize the request structure */
  727. STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
  728. STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
  729. barrier_req->func = _starpu_mpi_barrier_func;
  730. barrier_req->request_type = BARRIER_REQ;
  731. barrier_req->node_tag.comm = comm;
  732. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  733. _starpu_mpi_submit_ready_request(barrier_req);
  734. /* We wait for the MPI request to finish */
  735. STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
  736. while (!barrier_req->completed)
  737. STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
  738. STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
  739. _starpu_mpi_request_destroy(barrier_req);
  740. _STARPU_MPI_LOG_OUT();
  741. return ret;
  742. }
  743. int starpu_mpi_barrier(MPI_Comm comm)
  744. {
  745. _starpu_mpi_barrier(comm);
  746. return 0;
  747. }
  748. /********************************************************/
  749. /* */
  750. /* Progression */
  751. /* */
  752. /********************************************************/
  #ifdef STARPU_VERBOSE
  /*
   * Map a request type to a printable name (only built when STARPU_VERBOSE
   * is defined). Values not listed below map to "unknown request type".
   * The returned string is a literal and must not be modified or freed.
   */
  static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
  {
      char *label = "unknown request type";

      switch (request_type)
      {
          case SEND_REQ:
              label = "SEND_REQ";
              break;
          case RECV_REQ:
              label = "RECV_REQ";
              break;
          case WAIT_REQ:
              label = "WAIT_REQ";
              break;
          case TEST_REQ:
              label = "TEST_REQ";
              break;
          case BARRIER_REQ:
              label = "BARRIER_REQ";
              break;
          case UNKNOWN_REQ:
              label = "UNSET_REQ";
              break;
          default:
              break;
      }

      return label;
  }
  #endif
  768. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
  769. {
  770. _STARPU_MPI_LOG_IN();
  771. _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
  772. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
  773. req->datatype_name, (int)req->count, req->registered_datatype, req->internal_req);
  774. if (req->internal_req)
  775. {
  776. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
  777. STARPU_MPI_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->node_tag.data_tag, req->node_tag.rank);
  778. _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
  779. _starpu_mpi_early_data_delete(early_data_handle);
  780. free(early_data_handle);
  781. early_data_handle = NULL;
  782. }
  783. else
  784. {
  785. if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
  786. {
  787. if (req->registered_datatype == 0)
  788. {
  789. if (req->request_type == SEND_REQ)
  790. {
  791. // We need to make sure the communication for sending the size
  792. // has completed, as MPI can re-order messages, let's call
  793. // MPI_Wait to make sure data have been sent
  794. int ret;
  795. ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
  796. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
  797. free(req->ptr);
  798. req->ptr = NULL;
  799. }
  800. else if (req->request_type == RECV_REQ)
  801. {
  802. // req->ptr is freed by starpu_data_unpack
  803. starpu_data_unpack(req->data_handle, req->ptr, req->count);
  804. }
  805. }
  806. else
  807. {
  808. _starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
  809. }
  810. }
  811. }
  812. if (req->data_handle)
  813. starpu_data_release(req->data_handle);
  814. if (req->envelope)
  815. {
  816. free(req->envelope);
  817. req->envelope = NULL;
  818. }
  819. /* Execute the specified callback, if any */
  820. if (req->callback)
  821. req->callback(req->callback_arg);
  822. /* tell anyone potentially waiting on the request that it is
  823. * terminated now */
  824. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  825. req->completed = 1;
  826. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  827. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  828. _STARPU_MPI_LOG_OUT();
  829. }
  830. static void _starpu_mpi_early_data_cb(void* arg)
  831. {
  832. struct _starpu_mpi_early_data_cb_args *args = arg;
  833. if (args->buffer)
  834. {
  835. /* Data has been received as a raw memory, it has to be unpacked */
  836. struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
  837. struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
  838. STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
  839. itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
  840. free(args->buffer);
  841. args->buffer = NULL;
  842. }
  843. else
  844. {
  845. struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
  846. void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
  847. void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
  848. if (!itf->copy_methods->ram_to_ram)
  849. {
  850. _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
  851. itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
  852. }
  853. else
  854. {
  855. _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
  856. itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
  857. }
  858. }
  859. _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
  860. starpu_data_release(args->early_handle);
  861. _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
  862. starpu_data_unregister_submit(args->early_handle);
  863. _STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
  864. // If the request is detached, we need to call _starpu_mpi_handle_request_termination
  865. // as it will not be called automatically as the request is not in the list detached_requests
  866. if (args->req)
  867. {
  868. if (args->req->detached)
  869. {
  870. _starpu_mpi_handle_request_termination(args->req);
  871. free(args->req);
  872. args->req = NULL;
  873. }
  874. else
  875. {
  876. // else: If the request is not detached its termination will
  877. // be handled when calling starpu_mpi_wait
  878. // We store in the application request the internal MPI
  879. // request so that it can be used by starpu_mpi_wait
  880. args->req->data_request = args->req->internal_req->data_request;
  881. STARPU_PTHREAD_MUTEX_LOCK(&args->req->req_mutex);
  882. args->req->submitted = 1;
  883. STARPU_PTHREAD_COND_BROADCAST(&args->req->req_cond);
  884. STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->req_mutex);
  885. }
  886. }
  887. free(args);
  888. args = NULL;
  889. }
  #ifdef STARPU_MPI_ACTIVITY
  /*
   * Progression hook (only built when STARPU_MPI_ACTIVITY is defined).
   *
   * If the list of detached requests is not empty, cond_progression is
   * signalled and 0 is returned; otherwise 1 is returned. The list is
   * inspected under detached_requests_mutex, which is released before
   * `mutex` is taken for the signal.
   */
  static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
  {
      int nothing_pending;

      STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
      nothing_pending = _starpu_mpi_req_list_empty(detached_requests);
      STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);

      if (nothing_pending)
      {
          return 1;
      }

      /* Some detached requests are pending: poke the progression thread */
      STARPU_PTHREAD_MUTEX_LOCK(&mutex);
      STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
      STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);

      return 0;
  }
  #endif /* STARPU_MPI_ACTIVITY */
  908. static void _starpu_mpi_test_detached_requests(void)
  909. {
  910. //_STARPU_MPI_LOG_IN();
  911. int flag;
  912. MPI_Status status;
  913. struct _starpu_mpi_req *req;
  914. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  915. req = _starpu_mpi_req_list_begin(detached_requests);
  916. while (req != _starpu_mpi_req_list_end(detached_requests))
  917. {
  918. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  919. //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
  920. req->ret = MPI_Test(&req->data_request, &flag, &status);
  921. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  922. if (!flag)
  923. {
  924. req = _starpu_mpi_req_list_next(req);
  925. }
  926. else
  927. {
  928. struct _starpu_mpi_req *next_req;
  929. next_req = _starpu_mpi_req_list_next(req);
  930. if (req->request_type == RECV_REQ)
  931. {
  932. _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
  933. }
  934. else if (req->request_type == SEND_REQ)
  935. {
  936. _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
  937. }
  938. _starpu_mpi_req_list_erase(detached_requests, req);
  939. _starpu_mpi_handle_request_termination(req);
  940. if (req->request_type == RECV_REQ)
  941. {
  942. _STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag);
  943. }
  944. else if (req->request_type == SEND_REQ)
  945. {
  946. _STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag, 0);
  947. }
  948. if (req->is_internal_req == 0)
  949. {
  950. _starpu_mpi_request_destroy(req);
  951. }
  952. req = next_req;
  953. }
  954. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  955. }
  956. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  957. //_STARPU_MPI_LOG_OUT();
  958. }
  959. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
  960. {
  961. if (req->detached)
  962. {
  963. /* put the submitted request into the list of pending requests
  964. * so that it can be handled by the progression mechanisms */
  965. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  966. _starpu_mpi_req_list_push_front(detached_requests, req);
  967. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  968. starpu_wake_all_blocked_workers();
  969. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  970. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  971. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  972. }
  973. }
  974. static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
  975. {
  976. _STARPU_MPI_LOG_IN();
  977. STARPU_MPI_ASSERT_MSG(req, "Invalid request");
  978. /* submit the request to MPI */
  979. _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
  980. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
  981. req->func(req);
  982. _STARPU_MPI_LOG_OUT();
  983. }
  984. struct _starpu_mpi_argc_argv
  985. {
  986. int initialize_mpi;
  987. int *argc;
  988. char ***argv;
  989. MPI_Comm comm;
  990. int fargc; // Fortran argc
  991. char **fargv; // Fortran argv
  992. };
  993. static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
  994. {
  995. switch (thread_level)
  996. {
  997. case MPI_THREAD_SERIALIZED:
  998. {
  999. _STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
  1000. break;
  1001. }
  1002. case MPI_THREAD_FUNNELED:
  1003. {
  1004. _STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
  1005. break;
  1006. }
  1007. case MPI_THREAD_SINGLE:
  1008. {
  1009. _STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
  1010. break;
  1011. }
  1012. }
  1013. }
  1014. static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
  1015. {
  1016. _STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
  1017. _STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
  1018. struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE, comm);
  1019. _starpu_mpi_early_data_add(early_data_handle);
  1020. starpu_data_handle_t data_handle = NULL;
  1021. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1022. data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
  1023. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1024. if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
  1025. {
  1026. /* We know which data will receive it and we won't have to unpack, use just the same kind of data. */
  1027. early_data_handle->buffer = NULL;
  1028. starpu_data_register_same(&early_data_handle->handle, data_handle);
  1029. //_starpu_mpi_early_data_add(early_data_handle);
  1030. }
  1031. else
  1032. {
  1033. /* The application has not registered yet a data with the tag,
  1034. * we are going to receive the data as a raw memory, and give it
  1035. * to the application when it post a receive for this tag
  1036. */
  1037. _STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
  1038. early_data_handle->buffer = malloc(early_data_handle->env->size);
  1039. starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size);
  1040. //_starpu_mpi_early_data_add(early_data_handle);
  1041. }
  1042. _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %d from comm %d src %d ..\n", early_data_handle->node_tag.data_tag, comm, status.MPI_SOURCE);
  1043. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1044. early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
  1045. early_data_handle->node_tag.data_tag, comm, 1, 0,
  1046. NULL, NULL, 1, 1, envelope->size);
  1047. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1048. // We wait until the request is pushed in the
  1049. // ready_request list, that ensures that the next loop
  1050. // will call _starpu_mpi_handle_ready_request
  1051. // on the request and post the corresponding mpi_irecv,
  1052. // otherwise, it may lead to read data as envelop
  1053. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1054. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
  1055. while (!(early_data_handle->req->posted))
  1056. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
  1057. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
  1058. STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
  1059. early_data_handle->req_ready = 1;
  1060. STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
  1061. STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
  1062. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1063. }
  1064. static void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
  1065. {
  1066. if (argc_argv->initialize_mpi)
  1067. {
  1068. int thread_support;
  1069. _STARPU_DEBUG("Calling MPI_Init_thread\n");
  1070. if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
  1071. {
  1072. _STARPU_ERROR("MPI_Init_thread failed\n");
  1073. }
  1074. _starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
  1075. }
  1076. else
  1077. {
  1078. int provided;
  1079. MPI_Query_thread(&provided);
  1080. _starpu_mpi_print_thread_level_support(provided, " has been initialized with");
  1081. }
  1082. }
  1083. static void *_starpu_mpi_progress_thread_func(void *arg)
  1084. {
  1085. struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
  1086. int rank, worldsize;
  1087. #ifndef STARPU_SIMGRID
  1088. _starpu_mpi_do_initialize(argc_argv);
  1089. #endif
  1090. MPI_Comm_rank(argc_argv->comm, &rank);
  1091. MPI_Comm_size(argc_argv->comm, &worldsize);
  1092. MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
  1093. #ifdef STARPU_SIMGRID
  1094. _mpi_world_size = worldsize;
  1095. _mpi_world_rank = rank;
  1096. #endif
  1097. #ifdef STARPU_SIMGRID
  1098. /* Now that MPI is set up, let the rest of simgrid get initialized */
  1099. char ** argv_cpy = malloc(*(argc_argv->argc) * sizeof(char*));
  1100. int i;
  1101. for (i = 0; i < *(argc_argv->argc); i++)
  1102. argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
  1103. MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
  1104. /* And set TSD for us */
  1105. #ifdef HAVE_SMPI_PROCESS_SET_USER_DATA
  1106. smpi_process_set_user_data(calloc(MAX_TSD, sizeof(void*)));
  1107. #endif
  1108. #endif
  1109. #ifdef STARPU_USE_FXT
  1110. STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
  1111. while (!_starpu_fxt_started)
  1112. STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
  1113. STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
  1114. #endif //STARPU_USE_FXT
  1115. {
  1116. _STARPU_MPI_TRACE_START(rank, worldsize);
  1117. #ifdef STARPU_USE_FXT
  1118. starpu_profiling_set_id(rank);
  1119. #endif //STARPU_USE_FXT
  1120. }
  1121. _starpu_mpi_add_sync_point_in_fxt();
  1122. _starpu_mpi_comm_amounts_init(argc_argv->comm);
  1123. _starpu_mpi_cache_init(argc_argv->comm);
  1124. _starpu_mpi_select_node_init();
  1125. _starpu_mpi_tag_init();
  1126. _starpu_mpi_comm_init(argc_argv->comm);
  1127. _starpu_mpi_early_request_init();
  1128. _starpu_mpi_early_data_init();
  1129. _starpu_mpi_sync_data_init();
  1130. _starpu_mpi_datatype_init();
  1131. /* notify the main thread that the progression thread is ready */
  1132. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1133. running = 1;
  1134. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  1135. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1136. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1137. int envelope_request_submitted = 0;
  1138. while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
  1139. {
  1140. /* shall we block ? */
  1141. unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
  1142. #ifndef STARPU_MPI_ACTIVITY
  1143. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  1144. block = block && _starpu_mpi_req_list_empty(detached_requests);
  1145. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  1146. #endif /* STARPU_MPI_ACTIVITY */
  1147. #ifdef STARPU_SIMGRID
  1148. MSG_process_sleep(0.000010);
  1149. #endif
  1150. if (block)
  1151. {
  1152. _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
  1153. _STARPU_MPI_TRACE_SLEEP_BEGIN();
  1154. if (barrier_running)
  1155. /* Tell mpi_barrier */
  1156. STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
  1157. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  1158. _STARPU_MPI_TRACE_SLEEP_END();
  1159. }
  1160. /* get one request */
  1161. struct _starpu_mpi_req *req;
  1162. while (!_starpu_mpi_req_list_empty(ready_requests))
  1163. {
  1164. req = _starpu_mpi_req_list_pop_back(ready_requests);
  1165. /* handling a request is likely to block for a while
  1166. * (on a sync_data_with_mem call), we want to let the
  1167. * application submit requests in the meantime, so we
  1168. * release the lock. */
  1169. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1170. _starpu_mpi_handle_ready_request(req);
  1171. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1172. }
  1173. /* If there is no currently submitted envelope_request submitted to
  1174. * catch envelopes from senders, and there is some pending
  1175. * receive requests on our side, we resubmit a header request. */
  1176. if (((_starpu_mpi_early_request_count() > 0) || (_starpu_mpi_sync_data_count() > 0)) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
  1177. {
  1178. _starpu_mpi_comm_post_recv();
  1179. envelope_request_submitted = 1;
  1180. }
  1181. /* test whether there are some terminated "detached request" */
  1182. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1183. _starpu_mpi_test_detached_requests();
  1184. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1185. if (envelope_request_submitted == 1)
  1186. {
  1187. int flag;
  1188. struct _starpu_mpi_envelope *envelope;
  1189. MPI_Status envelope_status;
  1190. MPI_Comm envelope_comm;
  1191. /* test whether an envelope has arrived. */
  1192. flag = _starpu_mpi_comm_test_recv(&envelope_status, &envelope, &envelope_comm);
  1193. if (flag)
  1194. {
  1195. _STARPU_MPI_DEBUG(4, "Envelope received with mode %d\n", envelope->mode);
  1196. if (envelope->mode == _STARPU_MPI_ENVELOPE_SYNC_READY)
  1197. {
  1198. struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
  1199. _STARPU_MPI_DEBUG(20, "Sending data with tag %d to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
  1200. STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_req->node_tag.data_tag);
  1201. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1202. _starpu_mpi_isend_data_func(_sync_req);
  1203. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1204. }
  1205. else
  1206. {
  1207. _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
  1208. struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_dequeue(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
  1209. /* Case: a data will arrive before a matching receive is
  1210. * posted by the application. Create a temporary handle to
  1211. * store the incoming data, submit a starpu_mpi_irecv_detached
  1212. * on this handle, and store it as an early_data
  1213. */
  1214. if (early_request == NULL)
  1215. {
  1216. if (envelope->sync)
  1217. {
  1218. _STARPU_MPI_DEBUG(2000, "-------------------------> adding request for tag %d\n", envelope->data_tag);
  1219. struct _starpu_mpi_req *new_req;
  1220. #ifdef STARPU_DEVEL
  1221. #warning creating a request is not really useful.
  1222. #endif
  1223. /* Initialize the request structure */
  1224. _starpu_mpi_request_init(&new_req);
  1225. new_req->request_type = RECV_REQ;
  1226. new_req->data_handle = NULL;
  1227. new_req->node_tag.rank = envelope_status.MPI_SOURCE;
  1228. new_req->node_tag.data_tag = envelope->data_tag;
  1229. new_req->node_tag.comm = envelope_comm;
  1230. new_req->detached = 1;
  1231. new_req->sync = 1;
  1232. new_req->callback = NULL;
  1233. new_req->callback_arg = NULL;
  1234. new_req->func = _starpu_mpi_irecv_data_func;
  1235. new_req->sequential_consistency = 1;
  1236. new_req->is_internal_req = 0; // ????
  1237. new_req->count = envelope->size;
  1238. _starpu_mpi_sync_data_add(new_req);
  1239. }
  1240. else
  1241. {
  1242. _starpu_mpi_receive_early_data(envelope, envelope_status, envelope_comm);
  1243. }
  1244. }
  1245. /* Case: a matching application request has been found for
  1246. * the incoming data, we handle the correct allocation
  1247. * of the pointer associated to the data handle, then
  1248. * submit the corresponding receive with
  1249. * _starpu_mpi_handle_ready_request. */
  1250. else
  1251. {
  1252. _STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %d\n", envelope->data_tag);
  1253. _STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
  1254. early_request->sync = envelope->sync;
  1255. _starpu_mpi_handle_allocate_datatype(early_request->data_handle, early_request);
  1256. if (early_request->registered_datatype == 1)
  1257. {
  1258. early_request->count = 1;
  1259. early_request->ptr = starpu_data_get_local_ptr(early_request->data_handle);
  1260. }
  1261. else
  1262. {
  1263. early_request->count = envelope->size;
  1264. early_request->ptr = malloc(early_request->count);
  1265. STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
  1266. }
  1267. _STARPU_MPI_DEBUG(3, "Handling new request... \n");
  1268. /* handling a request is likely to block for a while
  1269. * (on a sync_data_with_mem call), we want to let the
  1270. * application submit requests in the meantime, so we
  1271. * release the lock. */
  1272. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1273. _starpu_mpi_handle_ready_request(early_request);
  1274. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1275. }
  1276. }
  1277. envelope_request_submitted = 0;
  1278. }
  1279. else
  1280. {
  1281. //_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
  1282. }
  1283. }
  1284. }
  1285. if (envelope_request_submitted)
  1286. {
  1287. _starpu_mpi_comm_cancel_recv();
  1288. envelope_request_submitted = 0;
  1289. }
  1290. STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
  1291. STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
  1292. STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
  1293. _starpu_mpi_early_request_check_termination();
  1294. _starpu_mpi_early_data_check_termination();
  1295. _starpu_mpi_sync_data_check_termination();
  1296. if (argc_argv->initialize_mpi)
  1297. {
  1298. _STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
  1299. MPI_Finalize();
  1300. }
  1301. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1302. _starpu_mpi_sync_data_free();
  1303. _starpu_mpi_early_data_free();
  1304. _starpu_mpi_early_request_free();
  1305. _starpu_mpi_datatype_free();
  1306. free(argc_argv);
  1307. return NULL;
  1308. }
  1309. /********************************************************/
  1310. /* */
  1311. /* (De)Initialization methods */
  1312. /* */
  1313. /********************************************************/
  #ifdef STARPU_MPI_ACTIVITY
  /* Identifier of the progression hook: assigned from
   * starpu_progression_hook_register() during initialization and handed
   * back to starpu_progression_hook_deregister() at shutdown.  It holds -1
   * as long as no hook has been registered. */
  static int hookid = -1;
  #endif /* STARPU_MPI_ACTIVITY */
  /* Record a synchronization point in the FxT trace.
   *
   * All nodes of MPI_COMM_WORLD first meet on a barrier, then node 0 draws
   * a random key which is broadcast to everybody and written in the trace
   * next to the rank and the world size.  The function is a no-op when
   * StarPU is built without STARPU_USE_FXT. */
  static void _starpu_mpi_add_sync_point_in_fxt(void)
  {
  #ifdef STARPU_USE_FXT
      int rank;
      int nb_nodes;
      int trace_key;
      int ret;

      starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
      starpu_mpi_comm_size(MPI_COMM_WORLD, &nb_nodes);

      ret = MPI_Barrier(MPI_COMM_WORLD);
      STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(ret));

      /* The key is meant to be "unique" so that one can check that several
       * FxT traces were produced by the same MPI run.
       *
       * XXX perhaps we don't want to generate a new seed if the application
       * specified some reproducible behaviour ? */
      if (rank == 0)
      {
          srand(time(NULL));
          trace_key = rand();
      }

      /* Every other node receives the key drawn by node 0. */
      ret = MPI_Bcast(&trace_key, 1, MPI_INT, 0, MPI_COMM_WORLD);
      STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_error_code(ret));

      _STARPU_MPI_TRACE_BARRIER(rank, nb_nodes, trace_key);
      _STARPU_MPI_DEBUG(3, "unique key %x\n", trace_key);
  #endif
  }
  1343. static
  1344. int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
  1345. {
  1346. struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
  1347. argc_argv->initialize_mpi = initialize_mpi;
  1348. argc_argv->argc = argc;
  1349. argc_argv->argv = argv;
  1350. argc_argv->comm = comm;
  1351. #ifdef STARPU_SIMGRID
  1352. /* Call MPI_Init_thread as early as possible, to initialize simgrid
  1353. * before working with mutexes etc. */
  1354. _starpu_mpi_do_initialize(argc_argv);
  1355. #endif
  1356. STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
  1357. STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
  1358. STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
  1359. ready_requests = _starpu_mpi_req_list_new();
  1360. STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
  1361. detached_requests = _starpu_mpi_req_list_new();
  1362. STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
  1363. _starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
  1364. #ifdef STARPU_MPI_ACTIVITY
  1365. hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
  1366. STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
  1367. #endif /* STARPU_MPI_ACTIVITY */
  1368. #ifdef STARPU_SIMGRID
  1369. _starpu_mpi_progress_thread_func(argc_argv);
  1370. return 0;
  1371. #else
  1372. STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
  1373. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1374. while (!running)
  1375. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  1376. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1377. return 0;
  1378. #endif
  1379. }
  #ifdef STARPU_SIMGRID
  /* Entry point run ahead of the application's main: SMPI has to be
   * initialized before MSG processes can be created to execute the
   * application's main.  MPI initialization is requested and
   * MPI_COMM_WORLD is used.  Returns what _starpu_mpi_initialize()
   * returns. */
  int _starpu_mpi_simgrid_init(int argc, char *argv[])
  {
      int *argc_ptr = &argc;
      char ***argv_ptr = &argv;

      return _starpu_mpi_initialize(argc_ptr, argv_ptr, 1, MPI_COMM_WORLD);
  }
  #endif
  1388. int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
  1389. {
  1390. #ifdef STARPU_SIMGRID
  1391. return 0;
  1392. #else
  1393. return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
  1394. #endif
  1395. }
  1396. int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
  1397. {
  1398. return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
  1399. }
  1400. int starpu_mpi_initialize(void)
  1401. {
  1402. #ifdef STARPU_SIMGRID
  1403. return 0;
  1404. #else
  1405. return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
  1406. #endif
  1407. }
  1408. int starpu_mpi_initialize_extended(int *rank, int *world_size)
  1409. {
  1410. #ifdef STARPU_SIMGRID
  1411. *world_size = _mpi_world_size;
  1412. *rank = _mpi_world_rank;
  1413. return 0;
  1414. #else
  1415. int ret;
  1416. ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
  1417. if (ret == 0)
  1418. {
  1419. _STARPU_DEBUG("Calling MPI_Comm_rank\n");
  1420. MPI_Comm_rank(MPI_COMM_WORLD, rank);
  1421. MPI_Comm_size(MPI_COMM_WORLD, world_size);
  1422. }
  1423. return ret;
  1424. #endif
  1425. }
  1426. int starpu_mpi_shutdown(void)
  1427. {
  1428. void *value;
  1429. int rank, world_size;
  1430. /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
  1431. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  1432. starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
  1433. /* kill the progression thread */
  1434. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1435. running = 0;
  1436. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  1437. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1438. #ifndef STARPU_SIMGRID
  1439. starpu_pthread_join(progress_thread, &value);
  1440. #else
  1441. /* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
  1442. MSG_process_sleep(1);
  1443. #endif
  1444. #ifdef STARPU_MPI_ACTIVITY
  1445. starpu_progression_hook_deregister(hookid);
  1446. #endif /* STARPU_MPI_ACTIVITY */
  1447. _STARPU_MPI_TRACE_STOP(rank, world_size);
  1448. /* free the request queues */
  1449. _starpu_mpi_req_list_delete(detached_requests);
  1450. _starpu_mpi_req_list_delete(ready_requests);
  1451. _starpu_mpi_comm_amounts_display(rank);
  1452. _starpu_mpi_comm_amounts_free();
  1453. _starpu_mpi_cache_free(world_size);
  1454. _starpu_mpi_tag_free();
  1455. _starpu_mpi_comm_free();
  1456. return 0;
  1457. }
  1458. void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
  1459. {
  1460. _starpu_mpi_data_release_tag(data_handle);
  1461. struct _starpu_mpi_node_tag *mpi_data = data_handle->mpi_data;
  1462. _starpu_mpi_cache_flush(mpi_data->comm, data_handle);
  1463. free(data_handle->mpi_data);
  1464. }
  1465. void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, int rank, MPI_Comm comm)
  1466. {
  1467. struct _starpu_mpi_node_tag *mpi_data;
  1468. if (data_handle->mpi_data)
  1469. {
  1470. mpi_data = data_handle->mpi_data;
  1471. }
  1472. else
  1473. {
  1474. mpi_data = calloc(1, sizeof(struct _starpu_mpi_node_tag));
  1475. data_handle->mpi_data = mpi_data;
  1476. _starpu_mpi_data_register_tag(data_handle, tag);
  1477. _starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
  1478. }
  1479. if (tag != -1)
  1480. {
  1481. mpi_data->data_tag = tag;
  1482. }
  1483. if (rank != -1)
  1484. {
  1485. mpi_data->rank = rank;
  1486. mpi_data->comm = comm;
  1487. _starpu_mpi_comm_register(comm);
  1488. }
  1489. }
  1490. void starpu_mpi_data_set_rank_comm(starpu_data_handle_t handle, int rank, MPI_Comm comm)
  1491. {
  1492. starpu_mpi_data_register_comm(handle, -1, rank, comm);
  1493. }
  1494. void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag)
  1495. {
  1496. starpu_mpi_data_register_comm(handle, tag, -1, MPI_COMM_WORLD);
  1497. }
  1498. int starpu_mpi_data_get_rank(starpu_data_handle_t data)
  1499. {
  1500. STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
  1501. return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->rank;
  1502. }
  1503. int starpu_mpi_data_get_tag(starpu_data_handle_t data)
  1504. {
  1505. STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
  1506. return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->data_tag;
  1507. }
  1508. int starpu_mpi_comm_size(MPI_Comm comm, int *size)
  1509. {
  1510. #ifdef STARPU_SIMGRID
  1511. STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
  1512. *size = _mpi_world_size;
  1513. return 0;
  1514. #else
  1515. return MPI_Comm_size(comm, size);
  1516. #endif
  1517. }
  1518. int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
  1519. {
  1520. #ifdef STARPU_SIMGRID
  1521. STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
  1522. *rank = _mpi_world_rank;
  1523. return 0;
  1524. #else
  1525. return MPI_Comm_rank(comm, rank);
  1526. #endif
  1527. }
  1528. int starpu_mpi_world_size(void)
  1529. {
  1530. int size;
  1531. starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
  1532. return size;
  1533. }
  1534. int starpu_mpi_world_rank(void)
  1535. {
  1536. int rank;
  1537. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  1538. return rank;
  1539. }
  1540. int starpu_mpi_wait_for_all(MPI_Comm comm)
  1541. {
  1542. int mpi = 1;
  1543. int task = 1;
  1544. while (task || mpi)
  1545. {
  1546. task = _starpu_task_wait_for_all_and_return_nb_waited_tasks();
  1547. mpi = _starpu_mpi_barrier(comm);
  1548. }
  1549. return 0;
  1550. }
  1551. /* Fortran related functions */
  1552. struct _starpu_mpi_argc_argv *fstarpu_mpi_argcv_alloc(int argc, int initialize_mpi, int comm_present, MPI_Fint comm)
  1553. {
  1554. struct _starpu_mpi_argc_argv *argcv = calloc(1,sizeof(*argcv));
  1555. argcv->initialize_mpi = initialize_mpi;
  1556. if (comm_present) {
  1557. argcv->comm = MPI_Comm_f2c(comm);
  1558. } else {
  1559. argcv->comm = MPI_COMM_WORLD;
  1560. }
  1561. argcv->fargc = argc;
  1562. argcv->argc = &argcv->fargc;
  1563. argcv->fargv = calloc(argc, sizeof(char *));
  1564. argcv->argv = &argcv->fargv;
  1565. return argcv;
  1566. }
  1567. void fstarpu_mpi_argcv_set_arg(struct _starpu_mpi_argc_argv *argcv, int i, int len, char *_s)
  1568. {
  1569. STARPU_ASSERT(len >= 0);
  1570. STARPU_ASSERT(i >= 0 && i < argcv->fargc);
  1571. char *s = malloc(len+1);
  1572. memcpy(s, _s, len);
  1573. s[len] = '\0';
  1574. argcv->fargv[i] = s;
  1575. }
  1576. void fstarpu_mpi_argcv_free(struct _starpu_mpi_argc_argv *argcv)
  1577. {
  1578. if (argcv->fargv != NULL)
  1579. {
  1580. int i;
  1581. for (i=0; i<argcv->fargc; i++)
  1582. {
  1583. free(argcv->fargv[i]);
  1584. }
  1585. free(argcv->fargv);
  1586. }
  1587. free(argcv);
  1588. }
  1589. starpu_mpi_req *fstarpu_mpi_req_alloc(void)
  1590. {
  1591. return calloc(1, sizeof(starpu_mpi_req));
  1592. }
  1593. void fstarpu_mpi_req_free(starpu_mpi_req *req)
  1594. {
  1595. free(req);
  1596. }
  1597. MPI_Status *fstarpu_mpi_status_alloc(void)
  1598. {
  1599. return calloc(1, sizeof(MPI_Status));
  1600. }
  1601. void fstarpu_mpi_status_free(MPI_Status *status)
  1602. {
  1603. free(status);
  1604. }
  1605. int fstarpu_mpi_barrier(MPI_Fint comm)
  1606. {
  1607. return starpu_mpi_barrier(MPI_Comm_f2c(comm));
  1608. }
  1609. int fstarpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg, int seq_const)
  1610. {
  1611. return starpu_mpi_irecv_detached_sequential_consistency(data_handle, src, mpi_tag, MPI_Comm_f2c(comm), callback, arg, seq_const);
  1612. }
  1613. int fstarpu_mpi_init_c(struct _starpu_mpi_argc_argv *argcv)
  1614. {
  1615. return starpu_mpi_init_comm(argcv->argc, argcv->argv, argcv->initialize_mpi, argcv->comm);
  1616. }
  1617. void fstarpu_mpi_get_data_on_node(MPI_Fint comm, starpu_data_handle_t data_handle, int node)
  1618. {
  1619. starpu_mpi_get_data_on_node(MPI_Comm_f2c(comm), data_handle, node);
  1620. }
  1621. void fstarpu_mpi_get_data_on_node_detached(MPI_Fint comm, starpu_data_handle_t data_handle, int node, void (*callback)(void *), void *arg)
  1622. {
  1623. starpu_mpi_get_data_on_node_detached(MPI_Comm_f2c(comm), data_handle, node, callback, arg);
  1624. }
  1625. void fstarpu_mpi_redux_data(MPI_Fint comm, starpu_data_handle_t data_handle)
  1626. {
  1627. starpu_mpi_redux_data(MPI_Comm_f2c(comm), data_handle);
  1628. }
  1629. /* scatter/gather */
  1630. int fstarpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int cnt, int root, MPI_Fint comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)
  1631. {
  1632. return starpu_mpi_scatter_detached(data_handles, cnt, root, MPI_Comm_f2c(comm), scallback, sarg, rcallback, rarg);
  1633. }
  1634. int fstarpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int cnt, int root, MPI_Fint comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)
  1635. {
  1636. return starpu_mpi_gather_detached(data_handles, cnt, root, MPI_Comm_f2c(comm), scallback, sarg, rcallback, rarg);
  1637. }
  1638. /* isend/irecv detached unlock tag */
  1639. int fstarpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI_Fint comm, starpu_tag_t *starpu_tag)
  1640. {
  1641. return starpu_mpi_isend_detached_unlock_tag(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), *starpu_tag);
  1642. }
  1643. int fstarpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, starpu_tag_t *starpu_tag)
  1644. {
  1645. return starpu_mpi_irecv_detached_unlock_tag(data_handle, src, mpi_tag, MPI_Comm_f2c(comm), *starpu_tag);
  1646. }
  1647. /* isend/irecv array detached unlock tag */
  1648. int fstarpu_mpi_isend_array_detached_unlock_tag(int array_size, starpu_data_handle_t *data_handles, int *dsts, int *mpi_tags, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
  1649. {
  1650. MPI_Comm comms[array_size];
  1651. int i;
  1652. for (i = 0; i < array_size; i++)
  1653. {
  1654. comms[i] = MPI_Comm_f2c(_comms[i]);
  1655. }
  1656. int ret = starpu_mpi_isend_array_detached_unlock_tag((unsigned)array_size, data_handles, dsts, mpi_tags, comms, *starpu_tag);
  1657. return ret;
  1658. }
  1659. int fstarpu_mpi_irecv_array_detached_unlock_tag(int array_size, starpu_data_handle_t *data_handles, int *srcs, int *mpi_tags, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
  1660. {
  1661. MPI_Comm comms[array_size];
  1662. int i;
  1663. for (i = 0; i < array_size; i++)
  1664. {
  1665. comms[i] = MPI_Comm_f2c(_comms[i]);
  1666. }
  1667. int ret = starpu_mpi_irecv_array_detached_unlock_tag((unsigned)array_size, data_handles, srcs, mpi_tags, comms, *starpu_tag);
  1668. return ret;
  1669. }
  1670. /* isend/irecv */
  1671. int fstarpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dst, int mpi_tag, MPI_Fint comm)
  1672. {
  1673. return starpu_mpi_isend(data_handle, req, dst, mpi_tag, MPI_Comm_f2c(comm));
  1674. }
  1675. int fstarpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int src, int mpi_tag, MPI_Fint comm)
  1676. {
  1677. return starpu_mpi_irecv(data_handle, req, src, mpi_tag, MPI_Comm_f2c(comm));
  1678. }
  1679. /* send/recv */
  1680. int fstarpu_mpi_send(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI_Fint comm)
  1681. {
  1682. return starpu_mpi_send(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm));
  1683. }
  1684. int fstarpu_mpi_recv(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, MPI_Status *status)
  1685. {
  1686. return starpu_mpi_recv(data_handle, src, mpi_tag, MPI_Comm_f2c(comm), status);
  1687. }
  1688. /* isend/irecv detached */
  1689. int fstarpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg)
  1690. {
  1691. return starpu_mpi_isend_detached(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), callback, arg);
  1692. }
  1693. int fstarpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg)
  1694. {
  1695. return starpu_mpi_irecv_detached(data_handle, src, mpi_tag, MPI_Comm_f2c(comm), callback, arg);
  1696. }
  1697. /* issend / issend detached */
  1698. int fstarpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dst, int mpi_tag, MPI_Fint comm)
  1699. {
  1700. return starpu_mpi_issend(data_handle, req, dst, mpi_tag, MPI_Comm_f2c(comm));
  1701. }
  1702. int fstarpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg)
  1703. {
  1704. return starpu_mpi_issend_detached(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), callback, arg);
  1705. }
  1706. /* cache */
  1707. void fstarpu_mpi_cache_flush(MPI_Fint comm, starpu_data_handle_t data_handle)
  1708. {
  1709. return starpu_mpi_cache_flush(MPI_Comm_f2c(comm), data_handle);
  1710. }
  1711. void fstarpu_mpi_cache_flush_all_data(MPI_Fint comm)
  1712. {
  1713. return starpu_mpi_cache_flush_all_data(MPI_Comm_f2c(comm));
  1714. }
  1715. int fstarpu_mpi_comm_size(MPI_Fint comm, int *size)
  1716. {
  1717. return starpu_mpi_comm_size(MPI_Comm_f2c(comm), size);
  1718. }
  1719. int fstarpu_mpi_comm_rank(MPI_Fint comm, int *rank)
  1720. {
  1721. return starpu_mpi_comm_rank(MPI_Comm_f2c(comm), rank);
  1722. }
  1723. MPI_Fint fstarpu_mpi_world_comm()
  1724. {
  1725. return MPI_Comm_c2f(MPI_COMM_WORLD);
  1726. }
  1727. void fstarpu_mpi_data_register_comm(starpu_data_handle_t handle, int tag, int rank, MPI_Fint comm)
  1728. {
  1729. return starpu_mpi_data_register_comm(handle, tag, rank, MPI_Comm_f2c(comm));
  1730. }
  1731. void fstarpu_mpi_data_register(starpu_data_handle_t handle, int tag, int rank)
  1732. {
  1733. return starpu_mpi_data_register_comm(handle, tag, rank, MPI_COMM_WORLD);
  1734. }
  1735. void fstarpu_mpi_data_set_rank_comm(starpu_data_handle_t handle, int rank, MPI_Fint comm)
  1736. {
  1737. return starpu_mpi_data_set_rank_comm(handle, rank, MPI_Comm_f2c(comm));
  1738. }
  1739. void fstarpu_mpi_data_set_rank(starpu_data_handle_t handle, int rank)
  1740. {
  1741. return starpu_mpi_data_set_rank_comm(handle, rank, MPI_COMM_WORLD);
  1742. }
  1743. int fstarpu_mpi_wait_for_all(MPI_Fint comm)
  1744. {
  1745. return starpu_mpi_wait_for_all(MPI_Comm_f2c(comm));
  1746. }