Browse Source

Make clusters work with (L)WS

Terry Cojean 8 years ago
parent
commit
cb36a92b55
1 changed file with 24 additions and 2 deletions
  1. 24 2
      src/sched_policies/work_stealing_policy.c

+ 24 - 2
src/sched_policies/work_stealing_policy.c

@@ -133,7 +133,8 @@ static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsi
 		 * estimation */
 		ntasks = ws->per_worker[workerids[worker]].queue_array->ntasks;
 
-		if (ntasks && ws->per_worker[workerids[worker]].busy)
+		if (ntasks && (ws->per_worker[workerids[worker]].busy
+					   || starpu_worker_is_blocked(workerids[worker])))
 			break;
 
 		worker = (worker + 1) % nworkers;
@@ -538,6 +539,14 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	{
 		/* there was a local task */
 		ws->per_worker[workerid].busy = 1;
+		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
+		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
+		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
+			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
+			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			return NULL;
+		}
 		return task;
 	}
 
@@ -571,6 +580,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	{
 		_STARPU_TRACE_WORK_STEALING(workerid, victim);
 		_STARPU_TASK_BREAK_ON(task, sched);
+		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, victim);
 		record_data_locality(task, workerid);
 		record_worker_locality(ws, task, workerid, sched_ctx_id);
 		locality_popped_task(ws, task, victim, sched_ctx_id);
@@ -580,6 +590,16 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	/* Done with stealing, resynchronize with core */
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 
+	if (task)
+	{
+		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
+		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
+			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
+			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			return NULL;
+		}
+	}
 	ws->per_worker[workerid].busy = !!task;
 	return task;
 }
@@ -620,6 +640,7 @@ int ws_push_task(struct starpu_task *task)
 
 	starpu_push_task_end(task);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, workerid);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* TODO: implement fine-grain signaling, similar to what eager does */
@@ -726,7 +747,8 @@ static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sch
 	{
 		int neighbor = ws->per_worker[workerid].proxlist[i];
 		int ntasks = ws->per_worker[neighbor].queue_array->ntasks;
-		if (ntasks && ws->per_worker[neighbor].busy)
+		if (ntasks && (ws->per_worker[neighbor].busy
+					   || starpu_worker_is_blocked(neighbor)))
 			return neighbor;
 	}
 	return -1;