Sfoglia il codice sorgente

Implement tree-based reductions

Cédric Augonnet 14 anni fa
parent
commit
e06529d93f
1 ha cambiato i file con 109 aggiunte e 17 eliminazioni
  1. 109 17
      src/datawizard/reduction.c

+ 109 - 17
src/datawizard/reduction.c

@@ -90,12 +90,16 @@ void starpu_data_start_reduction_mode(starpu_data_handle handle)
 	}
 }
 
+//#define NO_TREE_REDUCTION
+
 /* Force reduction. The lock should already have been taken.  */
 void starpu_data_end_reduction_mode(starpu_data_handle handle)
 {
 	unsigned worker;
 
-	handle->reduction_refcnt = 0;
+	/* Put every valid replicate in the same array */
+	unsigned replicate_count = 0;
+	starpu_data_handle replicate_array[STARPU_NMAXWORKERS];
 
 	/* Register all valid per-worker replicates */
 	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
@@ -109,49 +113,137 @@ void starpu_data_end_reduction_mode(starpu_data_handle handle)
 			starpu_data_register(&handle->reduction_tmp_handles[worker],
 				home_node, handle->per_worker[worker].interface, handle->ops);
 
-			/* We know that in this reduction algorithm there is exactly one task per valid replicate. */
-			handle->reduction_refcnt++;
+			starpu_data_set_sequential_consistency_flag(handle->reduction_tmp_handles[worker], 0);
+
+			replicate_array[replicate_count++] = handle->reduction_tmp_handles[worker];
 		}
 		else {
 			handle->reduction_tmp_handles[worker] = NULL;
 		}
 	}
 
+#ifndef NO_TREE_REDUCTION
+	handle->reduction_refcnt = 1;
+#else
+	/* We know that in this reduction algorithm there is exactly one task per valid replicate. */
+	handle->reduction_refcnt = replicate_count;
+#endif
+
 //	fprintf(stderr, "REDUX REFCNT = %d\n", handle->reduction_refcnt);
 	
-	/* Temporarily unlock the handle */
-	_starpu_spin_unlock(&handle->header_lock);
-
-	/* Create a set of tasks to perform the reduction */
-	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
+	if (replicate_count > 0)
 	{
-		if (handle->reduction_tmp_handles[worker])
+		/* Temporarily unlock the handle */
+		_starpu_spin_unlock(&handle->header_lock);
+
+#ifndef NO_TREE_REDUCTION
+		/* We will store a pointer to the last task which should modify the
+		 * replicate */
+		struct starpu_task *last_replicate_deps[replicate_count];
+		memset(last_replicate_deps, 0, replicate_count*sizeof(struct starpu_task *));
+	
+		unsigned step = 1;
+		while (step <= replicate_count)
 		{
-			struct starpu_task *redux_task = starpu_task_create();
+			unsigned i;
+			for (i = 0; i < replicate_count; i+=2*step)
+			{
+				if (i + step < replicate_count)
+				{
+					/* Perform the reduction between replicates i
+					 * and i+step and put the result in replicate i */
+					struct starpu_task *redux_task = starpu_task_create();
+		
+					redux_task->cl = handle->redux_cl;
+					STARPU_ASSERT(redux_task->cl);
+		
+					redux_task->buffers[0].handle = replicate_array[i];
+					redux_task->buffers[0].mode = STARPU_RW;
+		
+					redux_task->buffers[1].handle = replicate_array[i+step];
+					redux_task->buffers[1].mode = STARPU_R;
+	
+					redux_task->detach = 0;
+	
+					int ndeps = 0;
+					struct starpu_task *task_deps[2];
+	
+					if (last_replicate_deps[i])
+						task_deps[ndeps++] = last_replicate_deps[i];
+	
+					if (last_replicate_deps[i+step])
+						task_deps[ndeps++] = last_replicate_deps[i+step];
+	
+					/* i depends on this task */
+					last_replicate_deps[i] = redux_task;
+	
+					/* we don't perform the reduction until both replicates are ready */
+					starpu_task_declare_deps_array(redux_task, ndeps, task_deps); 
+		
+					int ret = starpu_task_submit(redux_task);
+					STARPU_ASSERT(!ret);
+		
+				}
+			}
+
+			step *= 2;
+		}
+	
+		struct starpu_task *redux_task = starpu_task_create();
+
+		/* Mark these tasks so that StarPU does not block them
+		 * when they try to access the handle (normal tasks are
+		 * data requests to that handle are frozen until the
+		 * data is coherent again). */
+		starpu_job_t j = _starpu_get_job_associated_to_task(redux_task);
+		j->reduction_task = 1;
 
+		redux_task->cl = handle->redux_cl;
+		STARPU_ASSERT(redux_task->cl);
+
+		redux_task->buffers[0].handle = handle;
+		redux_task->buffers[0].mode = STARPU_RW;
+
+		redux_task->buffers[1].handle = replicate_array[0];
+		redux_task->buffers[1].mode = STARPU_R;
+
+		if (last_replicate_deps[0])
+			starpu_task_declare_deps_array(redux_task, 1, &last_replicate_deps[0]);
+
+		int ret = starpu_task_submit(redux_task);
+		STARPU_ASSERT(!ret);
+
+#else
+		/* Create a set of tasks to perform the reduction */
+		unsigned replicate;
+		for (replicate = 0; replicate < replicate_count; replicate++)
+		{
+			struct starpu_task *redux_task = starpu_task_create();
+	
 			/* Mark these tasks so that StarPU does not block them
 			 * when they try to access the handle (normal tasks are
 			 * data requests to that handle are frozen until the
 			 * data is coherent again). */
 			starpu_job_t j = _starpu_get_job_associated_to_task(redux_task);
 			j->reduction_task = 1;
-
+	
 			redux_task->cl = handle->redux_cl;
 			STARPU_ASSERT(redux_task->cl);
-
+	
 			redux_task->buffers[0].handle = handle;
 			redux_task->buffers[0].mode = STARPU_RW;
-
-			redux_task->buffers[1].handle = handle->reduction_tmp_handles[worker];
+	
+			redux_task->buffers[1].handle = replicate_array[replicate];
 			redux_task->buffers[1].mode = STARPU_R;
-
+	
 			int ret = starpu_task_submit(redux_task);
 			STARPU_ASSERT(!ret);
 		}
-	}
-
+#endif
 	/* Get the header lock back */
 	_starpu_spin_lock(&handle->header_lock);
+
+	}
 }
 
 void starpu_data_end_reduction_mode_terminate(starpu_data_handle handle)