Переглянути джерело

Implement starpu_data_invalidate which invalidates all copies of a data handle.

Cédric Augonnet 14 роки тому
батько
коміт
461cd23194

+ 4 - 0
include/starpu_data.h

@@ -45,6 +45,10 @@ struct starpu_data_interface_ops_t;
 
 void starpu_data_unregister(starpu_data_handle handle);
 
+/* Destroy all data replicates. After data invalidation, the first access to
+ * the handle must be performed in write-only mode. */
+void starpu_data_invalidate(starpu_data_handle);
+
 void starpu_data_advise_as_important(starpu_data_handle handle, unsigned is_important);
 
 int starpu_data_acquire(starpu_data_handle handle, starpu_access_mode mode);

+ 2 - 0
src/core/dependencies/implicit_data_deps.c

@@ -324,6 +324,8 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
 	}
 }
 
+/* If sequential consistency mode is enabled, this function blocks until the
+ * handle is available in the requested access mode. */
 int _starpu_data_wait_until_available(starpu_data_handle handle, starpu_access_mode mode)
 {
 	/* If sequential consistency is enabled, wait until data is available */

+ 7 - 7
src/datawizard/coherency.c

@@ -315,16 +315,16 @@ uint32_t _starpu_data_get_footprint(starpu_data_handle handle)
 void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wb_mask, uint32_t memory_node)
 {
 	uint32_t wb_mask;
-
-	/* normally, the requesting node should have the data in an exclusive manner */
-	STARPU_ASSERT(handle->per_node[memory_node].state != STARPU_INVALID);
-
 	wb_mask = default_wb_mask | handle->wb_mask;
 
+	/* Note that it is possible that there is no valid copy of the data (if
+	 * starpu_data_invalidate was called for instance). In that case, we do
+	 * not enforce any write-through mechanism. */
+
 	/* are we doing write-through or just some normal write-back ? */
-	if (wb_mask & ~(1<<memory_node)) {
+	if (handle->per_node[memory_node].state != STARPU_INVALID)
+	if ((wb_mask & ~(1<<memory_node)))
 		_starpu_write_through_data(handle, memory_node, wb_mask);
-	}
 
 	uint32_t local_node = _starpu_get_local_memory_node();
 	while (_starpu_spin_trylock(&handle->header_lock))
@@ -373,7 +373,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 	starpu_buffer_descr *descrs = task->buffers;
 	unsigned nbuffers = task->cl->nbuffers;
 
-#warning TODO get that from the stack
+	/* TODO get that from the stack */
 	starpu_job_t j = (struct starpu_job_s *)task->starpu_private;
 
 	unsigned index;

+ 28 - 2
src/datawizard/interfaces/data_interface.c

@@ -170,7 +170,7 @@ static void _starpu_data_unregister_fetch_data_callback(void *_arg)
 
 void starpu_data_unregister(starpu_data_handle handle)
 {
-	unsigned node;
+	STARPU_ASSERT(handle);
 
 	/* If sequential consistency is enabled, wait until data is available */
 	_starpu_data_wait_until_available(handle, STARPU_RW);
@@ -203,7 +203,7 @@ void starpu_data_unregister(starpu_data_handle handle)
 	}
 
 	/* Destroy the data now */
-	STARPU_ASSERT(handle);
+	unsigned node;
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{
 		starpu_local_data_state *local = &handle->per_node[node];
@@ -221,6 +221,32 @@ void starpu_data_unregister(starpu_data_handle handle)
 	free(handle);
 }
 
+void starpu_data_invalidate(starpu_data_handle handle)
+{
+	STARPU_ASSERT(handle);
+
+	starpu_data_acquire(handle, STARPU_W);
+
+	_starpu_spin_lock(&handle->header_lock);
+
+	unsigned node;
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		starpu_local_data_state *local = &handle->per_node[node];
+
+		if (local->allocated && local->automatically_allocated){
+			/* free the data copy in a lazy fashion */
+			_starpu_request_mem_chunk_removal(handle, node);
+		}
+
+		local->state = STARPU_INVALID; 
+	}
+
+	_starpu_spin_unlock(&handle->header_lock);
+
+	starpu_data_release(handle);
+}
+
 unsigned starpu_get_handle_interface_id(starpu_data_handle handle)
 {
 	return handle->ops->interfaceid;

+ 4 - 0
tests/Makefile.am

@@ -99,6 +99,7 @@ check_PROGRAMS += 				\
 	datawizard/sync_and_notify_data_implicit\
 	datawizard/dsm_stress			\
 	datawizard/write_only_tmp_buffer	\
+	datawizard/data_invalidation		\
 	datawizard/dining_philosophers		\
 	datawizard/readers_and_writers		\
 	datawizard/unpartition			\
@@ -207,6 +208,9 @@ datawizard_dsm_stress_SOURCES =			\
 datawizard_write_only_tmp_buffer_SOURCES =	\
 	datawizard/write_only_tmp_buffer.c
 
+datawizard_data_invalidation_SOURCES =	\
+	datawizard/data_invalidation.c
+
 datawizard_dining_philosophers_SOURCES = 	\
 	datawizard/dining_philosophers.c
 

+ 145 - 0
tests/datawizard/data_invalidation.c

@@ -0,0 +1,145 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+
+#define NLOOPS		1000
+#define VECTORSIZE	1024
+
+static starpu_data_handle v_handle;
+
+/*
+ *	Memset
+ */
+
+#ifdef STARPU_USE_CUDA
+static void cuda_memset_codelet(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	char *buf = (char *)STARPU_VECTOR_GET_PTR(descr[0]);
+	unsigned length = STARPU_VECTOR_GET_NX(descr[0]);
+
+	cudaMemset(buf, 42, length);
+	cudaThreadSynchronize();
+}
+#endif
+
+static void cpu_memset_codelet(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	char *buf = (char *)STARPU_VECTOR_GET_PTR(descr[0]);
+	unsigned length = STARPU_VECTOR_GET_NX(descr[0]);
+
+	memset(buf, 42, length);
+}
+
+static starpu_codelet memset_cl = {
+	.where = STARPU_CPU|STARPU_CUDA,
+	.cpu_func = cpu_memset_codelet,
+#ifdef STARPU_USE_CUDA
+	.cuda_func = cuda_memset_codelet,
+#endif
+	.nbuffers = 1
+};
+
+/*
+ *	Check content
+ */
+
+static void cpu_check_content_codelet(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	char *buf = (char *)STARPU_VECTOR_GET_PTR(descr[0]);
+	unsigned length = STARPU_VECTOR_GET_NX(descr[0]);
+
+	unsigned i;
+	for (i = 0; i < length; i++)
+	{
+		if (buf[i] != 42)
+		{
+			fprintf(stderr, "buf[%d] is %c while it should be %c\n", buf[i], 42);
+			exit(-1);
+		}
+	}
+}
+
+static starpu_codelet check_content_cl = {
+	.where = STARPU_CPU,
+	.cpu_func = cpu_check_content_codelet,
+	.nbuffers = 1
+};
+
+
+int main(int argc, char **argv)
+{
+	int ret;
+
+	starpu_init(NULL);
+
+	/* The buffer should never be explicitely allocated */
+	starpu_vector_data_register(&v_handle, (uint32_t)-1, (uintptr_t)NULL, VECTORSIZE, sizeof(char));
+
+	unsigned loop;
+	for (loop = 0; loop < NLOOPS; loop++)
+	{
+		struct starpu_task *memset_task;
+		struct starpu_task *check_content_task;
+
+		memset_task = starpu_task_create();
+		memset_task->cl = &memset_cl;
+		memset_task->buffers[0].handle = v_handle;
+		memset_task->buffers[0].mode = STARPU_W;
+		memset_task->detach = 0;
+	
+		ret = starpu_task_submit(memset_task);
+		if (ret == -ENODEV)
+				goto enodev;
+	
+		ret = starpu_task_wait(memset_task);
+		if (ret)
+			exit(-1);
+		
+		check_content_task = starpu_task_create();
+		check_content_task->cl = &check_content_cl;
+		check_content_task->buffers[0].handle = v_handle;
+		check_content_task->buffers[0].mode = STARPU_R;
+		check_content_task->detach = 0;
+	
+		ret = starpu_task_submit(check_content_task);
+		if (ret == -ENODEV)
+				goto enodev;
+	
+		ret = starpu_task_wait(check_content_task);
+		if (ret)
+			exit(-1);
+
+		starpu_data_invalidate(v_handle);
+	}
+
+	/* this should get rid of automatically allocated buffers */
+	starpu_data_unregister(v_handle);
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	return 0;
+}