Kaynağa Gözat

Fix concurrency between reclaiming and tasks writing to data: reclaiming has to acquire the rwlock on the data to prevent tasks from writing to it. Also fix the mem_reclaim test so that it actually checks the values and does not submit too many writing tasks at the same time.

Samuel Thibault 9 yıl önce
ebeveyn
işleme
04ead5605d

+ 34 - 0
src/core/dependencies/data_concurrency.c

@@ -56,6 +56,40 @@
  * step, implemented in data_arbiter_concurrency.c
  */
 
+/* Just try to acquire in R or W mode, used for eviction. Never blocks: returns 1 if the handle was acquired, 0 otherwise. */
+int _starpu_attempt_to_acquire_data(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+{
+	if (mode == STARPU_RW)
+		mode = STARPU_W; /* a read-write request is handled as a write request */
+	STARPU_ASSERT(!(mode & ~STARPU_RW)); /* no bit outside of STARPU_RW may be set */
+	_starpu_spin_checklocked(&handle->header_lock); /* presumably asserts that the caller holds header_lock -- TODO confirm */
+
+	if (handle->arbiter)
+		/* TODO: Not implemented for now: handles with an arbiter are never acquired here */
+		return 0;
+
+	if (handle->current_mode & ~STARPU_RW || handle->reduction_refcnt)
+		/* Not just R or W, do not bother trying to acquire it */
+		return 0;
+
+	if (handle->refcnt) /* already referenced: refuse the conflicting combinations below */
+	{
+		if (handle->current_mode == STARPU_W)
+			/* Currently held in W mode, not possible */
+			return 0;
+		if (handle->current_mode == STARPU_R && mode == STARPU_W)
+			/* Currently held in R mode, acquiring in W mode is not possible */
+			return 0;
+	}
+
+	handle->refcnt++; /* record our own reference on the handle */
+	handle->busy_count++; /* presumably keeps the handle alive while we hold it -- TODO confirm */
+	if (mode != STARPU_R || handle->current_mode != mode) /* only skips the assignment when current_mode already equals mode */
+		handle->current_mode = mode;
+
+	return 1;
+}
+
 /*
  * Check to see whether the first queued request can proceed, and return it in
  * such case.

+ 2 - 0
src/core/dependencies/data_concurrency.h

@@ -38,5 +38,7 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 						       void (*callback)(void *), void *argcb,
 						       struct _starpu_job *j, unsigned buffer_index);
 
+int _starpu_attempt_to_acquire_data(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
+
 #endif // __DATA_CONCURRENCY_H__
 

+ 82 - 20
src/datawizard/memalloc.c

@@ -19,6 +19,7 @@
 #include <datawizard/memory_nodes.h>
 #include <datawizard/memalloc.h>
 #include <datawizard/footprint.h>
+#include <core/dependencies/data_concurrency.h>
 #include <core/disk.h>
 #include <starpu.h>
 #include <common/uthash.h>
@@ -174,6 +175,7 @@ static void unlock_all_subtree(starpu_data_handle_t handle)
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
+/* This locks header_lock for all the subdatas */
 static int lock_all_subtree(starpu_data_handle_t handle)
 {
 	int child;
@@ -199,30 +201,68 @@ static int lock_all_subtree(starpu_data_handle_t handle)
 	return 1;
 }
 
-static unsigned may_free_subtree(starpu_data_handle_t handle, unsigned node)
+static unsigned release_all_subtree(starpu_data_handle_t handle);
+/* This checks that all subdatas are available, and acquire the rw lock */
+static int may_free_subtree(starpu_data_handle_t handle, unsigned node)
 {
 	/* we only free if no one refers to the leaf */
 	uint32_t refcnt = _starpu_get_data_refcnt(handle, node);
 	if (refcnt)
 		return 0;
 
-	if (!handle->nchildren)
-		return 1;
+	if (!_starpu_attempt_to_acquire_data(handle, STARPU_R))
+		return 0;
 
 	/* look into all sub-subtrees children */
-	unsigned child;
-	for (child = 0; child < handle->nchildren; child++)
+	int child;
+	for (child = 0; child < (int) handle->nchildren; child++)
 	{
-		unsigned res;
+		int res;
 		starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
 		res = may_free_subtree(child_handle, node);
-		if (!res) return 0;
+		/* The children can't have disappeared since we still hold the
+		 * parent header_lock */
+		STARPU_ASSERT(res >= 0);
+		if (!res)
+		{
+			/* Oops, can't free a child, undo what we did */
+			for (child-- ; child >= 0; child--)
+			{
+				child_handle = starpu_data_get_child(handle, child);
+				res = release_all_subtree(child_handle);
+				/* The children can't have disappeared since we still hold the
+				 * parent header_lock */
+				STARPU_ASSERT(!res);
+			}
+			res = _starpu_notify_data_dependencies(handle);
+			if (res)
+				/* Urgl, it even disappeared, tell caller that it shouldn't even unlock it */
+				return -1;
+			return 0;
+		}
 	}
 
 	/* no problem was found */
 	return 1;
 }
 
+/* This releases the rw lock of all subdatas, children first and then the handle itself. Returns the result of _starpu_notify_data_dependencies(handle); callers in this file treat a non-zero result as "the handle disappeared, do not unlock it". */
+static unsigned release_all_subtree(starpu_data_handle_t handle)
+{
+	unsigned child;
+
+	for (child = 0; child < handle->nchildren; child++) /* recurse into every child before notifying this handle */
+	{
+		unsigned res;
+		starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
+		res = release_all_subtree(child_handle);
+		/* The children can't have disappeared since we still hold the
+		 * parent header_lock */
+		STARPU_ASSERT(!res);
+	}
+	return _starpu_notify_data_dependencies(handle); /* result is passed through unchanged to the caller */
+}
+
 /* Warn: this releases the header lock of the handle during the transfer
  * The handle may thus unexpectedly disappear. This returns 1 in that case.
  */
@@ -250,12 +290,9 @@ static int transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_no
 			/* There is no way we don't need a request, since
 			 * source is OWNER, destination can't be having it */
 			STARPU_ASSERT(r);
-			/* Keep the handle alive while we are working on it */
-			handle->busy_count++;
 			_starpu_spin_unlock(&handle->header_lock);
 			_starpu_wait_data_request_completion(r, 1);
 			_starpu_spin_lock(&handle->header_lock);
-			handle->busy_count--;
 			if (_starpu_data_check_not_busy(handle))
 				return 1;
 		}
@@ -455,7 +492,9 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 	else if (lock_all_subtree(handle))
 	{
 		/* check if they are all "free" */
-		if (may_free_subtree(handle, node))
+		int res = may_free_subtree(handle, node);
+
+		if (res == 1)
 		{
 			int target = -1;
 
@@ -471,7 +510,6 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 
 			if (target != -1)
 			{
-				int res;
 				/* Should have been avoided in our caller */
 				STARPU_ASSERT(!mc->remove_notify);
 				mc->remove_notify = &mc;
@@ -508,10 +546,22 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 					}
 				}
 			}
-		}
 
-		/* unlock the tree */
-		unlock_all_subtree(handle);
+			/* release the tree */
+			if (!release_all_subtree(handle))
+				/* unlock the tree */
+				unlock_all_subtree(handle);
+		}
+		else if (res == 0)
+		{
+			/* busy, unlock the tree */
+			unlock_all_subtree(handle);
+		}
+		else
+		{
+			STARPU_ASSERT(res == -1);
+			/* busy and disappeared, don't even unlock */
+		}
 	}
 	return freed;
 }
@@ -578,9 +628,9 @@ static unsigned try_to_reuse_mem_chunk(struct _starpu_mem_chunk *mc, unsigned no
 	/* and check if they are all "free" */
 	if (lock_all_subtree(old_data))
 	{
-		if (may_free_subtree(old_data, node))
+		int res = may_free_subtree(old_data, node);
+		if (res == 1)
 		{
-			int res;
 			/* Should have been avoided in our caller */
 			STARPU_ASSERT(!mc->remove_notify);
 			mc->remove_notify = &mc;
@@ -605,10 +655,22 @@ static unsigned try_to_reuse_mem_chunk(struct _starpu_mem_chunk *mc, unsigned no
 					success = 1;
 				}
 			}
-		}
 
-		/* unlock the tree */
-		unlock_all_subtree(old_data);
+			/* release the tree */
+			if (!release_all_subtree(old_data))
+				/* unlock the tree */
+				unlock_all_subtree(old_data);
+		}
+		else if (res == 0)
+		{
+			/* busy, unlock the tree */
+			unlock_all_subtree(old_data);
+		}
+		else
+		{
+			STARPU_ASSERT(res == -1);
+			/* busy and disappeared, don't even unlock */
+		}
 	}
 
 	return success;

+ 47 - 7
tests/disk/mem_reclaim.c

@@ -49,7 +49,7 @@
 #  define NITER 16
 #else
 #  define NDATA 128
-#  define NITER 1024
+#  define NITER 32
 #endif
 #  define MEMSIZE 1
 #  define MEMSIZE_STR "1"
@@ -82,10 +82,12 @@ void starpu_my_vector_data_register(starpu_data_handle_t *handleptr, unsigned ho
 	starpu_data_register(handleptr, home_node, &vector, &starpu_interface_my_vector_ops);
 }
 
+static unsigned values[NDATA];
+
 static void zero(void *buffers[], void *args)
 {
 	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
-	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector);
 	*val = 0;
 	VALGRIND_MAKE_MEM_DEFINED(val, STARPU_VECTOR_GET_NX(vector) * STARPU_VECTOR_GET_ELEMSIZE(vector));
 }
@@ -93,8 +95,20 @@ static void zero(void *buffers[], void *args)
 static void inc(void *buffers[], void *args)
 {
 	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
-	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
-	*val++;
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector);
+	unsigned i;
+	starpu_codelet_unpack_args(args, &i);
+	(*val)++;
+	STARPU_ATOMIC_ADD(&values[i], 1);
+}
+
+static void check(void *buffers[], void *args) /* codelet function: asserts that the data matches the counter kept in values[] */
+{
+	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector); /* only the first unsigned of the vector is examined */
+	unsigned i;
+	starpu_codelet_unpack_args(args, &i); /* index of the handle, passed through STARPU_VALUE at submission */
+	STARPU_ASSERT(*val == values[i]); /* values[i] is the count accumulated by inc() for this index */
+}
 
 static struct starpu_codelet zero_cl =
@@ -111,6 +125,13 @@ static struct starpu_codelet inc_cl =
 	.modes = { STARPU_RW },
 };
 
+static struct starpu_codelet check_cl = /* codelet used by the test to verify each handle before it is unregistered */
+{
+	.cpu_funcs = { check },
+	.nbuffers = 1, /* operates on a single data handle */
+	.modes = { STARPU_R }, /* the handle is accessed in R mode, check() does not write to it */
+};
+
 int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)(starpu_data_handle_t *handleptr, unsigned home_node, uintptr_t ptr, uint32_t nx, size_t elemsize))
 {
 	int *A, *C;
@@ -132,21 +153,40 @@ int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)
 	/* can't write on /tmp/ */
 	if (new_dd == -ENOENT) goto enoent;
 
-	unsigned int i;
+	unsigned int i, j, k;
 
 	/* Initialize twice as much data as available memory */
 	for (i = 0; i < NDATA; i++)
 	{
 		vector_data_register(&handles[i], -1, 0, (MEMSIZE*1024*1024*2) / NDATA, sizeof(char));
 		starpu_task_insert(&zero_cl, STARPU_W, handles[i], 0);
+		if ((i % (NDATA/4)) == 0)
+			/* Wait for 1/4 of the tasks (i.e. half of the memory)
+			 * to finish before filling with others */
+			starpu_task_wait_for_all();
 	}
+	memset(values, 0, sizeof(values));
 
 	for (i = 0; i < NITER; i++)
-		starpu_task_insert(&inc_cl, STARPU_RW, handles[rand()%NDATA], 0);
+	{
+		/* submit tasks dealing with half of the memory */
+		for (j = 0; j < NDATA/4; j++)
+		{
+			k = rand()%NDATA;
+			starpu_task_insert(&inc_cl, STARPU_RW, handles[k], STARPU_VALUE, &k, sizeof(k), 0);
+		}
+		/* and wait for them before submitting more */
+		starpu_task_wait_for_all();
+	}
 
-	/* Free data */
+	/* Check and free data */
 	for (i = 0; i < NDATA; i++)
+	{
+		/* No need to be careful, reclaiming can run concurrently with
+		 * tasks reading data */
+		starpu_task_insert(&check_cl, STARPU_R, handles[i], STARPU_VALUE, &i, sizeof(i), 0);
 		starpu_data_unregister(handles[i]);
+	}
 
 	/* terminate StarPU, no task can be submitted after */
 	starpu_shutdown();