瀏覽代碼

data_request: Allow several requests for the same src/dst pair

This will be needed to properly record a long history of prefetches,
which might be interleaved with evictions.
Samuel Thibault 4 年之前
父節點
當前提交
957178e7cb

+ 25 - 3
src/datawizard/coherency.c

@@ -494,8 +494,11 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		unsigned nnodes = starpu_memory_nodes_get_count();
 		for (i = 0; i < nnodes; i++)
 			for (j = 0; j < nnodes; j++)
-				if (handle->per_node[i].request[j])
+			{
+				struct _starpu_data_request *r;
+				for (r = handle->per_node[i].request[j]; r; r = r->next_same_req)
 					nwait++;
+			}
 		/* If the request is not detached (i.e. the caller really wants
 		 * proper ownership), no new requests will appear because a
 		 * reference will be kept on the dst replicate, which will
@@ -532,6 +535,25 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 
 				_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
 			}
+
+			if (task)
+			{
+				unsigned j;
+				unsigned nnodes = starpu_memory_nodes_get_count();
+				/* Cancel any existing (prefetch) request */
+				struct _starpu_data_request *r2;
+				for (j = 0; j < nnodes; j++)
+				{
+					for (r2 = dst_replicate->request[j]; r2; r2 = r2->next_same_req)
+					{
+						if (r2->task && r2->task == task)
+						{
+							r2->canceled = 1;
+							break;
+						}
+					}
+				}
+			}
 		}
 
 		_starpu_spin_unlock(&handle->header_lock);
@@ -706,8 +728,8 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		for (i = 0; i < nnodes; i++)
 			for (j = 0; j < nnodes; j++)
 			{
-				struct _starpu_data_request *r2 = handle->per_node[i].request[j];
-				if (r2)
+				struct _starpu_data_request *r2;
+				for (r2 = handle->per_node[i].request[j]; r2; r2 = r2->next_same_req)
 				{
 					_starpu_spin_lock(&r2->lock);
 					if (is_prefetch < r2->prefetch)

+ 6 - 7
src/datawizard/coherency.h

@@ -72,14 +72,13 @@ struct _starpu_data_replicate
 	 * */
 	unsigned automatically_allocated:1;
 
-	/** To help the scheduling policies to make some decision, we
-	   may keep a track of the tasks that are likely to request
-	   this data on the current node.
-	   It is the responsability of the scheduling _policy_ to set that
-	   flag when it assigns a task to a queue, policies which do not
-	   use this hint can simply ignore it.
-	 */
+	/** This tracks the list of requests to provide the value */
 	struct _starpu_data_request *request[STARPU_MAXNODES];
+	/** This points to the last entry of request, to easily append to the list */
+	struct _starpu_data_request *last_request[STARPU_MAXNODES];
+
+	/* Which request is loading data here */
+	struct _starpu_data_request *load_request;
 
 	/** The number of prefetches that we made for this replicate for various tasks
 	 * This is also the number of tasks that we will wait to see use the mc before

+ 64 - 8
src/datawizard/data_request.c

@@ -105,6 +105,7 @@ static void _starpu_data_request_unlink(struct _starpu_data_request *r)
 	else
 	{
 		unsigned node;
+		struct _starpu_data_request **prevp, *prev;
 
 		if (r->mode & STARPU_R)
 			/* If this is a read request, we store the pending requests
@@ -115,10 +116,25 @@ static void _starpu_data_request_unlink(struct _starpu_data_request *r)
 			 * we use the destination node to cache the request. */
 			node = r->dst_replicate->memory_node;
 
-		STARPU_ASSERT(r->dst_replicate->request[node] == r);
-		r->dst_replicate->request[node] = NULL;
-	}
+		/* Look for ourself in the list, we should be not very far. */
+		for (prevp = &r->dst_replicate->request[node], prev = NULL;
+		     *prevp && *prevp != r;
+		     prev = *prevp, prevp = &prev->next_same_req)
+			;
+
+		STARPU_ASSERT(*prevp == r);
+		*prevp = r->next_same_req;
 
+		if (!r->next_same_req)
+		{
+			/* I was last */
+			STARPU_ASSERT(r->dst_replicate->last_request[node] == r);
+			if (prev)
+				r->dst_replicate->last_request[node] = prev;
+			else
+				r->dst_replicate->last_request[node] = NULL;
+		}
+	}
 }
 
 static void _starpu_data_request_destroy(struct _starpu_data_request *r)
@@ -185,12 +201,14 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	STARPU_ASSERT(starpu_node_get_kind(handling_node) == STARPU_CPU_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
 	r->added_ref = 0;
+	r->canceled = 0;
 	r->prefetch = is_prefetch;
 	r->task = task;
 	r->nb_tasks_prefetch = 0;
 	r->prio = prio;
 	r->retval = -1;
 	r->ndeps = ndeps;
+	r->next_same_req = NULL;
 	r->next_req_count = 0;
 	r->callbacks = NULL;
 	r->com_id = 0;
@@ -220,8 +238,11 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 		else
 			node = dst_replicate->memory_node;
 
-		STARPU_ASSERT(!dst_replicate->request[node]);
-		dst_replicate->request[node] = r;
+		if (!dst_replicate->request[node])
+			dst_replicate->request[node] = r;
+		else
+			dst_replicate->last_request[node]->next_same_req = r;
+		dst_replicate->last_request[node] = r;
 
 		if (mode & STARPU_R)
 		{
@@ -392,7 +413,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 	struct _starpu_data_replicate *dst_replicate = r->dst_replicate;
 
 
-	if (dst_replicate)
+	if (r->canceled < 2 && dst_replicate)
 	{
 #ifdef STARPU_MEMORY_STATS
 		enum _starpu_cache_state old_src_replicate_state = src_replicate->state;
@@ -400,6 +421,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 
 		_starpu_spin_checklocked(&handle->header_lock);
 		_starpu_update_data_state(handle, r->dst_replicate, mode);
+		dst_replicate->load_request = NULL;
 
 #ifdef STARPU_MEMORY_STATS
 		if (src_replicate->state == STARPU_INVALID)
@@ -422,7 +444,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 #endif
 	}
 
-	if (r->com_id > 0)
+	if (r->canceled < 2 && r->com_id > 0)
 	{
 #ifdef STARPU_USE_FXT
 		unsigned src_node = src_replicate->memory_node;
@@ -454,7 +476,7 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 	/* Remove a reference on the destination replicate for the request */
 	if (dst_replicate)
 	{
-		if (dst_replicate->mc)
+		if (r->canceled < 2 && dst_replicate->mc)
 			/* Make sure it stays there for the task.  */
 			dst_replicate->nb_tasks_prefetch += r->nb_tasks_prefetch;
 
@@ -510,6 +532,14 @@ static void starpu_handle_data_request_completion(struct _starpu_data_request *r
 	}
 }
 
+void _starpu_data_request_complete_wait(void *arg)
+{
+	struct _starpu_data_request *r = arg;
+	_starpu_spin_lock(&r->handle->header_lock);
+	_starpu_spin_lock(&r->lock);
+	starpu_handle_data_request_completion(r);
+}
+
 /* TODO : accounting to see how much time was spent working for other people ... */
 static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned may_alloc)
 {
@@ -534,6 +564,32 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	struct _starpu_data_replicate *src_replicate = r->src_replicate;
 	struct _starpu_data_replicate *dst_replicate = r->dst_replicate;
 
+	if (r->canceled)
+	{
+		/* Ok, canceled before starting copies etc. */
+		r->canceled = 2;
+		/* Nothing left to do */
+		starpu_handle_data_request_completion(r);
+		return 0;
+	}
+
+	struct _starpu_data_request *r2 = dst_replicate->load_request;
+	if (r2 && r2 != r)
+	{
+		/* Oh, some other transfer is already loading the value. Just wait for it */
+		r->canceled = 2;
+		_starpu_spin_unlock(&r->lock);
+		_starpu_spin_lock(&r2->lock);
+		_starpu_data_request_append_callback(r2, _starpu_data_request_complete_wait, r);
+		_starpu_spin_unlock(&r2->lock);
+		_starpu_spin_unlock(&handle->header_lock);
+		return 0;
+	}
+
+	/* We are loading this replicate.
+	 * Note: we might fail to allocate memory, but we will keep on and others will wait for us. */
+	dst_replicate->load_request = r;
+
 	enum starpu_data_access_mode r_mode = r->mode;
 
 	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate);

+ 8 - 1
src/datawizard/data_request.h

@@ -90,8 +90,11 @@ LIST_TYPE(_starpu_data_request,
 	/** Whether we have already added our reference to the dst replicate. */
 	unsigned added_ref:1;
 
+	/** Whether the request was canceled before being handled (because the transfer already happened another way). */
+	unsigned canceled:2;
+
 	/** Whether this is just a prefetch request */
-	enum starpu_is_prefetch prefetch;
+	enum starpu_is_prefetch prefetch:3;
 
 	/** Task this request is for */
 	struct starpu_task *task;
@@ -109,6 +112,10 @@ LIST_TYPE(_starpu_data_request,
 	 * dependencies. */
 	unsigned ndeps;
 
+	/** Some further tasks may have requested prefetches for the same data
+	 * much later on, link with them */
+	struct _starpu_data_request *next_same_req;
+
 	/** in case we have a chain of request (eg. for nvidia multi-GPU), this
 	 * is the list of requests which are waiting for this one. */
 	struct _starpu_data_request *next_req[STARPU_MAXNODES+1];

+ 2 - 0
src/datawizard/interfaces/data_interface.c

@@ -380,7 +380,9 @@ _starpu_data_initialize_per_worker(starpu_data_handle_t handle)
 		//for (node = 0; node < STARPU_MAXNODES; node++)
 		//{
 		//	replicate->request[node] = NULL;
+		//	replicate->last_request[node] = NULL;
 		//}
+		//replicate->load_request = NULL;
 
 		/* Assuming being used for SCRATCH for now, patched when entering REDUX mode */
 		replicate->relaxed_coherency = 1;