
The work stealing scheduler is fixed.
ws is back!

Nicolas Collin 13 years ago
commit 00902d84d9

+ 2 - 3
src/sched_policies/deque_queues.c

@@ -53,12 +53,12 @@ unsigned _starpu_get_deque_njobs(struct _starpu_deque_jobq *deque_queue)
 	return deque_queue->njobs;
 }
 
-unsigned _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue)
+int _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue)
 {
 	return deque_queue->nprocessed;
 }
 
-struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queue, int workerid __attribute__ ((unused)))
+struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queue, int workerid)
 {
 	struct _starpu_job *j = NULL;
 
@@ -80,7 +80,6 @@ struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queu
 			{
 				j->nimpl = nimpl;
 				j = _starpu_job_list_pop_front(deque_queue->jobq);
-				deque_queue->njobs--;
 				_STARPU_TRACE_JOB_POP(j, 0);
 				return j->task;
 			}

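Note on the deque_queues.c change: the njobs counter is no longer decremented inside _starpu_deque_pop_task; as the work_stealing_policy.c diff below shows, ws_pop_task now updates the counters itself, so the stealer's nprocessed and the victim's njobs are adjusted together by the scheduler. The following standalone sketch (invented names, not StarPU code) illustrates that split of responsibilities:

#include <stdio.h>

struct toy_queue
{
	int njobs;       /* tasks still waiting in this queue */
	int nprocessed;  /* tasks this worker has taken so far */
};

/* Pop helper: only dequeues, touches no counters. */
static void *toy_pop(struct toy_queue *q)
{
	if (q->njobs == 0)
		return NULL;
	return (void *)1; /* stand-in for the dequeued task */
}

/* Caller-side bookkeeping: on a steal, the stealer's nprocessed grows
 * while the victim's njobs shrinks. */
static void *toy_steal(struct toy_queue *self, struct toy_queue *victim)
{
	void *task = toy_pop(victim);
	if (task)
	{
		self->nprocessed++;
		victim->njobs--;
	}
	return task;
}

int main(void)
{
	struct toy_queue me = { 0, 0 }, other = { 3, 5 };
	toy_steal(&me, &other);
	printf("me.nprocessed=%d other.njobs=%d\n", me.nprocessed, other.njobs);
	return 0;
}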
+ 2 - 2
src/sched_policies/deque_queues.h

@@ -32,7 +32,7 @@ struct _starpu_deque_jobq
 	unsigned njobs;
 
 	/* the number of tasks that were processed */
-	unsigned nprocessed;
+	int nprocessed;
 
 	/* only meaningful if the queue is only used by a single worker */
 	double exp_start; /* Expected start date of first task in the queue */
@@ -47,7 +47,7 @@ struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queu
 struct _starpu_job_list *_starpu_deque_pop_every_task(struct _starpu_deque_jobq *deque_queue, pthread_mutex_t *sched_mutex, int workerid);
 
 unsigned _starpu_get_deque_njobs(struct _starpu_deque_jobq *deque_queue);
-unsigned _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue);
+int _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue);
 
 
 #endif // __DEQUE_QUEUES_H__

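Why nprocessed changed from unsigned to int: as the comments added to work_stealing_policy.c below explain, both nprocessed and performed_total are now initialized to -1, because the first pop increments them before any task has actually been executed. A tiny standalone illustration (not part of the commit) of what that sentinel would look like in the old unsigned field:

#include <stdio.h>

int main(void)
{
	int nprocessed = -1;       /* signed sentinel, as in the new code */
	unsigned old_field = -1;   /* the same sentinel wraps around in an unsigned field */

	nprocessed++;              /* what the first pop does, per the comments below */
	printf("signed counter after the first pop: %d\n", nprocessed);
	printf("old unsigned field would have held: %u\n", old_field);
	return 0;
}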
+ 223 - 98
src/sched_policies/work_stealing_policy.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2011  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2012	Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -17,207 +18,331 @@
 
 /* Work stealing policy */
 
+#include <float.h>
+
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 
 static unsigned nworkers;
-static unsigned rr_worker;
+static unsigned last_pop_worker;
+static unsigned last_push_worker;
 static struct _starpu_deque_jobq *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_mutex_t global_sched_mutex;
 static pthread_cond_t global_sched_cond;
 
-/* keep track of the work performed from the beginning of the algorithm to make
+/**
+ * Keep track of the work performed from the beginning of the algorithm to make
  * better decisions about which queue to select when stealing or deferring work
  */
-static unsigned performed_total = 0;
+static int performed_total;
 
 #ifdef USE_OVERLOAD
-static float overload_metric(unsigned id)
+
+/**
+ * Minimum number of tasks we wait to have been processed before we start
+ * estimating on which worker the computation would be faster.
+ */
+static int calibration_value = 0;
+
+#endif /* USE_OVERLOAD */
+
+
+/**
+ * Return a worker from which a task can be stolen.
+ * Selection is done in a round-robin fashion; if the previously
+ * selected worker doesn't own any task, we return the first
+ * non-empty worker found after it.
+ */
+static unsigned select_victim_round_robin(void)
 {
-	float execution_ratio = 0.0f;
-	if (performed_total > 0)
+	unsigned worker = last_pop_worker;
+
+	/* If the worker's queue is empty, let's try
+	 * the next ones */
+	while (!queue_array[worker]->njobs)
 	{
-		execution_ratio = _starpu_get_deque_nprocessed(queue_array[id])/performed_total;
+		worker = (worker + 1) % nworkers;
+		if (worker == last_pop_worker)
+		{
+			/* We got back to the first worker,
+			 * don't go in infinite loop */
+			break;
+		}
 	}
 
-	unsigned performed_queue;
-	performed_queue = _starpu_get_deque_nprocessed(queue_array[id]);
+	last_pop_worker = (worker + 1) % nworkers;
+
+	return worker;
+}
+
+/**
+ * Return a worker to whom a task can be added.
+ * Selecting a worker is done in a round-robin fashion.
+ */
+static unsigned select_worker_round_robin(void)
+{
+	unsigned worker = last_push_worker;
+
+	last_push_worker = (last_push_worker + 1) % nworkers;
+
+	return worker;
+}
+
+#ifdef USE_OVERLOAD
 
+/**
+ * Return a ratio helpful to determine whether a worker is suitable to steal
+ * tasks from or to put some tasks in its queue.
+ *
+ * \return	a positive or negative ratio describing the current state of the worker:
+ * 		a smaller value implies a faster worker with a relatively emptier queue, more suitable to put tasks in;
+ * 		a larger value implies a slower worker with a relatively fuller queue, more suitable to steal tasks from.
+ */
+static float overload_metric(unsigned id)
+{
+	float execution_ratio = 0.0f;
 	float current_ratio = 0.0f;
-	if (performed_queue > 0)
+
+	int nprocessed = _starpu_get_deque_nprocessed(queue_array[id]);
+	unsigned njobs = _starpu_get_deque_njobs(queue_array[id]);
+
+	/* Did we get enough information? */
+	if (performed_total > 0 && nprocessed > 0)
+	{
+		/* How fast or slow is the worker compared to the other workers */
+		execution_ratio = (float) nprocessed / performed_total;
+		/* How replete is its queue */
+		current_ratio = (float) njobs / nprocessed;
+	}
+	else
 	{
-		current_ratio = _starpu_get_deque_njobs(queue_array[id])/performed_queue;
+		return 0.0f;
 	}
 
 	return (current_ratio - execution_ratio);
 }
 
-/* who to steal work to ? */
-static struct _starpu_deque_jobq *select_victimq(void)
+/**
+ * Return the most suitable worker from which a task can be stolen.
+ * The number of previously processed tasks, total and per-worker,
+ * and the number of tasks currently waiting to be processed
+ * by each worker are taken into account to select the most
+ * suitable worker to steal a task from.
+ */
+static unsigned select_victim_overload(void)
 {
-	struct _starpu_deque_jobq *q;
+	unsigned worker;
+	float  worker_ratio;
+	unsigned best_worker = 0;
+	float best_ratio = FLT_MIN;	
 
-	unsigned attempts = nworkers;
+	/* Don't try to play smart until we get
+	 * enough information. */
+	if (performed_total < calibration_value)
+		return select_victim_round_robin();
 
-	unsigned worker = rr_worker;
-	do
+	for (worker = 0; worker < nworkers; worker++)
 	{
-		if (overload_metric(worker) > 0.0f)
-		{
-			q = queue_array[worker];
-			return q;
-		}
-		else
+		worker_ratio = overload_metric(worker);
+
+		if (worker_ratio > best_ratio)
 		{
-			worker = (worker + 1)%nworkers;
+			best_worker = worker;
+			best_ratio = worker_ratio;
 		}
 	}
-	while(attempts-- > 0);
 
-	/* take one anyway ... */
-	q = queue_array[rr_worker];
-	rr_worker = (rr_worker + 1 )%nworkers;
-
-	return q;
+	return best_worker;
 }
 
-static struct _starpu_deque_jobq *select_workerq(void)
+/**
+ * Return the most suitable worker to whom a task can be added.
+ * The number of previously processed tasks, total and per-worker,
+ * and the number of tasks currently waiting to be processed
+ * by each worker are taken into account to select the most
+ * suitable worker to add a task to.
+ */
+static unsigned select_worker_overload(void)
 {
-	struct _starpu_deque_jobq *q;
+	unsigned worker;
+	float  worker_ratio;
+	unsigned best_worker = 0;
+	float best_ratio = FLT_MAX;
 
-	unsigned attempts = nworkers;
+	/* Don't try to play smart until we get
+	 * enough information. */
+	if (performed_total < calibration_value)
+		return select_worker_round_robin();
 
-	unsigned worker = rr_worker;
-	do
+	for (worker = 0; worker < nworkers; worker++)
 	{
-		if (overload_metric(worker) < 0.0f)
-		{
-			q = queue_array[worker];
-			return q;
-		}
-		else
+		worker_ratio = overload_metric(worker);
+
+		if (worker_ratio < best_ratio)
 		{
-			worker = (worker + 1)%nworkers;
+			best_worker = worker;
+			best_ratio = worker_ratio;
 		}
 	}
-	while(attempts-- > 0);
 
-	/* take one anyway ... */
-	q = queue_array[rr_worker];
-	rr_worker = (rr_worker + 1 )%nworkers;
-
-	return q;
+	return best_worker;
 }
 
-#else
+#endif /* USE_OVERLOAD */
 
-/* who to steal work from ? */
-static struct _starpu_deque_jobq *select_victimq(void)
-{
-	struct _starpu_deque_jobq *q;
-
-	q = queue_array[rr_worker];
 
-	rr_worker = (rr_worker + 1 )%nworkers;
-
-	return q;
+/**
+ * Return a worker from which a task can be stolen.
+ * This is a thin wrapper that calls the right
+ * implementation depending on whether USE_OVERLOAD is defined.
+ */
+static inline unsigned select_victim(void)
+{
+#ifdef USE_OVERLOAD
+	return select_victim_overload();
+#else
+	return select_victim_round_robin();
+#endif /* USE_OVERLOAD */
 }
 
-
-/* when anonymous threads submit tasks,
- * we need to select a queue where to dispose them */
-static struct _starpu_deque_jobq *select_workerq(void)
+/**
+ * Return a worker to whom a task can be added.
+ * This is a thin wrapper that calls the right
+ * implementation depending on whether USE_OVERLOAD is defined.
+ */
+static inline unsigned select_worker(void)
 {
-	struct _starpu_deque_jobq *q;
-
-	q = queue_array[rr_worker];
-
-	rr_worker = (rr_worker + 1 )%nworkers;
-
-	return q;
+#ifdef USE_OVERLOAD
+	return select_worker_overload();
+#else
+	return select_worker_round_robin();
+#endif /* USE_OVERLOAD */
 }
 
-#endif
 
 #ifdef STARPU_DEVEL
-#warning TODO rewrite ... this will not scale at all now
+#warning TODO rewrite ... this will not scale at all now ...
+#warning and the overload versions are useless with a global mutex ...
 #endif
+
+/**
+ * Return a task to execute.
+ * If possible from the calling worker's queue, otherwise
+ * by stealing from another worker.
+ * For now the mutex must be locked before calling this function.
+ */
 static struct starpu_task *ws_pop_task(void)
 {
 	struct starpu_task *task;
+	struct _starpu_deque_jobq *q;
 
 	int workerid = starpu_worker_get_id();
 
-	struct _starpu_deque_jobq *q;
+	STARPU_ASSERT(workerid != -1);
 
 	q = queue_array[workerid];
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&global_sched_mutex);
-
-	task = _starpu_deque_pop_task(q, -1);
+	task = _starpu_deque_pop_task(q, workerid);
 	if (task)
 	{
 		/* there was a local task */
 		performed_total++;
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+		q->nprocessed++;
+		q->njobs--;
 		return task;
 	}
 
 	/* we need to steal someone's job */
-	struct _starpu_deque_jobq *victimq;
-	victimq = select_victimq();
+	unsigned victim = select_victim();
+	struct _starpu_deque_jobq *victimq = queue_array[victim];
 
 	task = _starpu_deque_pop_task(victimq, workerid);
 	if (task)
 	{
-		_STARPU_TRACE_WORK_STEALING(q, victimq);
+		_STARPU_TRACE_WORK_STEALING(q, workerid);
 		performed_total++;
-	}
 
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+		/* Beware: we have to increase the number of processed tasks of
+		 * the stealer, not the victim! */
+		q->nprocessed++;
+		victimq->njobs--;
+	}
 
 	return task;
 }
 
+/**
+ * Push a task onto the calling worker's queue.
+ * If the calling thread is not a worker, push
+ * the task onto the queue of a worker chosen on the fly.
+ */
 static int ws_push_task(struct starpu_task *task)
 {
-	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-
+	struct _starpu_deque_jobq *deque_queue;
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task); 
 	int workerid = starpu_worker_get_id();
 
-        struct _starpu_deque_jobq *deque_queue;
-	deque_queue = queue_array[workerid];
+	_STARPU_PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-        _STARPU_PTHREAD_MUTEX_LOCK(&global_sched_mutex);
-	// XXX reuse ?
-        //total_number_of_jobs++;
+	/* If the current thread is not a worker but
+	 * the main thread (-1), we find the best worker to
+	 * put the task on its queue */
+	if (workerid == -1)
+		workerid = select_worker();
 
-        _STARPU_TRACE_JOB_PUSH(task, 0);
-        _starpu_job_list_push_front(deque_queue->jobq, j);
-        deque_queue->njobs++;
-        deque_queue->nprocessed++;
+	deque_queue = queue_array[workerid];
 
-        _STARPU_PTHREAD_COND_SIGNAL(&global_sched_cond);
-        _STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+	_STARPU_TRACE_JOB_PUSH(task, 0);
+	_starpu_job_list_push_back(deque_queue->jobq, j);
+	deque_queue->njobs++;
+
+	_STARPU_PTHREAD_COND_SIGNAL(&global_sched_cond);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 
-        return 0;
+	return 0;
 }
 
+/**
+ * Initialize the work stealing scheduler.
+ */
 static void initialize_ws_policy(struct starpu_machine_topology *topology,
-				__attribute__ ((unused)) struct starpu_sched_policy *_policy)
+		__attribute__ ((unused)) struct starpu_sched_policy *_policy)
 {
+	enum starpu_perf_archtype perf_arch;
+	unsigned workerid;
+
 	nworkers = topology->nworkers;
-	rr_worker = 0;
+	last_pop_worker = 0;
+	last_push_worker = 0;
+
+	/**
+	 * The first WS_POP_TASK will increase PERFORMED_TOTAL even though no task has actually been performed yet,
+	 * so we need to initialize it to -1.
+	 */
+	performed_total = -1;
 
 	_STARPU_PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	_STARPU_PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
-	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
 		queue_array[workerid] = _starpu_create_deque();
+
+		/**
+		 * The first WS_POP_TASK will increase NPROCESSED even though no task has actually been performed yet,
+		 * so we need to initialize it to -1.
+		 */
+		queue_array[workerid]->nprocessed = -1;
+		queue_array[workerid]->njobs = 0;
+
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
+
+		perf_arch = starpu_worker_get_perf_archtype(workerid);
+
+#ifdef USE_OVERLOAD
+		calibration_value += (unsigned int) starpu_worker_get_relative_speedup(perf_arch);
+#endif /* USE_OVERLOAD */
 	}
 }
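
For reference, here is a minimal, self-contained sketch (not part of the commit; all numbers and names invented) that applies the overload_metric formula above to a three-worker snapshot and picks a steal victim and a push target, mirroring what select_victim_overload and select_worker_overload do. The maximum search here is seeded with -FLT_MAX so that negative metrics remain eligible:

#include <stdio.h>
#include <float.h>

#define NW 3

/* Hypothetical snapshot: tasks already processed and tasks still queued
 * per worker, plus the global total. */
static int nprocessed[NW] = { 8, 3, 1 };
static int njobs[NW]      = { 1, 4, 7 };
static int performed_total = 12;

/* Same formula as overload_metric(): queue fill ratio minus speed ratio. */
static float metric(int id)
{
	if (performed_total <= 0 || nprocessed[id] <= 0)
		return 0.0f;
	float execution_ratio = (float) nprocessed[id] / performed_total;
	float current_ratio   = (float) njobs[id] / nprocessed[id];
	return current_ratio - execution_ratio;
}

int main(void)
{
	int id, victim = 0, target = 0;
	float best_high = -FLT_MAX, best_low = FLT_MAX;

	for (id = 0; id < NW; id++)
	{
		float m = metric(id);
		printf("worker %d: metric %.2f\n", id, m);
		if (m > best_high) { best_high = m; victim = id; }
		if (m < best_low)  { best_low  = m; target = id; }
	}
	printf("steal from worker %d, push new tasks to worker %d\n", victim, target);
	return 0;
}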