Forráskód Böngészése

lws: relax memory pressure

Samuel Thibault 5 éve
szülő
commit
d3ba7f5777
1 módosított fájl, 56 hozzáadás és 15 törlés
  1. 56 15
      src/sched_policies/work_stealing_policy.c

+ 56 - 15
src/sched_policies/work_stealing_policy.c

@@ -71,10 +71,16 @@ struct locality_entry
 
 struct _starpu_work_stealing_data_per_worker
 {
+	char fill1[STARPU_CACHELINE_SIZE];
+	/* This is read-mostly, only updated when the queue becomes empty or
+	 * becomes non-empty, to make it generally cheap to check */
+	unsigned notask;	/* whether the queue is empty */
+	char fill2[STARPU_CACHELINE_SIZE];
+
 	struct _starpu_prio_deque queue;
 	int running;
 	int *proxlist;
-	int busy;
+	int busy;	/* Whether this worker is working on a task */
 
 #ifdef USE_LOCALITY_TASKS
 	/* This records the same as queue, but hashed by data accessed with locality flag.  */
@@ -131,11 +137,12 @@ static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsi
 		/* Here helgrind would shout that this is unprotected, but we
 		 * are fine with getting outdated values, this is just an
 		 * estimation */
-		ntasks = ws->per_worker[workerids[worker]].queue.ntasks;
-
-		if (ntasks && (ws->per_worker[workerids[worker]].busy
-					   || starpu_worker_is_blocked_in_parallel(workerids[worker])))
-			break;
+		if (!ws->per_worker[workerids[worker]].notask)
+		{
+			if (ws->per_worker[workerids[worker]].busy
+						   || starpu_worker_is_blocked_in_parallel(workerids[worker]))
+				break;
+		}
 
 		worker = (worker + 1) % nworkers;
 		if (worker == ws->last_pop_worker)
@@ -327,15 +334,31 @@ static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, i
 	{
 		/* found an interesting task, try to pick it! */
 		if (_starpu_prio_deque_pop_this_task(&data_source->queue, target, best_task))
+		{
+			if (!data_source->queue.ntasks)
+			{
+				STARPU_ASSERT(ws->per_worker[source].notask == 0);
+				ws->per_worker[source].notask = 1;
+			}
 			return best_task;
+		}
 	}
 
 	/* Didn't find an interesting task, or couldn't run it :( */
 	int skipped;
+	struct starpu_task *task;
+
 	if (source != target)
-		return _starpu_prio_deque_deque_task_for_worker(&data_source->queue, target, &skipped);
+		task = _starpu_prio_deque_deque_task_for_worker(&data_source->queue, target, &skipped);
 	else
-		return _starpu_prio_deque_pop_task_for_worker(&data_source->queue, target, &skipped);
+		task = _starpu_prio_deque_pop_task_for_worker(&data_source->queue, target, &skipped);
+
+	if (!data_source->queue.ntasks)
+	{
+		STARPU_ASSERT(ws->per_worker[source].notask == 1);
+		ws->per_worker[source].notask = 1;
+	}
+	return task;
 }
 
 /* Called when popping a task from a queue */
@@ -371,10 +394,18 @@ static void locality_pushed_task(struct _starpu_work_stealing_data *ws STARPU_AT
 static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, int source, int target)
 {
 	int skipped;
+	struct starpu_task *task;
 	if (source != target)
-		return _starpu_prio_deque_deque_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
+		task = _starpu_prio_deque_deque_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
 	else
-		return _starpu_prio_deque_pop_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
+		task = _starpu_prio_deque_pop_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
+
+	if (!ws->per_worker[source].queue.ntasks)
+	{
+		STARPU_ASSERT(ws->per_worker[source].notask == 1);
+		ws->per_worker[source].notask = 1;
+	}
+	return task;
 }
 /* Called when popping a task from a queue */
 static void locality_popped_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
@@ -530,7 +561,8 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	struct starpu_task *task = NULL;
 	unsigned workerid = starpu_worker_get_id_check();
 
-	ws->per_worker[workerid].busy = 0;
+	if (ws->per_worker[workerid].busy)
+		ws->per_worker[workerid].busy = 0;
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (STARPU_RUNNING_ON_VALGRIND || !_starpu_prio_deque_is_empty(&ws->per_worker[workerid].queue))
@@ -617,7 +649,8 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		if (!task)
 			return NULL;
 	}
-	ws->per_worker[workerid].busy = !!task;
+	if (ws->per_worker[workerid].busy != !!task)
+		ws->per_worker[workerid].busy = !!task;
 	return task;
 }
 
@@ -648,6 +681,11 @@ int ws_push_task(struct starpu_task *task)
 	record_data_locality(task, workerid);
 	STARPU_ASSERT_MSG(ws->per_worker[workerid].running, "workerid=%d, ws=%p\n", workerid, ws);
 	_starpu_prio_deque_push_back_task(&ws->per_worker[workerid].queue, task);
+	if (ws->per_worker[workerid].queue.ntasks == 1)
+	{
+		STARPU_ASSERT(ws->per_worker[workerid].notask == 0);
+		ws->per_worker[workerid].notask = 0;
+	}
 	locality_pushed_task(ws, task, workerid, sched_ctx_id);
 
 	starpu_push_task_end(task);
@@ -676,10 +714,12 @@ static void ws_add_workers(unsigned sched_ctx_id, int *workerids,unsigned nworke
 		int workerid = workerids[i];
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
 		_starpu_prio_deque_init(&ws->per_worker[workerid].queue);
+		ws->per_worker[workerid].notask = 1;
 		ws->per_worker[workerid].running = 1;
 
 		/* Tell helgrind that we are fine with getting outdated values,
 		 * this is just an estimation */
+		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].notask);
 		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].queue.ntasks);
 		ws->per_worker[workerid].busy = 0;
 		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].busy);
@@ -760,11 +800,12 @@ static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sch
 	for (i = 0; i < nworkers; i++)
 	{
 		int neighbor = ws->per_worker[workerid].proxlist[i];
+		if (ws->per_worker[neighbor].notask)
+			continue;
                 /* FIXME: do not keep looking again and again at some worker
                  * which has tasks, but that can't execute on me */
-		int ntasks = ws->per_worker[neighbor].queue.ntasks;
-		if (ntasks && (ws->per_worker[neighbor].busy
-					   || starpu_worker_is_blocked_in_parallel(neighbor)))
+		if (ws->per_worker[neighbor].busy
+					   || starpu_worker_is_blocked_in_parallel(neighbor))
 			return neighbor;
 	}
 	return -1;