Selaa lähdekoodia

Use separate request queues for each memory node pair and transfer direction

This avoids stalling transfers in one direction because of transfers in the other direction.
Samuel Thibault 4 vuotta sitten
vanhempi
commit
f80c2b93ac

+ 0 - 19
src/core/sched_policy.c

@@ -1153,25 +1153,6 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 	}
 }
 
-void _starpu_wait_on_sched_event(void)
-{
-	struct _starpu_worker *worker = _starpu_get_local_worker_key();
-
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-
-	_starpu_handle_all_pending_node_data_requests(worker->memory_node);
-
-	if (_starpu_machine_is_running())
-	{
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
-					  &worker->sched_mutex);
-#endif
-	}
-
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-}
-
 int starpu_push_local_task(int workerid, struct starpu_task *task, int back STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

+ 0 - 2
src/core/sched_policy.h

@@ -63,8 +63,6 @@ struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 int _starpu_pop_task_end(struct starpu_task *task);
 
-void _starpu_wait_on_sched_event(void);
-
 struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
 						   unsigned int node) STARPU_ATTRIBUTE_MALLOC;
 

+ 131 - 103
src/datawizard/data_request.c

@@ -25,57 +25,67 @@
 #include <core/simgrid.h>
 
 /* requests that have not been treated at all */
-#ifdef STARPU_DEVEL
-#warning split into separate out/in queues for each node, so that MAX_PENDING_REQUESTS_PER_NODE is separate for them, since the links are bidirectionnal
-#endif
-static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
-static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES]; /* Contains both task_prefetch and prefetch */
-static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES];
-static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
+static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES][STARPU_MAXNODES][2]; /* Contains both task_prefetch and prefetch */
+static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
+static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
 
 /* requests that are not terminated (eg. async transfers) */
-static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES];
-static unsigned data_requests_npending[STARPU_MAXNODES];
-static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES][STARPU_MAXNODES][2];
+static unsigned data_requests_npending[STARPU_MAXNODES][STARPU_MAXNODES][2];
+static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
 
 void _starpu_init_data_request_lists(void)
 {
-	unsigned i;
+	unsigned i, j;
+	enum _starpu_data_request_inout k;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
-		_starpu_data_request_prio_list_init(&data_requests[i]);
-		_starpu_data_request_prio_list_init(&prefetch_requests[i]);
-		_starpu_data_request_prio_list_init(&idle_requests[i]);
+		for (j = 0; j < STARPU_MAXNODES; j++)
+		{
+			for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
+			{
+				_starpu_data_request_prio_list_init(&data_requests[i][j][k]);
+				_starpu_data_request_prio_list_init(&prefetch_requests[i][j][k]);
+				_starpu_data_request_prio_list_init(&idle_requests[i][j][k]);
 
 #ifndef STARPU_DEBUG
-		/* Tell helgrind that we are fine with checking for list_empty
-		 * in _starpu_handle_node_data_requests, we will call it
-		 * periodically anyway */
-		STARPU_HG_DISABLE_CHECKING(data_requests[i].tree.root);
-		STARPU_HG_DISABLE_CHECKING(prefetch_requests[i].tree.root);
-		STARPU_HG_DISABLE_CHECKING(idle_requests[i].tree.root);
+				/* Tell helgrind that we are fine with checking for list_empty
+				 * in _starpu_handle_node_data_requests, we will call it
+				 * periodically anyway */
+				STARPU_HG_DISABLE_CHECKING(data_requests[i][j][k].tree.root);
+				STARPU_HG_DISABLE_CHECKING(prefetch_requests[i][j][k].tree.root);
+				STARPU_HG_DISABLE_CHECKING(idle_requests[i][j][k].tree.root);
 #endif
+				_starpu_data_request_prio_list_init(&data_requests_pending[i][j][k]);
+				data_requests_npending[i][j][k] = 0;
 
-		STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
-
-		_starpu_data_request_prio_list_init(&data_requests_pending[i]);
-		data_requests_npending[i] = 0;
-		STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
+				STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i][j][k], NULL);
+				STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i][j][k], NULL);
+			}
+		}
 	}
 	STARPU_HG_DISABLE_CHECKING(data_requests_npending);
 }
 
 void _starpu_deinit_data_request_lists(void)
 {
-	unsigned i;
+	unsigned i, j;
+	enum _starpu_data_request_inout k;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
-		_starpu_data_request_prio_list_deinit(&data_requests[i]);
-		_starpu_data_request_prio_list_deinit(&prefetch_requests[i]);
-		_starpu_data_request_prio_list_deinit(&idle_requests[i]);
-		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
-		_starpu_data_request_prio_list_deinit(&data_requests_pending[i]);
-		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
+		for (j = 0; j < STARPU_MAXNODES; j++)
+		{
+			for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
+			{
+				_starpu_data_request_prio_list_deinit(&data_requests[i][j][k]);
+				_starpu_data_request_prio_list_deinit(&prefetch_requests[i][j][k]);
+				_starpu_data_request_prio_list_deinit(&idle_requests[i][j][k]);
+				_starpu_data_request_prio_list_deinit(&data_requests_pending[i][j][k]);
+				STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i][j][k]);
+				STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i][j][k]);
+			}
+		}
 	}
 }
 
@@ -153,6 +163,24 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	if (handling_node == -1)
 		handling_node = STARPU_MAIN_RAM;
 	r->handling_node = handling_node;
+	if (dst_replicate && handling_node == dst_replicate->memory_node)
+	{
+		if (src_replicate)
+			r->peer_node = src_replicate->memory_node;
+		else
+			r->peer_node = 0;
+		r->inout = _STARPU_DATA_REQUEST_IN;
+	}
+	else if (src_replicate)
+	{
+		r->peer_node = dst_replicate->memory_node;
+		r->inout = _STARPU_DATA_REQUEST_OUT;
+	}
+	else
+	{
+		r->peer_node = 0;
+		r->inout = _STARPU_DATA_REQUEST_OUT;
+	}
 	STARPU_ASSERT(starpu_node_get_kind(handling_node) == STARPU_CPU_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
 	r->prefetch = is_prefetch;
@@ -310,14 +338,14 @@ void _starpu_post_data_request(struct _starpu_data_request *r)
 	}
 
 	/* insert the request in the proper list */
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
 	if (r->prefetch >= STARPU_IDLEFETCH)
-		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
+		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node][r->peer_node][r->inout], r);
 	else if (r->prefetch > STARPU_FETCH)
-		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node], r);
+		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node][r->peer_node][r->inout], r);
 	else
-		_starpu_data_request_prio_list_push_back(&data_requests[handling_node], r);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
+		_starpu_data_request_prio_list_push_back(&data_requests[handling_node][r->peer_node][r->inout], r);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	_starpu_wake_all_blocked_workers_on_node(handling_node);
@@ -528,10 +556,10 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 		 * requests in the meantime. */
 		_starpu_spin_unlock(&handle->header_lock);
 
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
-		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node], r);
-		data_requests_npending[r->handling_node]++;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
+		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node][r->peer_node][r->inout], r);
+		data_requests_npending[r->handling_node][r->peer_node][r->inout]++;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
 
 		return -EAGAIN;
 	}
@@ -543,7 +571,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned handling_node, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
+static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list reqlist[STARPU_MAXNODES][STARPU_MAXNODES][2], unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
@@ -556,7 +584,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	/* This is racy, but not posing problems actually, since we know we
 	 * will come back here to probe again regularly anyway.
 	 * Thus, do not expose this optimization to helgrind */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
 		return 0;
 #endif
 
@@ -564,29 +592,29 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	/* take all the entries from the request list */
-	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node]))
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]))
 	{
 		/* List is busy, do not bother with it */
 		return -EBUSY;
 	}
 #else
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 #endif
 
-	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node]))
+	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
 	{
 		/* there is no request */
-                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 		return 0;
 	}
 
 	/* There is an entry: we create a new empty list to replace the list of
 	 * requests, and we handle the request(s) one by one in the former
 	 * list, without concurrency issues.*/
-	struct _starpu_data_request_prio_list local_list = reqlist[handling_node];
-	_starpu_data_request_prio_list_init(&reqlist[handling_node]);
+	struct _starpu_data_request_prio_list local_list = reqlist[handling_node][peer_node][inout];
+	_starpu_data_request_prio_list_init(&reqlist[handling_node][peer_node][inout]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 
 	for (i = 0; i <= prefetch; i++)
 		_starpu_data_request_prio_list_init(&new_data_requests[i]);
@@ -597,7 +625,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	{
                 int res;
 
-		if (data_requests_npending[handling_node] >= n)
+		if (data_requests_npending[handling_node][peer_node][inout] >= n)
 		{
 			/* Too many requests at the same time, skip pushing
 			 * more for now */
@@ -644,28 +672,28 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 	if (i <= prefetch)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_FETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node]);
-			data_requests[handling_node] = new_data_requests[STARPU_FETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node][peer_node][inout]);
+			data_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_FETCH];
 		}
 		if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node]);
-			prefetch_requests[handling_node] = new_data_requests[STARPU_TASK_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
+			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_TASK_PREFETCH];
 		}
 		if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node]);
-			prefetch_requests[handling_node] = new_data_requests[STARPU_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
+			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_PREFETCH];
 		}
 		if (prefetch >= STARPU_IDLEFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_IDLEFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node]);
-			idle_requests[handling_node] = new_data_requests[STARPU_IDLEFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node][peer_node][inout]);
+			idle_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_IDLEFETCH];
 		}
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 
 #ifdef STARPU_SIMGRID
 		if (*pushed)
@@ -687,22 +715,22 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	return ret;
 }
 
-int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(data_requests, handling_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
+	return __starpu_handle_node_data_requests(data_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
 }
 
-int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(prefetch_requests, handling_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
+	return __starpu_handle_node_data_requests(prefetch_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
 }
 
-int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(idle_requests, handling_node, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
+	return __starpu_handle_node_data_requests(idle_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
 }
 
-static int _handle_pending_node_data_requests(unsigned handling_node, unsigned force)
+static int _handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
 //
@@ -713,14 +741,14 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned f
 	/* Here helgrind would report that this is an unprotected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
 		return 0;
 #endif
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!force)
 	{
-		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node]))
+		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]))
 		{
 			/* List is busy, do not bother with it */
 			return 0;
@@ -729,19 +757,19 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned f
 	else
 #endif
 		/* We really want to handle requests */
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
 
-	if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
+	if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
 	{
 		/* there is no request */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
 		return 0;
 	}
 	/* for all entries of the list */
-	struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node];
-	_starpu_data_request_prio_list_init(&data_requests_pending[handling_node]);
+	struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node][peer_node][inout];
+	_starpu_data_request_prio_list_init(&data_requests_pending[handling_node][peer_node][inout]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
 
 	_starpu_data_request_prio_list_init(&new_data_requests_pending);
 	taken = 0;
@@ -804,49 +832,49 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned f
 		}
 	}
 	_starpu_data_request_prio_list_deinit(&local_list);
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
-	data_requests_npending[handling_node] -= taken - kept;
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
+	data_requests_npending[handling_node][peer_node][inout] -= taken - kept;
 	if (kept)
-		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node], &new_data_requests_pending);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
+		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node][peer_node][inout], &new_data_requests_pending);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
 
 	return taken - kept;
 }
 
-int _starpu_handle_pending_node_data_requests(unsigned handling_node)
+int _starpu_handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
 {
-	return _handle_pending_node_data_requests(handling_node, 0);
+	return _handle_pending_node_data_requests(handling_node, peer_node, inout, 0);
 }
 
-int _starpu_handle_all_pending_node_data_requests(unsigned handling_node)
+int _starpu_handle_all_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
 {
-	return _handle_pending_node_data_requests(handling_node, 1);
+	return _handle_pending_node_data_requests(handling_node, peer_node, inout, 1);
 }
 
 /* Note: the returned value will be outdated since the locks are not taken at
  * entry/exit */
-int _starpu_check_that_no_data_request_exists(unsigned node)
+int _starpu_check_that_no_data_request_exists(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
 {
 	int no_request;
 	int no_pending;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
-	no_request = _starpu_data_request_prio_list_empty(&data_requests[node])
-	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node])
-		  && _starpu_data_request_prio_list_empty(&idle_requests[node]);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
-	no_pending = !data_requests_npending[node];
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node][peer_node][inout]);
+	no_request = _starpu_data_request_prio_list_empty(&data_requests[node][peer_node][inout])
+	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node][peer_node][inout])
+		  && _starpu_data_request_prio_list_empty(&idle_requests[node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
+	no_pending = !data_requests_npending[node][peer_node][inout];
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
 
 	return no_request && no_pending;
 }
 
 /* Note: the returned value will be outdated since the locks are not taken at
  * entry/exit */
-int _starpu_check_that_no_data_request_is_pending(unsigned node)
+int _starpu_check_that_no_data_request_is_pending(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
 {
-	return !data_requests_npending[node];
+	return !data_requests_npending[node][peer_node][inout];
 }
 
 
@@ -868,27 +896,27 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum starpu_
 			_starpu_update_prefetch_status(next_req, prefetch);
 	}
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
 
 	int found = 1;
 
 	/* The request can be in a different list (handling request or the temp list)
 	 * we have to check that it is really in the prefetch or idle list. */
-	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node], r))
-		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node], r);
-	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node], r))
-		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node], r);
+	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r))
+		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r);
+	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node][r->peer_node][r->inout], r))
+		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node][r->peer_node][r->inout], r);
 	else
 		found = 0;
 
 	if (found)
 	{
 		if (prefetch > STARPU_FETCH)
-			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node],r);
+			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node][r->peer_node][r->inout],r);
 		else
-			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
+			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node][r->peer_node][r->inout],r);
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	_starpu_wake_all_blocked_workers_on_node(r->handling_node);

+ 15 - 9
src/datawizard/data_request.h

@@ -32,8 +32,8 @@
  * Data interfaces should also have to declare how many asynchronous requests
  * they have actually started (think of e.g. csr).
  */
-#define MAX_PENDING_REQUESTS_PER_NODE 20
-#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 10
+#define MAX_PENDING_REQUESTS_PER_NODE 5
+#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 2
 #define MAX_PENDING_IDLE_REQUESTS_PER_NODE 1
 /** Maximum time in us that we can afford pushing requests before going back to the driver loop, e.g. for checking GPU task termination */
 #define MAX_PUSH_TIME 1000
@@ -47,6 +47,10 @@ struct _starpu_callback_list
 	struct _starpu_callback_list *next;
 };
 
+enum _starpu_data_request_inout {
+	_STARPU_DATA_REQUEST_IN, _STARPU_DATA_REQUEST_OUT
+};
+
 /** This represents a data request, i.e. we want some data to get transferred
  * from a source to a destination. */
 LIST_TYPE(_starpu_data_request,
@@ -63,6 +67,8 @@ LIST_TYPE(_starpu_data_request,
 	 * the node can make the CUDA/OpenCL calls.
 	 */
 	unsigned handling_node;
+	unsigned peer_node;
+	enum _starpu_data_request_inout inout;
 
 	/*
 	 * What the destination node wants to do with the data: write to it,
@@ -135,15 +141,15 @@ void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(struct _starpu_data_request *r);
 /** returns 0 if we have pushed all requests, -EBUSY or -ENOMEM otherwise */
-int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
 
-int _starpu_handle_pending_node_data_requests(unsigned src_node);
-int _starpu_handle_all_pending_node_data_requests(unsigned src_node);
+int _starpu_handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
+int _starpu_handle_all_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
 
-int _starpu_check_that_no_data_request_exists(unsigned node);
-int _starpu_check_that_no_data_request_is_pending(unsigned node);
+int _starpu_check_that_no_data_request_exists(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
+int _starpu_check_that_no_data_request_is_pending(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
 
 struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t handle,
 							 struct _starpu_data_replicate *src_replicate,

+ 30 - 10
src/datawizard/datawizard.c

@@ -26,7 +26,7 @@
 #include <core/simgrid.h>
 #endif
 
-int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests)
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests)
 {
 	int ret = 0;
 
@@ -37,7 +37,7 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsi
 	STARPU_UYIELD();
 
 	/* in case some other driver requested data */
-	if (_starpu_handle_pending_node_data_requests(memory_node))
+	if (_starpu_handle_pending_node_data_requests(memory_node, peer_node, inout))
 		ret = 1;
 
 	starpu_memchunk_tidy(memory_node);
@@ -46,16 +46,16 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsi
 	{
 		/* Some transfers have finished, or the driver requests to really push more */
 		unsigned pushed;
-		if (_starpu_handle_node_data_requests(memory_node, may_alloc, &pushed) == 0)
+		if (_starpu_handle_node_data_requests(memory_node, peer_node, inout, may_alloc, &pushed) == 0)
 		{
 			if (pushed)
 				ret = 1;
 			/* We pushed all pending requests, we can afford pushing
 			 * prefetch requests */
-			_starpu_handle_node_prefetch_requests(memory_node, may_alloc, &pushed);
-			if (_starpu_check_that_no_data_request_is_pending(memory_node))
+			_starpu_handle_node_prefetch_requests(memory_node, peer_node, inout, may_alloc, &pushed);
+			if (_starpu_check_that_no_data_request_is_pending(memory_node, peer_node, inout))
 				/* No pending transfer, push some idle transfer */
-				_starpu_handle_node_idle_requests(memory_node, may_alloc, &pushed);
+				_starpu_handle_node_idle_requests(memory_node, peer_node, inout, may_alloc, &pushed);
 		}
 		if (pushed)
 			ret = 1;
@@ -68,16 +68,20 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsi
 int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
 {
 	struct _starpu_worker *worker = _starpu_get_local_worker_key();
-        unsigned memnode;
+        unsigned memnode, memnode2;
 
 	if (!worker)
 	{
 		/* Call from main application, only make RAM requests progress */
 		int ret = 0;
 		int nnumas = starpu_memory_nodes_get_numa_count();
-		int numa;
+		int numa, numa2;
 		for (numa = 0; numa < nnumas; numa++)
-			ret |=  ___starpu_datawizard_progress(numa, may_alloc, push_requests);
+			for (numa2 = 0; numa2 < nnumas; numa2++)
+			{
+				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
+				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
+			}
 
 		return ret;
 	}
@@ -93,7 +97,11 @@ int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
         for (memnode = 0; memnode < nnodes; memnode++)
         {
                 if (_starpu_worker_drives_memory[current_worker_id][memnode] == 1)
-                        ret |= ___starpu_datawizard_progress(memnode, may_alloc, push_requests);
+			for (memnode2 = 0; memnode2 < nnodes; memnode2++)
+			{
+				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
+				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
+			}
         }
 
         return ret;
@@ -103,3 +111,15 @@ void _starpu_datawizard_progress(unsigned may_alloc)
 {
         __starpu_datawizard_progress(may_alloc, 1);
 }
+
+void _starpu_datawizard_handle_all_pending_node_data_requests(unsigned memnode)
+{
+	unsigned nnodes = starpu_memory_nodes_get_count();
+	unsigned memnode2;
+
+	for (memnode2 = 0; memnode2 < nnodes; memnode2++)
+	{
+		_starpu_handle_all_pending_node_data_requests(memnode, memnode2, _STARPU_DATA_REQUEST_IN);
+		_starpu_handle_all_pending_node_data_requests(memnode, memnode2, _STARPU_DATA_REQUEST_OUT);
+	}
+}

+ 4 - 1
src/datawizard/datawizard.h

@@ -41,11 +41,14 @@
  * If \p may_alloc is 1, it can allocate destination data for transfers
  * (this is not possible e.g. when spinning for a handle lock)
  */
-int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests);
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests);
 /** Call ___starpu_datawizard_progress() for all memory nodes driven by the
  * current worker */
 int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests);
 /** Call __starpu_datawizard_progress with push_requests = 1 */
 void _starpu_datawizard_progress(unsigned may_alloc);
 
+/** Check for progress of all pending data requests on node \p memnode, with every peer node and in both transfer directions */
+void _starpu_datawizard_handle_all_pending_node_data_requests(unsigned memnode);
+
 #endif // __DATAWIZARD_H__

+ 2 - 1
src/drivers/cpu/driver_cpu.c

@@ -40,6 +40,7 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
+#include <datawizard/datawizard.h>
 #include <core/simgrid.h>
 #include <core/task.h>
 #include <core/disk.h>
@@ -429,7 +430,7 @@ int _starpu_cpu_driver_deinit(struct _starpu_worker *cpu_worker)
 	_STARPU_TRACE_WORKER_DEINIT_START;
 
 	unsigned memnode = cpu_worker->memory_node;
-	_starpu_handle_all_pending_node_data_requests(memnode);
+	_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
 
 	/* In case there remains some memory that was automatically
 	 * allocated by StarPU, we release it now. Note that data

+ 2 - 1
src/drivers/cuda/driver_cuda.c

@@ -37,6 +37,7 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
+#include <datawizard/datawizard.h>
 #include <core/task.h>
 #include <common/knobs.h>
 
@@ -1006,7 +1007,7 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 		if (!usersleft)
                 {
 			/* I'm last, deinitialize device */
-			_starpu_handle_all_pending_node_data_requests(memnode);
+			_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
 
 			/* In case there remains some memory that was automatically
 			 * allocated by StarPU, we release it now. Note that data

+ 2 - 1
src/drivers/opencl/driver_opencl.c

@@ -31,6 +31,7 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
+#include <datawizard/datawizard.h>
 #include <core/task.h>
 #include <common/knobs.h>
 
@@ -839,7 +840,7 @@ int _starpu_opencl_driver_deinit(struct _starpu_worker *worker)
 
 	unsigned memnode = worker->memory_node;
 
-	_starpu_handle_all_pending_node_data_requests(memnode);
+	_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
 
 	/* In case there remains some memory that was automatically
 	 * allocated by StarPU, we release it now. Note that data

+ 26 - 5
tools/gdbinit

@@ -693,7 +693,12 @@ define starpu-print-requests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    starpu-print-requests-list data_requests[$node]
+    set $node2 = 0
+    while $node2 < _starpu_descr.nnodes
+      starpu-print-requests-list data_requests[$node][$node2][0]
+      starpu-print-requests-list data_requests[$node][$node2][1]
+      set $node2 = $node2 + 1
+    end
     set $node = $node + 1
   end
 end
@@ -702,8 +707,14 @@ define starpu-print-prequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    printf "%u pending requests\n", data_requests_npending[$node]
-    starpu-print-requests-list data_requests_pending[$node]
+    set $node2 = 0
+    while $node2 < _starpu_descr.nnodes
+      printf "%u pending requests from %u\n", data_requests_npending[$node][$node2][0], $node2
+      starpu-print-requests-list data_requests_pending[$node][$node2][0]
+      printf "%u pending requests to %u\n", data_requests_npending[$node][$node2][1], $node2
+      starpu-print-requests-list data_requests_pending[$node][$node2][1]
+      set $node2 = $node2 + 1
+    end
     set $node = $node + 1
   end
 end
@@ -730,7 +741,12 @@ define starpu-print-frequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    starpu-print-requests-list prefetch_requests[$node]
+    set $node2 = 0
+    while $node2 < _starpu_descr.nnodes
+      starpu-print-requests-list prefetch_requests[$node][$node2][0]
+      starpu-print-requests-list prefetch_requests[$node][$node2][1]
+      set $node2 = $node2 + 1
+    end
     set $node = $node + 1
   end
 end
@@ -739,7 +755,12 @@ define starpu-print-irequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    starpu-print-requests-list idle_requests[$node]
+    set $node2 = 0
+    while $node2 < _starpu_descr.nnodes
+      starpu-print-requests-list idle_requests[$node][$node2][0]
+      starpu-print-requests-list idle_requests[$node][$node2][1]
+      set $node2 = $node2 + 1
+    end
     set $node = $node + 1
   end
 end