Prechádzať zdrojové kódy

When a data handle is accessed in R/W mode for the first time after it was
used in reduction mode, StarPU automatically launches tasks to perform the
actual reduction of the handle. This is implemented by "freezing" the tasks
that access the handle until the reduction has been performed.
It is therefore no longer required to call starpu_data_end_reduction_mode,
nor to perform an explicit barrier.

Cédric Augonnet 14 rokov pred
rodič
commit
e4ed360572

+ 0 - 4
examples/dot_product/dot_product.c

@@ -211,10 +211,6 @@ int main(int argc, char **argv)
 		STARPU_ASSERT(!ret);
 	}
 
-	starpu_task_wait_for_all();
-
-	starpu_data_end_reduction_mode(dot_handle);
-
 	starpu_data_unregister(dot_handle);
 
 	fprintf(stderr, "Reference : %e vs. %e (Delta %e)\n", reference_dot, dot, reference_dot - dot);

+ 6 - 5
examples/pi/pi_redux.c

@@ -256,23 +256,24 @@ int main(int argc, char **argv)
 		STARPU_ASSERT(!ret);
 	}
 
-	starpu_task_wait_for_all();
-	starpu_data_end_reduction_mode(shot_cnt_handle);
+	starpu_data_acquire(shot_cnt_handle, STARPU_R);
 
 	gettimeofday(&end, NULL);
-
 	double timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
-	starpu_data_unregister(shot_cnt_handle);
-
 	/* Total surface : Pi * r^ 2 = Pi*1^2, total square surface : 2^2 = 4,
 	 * probability to impact the disk: pi/4 */
 	unsigned long total = ntasks*NSHOT_PER_TASK;
 	double pi_approx = ((double)shot_cnt*4.0)/total;
+
+	starpu_data_release(shot_cnt_handle);
+
+
 	fprintf(stderr, "Pi approximation : %lf (%ld / %ld)\n", pi_approx, shot_cnt, total);
 	fprintf(stderr, "Error %le \n", pi_approx - PI);
 	fprintf(stderr, "Total time : %f ms\n", timing/1000.0);
 	fprintf(stderr, "Speed : %f GShot/s\n", total/(1e3*timing));
 
+	starpu_data_unregister(shot_cnt_handle);
 	starpu_shutdown();
 
 	if (abs(pi_approx - PI) > 1.0)

+ 130 - 46
src/core/dependencies/data_concurrency.c

@@ -30,10 +30,11 @@
  */
 
 /* the header lock must be taken by the caller */
-static unsigned may_unlock_data_req_list_head(starpu_data_handle handle)
+static unsigned may_unlock_data_req_list_head(starpu_data_handle handle,
+					starpu_data_requester_list_t req_list)
 {
 	/* if there is no one to unlock ... */
-	if (starpu_data_requester_list_empty(handle->req_list))
+	if (starpu_data_requester_list_empty(req_list))
 		return 0;
 
 	/* if there is no reference to the data anymore, we can use it */
@@ -44,7 +45,7 @@ static unsigned may_unlock_data_req_list_head(starpu_data_handle handle)
 		return 0;
 
 	/* data->current_mode == STARPU_R, so we can process more readers */
-	starpu_data_requester_t r = starpu_data_requester_list_front(handle->req_list);
+	starpu_data_requester_t r = starpu_data_requester_list_front(req_list);
 
 	starpu_access_mode r_mode = r->mode;
 	if (r_mode == STARPU_RW)
@@ -65,8 +66,6 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 					void (*callback)(void *), void *argcb,
 					starpu_job_t j, unsigned buffer_index)
 {
-	unsigned ret;
-
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
 
@@ -82,23 +81,46 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		_starpu_spin_lock(&handle->header_lock);
 	}
 
+	/* If a reduction is pending and this request does not belong to the
+	 * reduction, we put it at the end of the normal list; requests issued
+	 * by reduction tasks use the reduction_req_list instead. */
+	unsigned pending_reduction = (handle->reduction_refcnt > 0);
+	unsigned frozen = 0;
+
+	/* If we are currently performing a reduction, we freeze any request
+	 * that is not explicitly a reduction task. */
+	unsigned is_a_reduction_task = (request_from_codelet && j->reduction_task);
+
+	if (pending_reduction && !is_a_reduction_task)
+		frozen = 1;
+
 	/* If there is currently nobody accessing the piece of data, or it's
 	 * not another writter and if this is the same type of access as the
 	 * current one, we can proceed. */
-	if ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode)))
-	{
-		handle->refcnt++;
+	unsigned put_in_list = 1;
 
-		starpu_access_mode previous_mode = handle->current_mode;
-		handle->current_mode = mode;
+	starpu_access_mode previous_mode = handle->current_mode;
 
-		if ((mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
-			starpu_data_start_reduction_mode(handle);
+	if (!frozen && ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
+	{
+		/* Detect whether this is the end of a reduction phase */
+			/* We don't want to start multiple reductions of the
+			 * same handle at the same time ! */
 
-		/* success */
-		ret = 0;
+		if ((handle->reduction_refcnt == 0) && (previous_mode == STARPU_REDUX) && (mode != STARPU_REDUX))
+		{
+			starpu_data_end_reduction_mode(handle);
+
+			/* Since we need to perform a mode change, we freeze
+			 * the request if needed. */
+			put_in_list = (handle->reduction_refcnt > 0);
+		}
+		else {
+			put_in_list = 0;
+		}
 	}
-	else
+
+	if (put_in_list)
 	{
 		/* there cannot be multiple writers or a new writer
 		 * while the data is in read mode */
@@ -112,14 +134,29 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 			r->ready_data_callback = callback;
 			r->argcb = argcb;
 
-		starpu_data_requester_list_push_back(handle->req_list, r);
+		/* We put the requester in a specific list if this is a reduction task */
+		starpu_data_requester_list_t req_list =
+			is_a_reduction_task?handle->reduction_req_list:handle->req_list;
+
+		starpu_data_requester_list_push_back(req_list, r);
 
 		/* failed */
-		ret = 1;
+		put_in_list = 1;
+	}
+	else {
+		handle->refcnt++;
+
+		handle->current_mode = mode;
+
+		if ((mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
+			starpu_data_start_reduction_mode(handle);
+
+		/* success */
+		put_in_list = 0;
 	}
 
 	_starpu_spin_unlock(&handle->header_lock);
-	return ret;
+	return put_in_list;
 
 }
 
@@ -199,47 +236,94 @@ void _starpu_notify_data_dependencies(starpu_data_handle handle)
 	STARPU_ASSERT(handle->refcnt > 0);
 	handle->refcnt--;
 
-	while (may_unlock_data_req_list_head(handle))
+	/* The handle has been destroyed in between (eg. this was a temporary
+	 * handle created for a reduction.) */
+	if (handle->lazy_unregister && handle->refcnt == 0)
+	{
+		starpu_data_unregister_no_coherency(handle);
+		/* Warning: in case we unregister the handle, we must be sure
+		 * that the application will not try to unlock the header after
+		 * !*/
+		return;
+	}
+
+	/* In case there is a pending reduction, and that this is the last
+	 * requester, we may go back to a "normal" coherency model. */
+	if (handle->reduction_refcnt > 0)
+	{
+		//fprintf(stderr, "NOTIFY REDUCTION TASK RED REFCNT %d\n", handle->reduction_refcnt);
+		handle->reduction_refcnt--;
+		if (handle->reduction_refcnt == 0)
+			starpu_data_end_reduction_mode_terminate(handle);
+	}
+
+	starpu_data_requester_list_t req_list =
+		(handle->reduction_refcnt > 0)?handle->reduction_req_list:handle->req_list;
+
+	while (may_unlock_data_req_list_head(handle, req_list))
 	{
 		/* Grab the head of the requester list and unlock it. */
-		starpu_data_requester_t r = starpu_data_requester_list_pop_front(handle->req_list);
+		starpu_data_requester_t r = starpu_data_requester_list_pop_front(req_list);
 
-		/* The data is now attributed to that request so we put a
-		 * reference on it. */
-		handle->refcnt++;
-	
 		/* STARPU_RW accesses are treated as STARPU_W */
 		starpu_access_mode r_mode = r->mode;
 		if (r_mode == STARPU_RW)
 			r_mode = STARPU_W;
 
-		starpu_access_mode previous_mode = handle->current_mode;
-		handle->current_mode = r_mode;
-
-		/* In case we enter in a reduction mode, we invalidate all per
-		 * worker replicates. Note that the "per_node" replicates are
-		 * kept intact because we'll reduce a valid copy of the
-		 * "per-node replicate" with the per-worker replicates .*/
-		if ((r_mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
-			starpu_data_start_reduction_mode(handle);
-
-		_starpu_spin_unlock(&handle->header_lock);
-
-		if (r->is_requested_by_codelet)
+		int put_in_list = 1;
+		if ((handle->reduction_refcnt == 0) && (handle->current_mode == STARPU_REDUX) && (r_mode != STARPU_REDUX))
 		{
-			if (!unlock_one_requester(r))
-				_starpu_push_task(r->j, 0);
+			starpu_data_end_reduction_mode(handle);
+
+			/* Since we need to perform a mode change, we freeze
+			 * the request if needed. */
+			put_in_list = (handle->reduction_refcnt > 0);
 		}
-		else
+		else {
+			put_in_list = 0;
+		}
+
+		if (put_in_list)
 		{
-			STARPU_ASSERT(r->ready_data_callback);
+			/* We need to put the request back because we must
+			 * perform a reduction before. */
+			starpu_data_requester_list_push_front(req_list, r);
 
-			/* execute the callback associated with the data requester */
-			r->ready_data_callback(r->argcb);
+			req_list = handle->reduction_req_list;
 		}
-
-		starpu_data_requester_delete(r);
+		else {
+			/* The data is now attributed to that request so we put a
+			 * reference on it. */
+			handle->refcnt++;
 		
-		_starpu_spin_lock(&handle->header_lock);
+			starpu_access_mode previous_mode = handle->current_mode;
+			handle->current_mode = r_mode;
+
+			/* In case we enter in a reduction mode, we invalidate all per
+			 * worker replicates. Note that the "per_node" replicates are
+			 * kept intact because we'll reduce a valid copy of the
+			 * "per-node replicate" with the per-worker replicates .*/
+			if ((r_mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
+				starpu_data_start_reduction_mode(handle);
+
+			_starpu_spin_unlock(&handle->header_lock);
+
+			if (r->is_requested_by_codelet)
+			{
+				if (!unlock_one_requester(r))
+					_starpu_push_task(r->j, 0);
+			}
+			else
+			{
+				STARPU_ASSERT(r->ready_data_callback);
+
+				/* execute the callback associated with the data requester */
+				r->ready_data_callback(r->argcb);
+			}
+
+			starpu_data_requester_delete(r);
+			
+			_starpu_spin_lock(&handle->header_lock);
+		}
 	}
 }

+ 6 - 0
src/core/dependencies/implicit_data_deps.c

@@ -214,6 +214,12 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 	STARPU_ASSERT(task->cl);
         _STARPU_LOG_IN();
 
+	/* We don't want to enforce a sequential consistency for tasks that are
+	 * not visible to the application. */
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+	if (j->reduction_task)
+		return;
+
 	unsigned nbuffers = task->cl->nbuffers;
 
 	unsigned buffer;

+ 2 - 0
src/core/jobs.c

@@ -75,6 +75,8 @@ starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task
 #endif
 	job->exclude_from_dag = 0;
 
+	job->reduction_task = 0;
+
 	_starpu_cg_list_init(&job->job_successors);
 
 	PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);

+ 6 - 0
src/core/jobs.h

@@ -94,6 +94,12 @@ LIST_TYPE(starpu_job,
 	/* Each job is attributed a unique id. */
 	unsigned long job_id;
 
+	/* During the reduction of a handle, StarPU may have to submit tasks to
+	 * perform the reduction itself: those task should not be stalled while
+	 * other tasks are blocked until the handle has been properly reduced,
+	 * so we need a flag to differentiate them from "normal" tasks. */
+	unsigned reduction_task;
+
 #ifdef STARPU_USE_FXT
 	/* A symbol name may be associated to the job directly for debug
 	 * purposes (for instance if the codelet is NULL). */

+ 15 - 2
src/datawizard/coherency.c

@@ -365,9 +365,15 @@ void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt
 
 	STARPU_ASSERT(replicate->refcnt >= 0);
 
+	/* In case there was a temporary handle (eg. used for reduction), this
+	 * handle may have requested to be destroyed when the data is released
+	 * */
+	unsigned handle_was_destroyed = handle->lazy_unregister;
+
 	_starpu_notify_data_dependencies(handle);
 
-	_starpu_spin_unlock(&handle->header_lock);
+	if (!handle_was_destroyed)
+		_starpu_spin_unlock(&handle->header_lock);
 }
 
 static void _starpu_set_data_requested_flag_if_needed(struct starpu_data_replicate_s *replicate)
@@ -486,8 +492,15 @@ void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
 			replicate = &handle->per_worker[workerid];
 		}
 
+		/* In case there was a temporary handle (eg. used for
+		 * reduction), this handle may have requested to be destroyed
+		 * when the data is released
+		 * */
+		unsigned handle_was_destroyed = handle->lazy_unregister;
+
 		_starpu_release_data_on_node(handle, mask, replicate);
-		_starpu_release_data_enforce_sequential_consistency(task, handle);
+		if (!handle_was_destroyed)
+			_starpu_release_data_enforce_sequential_consistency(task, handle);
 	}
 
 	STARPU_TRACE_END_PUSH_OUTPUT(NULL);

+ 19 - 0
src/datawizard/coherency.h

@@ -161,12 +161,30 @@ struct starpu_data_state_t {
 	struct starpu_task_wrapper_list *post_sync_tasks;
 	unsigned post_sync_tasks_cnt;
 
+	/*
+	 *	Reductions
+	 */
+
 	/* During reduction we need some specific methods: redux_func performs
 	 * the reduction of an interface into another one (eg. "+="), and init_func
 	 * initializes the data interface to a default value that is stable by
 	 * reduction (eg. 0 for +=). */
 	struct starpu_codelet_t *redux_cl;
 	struct starpu_codelet_t *init_cl;
+
+	/* Are we currently performing a reduction on that handle? If so,
+	 * reduction_refcnt should remain non-zero as long as there are pending
+	 * tasks that are performing the reduction. */
+	unsigned reduction_refcnt;
+
+	/* List of requesters that are specific to the pending reduction. This
+	 * list is used when the requests in the req_list list are frozen until
+	 * the end of the reduction. */
+	struct starpu_data_requester_list_s *reduction_req_list;
+
+	starpu_data_handle reduction_tmp_handles[STARPU_NMAXWORKERS];
+
+	unsigned lazy_unregister;
 };
 
 void _starpu_display_msi_stats(void);
@@ -204,5 +222,6 @@ uint32_t _starpu_select_src_node(struct starpu_data_state_t *state);
 void _starpu_redux_init_data_replicate(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, int workerid);
 void starpu_data_start_reduction_mode(starpu_data_handle handle);
 void starpu_data_end_reduction_mode(starpu_data_handle handle);
+void starpu_data_end_reduction_mode_terminate(starpu_data_handle handle);
 
 #endif // __COHERENCY__H__

+ 11 - 3
src/datawizard/interfaces/data_interface.c

@@ -57,6 +57,9 @@ static void _starpu_register_new_data(starpu_data_handle handle,
 	handle->redux_cl = NULL;
 	handle->init_cl = NULL;
 
+	handle->reduction_refcnt = 0;
+	handle->reduction_req_list = starpu_data_requester_list_new();
+
 #ifdef STARPU_USE_FXT
 	handle->last_submitted_ghost_writer_id_is_valid = 0;
 	handle->last_submitted_ghost_writer_id = 0;
@@ -224,11 +227,11 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 {
 	STARPU_ASSERT(handle);
 
-	/* If sequential consistency is enabled, wait until data is available */
-	_starpu_data_wait_until_available(handle, STARPU_RW);
-
 	if (coherent)
 	{
+		/* If sequential consistency is enabled, wait until data is available */
+		_starpu_data_wait_until_available(handle, STARPU_RW);
+
 		/* Fetch data in the home of the data to ensure we have a valid copy
 		 * where we registered it */
 		int home_node = handle->home_node; 
@@ -257,6 +260,11 @@ static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent
 			}
 		}
 	}
+	else {
+		/* Should we postpone the unregister operation ? */
+		if ((handle->refcnt > 0) && handle->lazy_unregister)
+			return;
+	}
 
 	_starpu_data_free_interfaces(handle);
 

+ 34 - 15
src/datawizard/reduction.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/utils.h>
+#include <core/task.h>
 #include <datawizard/datawizard.h>
 
 void starpu_data_set_reduction_methods(starpu_data_handle handle,
@@ -77,6 +78,8 @@ void _starpu_redux_init_data_replicate(starpu_data_handle handle, struct starpu_
  * taken. */
 void starpu_data_start_reduction_mode(starpu_data_handle handle)
 {
+	STARPU_ASSERT(handle->reduction_refcnt == 0);
+
 	unsigned worker;
 
 	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
@@ -87,16 +90,14 @@ void starpu_data_start_reduction_mode(starpu_data_handle handle)
 	}
 }
 
-/* Force reduction */
+/* Force reduction. The lock should already have been taken.  */
 void starpu_data_end_reduction_mode(starpu_data_handle handle)
 {
 	unsigned worker;
 
-	_starpu_spin_lock(&handle->header_lock);
+	handle->reduction_refcnt = 0;
 
 	/* Register all valid per-worker replicates */
-	starpu_data_handle tmp_handles[STARPU_NMAXWORKERS];
-
 	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
 	{
 		if (handle->per_worker[worker].initialized)
@@ -105,29 +106,43 @@ void starpu_data_end_reduction_mode(starpu_data_handle handle)
 			handle->per_worker[worker].refcnt++;
 
 			uint32_t home_node = starpu_worker_get_memory_node(worker); 
-			starpu_data_register(&tmp_handles[worker], home_node, handle->per_worker[worker].interface, handle->ops);
+			starpu_data_register(&handle->reduction_tmp_handles[worker],
+				home_node, handle->per_worker[worker].interface, handle->ops);
+
+			/* We know that in this reduction algorithm there is exactly one task per valid replicate. */
+			handle->reduction_refcnt++;
 		}
 		else {
-			tmp_handles[worker] = NULL;
+			handle->reduction_tmp_handles[worker] = NULL;
 		}
 	}
+
+//	fprintf(stderr, "REDUX REFCNT = %d\n", handle->reduction_refcnt);
 	
+	/* Temporarily unlock the handle */
 	_starpu_spin_unlock(&handle->header_lock);
 
 	/* Create a set of tasks to perform the reduction */
 	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
 	{
-		if (tmp_handles[worker])
+		if (handle->reduction_tmp_handles[worker])
 		{
 			struct starpu_task *redux_task = starpu_task_create();
 
+			/* Mark these tasks so that StarPU does not block them
+			 * when they try to access the handle (data requests
+			 * from normal tasks to that handle are frozen until
+			 * the data is coherent again). */
+			starpu_job_t j = _starpu_get_job_associated_to_task(redux_task);
+			j->reduction_task = 1;
+
 			redux_task->cl = handle->redux_cl;
 			STARPU_ASSERT(redux_task->cl);
 
 			redux_task->buffers[0].handle = handle;
 			redux_task->buffers[0].mode = STARPU_RW;
 
-			redux_task->buffers[1].handle = tmp_handles[worker];
+			redux_task->buffers[1].handle = handle->reduction_tmp_handles[worker];
 			redux_task->buffers[1].mode = STARPU_R;
 
 			int ret = starpu_task_submit(redux_task);
@@ -135,23 +150,27 @@ void starpu_data_end_reduction_mode(starpu_data_handle handle)
 		}
 	}
 
-	/* TODO have a better way to synchronize */
-	starpu_task_wait_for_all();
-
+	/* Get the header lock back */
 	_starpu_spin_lock(&handle->header_lock);
+}
+
+void starpu_data_end_reduction_mode_terminate(starpu_data_handle handle)
+{
+//	fprintf(stderr, "starpu_data_end_reduction_mode_terminate\n");
+	unsigned worker;
 	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)
 	{
 		struct starpu_data_replicate_s *replicate;
 		replicate = &handle->per_worker[worker];
 		replicate->initialized = 0;
 
-		if (tmp_handles[worker])
+		if (handle->reduction_tmp_handles[worker])
 		{
-			starpu_data_unregister_no_coherency(tmp_handles[worker]);
-
+//			fprintf(stderr, "unregister handle %p\n", handle);
+			handle->reduction_tmp_handles[worker]->lazy_unregister = 1;
+			starpu_data_unregister_no_coherency(handle->reduction_tmp_handles[worker]);
 			handle->per_worker[worker].refcnt--;
 			/* TODO put in cache */
 		}
 	}
-	_starpu_spin_unlock(&handle->header_lock);
 }