lock_write sched_ctx during the whole counter update sequence

Olivier Aumage, 8 years ago · commit 6991a4d86e
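
This commit coarsens the locking around scheduling-context counter updates. Previously, the pop-task paths of the eager policies issued three independently synchronized calls: decrement the per-worker task counters, possibly move the task to a child context, then revert the counters of the original context. That left windows in which another worker could observe the counters and the task lists out of sync. The sched_ctx write lock is now taken once around the whole sequence, and _ctx_locked variants of the helpers are introduced that assume the caller already holds that lock.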

+ 3 - 0
include/starpu_sched_ctx.h

@@ -136,6 +136,7 @@ void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int worker
 void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id);
 
 void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id);
+void starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(struct starpu_task *task, unsigned sched_ctx_id);
 
 void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id);
 
@@ -161,8 +162,10 @@ unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned
 unsigned starpu_sched_ctx_master_get_context(int masterid);
 
 void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double flops);
+void starpu_sched_ctx_revert_task_counters_ctx_locked(unsigned sched_ctx_id, double flops);
 
 void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex, unsigned with_repush);
+void starpu_sched_ctx_move_task_to_ctx_locked(struct starpu_task *task, unsigned sched_ctx, unsigned with_repush);
 
 int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id);
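
The new prototypes follow a _ctx_locked suffix convention: each mirrors an existing entry point but expects the caller to hold the sched_ctx lock, which is also why starpu_sched_ctx_move_task_to_ctx_locked drops the manage_mutex parameter of its unlocked counterpart. A minimal usage sketch, using the lock helpers that appear in the policy changes below (task and child_sched_ctx are placeholder variables):

	_starpu_sched_ctx_lock_write(sched_ctx_id);
	starpu_sched_ctx_move_task_to_ctx_locked(task, child_sched_ctx, 1);
	starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
	_starpu_sched_ctx_unlock_write(sched_ctx_id);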
 

+ 51 - 0
src/core/sched_ctx.c

@@ -1554,6 +1554,12 @@ unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, doub
 	return ret;
 }
 
+void _starpu_decrement_nready_tasks_of_sched_ctx_locked(unsigned sched_ctx_id, double ready_flops)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
+}
+
 void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -2175,12 +2181,36 @@ struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(st
 	return ret;
 }
 
+void starpu_sched_ctx_revert_task_counters_ctx_locked(unsigned sched_ctx_id, double ready_flops)
+{
+        _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
+        _starpu_decrement_nready_tasks_of_sched_ctx_locked(sched_ctx_id, ready_flops);
+}
+
 void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_flops)
 {
         _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
         _starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, ready_flops);
 }
 
+void starpu_sched_ctx_move_task_to_ctx_locked(struct starpu_task *task, unsigned sched_ctx, unsigned with_repush)
+{
+	/* TODO: make something cleaner which differentiates between calls
+	   from push or pop (have mutex or not) and from another worker or not */
+	int workerid = starpu_worker_get_id();
+
+	task->sched_ctx = sched_ctx;
+
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+
+	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
+
+	if(with_repush)
+		_starpu_repush_task(j);
+	else
+		_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
+}
+
 void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex, 
 				       unsigned with_repush)
 {
@@ -2259,6 +2289,27 @@ void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task,
 	}
 }
 
+void starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	if (_starpu_get_nsched_ctxs() > 1)
+	{
+		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+		struct starpu_sched_ctx_iterator it;
+		workers->init_iterator_for_parallel_tasks(workers, &it, task);
+		while(workers->has_next(workers, &it))
+		{
+			int workerid = workers->get_next(workers, &it);
+			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+			if (worker->nsched_ctxs > 1)
+			{
+				_starpu_worker_lock_for_observation(workerid);
+				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
+				_starpu_worker_unlock_for_observation(workerid);
+			}
+		}
+	}
+}
+
 void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	if (_starpu_get_nsched_ctxs() > 1)
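
With the _locked variant factored out, the pre-existing unlocked entry point can be understood as a wrapper that performs its own synchronization. A hedged sketch of that shape, assuming the wrapper simply takes the write lock around the _locked body (the verbatim StarPU implementation may synchronize differently, e.g. inside the barrier counter itself):

	void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
	{
		/* assumed shape: acquire the ctx lock, then defer to the _locked variant */
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		_starpu_decrement_nready_tasks_of_sched_ctx_locked(sched_ctx_id, ready_flops);
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}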

+ 2 - 0
src/drivers/driver_common/driver_common.c

@@ -76,6 +76,7 @@ void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker, j);
 	if(!sched_ctx)
 		sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
+	_starpu_sched_ctx_lock_read(sched_ctx->id);
 	if(!sched_ctx->sched_policy)
 	{
 		if(!sched_ctx->awake_workers && sched_ctx->main_master == worker->workerid)
@@ -101,6 +102,7 @@ void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job
 	}
 	else
 		_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
+	_starpu_sched_ctx_unlock_read(sched_ctx->id);
 	_STARPU_TASK_BREAK_ON(task, exec);
 }
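
Note that the driver-side change takes the read lock rather than the write lock: _starpu_driver_start_job only inspects context state (sched_policy, awake_workers, main_master), so concurrent job starts remain possible while counter updates serialize behind the write lock. Conceptually the pairing behaves like a readers-writer lock; a sketch of the two usages side by side:

	_starpu_sched_ctx_lock_read(sched_ctx->id);	/* shared: inspect ctx state */
	/* ... read-only work ... */
	_starpu_sched_ctx_unlock_read(sched_ctx->id);

	_starpu_sched_ctx_lock_write(sched_ctx_id);	/* exclusive: mutate counters */
	/* ... counter updates, task migration ... */
	_starpu_sched_ctx_unlock_write(sched_ctx_id);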
 

+ 5 - 3
src/sched_policies/eager_central_policy.c

@@ -225,15 +225,17 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	leave_busy_state(data);
 	if(chosen_task)
 	{
-		starpu_sched_ctx_list_task_counters_decrement_all(chosen_task, sched_ctx_id);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
+		starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(chosen_task, sched_ctx_id);
 
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 0, 1);
-			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
+			starpu_sched_ctx_move_task_to_ctx_locked(chosen_task, child_sched_ctx, 1);
+			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, chosen_task->flops);
 			chosen_task = NULL;
 		}
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 	_starpu_worker_leave_section_safe_for_observation();
 

+ 5 - 3
src/sched_policies/eager_central_priority_policy.c

@@ -307,15 +307,17 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 
 	if(chosen_task)
 	{
-		starpu_sched_ctx_list_task_counters_decrement_all(chosen_task, sched_ctx_id);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
+		starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(chosen_task, sched_ctx_id);
 
                 unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
-			starpu_sched_ctx_move_task_to_ctx(chosen_task, child_sched_ctx, 0, 1);
-			starpu_sched_ctx_revert_task_counters(sched_ctx_id, chosen_task->flops);
+			starpu_sched_ctx_move_task_to_ctx_locked(chosen_task, child_sched_ctx, 1);
+			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, chosen_task->flops);
 			chosen_task = NULL;
 		}
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 	_starpu_worker_leave_section_safe_for_observation();
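
Both eager policies receive the same transformation: the counter decrement, the optional migration to a child context, and the counter revert now run as one write-locked critical section instead of three independently synchronized calls. The common pop-path shape, condensed from the two hunks above:

	_starpu_sched_ctx_lock_write(sched_ctx_id);
	starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(chosen_task, sched_ctx_id);
	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
	if (child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
	{
		/* hand the task to the child context and undo this context's counters */
		starpu_sched_ctx_move_task_to_ctx_locked(chosen_task, child_sched_ctx, 1);
		starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, chosen_task->flops);
		chosen_task = NULL;
	}
	_starpu_sched_ctx_unlock_write(sched_ctx_id);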