Преглед изворни кода

Revert "Use separate request queues for different memory node pairs and way"

This reverts commit f80c2b93acd0567908d9e280f7ca289189c9fd2e.

This was making tests way longer...
Samuel Thibault пре 4 година
родитељ
комит
10705be34f

+ 19 - 0
src/core/sched_policy.c

@@ -1153,6 +1153,25 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task)
 	}
 }
 
+void _starpu_wait_on_sched_event(void)
+{
+	struct _starpu_worker *worker = _starpu_get_local_worker_key();
+
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+
+	_starpu_handle_all_pending_node_data_requests(worker->memory_node);
+
+	if (_starpu_machine_is_running())
+	{
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
+					  &worker->sched_mutex);
+#endif
+	}
+
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
 int starpu_push_local_task(int workerid, struct starpu_task *task, int back STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

+ 2 - 0
src/core/sched_policy.h

@@ -63,6 +63,8 @@ struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 int _starpu_pop_task_end(struct starpu_task *task);
 
+void _starpu_wait_on_sched_event(void);
+
 struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
 						   unsigned int node) STARPU_ATTRIBUTE_MALLOC;
 

+ 103 - 131
src/datawizard/data_request.c

@@ -25,67 +25,57 @@
 #include <core/simgrid.h>
 
 /* requests that have not been treated at all */
-static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
-static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES][STARPU_MAXNODES][2]; /* Contains both task_prefetch and prefetch */
-static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES][STARPU_MAXNODES][2];
-static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
+#ifdef STARPU_DEVEL
+#warning split into separate out/in queues for each node, so that MAX_PENDING_REQUESTS_PER_NODE is separate for them, since the links are bidirectionnal
+#endif
+static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES]; /* Contains both task_prefetch and prefetch */
+static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES];
+static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
 
 /* requests that are not terminated (eg. async transfers) */
-static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES][STARPU_MAXNODES][2];
-static unsigned data_requests_npending[STARPU_MAXNODES][STARPU_MAXNODES][2];
-static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES][STARPU_MAXNODES][2];
+static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES];
+static unsigned data_requests_npending[STARPU_MAXNODES];
+static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
 
 void _starpu_init_data_request_lists(void)
 {
-	unsigned i, j;
-	enum _starpu_data_request_inout k;
+	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
-		for (j = 0; j < STARPU_MAXNODES; j++)
-		{
-			for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
-			{
-				_starpu_data_request_prio_list_init(&data_requests[i][j][k]);
-				_starpu_data_request_prio_list_init(&prefetch_requests[i][j][k]);
-				_starpu_data_request_prio_list_init(&idle_requests[i][j][k]);
+		_starpu_data_request_prio_list_init(&data_requests[i]);
+		_starpu_data_request_prio_list_init(&prefetch_requests[i]);
+		_starpu_data_request_prio_list_init(&idle_requests[i]);
 
 #ifndef STARPU_DEBUG
-				/* Tell helgrind that we are fine with checking for list_empty
-				 * in _starpu_handle_node_data_requests, we will call it
-				 * periodically anyway */
-				STARPU_HG_DISABLE_CHECKING(data_requests[i][j][k].tree.root);
-				STARPU_HG_DISABLE_CHECKING(prefetch_requests[i][j][k].tree.root);
-				STARPU_HG_DISABLE_CHECKING(idle_requests[i][j][k].tree.root);
+		/* Tell helgrind that we are fine with checking for list_empty
+		 * in _starpu_handle_node_data_requests, we will call it
+		 * periodically anyway */
+		STARPU_HG_DISABLE_CHECKING(data_requests[i].tree.root);
+		STARPU_HG_DISABLE_CHECKING(prefetch_requests[i].tree.root);
+		STARPU_HG_DISABLE_CHECKING(idle_requests[i].tree.root);
 #endif
-				_starpu_data_request_prio_list_init(&data_requests_pending[i][j][k]);
-				data_requests_npending[i][j][k] = 0;
 
-				STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i][j][k], NULL);
-				STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i][j][k], NULL);
-			}
-		}
+		STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
+
+		_starpu_data_request_prio_list_init(&data_requests_pending[i]);
+		data_requests_npending[i] = 0;
+		STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
 	}
 	STARPU_HG_DISABLE_CHECKING(data_requests_npending);
 }
 
 void _starpu_deinit_data_request_lists(void)
 {
-	unsigned i, j;
-	enum _starpu_data_request_inout k;
+	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
-		for (j = 0; j < STARPU_MAXNODES; j++)
-		{
-			for (k = _STARPU_DATA_REQUEST_IN; k <= _STARPU_DATA_REQUEST_OUT; k++)
-			{
-				_starpu_data_request_prio_list_deinit(&data_requests[i][j][k]);
-				_starpu_data_request_prio_list_deinit(&prefetch_requests[i][j][k]);
-				_starpu_data_request_prio_list_deinit(&idle_requests[i][j][k]);
-				_starpu_data_request_prio_list_deinit(&data_requests_pending[i][j][k]);
-				STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i][j][k]);
-				STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i][j][k]);
-			}
-		}
+		_starpu_data_request_prio_list_deinit(&data_requests[i]);
+		_starpu_data_request_prio_list_deinit(&prefetch_requests[i]);
+		_starpu_data_request_prio_list_deinit(&idle_requests[i]);
+		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
+		_starpu_data_request_prio_list_deinit(&data_requests_pending[i]);
+		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
 	}
 }
 
@@ -163,24 +153,6 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	if (handling_node == -1)
 		handling_node = STARPU_MAIN_RAM;
 	r->handling_node = handling_node;
-	if (dst_replicate && handling_node == dst_replicate->memory_node)
-	{
-		if (src_replicate)
-			r->peer_node = src_replicate->memory_node;
-		else
-			r->peer_node = 0;
-		r->inout = _STARPU_DATA_REQUEST_IN;
-	}
-	else if (src_replicate)
-	{
-		r->peer_node = dst_replicate->memory_node;
-		r->inout = _STARPU_DATA_REQUEST_OUT;
-	}
-	else
-	{
-		r->peer_node = 0;
-		r->inout = _STARPU_DATA_REQUEST_OUT;
-	}
 	STARPU_ASSERT(starpu_node_get_kind(handling_node) == STARPU_CPU_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
 	r->prefetch = is_prefetch;
@@ -338,14 +310,14 @@ void _starpu_post_data_request(struct _starpu_data_request *r)
 	}
 
 	/* insert the request in the proper list */
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 	if (r->prefetch >= STARPU_IDLEFETCH)
-		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node][r->peer_node][r->inout], r);
+		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
 	else if (r->prefetch > STARPU_FETCH)
-		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node][r->peer_node][r->inout], r);
+		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node], r);
 	else
-		_starpu_data_request_prio_list_push_back(&data_requests[handling_node][r->peer_node][r->inout], r);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][r->peer_node][r->inout]);
+		_starpu_data_request_prio_list_push_back(&data_requests[handling_node], r);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	_starpu_wake_all_blocked_workers_on_node(handling_node);
@@ -556,10 +528,10 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 		 * requests in the meantime. */
 		_starpu_spin_unlock(&handle->header_lock);
 
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
-		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node][r->peer_node][r->inout], r);
-		data_requests_npending[r->handling_node][r->peer_node][r->inout]++;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node][r->peer_node][r->inout]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
+		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node], r);
+		data_requests_npending[r->handling_node]++;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
 
 		return -EAGAIN;
 	}
@@ -571,7 +543,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list reqlist[STARPU_MAXNODES][STARPU_MAXNODES][2], unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
+static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned handling_node, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
@@ -584,7 +556,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	/* This is racy, but not posing problems actually, since we know we
 	 * will come back here to probe again regularly anyway.
 	 * Thus, do not expose this optimization to helgrind */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node]))
 		return 0;
 #endif
 
@@ -592,29 +564,29 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	/* take all the entries from the request list */
-	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]))
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node]))
 	{
 		/* List is busy, do not bother with it */
 		return -EBUSY;
 	}
 #else
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 #endif
 
-	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
+	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node]))
 	{
 		/* there is no request */
-                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 		return 0;
 	}
 
 	/* There is an entry: we create a new empty list to replace the list of
 	 * requests, and we handle the request(s) one by one in the former
 	 * list, without concurrency issues.*/
-	struct _starpu_data_request_prio_list local_list = reqlist[handling_node][peer_node][inout];
-	_starpu_data_request_prio_list_init(&reqlist[handling_node][peer_node][inout]);
+	struct _starpu_data_request_prio_list local_list = reqlist[handling_node];
+	_starpu_data_request_prio_list_init(&reqlist[handling_node]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 	for (i = 0; i <= prefetch; i++)
 		_starpu_data_request_prio_list_init(&new_data_requests[i]);
@@ -625,7 +597,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	{
                 int res;
 
-		if (data_requests_npending[handling_node][peer_node][inout] >= n)
+		if (data_requests_npending[handling_node] >= n)
 		{
 			/* Too many requests at the same time, skip pushing
 			 * more for now */
@@ -672,28 +644,28 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 	if (i <= prefetch)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_FETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node][peer_node][inout]);
-			data_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_FETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node]);
+			data_requests[handling_node] = new_data_requests[STARPU_FETCH];
 		}
 		if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
-			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_TASK_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node]);
+			prefetch_requests[handling_node] = new_data_requests[STARPU_TASK_PREFETCH];
 		}
 		if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
-			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node]);
+			prefetch_requests[handling_node] = new_data_requests[STARPU_PREFETCH];
 		}
 		if (prefetch >= STARPU_IDLEFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_IDLEFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node][peer_node][inout]);
-			idle_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_IDLEFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node]);
+			idle_requests[handling_node] = new_data_requests[STARPU_IDLEFETCH];
 		}
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifdef STARPU_SIMGRID
 		if (*pushed)
@@ -715,22 +687,22 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	return ret;
 }
 
-int _starpu_handle_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(data_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
+	return __starpu_handle_node_data_requests(data_requests, handling_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
 }
 
-int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(prefetch_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
+	return __starpu_handle_node_data_requests(prefetch_requests, handling_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
 }
 
-int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(idle_requests, handling_node, peer_node, inout, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
+	return __starpu_handle_node_data_requests(idle_requests, handling_node, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
 }
 
-static int _handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned force)
+static int _handle_pending_node_data_requests(unsigned handling_node, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
 //
@@ -741,14 +713,14 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned p
 	/* Here helgrind would should that this is an un protected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
 		return 0;
 #endif
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!force)
 	{
-		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]))
+		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node]))
 		{
 			/* List is busy, do not bother with it */
 			return 0;
@@ -757,19 +729,19 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned p
 	else
 #endif
 		/* We really want to handle requests */
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
 
-	if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node][peer_node][inout]))
+	if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
 	{
 		/* there is no request */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 		return 0;
 	}
 	/* for all entries of the list */
-	struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node][peer_node][inout];
-	_starpu_data_request_prio_list_init(&data_requests_pending[handling_node][peer_node][inout]);
+	struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node];
+	_starpu_data_request_prio_list_init(&data_requests_pending[handling_node]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 
 	_starpu_data_request_prio_list_init(&new_data_requests_pending);
 	taken = 0;
@@ -832,49 +804,49 @@ static int _handle_pending_node_data_requests(unsigned handling_node, unsigned p
 		}
 	}
 	_starpu_data_request_prio_list_deinit(&local_list);
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
-	data_requests_npending[handling_node][peer_node][inout] -= taken - kept;
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
+	data_requests_npending[handling_node] -= taken - kept;
 	if (kept)
-		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node][peer_node][inout], &new_data_requests_pending);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node][peer_node][inout]);
+		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node], &new_data_requests_pending);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 
 	return taken - kept;
 }
 
-int _starpu_handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
+int _starpu_handle_pending_node_data_requests(unsigned handling_node)
 {
-	return _handle_pending_node_data_requests(handling_node, peer_node, inout, 0);
+	return _handle_pending_node_data_requests(handling_node, 0);
 }
 
-int _starpu_handle_all_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout)
+int _starpu_handle_all_pending_node_data_requests(unsigned handling_node)
 {
-	return _handle_pending_node_data_requests(handling_node, peer_node, inout, 1);
+	return _handle_pending_node_data_requests(handling_node, 1);
 }
 
 /* Note: the returned value will be outdated since the locks are not taken at
  * entry/exit */
-int _starpu_check_that_no_data_request_exists(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
+int _starpu_check_that_no_data_request_exists(unsigned node)
 {
 	int no_request;
 	int no_pending;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node][peer_node][inout]);
-	no_request = _starpu_data_request_prio_list_empty(&data_requests[node][peer_node][inout])
-	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node][peer_node][inout])
-		  && _starpu_data_request_prio_list_empty(&idle_requests[node][peer_node][inout]);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node][peer_node][inout]);
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
-	no_pending = !data_requests_npending[node][peer_node][inout];
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node][peer_node][inout]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
+	no_request = _starpu_data_request_prio_list_empty(&data_requests[node])
+	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node])
+		  && _starpu_data_request_prio_list_empty(&idle_requests[node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
+	no_pending = !data_requests_npending[node];
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node]);
 
 	return no_request && no_pending;
 }
 
 /* Note: the returned value will be outdated since the locks are not taken at
  * entry/exit */
-int _starpu_check_that_no_data_request_is_pending(unsigned node, unsigned peer_node, enum _starpu_data_request_inout inout)
+int _starpu_check_that_no_data_request_is_pending(unsigned node)
 {
-	return !data_requests_npending[node][peer_node][inout];
+	return !data_requests_npending[node];
 }
 
 
@@ -896,27 +868,27 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum starpu_
 			_starpu_update_prefetch_status(next_req, prefetch);
 	}
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
 
 	int found = 1;
 
 	/* The request can be in a different list (handling request or the temp list)
 	 * we have to check that it is really in the prefetch or idle list. */
-	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r))
-		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node][r->peer_node][r->inout], r);
-	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node][r->peer_node][r->inout], r))
-		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node][r->peer_node][r->inout], r);
+	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node], r))
+		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node], r);
+	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node], r))
+		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node], r);
 	else
 		found = 0;
 
 	if (found)
 	{
 		if (prefetch > STARPU_FETCH)
-			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node][r->peer_node][r->inout],r);
+			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node],r);
 		else
-			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node][r->peer_node][r->inout],r);
+			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node][r->peer_node][r->inout]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	_starpu_wake_all_blocked_workers_on_node(r->handling_node);

+ 9 - 15
src/datawizard/data_request.h

@@ -32,8 +32,8 @@
  * Data interfaces should also have to declare how many asynchronous requests
  * they have actually started (think of e.g. csr).
  */
-#define MAX_PENDING_REQUESTS_PER_NODE 5
-#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 2
+#define MAX_PENDING_REQUESTS_PER_NODE 20
+#define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 10
 #define MAX_PENDING_IDLE_REQUESTS_PER_NODE 1
 /** Maximum time in us that we can afford pushing requests before going back to the driver loop, e.g. for checking GPU task termination */
 #define MAX_PUSH_TIME 1000
@@ -47,10 +47,6 @@ struct _starpu_callback_list
 	struct _starpu_callback_list *next;
 };
 
-enum _starpu_data_request_inout {
-	_STARPU_DATA_REQUEST_IN, _STARPU_DATA_REQUEST_OUT
-};
-
 /** This represents a data request, i.e. we want some data to get transferred
  * from a source to a destination. */
 LIST_TYPE(_starpu_data_request,
@@ -67,8 +63,6 @@ LIST_TYPE(_starpu_data_request,
 	 * the node can make the CUDA/OpenCL calls.
 	 */
 	unsigned handling_node;
-	unsigned peer_node;
-	enum _starpu_data_request_inout inout;
 
 	/*
 	 * What the destination node wants to do with the data: write to it,
@@ -141,15 +135,15 @@ void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(struct _starpu_data_request *r);
 /** returns 0 if we have pushed all requests, -EBUSY or -ENOMEM otherwise */
-int _starpu_handle_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
 
-int _starpu_handle_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
-int _starpu_handle_all_pending_node_data_requests(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
+int _starpu_handle_pending_node_data_requests(unsigned src_node);
+int _starpu_handle_all_pending_node_data_requests(unsigned src_node);
 
-int _starpu_check_that_no_data_request_exists(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
-int _starpu_check_that_no_data_request_is_pending(unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout);
+int _starpu_check_that_no_data_request_exists(unsigned node);
+int _starpu_check_that_no_data_request_is_pending(unsigned node);
 
 struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t handle,
 							 struct _starpu_data_replicate *src_replicate,

+ 10 - 30
src/datawizard/datawizard.c

@@ -26,7 +26,7 @@
 #include <core/simgrid.h>
 #endif
 
-int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests)
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests)
 {
 	int ret = 0;
 
@@ -37,7 +37,7 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum
 	STARPU_UYIELD();
 
 	/* in case some other driver requested data */
-	if (_starpu_handle_pending_node_data_requests(memory_node, peer_node, inout))
+	if (_starpu_handle_pending_node_data_requests(memory_node))
 		ret = 1;
 
 	starpu_memchunk_tidy(memory_node);
@@ -46,16 +46,16 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum
 	{
 		/* Some transfers have finished, or the driver requests to really push more */
 		unsigned pushed;
-		if (_starpu_handle_node_data_requests(memory_node, peer_node, inout, may_alloc, &pushed) == 0)
+		if (_starpu_handle_node_data_requests(memory_node, may_alloc, &pushed) == 0)
 		{
 			if (pushed)
 				ret = 1;
 			/* We pushed all pending requests, we can afford pushing
 			 * prefetch requests */
-			_starpu_handle_node_prefetch_requests(memory_node, peer_node, inout, may_alloc, &pushed);
-			if (_starpu_check_that_no_data_request_is_pending(memory_node, peer_node, inout))
+			_starpu_handle_node_prefetch_requests(memory_node, may_alloc, &pushed);
+			if (_starpu_check_that_no_data_request_is_pending(memory_node))
 				/* No pending transfer, push some idle transfer */
-				_starpu_handle_node_idle_requests(memory_node, peer_node, inout, may_alloc, &pushed);
+				_starpu_handle_node_idle_requests(memory_node, may_alloc, &pushed);
 		}
 		if (pushed)
 			ret = 1;
@@ -68,20 +68,16 @@ int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum
 int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
 {
 	struct _starpu_worker *worker = _starpu_get_local_worker_key();
-        unsigned memnode, memnode2;
+        unsigned memnode;
 
 	if (!worker)
 	{
 		/* Call from main application, only make RAM requests progress */
 		int ret = 0;
 		int nnumas = starpu_memory_nodes_get_numa_count();
-		int numa, numa2;
+		int numa;
 		for (numa = 0; numa < nnumas; numa++)
-			for (numa2 = 0; numa2 < nnumas; numa2++)
-			{
-				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
-				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
-			}
+			ret |=  ___starpu_datawizard_progress(numa, may_alloc, push_requests);
 
 		return ret;
 	}
@@ -97,11 +93,7 @@ int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
         for (memnode = 0; memnode < nnodes; memnode++)
         {
                 if (_starpu_worker_drives_memory[current_worker_id][memnode] == 1)
-			for (memnode2 = 0; memnode2 < nnodes; memnode2++)
-			{
-				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
-				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
-			}
+                        ret |= ___starpu_datawizard_progress(memnode, may_alloc, push_requests);
         }
 
         return ret;
@@ -111,15 +103,3 @@ void _starpu_datawizard_progress(unsigned may_alloc)
 {
         __starpu_datawizard_progress(may_alloc, 1);
 }
-
-void _starpu_datawizard_handle_all_pending_node_data_requests(unsigned memnode)
-{
-	unsigned nnodes = starpu_memory_nodes_get_count();
-	unsigned memnode2;
-
-	for (memnode2 = 0; memnode2 < nnodes; memnode2++)
-	{
-		_starpu_handle_all_pending_node_data_requests(memnode, memnode2, _STARPU_DATA_REQUEST_IN);
-		_starpu_handle_all_pending_node_data_requests(memnode, memnode2, _STARPU_DATA_REQUEST_OUT);
-	}
-}

+ 1 - 4
src/datawizard/datawizard.h

@@ -41,14 +41,11 @@
  * If \p may_alloc is 1, it can allocate destination data for transfers
  * (this is not possible e.g. when spinning for a handle lock)
  */
-int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests);
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests);
 /** Call ___starpu_datawizard_progress() for all memory nodes driven by the
  * current worker */
 int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests);
 /** Call __starpu_datawizard_progress with push_requests = 1 */
 void _starpu_datawizard_progress(unsigned may_alloc);
 
-/** Check for all pending data request progress on node \p memory_node */
-void _starpu_datawizard_handle_all_pending_node_data_requests(unsigned memnode);
-
 #endif // __DATAWIZARD_H__

+ 1 - 2
src/drivers/cpu/driver_cpu.c

@@ -40,7 +40,6 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
-#include <datawizard/datawizard.h>
 #include <core/simgrid.h>
 #include <core/task.h>
 #include <core/disk.h>
@@ -430,7 +429,7 @@ int _starpu_cpu_driver_deinit(struct _starpu_worker *cpu_worker)
 	_STARPU_TRACE_WORKER_DEINIT_START;
 
 	unsigned memnode = cpu_worker->memory_node;
-	_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
+	_starpu_handle_all_pending_node_data_requests(memnode);
 
 	/* In case there remains some memory that was automatically
 	 * allocated by StarPU, we release it now. Note that data

+ 1 - 2
src/drivers/cuda/driver_cuda.c

@@ -37,7 +37,6 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
-#include <datawizard/datawizard.h>
 #include <core/task.h>
 #include <common/knobs.h>
 
@@ -1007,7 +1006,7 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 		if (!usersleft)
                 {
 			/* I'm last, deinitialize device */
-			_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
+			_starpu_handle_all_pending_node_data_requests(memnode);
 
 			/* In case there remains some memory that was automatically
 			 * allocated by StarPU, we release it now. Note that data

+ 1 - 2
src/drivers/opencl/driver_opencl.c

@@ -31,7 +31,6 @@
 #include <datawizard/memory_manager.h>
 #include <datawizard/memory_nodes.h>
 #include <datawizard/malloc.h>
-#include <datawizard/datawizard.h>
 #include <core/task.h>
 #include <common/knobs.h>
 
@@ -840,7 +839,7 @@ int _starpu_opencl_driver_deinit(struct _starpu_worker *worker)
 
 	unsigned memnode = worker->memory_node;
 
-	_starpu_datawizard_handle_all_pending_node_data_requests(memnode);
+	_starpu_handle_all_pending_node_data_requests(memnode);
 
 	/* In case there remains some memory that was automatically
 	 * allocated by StarPU, we release it now. Note that data

+ 5 - 26
tools/gdbinit

@@ -693,12 +693,7 @@ define starpu-print-requests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    set $node2 = 0
-    while $node2 < _starpu_descr.nnodes
-      starpu-print-requests-list data_requests[$node][$node2][0]
-      starpu-print-requests-list data_requests[$node][$node2][1]
-      set $node2 = $node2 + 1
-    end
+    starpu-print-requests-list data_requests[$node]
     set $node = $node + 1
   end
 end
@@ -707,14 +702,8 @@ define starpu-print-prequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    set $node2 = 0
-    while $node2 < _starpu_descr.nnodes
-      printf "%u pending requests from %u\n", data_requests_npending[$node][$node2][0], $node2
-      starpu-print-requests-list data_requests_pending[$node][$node2][0]
-      printf "%u pending requests to %u\n", data_requests_npending[$node][$node2][1], $node2
-      starpu-print-requests-list data_requests_pending[$node][$node2][1]
-      set $node2 = $node2 + 1
-    end
+    printf "%u pending requests\n", data_requests_npending[$node]
+    starpu-print-requests-list data_requests_pending[$node]
     set $node = $node + 1
   end
 end
@@ -741,12 +730,7 @@ define starpu-print-frequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    set $node2 = 0
-    while $node2 < _starpu_descr.nnodes
-      starpu-print-requests-list prefetch_requests[$node][$node2][0]
-      starpu-print-requests-list prefetch_requests[$node][$node2][1]
-      set $node2 = $node2 + 1
-    end
+    starpu-print-requests-list prefetch_requests[$node]
     set $node = $node + 1
   end
 end
@@ -755,12 +739,7 @@ define starpu-print-irequests
   set $node = 0
   while $node < _starpu_descr.nnodes
     printf "Node %u:\n", $node
-    set $node2 = 0
-    while $node2 < _starpu_descr.nnodes
-      starpu-print-requests-list idle_requests[$node][$node2][0]
-      starpu-print-requests-list idle_requests[$node][$node2][1]
-      set $node2 = $node2 + 1
-    end
+    starpu-print-requests-list idle_requests[$node]
     set $node = $node + 1
   end
 end