123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- /*
- * StarPU
- * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <common/config.h>
- #include <datawizard/data_request.h>
- #include <pthread.h>
- #include <common/utils.h>
- /* requests that have not been treated at all */
- static starpu_data_request_list_t data_requests[STARPU_MAXNODES];
- static pthread_cond_t data_requests_list_cond[STARPU_MAXNODES];
- static pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
- /* requests that are not terminated (eg. async transfers) */
- static starpu_data_request_list_t data_requests_pending[STARPU_MAXNODES];
- static pthread_cond_t data_requests_pending_list_cond[STARPU_MAXNODES];
- static pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
- void _starpu_init_data_request_lists(void)
- {
- unsigned i;
- for (i = 0; i < STARPU_MAXNODES; i++)
- {
- data_requests[i] = starpu_data_request_list_new();
- PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
- PTHREAD_COND_INIT(&data_requests_list_cond[i], NULL);
- data_requests_pending[i] = starpu_data_request_list_new();
- PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
- PTHREAD_COND_INIT(&data_requests_pending_list_cond[i], NULL);
- }
- }
- void _starpu_deinit_data_request_lists(void)
- {
- unsigned i;
- for (i = 0; i < STARPU_MAXNODES; i++)
- {
- PTHREAD_COND_DESTROY(&data_requests_pending_list_cond[i]);
- PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
- starpu_data_request_list_delete(data_requests_pending[i]);
- PTHREAD_COND_DESTROY(&data_requests_list_cond[i]);
- PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
- starpu_data_request_list_delete(data_requests[i]);
- }
- }
- /* this should be called with the lock r->handle->header_lock taken */
- static void starpu_data_request_destroy(starpu_data_request_t r)
- {
- r->handle->per_node[r->dst_node].request = NULL;
- starpu_data_request_delete(r);
- }
- /* handle->lock should already be taken ! */
- starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, uint8_t read, uint8_t write, unsigned is_prefetch)
- {
- starpu_data_request_t r = starpu_data_request_new();
- _starpu_spin_init(&r->lock);
- r->handle = handle;
- r->src_node = src_node;
- r->dst_node = dst_node;
- r->read = read;
- r->write = write;
- r->handling_node = handling_node;
- r->completed = 0;
- r->retval = -1;
- r->next_req_count = 0;
- r->strictness = 1;
- r->is_a_prefetch_request = is_prefetch;
- /* associate that request with the handle so that further similar
- * requests will reuse that one */
- _starpu_spin_lock(&r->lock);
- handle->per_node[dst_node].request = r;
- handle->per_node[dst_node].refcnt++;
- if (read)
- handle->per_node[src_node].refcnt++;
- r->refcnt = 1;
- _starpu_spin_unlock(&r->lock);
- return r;
- }
- /* handle->lock should be taken */
- starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, uint8_t read, uint8_t write)
- {
- starpu_data_request_t r = handle->per_node[dst_node].request;
- if (r)
- {
- /* perhaps we need to "upgrade" the request */
- if (read)
- {
- /* in case the exisiting request did not imply a memory
- * transfer yet, we have to increment the refcnt now
- * (so that the source remains valid) */
- if (!r->read)
- handle->per_node[dst_node].refcnt++;
- r->read = 1;
- }
- if (write)
- r->write = 1;
- _starpu_spin_lock(&r->lock);
- }
- return r;
- }
- int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc)
- {
- int retval;
- int do_delete = 0;
- uint32_t local_node = _starpu_get_local_memory_node();
- do {
- _starpu_spin_lock(&r->lock);
- if (r->completed)
- break;
- _starpu_spin_unlock(&r->lock);
- _starpu_wake_all_blocked_workers_on_node(r->handling_node);
- _starpu_datawizard_progress(local_node, may_alloc);
- } while (1);
- retval = r->retval;
- if (retval)
- fprintf(stderr, "REQUEST %p COMPLETED (retval %d) !\n", r, r->retval);
-
- r->refcnt--;
- /* if nobody is waiting on that request, we can get rid of it */
- if (r->refcnt == 0)
- do_delete = 1;
- _starpu_spin_unlock(&r->lock);
-
- if (do_delete)
- starpu_data_request_destroy(r);
-
- return retval;
- }
- /* this is non blocking */
- void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
- {
- // fprintf(stderr, "POST REQUEST\n");
- if (r->read)
- {
- STARPU_ASSERT(r->handle->per_node[r->src_node].allocated);
- STARPU_ASSERT(r->handle->per_node[r->src_node].refcnt);
- }
- /* insert the request in the proper list */
- PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
- starpu_data_request_list_push_front(data_requests[handling_node], r);
- PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
- _starpu_wake_all_blocked_workers_on_node(handling_node);
- }
- static void starpu_handle_data_request_completion(starpu_data_request_t r)
- {
- unsigned do_delete = 0;
- starpu_data_handle handle = r->handle;
- uint32_t src_node = r->src_node;
- uint32_t dst_node = r->dst_node;
- _starpu_update_data_state(handle, dst_node, r->write);
- #ifdef STARPU_USE_FXT
- size_t size = handle->ops->get_size(handle);
- STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, r->com_id);
- #endif
- unsigned chained_req;
- for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
- {
- _starpu_post_data_request(r->next_req[chained_req], r->next_req[chained_req]->handling_node);
- }
- r->completed = 1;
-
- handle->per_node[dst_node].refcnt--;
- if (r->read)
- handle->per_node[src_node].refcnt--;
- r->refcnt--;
- /* if nobody is waiting on that request, we can get rid of it */
- if (r->refcnt == 0)
- do_delete = 1;
-
- r->retval = 0;
- _starpu_spin_unlock(&r->lock);
- if (do_delete)
- starpu_data_request_destroy(r);
- _starpu_spin_unlock(&handle->header_lock);
- }
- /* TODO : accounting to see how much time was spent working for other people ... */
- static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_alloc)
- {
- starpu_data_handle handle = r->handle;
- _starpu_spin_lock(&handle->header_lock);
- _starpu_spin_lock(&r->lock);
- if (r->read)
- {
- STARPU_ASSERT(handle->per_node[r->src_node].allocated);
- STARPU_ASSERT(handle->per_node[r->src_node].refcnt);
- }
- /* perform the transfer */
- /* the header of the data must be locked by the worker that submitted the request */
- r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_node, r->dst_node, !r->read, r, may_alloc);
- if (r->retval == ENOMEM)
- {
- _starpu_spin_unlock(&r->lock);
- _starpu_spin_unlock(&handle->header_lock);
- return ENOMEM;
- }
- if (r->retval == EAGAIN)
- {
- _starpu_spin_unlock(&r->lock);
- _starpu_spin_unlock(&handle->header_lock);
- /* the request is pending and we put it in the corresponding queue */
- PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
- starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
- PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
- return EAGAIN;
- }
- /* the request has been handled */
- starpu_handle_data_request_completion(r);
- return 0;
- }
- void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
- {
- /* for all entries of the list */
- starpu_data_request_t r;
- /* take all the entries from the request list */
- PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
- starpu_data_request_list_t local_list = data_requests[src_node];
- if (starpu_data_request_list_empty(local_list))
- {
- /* there is no request */
- PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
- return;
- }
- data_requests[src_node] = starpu_data_request_list_new();
- PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
- while (!starpu_data_request_list_empty(local_list))
- {
- int res;
- r = starpu_data_request_list_pop_back(local_list);
- res = starpu_handle_data_request(r, may_alloc);
- if (res == ENOMEM)
- {
- PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
- starpu_data_request_list_push_front(data_requests[src_node], r);
- PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
- }
- /* wake the requesting worker up */
- // if we do not progress ..
- // pthread_cond_broadcast(&data_requests_list_cond[src_node]);
- }
- starpu_data_request_list_delete(local_list);
- }
- static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force)
- {
- // fprintf(stderr, "_starpu_handle_pending_node_data_requests ...\n");
- PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
- /* for all entries of the list */
- starpu_data_request_list_t local_list = data_requests_pending[src_node];
- data_requests_pending[src_node] = starpu_data_request_list_new();
- PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
- while (!starpu_data_request_list_empty(local_list))
- {
- starpu_data_request_t r;
- r = starpu_data_request_list_pop_back(local_list);
- starpu_data_handle handle = r->handle;
-
- _starpu_spin_lock(&handle->header_lock);
-
- _starpu_spin_lock(&r->lock);
-
- /* wait until the transfer is terminated */
- if (force)
- {
- _starpu_driver_wait_request_completion(&r->async_channel, src_node);
- starpu_handle_data_request_completion(r);
- }
- else {
- if (_starpu_driver_test_request_completion(&r->async_channel, src_node))
- {
-
- starpu_handle_data_request_completion(r);
- }
- else {
- _starpu_spin_unlock(&r->lock);
- _starpu_spin_unlock(&handle->header_lock);
- /* wake the requesting worker up */
- PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
- starpu_data_request_list_push_front(data_requests_pending[src_node], r);
- PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
- }
- }
- }
- starpu_data_request_list_delete(local_list);
- }
/* Non-blocking poll of SRC_NODE's in-flight requests: only the transfers
 * that have already terminated are finalized. */
void _starpu_handle_pending_node_data_requests(uint32_t src_node)
{
	const unsigned wait_for_completion = 0;

	_handle_pending_node_data_requests(src_node, wait_for_completion);
}
/* Blocking drain of SRC_NODE's in-flight requests: waits for each transfer
 * taken from the pending list to terminate and finalizes it. */
void _starpu_handle_all_pending_node_data_requests(uint32_t src_node)
{
	const unsigned wait_for_completion = 1;

	_handle_pending_node_data_requests(src_node, wait_for_completion);
}
- int _starpu_check_that_no_data_request_exists(uint32_t node)
- {
- /* XXX lock that !!! that's a quick'n'dirty test */
- int no_request = starpu_data_request_list_empty(data_requests[node]);
- int no_pending = starpu_data_request_list_empty(data_requests_pending[node]);
- return (no_request && no_pending);
- }
|