Browse Source

port r12703 from 1.1: Allow data to be uninitialized as long as the application provides an init_cl to initialize it

Samuel Thibault 11 years ago
parent
commit
f255d2c95a

+ 12 - 0
ChangeLog

@@ -17,6 +17,18 @@
 StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 
+StarPU 1.1.2 (svn revision xxxx)
+==============================================
+The scheduling context release
+
+New features:
+  * The reduction init codelet is automatically used to initialize temporary
+    buffers.
+
+StarPU 1.1.1 (svn revision 12638)
+==============================================
+The scheduling context release
+
 New features:
   * Xeon Phi support
   * SCC support

+ 5 - 0
doc/doxygen/chapters/07data_management.doxy

@@ -325,6 +325,11 @@ starpu_task_insert(&summarize_data, STARPU_R, handle, STARPU_W, result_handle, 0
 starpu_data_unregister_submit(handle);
 \endcode
 
+The application may also want to see the temporary data initialized
+on the fly before being used by the task. This can be done by using
+starpu_data_set_reduction_methods() to set an initialization codelet (no redux
+codelet is needed).
+
 \subsection ScratchData Scratch Data
 
 Some kernels sometimes need temporary data to achieve the computations, i.e. a

+ 6 - 2
src/core/perfmodel/perfmodel.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -293,7 +293,11 @@ double starpu_data_expected_transfer_time(starpu_data_handle_t handle, unsigned
 	if (size == 0)
 		return 0.0;
 
-	unsigned src_node = _starpu_select_src_node(handle, memory_node);
+	int src_node = _starpu_select_src_node(handle, memory_node);
+	if (src_node < 0)
+		/* Will just create it in place. Ideally we should take the
+		 * time to create it into account */
+		return 0.0;
 	return starpu_transfer_predict(src_node, memory_node, size);
 }
 

+ 20 - 10
src/datawizard/coherency.c

@@ -27,7 +27,7 @@
 #include <starpu_scheduler.h>
 
 static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
-unsigned _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
+int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
 {
 	int src_node = -1;
 	unsigned i;
@@ -50,6 +50,12 @@ unsigned _starpu_select_src_node(starpu_data_handle_t handle, unsigned destinati
 		}
 	}
 
+	if (src_node_mask == 0 && handle->init_cl)
+	{
+		/* No copy yet, but the application told us how to build it.  */
+		return -1;
+	}
+
 	/* we should have found at least one copy ! */
 	STARPU_ASSERT(src_node_mask != 0);
 
@@ -427,16 +433,23 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);
 
 	/* find someone who already has the data */
-	unsigned src_node = 0;
+	int src_node = 0;
 
 	if (mode & STARPU_R)
 	{
 		src_node = _starpu_select_src_node(handle, requesting_node);
-		STARPU_ASSERT(src_node != requesting_node);
+		STARPU_ASSERT(src_node != (int) requesting_node);
+		if (src_node < 0)
+		{
+			/* We will create it, no need to read an existing value */
+			mode &= ~STARPU_R;
+		}
 	}
 	else
 	{
-		/* if the data is in write only mode, there is no need for a source */
+		/* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
+		if (mode & STARPU_W)
+			dst_replicate->initialized = 1;
 		if (requesting_node == STARPU_MAIN_RAM) {
 			/* And this is the main RAM, really no need for a
 			 * request, just allocate */
@@ -752,12 +765,9 @@ int _starpu_fetch_task_input(struct _starpu_job *j)
 
 		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, index);
 
-		if (mode & STARPU_REDUX)
-		{
-			/* If the replicate was not initialized yet, we have to do it now */
-			if (!local_replicate->initialized)
-				_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
-		}
+		/* If the replicate was not initialized yet, we have to do it now */
+		if (!local_replicate->initialized)
+			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
 	}
 
 	if (profiling && task->profiling_info)

+ 2 - 3
src/datawizard/coherency.h

@@ -51,8 +51,7 @@ LIST_TYPE(_starpu_data_replicate,
 	 * filters. */
 	unsigned relaxed_coherency;
 
-	/* In the case of a SCRATCH access, we need to initialize the replicate
-	 * with a neutral element before using it. */
+	/* We may need to initialize the replicate with some value before using it. */
 	unsigned initialized;
 
 	/* describes the state of the local data in term of coherency */
@@ -255,7 +254,7 @@ int _starpu_fetch_task_input(struct _starpu_job *j);
 
 unsigned _starpu_is_data_present_or_requested(struct _starpu_data_state *state, unsigned node);
 
-unsigned _starpu_select_src_node(struct _starpu_data_state *state, unsigned destination);
+int _starpu_select_src_node(struct _starpu_data_state *state, unsigned destination);
 
 /* is_prefetch is whether the DSM may drop the request (when there is not enough memory for instance
  * async is whether the caller wants a reference on the last request, to be

+ 2 - 0
src/datawizard/copy_driver.c

@@ -493,6 +493,8 @@ int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_d
 			req->com_id = com_id;
 #endif
 
+		dst_replicate->initialized = 1;
+
 		_STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id);
 		ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
 		if (!req)

+ 1 - 0
src/datawizard/filters.c

@@ -220,6 +220,7 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 			child_replicate->refcnt = 0;
 			child_replicate->memory_node = node;
 			child_replicate->relaxed_coherency = 0;
+			child_replicate->initialized = initial_replicate->initialized;
 
 			/* update the interface */
 			void *initial_interface = starpu_data_get_interface_on_node(initial_handle, node);

+ 2 - 0
src/datawizard/interfaces/data_interface.c

@@ -246,12 +246,14 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 			replicate->state = STARPU_OWNER;
 			replicate->allocated = 1;
 			replicate->automatically_allocated = 0;
+			replicate->initialized = 1;
 		}
 		else
 		{
 			/* the value is not available here yet */
 			replicate->state = STARPU_INVALID;
 			replicate->allocated = 0;
+			replicate->initialized = 0;
 		}
 	}
 

+ 1 - 0
src/datawizard/memalloc.c

@@ -789,6 +789,7 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 	mc->replicate = NULL;
 	replicate->allocated = 0;
 	replicate->automatically_allocated = 0;
+	replicate->initialized = 0;
 
 	_starpu_spin_lock(&mc_lock[node]);
 

+ 1 - 0
tests/Makefile.am

@@ -173,6 +173,7 @@ noinst_PROGRAMS =				\
 	datawizard/mpi_like			\
 	datawizard/mpi_like_async		\
 	datawizard/critical_section_with_void_interface\
+	datawizard/increment_init		\
 	datawizard/increment_redux		\
 	datawizard/increment_redux_v2		\
 	datawizard/increment_redux_lazy		\

+ 214 - 0
tests/datawizard/increment_init.c

@@ -0,0 +1,214 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012-2014  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <config.h>
+#include <starpu.h>
+#include "../helper.h"
+
+static starpu_data_handle_t handle;
+
+/*
+ *	Initialization codelet (registered through the reduction-methods API)
+ */
+
+#ifdef STARPU_USE_CUDA
+/* CUDA flavour of the initialization codelet: stores 0 into the variable
+ * behind descr[0].  The second argument is not used. */
+static void neutral_cuda_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned zero = 0;
+	unsigned *device_var = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	cudaStream_t stream = starpu_cuda_get_local_stream();
+
+	/* Deliberately naive: push a host-side zero through the local stream
+	 * and wait for the copy to complete. */
+	cudaMemcpyAsync(device_var, &zero, sizeof(zero), cudaMemcpyHostToDevice, stream);
+	cudaStreamSynchronize(stream);
+}
+#endif
+
+#ifdef STARPU_USE_OPENCL
+/* OpenCL flavour of the initialization codelet: writes 0 into the buffer
+ * behind descr[0].  The second argument is not used. */
+static void neutral_opencl_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	cl_command_queue cmd_queue;
+	cl_mem device_var = (cl_mem)STARPU_VARIABLE_GET_PTR(descr[0]);
+	unsigned zero = 0;
+
+	starpu_opencl_get_current_queue(&cmd_queue);
+
+	/* CL_TRUE asks for a blocking write; the queue is then drained. */
+	clEnqueueWriteBuffer(cmd_queue, device_var, CL_TRUE, 0, sizeof(zero), (void *)&zero, 0, NULL, NULL);
+	clFinish(cmd_queue);
+}
+#endif
+
+
+
+/* CPU flavour of the initialization codelet: stores 0 into the variable
+ * behind descr[0].  The second argument is not used. */
+static void neutral_cpu_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *var = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	var[0] = 0;
+}
+
+/* Initialization codelet: a single write-only buffer, with one implementation
+ * per architecture enabled at build time. */
+static struct starpu_codelet neutral_cl =
+{
+	.nbuffers = 1,
+	.modes = {STARPU_W},
+	.cpu_funcs = {neutral_cpu_kernel, NULL},
+#ifdef STARPU_USE_OPENCL
+	.opencl_funcs = {neutral_opencl_kernel, NULL},
+#endif
+#ifdef STARPU_USE_CUDA
+	.cuda_funcs = {neutral_cuda_kernel, NULL},
+#endif
+};
+
+/*
+ *	Increment codelet
+ */
+
+#ifdef STARPU_USE_OPENCL
+/* Dummy OpenCL implementation of the increment: reads the value behind
+ * descr[0] back to the host, adds one, and writes it to the buffer again. */
+static void increment_opencl_kernel(void *descr[], void *cl_arg STARPU_ATTRIBUTE_UNUSED)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	cl_command_queue cmd_queue;
+	unsigned host_copy;
+	cl_mem device_token = (cl_mem)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	starpu_opencl_get_current_queue(&cmd_queue);
+
+	/* Both transfers pass CL_TRUE, i.e. they are requested as blocking. */
+	clEnqueueReadBuffer(cmd_queue, device_token, CL_TRUE, 0, sizeof(host_copy), (void *)&host_copy, 0, NULL, NULL);
+	host_copy += 1;
+	clEnqueueWriteBuffer(cmd_queue, device_token, CL_TRUE, 0, sizeof(host_copy), (void *)&host_copy, 0, NULL, NULL);
+	clFinish(cmd_queue);
+}
+#endif
+
+
+#ifdef STARPU_USE_CUDA
+/* Dummy CUDA implementation of the increment: copies the value behind
+ * descr[0] to the host, adds one, and copies it back.  The second argument
+ * is not used. */
+static void increment_cuda_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned host_copy;
+	unsigned *device_token = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	cudaStream_t stream = starpu_cuda_get_local_stream();
+
+	/* Naive on purpose: round-trip through the host, synchronizing the
+	 * local stream after each copy. */
+	cudaMemcpyAsync(&host_copy, device_token, sizeof(host_copy), cudaMemcpyDeviceToHost, stream);
+	cudaStreamSynchronize(stream);
+
+	host_copy += 1;
+
+	cudaMemcpyAsync(device_token, &host_copy, sizeof(host_copy), cudaMemcpyHostToDevice, stream);
+	cudaStreamSynchronize(stream);
+}
+#endif
+
+/* CPU implementation of the increment: adds one to the unsigned variable
+ * behind descr[0].  The second argument is not used. */
+static void increment_cpu_kernel(void *descr[], void *arg)
+{
+	STARPU_SKIP_IF_VALGRIND;
+
+	unsigned *counter = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	++*counter;
+}
+
+/* Increment codelet: a single buffer accessed in read-write mode, with one
+ * implementation per architecture enabled at build time. */
+static struct starpu_codelet increment_cl =
+{
+	.modes = {STARPU_RW},
+	.nbuffers = 1,
+	.cpu_funcs = {increment_cpu_kernel, NULL},
+#ifdef STARPU_USE_OPENCL
+	.opencl_funcs = {increment_opencl_kernel, NULL},
+#endif
+#ifdef STARPU_USE_CUDA
+	.cuda_funcs = {increment_cuda_kernel, NULL},
+#endif
+};
+
+/* Exercises a handle that is registered without any initial value and relies
+ * on the init codelet given to starpu_data_set_reduction_methods() to get its
+ * starting value.
+ *
+ * Every loop submits ntasks read-write increments, acquires the handle and
+ * expects to read exactly ntasks, then invalidates the handle so that the
+ * next loop starts again without a valid copy.
+ *
+ * Returns EXIT_SUCCESS when every loop reads the expected value,
+ * STARPU_TEST_SKIPPED when starpu_init() or starpu_task_submit() reports
+ * -ENODEV, and goes through STARPU_RETURN(EXIT_FAILURE) on a value mismatch.
+ * argc and argv are not used. */
+int main(int argc, char **argv)
+{
+	unsigned *pvar;
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	/* Home node -1 and a null pointer: no initial value is supplied. */
+	starpu_variable_data_register(&handle, -1, 0, sizeof(unsigned));
+
+	/* Only the init codelet is provided; the redux codelet is left NULL. */
+	starpu_data_set_reduction_methods(handle, NULL, &neutral_cl);
+
+#ifdef STARPU_QUICK_CHECK
+	unsigned ntasks = 32;
+	unsigned nloops = 4;
+#else
+	unsigned ntasks = 1024;
+	unsigned nloops = 16;
+#endif
+
+	unsigned loop;
+	unsigned t;
+
+	for (loop = 0; loop < nloops; loop++)
+	{
+		for (t = 0; t < ntasks; t++)
+		{
+			struct starpu_task *task = starpu_task_create();
+
+			task->cl = &increment_cl;
+			task->handles[0] = handle;
+
+			ret = starpu_task_submit(task);
+			if (ret == -ENODEV) goto enodev;
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		}
+
+		ret = starpu_data_acquire(handle, STARPU_R);
+		/* Validate the acquisition before looking at the data. */
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+		pvar = starpu_data_handle_to_pointer(handle, 0);
+		if (*pvar != ntasks)
+		{
+			/* The handle is invalidated after each loop, so the
+			 * expected value is ntasks, not a running total. */
+			FPRINTF(stderr, "[end of loop] Value %u != Expected value %u\n", *pvar, ntasks);
+			starpu_data_release(handle);
+			starpu_data_unregister(handle);
+			goto err;
+		}
+		starpu_data_release(handle);
+		/* Drop the current value so the next loop starts from scratch. */
+		starpu_data_invalidate(handle);
+	}
+
+	starpu_data_unregister(handle);
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+
+enodev:
+	starpu_data_unregister(handle);
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+	 * could perform the kernel, so this is not an error from StarPU */
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+
+err:
+	starpu_shutdown();
+	STARPU_RETURN(EXIT_FAILURE);
+}