Browse Source

Include the original handle data in the reduction. Handle the case where the handle is lazily allocated. Fixes increment_redux_lazy.

Samuel Thibault 13 years ago
parent
commit
79ee0e721b
1 changed file with 71 additions and 26 deletions
  1. 71 26
      src/datawizard/reduction.c

+ 71 - 26
src/datawizard/reduction.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2011  Université de Bordeaux 1
+ * Copyright (C) 2010-2012  Université de Bordeaux 1
  * Copyright (C) 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -17,6 +17,7 @@
 
 #include <starpu.h>
 #include <common/utils.h>
+#include <util/starpu_data_cpy.h>
 #include <core/task.h>
 #include <datawizard/datawizard.h>
 
@@ -105,10 +106,25 @@ void _starpu_data_start_reduction_mode(starpu_data_handle_t handle)
 void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 {
 	unsigned worker;
+	unsigned node;
+	unsigned empty; /* Whether the handle is initially unallocated */
 
 	/* Put every valid replicate in the same array */
 	unsigned replicate_count = 0;
-	starpu_data_handle_t replicate_array[STARPU_NMAXWORKERS];
+	starpu_data_handle_t replicate_array[1 + STARPU_NMAXWORKERS];
+
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		if (handle->per_node[node].state != STARPU_INVALID)
+			break;
+	}
+	empty = node == STARPU_MAXNODES;
+
+#ifndef NO_TREE_REDUCTION
+	if (!empty)
+		/* Include the initial value into the reduction tree */
+		replicate_array[replicate_count++] = handle;
+#endif
 
 	/* Register all valid per-worker replicates */
 	unsigned nworkers = starpu_worker_get_count();
@@ -134,15 +150,33 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 	}
 
 #ifndef NO_TREE_REDUCTION
-	handle->reduction_refcnt = 1;
+	if (empty) {
+		/* Only the final copy will touch the actual handle */
+		handle->reduction_refcnt = 1;
+	} else {
+		unsigned step = 1;
+		handle->reduction_refcnt = 0;
+		while (step < replicate_count)
+		{
+			/* Each stage will touch the actual handle */
+			handle->reduction_refcnt++;
+			step *= 2;
+		}
+	}
 #else
 	/* We know that in this reduction algorithm there is exactly one task per valid replicate. */
-	handle->reduction_refcnt = replicate_count;
+	handle->reduction_refcnt = replicate_count + empty;
 #endif
 
 //	fprintf(stderr, "REDUX REFCNT = %d\n", handle->reduction_refcnt);
 
-	if (replicate_count > 0)
+	if (replicate_count >
+#ifndef NO_TREE_REDUCTION
+			!empty
+#else
+			0
+#endif
+			)
 	{
 		/* Temporarily unlock the handle */
 		_starpu_spin_unlock(&handle->header_lock);
@@ -153,8 +187,10 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 		struct starpu_task *last_replicate_deps[replicate_count];
 		memset(last_replicate_deps, 0, replicate_count*sizeof(struct starpu_task *));
 
+		/* Redux step-by-step for step from 1 to replicate_count/2, i.e.
+		 * 1-by-1, then 2-by-2, then 4-by-4, etc. */
 		unsigned step = 1;
-		while (step <= replicate_count)
+		while (step < replicate_count)
 		{
 			unsigned i;
 			for (i = 0; i < replicate_count; i+=2*step)
@@ -165,6 +201,13 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 					 * and i+step and put the result in replicate i */
 					struct starpu_task *redux_task = starpu_task_create();
 
+					/* Mark these tasks so that StarPU does not block them
+					 * when they try to access the handle (normal tasks are
+					 * data requests to that handle are frozen until the
+					 * data is coherent again). */
+					struct _starpu_job *j = _starpu_get_job_associated_to_task(redux_task);
+					j->reduction_task = 1;
+
 					redux_task->cl = handle->redux_cl;
 					STARPU_ASSERT(redux_task->cl);
 
@@ -200,31 +243,33 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 			step *= 2;
 		}
 
-		struct starpu_task *redux_task = starpu_task_create();
-
-		/* Mark these tasks so that StarPU does not block them
-		 * when they try to access the handle (normal tasks are
-		 * data requests to that handle are frozen until the
-		 * data is coherent again). */
-		struct _starpu_job *j = _starpu_get_job_associated_to_task(redux_task);
-		j->reduction_task = 1;
+		if (empty)
+			/* The handle was empty, we just need to copy the reduced value. */
+			_starpu_data_cpy(handle, replicate_array[0], 1, NULL, 0, 1);
 
-		redux_task->cl = handle->redux_cl;
-		STARPU_ASSERT(redux_task->cl);
-
-		redux_task->handles[0] = handle;
-		redux_task->cl->modes[0] = STARPU_RW;
+#else
+		if (empty) {
+			struct starpu_task *redux_task = starpu_task_create();
 
-		redux_task->handles[1] = replicate_array[0];
-		redux_task->cl->modes[1] = STARPU_R;
+			/* Mark these tasks so that StarPU does not block them
+			 * when they try to access the handle (normal tasks are
+			 * data requests to that handle are frozen until the
+			 * data is coherent again). */
+			struct _starpu_job *j = _starpu_get_job_associated_to_task(redux_task);
+			j->reduction_task = 1;
 
-		if (last_replicate_deps[0])
-			starpu_task_declare_deps_array(redux_task, 1, &last_replicate_deps[0]);
+			redux_task->cl = handle->init_cl;
+			STARPU_ASSERT(redux_task->cl);
+#ifdef STARPU_DEVEL
+#  warning the mode should already be set in the codelet. Only check they are valid?
+#endif
+			redux_task->cl->modes[0] = STARPU_W;
+			redux_task->handles[0] = handle;
 
-		int ret = starpu_task_submit(redux_task);
-		STARPU_ASSERT(!ret);
+			int ret = starpu_task_submit(redux_task);
+			STARPU_ASSERT(!ret);
+		}
 
-#else
 		/* Create a set of tasks to perform the reduction */
 		unsigned replicate;
 		for (replicate = 0; replicate < replicate_count; replicate++)