
factorize _starpu_handle_node_data_requests and _starpu_handle_node_prefetch_requests

Samuel Thibault 10 years ago
parent
commit
7d74a98d39
1 changed file with 34 additions and 127 deletions
src/datawizard/data_request.c

@@ -439,11 +439,12 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+int __starpu_handle_node_data_requests(struct _starpu_data_request_list **reqlist, unsigned src_node, unsigned may_alloc, unsigned n, unsigned *pushed, unsigned prefetch)
 {
 	struct _starpu_data_request *r;
-	struct _starpu_data_request_list *new_data_requests;
+	struct _starpu_data_request_list *new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
 	struct _starpu_data_request_list *empty_list;
+	unsigned i;
 	int ret = 0;
 
 	*pushed = 0;
@@ -452,7 +453,7 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 	/* This is racy, but not posing problems actually, since we know we
 	 * will come back here to probe again regularly anyway.
 	 * Thus, do not expose this optimization to helgrind */
-	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(data_requests[src_node]))
+	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(reqlist[src_node]))
 		return 0;
 #endif
 
@@ -470,7 +471,7 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
 #endif
 
-	struct _starpu_data_request_list *local_list = data_requests[src_node];
+	struct _starpu_data_request_list *local_list = reqlist[src_node];
 
 	if (_starpu_data_request_list_empty(local_list))
 	{
@@ -483,19 +484,20 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 	/* There is an entry: we create a new empty list to replace the list of
 	 * requests, and we handle the request(s) one by one in the former
 	 * list, without concurrency issues.*/
-	data_requests[src_node] = empty_list;
-	STARPU_HG_DISABLE_CHECKING(data_requests[src_node]->_head);
+	reqlist[src_node] = empty_list;
+	STARPU_HG_DISABLE_CHECKING(reqlist[src_node]->_head);
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
-	new_data_requests = _starpu_data_request_list_new();
+	for (i = 0; i <= prefetch; i++)
+		new_data_requests[i] = _starpu_data_request_list_new();
 
 	/* for all entries of the list */
 	while (!_starpu_data_request_list_empty(local_list))
 	{
                 int res;
 
-		if (data_requests_npending[src_node] >= MAX_PENDING_REQUESTS_PER_NODE)
+		if (data_requests_npending[src_node] >= n)
 		{
 			/* Too many requests at the same time, skip pushing
 			 * more for now */
@@ -505,28 +507,38 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 
 		r = _starpu_data_request_list_pop_front(local_list);
 
-		res = starpu_handle_data_request(r, may_alloc, 0);
+		res = starpu_handle_data_request(r, may_alloc, prefetch);
 		if (res != 0 && res != -EAGAIN)
 		{
 			/* handle is busy, or not enough memory, postpone for now */
 			ret = res;
-			_starpu_data_request_list_push_back(new_data_requests, r);
+			/* Prefetch requests might have gotten promoted while in tmp list */
+			_starpu_data_request_list_push_back(new_data_requests[r->prefetch], r);
 			break;
 		}
 
 		(*pushed)++;
 	}
 
+	/* Push back requests we didn't handle on the proper list */
 	while (!_starpu_data_request_list_empty(local_list))
 	{
 		r = _starpu_data_request_list_pop_front(local_list);
-		_starpu_data_request_list_push_back(new_data_requests, r);
+		/* Prefetch requests might have gotten promoted while in tmp list */
+		_starpu_data_request_list_push_back(new_data_requests[r->prefetch], r);
 	}
 
-	if (!_starpu_data_request_list_empty(new_data_requests))
+	for (i = 0; i <= prefetch; i++)
+		if (!_starpu_data_request_list_empty(new_data_requests[i]))
+			break;
+
+	if (i <= prefetch)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-		_starpu_data_request_list_push_list_front(new_data_requests, data_requests[src_node]);
+		if (!(_starpu_data_request_list_empty(new_data_requests[0])))
+			_starpu_data_request_list_push_list_front(new_data_requests[0], data_requests[src_node]);
+		if (prefetch == 1 && !(_starpu_data_request_list_empty(new_data_requests[1])))
+			_starpu_data_request_list_push_list_front(new_data_requests[1], prefetch_requests[src_node]);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
@@ -534,127 +546,22 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, uns
 #endif
 	}
 
-	_starpu_data_request_list_delete(new_data_requests);
+	for (i = 0; i <= prefetch; i++)
+		_starpu_data_request_list_delete(new_data_requests[i]);
 	_starpu_data_request_list_delete(local_list);
 
+
 	return ret;
 }
 
-int _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
 {
-	struct _starpu_data_request *r;
-	struct _starpu_data_request_list *new_data_requests;
-	struct _starpu_data_request_list *new_prefetch_requests;
-	struct _starpu_data_request_list *empty_list;
-	int ret = 0;
-
-	*pushed = 0;
-
-#ifdef STARPU_NON_BLOCKING_DRIVERS
-	/* This is racy, but not posing problems actually, since we know we
-	 * will come back here to probe again regularly anyway.
-	 * Thus, do not expose this optimization to helgrind */
-	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(prefetch_requests[src_node]))
-		return 0;
-#endif
-
-	empty_list = _starpu_data_request_list_new();
-
-#ifdef STARPU_NON_BLOCKING_DRIVERS
-	/* take all the entries from the request list */
-	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
-	{
-		/* List is busy, do not bother with it */
-		_starpu_data_request_list_delete(empty_list);
-		return -EBUSY;
-	}
-#else
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-#endif
-
-	struct _starpu_data_request_list *local_list = prefetch_requests[src_node];
-
-	if (_starpu_data_request_list_empty(local_list))
-	{
-		/* there is no request */
-                STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
-		_starpu_data_request_list_delete(empty_list);
-		return 0;
-	}
-
-	/* There is an entry: we create a new empty list to replace the list of
-	 * requests, and we handle the request(s) one by one in the former
-	 * list, without concurrency issues.*/
-	prefetch_requests[src_node] = empty_list;
-	STARPU_HG_DISABLE_CHECKING(prefetch_requests[src_node]->_head);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
-
-	new_data_requests = _starpu_data_request_list_new();
-	new_prefetch_requests = _starpu_data_request_list_new();
-
-	/* for all entries of the list */
-	while (!_starpu_data_request_list_empty(local_list))
-	{
-                int res;
-
-		if (data_requests_npending[src_node] >= MAX_PENDING_PREFETCH_REQUESTS_PER_NODE)
-		{
-			/* Too many requests at the same time, skip pushing
-			 * more for now */
-			ret = -EBUSY;
-			break;
-		}
-
-		r = _starpu_data_request_list_pop_front(local_list);
-
-		res = starpu_handle_data_request(r, may_alloc, 1);
-		if (res != 0 && res != -EAGAIN)
-		{
-			/* handle is busy, or not enough memory, postpone for now */
-			ret = res;
-			if (r->prefetch)
-				_starpu_data_request_list_push_back(new_prefetch_requests, r);
-			else
-			{
-				/* Prefetch request promoted while in tmp list*/
-				_starpu_data_request_list_push_back(new_data_requests, r);
-			}
-			break;
-		}
-
-		(*pushed)++;
-	}
-
-	while (!_starpu_data_request_list_empty(local_list))
-	{
-		r = _starpu_data_request_list_pop_front(local_list);
-		if (r->prefetch)
-			_starpu_data_request_list_push_back(new_prefetch_requests, r);
-		else
-			/* Prefetch request promoted while in tmp list*/
-			_starpu_data_request_list_push_back(new_data_requests, r);
-	}
-
-	if (!(_starpu_data_request_list_empty(new_data_requests) && _starpu_data_request_list_empty(new_prefetch_requests)))
-	{
-		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-		if (!(_starpu_data_request_list_empty(new_data_requests)))
-			_starpu_data_request_list_push_list_front(new_data_requests, data_requests[src_node]);
-		if (!(_starpu_data_request_list_empty(new_prefetch_requests)))
-			_starpu_data_request_list_push_list_front(new_prefetch_requests, prefetch_requests[src_node]);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
-
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		_starpu_wake_all_blocked_workers_on_node(src_node);
-#endif
-	}
-
-	_starpu_data_request_list_delete(new_data_requests);
-	_starpu_data_request_list_delete(new_prefetch_requests);
-	_starpu_data_request_list_delete(local_list);
+	return __starpu_handle_node_data_requests(data_requests, src_node, may_alloc, MAX_PENDING_REQUESTS_PER_NODE, pushed, 0);
+}
 
-	return ret;
+int _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
+{
+	return __starpu_handle_node_data_requests(prefetch_requests, src_node, may_alloc, MAX_PENDING_PREFETCH_REQUESTS_PER_NODE, pushed, 1);
 }
 
 static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
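
For context, a minimal, self-contained sketch of the factorization pattern this commit applies: two near-identical per-node handlers collapse into one parameterized helper plus thin wrappers that keep the original entry points. Types, names, and limits below are simplified stand-ins for illustration, not the actual StarPU structures or API.

#include <stdio.h>

#define MAX_PENDING_REQUESTS          4
#define MAX_PENDING_PREFETCH_REQUESTS 2

struct req_list { int npending; };

static struct req_list data_requests[2];     /* one list per node */
static struct req_list prefetch_requests[2];

/* Shared worker: the request-list array, the pending-request budget and the
 * prefetch level are now parameters instead of being hard-coded twice. */
static int handle_node_requests(struct req_list *reqlist, unsigned node,
                                unsigned max_pending, unsigned prefetch)
{
	if (reqlist[node].npending >= (int)max_pending)
		return -1; /* too many requests in flight, caller retries later */
	printf("node %u: handling %s requests (budget %u)\n",
	       node, prefetch ? "prefetch" : "data", max_pending);
	return 0;
}

/* Thin wrappers preserve the original entry points. */
static int handle_node_data_requests(unsigned node)
{
	return handle_node_requests(data_requests, node,
	                            MAX_PENDING_REQUESTS, 0);
}

static int handle_node_prefetch_requests(unsigned node)
{
	return handle_node_requests(prefetch_requests, node,
	                            MAX_PENDING_PREFETCH_REQUESTS, 1);
}

int main(void)
{
	handle_node_data_requests(0);
	handle_node_prefetch_requests(1);
	return 0;
}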