Przeglądaj źródła

Fix lock ordering: when one has to allocate destination room for reclaiming, we have to unlock the data handle lock and acquire mc_lock, so we need to release the mc_lock before calling try_to_free_mem_chunk or try_to_reuse_mem_chunk. Also add a stressing disk reclaiming testcase.

Samuel Thibault 10 lat temu
rodzic
commit
5af6d13d14
4 zmienionych plików z 324 dodań i 43 usunięć
  1. 131 42
      src/datawizard/memalloc.c
  2. 10 1
      src/datawizard/memalloc.h
  3. 1 0
      tests/Makefile.am
  4. 182 0
      tests/disk/mem_reclaim.c

+ 131 - 42
src/datawizard/memalloc.c

@@ -23,7 +23,7 @@
 #include <common/uthash.h>
 
 /* This per-node RW-locks protect mc_list and memchunk_cache entries */
-/* Note: handle header lock is always taken before this */
+/* Note: handle header lock is always taken before this (normal add/remove case) */
 static struct _starpu_spinlock mc_lock[STARPU_MAXNODES];
 
 /* Potentially in use memory chunks */
@@ -307,6 +307,7 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 
 
 
+/* mc_lock is held */
 static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 {
 	size_t size;
@@ -325,6 +326,11 @@ static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 
 	/* remove the mem_chunk from the list */
 	_starpu_mem_chunk_list_erase(mc_list[node], mc);
+	if (mc->remove_notify)
+	{
+		*(mc->remove_notify) = NULL;
+		mc->remove_notify = NULL;
+	}
 
 	_starpu_mem_chunk_delete(mc);
 
@@ -333,6 +339,7 @@ static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 
 /* This function is called for memory chunks that are possibly in used (ie. not
  * in the cache). They should therefore still be associated to a handle. */
+/* mc_lock is held and may be temporarily released! */
 static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 {
 	size_t freed = 0;
@@ -395,21 +402,37 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 			target = choose_target(handle, node);
 
 			if (target != -1) {
+				/* Should have been avoided in our caller */
+				STARPU_ASSERT(!mc->remove_notify);
+				mc->remove_notify = &mc;
+				_starpu_spin_unlock(&mc_lock[node]);
 #ifdef STARPU_MEMORY_STATS
 				if (handle->per_node[node].state == STARPU_OWNER)
 					_starpu_memory_handle_stats_invalidated(handle, node);
 #endif
 				_STARPU_TRACE_START_WRITEBACK(node);
+				/* Note: this may need to allocate data etc.
+				 * and thus release the header lock, take
+				 * mc_lock, etc. */
 				transfer_subtree_to_node(handle, node, target);
 				_STARPU_TRACE_END_WRITEBACK(node);
 #ifdef STARPU_MEMORY_STATS
 				_starpu_memory_handle_stats_loaded_owner(handle, target);
 #endif
+				_starpu_spin_lock(&mc_lock[node]);
 
-				STARPU_ASSERT(handle->per_node[node].refcnt == 0);
-
-				/* now the actual buffer may be freed */
-				freed = do_free_mem_chunk(mc, node);
+				if (mc)
+				{
+					STARPU_ASSERT(mc->remove_notify == &mc);
+					mc->remove_notify = NULL;
+					/* mc is still associated with the old
+					 * handle, now free it.
+					 */
+					STARPU_ASSERT(handle->per_node[node].refcnt == 0);
+
+					/* now the actual buffer may be freed */
+					freed = do_free_mem_chunk(mc, node);
+				}
 			}
 		}
 
@@ -463,11 +486,17 @@ static void reuse_mem_chunk(unsigned node, struct _starpu_data_replicate *new_re
 	if (is_already_in_mc_list)
 	{
 		_starpu_mem_chunk_list_erase(mc_list[node], mc);
+		if (mc->remove_notify)
+		{
+			*(mc->remove_notify) = NULL;
+			mc->remove_notify = NULL;
+		}
 	}
 
 	free(mc);
 }
 
+/* mc_lock is held and may be temporarily released! */
 static unsigned try_to_reuse_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node, struct _starpu_data_replicate *replicate, unsigned is_already_in_mc_list)
 {
 	unsigned success = 0;
@@ -484,16 +513,27 @@ static unsigned try_to_reuse_mem_chunk(struct _starpu_mem_chunk *mc, unsigned no
 	{
 		if (may_free_subtree(old_data, node))
 		{
-			success = 1;
-
+			/* Should have been avoided in our caller */
+			STARPU_ASSERT(!mc->remove_notify);
+			mc->remove_notify = &mc;
 			/* in case there was nobody using that buffer, throw it
 			 * away after writing it back to main memory */
+			_starpu_spin_unlock(&mc_lock[node]);
 			_STARPU_TRACE_START_WRITEBACK(node);
 			transfer_subtree_to_node(old_data, node, STARPU_MAIN_RAM);
 			_STARPU_TRACE_END_WRITEBACK(node);
+			_starpu_spin_lock(&mc_lock[node]);
 
-			/* now replace the previous data */
-			reuse_mem_chunk(node, replicate, mc, is_already_in_mc_list);
+			if (mc)
+			{
+				STARPU_ASSERT(mc->remove_notify == &mc);
+				mc->remove_notify = NULL;
+				/* mc is still associated with the old
+				 * handle, now replace the previous data
+				 */
+				reuse_mem_chunk(node, replicate, mc, is_already_in_mc_list);
+				success = 1;
+			}
 		}
 
 		/* unlock the tree */
@@ -550,54 +590,83 @@ static struct _starpu_mem_chunk *_starpu_memchunk_cache_lookup_locked(unsigned n
 }
 
 /* this function looks for a memory chunk that matches a given footprint in the
- * list of mem chunk that need to be freed. This function must be called with
- * mc_lock[node] taken. */
+ * list of mem chunk that need to be freed. */
 static unsigned try_to_find_reusable_mem_chunk(unsigned node, starpu_data_handle_t data, struct _starpu_data_replicate *replicate, uint32_t footprint)
 {
-	struct _starpu_mem_chunk *mc, *next_mc;
+	struct _starpu_mem_chunk *mc, *orig_next_mc, *next_mc;
+	int success = 0;
 
+	_starpu_spin_lock(&mc_lock[node]);
 	/* go through all buffers in the cache */
 	mc = _starpu_memchunk_cache_lookup_locked(node, data, footprint);
 	if (mc)
 	{
 		/* We found an entry in the cache so we can reuse it */
 		reuse_mem_chunk(node, replicate, mc, 0);
+		_starpu_spin_unlock(&mc_lock[node]);
 		return 1;
 	}
 
 	if (!_starpu_has_not_important_data)
+	{
+		_starpu_spin_unlock(&mc_lock[node]);
 		return 0;
+	}
 
+restart:
 	/* now look for some non essential data in the active list */
 	for (mc = _starpu_mem_chunk_list_begin(mc_list[node]);
-	     mc != _starpu_mem_chunk_list_end(mc_list[node]);
+	     mc != _starpu_mem_chunk_list_end(mc_list[node]) && !success;
 	     mc = next_mc)
 	{
 		/* there is a risk that the memory chunk is freed before next
 		 * iteration starts: so we compute the next element of the list
 		 * now */
-		next_mc = _starpu_mem_chunk_list_next(mc);
+		orig_next_mc = next_mc = _starpu_mem_chunk_list_next(mc);
+		if (mc->remove_notify)
+			/* Somebody already working here, skip */
+			continue;
+		if (next_mc)
+		{
+			if (next_mc->remove_notify)
+				/* Somebody already working here, skip */
+				continue;
+			next_mc->remove_notify = &next_mc;
+		}
 
 		if (mc->data->is_not_important && (mc->footprint == footprint))
 		{
 //			fprintf(stderr, "found a candidate ...\n");
-			if (try_to_reuse_mem_chunk(mc, node, replicate, 1))
-				return 1;
+			/* Note: this may unlock mc_list! */
+			success = try_to_reuse_mem_chunk(mc, node, replicate, 1);
+		}
+
+		if (orig_next_mc)
+		{
+			if (!next_mc)
+				/* Oops, somebody dropped the next item while we were
+				 * not keeping the mc_lock. Restart from the beginning
+				 * of the list */
+				goto restart;
+			else
+			{
+				STARPU_ASSERT(next_mc->remove_notify == &next_mc);
+				next_mc->remove_notify = NULL;
+			}
 		}
 	}
+	_starpu_spin_unlock(&mc_lock[node]);
 
-	return 0;
+	return success;
 }
 #endif
 
 /*
- * Free the memory chuncks that are explicitely tagged to be freed. The
- * mc_lock[node] rw-lock should be taken prior to calling this function.
+ * Free the memory chuncks that are explicitely tagged to be freed.
  */
 static size_t flush_memchunk_cache(unsigned node, size_t reclaim)
 {
 	struct _starpu_mem_chunk *mc;
-	struct _starpu_mem_chunk_list *busy_mc_cache;
 	struct mc_cache_entry *entry, *tmp;
 
 	size_t freed = 0;
@@ -605,26 +674,16 @@ static size_t flush_memchunk_cache(unsigned node, size_t reclaim)
 	_starpu_spin_lock(&mc_lock[node]);
 	HASH_ITER(hh, mc_cache[node], entry, tmp)
 	{
-		busy_mc_cache = _starpu_mem_chunk_list_new();
-
 		while (!_starpu_mem_chunk_list_empty(entry->list)) {
 			mc = _starpu_mem_chunk_list_pop_front(entry->list);
-			starpu_data_handle_t handle = mc->data;
-
-			if (handle)
-				if (_starpu_spin_trylock(&handle->header_lock)) {
-					/* The handle is still busy, leave this chunk for later */
-					_starpu_mem_chunk_list_push_back(busy_mc_cache, mc);
-					continue;
-				}
+			STARPU_ASSERT(!mc->data);
+			STARPU_ASSERT(!mc->replicate);
 
 			mc_cache_nb[node]--;
 			STARPU_ASSERT(mc_cache_nb[node] >= 0);
 			mc_cache_size[node] -= mc->size;
 			STARPU_ASSERT(mc_cache_size[node] >= 0);
 			freed += free_memory_on_node(mc, node);
-			if (handle)
-				_starpu_spin_unlock(&handle->header_lock);
 
 			free(mc->chunk_interface);
 			_starpu_mem_chunk_delete(mc);
@@ -632,8 +691,6 @@ static size_t flush_memchunk_cache(unsigned node, size_t reclaim)
 			if (reclaim && freed >= reclaim)
 				break;
 		}
-		_starpu_mem_chunk_list_push_list_front(busy_mc_cache, entry->list);
-		_starpu_mem_chunk_list_delete(busy_mc_cache);
 
 		if (reclaim && freed >= reclaim)
 			break;
@@ -665,8 +722,9 @@ static size_t free_potentially_in_use_mc(unsigned node, unsigned force, size_t r
 restart:
 	_starpu_spin_lock(&mc_lock[node]);
 
+restart2:
 	for (mc = _starpu_mem_chunk_list_begin(mc_list[node]);
-	     mc != _starpu_mem_chunk_list_end(mc_list[node]);
+	     mc != _starpu_mem_chunk_list_end(mc_list[node]) && (!reclaim || freed < reclaim);
 	     mc = next_mc)
 	{
 		/* mc hopefully gets out of the list, we thus need to prefetch
@@ -675,13 +733,37 @@ restart:
 
 		if (!force)
 		{
+			struct _starpu_mem_chunk *orig_next_mc = next_mc;
+			if (mc->remove_notify)
+				/* Somebody already working here, skip */
+				continue;
+			if (next_mc)
+			{
+				if (next_mc->remove_notify)
+					/* Somebody already working here, skip */
+					continue;
+				next_mc->remove_notify = &next_mc;
+			}
+			/* Note: this may unlock mc_list! */
 			freed += try_to_free_mem_chunk(mc, node);
 
-			if (reclaim && freed >= reclaim)
-				break;
+			if (orig_next_mc)
+			{
+				if (!next_mc)
+					/* Oops, somebody dropped the next item while we were
+					 * not keeping the mc_lock. Restart from the beginning
+					 * of the list */
+					goto restart2;
+				else
+				{
+					STARPU_ASSERT(next_mc->remove_notify == &next_mc);
+					next_mc->remove_notify = NULL;
+				}
+			}
 		}
 		else
 		{
+			/* Shutting down, really free */
 			starpu_data_handle_t handle = mc->data;
 
 			if (_starpu_spin_trylock(&handle->header_lock))
@@ -811,6 +893,7 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	mc->replicate->mc = mc;
 	mc->chunk_interface = NULL;
 	mc->size_interface = interface_size;
+	mc->remove_notify = NULL;
 
 	return mc;
 }
@@ -843,6 +926,7 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 	struct _starpu_mem_chunk *mc = replicate->mc;
 
 	STARPU_ASSERT(mc->data == handle);
+	_starpu_spin_checklocked(&handle->header_lock);
 
 	/* Record the allocated size, so that later in memory
 	 * reclaiming we can estimate how much memory we free
@@ -866,6 +950,11 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 	mc->data = NULL;
 	/* remove it from the main list */
 	_starpu_mem_chunk_list_erase(mc_list[node], mc);
+	if (mc->remove_notify)
+	{
+		*(mc->remove_notify) = NULL;
+		mc->remove_notify = NULL;
+	}
 
 	_starpu_spin_unlock(&mc_lock[node]);
 
@@ -934,16 +1023,11 @@ static starpu_ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, st
 	uint32_t footprint = _starpu_compute_data_footprint(handle);
 
 	_STARPU_TRACE_START_ALLOC_REUSE(dst_node, data_size);
-	_starpu_spin_lock(&mc_lock[dst_node]);
-
 	if (try_to_find_reusable_mem_chunk(dst_node, handle, replicate, footprint))
 	{
-		_starpu_spin_unlock(&mc_lock[dst_node]);
 		_starpu_allocation_cache_hit(dst_node);
 		return data_size;
 	}
-
-	_starpu_spin_unlock(&mc_lock[dst_node]);
 	_STARPU_TRACE_END_ALLOC_REUSE(dst_node);
 #endif
 	STARPU_ASSERT(handle->ops);
@@ -1083,6 +1167,11 @@ void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node)
 		return;
 	_starpu_spin_lock(&mc_lock[node]);
 	_starpu_mem_chunk_list_erase(mc_list[node], mc);
+	if (mc->remove_notify)
+	{
+		*(mc->remove_notify) = NULL;
+		mc->remove_notify = NULL;
+	}
 	_starpu_mem_chunk_list_push_back(mc_list[node], mc);
 	_starpu_spin_unlock(&mc_lock[node]);
 }

+ 10 - 1
src/datawizard/memalloc.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2010, 2012-2014  Université de Bordeaux
+ * Copyright (C) 2009-2010, 2012-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -28,7 +28,10 @@
 
 struct _starpu_data_replicate;
 
+/* While associated with a handle, the content is protected by the handle lock, except a few fields
+ */
 LIST_TYPE(_starpu_mem_chunk,
+	/* protected by the mc_lock */
 	starpu_data_handle_t data;
 
 	uint32_t footprint;
@@ -58,6 +61,12 @@ LIST_TYPE(_starpu_mem_chunk,
 	 * filters. */
 	unsigned relaxed_coherency;
 	struct _starpu_data_replicate *replicate;
+
+	/* This is set when one keeps a pointer to this mc obtained from the
+	 * mc_list without mc_lock held. We need to clear the pointer if we
+	 * remove this entry from the mc_list, so we know we have to restart
+	 * from zero. This is protected by the corresponding mc_lock.  */
+	struct _starpu_mem_chunk **remove_notify;
 )
 
 void _starpu_init_mem_chunk_lists(void);

+ 1 - 0
tests/Makefile.am

@@ -214,6 +214,7 @@ noinst_PROGRAMS =				\
 	disk/disk_copy				\
 	disk/disk_compute			\
 	disk/disk_pack				\
+	disk/mem_reclaim			\
 	errorcheck/starpu_init_noworker		\
 	errorcheck/invalid_blocking_calls	\
 	errorcheck/invalid_tasks		\

+ 182 - 0
tests/disk/mem_reclaim.c

@@ -0,0 +1,182 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013 Corentin Salingue
+ * Copyright (C) 2015 Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* Try to write into disk memory
+ * Use mechanism to push datas from main ram to disk ram
+ */
+
+#include <fcntl.h>
+#include <starpu.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <math.h>
+#include <common/config.h>
+#include "../helper.h"
+
+#ifdef STARPU_HAVE_WINDOWS
+#  include <io.h>
+#  if defined(_WIN32) && !defined(__CYGWIN__)
+#    define mkdir(path, mode) mkdir(path)
+#  endif
+#endif
+
+#ifdef STARPU_QUICK_CHECK
+#  define NDATA 4
+#  define NITER 16
+#else
+#  define NDATA 128
+#  define NITER 1024
+#endif
+#  define MEMSIZE 1
+#  define MEMSIZE_STR "1"
+
+const struct starpu_data_copy_methods my_vector_copy_data_methods_s;
+struct starpu_data_interface_ops starpu_interface_my_vector_ops;
+
+void starpu_my_vector_data_register(starpu_data_handle_t *handleptr, unsigned home_node,
+                        uintptr_t ptr, uint32_t nx, size_t elemsize)
+{
+	struct starpu_vector_interface vector =
+	{
+		.id = STARPU_VECTOR_INTERFACE_ID,
+		.ptr = ptr,
+		.nx = nx,
+		.elemsize = elemsize,
+                .dev_handle = ptr,
+		.slice_base = 0,
+                .offset = 0
+	};
+
+	starpu_data_register(handleptr, home_node, &vector, &starpu_interface_my_vector_ops);
+}
+
+static void zero(void *buffers[], void *args)
+{
+	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
+	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
+	*val = 0;
+}
+
+static void inc(void *buffers[], void *args)
+{
+	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
+	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
+	*val++;
+}
+
+static struct starpu_codelet zero_cl =
+{
+	.cpu_funcs = { zero },
+	.nbuffers = 1,
+	.modes = { STARPU_W },
+};
+
+static struct starpu_codelet inc_cl =
+{
+	.cpu_funcs = { inc },
+	.nbuffers = 1,
+	.modes = { STARPU_RW },
+};
+
+int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)(starpu_data_handle_t *handleptr, unsigned home_node, uintptr_t ptr, uint32_t nx, size_t elemsize))
+{
+	int *A, *C;
+	starpu_data_handle_t handles[NDATA];
+
+	/* Initialize StarPU without GPU devices to make sure the memory of the GPU devices will not be used */
+	struct starpu_conf conf;
+	int ret = starpu_conf_init(&conf);
+	if (ret == -EINVAL)
+		return EXIT_FAILURE;
+	conf.ncuda = 0;
+	conf.nopencl = 0;
+	ret = starpu_init(&conf);
+	if (ret == -ENODEV) goto enodev;
+
+	/* Initialize path and name */
+	/* register swap disk */
+	int new_dd = starpu_disk_register(ops, (void *) base, 1024*1024*16);
+	/* can't write on /tmp/ */
+	if (new_dd == -ENOENT) goto enoent;
+
+	unsigned dd = (unsigned) new_dd;
+
+	unsigned int i;
+
+	/* Initialize twice as much data as available memory */
+	for (i = 0; i < NDATA; i++)
+	{
+		vector_data_register(&handles[i], -1, 0, (MEMSIZE*1024*1024*2) / NDATA, sizeof(char));
+		starpu_task_insert(&zero_cl, STARPU_W, handles[i], 0);
+	}
+
+	for (i = 0; i < NITER; i++)
+		starpu_task_insert(&inc_cl, STARPU_RW, handles[random()%NDATA], 0);
+
+	/* Free data */
+	for (i = 0; i < NDATA; i++)
+		starpu_data_unregister(handles[i]);
+
+	/* terminate StarPU, no task can be submitted after */
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enoent:
+	FPRINTF(stderr, "Couldn't write data: ENOENT\n");
+	starpu_shutdown();
+enodev:
+	return STARPU_TEST_SKIPPED;
+}
+
+static int merge_result(int old, int new)
+{
+	if (new == EXIT_FAILURE)
+		return EXIT_FAILURE;
+	if (old == 0)
+		return 0;
+	return new;
+}
+
+int main(void)
+{
+	int ret = 0;
+	char s[128];
+	snprintf(s, sizeof(s), "/tmp/%s-disk-%d", getenv("USER"), getpid());
+	mkdir(s, 0777);
+
+	setenv("STARPU_LIMIT_CPU_MEM", MEMSIZE_STR, 1);
+
+	/* Build an vector-like interface which doesn't have the any_to_any helper, to force making use of pack/unpack */
+	memcpy(&starpu_interface_my_vector_ops, &starpu_interface_vector_ops, sizeof(starpu_interface_my_vector_ops));
+	starpu_interface_my_vector_ops.copy_methods = &my_vector_copy_data_methods_s;
+
+	ret = merge_result(ret, dotest(&starpu_disk_stdio_ops, s, starpu_vector_data_register));
+	ret = merge_result(ret, dotest(&starpu_disk_stdio_ops, s, starpu_my_vector_data_register));
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_ops, s, starpu_vector_data_register));
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_ops, s, starpu_my_vector_data_register));
+#ifdef STARPU_LINUX_SYS
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_o_direct_ops, s, starpu_vector_data_register));
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_o_direct_ops, s, starpu_my_vector_data_register));
+#endif
+	rmdir(s);
+	return ret;
+}