12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2009, 2010-2014 Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <stdlib.h>
- #include <starpu_mpi.h>
- #include <starpu_mpi_datatype.h>
- #include <starpu_mpi_private.h>
- #include <starpu_profiling.h>
- #include <starpu_mpi_stats.h>
- #include <starpu_mpi_task_insert.h>
- #include <starpu_mpi_early_data.h>
- #include <starpu_mpi_early_request.h>
- #include <common/config.h>
- #include <common/thread.h>
- #include <datawizard/interfaces/data_interface.h>
- #include <datawizard/coherency.h>
- static void _starpu_mpi_add_sync_point_in_fxt(void);
- static void _starpu_mpi_submit_ready_request(void *arg);
- static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
- #ifdef STARPU_VERBOSE
- static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
- #endif
- static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
- int dest, int mpi_tag, MPI_Comm comm,
- unsigned detached, void (*callback)(void *), void *arg,
- int sequential_consistency);
- static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
- int source, int mpi_tag, MPI_Comm comm,
- unsigned detached, void (*callback)(void *), void *arg,
- int sequential_consistency, int is_internal_req,
- ssize_t count);
- static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
- /* The list of ready requests */
- static struct _starpu_mpi_req_list *ready_requests;
- /* The list of detached requests that have already been submitted to MPI */
- static struct _starpu_mpi_req_list *detached_requests;
- static starpu_pthread_mutex_t detached_requests_mutex;
- /* Condition to wake up progression thread */
- static starpu_pthread_cond_t cond_progression;
- /* Condition to wake up waiting for all current MPI requests to finish */
- static starpu_pthread_cond_t cond_finished;
- static starpu_pthread_mutex_t mutex;
- static starpu_pthread_t progress_thread;
- static int running = 0;
- /* Count requests posted by the application and not yet submitted to MPI */
- static starpu_pthread_mutex_t mutex_posted_requests;
- static int posted_requests = 0, newer_requests, barrier_running = 0;
- #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
- static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
- {
- *req = malloc(sizeof(struct _starpu_mpi_req));
- STARPU_ASSERT_MSG(*req, "Invalid request");
- /* Initialize the request structure */
- (*req)->data_handle = NULL;
- (*req)->datatype = 0;
- (*req)->ptr = NULL;
- (*req)->count = -1;
- (*req)->user_datatype = -1;
- (*req)->srcdst = -1;
- (*req)->mpi_tag = -1;
- (*req)->comm = 0;
- (*req)->func = NULL;
- (*req)->status = NULL;
- (*req)->request = 0;
- (*req)->flag = NULL;
- (*req)->ret = -1;
- STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
- STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
- STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
- STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
- (*req)->request_type = UNKNOWN_REQ;
- (*req)->submitted = 0;
- (*req)->completed = 0;
- (*req)->posted = 0;
- (*req)->other_request = NULL;
- (*req)->detached = -1;
- (*req)->callback = NULL;
- (*req)->callback_arg = NULL;
- (*req)->size_req = 0;
- (*req)->internal_req = NULL;
- (*req)->is_internal_req = 0;
- (*req)->envelope = NULL;
- (*req)->sequential_consistency = 1;
- }
- /********************************************************/
- /* */
- /* Send/Receive functionalities */
- /* */
- /********************************************************/
- static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
- int srcdst, int mpi_tag, MPI_Comm comm,
- unsigned detached, void (*callback)(void *), void *arg,
- enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
- enum starpu_data_access_mode mode,
- int sequential_consistency,
- int is_internal_req,
- ssize_t count)
- {
- struct _starpu_mpi_req *req;
- _STARPU_MPI_LOG_IN();
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
- /* Initialize the request structure */
- _starpu_mpi_request_init(&req);
- req->request_type = request_type;
- req->data_handle = data_handle;
- req->srcdst = srcdst;
- req->mpi_tag = mpi_tag;
- req->comm = comm;
- req->detached = detached;
- req->callback = callback;
- req->callback_arg = arg;
- req->func = func;
- req->sequential_consistency = sequential_consistency;
- req->is_internal_req = is_internal_req;
- req->count = count;
- /* Asynchronously request StarPU to fetch the data in main memory: when
- * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
- * the request is actually submitted */
- starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
- _STARPU_MPI_LOG_OUT();
- return req;
- }
- /********************************************************/
- /* */
- /* Send functionalities */
- /* */
- /********************************************************/
- static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
- {
- _STARPU_MPI_LOG_IN();
- _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
- TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
- req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
- TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
- /* somebody is perhaps waiting for the MPI request to be posted */
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
- req->submitted = 1;
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
- _starpu_mpi_handle_detached_request(req);
- _STARPU_MPI_LOG_OUT();
- }
- static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
- {
- _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
- req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
- req->envelope->mpi_tag = req->mpi_tag;
- if (req->user_datatype == 0)
- {
- int size;
- req->count = 1;
- req->ptr = starpu_data_get_local_ptr(req->data_handle);
- MPI_Type_size(req->datatype, &size);
- req->envelope->size = (ssize_t)req->count * size;
- _STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
- MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
- }
- else
- {
- int ret;
- // Do not pack the data, just try to find out the size
- starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
- if (req->envelope->size != -1)
- {
- // We already know the size of the data, let's send it to overlap with the packing of the data
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
- req->count = req->envelope->size;
- ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
- }
- // Pack the data
- starpu_data_pack(req->data_handle, &req->ptr, &req->count);
- if (req->envelope->size == -1)
- {
- // We know the size now, let's send it
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
- ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
- }
- else
- {
- // We check the size returned with the 2 calls to pack is the same
- STARPU_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
- }
- // We can send the data now
- }
- _starpu_mpi_isend_data_func(req);
- }
- static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
- int dest, int mpi_tag, MPI_Comm comm,
- unsigned detached, void (*callback)(void *), void *arg,
- int sequential_consistency)
- {
- return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
- }
- int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
- {
- _STARPU_MPI_LOG_IN();
- STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
- struct _starpu_mpi_req *req;
- req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL, 1);
- STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
- *public_req = req;
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
- int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
- {
- _STARPU_MPI_LOG_IN();
- _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg, 1);
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
- {
- starpu_mpi_req req;
- MPI_Status status;
- _STARPU_MPI_LOG_IN();
- memset(&status, 0, sizeof(MPI_Status));
- starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
- starpu_mpi_wait(&req, &status);
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- /********************************************************/
- /* */
- /* receive functionalities */
- /* */
- /********************************************************/
- static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
- {
- _STARPU_MPI_LOG_IN();
- _STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
- req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
- TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
- /* somebody is perhaps waiting for the MPI request to be posted */
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
- req->submitted = 1;
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
- _starpu_mpi_handle_detached_request(req);
- _STARPU_MPI_LOG_OUT();
- }
- static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, ssize_t count)
- {
- return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
- }
- int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
- {
- _STARPU_MPI_LOG_IN();
- STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
- // // We check if a tag is defined for the data handle, if not,
- // // we define the one given for the communication.
- // // A tag is necessary for the internal mpi engine.
- // int tag = starpu_data_get_tag(data_handle);
- // if (tag == -1)
- // starpu_data_set_tag(data_handle, mpi_tag);
- struct _starpu_mpi_req *req;
- req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0, 0);
- STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
- *public_req = req;
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
- {
- _STARPU_MPI_LOG_IN();
- // // We check if a tag is defined for the data handle, if not,
- // // we define the one given for the communication.
- // // A tag is necessary for the internal mpi engine.
- // int tag = starpu_data_get_tag(data_handle);
- // if (tag == -1)
- // starpu_data_set_tag(data_handle, mpi_tag);
- _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0, 0);
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
- {
- _STARPU_MPI_LOG_IN();
- // // We check if a tag is defined for the data handle, if not,
- // // we define the one given for the communication.
- // // A tag is necessary for the internal mpi engine.
- // int tag = starpu_data_get_tag(data_handle);
- // if (tag == -1)
- // starpu_data_set_tag(data_handle, mpi_tag);
- _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0, 0);
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
- {
- starpu_mpi_req req;
- _STARPU_MPI_LOG_IN();
- // // We check if a tag is defined for the data handle, if not,
- // // we define the one given for the communication.
- // // A tag is necessary for the internal mpi engine.
- // int tag = starpu_data_get_tag(data_handle);
- // if (tag == -1)
- // starpu_data_set_tag(data_handle, mpi_tag);
- starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
- starpu_mpi_wait(&req, status);
- _STARPU_MPI_LOG_OUT();
- return 0;
- }
- /********************************************************/
- /* */
- /* Wait functionalities */
- /* */
- /********************************************************/
- static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
- {
- _STARPU_MPI_LOG_IN();
- /* Which is the mpi request we are waiting for ? */
- struct _starpu_mpi_req *req = waiting_req->other_request;
- TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
- req->ret = MPI_Wait(&req->request, waiting_req->status);
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
- TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
- _starpu_mpi_handle_request_termination(req);
- _STARPU_MPI_LOG_OUT();
- }
- int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
- {
- int ret;
- struct _starpu_mpi_req *req = *public_req;
- struct _starpu_mpi_req *waiting_req;
- _STARPU_MPI_LOG_IN();
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
- /* We cannot try to complete a MPI request that was not actually posted
- * to MPI yet. */
- STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
- while (!(req->submitted))
- STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
- STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
- /* Initialize the request structure */
- _starpu_mpi_request_init(&waiting_req);
- waiting_req->status = status;
- waiting_req->other_request = req;
- waiting_req->func = _starpu_mpi_wait_func;
- waiting_req->request_type = WAIT_REQ;
- _starpu_mpi_submit_ready_request(waiting_req);
- /* We wait for the MPI request to finish */
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
- while (!req->completed)
- STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
- ret = req->ret;
- /* The internal request structure was automatically allocated */
- *public_req = NULL;
- free(req);
- free(waiting_req);
- _STARPU_MPI_LOG_OUT();
- return ret;
- }
- /********************************************************/
- /* */
- /* Test functionalities */
- /* */
- /********************************************************/
- static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
- {
- _STARPU_MPI_LOG_IN();
- /* Which is the mpi request we are testing for ? */
- struct _starpu_mpi_req *req = testing_req->other_request;
- _STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
- req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
- TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
- if (*testing_req->flag)
- {
- testing_req->ret = req->ret;
- _starpu_mpi_handle_request_termination(req);
- }
- STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
- testing_req->completed = 1;
- STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
- _STARPU_MPI_LOG_OUT();
- }
- int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
- {
- _STARPU_MPI_LOG_IN();
- int ret = 0;
- STARPU_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
- struct _starpu_mpi_req *req = *public_req;
- STARPU_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
- unsigned submitted = req->submitted;
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
- if (submitted)
- {
- struct _starpu_mpi_req *testing_req;
- _starpu_mpi_request_init(&testing_req);
- /* Initialize the request structure */
- STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
- STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
- testing_req->flag = flag;
- testing_req->status = status;
- testing_req->other_request = req;
- testing_req->func = _starpu_mpi_test_func;
- testing_req->completed = 0;
- testing_req->request_type = TEST_REQ;
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
- _starpu_mpi_submit_ready_request(testing_req);
- /* We wait for the test request to finish */
- STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
- while (!(testing_req->completed))
- STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
- STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
- ret = testing_req->ret;
- if (*(testing_req->flag))
- {
- /* The request was completed so we free the internal
- * request structure which was automatically allocated
- * */
- *public_req = NULL;
- free(req);
- }
- free(testing_req);
- }
- else
- {
- *flag = 0;
- }
- _STARPU_MPI_LOG_OUT();
- return ret;
- }
- /********************************************************/
- /* */
- /* Barrier functionalities */
- /* */
- /********************************************************/
- static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
- {
- _STARPU_MPI_LOG_IN();
- barrier_req->ret = MPI_Barrier(barrier_req->comm);
- STARPU_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %d", barrier_req->ret);
- _starpu_mpi_handle_request_termination(barrier_req);
- _STARPU_MPI_LOG_OUT();
- }
- int starpu_mpi_barrier(MPI_Comm comm)
- {
- int ret;
- struct _starpu_mpi_req *barrier_req;
- _STARPU_MPI_LOG_IN();
- _starpu_mpi_request_init(&barrier_req);
- /* First wait for *both* all tasks and MPI requests to finish, in case
- * some tasks generate MPI requests, MPI requests generate tasks, etc.
- */
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
- barrier_running = 1;
- do
- {
- while (posted_requests)
- /* Wait for all current MPI requests to finish */
- STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
- /* No current request, clear flag */
- newer_requests = 0;
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- /* Now wait for all tasks */
- starpu_task_wait_for_all();
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- /* Check newer_requests again, in case some MPI requests
- * triggered by tasks completed and triggered tasks between
- * wait_for_all finished and we take the lock */
- } while (posted_requests || newer_requests);
- barrier_running = 0;
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- /* Initialize the request structure */
- STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
- STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
- barrier_req->func = _starpu_mpi_barrier_func;
- barrier_req->request_type = BARRIER_REQ;
- barrier_req->comm = comm;
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
- _starpu_mpi_submit_ready_request(barrier_req);
- /* We wait for the MPI request to finish */
- STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
- while (!barrier_req->completed)
- STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
- STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
- ret = barrier_req->ret;
- free(barrier_req);
- _STARPU_MPI_LOG_OUT();
- return ret;
- }
- /********************************************************/
- /* */
- /* Progression */
- /* */
- /********************************************************/
#ifdef STARPU_VERBOSE
/* Return a printable name for a request type (verbose builds only).
 * The returned string is a literal and must not be modified or freed. */
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
{
	char *name;

	switch (request_type)
	{
		case SEND_REQ:
			name = "SEND_REQ";
			break;
		case RECV_REQ:
			name = "RECV_REQ";
			break;
		case WAIT_REQ:
			name = "WAIT_REQ";
			break;
		case TEST_REQ:
			name = "TEST_REQ";
			break;
		case BARRIER_REQ:
			name = "BARRIER_REQ";
			break;
		case UNKNOWN_REQ:
			name = "UNSET_REQ";
			break;
		default:
			name = "unknown request type";
			break;
	}

	return name;
}
#endif
- static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
- {
- int ret;
- _STARPU_MPI_LOG_IN();
- _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
- _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
- if (req->internal_req)
- {
- struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
- STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
- _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
- _starpu_mpi_early_data_delete(early_data_handle);
- free(early_data_handle);
- }
- else
- {
- if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
- {
- if (req->user_datatype == 1)
- {
- if (req->request_type == SEND_REQ)
- {
- // We need to make sure the communication for sending the size
- // has completed, as MPI can re-order messages, let's call
- // MPI_Wait to make sure data have been sent
- ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %d", ret);
- free(req->ptr);
- }
- if (req->request_type == RECV_REQ)
- {
- // req->ptr is freed by starpu_data_unpack
- starpu_data_unpack(req->data_handle, req->ptr, req->count);
- }
- }
- else
- {
- _starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
- }
- }
- }
- if (req->data_handle)
- starpu_data_release(req->data_handle);
- if (req->envelope)
- {
- free(req->envelope);
- req->envelope = NULL;
- }
- /* Execute the specified callback, if any */
- if (req->callback)
- req->callback(req->callback_arg);
- /* tell anyone potentially waiting on the request that it is
- * terminated now */
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
- req->completed = 1;
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
- _STARPU_MPI_LOG_OUT();
- }
- struct _starpu_mpi_early_data_cb_args
- {
- starpu_data_handle_t data_handle;
- starpu_data_handle_t early_handle;
- struct _starpu_mpi_req *req;
- void *buffer;
- };
- static void _starpu_mpi_early_data_cb(void* arg)
- {
- struct _starpu_mpi_early_data_cb_args *args = arg;
- // We store in the application request the internal MPI
- // request so that it can be used by starpu_mpi_wait
- args->req->request = args->req->internal_req->request;
- args->req->submitted = 1;
- if (args->buffer)
- {
- /* Data has been received as a raw memory, it has to be unpacked */
- struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
- struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
- STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
- itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
- free(args->buffer);
- }
- else
- {
- struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
- void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
- void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
- if (!itf->copy_methods->ram_to_ram)
- {
- _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
- itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
- }
- else
- {
- _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
- itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
- }
- }
- _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
- starpu_data_release(args->early_handle);
- _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
- starpu_data_unregister_submit(args->early_handle);
- _STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
- // If the request is detached, we need to call _starpu_mpi_handle_request_termination
- // as it will not be called automatically as the request is not in the list detached_requests
- if (args->req->detached)
- _starpu_mpi_handle_request_termination(args->req);
- // else: If the request is not detached its termination will
- // be handled when calling starpu_mpi_wait
- free(args);
- }
- static void _starpu_mpi_submit_ready_request(void *arg)
- {
- _STARPU_MPI_LOG_IN();
- struct _starpu_mpi_req *req = arg;
- _STARPU_MPI_INC_POSTED_REQUESTS(-1);
- _STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- if (req->request_type == RECV_REQ)
- {
- /* Case : the request is the internal receive request submitted
- * by StarPU-MPI to receive incoming data without a matching
- * early_request from the application. We immediately allocate the
- * pointer associated to the data_handle, and push it into the
- * ready_requests list, so as the real MPI request can be submitted
- * before the next submission of the envelope-catching request. */
- if (req->is_internal_req)
- {
- _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
- if (req->user_datatype == 0)
- {
- req->count = 1;
- req->ptr = starpu_data_get_local_ptr(req->data_handle);
- }
- else
- {
- STARPU_ASSERT(req->count);
- req->ptr = malloc(req->count);
- STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
- }
- _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
- _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- _starpu_mpi_req_list_push_front(ready_requests, req);
- /* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
- req->posted = 1;
- STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- }
- else
- {
- /* test whether the receive request has already been submitted internally by StarPU-MPI*/
- struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
- /* Case: a receive request for a data with the given tag and source has already been
- * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
- * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
- * will be called to bring the data back to the original data handle associated to the request.*/
- if (early_data_handle)
- {
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
- while (!(early_data_handle->req_ready))
- STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
- STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
- STARPU_ASSERT(req->data_handle != early_data_handle->handle);
- req->internal_req = early_data_handle->req;
- struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
- cb_args->data_handle = req->data_handle;
- cb_args->early_handle = early_data_handle->handle;
- cb_args->buffer = early_data_handle->buffer;
- cb_args->req = req;
- _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
- starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
- }
- /* Case: no matching data has been received. Store the receive request as an early_request. */
- else
- {
- _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
- _starpu_mpi_early_request_add(req);
- }
- }
- }
- else
- {
- _starpu_mpi_req_list_push_front(ready_requests, req);
- _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- }
- newer_requests = 1;
- STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- _STARPU_MPI_LOG_OUT();
- }
#ifdef STARPU_MPI_ACTIVITY
/*
 * Progression hook (registered in _starpu_mpi_initialize).
 *
 * Returns 1 when the detached_requests list is empty.  Otherwise it
 * signals cond_progression and returns 0.
 */
static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
{
	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
	int has_detached = !_starpu_mpi_req_list_empty(detached_requests);
	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);

	if (!has_detached)
		return 1;

	/* Some detached requests are pending: wake the progression thread. */
	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);

	return 0;
}
#endif /* STARPU_MPI_ACTIVITY */
- static void _starpu_mpi_test_detached_requests(void)
- {
- _STARPU_MPI_LOG_IN();
- int flag;
- MPI_Status status;
- struct _starpu_mpi_req *req, *next_req;
- STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
- for (req = _starpu_mpi_req_list_begin(detached_requests);
- req != _starpu_mpi_req_list_end(detached_requests);
- req = next_req)
- {
- next_req = _starpu_mpi_req_list_next(req);
- STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
- //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
- req->ret = MPI_Test(&req->request, &flag, &status);
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
- if (flag)
- {
- if (req->request_type == RECV_REQ)
- {
- TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
- }
- else if (req->request_type == SEND_REQ)
- {
- TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
- }
- _starpu_mpi_handle_request_termination(req);
- if (req->request_type == RECV_REQ)
- {
- TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
- }
- else if (req->request_type == SEND_REQ)
- {
- TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
- }
- }
- STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
- if (flag)
- {
- _starpu_mpi_req_list_erase(detached_requests, req);
- #ifdef STARPU_DEVEL
- #warning FIXME: when do we free internal requests
- #endif
- if (!req->is_internal_req)
- free(req);
- }
- }
- STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
- _STARPU_MPI_LOG_OUT();
- }
- static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
- {
- if (req->detached)
- {
- /* put the submitted request into the list of pending requests
- * so that it can be handled by the progression mechanisms */
- STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
- _starpu_mpi_req_list_push_front(detached_requests, req);
- STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
- starpu_wake_all_blocked_workers();
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- }
- }
- static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
- {
- _STARPU_MPI_LOG_IN();
- STARPU_ASSERT_MSG(req, "Invalid request");
- /* submit the request to MPI */
- _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
- req->func(req);
- _STARPU_MPI_LOG_OUT();
- }
/*
 * Arguments handed to the progression thread at creation time.
 * The structure is heap-allocated by _starpu_mpi_initialize() and freed
 * by the progression thread when it exits.
 */
struct _starpu_mpi_argc_argv
{
	/* non-zero: the progression thread calls MPI_Init_thread() and MPI_Finalize() itself */
	int initialize_mpi;
	/* forwarded as-is to MPI_Init_thread() */
	int *argc;
	/* forwarded as-is to MPI_Init_thread() */
	char ***argv;
};
- static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
- {
- switch (thread_level)
- {
- case MPI_THREAD_SERIALIZED:
- {
- _STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
- break;
- }
- case MPI_THREAD_FUNNELED:
- {
- _STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
- break;
- }
- case MPI_THREAD_SINGLE:
- {
- _STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
- break;
- }
- }
- }
- static void *_starpu_mpi_progress_thread_func(void *arg)
- {
- struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
- int rank, worldsize;
- if (argc_argv->initialize_mpi)
- {
- int thread_support;
- _STARPU_DEBUG("Calling MPI_Init_thread\n");
- if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
- {
- _STARPU_ERROR("MPI_Init_thread failed\n");
- }
- _starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
- }
- else
- {
- int provided;
- MPI_Query_thread(&provided);
- _starpu_mpi_print_thread_level_support(provided, " has been initialized with");
- }
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
- MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
- {
- TRACE_MPI_START(rank, worldsize);
- #ifdef STARPU_USE_FXT
- starpu_profiling_set_id(rank);
- #endif //STARPU_USE_FXT
- }
- _starpu_mpi_add_sync_point_in_fxt();
- _starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
- _starpu_mpi_cache_init(MPI_COMM_WORLD);
- _starpu_mpi_early_request_init(worldsize);
- _starpu_mpi_early_data_init(worldsize);
- /* notify the main thread that the progression thread is ready */
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- running = 1;
- STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
- int header_req_submitted = 0;
- while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
- {
- /* shall we block ? */
- unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0;
- #ifndef STARPU_MPI_ACTIVITY
- STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
- block = block && _starpu_mpi_req_list_empty(detached_requests);
- STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
- #endif /* STARPU_MPI_ACTIVITY */
- if (block)
- {
- _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
- TRACE_MPI_SLEEP_BEGIN();
- if (barrier_running)
- /* Tell mpi_barrier */
- STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
- STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
- TRACE_MPI_SLEEP_END();
- }
- /* get one request */
- struct _starpu_mpi_req *req;
- while (!_starpu_mpi_req_list_empty(ready_requests))
- {
- req = _starpu_mpi_req_list_pop_back(ready_requests);
- /* handling a request is likely to block for a while
- * (on a sync_data_with_mem call), we want to let the
- * application submit requests in the meantime, so we
- * release the lock. */
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- _starpu_mpi_handle_ready_request(req);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- }
- /* If there is no currently submitted header_req submitted to
- * catch envelopes from senders, and there is some pending
- * receive requests on our side, we resubmit a header request. */
- MPI_Request header_req;
- if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
- {
- _STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
- MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
- header_req_submitted = 1;
- }
- /* test whether there are some terminated "detached request" */
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- _starpu_mpi_test_detached_requests();
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- if (header_req_submitted == 1)
- {
- int flag,res;
- MPI_Status status;
- _STARPU_MPI_DEBUG(4, "Test of header_req\n");
- /* test whether an envelope has arrived. */
- res = MPI_Test(&header_req, &flag, &status);
- STARPU_ASSERT(res == MPI_SUCCESS);
- if (flag)
- {
- _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
- struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
- /* Case: a data will arrive before a matching receive is
- * posted by the application. Create a temporary handle to
- * store the incoming data, submit a starpu_mpi_irecv_detached
- * on this handle, and store it as an early_data
- */
- if (!found_req)
- {
- _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
- starpu_data_handle_t data_handle = NULL;
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
- STARPU_ASSERT(early_data_handle);
- STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
- STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
- early_data_handle->mpi_tag = recv_env->mpi_tag;
- early_data_handle->env = recv_env;
- early_data_handle->source = status.MPI_SOURCE;
- if (data_handle)
- {
- early_data_handle->buffer = NULL;
- starpu_data_register_same(&early_data_handle->handle, data_handle);
- _starpu_mpi_early_data_add(early_data_handle);
- }
- else
- {
- /* The application has not registered yet a data with the tag,
- * we are going to receive the data as a raw memory, and give it
- * to the application when it post a receive for this tag
- */
- _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
- early_data_handle->buffer = malloc(early_data_handle->env->size);
- starpu_vector_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size, 1);
- _starpu_mpi_early_data_add(early_data_handle);
- }
- _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
- early_data_handle->mpi_tag, MPI_COMM_WORLD, 1,
- NULL, NULL, 1, 1, recv_env->size);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- // We wait until the request is pushed in the
- // ready_request list, that ensures that the next loop
- // will call _starpu_mpi_handle_ready_request
- // on the request and post the corresponding mpi_irecv,
- // otherwise, it may lead to read data as envelop
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
- while (!(early_data_handle->req->posted))
- STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
- STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
- STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
- early_data_handle->req_ready = 1;
- STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- }
- /* Case: a matching application request has been found for
- * the incoming data, we handle the correct allocation
- * of the pointer associated to the data handle, then
- * submit the corresponding receive with
- * _starpu_mpi_handle_ready_request. */
- else
- {
- _STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
- _starpu_mpi_early_request_delete(found_req);
- _starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
- if (found_req->user_datatype == 0)
- {
- found_req->count = 1;
- found_req->ptr = starpu_data_get_local_ptr(found_req->data_handle);
- }
- else
- {
- found_req->count = recv_env->size;
- found_req->ptr = malloc(found_req->count);
- STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
- }
- _STARPU_MPI_DEBUG(3, "Handling new request... \n");
- /* handling a request is likely to block for a while
- * (on a sync_data_with_mem call), we want to let the
- * application submit requests in the meantime, so we
- * release the lock. */
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- _starpu_mpi_handle_ready_request(found_req);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- }
- header_req_submitted = 0;
- }
- else
- {
- _STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
- }
- }
- }
- STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
- STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
- STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
- _starpu_mpi_early_request_check_termination();
- _starpu_mpi_early_data_check_termination();
- if (argc_argv->initialize_mpi)
- {
- _STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
- MPI_Finalize();
- }
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- _starpu_mpi_early_data_free(worldsize);
- _starpu_mpi_early_request_free();
- free(argc_argv);
- free(recv_env);
- return NULL;
- }
- /********************************************************/
- /* */
- /* (De)Initialization methods */
- /* */
- /********************************************************/
#ifdef STARPU_MPI_ACTIVITY
/* Identifier of the registered progression hook; -1 until one is registered. */
static int hookid = -1;
#endif /* STARPU_MPI_ACTIVITY */
/*
 * Record a synchronisation point in the FxT trace.
 *
 * Does nothing unless STARPU_USE_FXT is defined.  Otherwise all ranks of
 * MPI_COMM_WORLD meet on a barrier, rank 0 draws a random key that is
 * broadcast to everybody, and the key is written to the trace.
 */
static void _starpu_mpi_add_sync_point_in_fxt(void)
{
#ifdef STARPU_USE_FXT
	int my_rank;
	int nb_nodes;
	int mpi_ret;
	int run_key;

	MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
	MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);

	mpi_ret = MPI_Barrier(MPI_COMM_WORLD);
	STARPU_ASSERT_MSG(mpi_ret == MPI_SUCCESS, "MPI_Barrier returning %d", mpi_ret);

	/* A "unique" key lets us check that different FxT traces were
	 * produced by the same MPI run. */

	/* XXX perhaps we don't want to generate a new seed if the application
	 * specified some reproducible behaviour ? */
	if (my_rank == 0)
	{
		srand(time(NULL));
		run_key = rand();
	}

	/* Every other rank receives the key drawn by rank 0. */
	mpi_ret = MPI_Bcast(&run_key, 1, MPI_INT, 0, MPI_COMM_WORLD);
	STARPU_ASSERT_MSG(mpi_ret == MPI_SUCCESS, "MPI_Bcast returning %d", mpi_ret);

	TRACE_MPI_BARRIER(my_rank, nb_nodes, run_key);

	_STARPU_MPI_DEBUG(3, "unique key %x\n", run_key);
#endif
}
- static
- int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
- {
- STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
- STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
- STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
- ready_requests = _starpu_mpi_req_list_new();
- STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
- detached_requests = _starpu_mpi_req_list_new();
- STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
- struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
- argc_argv->initialize_mpi = initialize_mpi;
- argc_argv->argc = argc;
- argc_argv->argv = argv;
- #ifdef STARPU_MPI_ACTIVITY
- hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
- STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
- #endif /* STARPU_MPI_ACTIVITY */
- STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- while (!running)
- STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- return 0;
- }
/*
 * Initialise StarPU-MPI.  All three arguments are passed unchanged to
 * _starpu_mpi_initialize(), whose return value is returned.
 */
int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
{
	int status = _starpu_mpi_initialize(argc, argv, initialize_mpi);

	return status;
}
- int starpu_mpi_initialize(void)
- {
- return _starpu_mpi_initialize(NULL, NULL, 0);
- }
- int starpu_mpi_initialize_extended(int *rank, int *world_size)
- {
- int ret;
- ret = _starpu_mpi_initialize(NULL, NULL, 1);
- if (ret == 0)
- {
- _STARPU_DEBUG("Calling MPI_Comm_rank\n");
- MPI_Comm_rank(MPI_COMM_WORLD, rank);
- MPI_Comm_size(MPI_COMM_WORLD, world_size);
- }
- return ret;
- }
- int starpu_mpi_shutdown(void)
- {
- void *value;
- int rank, world_size;
- /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- MPI_Comm_size(MPI_COMM_WORLD, &world_size);
- /* kill the progression thread */
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
- running = 0;
- STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
- starpu_pthread_join(progress_thread, &value);
- #ifdef STARPU_MPI_ACTIVITY
- starpu_progression_hook_deregister(hookid);
- #endif /* STARPU_MPI_ACTIVITY */
- TRACE_MPI_STOP(rank, world_size);
- /* free the request queues */
- _starpu_mpi_req_list_delete(detached_requests);
- _starpu_mpi_req_list_delete(ready_requests);
- _starpu_mpi_comm_amounts_display(rank);
- _starpu_mpi_comm_amounts_free();
- _starpu_mpi_cache_free(world_size);
- return 0;
- }
- void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
- {
- starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handle);
- }
- void starpu_mpi_data_register(starpu_data_handle_t data_handle, int tag, int rank)
- {
- _starpu_data_set_rank(data_handle, rank);
- _starpu_data_set_tag(data_handle, tag);
- _starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
- }
- int starpu_mpi_world_rank(void)
- {
- int rank;
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- return rank;
- }
|