Browse Source

data_request: pick up only a few entries from the request lists

Otherwise, having taken the whole list, we have to dispatch the requests we
did not handle back into the lists, which can be terribly expensive when there
are a lot of prefetch requests.
Samuel Thibault 4 years ago
parent
commit
038033d2e7
1 changed file with 33 additions and 47 deletions
  1. 33 47
      src/datawizard/data_request.c

+ 33 - 47
src/datawizard/data_request.c

@@ -579,7 +579,6 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list reqlist[STARPU_MAXNODES][STARPU_MAXNODES][2], unsigned handling_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned n, unsigned *pushed, enum starpu_is_prefetch prefetch)
 {
 	struct _starpu_data_request *r;
-	struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
 	unsigned i;
 	int ret = 0;
 
@@ -593,7 +592,11 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 		return 0;
 #endif
 
-	/* TODO optimize */
+	/* We create a new list to pickup some requests from the main list, and
+	 * we handle the request(s) one by one from it, without concurrency issues.
+	 */
+	struct _starpu_data_request_list local_list, remain_list;
+	_starpu_data_request_list_init(&local_list);
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	/* take all the entries from the request list */
@@ -606,27 +609,30 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 #endif
 
-	if (_starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
+	for (i = data_requests_npending[handling_node][peer_node][inout];
+		i < n && ! _starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]);
+		i++)
 	{
-		/* there is no request */
-                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
-		return 0;
+		r = _starpu_data_request_prio_list_pop_front_highest(&reqlist[handling_node][peer_node][inout]);
+		_starpu_data_request_list_push_back(&local_list, r);
 	}
 
-	/* There is an entry: we create a new empty list to replace the list of
-	 * requests, and we handle the request(s) one by one in the former
-	 * list, without concurrency issues.*/
-	struct _starpu_data_request_prio_list local_list = reqlist[handling_node][peer_node][inout];
-	_starpu_data_request_prio_list_init(&reqlist[handling_node][peer_node][inout]);
+	if (!_starpu_data_request_prio_list_empty(&reqlist[handling_node][peer_node][inout]))
+		/* We have left some requests */
+		ret = -EBUSY;
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
 
-	for (i = 0; i <= prefetch; i++)
-		_starpu_data_request_prio_list_init(&new_data_requests[i]);
+	if (_starpu_data_request_list_empty(&local_list))
+		/* there is no request */
+		return 0;
+
+	/* This will contain the remaining requests */
+	_starpu_data_request_list_init(&remain_list);
 
 	double start = starpu_timing_now();
 	/* for all entries of the list */
-	while (!_starpu_data_request_prio_list_empty(&local_list))
+	while (!_starpu_data_request_list_empty(&local_list))
 	{
                 int res;
 
@@ -638,7 +644,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 			break;
 		}
 
-		r = _starpu_data_request_prio_list_pop_front_highest(&local_list);
+		r = _starpu_data_request_list_pop_front(&local_list);
 
 		res = starpu_handle_data_request(r, may_alloc, prefetch);
 		if (res != 0 && res != -EAGAIN)
@@ -646,7 +652,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 			/* handle is busy, or not enough memory, postpone for now */
 			ret = res;
 			/* Prefetch requests might have gotten promoted while in tmp list */
-			_starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
+			_starpu_data_request_list_push_back(&remain_list, r);
 			if (prefetch > STARPU_FETCH)
 				/* Prefetching more there would make the situation even worse */
 				break;
@@ -662,41 +668,21 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 		}
 	}
 
-	/* Push back requests we didn't handle on the proper list */
-	while (!_starpu_data_request_prio_list_empty(&local_list))
-	{
-		r = _starpu_data_request_prio_list_pop_front_highest(&local_list);
-		/* Prefetch requests might have gotten promoted while in tmp list */
-		_starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
-	}
-	_starpu_data_request_prio_list_deinit(&local_list);
-
-	for (i = 0; i <= prefetch; i++)
-		if (!_starpu_data_request_prio_list_empty(&new_data_requests[i]))
-			break;
+	/* Gather remainder */
+	_starpu_data_request_list_push_list_back(&remain_list, &local_list);
 
-	if (i <= prefetch)
+	if (!_starpu_data_request_list_empty(&remain_list))
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);
-		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_FETCH])))
-		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[handling_node][peer_node][inout]);
-			data_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_FETCH];
-		}
-		if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
+		while (!_starpu_data_request_list_empty(&remain_list))
 		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
-			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_TASK_PREFETCH];
-		}
-		if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
-		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[handling_node][peer_node][inout]);
-			prefetch_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_PREFETCH];
-		}
-		if (prefetch >= STARPU_IDLEFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_IDLEFETCH])))
-		{
-			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_IDLEFETCH], &idle_requests[handling_node][peer_node][inout]);
-			idle_requests[handling_node][peer_node][inout] = new_data_requests[STARPU_IDLEFETCH];
+			r = _starpu_data_request_list_pop_front(&remain_list);
+			if (r->prefetch >= STARPU_IDLEFETCH)
+				_starpu_data_request_prio_list_push_front(&idle_requests[handling_node][r->peer_node][r->inout], r);
+			else if (r->prefetch > STARPU_FETCH)
+				_starpu_data_request_prio_list_push_front(&prefetch_requests[handling_node][r->peer_node][r->inout], r);
+			else
+				_starpu_data_request_prio_list_push_front(&data_requests[handling_node][r->peer_node][r->inout], r);
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node][peer_node][inout]);