Explorar el Código

Fix scalability of ws & lws with small tasks:

- use starpu_sched_ctx_get_workers_list_raw instead of the expensive starpu_sched_ctx_get_workers_list
- do not take the worker mutex if it doesn't have any task
- do not steal from a worker if it doesn't have any task
Samuel Thibault hace 8 años
padre
commit
d15e6f2019
Se ha modificado 1 fichero con 37 adiciones y 25 borrados
  1. 37 25
      src/sched_policies/work_stealing_policy.c

+ 37 - 25
src/sched_policies/work_stealing_policy.c

@@ -86,7 +86,7 @@ struct _starpu_work_stealing_data_per_worker
 
 struct _starpu_work_stealing_data
 {
-	unsigned (*select_victim)(unsigned, int);
+	int (*select_victim)(unsigned, int);
 	struct _starpu_work_stealing_data_per_worker *per_worker;
 	/* keep track of the work performed from the beginning of the algorithm to make
 	 * better decisions about which queue to select when stealing or deferring work
@@ -112,20 +112,19 @@ static int calibration_value = 0;
  * the worker previously selected doesn't own any task,
  * then we return the first non-empty worker.
  */
-static unsigned select_victim_round_robin(unsigned sched_ctx_id)
+static int select_victim_round_robin(unsigned sched_ctx_id)
 {
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned worker = ws->last_pop_worker;
-	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
+	unsigned nworkers;
 	int *workerids = NULL;
-	starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
+	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
+	unsigned ntasks = 0;
 
 	/* If the worker's queue is empty, let's try
 	 * the next ones */
 	while (1)
 	{
-		unsigned ntasks;
-
 		/* Here helgrind would shout that this is unprotected, but we
 		 * are fine with getting outdated values, this is just an
 		 * estimation */
@@ -146,9 +145,11 @@ static unsigned select_victim_round_robin(unsigned sched_ctx_id)
 	ws->last_pop_worker = (worker + 1) % nworkers;
 
 	worker = workerids[worker];
-	free(workerids);
 
-	return worker;
+	if (ntasks)
+		return worker;
+	else
+		return -1;
 }
 
 /**
@@ -159,14 +160,13 @@ static unsigned select_worker_round_robin(unsigned sched_ctx_id)
 {
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned worker = ws->last_push_worker;
-	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	int *workerids = NULL;
-	starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
+	unsigned nworkers;
+	int *workerids;
+	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
 
 	ws->last_push_worker = (ws->last_push_worker + 1) % nworkers;
 
 	worker = workerids[worker];
-	free(workerids);
 
 	return worker;
 }
@@ -413,7 +413,7 @@ static float overload_metric(unsigned sched_ctx_id, unsigned id)
  * by the tasks are taken into account to select the most suitable
  * worker to steal task from.
  */
-static unsigned select_victim_overload(unsigned sched_ctx_id)
+static int select_victim_overload(unsigned sched_ctx_id)
 {
 	unsigned worker;
 	float  worker_ratio;
@@ -493,7 +493,7 @@ static unsigned select_worker_overload(unsigned sched_ctx_id)
  * This is a phony function used to call the right
  * function depending on the value of USE_OVERLOAD.
  */
-static inline unsigned select_victim(unsigned sched_ctx_id,
+static inline int select_victim(unsigned sched_ctx_id,
 				     int workerid STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef USE_OVERLOAD
@@ -523,14 +523,20 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 {
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	struct starpu_task *task;
-	int workerid = starpu_worker_get_id_check();
+	struct starpu_task *task = NULL;
+	unsigned workerid = starpu_worker_get_id_check();
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+	if (STARPU_RUNNING_ON_VALGRIND || !_starpu_fifo_empty(ws->per_worker[workerid].queue_array))
+#endif
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
+		task = ws_pick_task(workerid, workerid, sched_ctx_id);
+		if (task)
+			locality_popped_task(task, workerid, sched_ctx_id);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
+	}
 
-	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
-	task = ws_pick_task(workerid, workerid, sched_ctx_id);
-	if (task)
-		locality_popped_task(task, workerid, sched_ctx_id);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
 	if (task)
 	{
 		/* there was a local task */
@@ -539,7 +545,9 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 
 
 	/* we need to steal someone's job */
-	unsigned victim = ws->select_victim(sched_ctx_id, workerid);
+	int victim = ws->select_victim(sched_ctx_id, workerid);
+	if (victim == -1)
+		return NULL;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[victim].worker_mutex);
 	if (ws->per_worker[victim].queue_array != NULL && ws->per_worker[victim].queue_array->ntasks > 0)
@@ -557,7 +565,11 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);
 
-	if(!task)
+	if(!task
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		&& (STARPU_RUNNING_ON_VALGRIND || !_starpu_fifo_empty(ws->per_worker[workerid].queue_array))
+#endif
+		)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
 		if (ws->per_worker[workerid].queue_array != NULL && ws->per_worker[workerid].queue_array->ntasks > 0)
@@ -709,7 +721,7 @@ struct starpu_sched_policy _starpu_sched_ws_policy =
  * the proximity list built using the info on the architecture provided by hwloc
  */
 #ifdef STARPU_HAVE_HWLOC
-static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
+static int lws_select_victim(unsigned sched_ctx_id, int workerid)
 {
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
@@ -723,7 +735,7 @@ static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
 		if (ntasks)
 			return neighbor;
 	}
-	return workerid;
+	return -1;
 }
 #endif