Browse Source

Fix mem_reclaim: on write request, if some requests are pending (notably reclaiming), create an additional write request which will wait for them before invalidating all copies

Samuel Thibault 10 years ago
parent
commit
91fd2cfc15

+ 80 - 13
src/datawizard/coherency.c

@@ -470,13 +470,35 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	_starpu_spin_checklocked(&handle->header_lock);
 
 	unsigned requesting_node = dst_replicate->memory_node;
+	unsigned nwait = 0;
 
-	if (dst_replicate->state != STARPU_INVALID)
+	if (mode & STARPU_W)
+	{
+		/* We will write to the buffer. We will have to wait for all
+		 * existing requests before the last request which will
+		 * invalidate all their results (which were possibly spurious,
+		 * e.g. too aggressive eviction).
+		 */
+		unsigned i, j;
+		unsigned nnodes = starpu_memory_nodes_get_count();
+		for (i = 0; i < nnodes; i++)
+			for (j = 0; j < nnodes; j++)
+				if (handle->per_node[i].request[j])
+					nwait++;
+		/* If the request is not detached (i.e. the caller really wants
+		 * proper ownership), no new requests will appear because a
+		 * reference will be kept on the dst replicate, which will
+		 * notably prevent data reclaiming.
+		 */
+	}
+
+	if (dst_replicate->state != STARPU_INVALID && !nwait)
 	{
 #ifdef STARPU_MEMORY_STATS
 		enum _starpu_cache_state old_state = dst_replicate->state;
 #endif
-		/* the data is already available so we can stop */
+		/* the data is already available and we don't have to wait for
+		 * any request, so we can stop */
 		_starpu_update_data_state(handle, dst_replicate, mode);
 		_starpu_msi_cache_hit(requesting_node);
 
@@ -503,15 +525,17 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	_starpu_msi_cache_miss(requesting_node);
 
 	/* the only remaining situation is that the local copy was invalid */
-	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);
+	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID || nwait);
 
 	/* find someone who already has the data */
-	int src_node = 0;
+	int src_node = -1;
 
 	if (mode & STARPU_R)
 	{
-		src_node = _starpu_select_src_node(handle, requesting_node);
-		STARPU_ASSERT(src_node != (int) requesting_node);
+		if (dst_replicate->state == STARPU_INVALID)
+			src_node = _starpu_select_src_node(handle, requesting_node);
+		else
+			src_node = requesting_node;
 		if (src_node < 0)
 		{
 			/* We will create it, no need to read an existing value */
@@ -523,7 +547,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		/* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
 		if (mode & STARPU_W)
 			dst_replicate->initialized = 1;
-		if (requesting_node == STARPU_MAIN_RAM)
+		if (requesting_node == STARPU_MAIN_RAM && !nwait)
 		{
 			/* And this is the main RAM, really no need for a
 			 * request, just allocate */
@@ -541,14 +565,17 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		}
 	}
 
+#define MAX_REQUESTS 4
 	/* We can safely assume that there won't be more than 2 hops in the
 	 * current implementation */
-	unsigned src_nodes[4], dst_nodes[4], handling_nodes[4];
-	int nhops = determine_request_path(handle, src_node, requesting_node, mode, 4,
+	unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
+	int nhops = determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
 					src_nodes, dst_nodes, handling_nodes);
 
-	STARPU_ASSERT(nhops >= 1 && nhops <= 4);
-	struct _starpu_data_request *requests[nhops];
+	/* keep one slot for the last W request, if any */
+	int write_invalidation = mode & STARPU_W && nwait && !is_prefetch;
+	STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);
+	struct _starpu_data_request *requests[nhops + write_invalidation];
 
 	/* Did we reuse a request for that hop ? */
 	int reused_requests[nhops];
@@ -584,7 +611,8 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			/* Create a new request if there was no request to reuse */
 			r = _starpu_create_data_request(handle, hop_src_replicate,
 							hop_dst_replicate, hop_handling_node,
-							mode, ndeps, is_prefetch);
+							mode, ndeps, is_prefetch, 0);
+			nwait++;
 		}
 
 		requests[hop] = r;
@@ -604,7 +632,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 				STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
 			}
 		}
-		else
+		else if (!write_invalidation)
 			/* The last request will perform the callback after termination */
 			_starpu_data_request_append_callback(r, callback_func, callback_arg);
 
@@ -613,6 +641,45 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			_starpu_spin_unlock(&r->lock);
 	}
 
+	if (write_invalidation)
+	{
+		/* Some requests were still pending, we have to add yet another
+		 * request, depending on them, which will invalidate their
+		 * result.
+		 */
+		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
+							dst_replicate, requesting_node,
+							STARPU_W, nwait, is_prefetch, 1);
+
+		/* and perform the callback after termination */
+		_starpu_data_request_append_callback(r, callback_func, callback_arg);
+
+		/* Chain the new request after every request still pending on
+		 * this handle: each of them gets it appended to its next_req
+		 * list. nwait was passed as its dependency count, so it must
+		 * reach zero once all pending requests have been visited.
+		 */
+		unsigned i, j;
+		unsigned nnodes = starpu_memory_nodes_get_count();
+		for (i = 0; i < nnodes; i++)
+			for (j = 0; j < nnodes; j++)
+			{
+				struct _starpu_data_request *r2 = handle->per_node[i].request[j];
+				if (r2)
+				{
+					_starpu_spin_lock(&r2->lock);
+					r2->next_req[r2->next_req_count++] = r;
+					STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
+					_starpu_spin_unlock(&r2->lock);
+					nwait--;
+				}
+			}
+		STARPU_ASSERT(nwait == 0);
+
+		nhops++;
+		requests[nhops - 1] = r;
+	}
+
 	if (!async)
 		requests[nhops - 1]->refcnt++;
 

+ 3 - 0
src/datawizard/coherency.h

@@ -233,6 +233,9 @@ struct _starpu_data_state
 
 	starpu_data_handle_t *reduction_tmp_handles;
 
+	/* Final request for write invalidation */
+	struct _starpu_data_request *write_invalidation_req;
+
 	unsigned lazy_unregister;
 
 #ifdef STARPU_OPENMP

+ 31 - 13
src/datawizard/data_request.c

@@ -80,24 +80,32 @@ void _starpu_deinit_data_request_lists(void)
 /* this should be called with the lock r->handle->header_lock taken */
 static void _starpu_data_request_unlink(struct _starpu_data_request *r)
 {
-	unsigned node;
-
 	_starpu_spin_checklocked(&r->handle->header_lock);
 
-	/* If this is a write only request, then there is no source and we use
-	 * the destination node to cache the request. Otherwise we store the
-	 * pending requests between src and dst. */
-	if (r->mode & STARPU_R)
+	/* A write invalidation request is recorded in the handle itself, not
+	 * in a replicate's request table, so clear that field instead. */
+	if (r->handle->write_invalidation_req == r)
+	{
+		STARPU_ASSERT(r->mode == STARPU_W);
+		r->handle->write_invalidation_req = NULL;
+	}
+	else if (r->mode & STARPU_R)
 	{
-		node = r->src_replicate->memory_node;
+		/* If this is a read request, we store the pending requests
+		 * between src and dst. */
+		unsigned node = r->src_replicate->memory_node;
+		STARPU_ASSERT(r->dst_replicate->request[node] == r);
+		r->dst_replicate->request[node] = NULL;
 	}
 	else
 	{
-		node = r->dst_replicate->memory_node;
+		/* If this is a write only request, then there is no source and
+		 * we use the destination node to cache the request. */
+		unsigned node = r->dst_replicate->memory_node;
+		STARPU_ASSERT(r->dst_replicate->request[node] == r);
+		r->dst_replicate->request[node] = NULL;
 	}
 
-	STARPU_ASSERT(r->dst_replicate->request[node] == r);
-	r->dst_replicate->request[node] = NULL;
 }
 
 static void _starpu_data_request_destroy(struct _starpu_data_request *r)
@@ -121,7 +129,8 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 unsigned handling_node,
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
-							 unsigned is_prefetch)
+							 unsigned is_prefetch,
+							 unsigned is_write_invalidation)
 {
 	struct _starpu_data_request *r = _starpu_data_request_new();
 
@@ -148,7 +157,12 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	dst_replicate->refcnt++;
 	handle->busy_count++;
 
-	if (mode & STARPU_R)
+	if (is_write_invalidation)
+	{
+		STARPU_ASSERT(!handle->write_invalidation_req);
+		handle->write_invalidation_req = r;
+	}
+	else if (mode & STARPU_R)
 	{
 		unsigned src_node = src_replicate->memory_node;
 		STARPU_ASSERT(!dst_replicate->request[src_node]);
@@ -420,8 +434,12 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	/* the header of the data must be locked by the worker that submitted the request */
 
 
-	r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
+	if (dst_replicate->state == STARPU_INVALID)
+		r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
 						    dst_replicate, !(r_mode & STARPU_R), r, may_alloc);
+	else
+		/* Already valid actually, no need to transfer anything */
+		r->retval = 0;
 
 	if (r->retval == -ENOMEM)
 	{

+ 3 - 2
src/datawizard/data_request.h

@@ -83,7 +83,7 @@ LIST_TYPE(_starpu_data_request,
 
 	/* in case we have a chain of request (eg. for nvidia multi-GPU), this
 	 * is the list of requests which are waiting for this one. */
-	struct _starpu_data_request *next_req[STARPU_MAXNODES];
+	struct _starpu_data_request *next_req[STARPU_MAXNODES+1];
 	/* The number of requests in next_req */
 	unsigned next_req_count;
 
@@ -132,7 +132,8 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 unsigned handling_node,
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
-							 unsigned is_prefetch);
+							 unsigned is_prefetch,
+							 unsigned is_write_invalidation);
 
 int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigned may_alloc);
 

+ 1 - 0
src/datawizard/filters.c

@@ -204,6 +204,7 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 		_starpu_data_requester_list_init(&child->req_list);
 		_starpu_data_requester_list_init(&child->reduction_req_list);
 		child->reduction_tmp_handles = NULL;
+		child->write_invalidation_req = NULL;
 		child->refcnt = 0;
 		child->busy_count = 0;
 		child->busy_waiting = 0;

+ 1 - 0
src/datawizard/interfaces/data_interface.c

@@ -286,6 +286,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	handle->reduction_refcnt = 0;
 	_starpu_data_requester_list_init(&handle->reduction_req_list);
 	handle->reduction_tmp_handles = NULL;
+	handle->write_invalidation_req = NULL;
 
 #ifdef STARPU_USE_FXT
 	handle->last_submitted_ghost_sync_id_is_valid = 0;

+ 27 - 8
src/datawizard/memalloc.c

@@ -206,9 +206,6 @@ static unsigned may_free_subtree(starpu_data_handle_t handle, unsigned node)
 	if (refcnt)
 		return 0;
 
-	if (!handle->nchildren)
-		return 1;
-
 	/* look into all sub-subtrees children */
 	unsigned child;
 	for (child = 0; child < handle->nchildren; child++)
@@ -257,8 +254,14 @@ static int transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_no
 			_starpu_spin_lock(&handle->header_lock);
 			handle->busy_count--;
 			if (_starpu_data_check_not_busy(handle))
-				return 1;
+				/* Actually disappeared, abort completely */
+				return -1;
+			if (!may_free_subtree(handle, src_node))
+				/* Oops, while we released the header lock, a
+				 * task got in, abort. */
+				return 0;
 		}
+		STARPU_ASSERT(may_free_subtree(handle, src_node));
 
 		if (src_replicate->state == STARPU_SHARED)
 		{
@@ -294,12 +297,15 @@ static int transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_no
 		{
 			starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
 			res = transfer_subtree_to_node(child_handle, src_node, dst_node);
+			if (res == 0)
+				return 0;
 			/* There is no way children have disappeared since we
 			 * keep the parent lock held */
-			STARPU_ASSERT(!res);
+			STARPU_ASSERT(res != -1);
 		}
 	}
-	return 0;
+	/* Success! */
+	return 1;
 }
 
 static void notify_handle_children(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, unsigned node)
@@ -496,7 +502,13 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 					STARPU_ASSERT(mc->remove_notify == &mc);
 					mc->remove_notify = NULL;
 
-					if (!res)
+					if (res == -1)
+					{
+						/* handle disappeared, abort without unlocking it */
+						return 0;
+					}
+
+					if (res == 1)
 					{
 						/* mc is still associated with the old
 						 * handle, now free it.
@@ -596,7 +608,14 @@ static unsigned try_to_reuse_mem_chunk(struct _starpu_mem_chunk *mc, unsigned no
 			{
 				STARPU_ASSERT(mc->remove_notify == &mc);
 				mc->remove_notify = NULL;
-				if (!res)
+
+				if (res == -1)
+				{
+					/* handle disappeared, abort without unlocking it */
+					return 0;
+				}
+
+				if (res == 1)
 				{
 					/* mc is still associated with the old
 					 * handle, now replace the previous data

+ 1 - 1
src/datawizard/user_interactions.c

@@ -34,7 +34,7 @@ int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
 
 	_starpu_spin_lock(&handle->header_lock);
 
-	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, 1);
+	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, 1, 0);
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */

+ 34 - 6
tests/disk/mem_reclaim.c

@@ -82,10 +82,12 @@ void starpu_my_vector_data_register(starpu_data_handle_t *handleptr, unsigned ho
 	starpu_data_register(handleptr, home_node, &vector, &starpu_interface_my_vector_ops);
 }
 
+static unsigned values[NDATA];
+
 static void zero(void *buffers[], void *args)
 {
 	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
-	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector);
 	*val = 0;
 	VALGRIND_MAKE_MEM_DEFINED(val, STARPU_VECTOR_GET_NX(vector) * STARPU_VECTOR_GET_ELEMSIZE(vector));
 }
@@ -93,8 +95,20 @@ static void zero(void *buffers[], void *args)
 static void inc(void *buffers[], void *args)
 {
 	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
-	char *val = (char*) STARPU_VECTOR_GET_PTR(vector);
-	*val++;
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector);
+	unsigned i;
+	starpu_codelet_unpack_args(args, &i);
+	(*val)++;
+	STARPU_ATOMIC_ADD(&values[i], 1);
+}
+
+static void check(void *buffers[], void *args)
+{
+	struct starpu_vector_interface *vector = (struct starpu_vector_interface *) buffers[0];
+	unsigned *val = (unsigned*) STARPU_VECTOR_GET_PTR(vector);
+	unsigned i;
+	starpu_codelet_unpack_args(args, &i);
+	STARPU_ASSERT(*val == values[i]);
 }
 
 static struct starpu_codelet zero_cl =
@@ -111,6 +125,13 @@ static struct starpu_codelet inc_cl =
 	.modes = { STARPU_RW },
 };
 
+static struct starpu_codelet check_cl =
+{
+	.cpu_funcs = { check },
+	.nbuffers = 1,
+	.modes = { STARPU_R },
+};
+
 int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)(starpu_data_handle_t *handleptr, unsigned home_node, uintptr_t ptr, uint32_t nx, size_t elemsize))
 {
 	int *A, *C;
@@ -132,7 +153,7 @@ int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)
 	/* can't write on /tmp/ */
 	if (new_dd == -ENOENT) goto enoent;
 
-	unsigned int i;
+	unsigned int i, j;
 
 	/* Initialize twice as much data as available memory */
 	for (i = 0; i < NDATA; i++)
@@ -140,13 +161,20 @@ int dotest(struct starpu_disk_ops *ops, char *base, void (*vector_data_register)
 		vector_data_register(&handles[i], -1, 0, (MEMSIZE*1024*1024*2) / NDATA, sizeof(char));
 		starpu_task_insert(&zero_cl, STARPU_W, handles[i], 0);
 	}
+	memset(values, 0, sizeof(values));
 
 	for (i = 0; i < NITER; i++)
-		starpu_task_insert(&inc_cl, STARPU_RW, handles[rand()%NDATA], 0);
+	{
+		j = rand()%NDATA;
+		starpu_task_insert(&inc_cl, STARPU_RW, handles[j], STARPU_VALUE, &j, sizeof(j), 0);
+	}
 
-	/* Free data */
+	/* Check and free data */
 	for (i = 0; i < NDATA; i++)
+	{
+		starpu_task_insert(&check_cl, STARPU_R, handles[i], STARPU_VALUE, &i, sizeof(i), 0);
 		starpu_data_unregister(handles[i]);
+	}
 
 	/* terminate StarPU, no task can be submitted after */
 	starpu_shutdown();