|
@@ -25,57 +25,67 @@
|
|
|
#include <core/simgrid.h>
|
|
|
|
|
|
/* requests that have not been treated at all */
|
|
|
-#ifdef STARPU_DEVEL
|
|
|
-#warning split into separate out/in queues for each node, so that MAX_PENDING_REQUESTS_PER_NODE is separate for them, since the links are bidirectionnal
|
|
|
-#endif
|
|
|
-static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
|
|
|
-static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES]; /* Contains both task_prefetch and prefetch */
|
|
|
-static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES];
|
|
|
-static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
|
|
|
+static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
+static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES][STARPU_MAXNODES][2]; /* Contains both task_prefetch and prefetch */
|
|
|
+static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
+static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
|
|
|
/* requests that are not terminated (eg. async transfers) */
|
|
|
-static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES];
|
|
|
-static unsigned data_requests_npending[STARPU_MAXNODES];
|
|
|
-static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
|
|
|
+static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
+static unsigned data_requests_npending[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
+static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
|
|
|
|
|
|
void _starpu_init_data_request_lists(void)
|
|
|
{
|
|
|
- unsigned i;
|
|
|
+ unsigned i, j;
|
|
|
+ enum _starpu_data_request_inout k;
|
|
|
for (i = 0; i < STARPU_MAXNODES; i++)
|
|
|
{
|
|
|
- _starpu_data_request_prio_list_init(&data_requests[i]);
|
|
|
- _starpu_data_request_prio_list_init(&prefetch_requests[i]);
|
|
|
- _starpu_data_request_prio_list_init(&idle_requests[i]);
|
|
|
+ for (j = 0; j < STARPU_MAXNODES; j++)
|
|
|
+ {
|
|
|
+ for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
|
|
|
+ {
|
|
|
+ _starpu_data_request_prio_list_init(&data_requests[i][j][k]);
|
|
|
+ _starpu_data_request_prio_list_init(&prefetch_requests[i][j][k]);
|
|
|
+ _starpu_data_request_prio_list_init(&idle_requests[i][j][k]);
|
|
|
|
|
|
#ifndef STARPU_DEBUG
|
|
|
- /* Tell helgrind that we are fine with checking for list_empty
|
|
|
- * in _starpu_handle_node_data_requests, we will call it
|
|
|
- * periodically anyway */
|
|
|
- STARPU_HG_DISABLE_CHECKING(data_requests[i].tree.root);
|
|
|
- STARPU_HG_DISABLE_CHECKING(prefetch_requests[i].tree.root);
|
|
|
- STARPU_HG_DISABLE_CHECKING(idle_requests[i].tree.root);
|
|
|
+ /* Tell helgrind that we are fine with checking for list_empty
|
|
|
+ * in _starpu_handle_node_data_requests, we will call it
|
|
|
+ * periodically anyway */
|
|
|
+ STARPU_HG_DISABLE_CHECKING(data_requests[i][j][k].tree.root);
|
|
|
+ STARPU_HG_DISABLE_CHECKING(prefetch_requests[i][j][k].tree.root);
|
|
|
+ STARPU_HG_DISABLE_CHECKING(idle_requests[i][j][k].tree.root);
|
|
|
#endif
|
|
|
+ _starpu_data_request_prio_list_init(&data_requests_pending[i][j][k]);
|
|
|
+ data_requests_npending[i][j][k] = 0;
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
|
|
|
-
|
|
|
- _starpu_data_request_prio_list_init(&data_requests_pending[i]);
|
|
|
- data_requests_npending[i] = 0;
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i][j][k], NULL);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i][j][k], NULL);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
STARPU_HG_DISABLE_CHECKING(data_requests_npending);
|
|
|
}
|
|
|
|
|
|
void _starpu_deinit_data_request_lists(void)
|
|
|
{
|
|
|
- unsigned i;
|
|
|
+ unsigned i, j;
|
|
|
+ enum _starpu_data_request_inout k;
|
|
|
for (i = 0; i < STARPU_MAXNODES; i++)
|
|
|
{
|
|
|
- _starpu_data_request_prio_list_deinit(&data_requests[i]);
|
|
|
- _starpu_data_request_prio_list_deinit(&prefetch_requests[i]);
|
|
|
- _starpu_data_request_prio_list_deinit(&idle_requests[i]);
|
|
|
- STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
|
|
|
- _starpu_data_request_prio_list_deinit(&data_requests_pending[i]);
|
|
|
- STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
|
|
|
+ for (j = 0; j < STARPU_MAXNODES; j++)
|
|
|
+ {
|
|
|
+ for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
|
|
|
+ {
|
|
|
+ _starpu_data_request_prio_list_deinit(&data_requests[i][j][k]);
|
|
|
+ _starpu_data_request_prio_list_deinit(&prefetch_requests[i][j][k]);
|
|
|
+ _starpu_data_request_prio_list_deinit(&idle_requests[i][j][k]);
|
|
|
+ _starpu_data_request_prio_list_deinit(&data_requests_pending[i][j][k]);
|
|
|
+ STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i][j][k]);
|
|
|
+ STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i][j][k]);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -92,23 +102,39 @@ static void _starpu_data_request_unlink(struct _starpu_data_request *r)
|
|
|
STARPU_ASSERT(r->mode == STARPU_W);
|
|
|
r->handle->write_invalidation_req = NULL;
|
|
|
}
|
|
|
- else if (r->mode & STARPU_R)
|
|
|
- {
|
|
|
- /* If this is a read request, we store the pending requests
|
|
|
- * between src and dst. */
|
|
|
- unsigned node = r->src_replicate->memory_node;
|
|
|
- STARPU_ASSERT(r->dst_replicate->request[node] == r);
|
|
|
- r->dst_replicate->request[node] = NULL;
|
|
|
- }
|
|
|
else
|
|
|
{
|
|
|
- /* If this is a write only request, then there is no source and
|
|
|
- * we use the destination node to cache the request. */
|
|
|
- unsigned node = r->dst_replicate->memory_node;
|
|
|
- STARPU_ASSERT(r->dst_replicate->request[node] == r);
|
|
|
- r->dst_replicate->request[node] = NULL;
|
|
|
- }
|
|
|
+ unsigned node;
|
|
|
+ struct _starpu_data_request **prevp, *prev;
|
|
|
+
|
|
|
+ if (r->mode & STARPU_R)
|
|
|
+ /* If this is a read request, we store the pending requests
|
|
|
+ * between src and dst. */
|
|
|
+ node = r->src_replicate->memory_node;
|
|
|
+ else
|
|
|
+ /* If this is a write only request, then there is no source and
|
|
|
+ * we use the destination node to cache the request. */
|
|
|
+ node = r->dst_replicate->memory_node;
|
|
|
+
|
|
|
+ /* Look for ourselves in the list; we should not be very far. */
|
|
|
+ for (prevp = &r->dst_replicate->request[node], prev = NULL;
|
|
|
+ *prevp && *prevp != r;
|
|
|
+ prev = *prevp, prevp = &prev->next_same_req)
|
|
|
+ ;
|
|
|
|
|
|
+ STARPU_ASSERT(*prevp == r);
|
|
|
+ *prevp = r->next_same_req;
|
|
|
+
|
|
|
+ if (!r->next_same_req)
|
|
|
+ {
|
|
|
+ /* I was last */
|
|
|
+ STARPU_ASSERT(r->dst_replicate->last_request[node] == r);
|
|
|
+ if (prev)
|
|
|
+ r->dst_replicate->last_request[node] = prev;
|
|
|
+ else
|
|
|
+ r->dst_replicate->last_request[node] = NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static void _starpu_data_request_destroy(struct _starpu_data_request *r)
|
|
@@ -124,6 +150,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
|
|
|
int handling_node,
|
|
|
enum starpu_data_access_mode mode,
|
|
|
unsigned ndeps,
|
|
|
+ struct starpu_task *task,
|
|
|
enum starpu_is_prefetch is_prefetch,
|
|
|
int prio,
|
|
|
unsigned is_write_invalidation,
|
|
@@ -135,7 +162,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
|
|
|
|
|
|
_starpu_spin_init(&r->lock);
|
|
|
|
|
|
- _STARPU_TRACE_DATA_REQUEST_CREATED(handle, src_replicate?src_replicate->memory_node:-1, dst_replicate?dst_replicate->memory_node:-1, prio, is_prefetch);
|
|
|
+ _STARPU_TRACE_DATA_REQUEST_CREATED(handle, src_replicate?src_replicate->memory_node:-1, dst_replicate?dst_replicate->memory_node:-1, prio, is_prefetch, r);
|
|
|
|
|
|
r->origin = origin;
|
|
|
r->handle = handle;
|
|
@@ -153,22 +180,48 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
|
|
|
if (handling_node == -1)
|
|
|
handling_node = STARPU_MAIN_RAM;
|
|
|
r->handling_node = handling_node;
|
|
|
+ if (is_write_invalidation)
|
|
|
+ {
|
|
|
+ r->peer_node = handling_node;
|
|
|
+ r->inout = _STARPU_DATA_REQUEST_IN;
|
|
|
+ }
|
|
|
+ else if (dst_replicate->memory_node == handling_node)
|
|
|
+ {
|
|
|
+ if (src_replicate)
|
|
|
+ r->peer_node = src_replicate->memory_node;
|
|
|
+ else
|
|
|
+ r->peer_node = handling_node;
|
|
|
+ r->inout = _STARPU_DATA_REQUEST_IN;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ r->peer_node = dst_replicate->memory_node;
|
|
|
+ r->inout = _STARPU_DATA_REQUEST_OUT;
|
|
|
+ }
|
|
|
STARPU_ASSERT(starpu_node_get_kind(handling_node) == STARPU_CPU_RAM || _starpu_memory_node_get_nworkers(handling_node));
|
|
|
r->completed = 0;
|
|
|
+ r->added_ref = 0;
|
|
|
+ r->canceled = 0;
|
|
|
r->prefetch = is_prefetch;
|
|
|
+ r->task = task;
|
|
|
r->nb_tasks_prefetch = 0;
|
|
|
r->prio = prio;
|
|
|
r->retval = -1;
|
|
|
r->ndeps = ndeps;
|
|
|
+ r->next_same_req = NULL;
|
|
|
r->next_req_count = 0;
|
|
|
r->callbacks = NULL;
|
|
|
r->com_id = 0;
|
|
|
|
|
|
_starpu_spin_lock(&r->lock);
|
|
|
|
|
|
- /* Take a reference on the target for the request to be able to write it */
|
|
|
- if (dst_replicate)
|
|
|
+ /* For a fetch, take a reference on the target right away, to avoid
|
|
|
+ * replicate eviction */
|
|
|
+ if (is_prefetch == STARPU_FETCH && dst_replicate)
|
|
|
+ {
|
|
|
+ r->added_ref = 1;
|
|
|
dst_replicate->refcnt++;
|
|
|
+ }
|
|
|
handle->busy_count++;
|
|
|
|
|
|
if (is_write_invalidation)
|
|
@@ -176,20 +229,28 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
|
|
|
STARPU_ASSERT(!handle->write_invalidation_req);
|
|
|
handle->write_invalidation_req = r;
|
|
|
}
|
|
|
- else if (mode & STARPU_R)
|
|
|
- {
|
|
|
- unsigned src_node = src_replicate->memory_node;
|
|
|
- STARPU_ASSERT(!dst_replicate->request[src_node]);
|
|
|
- dst_replicate->request[src_node] = r;
|
|
|
- /* Take a reference on the source for the request to be able to read it */
|
|
|
- src_replicate->refcnt++;
|
|
|
- handle->busy_count++;
|
|
|
- }
|
|
|
else
|
|
|
{
|
|
|
- unsigned dst_node = dst_replicate->memory_node;
|
|
|
- STARPU_ASSERT(!dst_replicate->request[dst_node]);
|
|
|
- dst_replicate->request[dst_node] = r;
|
|
|
+ unsigned node;
|
|
|
+
|
|
|
+ if (mode & STARPU_R)
|
|
|
+ node = src_replicate->memory_node;
|
|
|
+ else
|
|
|
+ node = dst_replicate->memory_node;
|
|
|
+
|
|
|
+ if (!dst_replicate->request[node])
|
|
|
+ dst_replicate->request[node] = r;
|
|
|
+ else
|
|
|
+ dst_replicate->last_request[node]->next_same_req = r;
|
|
|
+ dst_replicate->last_request[node] = r;
|
|
|
+
|
|
|
+ if (mode & STARPU_R)
|
|
|
+ {
|
|
|
+ /* Take a reference on the source for the request to be
|
|
|
+ * able to read it */
|
|
|
+ src_replicate->refcnt++;
|
|
|
+ handle->busy_count++;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
r->refcnt = 1;
|
|
@@ -199,7 +260,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
|
|
|
return r;
|
|
|
}
|
|
|
|
|
|
-int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigned may_alloc)
|
|
|
+int _starpu_wait_data_request_completion(struct _starpu_data_request *r, enum _starpu_may_alloc may_alloc)
|
|
|
{
|
|
|
int retval;
|
|
|
int do_delete = 0;
|
|
@@ -310,14 +371,14 @@ void _starpu_post_data_request(struct _starpu_data_request *r)
|
|
|
}
|
|
|
|
|
|
/* insert the request in the proper list */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
|
|
|
if (r->prefetch >= STARPU_IDLEFETCH)
|
|
|
- _starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
|
|
|
+ _starpu_data_request_prio_list_push_back(&idle_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
else if (r->prefetch > STARPU_FETCH)
|
|
|
- _starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node], r);
|
|
|
+ _starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
else
|
|
|
- _starpu_data_request_prio_list_push_back(&data_requests[handling_node], r);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
|
|
|
+ _starpu_data_request_prio_list_push_back(&data_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
|
|
|
|
|
|
#ifndef STARPU_NON_BLOCKING_DRIVERS
|
|
|
_starpu_wake_all_blocked_workers_on_node(handling_node);
|
|
@@ -352,7 +413,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
|
|
|
struct _starpu_data_replicate *dst_replicate = r->dst_replicate;
|
|
|
|
|
|
|
|
|
- if (dst_replicate)
|
|
|
+ if (r->canceled < 2 && dst_replicate)
|
|
|
{
|
|
|
#ifdef STARPU_MEMORY_STATS
|
|
|
enum _starpu_cache_state old_src_replicate_state = src_replicate->state;
|
|
@@ -360,6 +421,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
|
|
|
|
|
|
_starpu_spin_checklocked(&handle->header_lock);
|
|
|
_starpu_update_data_state(handle, r->dst_replicate, mode);
|
|
|
+ dst_replicate->load_request = NULL;
|
|
|
|
|
|
#ifdef STARPU_MEMORY_STATS
|
|
|
if (src_replicate->state == STARPU_INVALID)
|
|
@@ -382,7 +444,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
- if (r->com_id > 0)
|
|
|
+ if (r->canceled < 2 && r->com_id > 0)
|
|
|
{
|
|
|
#ifdef STARPU_USE_FXT
|
|
|
unsigned src_node = src_replicate->memory_node;
|
|
@@ -414,12 +476,15 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
|
|
|
/* Remove a reference on the destination replicate for the request */
|
|
|
if (dst_replicate)
|
|
|
{
|
|
|
- if (dst_replicate->mc)
|
|
|
+ if (r->canceled < 2 && dst_replicate->mc)
|
|
|
/* Make sure it stays there for the task. */
|
|
|
dst_replicate->nb_tasks_prefetch += r->nb_tasks_prefetch;
|
|
|
|
|
|
- STARPU_ASSERT(dst_replicate->refcnt > 0);
|
|
|
- dst_replicate->refcnt--;
|
|
|
+ if (r->added_ref)
|
|
|
+ {
|
|
|
+ STARPU_ASSERT(dst_replicate->refcnt > 0);
|
|
|
+ dst_replicate->refcnt--;
|
|
|
+ }
|
|
|
}
|
|
|
STARPU_ASSERT(handle->busy_count > 0);
|
|
|
handle->busy_count--;
|
|
@@ -467,8 +532,16 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void _starpu_data_request_complete_wait(void *arg)
|
|
|
+{
|
|
|
+ struct _starpu_data_request *r = arg;
|
|
|
+ _starpu_spin_lock(&r->handle->header_lock);
|
|
|
+ _starpu_spin_lock(&r->lock);
|
|
|
+ starpu_handle_data_request_completion(r);
|
|
|
+}
|
|
|
+
|
|
|
/* TODO : accounting to see how much time was spent working for other people ... */
|
|
|
-static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned may_alloc, enum starpu_is_prefetch prefetch)
|
|
|
+static int starpu_handle_data_request(struct _starpu_data_request *r, enum _starpu_may_alloc may_alloc)
|
|
|
{
|
|
|
starpu_data_handle_t handle = r->handle;
|
|
|
|
|
@@ -491,12 +564,50 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
|
|
|
struct _starpu_data_replicate *src_replicate = r->src_replicate;
|
|
|
struct _starpu_data_replicate *dst_replicate = r->dst_replicate;
|
|
|
|
|
|
+ if (r->canceled)
|
|
|
+ {
|
|
|
+ /* Ok, canceled before starting copies etc. */
|
|
|
+ r->canceled = 2;
|
|
|
+ /* Nothing left to do */
|
|
|
+ starpu_handle_data_request_completion(r);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dst_replicate)
|
|
|
+ {
|
|
|
+ struct _starpu_data_request *r2 = dst_replicate->load_request;
|
|
|
+ if (r2 && r2 != r)
|
|
|
+ {
|
|
|
+ /* Oh, some other transfer is already loading the value. Just wait for it */
|
|
|
+ r->canceled = 2;
|
|
|
+ _starpu_spin_unlock(&r->lock);
|
|
|
+ _starpu_spin_lock(&r2->lock);
|
|
|
+ _starpu_data_request_append_callback(r2, _starpu_data_request_complete_wait, r);
|
|
|
+ _starpu_spin_unlock(&r2->lock);
|
|
|
+ _starpu_spin_unlock(&handle->header_lock);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* We are loading this replicate.
|
|
|
+ * Note: we might fail to allocate memory, but we will keep on and others will wait for us. */
|
|
|
+ dst_replicate->load_request = r;
|
|
|
+ }
|
|
|
+
|
|
|
enum starpu_data_access_mode r_mode = r->mode;
|
|
|
|
|
|
STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate);
|
|
|
STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->allocated);
|
|
|
STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->refcnt);
|
|
|
|
|
|
+ /* For prefetches, we take a reference on the destination only now that
|
|
|
+ * we will really try to fetch the data (instead of in
|
|
|
+ * _starpu_create_data_request) */
|
|
|
+ if (dst_replicate && r->prefetch > STARPU_FETCH)
|
|
|
+ {
|
|
|
+ r->added_ref = 1; /* Note: we might get upgraded while trying to allocate */
|
|
|
+ dst_replicate->refcnt++;
|
|
|
+ }
|
|
|
+
|
|
|
_starpu_spin_unlock(&r->lock);
|
|
|
|
|
|
/* FIXME: the request may get upgraded from here to freeing it... */
|
|
@@ -507,7 +618,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
|
|
|
|
|
|
if (dst_replicate && dst_replicate->state == STARPU_INVALID)
|
|
|
r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
|
|
|
- dst_replicate, !(r_mode & STARPU_R), r, may_alloc, prefetch);
|
|
|
+ dst_replicate, !(r_mode & STARPU_R), r, may_alloc, r->prefetch);
|
|
|
else
|
|
|
/* Already valid actually, no need to transfer anything */
|
|
|
r->retval = 0;
|
|
@@ -516,6 +627,15 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
|
|
|
{
|
|
|
/* If there was not enough memory, we will try to redo the
|
|
|
* request later. */
|
|
|
+
|
|
|
+ if (r->prefetch > STARPU_FETCH)
|
|
|
+ {
|
|
|
+ STARPU_ASSERT(r->added_ref);
|
|
|
+ /* Drop ref until next try */
|
|
|
+ r->added_ref = 0;
|
|
|
+ dst_replicate->refcnt--;
|
|
|
+ }
|
|
|
+
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
|
return -ENOMEM;
|
|
|
}
|
|
@@ -528,10 +648,10 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
|
|
|
* requests in the meantime. */
|
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
|
|
|
- _starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node], r);
|
|
|
- data_requests_npending[r->handling_node]++;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
|
|
|
+ _starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node][r->peer_node][r->inout], r);
|
|
|
+ data_requests_npending[r->handling_node][r->peer_node][r->inout]++;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
|
|
|
|
|
|
return -EAGAIN;
|
|
|
}
|
|
@@ -543,10 +663,9 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned src_node, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
|
|
|
+static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list reqlist[STARPU_MAXNODES][STARPU_MAXNODES][2], unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, enum _starpu_may_alloc may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
|
|
|
{
|
|
|
struct _starpu_data_request *r;
|
|
|
- struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
|
|
|
unsigned i;
|
|
|
int ret = 0;
|
|
|
|
|
@@ -556,48 +675,55 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
|
|
|
/* This is racy, but not posing problems actually, since we know we
|
|
|
* will come back here to probe again regularly anyway.
|
|
|
* Thus, do not expose this optimization to helgrind */
|
|
|
- if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[src_node]))
|
|
|
+ if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
|
|
|
return 0;
|
|
|
#endif
|
|
|
|
|
|
- /* TODO optimize */
|
|
|
+ /* We create a new list to pick up some requests from the main list, and
|
|
|
+ * we handle the request(s) one by one from it, without concurrency issues.
|
|
|
+ */
|
|
|
+ struct _starpu_data_request_list local_list, remain_list;
|
|
|
+ _starpu_data_request_list_init(&local_list);
|
|
|
|
|
|
#ifdef STARPU_NON_BLOCKING_DRIVERS
|
|
|
/* take all the entries from the request list */
|
|
|
- if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
|
|
|
+ if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]))
|
|
|
{
|
|
|
/* List is busy, do not bother with it */
|
|
|
return -EBUSY;
|
|
|
}
|
|
|
#else
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
|
|
|
#endif
|
|
|
|
|
|
- if (_starpu_data_request_prio_list_empty(&reqlist[src_node]))
|
|
|
+ for (i = data_requests_npending[handling_node][peer_node][inout];
|
|
|
+ i < n && ! _starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]);
|
|
|
+ i++)
|
|
|
{
|
|
|
- /* there is no request */
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
|
|
|
- return 0;
|
|
|
+ r = _starpu_data_request_prio_list_pop_front_highest(&reqlist[handling_node][peer_node][inout]);
|
|
|
+ _starpu_data_request_list_push_back(&local_list, r);
|
|
|
}
|
|
|
|
|
|
- /* There is an entry: we create a new empty list to replace the list of
|
|
|
- * requests, and we handle the request(s) one by one in the former
|
|
|
- * list, without concurrency issues.*/
|
|
|
- struct _starpu_data_request_prio_list local_list = reqlist[src_node];
|
|
|
- _starpu_data_request_prio_list_init(&reqlist[src_node]);
|
|
|
+ if (!_starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
|
|
|
+ /* We have left some requests */
|
|
|
+ ret = -EBUSY;
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
|
|
|
+ if (_starpu_data_request_list_empty(&local_list))
|
|
|
+ /* there is no request */
|
|
|
+ return 0;
|
|
|
|
|
|
- for (i = 0; i <= prefetch; i++)
|
|
|
- _starpu_data_request_prio_list_init(&new_data_requests[i]);
|
|
|
+ /* This will contain the remaining requests */
|
|
|
+ _starpu_data_request_list_init(&remain_list);
|
|
|
|
|
|
double start = starpu_timing_now();
|
|
|
/* for all entries of the list */
|
|
|
- while (!_starpu_data_request_prio_list_empty(&local_list))
|
|
|
+ while (!_starpu_data_request_list_empty(&local_list))
|
|
|
{
|
|
|
int res;
|
|
|
|
|
|
- if (data_requests_npending[src_node] >= n)
|
|
|
+ if (data_requests_npending[handling_node][peer_node][inout] >= n)
|
|
|
{
|
|
|
/* Too many requests at the same time, skip pushing
|
|
|
* more for now */
|
|
@@ -605,21 +731,22 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- r = _starpu_data_request_prio_list_pop_front_highest(&local_list);
|
|
|
+ r = _starpu_data_request_list_pop_front(&local_list);
|
|
|
|
|
|
- res = starpu_handle_data_request(r, may_alloc, prefetch);
|
|
|
+ res = starpu_handle_data_request(r, may_alloc);
|
|
|
if (res != 0 && res != -EAGAIN)
|
|
|
{
|
|
|
/* handle is busy, or not enough memory, postpone for now */
|
|
|
ret = res;
|
|
|
/* Prefetch requests might have gotten promoted while in tmp list */
|
|
|
- _starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
|
|
|
+ _starpu_data_request_list_push_back(&remain_list, r);
|
|
|
if (prefetch > STARPU_FETCH)
|
|
|
/* Prefetching more there would make the situation even worse */
|
|
|
break;
|
|
|
}
|
|
|
+ else
|
|
|
+ (*pushed)++;
|
|
|
|
|
|
- (*pushed)++;
|
|
|
if (starpu_timing_now() - start >= MAX_PUSH_TIME)
|
|
|
{
|
|
|
/* We have spent a lot of time doing requests, skip pushing more for now */
|
|
@@ -628,43 +755,23 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /* Push back requests we didn't handle on the proper list */
|
|
|
- while (!_starpu_data_request_prio_list_empty(&local_list))
|
|
|
- {
|
|
|
- r = _starpu_data_request_prio_list_pop_front_highest(&local_list);
|
|
|
- /* Prefetch requests might have gotten promoted while in tmp list */
|
|
|
- _starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
|
|
|
- }
|
|
|
- _starpu_data_request_prio_list_deinit(&local_list);
|
|
|
-
|
|
|
- for (i = 0; i <= prefetch; i++)
|
|
|
- if (!_starpu_data_request_prio_list_empty(&new_data_requests[i]))
|
|
|
- break;
|
|
|
+ /* Gather remainder */
|
|
|
+ _starpu_data_request_list_push_list_back(&remain_list, &local_list);
|
|
|
|
|
|
- if (i <= prefetch)
|
|
|
+ if (!_starpu_data_request_list_empty(&remain_list))
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
|
|
|
- if (!(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_FETCH])))
|
|
|
- {
|
|
|
- _starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[src_node]);
|
|
|
- data_requests[src_node] = new_data_requests[STARPU_FETCH];
|
|
|
- }
|
|
|
- if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
|
|
|
+ while (!_starpu_data_request_list_empty(&remain_list))
|
|
|
{
|
|
|
- _starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[src_node]);
|
|
|
- prefetch_requests[src_node] = new_data_requests[STARPU_TASK_PREFETCH];
|
|
|
- }
|
|
|
- if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
|
|
|
- {
|
|
|
- _starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[src_node]);
|
|
|
- prefetch_requests[src_node] = new_data_requests[STARPU_PREFETCH];
|
|
|
- }
|
|
|
- if (prefetch >= STARPU_IDLEFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_IDLEFETCH])))
|
|
|
- {
|
|
|
- _starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[src_node]);
|
|
|
- idle_requests[src_node] = new_data_requests[STARPU_IDLEFETCH];
|
|
|
+ r = _starpu_data_request_list_pop_back(&remain_list);
|
|
|
+ if (r->prefetch >= STARPU_IDLEFETCH)
|
|
|
+ _starpu_data_request_prio_list_push_front(&idle_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
+ else if (r->prefetch > STARPU_FETCH)
|
|
|
+ _starpu_data_request_prio_list_push_front(&prefetch_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
+ else
|
|
|
+ _starpu_data_request_prio_list_push_front(&data_requests[handling_node][r->peer_node][r->inout], r);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
|
|
|
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
if (*pushed)
|
|
@@ -676,32 +783,32 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
|
|
|
* for eviction to happen.
|
|
|
*/
|
|
|
starpu_sleep(0.000001);
|
|
|
- _starpu_wake_all_blocked_workers_on_node(src_node);
|
|
|
+ _starpu_wake_all_blocked_workers_on_node(handling_node);
|
|
|
}
|
|
|
#elif !defined(STARPU_NON_BLOCKING_DRIVERS)
|
|
|
- _starpu_wake_all_blocked_workers_on_node(src_node);
|
|
|
+ _starpu_wake_all_blocked_workers_on_node(handling_node);
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
|
|
|
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, enum _starpu_may_alloc may_alloc, unsigned *pushed)
|
|
|
{
|
|
|
- return __starpu_handle_node_data_requests(data_requests, src_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
|
|
|
+ return __starpu_handle_node_data_requests(data_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
|
|
|
}
|
|
|
|
|
|
-int _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
|
|
|
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, enum _starpu_may_alloc may_alloc, unsigned *pushed)
|
|
|
{
|
|
|
- return __starpu_handle_node_data_requests(prefetch_requests, src_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
|
|
|
+ return __starpu_handle_node_data_requests(prefetch_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
|
|
|
}
|
|
|
|
|
|
-int _starpu_handle_node_idle_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
|
|
|
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, enum _starpu_may_alloc may_alloc, unsigned *pushed)
|
|
|
{
|
|
|
- return __starpu_handle_node_data_requests(idle_requests, src_node, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
|
|
|
+ return __starpu_handle_node_data_requests(idle_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
|
|
|
}
|
|
|
|
|
|
-static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
|
|
|
+static int _handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned force)
|
|
|
{
|
|
|
// _STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
|
|
|
//
|
|
@@ -712,14 +819,14 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
|
|
|
/* Here helgrind would should that this is an un protected access.
|
|
|
* We however don't care about missing an entry, we will get called
|
|
|
* again sooner or later. */
|
|
|
- if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
|
|
|
+ if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
|
|
|
return 0;
|
|
|
#endif
|
|
|
|
|
|
#ifdef STARPU_NON_BLOCKING_DRIVERS
|
|
|
if (!force)
|
|
|
{
|
|
|
- if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[src_node]))
|
|
|
+ if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]))
|
|
|
{
|
|
|
/* List is busy, do not bother with it */
|
|
|
return 0;
|
|
@@ -728,19 +835,19 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
|
|
|
else
|
|
|
#endif
|
|
|
/* We really want to handle requests */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
|
|
|
|
|
|
- if (_starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
|
|
|
+ if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
|
|
|
{
|
|
|
/* there is no request */
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
|
|
|
return 0;
|
|
|
}
|
|
|
/* for all entries of the list */
|
|
|
- struct _starpu_data_request_prio_list local_list = data_requests_pending[src_node];
|
|
|
- _starpu_data_request_prio_list_init(&data_requests_pending[src_node]);
|
|
|
+ struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node][peer_node][inout];
|
|
|
+ _starpu_data_request_prio_list_init(&data_requests_pending[handling_node][peer_node][inout]);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
|
|
|
|
|
|
_starpu_data_request_prio_list_init(&new_data_requests_pending);
|
|
|
taken = 0;
|
|
@@ -803,55 +910,75 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
|
|
|
}
|
|
|
}
|
|
|
_starpu_data_request_prio_list_deinit(&local_list);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
|
|
|
- data_requests_npending[src_node] -= taken - kept;
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
|
|
|
+ data_requests_npending[handling_node][peer_node][inout] -= taken - kept;
|
|
|
if (kept)
|
|
|
- _starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[src_node], &new_data_requests_pending);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
|
|
|
+ _starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node][peer_node][inout], &new_data_requests_pending);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
|
|
|
|
|
|
return taken - kept;
|
|
|
}
|
|
|
|
|
|
-int _starpu_handle_pending_node_data_requests(unsigned src_node)
|
|
|
+int _starpu_handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
|
|
|
{
|
|
|
- return _handle_pending_node_data_requests(src_node, 0);
|
|
|
+ return _handle_pending_node_data_requests(handling_node, peer_node, inout, 0);
|
|
|
}
|
|
|
|
|
|
-int _starpu_handle_all_pending_node_data_requests(unsigned src_node)
|
|
|
+int _starpu_handle_all_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
|
|
|
{
|
|
|
- return _handle_pending_node_data_requests(src_node, 1);
|
|
|
+ return _handle_pending_node_data_requests(handling_node, peer_node, inout, 1);
|
|
|
}
|
|
|
|
|
|
/* Note: the returned value will be outdated since the locks are not taken at
|
|
|
* entry/exit */
|
|
|
-int _starpu_check_that_no_data_request_exists(unsigned node)
|
|
|
+static int __starpu_check_that_no_data_request_exists(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
|
|
|
{
|
|
|
int no_request;
|
|
|
int no_pending;
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
|
|
|
- no_request = _starpu_data_request_prio_list_empty(&data_requests[node])
|
|
|
- && _starpu_data_request_prio_list_empty(&prefetch_requests[node])
|
|
|
- && _starpu_data_request_prio_list_empty(&idle_requests[node]);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
|
|
|
- no_pending = !data_requests_npending[node];
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node][peer_node][inout]);
|
|
|
+ no_request = _starpu_data_request_prio_list_empty(&data_requests[node][peer_node][inout])
|
|
|
+ && _starpu_data_request_prio_list_empty(&prefetch_requests[node][peer_node][inout])
|
|
|
+ && _starpu_data_request_prio_list_empty(&idle_requests[node][peer_node][inout]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node][peer_node][inout]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
|
|
|
+ no_pending = !data_requests_npending[node][peer_node][inout];
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
|
|
|
|
|
|
return no_request && no_pending;
|
|
|
}
|
|
|
|
|
|
+int _starpu_check_that_no_data_request_exists(unsigned node)
|
|
|
+{
|
|
|
+ unsigned peer_node, nnodes = starpu_memory_nodes_get_count();
|
|
|
+
|
|
|
+ for (peer_node = 0; peer_node < nnodes; peer_node++)
|
|
|
+ if (!__starpu_check_that_no_data_request_exists(node, peer_node, _STARPU_DATA_REQUEST_IN)
|
|
|
+ || !__starpu_check_that_no_data_request_exists(node, peer_node, _STARPU_DATA_REQUEST_OUT))
|
|
|
+ return 0;
|
|
|
+ return 1;
|
|
|
+}
|
|
|
+
|
|
|
/* Note: the returned value will be outdated since the locks are not taken at
|
|
|
* entry/exit */
|
|
|
-int _starpu_check_that_no_data_request_is_pending(unsigned node)
|
|
|
+int _starpu_check_that_no_data_request_is_pending(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
|
|
|
{
|
|
|
- return !data_requests_npending[node];
|
|
|
+ return !data_requests_npending[node][peer_node][inout];
|
|
|
}
|
|
|
|
|
|
|
|
|
void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum starpu_is_prefetch prefetch)
|
|
|
{
|
|
|
+ _starpu_spin_checklocked(&r->handle->header_lock);
|
|
|
STARPU_ASSERT(r->prefetch > prefetch);
|
|
|
+
|
|
|
+ if (prefetch == STARPU_FETCH && !r->added_ref)
|
|
|
+ {
|
|
|
+ /* That would have been done by _starpu_create_data_request */
|
|
|
+ r->added_ref = 1;
|
|
|
+ r->dst_replicate->refcnt++;
|
|
|
+ }
|
|
|
+
|
|
|
r->prefetch=prefetch;
|
|
|
|
|
|
if (prefetch >= STARPU_IDLEFETCH)
|
|
@@ -867,27 +994,27 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum starpu_
|
|
|
_starpu_update_prefetch_status(next_req, prefetch);
|
|
|
}
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
|
|
|
|
|
|
int found = 1;
|
|
|
|
|
|
/* The request can be in a different list (handling request or the temp list)
|
|
|
* we have to check that it is really in the prefetch or idle list. */
|
|
|
- if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node], r))
|
|
|
- _starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node], r);
|
|
|
- else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node], r))
|
|
|
- _starpu_data_request_prio_list_erase(&idle_requests[r->handling_node], r);
|
|
|
+ if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r))
|
|
|
+ _starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r);
|
|
|
+ else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node][r->peer_node][r->inout], r))
|
|
|
+ _starpu_data_request_prio_list_erase(&idle_requests[r->handling_node][r->peer_node][r->inout], r);
|
|
|
else
|
|
|
found = 0;
|
|
|
|
|
|
if (found)
|
|
|
{
|
|
|
if (prefetch > STARPU_FETCH)
|
|
|
- _starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node],r);
|
|
|
+ _starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node][r->peer_node][r->inout],r);
|
|
|
else
|
|
|
- _starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
|
|
|
+ _starpu_data_request_prio_list_push_back(&data_requests[r->handling_node][r->peer_node][r->inout],r);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
|
|
|
|
|
|
#ifndef STARPU_NON_BLOCKING_DRIVERS
|
|
|
_starpu_wake_all_blocked_workers_on_node(r->handling_node);
|