浏览代码

mark _starpu_handle_job_termination as a sched_op to protect against sched_ctx changes

Olivier Aumage 8 年之前
父节点
当前提交
c366c7fe62
共有 3 个文件被更改，包括 40 次插入和 5 次删除
  1. 24 4
      src/core/jobs.c
  2. 15 1
      src/core/sched_ctx.c
  3. 1 0
      src/core/workers.h

+ 24 - 4
src/core/jobs.c

@@ -262,6 +262,18 @@ void _starpu_handle_job_submission(struct _starpu_job *j)
 
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
+	struct _starpu_worker *worker = _starpu_get_local_worker_key();
+	int need_leave_sched_op = 0;
+	if (worker)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		if (!worker->state_sched_op_pending)
+		{
+			_starpu_worker_enter_sched_op(worker);
+			need_leave_sched_op = 1;
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+	}
 	struct starpu_task *task = j->task;
 	unsigned sched_ctx = task->sched_ctx;
 	double flops = task->flops;
@@ -508,13 +520,17 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 	}
 
-	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
-	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
-	struct _starpu_worker *worker;
-	worker = _starpu_get_local_worker_key();
 	if (worker)
 	{
+		_starpu_worker_relax_on();
+		_starpu_sched_ctx_lock_write(sched_ctx);
+		_starpu_worker_relax_off();
+		starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx, flops);
+		_starpu_sched_ctx_unlock_write(sched_ctx);
+
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		if (need_leave_sched_op)
+			_starpu_worker_leave_sched_op(worker);
 
 		if(worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
 		{
@@ -523,6 +539,10 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
+	else
+	{
+		starpu_sched_ctx_revert_task_counters(sched_ctx, flops);
+	}
 }
 
 /* This function is called when a new task is submitted to StarPU

+ 15 - 1
src/core/sched_ctx.c

@@ -2720,7 +2720,6 @@ void _starpu_worker_apply_deferred_ctx_changes(void)
 	{
 		struct _starpu_ctx_change *chg = _starpu_ctx_change_list_pop_front(l);
 		STARPU_ASSERT(chg->workerids_to_change != NULL);
-		_starpu_sched_ctx_lock_write(chg->sched_ctx_id);
 
 		if (chg->nworkers_to_notify)
 		{
@@ -2732,6 +2731,7 @@ void _starpu_worker_apply_deferred_ctx_changes(void)
 			STARPU_ASSERT(chg->workerids_to_notify == NULL);
 			notify_workers_about_changing_ctx_pending(chg->nworkers_to_change, chg->workerids_to_change);
 		}
+		_starpu_sched_ctx_lock_write(chg->sched_ctx_id);
 		switch (chg->op)
 		{
 			case ctx_change_add:
@@ -2742,6 +2742,20 @@ void _starpu_worker_apply_deferred_ctx_changes(void)
 			case ctx_change_remove:
 			{
 				remove_notified_workers(chg->workerids_to_change, chg->nworkers_to_change, chg->sched_ctx_id);
+				{
+					int i;
+					for (i = 0; i < chg->nworkers_to_change; i++)
+					{
+						struct _starpu_worker *w =
+							_starpu_get_worker_struct(chg->workerids_to_change[i]);
+						if(w->removed_from_ctx[chg->sched_ctx_id] == 1
+								&& w->shares_tasks_lists[chg->sched_ctx_id] == 1)
+						{
+							_starpu_worker_gets_out_of_ctx(chg->sched_ctx_id, w);
+							w->removed_from_ctx[chg->sched_ctx_id] = 0;
+						}
+					}
+				}
 			}
 			break;
 			default:

+ 1 - 0
src/core/workers.h

@@ -806,6 +806,7 @@ static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const
 {
 	worker->state_safe_for_observation = 1;
 	worker->state_sched_op_pending = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	_starpu_worker_apply_deferred_ctx_changes();
 }