Browse Source

datawizard: rework to factor transfers

We'd rather have fetch/prefetch/idlefetch be done coherently for a given
target memory node.
Samuel Thibault 4 years ago
parent
commit
ca2dfe621d
1 changed file with 56 additions and 21 deletions
  1. 56 21
      src/datawizard/datawizard.c

+ 56 - 21
src/datawizard/datawizard.c

@@ -26,9 +26,10 @@
 #include <core/simgrid.h>
 #endif
 
-static int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_node, enum _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests)
+static int ____starpu_datawizard_progress(unsigned memory_node, unsigned peer_start, unsigned peer_end, enum  _starpu_data_request_inout inout, unsigned may_alloc, unsigned push_requests)
 {
 	int ret = 0;
+	unsigned peer_node;
 
 #ifdef STARPU_SIMGRID
 	/* XXX */
@@ -37,8 +38,11 @@ static int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_nod
 	STARPU_UYIELD();
 
 	/* in case some other driver requested data */
-	if (_starpu_handle_pending_node_data_requests(memory_node, peer_node, inout))
-		ret = 1;
+	for (peer_node = peer_start; peer_node < peer_end; peer_node++)
+	{
+		if (_starpu_handle_pending_node_data_requests(memory_node, peer_node, inout))
+			ret = 1;
+	}
 
 	starpu_memchunk_tidy(memory_node);
 
@@ -46,41 +50,76 @@ static int ___starpu_datawizard_progress(unsigned memory_node, unsigned peer_nod
 	{
 		/* Some transfers have finished, or the driver requests to really push more */
 		unsigned pushed;
-		if (_starpu_handle_node_data_requests(memory_node, peer_node, inout, may_alloc, &pushed) == 0)
+		unsigned ok = 1;
+
+		for (peer_node = peer_start; ok && peer_node < peer_end; peer_node++)
 		{
+			if (_starpu_handle_node_data_requests(memory_node, peer_node, inout, may_alloc, &pushed) == -ENOMEM)
+				ok = 0;
 			if (pushed)
 				ret = 1;
+		}
+
+		if (ok)
+		{
+			unsigned doidle = 1;
+
 			/* We pushed all pending requests, we can afford pushing
 			 * prefetch requests */
-			_starpu_handle_node_prefetch_requests(memory_node, peer_node, inout, may_alloc, &pushed);
-			if (_starpu_check_that_no_data_request_is_pending(memory_node, peer_node, inout))
+			for (peer_node = peer_start; ok && peer_node < peer_end; peer_node++)
+			{
+				if (_starpu_handle_node_prefetch_requests(memory_node, peer_node, inout, may_alloc, &pushed) == -ENOMEM)
+					ok = 0;
+				if (pushed)
+					ret = 1;
+				if (!_starpu_check_that_no_data_request_is_pending(memory_node, peer_node, inout))
+					doidle = 0;
+			}
+
+			if (doidle)
 				/* No pending transfer, push some idle transfer */
-				_starpu_handle_node_idle_requests(memory_node, peer_node, inout, may_alloc, &pushed);
+				for (peer_node = peer_start; ok && peer_node < peer_end; peer_node++)
+				{
+					if (_starpu_handle_node_idle_requests(memory_node, peer_node, inout, may_alloc, &pushed) == -ENOMEM)
+						ok = 0;
+					if (pushed)
+						ret = 1;
+				}
 		}
-		if (pushed)
-			ret = 1;
+
 	}
 
 	return ret;
 }
 
+static int ___starpu_datawizard_progress(unsigned memory_node, unsigned nnodes, unsigned may_alloc, unsigned push_requests)
+{
+	int ret = 0; /* bitwise OR of the results of every progress call below */
+	unsigned peer_node;
+
+	/* First handle all incoming transfers: a single call covers the whole peer range [0, nnodes) */
+	ret |= ____starpu_datawizard_progress(memory_node, 0, nnodes, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
+
+	/* Then handle outgoing transfers, with one call per peer node (range [peer_node, peer_node+1)) */
+	for (peer_node = 0; peer_node < nnodes; peer_node++)
+		ret |= ____starpu_datawizard_progress(memory_node, peer_node, peer_node+1, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
+
+	return ret;
+}
+
 int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
 {
 	struct _starpu_worker *worker = _starpu_get_local_worker_key();
-        unsigned memnode, memnode2;
+        unsigned memnode;
 
 	if (!worker)
 	{
 		/* Call from main application, only make RAM requests progress */
 		int ret = 0;
 		int nnumas = starpu_memory_nodes_get_numa_count();
-		int numa, numa2;
+		int numa;
 		for (numa = 0; numa < nnumas; numa++)
-			for (numa2 = 0; numa2 < nnumas; numa2++)
-			{
-				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
-				ret |=  ___starpu_datawizard_progress(numa, numa2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
-			}
+			ret |=  ___starpu_datawizard_progress(numa, nnumas, may_alloc, push_requests);
 		_starpu_execute_registered_progression_hooks();
 
 		return ret;
@@ -97,11 +136,7 @@ int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
         for (memnode = 0; memnode < nnodes; memnode++)
         {
                 if (_starpu_worker_drives_memory[current_worker_id][memnode] == 1)
-			for (memnode2 = 0; memnode2 < nnodes; memnode2++)
-			{
-				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_IN, may_alloc, push_requests);
-				ret |= ___starpu_datawizard_progress(memnode, memnode2, _STARPU_DATA_REQUEST_OUT, may_alloc, push_requests);
-			}
+			ret |=  ___starpu_datawizard_progress(memnode, nnodes, may_alloc, push_requests);
         }
 
 	_starpu_execute_registered_progression_hooks();