소스 검색

various bug fix
implementation of estimated_end and estimated_load for work_stealing
perf problem with work stealing...

Simon Archipoff 12 년 전
부모
커밋
d86a5c89f4
3개의 변경된 파일83개의 추가작업 그리고 15개의 파일을 삭제
  1. 7 0
      src/sched_policies/node_heft.c
  2. 74 8
      src/sched_policies/node_work_stealing.c
  3. 2 7
      src/sched_policies/node_worker.c

+ 7 - 0
src/sched_policies/node_heft.c

@@ -279,6 +279,13 @@ static void initialize_heft_center_policy(unsigned sched_ctx_id)
 		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(i);
 		STARPU_ASSERT(worker_node);
 
+#if 1
+		struct starpu_sched_node * ws = starpu_sched_node_work_stealing_create(NULL);
+		ws->add_child(ws, worker_node);
+		starpu_sched_node_set_father(worker_node, ws, sched_ctx_id);
+		worker_node = ws;
+#endif
+
 		struct starpu_sched_node * impl_node = starpu_sched_node_best_implementation_create(NULL);
 		impl_node->add_child(impl_node, worker_node);
 		starpu_sched_node_set_father(worker_node, impl_node, sched_ctx_id);

+ 74 - 8
src/sched_policies/node_work_stealing.c

@@ -2,15 +2,14 @@
 #include "prio_deque.h"
 #include <starpu_scheduler.h>
 #include <starpu.h>
-
+#include <float.h>
 struct _starpu_work_stealing_data
 {
 /* keep track of the work performed from the beginning of the algorithm to make
  * better decisions about which queue to child when stealing or deferring work
  */
-	unsigned performed_total;
-	unsigned last_pop_child;
-	unsigned last_push_child;
+	unsigned performed_total, last_pop_child, last_push_child;
+
 	struct _starpu_prio_deque ** fifos;	
 	starpu_pthread_mutex_t ** mutexes;
 	int size;
@@ -32,14 +31,18 @@ static struct starpu_task *  steal_task_round_robin(struct starpu_sched_node *no
 	while (1)
 	{
 		struct _starpu_prio_deque * fifo = wsd->fifos[i];
+
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
 		task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
-		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
-		if(task)
+		if(task && !isnan(task->predicted))
 		{
+			fifo->exp_len -= task->predicted;
 			fifo->nprocessed--;
-			break;
 		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		if(task)
+			break;
+
 		if (i == wsd->last_pop_child)
 		{
 			/* We got back to the first worker,
@@ -106,15 +109,27 @@ static struct starpu_task * pop_task(struct starpu_sched_node * node, unsigned s
 	struct _starpu_work_stealing_data * wsd = node->data;
 	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
 	struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
+	if(task)
+	{
+		if(!isnan(task->predicted))
+			wsd->fifos[i]->exp_len -= task->predicted;
+	}
+	else
+		wsd->fifos[i]->exp_len = 0.0;
+
 	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 	if(task)
+	{
 		return task;
+	}
+	
 	task  = steal_task(node, workerid);
 	if(task)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
 		wsd->fifos[i]->nprocessed++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+
 		return task;
 	}
 	if(node->fathers[sched_ctx_id])
@@ -123,7 +138,48 @@ static struct starpu_task * pop_task(struct starpu_sched_node * node, unsigned s
 		return NULL;
 }
 
+double _ws_estimated_end(struct starpu_sched_node * node)
+{
+	STARPU_ASSERT(starpu_sched_node_is_work_stealing(node));
+	struct _starpu_work_stealing_data * wsd = node->data;
+	double sum_len = 0.0;
+	double sum_end = 0.0;
+	int i;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		sum_len += wsd->fifos[i]->exp_len;
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		sum_end += node->childs[i]->estimated_end(node->childs[i]);
+	}
+	int nb_workers = starpu_bitmap_cardinal(node->workers_in_ctx);
+
+	return sum_end / node->nchilds + sum_len / nb_workers;
+}
 
+double _ws_estimated_load(struct starpu_sched_node * node)
+{
+	STARPU_ASSERT(starpu_sched_node_is_work_stealing(node));
+	struct _starpu_work_stealing_data * wsd = node->data;
+	int ntasks = 0;
+	int i;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		ntasks += wsd->fifos[i]->ntasks;
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+	}
+	double speedup = 0.0;
+	int workerid;
+	for(workerid = starpu_bitmap_first(node->workers_in_ctx);
+	    -1 != workerid;
+	    workerid = starpu_bitmap_next(node->workers_in_ctx, workerid))
+	{
+		speedup += starpu_worker_get_relative_speedup(workerid);
+	}
+	
+	return ntasks / speedup;
+}
 
 static int push_task(struct starpu_sched_node * node, struct starpu_task * task)
 {
@@ -156,6 +212,9 @@ int _starpu_ws_push_task(struct starpu_task *task)
 		node = node->fathers[sched_ctx_id];
 		if(starpu_sched_node_is_work_stealing(node))
 		{
+			if(!starpu_sched_node_can_execute_task(node, task))
+				return starpu_sched_tree_push_task(task);
+
 			int i;
 			for(i = 0; i < node->nchilds; i++)
 				if(is_worker_of_node(node->childs[i], workerid))
@@ -165,6 +224,8 @@ int _starpu_ws_push_task(struct starpu_task *task)
 			struct _starpu_work_stealing_data * wsd = node->data;
 			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
 			int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
+			if(ret == 0 && !isnan(task->predicted))
+				wsd->fifos[i]->exp_len += task->predicted;
 			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 			
 			//we need to wake all workers
@@ -180,12 +241,13 @@ int _starpu_ws_push_task(struct starpu_task *task)
 		}
 	}
 
-	STARPU_ASSERT_MSG(0, "there were a problem here, dont know what to do");
+	/* this should not be reached */
 	return starpu_sched_tree_push_task(task);
 }
 
 
 
+
 void _ws_add_child(struct starpu_sched_node * node, struct starpu_sched_node * child)
 {
 	struct _starpu_work_stealing_data * wsd = node->data;
@@ -229,7 +291,9 @@ void _ws_remove_child(struct starpu_sched_node * node, struct starpu_sched_node
 	node->nchilds--;
 	struct starpu_task * task;
 	while((task = _starpu_prio_deque_pop_task(tmp_fifo)))
+	{
 		node->push_task(node, task);
+	}
 	_starpu_prio_deque_destroy(tmp_fifo);
 	free(tmp_fifo);
 }
@@ -248,6 +312,8 @@ struct starpu_sched_node * starpu_sched_node_work_stealing_create(void * arg STA
 	node->push_task = push_task;
 	node->add_child = _ws_add_child;
 	node->remove_child = _ws_remove_child;
+	node->estimated_end = _ws_estimated_end;
+	node->estimated_load = _ws_estimated_load;
 	node->deinit_data = _work_stealing_node_deinit_data;
 	node->data = wsd;
 	return  node;

+ 2 - 7
src/sched_policies/node_worker.c

@@ -223,7 +223,7 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
 			(void) STARPU_ATOMIC_ADD(p, -1);
 			if(*p == 0)
 				_starpu_task_grid_unset_left_right_member(t);
-//			l->ntasks--;
+			l->ntasks--;
 			if(!isnan(task->predicted))
 			{
 				l->exp_len -= task->predicted_transfer;
@@ -294,7 +294,7 @@ int starpu_sched_node_worker_push_task(struct starpu_sched_node * node, struct s
 	t->ntasks = 1;
 
 	task->workerid = starpu_bitmap_first(node->workers);
-#if 1 /* dead lock problem */
+#if 1 /* dead lock problem? */
 	if (starpu_get_prefetch_flag())
 	{
 		unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
@@ -748,11 +748,6 @@ void starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
 	{
 		struct _starpu_worker_task_list * list = _worker_get_list();
 		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-		if(!task->execute_on_a_specific_worker)
-		{
-			STARPU_ASSERT(list->ntasks != 0);
-			list->ntasks--;
-		}
 
 		list->exp_start = starpu_timing_now() + task->predicted;