Browse Source

Ensure that (L)WS schedulers still work in locality modes with correct results.

Terry Cojean 9 years ago
parent
commit
d9ed02bb48
1 changed files with 13 additions and 11 deletions
  1. 13 11
      src/sched_policies/work_stealing_policy.c

+ 13 - 11
src/sched_policies/work_stealing_policy.c

@@ -530,23 +530,25 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 
 	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
 	task = ws_pick_task(workerid, workerid, sched_ctx_id);
+	if (task)
+		locality_popped_task(task, workerid, sched_ctx_id);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
 	if (task)
 	{
-		locality_popped_task(task, workerid, sched_ctx_id);
 		/* there was a local task */
 		return task;
 	}
 
+
 	/* we need to steal someone's job */
 	unsigned victim = ws->select_victim(sched_ctx_id, workerid);
 
+	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[victim].worker_mutex);
 	if (ws->per_worker[victim].queue_array != NULL && ws->per_worker[victim].queue_array->ntasks > 0)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[victim].worker_mutex);
 		task = ws_pick_task(victim, workerid, sched_ctx_id);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);
 	}
+
 	if (task)
 	{
 		_STARPU_TRACE_WORK_STEALING(workerid, victim);
@@ -554,19 +556,20 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		record_worker_locality(task, workerid, sched_ctx_id);
 		locality_popped_task(task, victim, sched_ctx_id);
 	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);
 
 	if(!task)
 	{
+		STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
 		if (ws->per_worker[workerid].queue_array != NULL && ws->per_worker[workerid].queue_array->ntasks > 0)
-		{
-			STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
 			task = ws_pick_task(workerid, workerid, sched_ctx_id);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
-		}
 
 		if (task)
-		{
 			locality_popped_task(task, workerid, sched_ctx_id);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
+
+		if (task)
+		{
 			/* there was a local task */
 			return task;
 		}
@@ -598,8 +601,6 @@ int ws_push_task(struct starpu_task *task)
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-	record_data_locality(task, workerid);
-
 #ifdef HAVE_AYUDAME_H
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	if (AYU_event)
@@ -610,9 +611,10 @@ int ws_push_task(struct starpu_task *task)
 #endif
 
 	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
+	record_data_locality(task, workerid);
 	_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
 	locality_pushed_task(task, workerid, sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
 
 	starpu_push_task_end(task);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);