Kaynağa Gözat

First draft of implementation of the STARPU_REDUX mode.
- the starpu_data_set_reduction_methods function specifies which codelets to
use to initialize the replicates and to reduce two of them
- the starpu_data_start_reduction_mode function indicates to StarPU that the
handle will be accessed in redux mode
- the starpu_data_end_reduction_mode function forces StarPU to perform the
actual reduction

NB: starpu_data_start_reduction_mode and starpu_data_end_reduction_mode should
be hidden in the future.

Cédric Augonnet 14 yıl önce
ebeveyn
işleme
8e094a1fec

+ 4 - 2
include/starpu_data.h

@@ -83,9 +83,11 @@ void starpu_data_set_default_sequential_consistency_flag(unsigned flag);
 /* Query the status of the handle on the specified memory node. */
 void starpu_data_query_status(starpu_data_handle handle, int memory_node, int *is_allocated, int *is_valid, int *is_requested);
 
+struct starpu_codelet_t;
+
 void starpu_data_set_reduction_methods(starpu_data_handle handle,
-					void (*redux_func)(void *, void *),
-					void (*init_func)(void *));
+					struct starpu_codelet_t *redux_cl,
+					struct starpu_codelet_t *init_cl);
 
 #ifdef __cplusplus
 }

+ 1 - 0
src/core/dependencies/data_concurrency.c

@@ -191,6 +191,7 @@ static unsigned unlock_one_requester(starpu_data_requester_t r)
 void _starpu_notify_data_dependencies(starpu_data_handle handle)
 {
 	/* A data access has finished so we remove a reference. */
+	STARPU_ASSERT(handle->refcnt > 0);
 	handle->refcnt--;
 
 	while (may_unlock_data_req_list_head(handle))

+ 10 - 1
src/datawizard/coherency.c

@@ -148,6 +148,7 @@ void _starpu_update_data_state(starpu_data_handle handle,
  * 		    else (invalid,owner->shared)
  */
 
+/* This function is called with handle's header lock taken */
 static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
                                 starpu_access_mode mode, unsigned is_prefetch,
@@ -412,6 +413,8 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 
 	unsigned local_memory_node = _starpu_get_local_memory_node();
 
+	int workerid = starpu_worker_get_id();
+
 	unsigned index;
 	for (index = 0; index < nbuffers; index++)
 	{
@@ -423,7 +426,6 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 
 		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
 		{
-			int workerid = starpu_worker_get_id();
 			local_replicate = &handle->per_worker[workerid];
 		}
 		else {
@@ -436,6 +438,13 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 			goto enomem;
 
 		task->interface[index] = local_replicate->interface;
+
+		if (mode & STARPU_REDUX)
+		{
+			/* If the replicate was not initialized yet, we have to do it now */
+			if (!local_replicate->initialized)
+				_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
+		}
 	}
 
 	STARPU_TRACE_END_FETCH_INPUT(NULL);

+ 8 - 2
src/datawizard/coherency.h

@@ -49,6 +49,10 @@ LIST_TYPE(starpu_data_replicate,
 	 * filters. */
 	unsigned relaxed_coherency;
 
+	/* In the case of a REDUX access, we need to initialize the replicate
+	 * with a neutral element before using it. */
+	unsigned initialized;
+
 	/* describes the state of the local data in term of coherency */
 	starpu_cache_state	state; 
 
@@ -161,8 +165,8 @@ struct starpu_data_state_t {
 	 * the reduction of an interface into another one (eg. "+="), and init_func
 	 * initializes the data interface to a default value that is stable by
 	 * reduction (eg. 0 for +=). */
-	void (*redux_func)(void *dst_interface, void *src_interface);
-	void (*init_func)(void *interface);
+	struct starpu_codelet_t *redux_cl;
+	struct starpu_codelet_t *init_cl;
 };
 
 void _starpu_display_msi_stats(void);
@@ -197,4 +201,6 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 uint32_t _starpu_select_node_to_handle_request(uint32_t src_node, uint32_t dst_node);
 uint32_t _starpu_select_src_node(struct starpu_data_state_t *state);
 
+void _starpu_redux_init_data_replicate(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, int workerid);
+
 #endif // __COHERENCY__H__

+ 3 - 2
src/datawizard/filters.c

@@ -142,8 +142,8 @@ void starpu_data_partition(starpu_data_handle initial_handle, struct starpu_data
 
 		/* The methods used for reduction are propagated to the
 		 * children. */
-		child->redux_func = initial_handle->redux_func;
-		child->init_func = initial_handle->init_func;
+		child->redux_cl = initial_handle->redux_cl;
+		child->init_cl = initial_handle->init_cl;
 
 		unsigned node;
 		for (node = 0; node < STARPU_MAXNODES; node++)
@@ -181,6 +181,7 @@ void starpu_data_partition(starpu_data_handle initial_handle, struct starpu_data
 			child_replicate->requested = 0;
 			child_replicate->request = NULL;
 			child_replicate->relaxed_coherency = 1;
+			child_replicate->initialized = 0;
 
 			/* duplicate  the content of the interface on node 0 */
 			memcpy(child_replicate->interface, child->per_node[0].interface, child->ops->interface_size);

+ 3 - 2
src/datawizard/interfaces/data_interface.c

@@ -54,8 +54,8 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	handle->post_sync_tasks_cnt = 0;
 
 	/* By default, there are no methods available to perform a reduction */
-	handle->redux_func = NULL;
-	handle->init_func = NULL;
+	handle->redux_cl = NULL;
+	handle->init_cl = NULL;
 
 #ifdef STARPU_USE_FXT
 	handle->last_submitted_ghost_writer_id_is_valid = 0;
@@ -110,6 +110,7 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 		replicate->requested = 0;
 		replicate->request = NULL;
 		replicate->relaxed_coherency = 1;
+		replicate->initialized = 0;
 		replicate->memory_node = starpu_worker_get_memory_node(worker);
 
 		/* duplicate  the content of the interface on node 0 */

+ 3 - 1
src/datawizard/memalloc.c

@@ -138,7 +138,7 @@ static void transfer_subtree_to_node(starpu_data_handle handle, unsigned src_nod
 			src_replicate->refcnt++;
 			dst_replicate->refcnt++;
 
-			ret = _starpu_driver_copy_data_1_to_1(handle, src_replicate, dst_replicate, 0, NULL, 0);
+			ret = _starpu_driver_copy_data_1_to_1(handle, src_replicate, dst_replicate, 0, NULL, 1);
 			STARPU_ASSERT(ret == 0);
 
 			src_replicate->refcnt--;
@@ -324,9 +324,11 @@ static void reuse_mem_chunk(unsigned node, struct starpu_data_replicate_s *new_r
 	struct starpu_data_replicate_s *old_replicate = mc->replicate;
 	old_replicate->allocated = 0;
 	old_replicate->automatically_allocated = 0;
+	old_replicate->initialized = 0;
 
 	new_replicate->allocated = 1;
 	new_replicate->automatically_allocated = 1;
+	new_replicate->initialized = 0;
 
 	STARPU_ASSERT(new_replicate->interface);
 	STARPU_ASSERT(mc->interface);

+ 123 - 5
src/datawizard/reduction.c

@@ -19,8 +19,8 @@
 #include <datawizard/datawizard.h>
 
 void starpu_data_set_reduction_methods(starpu_data_handle handle,
-					void (*redux_func)(void *, void *),
-					void (*init_func)(void *))
+					struct starpu_codelet_t *redux_cl,
+					struct starpu_codelet_t *init_cl)
 {
 	_starpu_spin_lock(&handle->header_lock);
 
@@ -30,13 +30,131 @@ void starpu_data_set_reduction_methods(starpu_data_handle handle,
 		/* make sure that the flags are applied to the children as well */
 		struct starpu_data_state_t *child_handle = &handle->children[child];
 		if (child_handle->nchildren > 0)
-			starpu_data_set_reduction_methods(child_handle, redux_func, init_func);
+			starpu_data_set_reduction_methods(child_handle, redux_cl, init_cl);
 	}
 
-	handle->redux_func = redux_func;
-	handle->init_func = init_func;
+	handle->redux_cl = redux_cl;
+	handle->init_cl = init_cl;
 
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
+void _starpu_redux_init_data_replicate(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, int workerid)
+{	/* Run the initialization codelet registered on the handle (init_cl)
+	 * directly on the given replicate, then mark the replicate as
+	 * initialized. The implementation (CPU, CUDA or OpenCL) is selected
+	 * from the type of worker `workerid`; any other worker type aborts.
+	 * The replicate must already be allocated, and the handle must have
+	 * an init codelet providing a function for that worker type. */
+	STARPU_ASSERT(replicate);
+	STARPU_ASSERT(replicate->allocated);
+
+	struct starpu_codelet_t *init_cl = handle->init_cl;
+	STARPU_ASSERT(init_cl);
+
+	cl_func init_func = NULL;
+	
+	/* TODO Check that worker may execute the codelet.
+	 * For now a missing implementation is only caught by the assertion
+	 * on init_func after the switch. */
+
+	switch (starpu_worker_get_type(workerid)) {
+		case STARPU_CPU_WORKER:
+			init_func = init_cl->cpu_func;
+			break;
+		case STARPU_CUDA_WORKER:
+			init_func = init_cl->cuda_func;
+			break;
+		case STARPU_OPENCL_WORKER:
+			init_func = init_cl->opencl_func;
+			break;
+		default:
+			STARPU_ABORT();
+			break;
+	}
+
+	STARPU_ASSERT(init_func);
+
+	init_func(&replicate->interface, NULL); /* Called directly, not
+	 * submitted as a task. The address of the interface pointer is
+	 * passed first and NULL second; presumably this acts as a one-entry
+	 * buffer array with no codelet argument -- confirm against the
+	 * cl_func prototype. */
+
+	replicate->initialized = 1;
+}
+
+/* Enable reduction mode: clear the `initialized` flag of every per-worker
+ * replicate of the handle (all STARPU_NMAXWORKERS entries), while holding
+ * the handle's header lock. A cleared flag marks a replicate whose content
+ * has not been set by the init codelet
+ * (see _starpu_redux_init_data_replicate, which sets the flag back to 1). */
+void starpu_data_start_reduction_mode(starpu_data_handle handle)
+{
+	unsigned worker;
+
+	_starpu_spin_lock(&handle->header_lock);
+
+	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
+	{
+		struct starpu_data_replicate_s *replicate;
+		replicate = &handle->per_worker[worker];
+		replicate->initialized = 0;
+	}
+
+	_starpu_spin_unlock(&handle->header_lock);
+}
+
+/* Force reduction: fold every initialized per-worker replicate into the
+ * handle using the handle's reduction codelet (redux_cl).
+ *
+ * Steps, as implemented below:
+ *  1. under the header lock, take a reference on each initialized
+ *     replicate and register its interface as a temporary handle;
+ *  2. with the lock released, submit one task per temporary handle, with
+ *     the handle as buffer 0 (STARPU_RW) and the temporary handle as
+ *     buffer 1 (STARPU_R);
+ *  3. call starpu_task_wait_for_all() before touching the replicates again;
+ *  4. under the header lock again, clear every `initialized` flag,
+ *     unregister the temporary handles and drop the references.
+ *
+ * Asserts that redux_cl is set (only checked when at least one replicate
+ * is initialized) and that every task submission returns 0. */
+void starpu_data_end_reduction_mode(starpu_data_handle handle)
+{
+	unsigned worker;
+
+	_starpu_spin_lock(&handle->header_lock);
+
+	/* Register all valid per-worker replicates: tmp_handles[w] receives a
+	 * temporary handle for worker w's replicate, or NULL when that
+	 * replicate is not marked as initialized. */
+	starpu_data_handle tmp_handles[STARPU_NMAXWORKERS];
+
+	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
+	{
+		if (handle->per_worker[worker].initialized)
+		{
+			/* Make sure the replicate is not removed; the matching
+			 * refcnt-- is in the final loop of this function. */
+			handle->per_worker[worker].refcnt++;
+
+			uint32_t home_node = starpu_worker_get_memory_node(worker); 
+			starpu_data_register(&tmp_handles[worker], home_node, handle->per_worker[worker].interface, handle->ops);
+		}
+		else {
+			tmp_handles[worker] = NULL;
+		}
+	}
+	
+	_starpu_spin_unlock(&handle->header_lock);
+
+	/* Create a set of tasks to perform the reduction: one task per
+	 * registered replicate, submitted without holding the header lock. */
+	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
+	{
+		if (tmp_handles[worker])
+		{
+			struct starpu_task *redux_task = starpu_task_create();
+
+			redux_task->cl = handle->redux_cl;
+			STARPU_ASSERT(redux_task->cl);
+
+			redux_task->buffers[0].handle = handle;
+			redux_task->buffers[0].mode = STARPU_RW;
 
+			redux_task->buffers[1].handle = tmp_handles[worker];
+			redux_task->buffers[1].mode = STARPU_R;
+
+			int ret = starpu_task_submit(redux_task);
+			STARPU_ASSERT(!ret);
+		}
+	}
+
+	/* TODO have a better way to synchronize.
+	 * NOTE(review): starpu_task_wait_for_all() presumably waits for every
+	 * submitted task, not only the reduction tasks above -- confirm. */
+	starpu_task_wait_for_all();
+
+	_starpu_spin_lock(&handle->header_lock);
+	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
+	{
+		struct starpu_data_replicate_s *replicate;
+		replicate = &handle->per_worker[worker];
+		replicate->initialized = 0;
+
+		if (tmp_handles[worker])
+		{
+			starpu_data_unregister_no_coherency(tmp_handles[worker]);
+
+			handle->per_worker[worker].refcnt--;
+			/* TODO put in cache */
+		}
+	}
+	_starpu_spin_unlock(&handle->header_lock);
+}