浏览代码

Add a new access mode: STARPU_SCRATCH. When this mode is specified, a temporary
buffer will be allocated to the task that requested it, but no data coherency
is enforced for that type of data.

Cédric Augonnet 15 年之前
父节点
当前提交
e8068ec12d

+ 1 - 0
include/starpu_data.h

@@ -33,6 +33,7 @@ extern "C" {
 #define STARPU_R	(1<<0)
 #define STARPU_W	(1<<1)
 #define STARPU_RW	(STARPU_R|STARPU_W)
+#define STARPU_SCRATCH	(1<<2)
 typedef uint32_t starpu_access_mode;
 
 typedef struct starpu_buffer_descr_t {

+ 3 - 0
src/core/dependencies/data_concurrency.c

@@ -120,6 +120,9 @@ static unsigned attempt_to_submit_data_request_from_job(starpu_job_t j, unsigned
 	starpu_data_handle handle = j->ordered_buffers[buffer_index].handle;
 	starpu_access_mode mode = j->ordered_buffers[buffer_index].mode;
 
+	if (mode & STARPU_SCRATCH)
+		return 0;
+
 	while (_starpu_spin_trylock(&handle->header_lock))
 		_starpu_datawizard_progress(_starpu_get_local_memory_node(), 0);
 

+ 55 - 0
src/core/dependencies/implicit_data_deps.c

@@ -20,6 +20,8 @@
 
 static void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *task, starpu_data_handle handle, starpu_access_mode mode)
 {
+	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
+
 	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 
 	if (handle->sequential_consistency)
@@ -114,6 +116,59 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 		starpu_data_handle handle = task->buffers[buffer].handle;
 		starpu_access_mode mode = task->buffers[buffer].mode;
 
+		/* Scratch memory does not introduce any deps */
+		if (mode & STARPU_SCRATCH)
+			continue;
+
 		_starpu_detect_implicit_data_deps_with_handle(task, handle, mode);
 	}
 }
+
+/* Called once a task has been executed: removes every reference to `task` that `handle` keeps for implicit
+ * dependencies (last submitted writer and reader list), so no dependency is created on a task that no longer exists. */
+void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle)
+{
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex); /* held for the whole update below */
+	/* Nothing is touched when sequential consistency is disabled for this handle. */
+	if (handle->sequential_consistency)
+	{
+		/* If this task is the last submitted writer, forget it: there is no
+		 * point in adding extra deps on a task that does not exist anymore. */
+		if (task == handle->last_submitted_writer)
+			handle->last_submitted_writer = NULL;
+
+		/* Same if this is one of the readers: we go through the whole list
+		 * of readers and remove every node that refers to the task. */
+		struct starpu_task_list *l;
+		l = handle->last_submitted_readers;
+		struct starpu_task_list *prev = NULL; /* last node kept so far; NULL while still at the head */
+		while (l)
+		{
+			struct starpu_task_list *next = l->next; /* saved first because l may be freed below */
+
+			if (l->task == task)
+			{
+				/* The task was found in the reader list: free its node, then unlink it */
+				free(l);
+
+				if (prev)
+				{
+					prev->next = next;
+				}
+				else {
+					/* This is the first element of the list */
+					handle->last_submitted_readers = next;
+				}
+			}
+			else {
+				prev = l; /* node is kept, so it becomes the predecessor of the next one */
+			}
+
+			l = next;
+		}
+	}
+
+	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+}
+
+

+ 1 - 0
src/core/dependencies/implicit_data_deps.h

@@ -21,6 +21,7 @@
 #include <common/config.h>
 
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
+void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle);
 
 #endif // __IMPLICIT_DATA_DEPS_H__
 

+ 44 - 50
src/datawizard/coherency.c

@@ -284,6 +284,8 @@ static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
 	read = (mode & STARPU_R); /* then R or STARPU_RW */
 	write = (mode & STARPU_W); /* then STARPU_W or STARPU_RW */
 
+	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
+
 	return _starpu_fetch_data_on_node(handle, requesting_node, read, write, 0);
 }
 
@@ -334,6 +336,9 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 		handle = descr->handle;
 		
 		uint32_t mode = task->buffers[index].mode;
+
+		if (mode & STARPU_SCRATCH)
+			continue;
 	
 		uint8_t read = (mode & STARPU_R);
 		uint8_t write = (mode & STARPU_W);
@@ -344,14 +349,10 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 	return 0;
 }
 
-
-
 int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 {
 	STARPU_TRACE_START_FETCH_INPUT(NULL);
 
-//	fprintf(stderr, "_starpu_fetch_task_input\n");
-
 	uint32_t local_memory_node = _starpu_get_local_memory_node();
 
 	starpu_buffer_descr *descrs = task->buffers;
@@ -367,13 +368,34 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 		descr = &descrs[index];
 
 		handle = descr->handle;
-	
-		ret = fetch_data(handle, descr->mode);
-		if (STARPU_UNLIKELY(ret))
-			goto enomem;
+		starpu_access_mode mode = descr->mode;
+
+		void *interface;
+
+		if (mode & STARPU_SCRATCH)
+		{
+			/* This is a scratch memory, so we duplicate (any of)
+			 * the interface which contains sufficient information
+			 * to allocate the buffer. */
+			size_t interface_size = handle->ops->interface_size;
+			void *src_interface = starpu_data_get_interface_on_node(handle, local_memory_node);
+			interface = malloc(interface_size);
+			STARPU_ASSERT(interface);
+			memcpy(interface, src_interface, interface_size);
+
+			/* Pass the interface to StarPU so that the buffer can be allocated */
+			_starpu_allocate_interface(handle, interface, local_memory_node);
+		}
+		else {
+			/* That's a "normal" buffer (R/W) */
+			ret = fetch_data(handle, mode);
+			if (STARPU_UNLIKELY(ret))
+				goto enomem;
 
-		void *src_interface = starpu_data_get_interface_on_node(handle, local_memory_node);
-		task->interface[index] = src_interface;
+			interface = starpu_data_get_interface_on_node(handle, local_memory_node);
+		}
+		
+		task->interface[index] = interface;
 	}
 
 	STARPU_TRACE_END_FETCH_INPUT(NULL);
@@ -404,50 +426,22 @@ void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
 	for (index = 0; index < nbuffers; index++)
 	{
 		starpu_data_handle handle = descrs[index].handle;
+		starpu_access_mode mode = descrs[index].mode;
 
-		_starpu_release_data_on_node(handle, mask, local_node);
+		if (mode & STARPU_SCRATCH)
+		{
+			void *interface = task->interface[index];
 
-		PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+//			fprintf(stderr, "liberate interface %p\n", interface);
 
-		if (handle->sequential_consistency)
-		{
-			/* If this is the last writer, there is no point in adding
-			 * extra deps to that tasks that does not exists anymore */
-			if (task == handle->last_submitted_writer)
-				handle->last_submitted_writer = NULL;
-
-			/* Same if this is one of the readers: we go through the list
-			 * of readers and remove the task if it is found. */
-			struct starpu_task_list *l;
-			l = handle->last_submitted_readers;
-			struct starpu_task_list *prev = NULL;
-			while (l)
-			{
-				struct starpu_task_list *next = l->next;
-
-				if (l->task == task)
-				{
-					/* If we found the task in the reader list */
-					free(l);
-
-					if (prev)
-					{
-						prev->next = next;
-					}
-					else {
-						/* This is the first element of the list */
-						handle->last_submitted_readers = next;
-					}
-				}
-				else {
-					prev = l;
-				}
-
-				l = next;
-			}
-		}
+			handle->ops->liberate_data_on_node(interface, local_node);
 
-		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+			free(interface);
+		}
+		else {
+			_starpu_release_data_on_node(handle, mask, local_node);
+			_starpu_release_data_enforce_sequential_consistency(task, handle);
+		}
 	}
 
 	STARPU_TRACE_END_PUSH_OUTPUT(NULL);

+ 5 - 5
src/datawizard/memalloc.c

@@ -588,7 +588,7 @@ static size_t liberate_memory_on_node(starpu_mem_chunk_t mc, uint32_t node)
  *
  */
 
-static size_t _starpu_allocate_interface(starpu_data_handle handle, void *interface, uint32_t dst_node)
+size_t _starpu_allocate_interface(starpu_data_handle handle, void *interface, uint32_t dst_node)
 {
 	unsigned attempts = 0;
 	size_t allocated_memory;
@@ -629,10 +629,6 @@ static size_t _starpu_allocate_interface(starpu_data_handle handle, void *interf
 		
 	} while(!allocated_memory && attempts++ < 2);
 
-	/* perhaps we could really not handle that capacity misses */
-	if (allocated_memory)
-		register_mem_chunk(handle, dst_node, allocated_memory, 1);
-
 	return allocated_memory;
 }
 
@@ -656,6 +652,10 @@ int _starpu_allocate_memory_on_node(starpu_data_handle handle, uint32_t dst_node
 	if (!allocated_memory)
 		return ENOMEM;
 
+	/* perhaps we could really not handle that capacity misses */
+	if (allocated_memory)
+		register_mem_chunk(handle, dst_node, allocated_memory, 1);
+
 	handle->per_node[dst_node].allocated = 1;
 	handle->per_node[dst_node].automatically_allocated = 1;
 

+ 1 - 0
src/datawizard/memalloc.h

@@ -46,6 +46,7 @@ LIST_TYPE(starpu_mem_chunk,
 void _starpu_init_mem_chunk_lists(void);
 void _starpu_deinit_mem_chunk_lists(void);
 void _starpu_request_mem_chunk_removal(starpu_data_handle handle, unsigned node);
+size_t _starpu_allocate_interface(starpu_data_handle handle, void *interface, uint32_t dst_node);
 int _starpu_allocate_memory_on_node(starpu_data_handle handle, uint32_t dst_node, unsigned may_alloc);
 size_t _starpu_liberate_all_automatically_allocated_buffers(uint32_t node);
 

+ 4 - 0
tests/Makefile.am

@@ -92,6 +92,7 @@ check_PROGRAMS += 				\
 	core/declare_deps_after_submission_synchronous	\
 	core/get_current_task			\
 	datawizard/data_implicit_deps		\
+	datawizard/scratch			\
 	datawizard/sync_and_notify_data		\
 	datawizard/dsm_stress			\
 	datawizard/write_only_tmp_buffer	\
@@ -179,6 +180,9 @@ core_get_current_task_SOURCES =			\
 datawizard_data_implicit_deps_SOURCES =		\
 	datawizard/data_implicit_deps.c
 
+datawizard_scratch_SOURCES =			\
+	datawizard/scratch.c
+
 datawizard_dsm_stress_SOURCES =			\
 	datawizard/dsm_stress.c
 

+ 91 - 0
tests/datawizard/scratch.c

@@ -0,0 +1,91 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+
+#define NLOOPS		128	/* number of tasks submitted by main; also the value main expects in each A[i] */
+#define VECTORSIZE	1024	/* number of unsigned elements in each registered vector */
+/* Host array registered as A_handle, and the two vector handles attached to every submitted task. */
+static unsigned *A;
+starpu_data_handle A_handle, B_handle;
+/* NOTE(review): var is not referenced anywhere else in this file -- looks unused; confirm before removing. */
+static unsigned var = 0;
+/* Codelet kernel: copies the vector descr[0] into the buffer descr[1], then rewrites descr[0] as that copy plus one. */
+static void f(void *descr[], __attribute__ ((unused)) void *_args)
+{
+	unsigned *v = (unsigned *)STARPU_GET_VECTOR_PTR(descr[0]); /* vector updated in place */
+	unsigned *tmp = (unsigned *)STARPU_GET_VECTOR_PTR(descr[1]); /* temporary copy of v */
+
+	unsigned nx = STARPU_GET_VECTOR_NX(descr[0]);
+	size_t elemsize = STARPU_GET_VECTOR_ELEMSIZE(descr[0]);
+	/* NOTE(review): memcpy is used but <string.h> is not among this file's includes -- presumably reached via starpu.h; confirm. */
+	memcpy(tmp, v, nx*elemsize);
+
+	unsigned i;
+	for (i = 0; i < nx; i++)
+	{
+		v[i] = tmp[i] + 1; /* every element of v ends up incremented by one */
+	}
+}
+/* Codelet describing f: two buffers, with the same function registered for both CPU and CUDA. */
+static starpu_codelet cl_f = {
+	.where = STARPU_CPU|STARPU_CUDA,
+	.cpu_func = f,
+	.cuda_func = f, /* NOTE(review): f runs memcpy and a plain loop on its buffers -- confirm this is valid on CUDA workers */
+	.nbuffers = 2
+};
+/* Test driver: submits NLOOPS tasks that each add one to A through a STARPU_SCRATCH buffer, then asserts every A[i] == NLOOPS. */
+int main(int argc, char **argv)
+{
+	starpu_init(NULL);
+
+	A = calloc(VECTORSIZE, sizeof(unsigned)); /* starts at zero; NOTE(review): the allocation result is not checked */
+	/* B is registered with node -1 and a NULL pointer -- presumably "no initial copy, shape only"; confirm. */
+	starpu_vector_data_register(&A_handle, 0, (uintptr_t)A, VECTORSIZE, sizeof(unsigned));
+	starpu_vector_data_register(&B_handle, -1, (uintptr_t)NULL, VECTORSIZE, sizeof(unsigned));
+	/* Submit NLOOPS tasks: buffer 0 is A in read-write mode, buffer 1 is B in scratch mode. */
+	unsigned loop;
+	for (loop = 0; loop < NLOOPS; loop++)
+	{
+		struct starpu_task *task_f = starpu_task_create();
+		task_f->cl = &cl_f;
+		task_f->buffers[0].handle = A_handle;
+		task_f->buffers[0].mode = STARPU_RW;
+		task_f->buffers[1].handle = B_handle;
+		task_f->buffers[1].mode = STARPU_SCRATCH;
+		starpu_task_submit(task_f); /* NOTE(review): the return value is ignored */
+	}
+
+	starpu_task_wait_for_all();
+	/* Presumably makes A readable from main memory before it is inspected below; confirm. */
+	starpu_data_sync_with_mem(A_handle, STARPU_R);	
+
+	unsigned i;
+	for (i = 0; i < VECTORSIZE; i++)
+	{
+		STARPU_ASSERT(A[i] == NLOOPS); /* each of the NLOOPS tasks added one to the zero-initialized element */
+	}
+
+	starpu_data_release_from_mem(A_handle);
+	/* NOTE(review): A is not freed and no handle is unregistered in this file before shutdown. */
+	starpu_shutdown();
+
+	return 0;
+}