
work_stealing policy

Andra Hugo 13 years ago
parent
commit
44ca36558e
2 changed files with 106 additions and 93 deletions
  1. src/Makefile.am (+1, -1)
  2. src/sched_policies/work_stealing_policy.c (+105, -92)

+ 1 - 1
src/Makefile.am

@@ -160,6 +160,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/deque_modeling_policy_data_aware.c	\
 	sched_policies/eager_central_priority_policy.c		\
 	sched_policies/random_policy.c				\
+	sched_policies/work_stealing_policy.c			\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
@@ -212,7 +213,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_message_queue.c				\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
-#	sched_policies/work_stealing_policy.c			
 #	sched_policies/parallel_heft.c				
 #	sched_policies/parallel_greedy.c			
 

+ 105 - 92
src/sched_policies/work_stealing_policy.c

@@ -23,7 +23,7 @@
 #include <core/workers.h>
 #include <sched_policies/deque_queues.h>
 
-typedef struct work_stealing_data{
+typedef struct{
 	struct _starpu_deque_jobq **queue_array;
 	unsigned rr_worker;
 	/* keep track of the work performed from the beginning of the algorithm to make
@@ -33,7 +33,7 @@ typedef struct work_stealing_data{
 	pthread_mutex_t sched_mutex;
 	pthread_cond_t sched_cond;
 	unsigned last_pop_worker;
-static unsigned last_push_worker;
+	unsigned last_push_worker;
 } work_stealing_data;
 
 #ifdef USE_OVERLOAD
@@ -53,16 +53,17 @@ static int calibration_value = 0;
  * the worker previously selected doesn't own any task,
  * then we return the first non-empty worker.
  */
-static unsigned select_victim_round_robin(struct starpu_sched_ctx *sched_ctx)
+static unsigned select_victim_round_robin(unsigned sched_ctx_id)
 {
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	unsigned worker = ws->last_pop_worker;
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 
 	/* If the worker's queue is empty, let's try
 	 * the next ones */
 	while (!ws->queue_array[worker]->njobs)
 	{
-		worker = (worker + 1) % sched_ctx->nworkers;
+		worker = (worker + 1) % nworkers;
 		if (worker == ws->last_pop_worker)
 		{
 			/* We got back to the first worker,
@@ -71,7 +72,7 @@ static unsigned select_victim_round_robin(struct starpu_sched_ctx *sched_ctx)
 		}
 	}
 
-	ws->last_pop_worker = (worker + 1) % sched_ctx->nworkers;
+	ws->last_pop_worker = (worker + 1) % nworkers;
 
 	return worker;
 }
@@ -80,12 +81,13 @@ static unsigned select_victim_round_robin(struct starpu_sched_ctx *sched_ctx)
 * Return a worker to which to add a task.
  * Selecting a worker is done in a round-robin fashion.
  */
-static unsigned select_worker_round_robin(struct starpu_sched_ctx *sched_ctx)
+static unsigned select_worker_round_robin(unsigned sched_ctx_id)
 {
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	unsigned worker = ws->last_push_worker;
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 
-	last_push_worker = (last_push_worker + 1) % sched_ctx->nworkers;
+	ws->last_push_worker = (ws->last_push_worker + 1) % nworkers;
 
 	return worker;
 }
@@ -100,9 +102,9 @@ static unsigned select_worker_round_robin(struct starpu_sched_ctx *sched_ctx)
 * 		a smaller value implies a faster worker with a relatively emptier queue : more suitable to put tasks in
 * 		a bigger value implies a slower worker with a relatively fuller queue : more suitable to steal tasks from
  */
-static float overload_metric(struct starpu_sched_ctx *sched_ctx, unsigned id)
+static float overload_metric(unsigned sched_ctx_id, unsigned id)
 {
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	float execution_ratio = 0.0f;
 	float current_ratio = 0.0f;
 
@@ -132,9 +134,9 @@ static float overload_metric(struct starpu_sched_ctx *sched_ctx, unsigned id)
  * by the tasks are taken into account to select the most suitable
 * worker to steal a task from.
  */
-static unsigned select_victim_overload(struct starpu_sched_ctx *sched_ctx)
+static unsigned select_victim_overload(unsigned sched_ctx_id)
 {
-	unsigned worker, worker_ctx;
+	unsigned worker;
 	float  worker_ratio;
 	unsigned best_worker = 0;
 	float best_ratio = FLT_MIN;	
@@ -142,12 +144,17 @@ static unsigned select_victim_overload(struct starpu_sched_ctx *sched_ctx)
 	/* Don't try to play smart until we have
 	 * gathered enough information. */
 	if (performed_total < calibration_value)
-		return select_victim_round_robin(sched_ctx);
+		return select_victim_round_robin(sched_ctx_id);
 
-	for (worker_ctx = 0; worker_ctx < sched_ctx->nworkers; worker_ctx++)
-	{
-		worker = sched_ctx->workerid[worker_ctx];
-		worker_ratio = overload_metric(worker);
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+
+	if (workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while (workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		worker_ratio = overload_metric(sched_ctx_id, worker);
 
 		if (worker_ratio > best_ratio)
 		{
@@ -156,6 +163,9 @@ static unsigned select_victim_overload(struct starpu_sched_ctx *sched_ctx)
 		}
 	}
 
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+
 	return best_worker;
 }
 
@@ -166,9 +176,9 @@ static unsigned select_victim_overload(struct starpu_sched_ctx *sched_ctx)
  * by the tasks are taken into account to select the most suitable
  * worker to add a task to.
  */
-static unsigned select_worker_overload(struct starpu_sched_ctx *sched_ctx)
+static unsigned select_worker_overload(unsigned sched_ctx_id)
 {
-	unsigned worker, worker_ctx;
+	unsigned worker;
 	float  worker_ratio;
 	unsigned best_worker = 0;
 	float best_ratio = FLT_MAX;
@@ -176,12 +186,18 @@ static unsigned select_worker_overload(struct starpu_sched_ctx *sched_ctx)
 	/* Don't try to play smart until we have
 	 * gathered enough information. */
 	if (performed_total < calibration_value)
-		return select_worker_round_robin(sched_ctx);
+		return select_worker_round_robin(sched_ctx_id);
 
-	for (worker_ctx = 0; worker_ctx < sched_ctx->nworkers; worker_ctx++)
-	{
-		worker = sched_ctx->workerid[worker_ctx];
-		worker_ratio = overload_metric(sched_ctx,  worker);
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+
+	if (workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while (workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+
+		worker_ratio = overload_metric(sched_ctx_id, worker);
 
 		if (worker_ratio < best_ratio)
 		{
@@ -190,6 +206,9 @@ static unsigned select_worker_overload(struct starpu_sched_ctx *sched_ctx)
 		}
 	}
 
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+
 	return best_worker;
 }
 
@@ -201,12 +220,12 @@ static unsigned select_worker_overload(struct starpu_sched_ctx *sched_ctx)
 * This is a dispatch function that calls the right
 * implementation depending on whether USE_OVERLOAD is defined.
  */
-static inline unsigned select_victim(struct starpu_sched_ctx *sched_ctx)
+static inline unsigned select_victim(unsigned sched_ctx_id)
 {
 #ifdef USE_OVERLOAD
-	return select_victim_overload(sched_ctx);
+	return select_victim_overload(sched_ctx_id);
 #else
-	return select_victim_round_robin(sched_ctx);
+	return select_victim_round_robin(sched_ctx_id);
 #endif /* USE_OVERLOAD */
 }
 
@@ -215,12 +234,12 @@ static inline unsigned select_victim(struct starpu_sched_ctx *sched_ctx)
 * This is a dispatch function that calls the right
 * implementation depending on whether USE_OVERLOAD is defined.
  */
-static inline unsigned select_worker(struct starpu_sched_ctx *sched_ctx)
+static inline unsigned select_worker(unsigned sched_ctx_id)
 {
 #ifdef USE_OVERLOAD
-	return select_worker_overload(sched_ctx);
+	return select_worker_overload(sched_ctx_id);
 #else
-	return select_worker_round_robin(sched_ctx);
+	return select_worker_round_robin(sched_ctx_id);
 #endif /* USE_OVERLOAD */
 }
 
@@ -230,8 +249,7 @@ static inline unsigned select_worker(struct starpu_sched_ctx *sched_ctx)
 #endif
 static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
 	struct _starpu_deque_jobq *q;
@@ -242,21 +260,21 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 
 	q = ws->queue_array[workerid];
 
-	PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
 
 	task = _starpu_deque_pop_task(q, workerid);
 	if (task)
 	{
 		/* there was a local task */
 		ws->performed_total++;
-		PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
 		q->nprocessed++;
 		q->njobs--;
 		return task;
 	}
 
 	/* we need to steal someone's job */
-	unsigned victim = select_victim(sched_ctx);
+	unsigned victim = select_victim(sched_ctx_id);
 	struct _starpu_deque_jobq *victimq = ws->queue_array[victim];
 
 	task = _starpu_deque_pop_task(victimq, workerid);
@@ -274,22 +292,35 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	return task;
 }
 
-int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
+int ws_push_task(struct starpu_task *task)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	unsigned sched_ctx_id = task->sched_ctx;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	struct _starpu_deque_jobq *deque_queue;
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task); 
 	int workerid = starpu_worker_get_id();
 
+	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+	unsigned nworkers;
+	int ret_val = -1;
+
+	/* if the context has no workers, return */
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+	if (nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
+
 	_STARPU_PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
 
 	/* If the current thread is not a worker but
 	 * the main thread (-1), we pick the most suitable worker
 	 * and put the task on its queue */
 	if (workerid == -1)
-		workerid = select_worker(sched_ctx);
+		workerid = select_worker(sched_ctx_id);
 
 	deque_queue = ws->queue_array[workerid];
 
@@ -300,18 +331,19 @@ int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 	_STARPU_PTHREAD_COND_SIGNAL(&ws->sched_cond);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
 
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
 	return 0;
 }
 
-static void initialize_ws_policy_for_workers(unsigned sched_ctx_id, int *workerids,unsigned nnew_workers) 
+static void ws_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	unsigned i;
 	int workerid;
 	
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
 		ws->queue_array[workerid] = _starpu_create_deque();
@@ -322,18 +354,32 @@ static void initialize_ws_policy_for_workers(unsigned sched_ctx_id, int *workeri
 		ws->queue_array[workerid]->nprocessed = -1;
 		ws->queue_array[workerid]->njobs = 0;
 
-		sched_ctx->sched_mutex[workerid] = &ws->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &ws->sched_cond;
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &ws->sched_mutex, &ws->sched_cond);
+	}
+}
+
+static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+
+	unsigned i;
+	int workerid;
+	
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		_starpu_destroy_deque(ws->queue_array[workerid]);
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
 	}
 }
 
 static void initialize_ws_policy(unsigned sched_ctx_id) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+
 	work_stealing_data *ws = (work_stealing_data*)malloc(sizeof(work_stealing_data));
-	sched_ctx->policy_data = (void*)ws;
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)ws);
 	
-	unsigned nworkers = sched_ctx->nworkers;
 	ws->last_pop_worker = 0;
 	ws->last_push_worker = 0;
 
@@ -343,67 +389,34 @@ static void initialize_ws_policy(unsigned sched_ctx_id)
 	 */
 	ws->performed_total = -1;
 
-	ws->queue_array = (struct starpu_deque_jobq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_deque_jobq*));
+	ws->queue_array = (struct _starpu_deque_jobq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_deque_jobq*));
 
 	_STARPU_PTHREAD_MUTEX_INIT(&ws->sched_mutex, NULL);
 	_STARPU_PTHREAD_COND_INIT(&ws->sched_cond, NULL);
-
-	unsigned workerid_ctx;
-	int workerid;
-	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		ws->queue_array[workerid] = _starpu_create_deque();
-		/**
-		 * The first WS_POP_TASK will increase NPROCESSED though no task was actually performed yet,
-		 * we need to initialize it at -1.
-		 */
-		ws->queue_array[workerid]->nprocessed = -1;
-		ws->queue_array[workerid]->njobs = 0;
-
-		sched_ctx->sched_mutex[workerid] = &ws->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &ws->sched_cond;
-
-#ifdef USE_OVERLOAD
-		enum starpu_perf_archtype perf_arch;
-		perf_arch = starpu_worker_get_perf_archtype(workerid);
-		calibration_value += (unsigned int) starpu_worker_get_relative_speedup(perf_arch);
-#endif /* USE_OVERLOAD */
-	}
 }
 
 static void deinit_ws_policy(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	unsigned nworkers_ctx = sched_ctx->nworkers;
+	work_stealing_data *ws = (work_stealing_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_DESTROY(&ws->sched_mutex);
+	_STARPU_PTHREAD_COND_DESTROY(&ws->sched_cond);
 
-	struct work_stealing_data *data = (struct work_stealing_data*)malloc(sizeof(work_stealing_data));
-	
-
-	pthread_mutex_init(&data->sched_mutex, NULL);
-	pthread_cond_init(&data->sched_cond, NULL);
-
-	int workerid;
-	unsigned workerid_ctx;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		_starpu_destroy_deque(&data->queue_array[workerid]);
-		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &data->sched_cond;
-	}
+	free(ws->queue_array);
+	free(ws);
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
 }
 
 struct starpu_sched_policy _starpu_sched_ws_policy =
 {
 	.init_sched = initialize_ws_policy,
 	.deinit_sched = deinit_ws_policy,
+	.add_workers = ws_add_workers,
+	.remove_workers = ws_remove_workers,
 	.push_task = ws_push_task,
 	.pop_task = ws_pop_task,
 	.pre_exec_hook = NULL,
 	.post_exec_hook = NULL,
 	.pop_every_task = NULL,
 	.policy_name = "ws",
-	.policy_description = "work stealing",
-	.init_sched_for_workers = initialize_ws_policy_for_workers
+	.policy_description = "work stealing"
 };
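
The recurring pattern in this diff is iteration over a scheduling context's workers through the worker-collection cursor interface, replacing direct reads of sched_ctx->workerids. A condensed sketch of the idiom, assuming this revision's headers; the helper name pick_least_loaded is hypothetical, while overload_metric and the collection accessors are taken from the diff above:

	#include <float.h>
	#include <core/workers.h>

	/* hypothetical helper restating the loop from select_worker_overload */
	static unsigned pick_least_loaded(unsigned sched_ctx_id)
	{
		struct worker_collection *workers =
			starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
		unsigned best_worker = 0;
		float best_ratio = FLT_MAX;

		/* some collection types need cursor state before iterating,
		 * hence the guard on the function pointer */
		if (workers->init_cursor)
			workers->init_cursor(workers);

		while (workers->has_next(workers))
		{
			unsigned worker = workers->get_next(workers);
			float ratio = overload_metric(sched_ctx_id, worker);

			/* keep the worker with the emptiest relative queue */
			if (ratio < best_ratio)
			{
				best_ratio = ratio;
				best_worker = worker;
			}
		}

		if (workers->deinit_cursor)
			workers->deinit_cursor(workers);

		return best_worker;
	}

Guarding both cursor calls keeps the policy usable with collections, such as the WORKER_LIST created in initialize_ws_policy, that may implement iteration without explicit cursor setup.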