浏览代码

data_request: Fix node name

We may handle both inbound and outbound requests on a given node, so the
handling node may be either source or destination.
Samuel Thibault 4 年之前
父节点
当前提交
7bdd96b3c4
共有 2 个文件被更改,包括 48 次插入48 次删除
  1. 45 45
      src/datawizard/data_request.c
  2. 3 3
      src/datawizard/data_request.h

+ 45 - 45
src/datawizard/data_request.c

@@ -543,7 +543,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned src_node, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
+static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned handling_node, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
@@ -556,7 +556,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	/* This is racy, but not posing problems actually, since we know we
 	 * will come back here to probe again regularly anyway.
 	 * Thus, do not expose this optimization to helgrind */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[src_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[handling_node]))
 		return 0;
 #endif
 
@@ -564,29 +564,29 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	/* take all the entries from the request list */
-	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[handling_node]))
 	{
 		/* List is busy, do not bother with it */
 		return -EBUSY;
 	}
 #else
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 #endif
 
-	if (_starpu_data_request_prio_list_empty(&reqlist[src_node]))
+	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node]))
 	{
 		/* there is no request */
-                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 		return 0;
 	}
 
 	/* There is an entry: we create a new empty list to replace the list of
 	 * requests, and we handle the request(s) one by one in the former
 	 * list, without concurrency issues.*/
-	struct _starpu_data_request_prio_list local_list = reqlist[src_node];
-	_starpu_data_request_prio_list_init(&reqlist[src_node]);
+	struct _starpu_data_request_prio_list local_list = reqlist[handling_node];
+	_starpu_data_request_prio_list_init(&reqlist[handling_node]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 	for (i = 0; i <= prefetch; i++)
 		_starpu_data_request_prio_list_init(&new_data_requests[i]);
@@ -597,7 +597,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	{
                 int res;
 
-		if (data_requests_npending[src_node] >= n)
+		if (data_requests_npending[handling_node] >= n)
 		{
 			/* Too many requests at the same time, skip pushing
 			 * more for now */
@@ -644,28 +644,28 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 	if (i <= prefetch)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_FETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[src_node]);
-			data_requests[src_node] = new_data_requests[STARPU_FETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node]);
+			data_requests[handling_node] = new_data_requests[STARPU_FETCH];
 		}
 		if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[src_node]);
-			prefetch_requests[src_node] = new_data_requests[STARPU_TASK_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node]);
+			prefetch_requests[handling_node] = new_data_requests[STARPU_TASK_PREFETCH];
 		}
 		if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[src_node]);
-			prefetch_requests[src_node] = new_data_requests[STARPU_PREFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node]);
+			prefetch_requests[handling_node] = new_data_requests[STARPU_PREFETCH];
 		}
 		if (prefetch >= STARPU_IDLEFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_IDLEFETCH])))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[src_node]);
-			idle_requests[src_node] = new_data_requests[STARPU_IDLEFETCH];
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node]);
+			idle_requests[handling_node] = new_data_requests[STARPU_IDLEFETCH];
 		}
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifdef STARPU_SIMGRID
 		if (*pushed)
@@ -677,32 +677,32 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 			 * for eviction to happen.
 			 */
 			starpu_sleep(0.000001);
-			_starpu_wake_all_blocked_workers_on_node(src_node);
+			_starpu_wake_all_blocked_workers_on_node(handling_node);
 		}
 #elif !defined(STARPU_NON_BLOCKING_DRIVERS)
-		_starpu_wake_all_blocked_workers_on_node(src_node);
+		_starpu_wake_all_blocked_workers_on_node(handling_node);
 #endif
 	}
 
 	return ret;
 }
 
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(data_requests, src_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
+	return __starpu_handle_node_data_requests(data_requests, handling_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, STARPU_FETCH);
 }
 
-int _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(prefetch_requests, src_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
+	return __starpu_handle_node_data_requests(prefetch_requests, handling_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, STARPU_PREFETCH);
 }
 
-int _starpu_handle_node_idle_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed)
 {
-	return __starpu_handle_node_data_requests(idle_requests, src_node, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
+	return __starpu_handle_node_data_requests(idle_requests, handling_node, may_alloc, MAX_PENDING_IDLE_REQUESTS_PER_NODE, pushed, STARPU_IDLEFETCH);
 }
 
-static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
+static int _handle_pending_node_data_requests(unsigned handling_node, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
 //
@@ -713,14 +713,14 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 	/* Here helgrind would should that this is an un protected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
 		return 0;
 #endif
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!force)
 	{
-		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[src_node]))
+		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[handling_node]))
 		{
 			/* List is busy, do not bother with it */
 			return 0;
@@ -729,19 +729,19 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 	else
 #endif
 		/* We really want to handle requests */
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
+		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
 
-	if (_starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
+	if (_starpu_data_request_prio_list_empty(&data_requests_pending[handling_node]))
 	{
 		/* there is no request */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 		return 0;
 	}
 	/* for all entries of the list */
-	struct _starpu_data_request_prio_list local_list = data_requests_pending[src_node];
-	_starpu_data_request_prio_list_init(&data_requests_pending[src_node]);
+	struct _starpu_data_request_prio_list local_list = data_requests_pending[handling_node];
+	_starpu_data_request_prio_list_init(&data_requests_pending[handling_node]);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 
 	_starpu_data_request_prio_list_init(&new_data_requests_pending);
 	taken = 0;
@@ -804,23 +804,23 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 		}
 	}
 	_starpu_data_request_prio_list_deinit(&local_list);
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
-	data_requests_npending[src_node] -= taken - kept;
+	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[handling_node]);
+	data_requests_npending[handling_node] -= taken - kept;
 	if (kept)
-		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[src_node], &new_data_requests_pending);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
+		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[handling_node], &new_data_requests_pending);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[handling_node]);
 
 	return taken - kept;
 }
 
-int _starpu_handle_pending_node_data_requests(unsigned src_node)
+int _starpu_handle_pending_node_data_requests(unsigned handling_node)
 {
-	return _handle_pending_node_data_requests(src_node, 0);
+	return _handle_pending_node_data_requests(handling_node, 0);
 }
 
-int _starpu_handle_all_pending_node_data_requests(unsigned src_node)
+int _starpu_handle_all_pending_node_data_requests(unsigned handling_node)
 {
-	return _handle_pending_node_data_requests(src_node, 1);
+	return _handle_pending_node_data_requests(handling_node, 1);
 }
 
 /* Note: the returned value will be outdated since the locks are not taken at

+ 3 - 3
src/datawizard/data_request.h

@@ -135,9 +135,9 @@ void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(struct _starpu_data_request *r);
 /** returns 0 if we have pushed all requests, -EBUSY or -ENOMEM otherwise */
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed);
-int _starpu_handle_node_idle_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_data_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_prefetch_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
+int _starpu_handle_node_idle_requests(unsigned handling_node, unsigned may_alloc, unsigned *pushed);
 
 int _starpu_handle_pending_node_data_requests(unsigned src_node);
 int _starpu_handle_all_pending_node_data_requests(unsigned src_node);