Переглянути джерело

Add a task prefetch level

to improve retaining data in accelerators so we can make prefetch more
aggressive into evicting old data.
Samuel Thibault 4 роки тому
батько
коміт
040f190393

+ 2 - 0
ChangeLog

@@ -39,6 +39,8 @@ New features:
     its OS index.
   * New function starpu_get_hwloc_topology() to get a copy of the hwloc
     topology used by StarPU.
+  * Add a task prefetch level, to improve retaining data in accelerators so we
+    can make prefetch more aggressive.
 
 Small changes:
   * Use the S4U interface of Simgrid instead of xbt and MSG.

+ 38 - 6
src/datawizard/coherency.c

@@ -522,7 +522,13 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 #endif
 
 			if (dst_replicate->mc)
+			{
+				if (is_prefetch == STARPU_TASK_PREFETCH)
+					/* Make sure it stays there */
+					dst_replicate->mc->nb_tasks_prefetch++;
+
 				_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
+			}
 		}
 
 		_starpu_spin_unlock(&handle->header_lock);
@@ -567,6 +573,9 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
 			{
 				_starpu_update_data_state(handle, dst_replicate, mode);
+				if (is_prefetch == STARPU_TASK_PREFETCH)
+					/* Make sure it stays there */
+					dst_replicate->mc->nb_tasks_prefetch++;
 
 				_starpu_spin_unlock(&handle->header_lock);
 
@@ -645,9 +654,17 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 				STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
 			}
 		}
-		else if (!write_invalidation)
-			/* The last request will perform the callback after termination */
-			_starpu_data_request_append_callback(r, callback_func, callback_arg);
+		else
+		{
+			if (is_prefetch == STARPU_TASK_PREFETCH)
+				/* Make last request add the prefetch count on the mc to keep the data
+				 * there until the task gets to execute.  */
+				r->nb_tasks_prefetch++;
+
+			if (!write_invalidation)
+				/* The last request will perform the callback after termination */
+				_starpu_data_request_append_callback(r, callback_func, callback_arg);
+		}
 
 
 		if (reused_requests[hop])
@@ -785,7 +802,12 @@ static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, str
 	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_IDLEFETCH, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
 }
 
-static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+static int task_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
+{
+	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_TASK_PREFETCH, 1, NULL, NULL, prio, "task_prefetch_data_on_node");
+}
+
+static int STARPU_ATTRIBUTE_UNUSED prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
 	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, STARPU_PREFETCH, 1, NULL, NULL, prio, "prefetch_data_on_node");
 }
@@ -911,7 +933,7 @@ int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned t
 		int node = _starpu_task_data_get_node_on_node(task, index, target_node);
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		prefetch_data_on_node(handle, node, replicate, mode, prio);
+		task_prefetch_data_on_node(handle, node, replicate, mode, prio);
 
 		_starpu_set_data_requested_flag_if_needed(handle, replicate);
 	}
@@ -988,7 +1010,7 @@ int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worke
 		int node = _starpu_task_data_get_node_on_worker(task, index, worker);
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		prefetch_data_on_node(handle, node, replicate, mode, prio);
+		task_prefetch_data_on_node(handle, node, replicate, mode, prio);
 
 		_starpu_set_data_requested_flag_if_needed(handle, replicate);
 	}
@@ -1223,7 +1245,17 @@ void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job
 		local_replicate = get_replicate(handle, mode, workerid, node);
 		_starpu_spin_lock(&handle->header_lock);
 		if (local_replicate->mc)
+		{
 			local_replicate->mc->diduse = 1;
+			if (task->prefetched)
+			{
+				/* Allocations or transfer prefetchs should have been done by now and marked
+				 * this mc as needed for us.
+				 * Now that we added a reference for the task, we can relieve that.  */
+				STARPU_ASSERT(local_replicate->mc->nb_tasks_prefetch > 0);
+				local_replicate->mc->nb_tasks_prefetch--;
+			}
+		}
 		_starpu_spin_unlock(&handle->header_lock);
 
 		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, descrs[index].index);

+ 4 - 0
src/datawizard/copy_driver.c

@@ -244,6 +244,10 @@ int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_d
 
 		dst_replicate->initialized = 1;
 
+		if (dst_replicate->mc)
+			/* When we have the data there, make sure it stays there for the task.  */
+			dst_replicate->mc->nb_tasks_prefetch += req->nb_tasks_prefetch;
+
 		_STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch, handle);
 		int ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
 		if (!req)

+ 3 - 2
src/datawizard/copy_driver.h

@@ -50,8 +50,9 @@ struct _starpu_data_replicate;
 enum _starpu_is_prefetch
 {
 	STARPU_FETCH = 0,		/* A task really needs it now! */
-	STARPU_PREFETCH = 1,		/* It is a good idea to have it asap */
-	STARPU_IDLEFETCH = 2,		/* Get this here when you have time to */
+	STARPU_TASK_PREFETCH = 1,	/* A task will need it soon */
+	STARPU_PREFETCH = 2,		/* It is a good idea to have it asap */
+	STARPU_IDLEFETCH = 3,		/* Get this here when you have time to */
 	STARPU_NFETCH
 };
 

+ 22 - 11
src/datawizard/data_request.c

@@ -29,7 +29,7 @@
 #warning split into separate out/in queues for each node, so that MAX_PENDING_REQUESTS_PER_NODE is separate for them, since the links are bidirectionnal
 #endif
 static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
-static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES]; /* Contains both task_prefetch and prefetch */
 static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES];
 static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
 
@@ -156,6 +156,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	STARPU_ASSERT(starpu_node_get_kind(handling_node) == STARPU_CPU_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
 	r->prefetch = is_prefetch;
+	r->nb_tasks_prefetch = 0;
 	r->prio = prio;
 	r->retval = -1;
 	r->ndeps = ndeps;
@@ -310,7 +311,7 @@ void _starpu_post_data_request(struct _starpu_data_request *r)
 
 	/* insert the request in the proper list */
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
-	if (r->prefetch == STARPU_IDLEFETCH)
+	if (r->prefetch >= STARPU_IDLEFETCH)
 		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
 	else if (r->prefetch > STARPU_FETCH)
 		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node], r);
@@ -644,6 +645,11 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_FETCH], &data_requests[src_node]);
 			data_requests[src_node] = new_data_requests[STARPU_FETCH];
 		}
+		if (prefetch >= STARPU_TASK_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_TASK_PREFETCH])))
+		{
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_TASK_PREFETCH], &prefetch_requests[src_node]);
+			prefetch_requests[src_node] = new_data_requests[STARPU_TASK_PREFETCH];
+		}
 		if (prefetch >= STARPU_PREFETCH && !(_starpu_data_request_prio_list_empty(&new_data_requests[STARPU_PREFETCH])))
 		{
 			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[STARPU_PREFETCH], &prefetch_requests[src_node]);
@@ -844,6 +850,10 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum _starpu
 	STARPU_ASSERT(r->prefetch > prefetch);
 	r->prefetch=prefetch;
 
+	if (prefetch >= STARPU_IDLEFETCH)
+		/* No possible actual change */
+		return;
+
 	/* We have to promote chained_request too! */
 	unsigned chained_req;
 	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
@@ -855,19 +865,20 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, enum _starpu
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
 
+	int found = 1;
+
 	/* The request can be in a different list (handling request or the temp list)
-	 * we have to check that it is really in the prefetch list. */
+	 * we have to check that it is really in the prefetch or idle list. */
 	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node], r))
-	{
-		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node],r);
-		_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
-	}
-	/* The request can be in a different list (handling request or the temp list)
-	 * we have to check that it is really in the idle list. */
+		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node], r);
 	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node], r))
+		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node], r);
+	else
+		found = 0;
+
+	if (found)
 	{
-		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node],r);
-		if (prefetch >= STARPU_PREFETCH)
+		if (prefetch > STARPU_FETCH)
 			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node],r);
 		else
 			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);

+ 3 - 0
src/datawizard/data_request.h

@@ -82,6 +82,9 @@ LIST_TYPE(_starpu_data_request,
 	/** Whether this is just a prefetch request */
 	enum _starpu_is_prefetch prefetch;
 
+	/** Number of tasks which used this as a prefetch */
+	unsigned nb_tasks_prefetch;
+
 	/** Priority of the request. Default is 0 */
 	int prio;
 

+ 18 - 14
src/datawizard/memalloc.c

@@ -546,7 +546,7 @@ static void reuse_mem_chunk(unsigned node, struct _starpu_data_replicate *new_re
 /* This function is called for memory chunks that are possibly in used (ie. not
  * in the cache). They should therefore still be associated to a handle. */
 /* mc_lock is held and may be temporarily released! */
-static size_t try_to_throw_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node, struct _starpu_data_replicate *replicate, unsigned is_already_in_mc_list)
+static size_t try_to_throw_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node, struct _starpu_data_replicate *replicate, unsigned is_already_in_mc_list, enum _starpu_is_prefetch is_prefetch)
 {
 	size_t freed = 0;
 
@@ -571,6 +571,10 @@ static size_t try_to_throw_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node
 		/* Hasn't been used yet, avoid evicting it */
 		return 0;
 
+	if (mc->nb_tasks_prefetch && is_prefetch >= STARPU_TASK_PREFETCH)
+		/* We have not finished executing the tasks this was prefetched for */
+		return 0;
+
 	/* REDUX memchunk */
 	if (mc->relaxed_coherency == 2)
 	{
@@ -782,7 +786,7 @@ static int try_to_find_reusable_mc(unsigned node, starpu_data_handle_t data, str
 
 /* this function looks for a memory chunk that matches a given footprint in the
  * list of mem chunk that are not important */
-static int try_to_reuse_not_important_mc(unsigned node, starpu_data_handle_t data, struct _starpu_data_replicate *replicate, uint32_t footprint)
+static int try_to_reuse_not_important_mc(unsigned node, starpu_data_handle_t data, struct _starpu_data_replicate *replicate, uint32_t footprint, enum _starpu_is_prefetch is_prefetch)
 {
 	struct _starpu_mem_chunk *mc, *orig_next_mc, *next_mc;
 	int success = 0;
@@ -816,7 +820,7 @@ restart:
 		}
 
 		/* Note: this may unlock mc_list! */
-		success = try_to_throw_mem_chunk(mc, node, replicate, 1);
+		success = try_to_throw_mem_chunk(mc, node, replicate, 1, is_prefetch);
 
 		if (orig_next_mc)
 		{
@@ -846,6 +850,9 @@ static int try_to_reuse_potentially_in_use_mc(unsigned node, starpu_data_handle_
 	struct _starpu_mem_chunk *mc, *next_mc, *orig_next_mc;
 	int success = 0;
 
+	if (is_prefetch >= STARPU_IDLEFETCH)
+		/* Do not evict a MC just for an idle fetch */
+		return 0;
 	/*
 	 * We have to unlock mc_lock before locking header_lock, so we have
 	 * to be careful with the list.  We try to do just one pass, by
@@ -868,15 +875,11 @@ restart:
 		if (mc->remove_notify)
 			/* Somebody already working here, skip */
 			continue;
-		if (is_prefetch >= STARPU_IDLEFETCH)
-			/* Do not evict a MC just for an idle fetch */
-			continue;
-		if (is_prefetch >= STARPU_PREFETCH && !mc->wontuse)
+		if (!mc->wontuse && is_prefetch >= STARPU_PREFETCH)
 			/* Do not evict something that we might reuse, just for a prefetch */
-			/* TODO ! */
-			/* FIXME: but perhaps we won't have any task using it in
-                         * the close future, we should perhaps rather check
-                         * mc->replicate->refcnt? */
+			continue;
+		if (mc->nb_tasks_prefetch && is_prefetch >= STARPU_TASK_PREFETCH)
+			/* Do not evict something that we will reuse, just for a task prefetch */
 			continue;
 		if (mc->footprint != footprint || _starpu_data_interface_compare(handle->per_node[node].data_interface, handle->ops, mc->data->per_node[node].data_interface, mc->ops) != 1)
 			/* Not the right type of interface, skip */
@@ -890,7 +893,7 @@ restart:
 		}
 
 		/* Note: this may unlock mc_list! */
-		success = try_to_throw_mem_chunk(mc, node, replicate, 1);
+		success = try_to_throw_mem_chunk(mc, node, replicate, 1, is_prefetch);
 
 		if (orig_next_mc)
 		{
@@ -1000,7 +1003,7 @@ restart2:
 				next_mc->remove_notify = &next_mc;
 			}
 			/* Note: this may unlock mc_list! */
-			freed += try_to_throw_mem_chunk(mc, node, NULL, 0);
+			freed += try_to_throw_mem_chunk(mc, node, NULL, 0, STARPU_FETCH);
 
 			if (orig_next_mc)
 			{
@@ -1318,6 +1321,7 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	mc->size_interface = interface_size;
 	mc->remove_notify = NULL;
 	mc->diduse = 0;
+	mc->nb_tasks_prefetch = 0;
 	mc->wontuse = 0;
 
 	return mc;
@@ -1515,7 +1519,7 @@ static starpu_ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, st
 			reclaim -= freed;
 
 			/* Try to reuse an allocated data with the same interface (to avoid spurious free/alloc) */
-			if (_starpu_has_not_important_data && try_to_reuse_not_important_mc(dst_node, handle, replicate, footprint))
+			if (_starpu_has_not_important_data && try_to_reuse_not_important_mc(dst_node, handle, replicate, footprint, is_prefetch))
 				break;
 			if (try_to_reuse_potentially_in_use_mc(dst_node, handle, replicate, footprint, is_prefetch))
 			{

+ 7 - 0
src/datawizard/memalloc.h

@@ -60,10 +60,17 @@ LIST_TYPE(_starpu_mem_chunk,
 	/** Whether the memchunk is in the clean part of the mc_list */
 	unsigned clean:1;
 	/** Was this chunk used since it got allocated?  */
+	/* FIXME: probably useless now with nb_tasks_prefetch */
 	unsigned diduse:1;
 	/** Was this chunk marked as "won't use"? */
 	unsigned wontuse:1;
 
+	/** The number of prefetches that we made for this mc for various tasks
+	 * This is also the number of tasks that we will wait to see use this mc before
+	 * we attempt to evict it.
+	 */
+	unsigned nb_tasks_prefetch;
+
 	/** the size of the data is only set when calling _starpu_request_mem_chunk_removal(),
 	 * it is needed to estimate how much memory is in mc_cache, and by
 	 * free_memory_on_node() which is called when the handle is no longer

+ 1 - 0
src/debug/traces/starpu_fxt.c

@@ -2274,6 +2274,7 @@ static const char *copy_link_type(enum _starpu_is_prefetch prefetch)
 	switch (prefetch)
 	{
 		case STARPU_FETCH: return "F";
+		case STARPU_TASK_PREFETCH: return "TF";
 		case STARPU_PREFETCH: return "PF";
 		case STARPU_IDLEFETCH: return "IF";
 		default: STARPU_ASSERT(0);

+ 2 - 0
src/debug/traces/starpu_paje.c

@@ -398,6 +398,7 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED, struct st
 	/* Link types */
 	poti_DefineLinkType("MPIL", "MPIP", "MPICt", "MPICt", "MPI communication");
 	poti_DefineLinkType("F", "P", "Mm", "Mm", "Intra-node data Fetch");
+	poti_DefineLinkType("TF", "P", "Mm", "Mm", "Intra-node data TaskPreFetch");
 	poti_DefineLinkType("PF", "P", "Mm", "Mm", "Intra-node data PreFetch");
 	poti_DefineLinkType("IF", "P", "Mm", "Mm", "Intra-node data IdleFetch");
 	poti_DefineLinkType("WSL", "P", "W", "W", "Work steal");
@@ -551,6 +552,7 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED, struct st
 6       No       MS     Nothing         \".0 .0 .0\"		\n\
 5       MPIL     MPIP	MPICt	MPICt   \"MPI communication\"\n\
 5       F       P	Mm	Mm      \"Intra-node data Fetch\"\n\
+5       TF      P	Mm	Mm      \"Intra-node data TaskPreFetch\"\n\
 5       PF      P	Mm	Mm      \"Intra-node data PreFetch\"\n\
 5       IF      P	Mm	Mm      \"Intra-node data IdleFetch\"\n\
 5       WSL     P	W	W       \"Work steal\"\n");