
Add anticipative writeback, to flush dirty data asynchronously before the
GPU device is full.

Samuel Thibault 10 years ago
Parent
Commit
5a0fcbf328

+ 6 - 0
ChangeLog

@@ -18,6 +18,9 @@
 StarPU 1.3.0 (svn revision xxxx)
 ==============================================
 
+New features:
+  * Enable anticipative writeback by default.
+
 StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 
@@ -88,6 +91,9 @@ New features:
     only when the bus is idle.
   * Make starpu_data_prefetch_on_node not forcibly flush data out, introduce
     starpu_data_fetch_on_node for that.
+  * Anticipative writeback, to flush dirty data asynchronously before the
+    GPU device is full. Disabled by default. Use STARPU_MINIMUM_CLEAN_BUFFERS
+    and STARPU_TARGET_CLEAN_BUFFERS to enable it.
 
 Small features:
   * Tasks can now have a name (via the field const char *name of

+ 18 - 0
doc/doxygen/chapters/40environment_variables.doxy

@@ -621,6 +621,24 @@ GPUs (or in main memory, when using out of core), when performing a periodic
 reclaiming pass. The default is 10%.
 </dd>
 
+<dt>STARPU_MINIMUM_CLEAN_BUFFERS</dt>
+<dd>
+\anchor STARPU_MINIMUM_CLEAN_BUFFERS
+\addindex __env__STARPU_MINIMUM_CLEAN_BUFFERS
+This specifies the minimum percentage of buffers that should be clean in GPUs
+(or in main memory, when using out of core); below this threshold, asynchronous
+writebacks will be issued. The default is 10%.
+</dd>
+
+<dt>STARPU_TARGET_CLEAN_BUFFERS</dt>
+<dd>
+\anchor STARPU_TARGET_CLEAN_BUFFERS
+\addindex __env__STARPU_TARGET_CLEAN_BUFFERS
+This specifies the target percentage of clean buffers that should be reached in
+GPUs (or in main memory, when using out of core) when performing an asynchronous
+writeback pass. The default is 25%.
+</dd>
+
 <dt>STARPU_DISK_SWAP</dt>
 <dd>
 \anchor STARPU_DISK_SWAP

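As a side note, here is a minimal sketch of how an application might set these two thresholds before initializing StarPU. The 30/50 values below are purely illustrative; setenv() is plain POSIX, and starpu_init()/starpu_shutdown() are the usual StarPU entry points.

#include <stdlib.h>
#include <starpu.h>

int main(void)
{
	/* Ask for at least 30% clean buffers, and let the asynchronous
	 * writeback pass aim for 50% (illustrative values only). */
	setenv("STARPU_MINIMUM_CLEAN_BUFFERS", "30", 1);
	setenv("STARPU_TARGET_CLEAN_BUFFERS", "50", 1);

	if (starpu_init(NULL) != 0)
		return 1;

	/* ... register data and submit tasks as usual ... */

	starpu_shutdown();
	return 0;
}

In practice these variables would typically be set in the shell environment before launching the application rather than from the program itself.
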
+ 11 - 0
src/common/fxt.h

@@ -156,6 +156,9 @@
 #define _STARPU_FUT_SCHED_COMPONENT_PUSH_PRIO 	0x515a
 #define _STARPU_FUT_SCHED_COMPONENT_POP_PRIO 	0x515b
 
+#define	_STARPU_FUT_START_WRITEBACK_ASYNC	0x515c
+#define	_STARPU_FUT_END_WRITEBACK_ASYNC	0x515d
+
 #define	_STARPU_FUT_HYPERVISOR_BEGIN    0x5160
 #define	_STARPU_FUT_HYPERVISOR_END	0x5161
 
@@ -606,6 +609,12 @@ do {										\
 #define _STARPU_TRACE_END_MEMRECLAIM(memnode, is_prefetch)		\
 	FUT_DO_PROBE3(_STARPU_FUT_END_MEMRECLAIM, memnode, is_prefetch, _starpu_gettid());
 	
+#define _STARPU_TRACE_START_WRITEBACK_ASYNC(memnode)		\
+	FUT_DO_PROBE2(_STARPU_FUT_START_WRITEBACK_ASYNC, memnode, _starpu_gettid());
+	
+#define _STARPU_TRACE_END_WRITEBACK_ASYNC(memnode)		\
+	FUT_DO_PROBE2(_STARPU_FUT_END_WRITEBACK_ASYNC, memnode, _starpu_gettid());
+	
 /* We skip these events because they are called so often that they cause FxT to
  * fail and make the overall trace unreadable anyway. */
 #define _STARPU_TRACE_START_PROGRESS(memnode)		\
@@ -865,6 +874,8 @@ do {										\
 #define _STARPU_TRACE_END_WRITEBACK(memnode)		do {} while(0)
 #define _STARPU_TRACE_START_MEMRECLAIM(memnode,is_prefetch)	do {} while(0)
 #define _STARPU_TRACE_END_MEMRECLAIM(memnode,is_prefetch)	do {} while(0)
+#define _STARPU_TRACE_START_WRITEBACK_ASYNC(memnode)	do {} while(0)
+#define _STARPU_TRACE_END_WRITEBACK_ASYNC(memnode)	do {} while(0)
 #define _STARPU_TRACE_START_PROGRESS(memnode)	do {} while(0)
 #define _STARPU_TRACE_END_PROGRESS(memnode)	do {} while(0)
 #define _STARPU_TRACE_USER_EVENT(code)		do {} while(0)

+ 3 - 0
src/datawizard/coherency.c

@@ -186,6 +186,9 @@ void _starpu_update_data_state(starpu_data_handle_t handle,
 			handle->per_node[node].state = STARPU_INVALID;
 
 		requesting_replicate->state = STARPU_OWNER;
+		if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
+			/* Notify that this MC is now dirty */
+			_starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
 	}
 	else
 	{ /* read only */

+ 188 - 9
src/datawizard/memalloc.c

@@ -26,14 +26,40 @@
 /* Note: handle header lock is always taken before this (normal add/remove case) */
 static struct _starpu_spinlock mc_lock[STARPU_MAXNODES];
 
-/* Potentially in use memory chunks */
+/* Potentially in use memory chunks. The beginning of the list is clean (home
+ * node has a copy of the data, or the data is being transferred there), the
+ * remainder of the list may not be clean. */
 static struct _starpu_mem_chunk_list *mc_list[STARPU_MAXNODES];
+/* This is a shortcut inside the mc_list to the first potentially dirty MC. All
+ * MCs before this one are clean, while this one and the following only *may* be clean. */
+static struct _starpu_mem_chunk *mc_dirty_head[STARPU_MAXNODES];
+/* Number of elements in mc_list, and number of elements in the clean part of
+ * mc_list plus the non-automatically-allocated elements (which are thus always
+ * considered clean) */
+static unsigned mc_nb[STARPU_MAXNODES], mc_clean_nb[STARPU_MAXNODES];
+
+/* TODO: no home doesn't mean always clean, should push to larger memory nodes */
+/* TODO: REDUX always dirty */
 
 #define MC_LIST_PUSH_BACK(node, mc) do {				 \
-	_starpu_mem_chunk_list_push_back(mc_list[node], mc);	 \
+	_starpu_mem_chunk_list_push_back(mc_list[node], mc);		 \
+	if ((mc)->clean || (mc)->home)					 \
+		/* This is clean */				 \
+		mc_clean_nb[node]++;					 \
+	else if (!mc_dirty_head[node])					 \
+		/* This is the only dirty element for now */		 \
+		mc_dirty_head[node] = mc;				 \
+	mc_nb[node]++;							 \
 } while(0)
 
 #define MC_LIST_ERASE(node, mc) do {					 \
+	if ((mc)->clean || (mc)->home)					 \
+		mc_clean_nb[node]--; /* One clean element less */	 \
+	if ((mc) == mc_dirty_head[node])				 \
+		/* This was the dirty head */				 \
+		mc_dirty_head[node] = _starpu_mem_chunk_list_next((mc)); \
+	/* One element less */						 \
+	mc_nb[node]--;							 \
 	/* Remove element */						 \
 	_starpu_mem_chunk_list_erase(mc_list[node], (mc));		 \
 	/* Notify whoever asked for it */				 \
@@ -72,6 +98,8 @@ void _starpu_init_mem_chunk_lists(void)
 		_starpu_spin_init(&mc_lock[i]);
 		mc_list[i] = _starpu_mem_chunk_list_new();
 		STARPU_HG_DISABLE_CHECKING(mc_cache_size[i]);
+		STARPU_HG_DISABLE_CHECKING(mc_nb[i]);
+		STARPU_HG_DISABLE_CHECKING(mc_clean_nb[i]);
 	}
 }
 
@@ -81,6 +109,9 @@ void _starpu_deinit_mem_chunk_lists(void)
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
 		struct mc_cache_entry *entry, *tmp;
+		STARPU_ASSERT(mc_nb[i] == 0);
+		STARPU_ASSERT(mc_clean_nb[i] == 0);
+		STARPU_ASSERT(mc_dirty_head[i] == NULL);
 		_starpu_mem_chunk_list_delete(mc_list[i]);
 		HASH_ITER(hh, mc_cache[i], entry, tmp)
 		{
@@ -838,6 +869,119 @@ void starpu_memchunk_tidy(unsigned node)
 	size_t target, amount;
 	unsigned minimum_p = starpu_get_env_number_default("STARPU_MINIMUM_AVAILABLE_MEM", 5);
 	unsigned target_p = starpu_get_env_number_default("STARPU_TARGET_AVAILABLE_MEM", 10);
+	unsigned minimum_clean_p = starpu_get_env_number_default("STARPU_MINIMUM_CLEAN_BUFFERS", 10);
+	unsigned target_clean_p = starpu_get_env_number_default("STARPU_TARGET_CLEAN_BUFFERS", 25);
+
+	if (mc_clean_nb[node] < (mc_nb[node] * minimum_clean_p) / 100)
+	{
+		struct _starpu_mem_chunk *mc, *orig_next_mc, *next_mc;
+		int skipped = 0;	/* Whether we skipped a dirty MC, and we should thus stop updating mc_dirty_head. */
+
+		_STARPU_DEBUG("%d not clean: %d %d\n", node, mc_clean_nb[node], mc_nb[node]);
+
+		_STARPU_TRACE_START_WRITEBACK_ASYNC(node);
+		_starpu_spin_lock(&mc_lock[node]);
+
+		for (mc = mc_dirty_head[node];
+			mc && mc_clean_nb[node] < (mc_nb[node] * target_clean_p) / 100;
+			mc = next_mc, mc && skipped ? 0 : (mc_dirty_head[node] = mc))
+		{
+			starpu_data_handle_t handle;
+
+			/* mc may be removed from the list while we work on it,
+			 * so fetch the next element now */
+			next_mc = _starpu_mem_chunk_list_next(mc);
+
+			if (mc->home)
+				/* Home node, it's always clean */
+				continue;
+			if (mc->clean)
+				/* already clean */
+				continue;
+			if (next_mc && next_mc->remove_notify)
+			{
+				/* Somebody already working here, skip */
+				skipped = 1;
+				continue;
+			}
+
+			handle = mc->data;
+			STARPU_ASSERT(handle);
+			STARPU_ASSERT(handle->home_node != -1);
+
+			if (_starpu_spin_trylock(&handle->header_lock))
+			{
+				/* the handle is busy, abort */
+				skipped = 1;
+				continue;
+			}
+
+			if (
+				/* This data should be written through to this node, avoid
+				 * dropping it! */
+				handle->wt_mask & (1<<node)
+				/* This is partitioned, don't care about the
+				 * whole data, we'll work on the subdatas.  */
+			     || handle->nchildren
+			        /* Somebody is still writing to it */
+			     || (_starpu_get_data_refcnt(handle, node) && handle->current_mode == STARPU_W)
+				/* REDUX, can't do anything with it, skip it */
+			     || mc->relaxed_coherency == 2
+			) {
+				_starpu_spin_unlock(&handle->header_lock);
+				continue;
+			}
+
+			/* This should have been marked as clean already */
+			STARPU_ASSERT(mc->relaxed_coherency != 1);
+			if (handle->per_node[handle->home_node].state != STARPU_INVALID) {
+				/* it's actually clean */
+				mc->clean = 1;
+				mc_clean_nb[node]++;
+			} else {
+				/* MC is dirty, submit writeback */
+
+				/* MC will be clean, consider it as such */
+				mc->clean = 1;
+				mc_clean_nb[node]++;
+
+				orig_next_mc = next_mc;
+				if (next_mc)
+				{
+					STARPU_ASSERT(!next_mc->remove_notify);
+					next_mc->remove_notify = &next_mc;
+				}
+
+				_starpu_spin_unlock(&mc_lock[node]);
+				if (!_starpu_create_request_to_fetch_data(handle, &handle->per_node[handle->home_node], STARPU_R, 2, 1, NULL, NULL))
+				{
+					/* No request was actually needed??
+					 * Odd, but cope with it.  */
+					handle = NULL;
+				}
+				_starpu_spin_lock(&mc_lock[node]);
+
+				if (orig_next_mc)
+				{
+					if (!next_mc)
+						/* Oops, somebody dropped the next item while we were
+						 * not keeping the mc_lock. Give up for now, and we'll
+						 * see the rest later */
+						;
+					else
+					{
+						STARPU_ASSERT(next_mc->remove_notify == &next_mc);
+						next_mc->remove_notify = NULL;
+					}
+				}
+			}
+
+			if (handle)
+				_starpu_spin_unlock(&handle->header_lock);
+		}
+		_starpu_spin_unlock(&mc_lock[node]);
+		_STARPU_TRACE_END_WRITEBACK_ASYNC(node);
+	}
 
 	if (total <= 0)
 		return;
@@ -871,9 +1015,7 @@ void starpu_memchunk_tidy(unsigned node)
 		}
 	}
 
-	/* TODO: only request writebacks to get buffers clean, without waiting
-	 * for it */
-
+	_STARPU_TRACE_START_MEMRECLAIM(node,2);
 	_STARPU_TRACE_START_MEMRECLAIM(node,2);
 	free_potentially_in_use_mc(node, 0, amount);
 	_STARPU_TRACE_END_MEMRECLAIM(node,2);
@@ -881,7 +1023,7 @@ out:
 	(void) STARPU_ATOMIC_ADD(&tidying[node], -1);
 }
 
-static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_replicate *replicate, size_t interface_size, unsigned automatically_allocated)
+static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_replicate *replicate, size_t interface_size, unsigned home, unsigned automatically_allocated)
 {
 	struct _starpu_mem_chunk *mc = _starpu_mem_chunk_new();
 	starpu_data_handle_t handle = replicate->handle;
@@ -894,6 +1036,14 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	mc->ops = handle->ops;
 	mc->automatically_allocated = automatically_allocated;
 	mc->relaxed_coherency = replicate->relaxed_coherency;
+	mc->home = home;
+	mc->clean = 0;
+	if (replicate->relaxed_coherency == 1)
+		/* SCRATCH is always easy to drop, thus clean */
+		mc->clean = 1;
+	else if (replicate->relaxed_coherency == 0 && handle->home_node != -1 && handle->per_node[handle->home_node].state != STARPU_INVALID)
+		/* This is a normal data and the home node has the value */
+		mc->clean = 1;
 	mc->replicate = replicate;
 	mc->replicate->mc = mc;
 	mc->chunk_interface = NULL;
@@ -903,7 +1053,7 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	return mc;
 }
 
-static void register_mem_chunk(struct _starpu_data_replicate *replicate, unsigned automatically_allocated)
+static void register_mem_chunk(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, unsigned automatically_allocated)
 {
 	unsigned dst_node = replicate->memory_node;
 
@@ -913,7 +1063,7 @@ static void register_mem_chunk(struct _starpu_data_replicate *replicate, unsigne
 	size_t interface_size = replicate->handle->ops->interface_size;
 
 	/* Put this memchunk in the list of memchunk in use */
-	mc = _starpu_memchunk_init(replicate, interface_size, automatically_allocated);
+	mc = _starpu_memchunk_init(replicate, interface_size, handle->home_node == -1 || dst_node == handle->home_node, automatically_allocated);
 
 	_starpu_spin_lock(&mc_lock[dst_node]);
 	MC_LIST_PUSH_BACK(dst_node, mc);
@@ -1116,6 +1266,7 @@ int _starpu_allocate_memory_on_node(starpu_data_handle_t handle, struct _starpu_
 	unsigned dst_node = replicate->memory_node;
 
 	STARPU_ASSERT(handle);
+	_starpu_spin_checklocked(&handle->header_lock);
 
 	/* A buffer is already allocated on the node */
 	if (replicate->allocated)
@@ -1132,7 +1283,7 @@ int _starpu_allocate_memory_on_node(starpu_data_handle_t handle, struct _starpu_
 		/* Somebody allocated it in between already */
 		return 0;
 
-	register_mem_chunk(replicate, 1);
+	register_mem_chunk(handle, replicate, 1);
 
 	replicate->allocated = 1;
 	replicate->automatically_allocated = 1;
@@ -1169,6 +1320,34 @@ void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node)
 	_starpu_spin_unlock(&mc_lock[node]);
 }
 
+/* This memchunk is being written to, and thus becomes dirty */
+void _starpu_memchunk_dirty(struct _starpu_mem_chunk *mc, unsigned node)
+{
+	if (!mc)
+		/* user-allocated memory */
+		return;
+	if (mc->home)
+		/* Home is always clean */
+		return;
+	_starpu_spin_lock(&mc_lock[node]);
+	if (mc->relaxed_coherency == 1)
+	{
+		/* SCRATCH, make it clean if not already */
+		if (!mc->clean)
+		{
+			mc_clean_nb[node]++;
+			mc->clean = 1;
+		}
+	} else {
+		if (mc->clean)
+		{
+			mc_clean_nb[node]--;
+			mc->clean = 0;
+		}
+	}
+	_starpu_spin_unlock(&mc_lock[node]);
+}
+
 #ifdef STARPU_MEMORY_STATS
 void _starpu_memory_display_stats_by_node(int node)
 {

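To summarize the accounting introduced above in memalloc.c: each node keeps the total number of memchunks (mc_nb) and the number of clean ones (mc_clean_nb), and mc_dirty_head points at the first potentially dirty chunk so that the writeback pass never rescans the clean prefix. Below is a small self-contained sketch of the two thresholds, with made-up names (StarPU itself uses the per-node arrays shown in the diff):

#include <stdbool.h>

/* Illustrative model of the per-node bookkeeping: nb counts all memchunks in
 * mc_list, clean_nb counts those whose home node holds (or is receiving) a
 * valid copy, plus the chunks that are always considered clean. */
struct node_accounting
{
	unsigned nb;
	unsigned clean_nb;
};

/* The asynchronous writeback pass starts when fewer than minimum_clean_p
 * percent of the chunks are clean... */
static bool needs_writeback_pass(const struct node_accounting *a, unsigned minimum_clean_p)
{
	return a->clean_nb < (a->nb * minimum_clean_p) / 100;
}

/* ...and keeps submitting writebacks, counting a chunk as clean as soon as its
 * writeback request has been issued, until target_clean_p percent are clean. */
static bool writeback_pass_done(const struct node_accounting *a, unsigned target_clean_p)
{
	return a->clean_nb >= (a->nb * target_clean_p) / 100;
}
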
+ 11 - 4
src/datawizard/memalloc.h

@@ -47,7 +47,16 @@ LIST_TYPE(_starpu_mem_chunk,
 	struct starpu_data_interface_ops *ops;
 	void *chunk_interface;
 	size_t size_interface;
-	unsigned automatically_allocated;
+
+	/* Whether StarPU automatically allocated this memory, or the application did */
+	unsigned automatically_allocated:1;
+	/* A buffer that is used for SCRATCH or reduction cannot be used with
+	 * filters. */
+	unsigned relaxed_coherency:2;
+	/* Whether this chunk is on its data's home node, or the data has no home node (in both cases it is thus always considered clean) */
+	unsigned home:1;
+	/* Whether the memchunk is in the clean part of the mc_list */
+	unsigned clean:1;
 
 	/* the size of the data is only set when calling _starpu_request_mem_chunk_removal(),
 	 * it is needed to estimate how much memory is in mc_cache, and by
@@ -57,9 +66,6 @@ LIST_TYPE(_starpu_mem_chunk,
 	 */
 	size_t size;
 
-	/* A buffer that is used for SCRATCH or reduction cannnot be used with
-	 * filters. */
-	unsigned relaxed_coherency;
 	struct _starpu_data_replicate *replicate;
 
 	/* This is set when one keeps a pointer to this mc obtained from the
@@ -75,6 +81,7 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 int _starpu_allocate_memory_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, unsigned is_prefetch);
 size_t _starpu_free_all_automatically_allocated_buffers(unsigned node);
 void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node);
+void _starpu_memchunk_dirty(struct _starpu_mem_chunk *mc, unsigned node);
 
 void _starpu_display_memory_stats_by_node(int node);
 size_t _starpu_memory_reclaim_generic(unsigned node, unsigned force, size_t reclaim);

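The new bit-fields record, at registration time, whether a chunk could be dropped without a writeback. A condensed, illustrative restatement of the rules applied in _starpu_memchunk_init (the struct and function below are not StarPU code, just a sketch of the classification):

/* relaxed_coherency: 0 = normal data, 1 = SCRATCH, 2 = reduction buffer */
struct chunk_flags
{
	unsigned relaxed_coherency:2;
	unsigned home:1;   /* chunk is on the home node, or the data has none */
	unsigned clean:1;  /* the home node holds a valid copy */
};

static void classify_new_chunk(struct chunk_flags *c, int home_node, int home_has_valid_copy)
{
	c->clean = 0;
	if (c->relaxed_coherency == 1)
		/* SCRATCH data can be dropped at any time, so it counts as clean */
		c->clean = 1;
	else if (c->relaxed_coherency == 0 && home_node != -1 && home_has_valid_copy)
		/* normal data whose home node still holds a valid copy */
		c->clean = 1;
	/* reduction buffers (relaxed_coherency == 2) are never marked clean here
	 * (cf. the REDUX TODO in memalloc.c) */
}

Chunks with the home bit set are never counted as dirty at all, which is why MC_LIST_PUSH_BACK and MC_LIST_ERASE treat (mc)->clean || (mc)->home as the clean case.
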
+ 16 - 0
src/debug/traces/starpu_fxt.c

@@ -1947,6 +1947,22 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 						handle_memnode_event(&ev, options, "No");
 				}
 				break;
+			case _STARPU_FUT_START_WRITEBACK_ASYNC:
+				if (!options->no_bus)
+				{
+					handle_memnode_event(&ev, options, "Wa");
+				}
+				break;
+			case _STARPU_FUT_END_WRITEBACK_ASYNC:
+				if (!options->no_bus)
+				{
+					unsigned memnode = ev.param[0];
+					if (reclaiming[memnode])
+						handle_memnode_event(&ev, options, "R");
+					else
+						handle_memnode_event(&ev, options, "No");
+				}
+				break;
 			case _STARPU_FUT_START_MEMRECLAIM:
 				if (!options->no_bus)
 				{

+ 4 - 2
src/debug/traces/starpu_paje.c

@@ -162,7 +162,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 	poti_DefineEntityValue("A", "MS", "Allocating", ".4 .1 .0");
 	poti_DefineEntityValue("Ar", "MS", "AllocatingReuse", ".1 .1 .8");
 	poti_DefineEntityValue("F", "MS", "Freeing", ".6 .3 .0");
-	poti_DefineEntityValue("W", "MS", "WritingBack", ".0 .0 .4");
+	poti_DefineEntityValue("W", "MS", "WritingBack", ".0 .0 .5");
+	poti_DefineEntityValue("Wa", "MS", "WritingBackAsync", ".0 .0 .4");
 	poti_DefineEntityValue("R", "MS", "Reclaiming", ".0 .1 .6");
 	poti_DefineEntityValue("Co", "MS", "DriverCopy", ".3 .5 .1");
 	poti_DefineEntityValue("CoA", "MS", "DriverCopyAsync", ".1 .3 .1");
@@ -317,7 +318,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 6       A       MS      Allocating         \".4 .1 .0\"		\n\
 6       Ar       MS      AllocatingReuse       \".1 .1 .8\"		\n\
 6       F       MS      Freeing         \".6 .3 .0\"		\n\
-6       W       MS      WritingBack         \".0 .0 .4\"		\n\
+6       W       MS      WritingBack         \".0 .0 .5\"		\n\
+6       Wa       MS     WritingBackAsync    \".0 .0 .4\"		\n\
 6       R       MS      Reclaiming         \".0 .1 .6\"		\n\
 6       Co       MS     DriverCopy         \".3 .5 .1\"		\n\
 6       CoA      MS     DriverCopyAsync         \".1 .3 .1\"		\n\