@@ -62,7 +62,6 @@ void _starpu_deinit_data_request_lists(void)
 static void starpu_data_request_destroy(starpu_data_request_t r)
 {
 	r->dst_replicate->request = NULL;
-
 	starpu_data_request_delete(r);
 }
 
@@ -82,25 +81,16 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 	r->src_replicate = src_replicate;
 	r->dst_replicate = dst_replicate;
 	r->mode = mode;
-
 	r->handling_node = handling_node;
-
 	r->completed = 0;
 	r->retval = -1;
-
 	r->next_req_count = 0;
-
 	r->callbacks = NULL;
-
 	r->is_a_prefetch_request = is_prefetch;
 
-	/* associate that request with the handle so that further similar
-	 * requests will reuse that one */
-
 	_starpu_spin_lock(&r->lock);
 
 	dst_replicate->request = r;
-
 	dst_replicate->refcnt++;
 
 	if (mode & STARPU_R)
@@ -197,9 +187,7 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 
 	/* insert the request in the proper list */
 	PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
-
 	starpu_data_request_list_push_front(data_requests[handling_node], r);
-
 	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
@@ -229,12 +217,16 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 {
 	unsigned do_delete = 0;
 	starpu_data_handle handle = r->handle;
+	starpu_access_mode mode = r->mode;
+
+	struct starpu_data_replicate_s *src_replicate = r->src_replicate;
+	struct starpu_data_replicate_s *dst_replicate = r->dst_replicate;
 
-	_starpu_update_data_state(handle, r->dst_replicate, r->mode);
+	_starpu_update_data_state(handle, r->dst_replicate, mode);
 
 #ifdef STARPU_USE_FXT
-	uint32_t src_node = r->src_replicate->memory_node;
-	uint32_t dst_node = r->dst_replicate->memory_node;
+	uint32_t src_node = src_replicate->memory_node;
+	uint32_t dst_node = dst_replicate->memory_node;
 	size_t size = _starpu_data_get_size(handle);
 	STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, r->com_id);
 #endif
@@ -244,15 +236,18 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	unsigned chained_req;
 	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
 	{
-		_starpu_post_data_request(r->next_req[chained_req], r->next_req[chained_req]->handling_node);
+		struct starpu_data_request_s *next_req = r->next_req[chained_req];
+		_starpu_post_data_request(next_req, next_req->handling_node);
 	}
 
 	r->completed = 1;
 
-	r->dst_replicate->refcnt--;
+	/* Remove a reference on the destination replicate */
+	dst_replicate->refcnt--;
 
-	if (r->mode & STARPU_R)
-		r->src_replicate->refcnt--;
+	/* In case the source was "locked" by the request too */
+	if (mode & (STARPU_R|STARPU_REDUX))
+		src_replicate->refcnt--;
 
 	r->refcnt--;
 
@@ -290,23 +285,26 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 	starpu_data_handle handle = r->handle;
 
 	_starpu_spin_lock(&handle->header_lock);
-
 	_starpu_spin_lock(&r->lock);
 
-	if (r->mode & STARPU_R)
-	{
-		STARPU_ASSERT(r->src_replicate);
-		STARPU_ASSERT(r->src_replicate->allocated);
-		STARPU_ASSERT(r->src_replicate->refcnt);
-	}
+	struct starpu_data_replicate_s *src_replicate = r->src_replicate;
+	struct starpu_data_replicate_s *dst_replicate = r->dst_replicate;
+
+	starpu_access_mode r_mode = r->mode;
+
+	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate);
+	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->allocated);
+	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->refcnt);
 
 	/* perform the transfer */
 	/* the header of the data must be locked by the worker that submitted the request */
-	r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_replicate,
-			r->dst_replicate, !(r->mode & STARPU_R), r, may_alloc);
+	r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
+			dst_replicate, !(r_mode & STARPU_R), r, may_alloc);
 
 	if (r->retval == -ENOMEM)
 	{
+		/* If there was not enough memory, we will try to redo the
+		 * request later. */
 		_starpu_spin_unlock(&r->lock);
 		_starpu_spin_unlock(&handle->header_lock);
 
@@ -315,10 +313,13 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 
	if (r->retval == -EAGAIN)
 	{
+		/* The request was successful, but could not be terminated
+		 * immediately. We will handle the completion of the request
+		 * asynchronously. The request is put in the list of "pending"
+		 * requests in the meantime. */
 		_starpu_spin_unlock(&r->lock);
 		_starpu_spin_unlock(&handle->header_lock);
 
-		/* the request is pending and we put it in the corresponding queue */
 		PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
 		starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
 		PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
@@ -334,7 +335,6 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 
 void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 {
-	/* for all entries of the list */
 	starpu_data_request_t r;
 
 	/* take all the entries from the request list */
@@ -350,10 +350,14 @@ void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 		return;
 	}
 
+	/* There is an entry: we create a new empty list to replace the list of
+	 * requests, and we handle the request(s) one by one in the former
+	 * list, without concurrency issues. */
 	data_requests[src_node] = starpu_data_request_list_new();
 
 	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
+	/* for all entries of the list */
 	while (!starpu_data_request_list_empty(local_list))
 	{
 		int res;
@@ -364,9 +368,7 @@ void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 		if (res == -ENOMEM)
 		{
 			PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-
 			starpu_data_request_list_push_front(data_requests[src_node], r);
-
 			PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 		}
 
@@ -410,14 +412,16 @@ static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force
 		else {
 			if (_starpu_driver_test_request_completion(&r->async_channel, src_node))
 			{
-
+				/* The request was completed */
 				starpu_handle_data_request_completion(r);
 			}
 			else {
+				/* The request was not completed, so we put it
+				 * back again on the list of pending requests
+				 * so that it can be handled later on. */
 				_starpu_spin_unlock(&r->lock);
 				_starpu_spin_unlock(&handle->header_lock);
 
-				/* wake the requesting worker up */
 				PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
 				starpu_data_request_list_push_front(data_requests_pending[src_node], r);
 				PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);