Browse Source

Allow using the same parameter several times, in both read and write mode

Samuel Thibault 13 years ago
parent
commit
52b0917cf1

+ 1 - 2
doc/chapters/basic-api.texi

@@ -1103,8 +1103,7 @@ The @code{struct starpu_buffer_descr} structure is composed of two fields, the
 of entries in this array must be specified in the @code{nbuffers} field of the
 @code{struct starpu_codelet} structure, and should not excede @code{STARPU_NMAXBUFS}.
 If unsufficient, this value can be set with the @code{--enable-maxbuffers}
-option when configuring StarPU. As of the current implementation, a data handle
-can appear several times only if it is only accessed with @code{STARPU_R} mode.
+option when configuring StarPU.
 
 @item @code{cl_arg} (optional; default: @code{NULL})
 This pointer is passed to the codelet through the second argument

+ 1 - 0
include/starpu_data.h

@@ -30,6 +30,7 @@ typedef struct _starpu_data_state* starpu_data_handle_t;
 
 enum starpu_access_mode
 {
+	/* Values here matter for _starpu_compar_handles */
 	STARPU_R=(1<<0),
 	STARPU_W=(1<<1),
 	STARPU_RW=(STARPU_R|STARPU_W),

+ 6 - 0
src/core/dependencies/data_concurrency.c

@@ -205,6 +205,12 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	unsigned nbuffers = j->task->cl->nbuffers;
 	for (buf = start_buffer_index; buf < nbuffers; buf++)
 	{
+		if (buf && j->ordered_buffers[buf-1].handle == j->ordered_buffers[buf].handle)
+			/* We have already requested this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles.  */
+			continue;
+
                 if (attempt_to_submit_data_request_from_job(j, buf))
 		{
                         j->task->status = STARPU_TASK_BLOCKED_ON_DATA;

+ 8 - 5
src/core/dependencies/implicit_data_deps.c

@@ -37,7 +37,7 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 	handle->last_submitted_readers = link;
 
 	/* This task depends on the previous writer if any */
-	if (handle->last_submitted_writer)
+	if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
 	{
 		_STARPU_DEP_DEBUG("RAW %p\n", handle);
 		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
@@ -74,7 +74,8 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 	l = handle->last_submitted_readers;
 	while (l)
 	{
-		nreaders++;
+		if (l->task != post_sync_task)
+			nreaders++;
 		l = l->next;
 	}
 	_STARPU_DEP_DEBUG("%d readers\n", nreaders);
@@ -86,8 +87,10 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 	while (l)
 	{
 		STARPU_ASSERT(l->task);
-		task_array[i++] = l->task;
-		_STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
+		if (l->task != post_sync_task) {
+			task_array[i++] = l->task;
+			_STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
+		}
 
 		struct _starpu_task_wrapper_list *prev = l;
 		l = l->next;
@@ -125,7 +128,7 @@ static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct
 {
 	/* (Read) Write */
 	/* This task depends on the previous writer */
-	if (handle->last_submitted_writer)
+	if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
 	{
 		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
 		starpu_task_declare_deps_array(pre_sync_task, 1, task_array);

+ 19 - 5
src/datawizard/coherency.c

@@ -596,15 +596,16 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 	return 0;
 }
 
-int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
+int _starpu_fetch_task_input(struct _starpu_job *j, uint32_t mask)
 {
 	_STARPU_TRACE_START_FETCH_INPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();
+	struct starpu_task *task = j->task;
 	if (profiling && task->profiling_info)
 		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
 
-	struct starpu_buffer_descr *descrs = task->buffers;
+	struct starpu_buffer_descr *descrs = j->ordered_buffers;
 	unsigned nbuffers = task->cl->nbuffers;
 
 	unsigned local_memory_node = _starpu_get_local_memory_node();
@@ -620,6 +621,12 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 
 		struct _starpu_data_replicate *local_replicate;
 
+		if (index && descrs[index-1].handle == descrs[index].handle)
+			/* We have already released this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles */
+			continue;
+
 		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
 		{
 			local_replicate = &handle->per_worker[workerid];
@@ -656,19 +663,20 @@ enomem:
 	/* XXX broken ... */
 	_STARPU_DISP("something went wrong with buffer %u\n", index);
 	//push_codelet_output(task, index, mask);
-	_starpu_push_task_output(task, mask);
+	_starpu_push_task_output(j, mask);
 	return -1;
 }
 
-void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
+void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask)
 {
 	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();
+	struct starpu_task *task = j->task;
 	if (profiling && task->profiling_info)
 		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);
 
-        struct starpu_buffer_descr *descrs = task->buffers;
+        struct starpu_buffer_descr *descrs = j->ordered_buffers;
         unsigned nbuffers = task->cl->nbuffers;
 
 	unsigned index;
@@ -679,6 +687,12 @@ void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
 
 		struct _starpu_data_replicate *replicate;
 
+		if (index && descrs[index-1].handle == descrs[index].handle)
+			/* We have already released this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles */
+			continue;
+
 		if (mode & STARPU_RW)
 		{
 			unsigned local_node = _starpu_get_local_memory_node();

+ 2 - 2
src/datawizard/coherency.h

@@ -242,10 +242,10 @@ size_t _starpu_data_get_size(starpu_data_handle_t handle);
 
 uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle);
 
-void _starpu_push_task_output(struct starpu_task *task, uint32_t mask);
+void _starpu_push_task_output(struct _starpu_job *j, uint32_t mask);
 
 __attribute__((warn_unused_result))
-int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask);
+int _starpu_fetch_task_input(struct _starpu_job *j, uint32_t mask);
 
 unsigned _starpu_is_data_present_or_requested(struct _starpu_data_state *state, uint32_t node);
 unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle_t handle, uint32_t memory_node);

+ 25 - 6
src/datawizard/sort_data_handles.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010-2011  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -64,12 +64,31 @@ static int _compar_data_paths(const unsigned pathA[], unsigned depthA,
 
 /* A comparision function between two handles makes it possible to use qsort to
  * sort a list of handles */
-static int _starpu_compar_handles(struct _starpu_data_state *dataA,
-				  struct _starpu_data_state *dataB)
+static int _starpu_compar_handles(const struct starpu_buffer_descr *descrA,
+				  const struct starpu_buffer_descr *descrB)
 {
+	struct _starpu_data_state *dataA = descrA->handle;
+	struct _starpu_data_state *dataB = descrB->handle;
+
 	/* Perhaps we have the same piece of data */
-	if (dataA == dataB)
-		return 0;
+	if (dataA == dataB) {
+		/* Process write requests first, this is needed for proper
+		 * locking, see _submit_job_enforce_data_deps,
+		 * _starpu_fetch_task_input, and _starpu_push_task_output  */
+		if (descrA->mode & STARPU_W) {
+			if (descrB->mode & STARPU_W)
+				/* Both A and B write, take the reader first */
+				if (descrA->mode & STARPU_R)
+					return -1;
+				else
+					return 1;
+			else
+				/* Only A writes, take it first */
+				return -1;
+		} else
+			/* A doesn't write, take B before */
+			return 1;
+	}
 
 	/* In case we have data/subdata from different trees */
 	if (dataA->root_handle != dataB->root_handle)
@@ -91,7 +110,7 @@ static int _starpu_compar_buffer_descr(const void *_descrA, const void *_descrB)
 	const struct starpu_buffer_descr *descrA = (const struct starpu_buffer_descr *) _descrA;
 	const struct starpu_buffer_descr *descrB = (const struct starpu_buffer_descr *) _descrB;
 
-	return _starpu_compar_handles(descrA->handle, descrB->handle);
+	return _starpu_compar_handles(descrA, descrB);
 }
 
 /* The descr array will be overwritten, so this must be a copy ! */

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -37,7 +37,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct _starpu_worker *cpu_
 
 	if (rank == 0)
 	{
-		ret = _starpu_fetch_task_input(task, 0);
+		ret = _starpu_fetch_task_input(j, 0);
 		if (ret != 0)
 		{
 			/* there was not enough memory so the codelet cannot be executed right now ... */
@@ -69,7 +69,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct _starpu_worker *cpu_
 	{
 		_starpu_driver_update_job_feedback(j, cpu_args,
 				perf_arch, &codelet_start, &codelet_end);
-		_starpu_push_task_output(task, 0);
+		_starpu_push_task_output(j, 0);
 	}
 
 	return 0;

+ 2 - 2
src/drivers/cuda/driver_cuda.c

@@ -192,7 +192,7 @@ static int execute_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *arg
 	if (cl->model && cl->model->benchmarking)
 		calibrate_model = 1;
 
-	ret = _starpu_fetch_task_input(task, mask);
+	ret = _starpu_fetch_task_input(j, mask);
 	if (ret != 0)
 	{
 		/* there was not enough memory, so the input of
@@ -225,7 +225,7 @@ static int execute_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *arg
 
 	_starpu_driver_update_job_feedback(j, args, args->perf_arch, &codelet_start, &codelet_end);
 
-	_starpu_push_task_output(task, mask);
+	_starpu_push_task_output(j, mask);
 
 	return 0;
 }

+ 4 - 4
src/drivers/gordon/driver_gordon.c

@@ -178,7 +178,7 @@ static struct gordon_task_wrapper_s *starpu_to_gordon_job(struct _starpu_job *j)
 
 static void handle_terminated_job(struct _starpu_job *j)
 {
-	_starpu_push_task_output(j->task, 0);
+	_starpu_push_task_output(j, 0);
 	_starpu_handle_job_termination(j, 0);
 	starpu_wake_all_blocked_workers();
 }
@@ -216,7 +216,7 @@ static void gordon_callback_list_func(void *arg)
 			_starpu_update_perfmodel_history(j, j->task->cl->model, STARPU_GORDON_DEFAULT, cpuid, measured);
 		}
 
-		_starpu_push_task_output(j->task, 0);
+		_starpu_push_task_output(j, 0);
 		_starpu_handle_job_termination(j, 0);
 		//starpu_wake_all_blocked_workers();
 
@@ -254,7 +254,7 @@ static void gordon_callback_func(void *arg)
 int inject_task(struct _starpu_job *j, struct _starpu_worker *worker)
 {
 	struct starpu_task *task = j->task;
-	int ret = _starpu_fetch_task_input(task, 0);
+	int ret = _starpu_fetch_task_input(j, 0);
 
 	if (ret != 0)
 	{
@@ -315,7 +315,7 @@ int inject_task_list(struct _starpu_job_list *list, struct _starpu_worker *worke
 		int ret;
 
 		struct starpu_task *task = j->task;
-		ret = _starpu_fetch_task_input(task, 0);
+		ret = _starpu_fetch_task_input(j, 0);
 		STARPU_ASSERT(!ret);
 
 		gordon_jobs[index].index = _starpu_task_get_gordon_nth_implementation(task->cl, j->nimpl);

+ 2 - 2
src/drivers/opencl/driver_opencl.c

@@ -582,7 +582,7 @@ static int _starpu_opencl_execute_job(struct _starpu_job *j, struct _starpu_work
 	struct starpu_codelet *cl = task->cl;
 	STARPU_ASSERT(cl);
 
-	ret = _starpu_fetch_task_input(task, mask);
+	ret = _starpu_fetch_task_input(j, mask);
 	if (ret != 0)
 	{
 		/* there was not enough memory, so the input of
@@ -602,7 +602,7 @@ static int _starpu_opencl_execute_job(struct _starpu_job *j, struct _starpu_work
 	_starpu_driver_update_job_feedback(j, args, args->perf_arch,
 							&codelet_start, &codelet_end);
 
-	_starpu_push_task_output(task, mask);
+	_starpu_push_task_output(j, mask);
 
 	return EXIT_SUCCESS;
 }

+ 0 - 3
tests/datawizard/double_parameter.c

@@ -56,8 +56,6 @@ int main(int argc, char **argv)
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
 
 	SUBMIT(R,R);
-	/* Not possible yet */
-#if 0
 	SUBMIT(R,W);
 	SUBMIT(R,RW);
 	SUBMIT(W,R);
@@ -66,7 +64,6 @@ int main(int argc, char **argv)
 	SUBMIT(RW,R);
 	SUBMIT(RW,W);
 	SUBMIT(RW,RW);
-#endif
 
 	ret = starpu_task_wait_for_all();
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_wait_for_all");