Browse Source

work stealing wake up all workers when a task is pushed by a worker

Simon Archipoff 12 years ago
parent
commit
6a03f85f2b

+ 0 - 3
src/sched_policies/fifo_queues.c

@@ -71,7 +71,6 @@ int
 _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 {
 	struct starpu_task_list *list = &fifo_queue->taskq;
-
 	if (list->head == NULL)
 	{
 		list->head = task;
@@ -148,7 +147,6 @@ int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_
 struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
 {
 	struct starpu_task *task;
-
 	for (task  = starpu_task_list_begin(&fifo_queue->taskq);
 	     task != starpu_task_list_end(&fifo_queue->taskq);
 	     task  = starpu_task_list_next(task))
@@ -161,7 +159,6 @@ struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue,
 			{
 				starpu_task_set_implementation(task, nimpl);
 				starpu_task_list_erase(&fifo_queue->taskq, task);
-				//		fprintf(stderr,"nb task %d prio %d\n", fifo_queue->ntasks, task->priority);
 				fifo_queue->ntasks--;
 				_STARPU_TRACE_JOB_POP(task, 0);
 				return task;

+ 2 - 0
src/sched_policies/node_eager.c

@@ -31,6 +31,7 @@ static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nw
 		t->root->add_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = t->root;
 	}
+	_starpu_tree_update_after_modification(t);
 }
 
 static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
@@ -42,6 +43,7 @@ static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned
 		t->root->remove_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = NULL;
 	}
+	_starpu_tree_update_after_modification(t);
 }
 
 

+ 19 - 4
src/sched_policies/node_work_stealing.c

@@ -26,12 +26,12 @@ static struct starpu_task *  steal_task_round_robin(struct _starpu_sched_node *n
 {
 	struct _starpu_work_stealing_data *wsd = node->data;
 	unsigned i = wsd->last_pop_child;
+	wsd->last_pop_child = (wsd->last_pop_child + 1) % node->nchilds;
 	/* If the worker's queue have no suitable tasks, let's try
 	 * the next ones */
 	struct starpu_task * task = NULL;
 	while (1)
 	{
-		i = (i + 1) % node->nchilds;
 		struct _starpu_fifo_taskq * fifo = wsd->fifos[i];
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
 		task = _starpu_fifo_pop_task(fifo, workerid);
@@ -47,9 +47,9 @@ static struct starpu_task *  steal_task_round_robin(struct _starpu_sched_node *n
 			 * don't go in infinite loop */
 			return NULL;
 		}
+		i = (i + 1) % node->nchilds;
 	}
 
-	wsd->last_pop_child = i;
 	return task;
 }
 
@@ -118,7 +118,12 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 		return task;
 	task  = steal_task(node, workerid);
 	if(task)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+		wsd->fifos[i]->nprocessed++;
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 		return task;
+	}
 	if(node->fathers[sched_ctx_id])
 		return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
 	else
@@ -133,7 +138,6 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	struct _starpu_work_stealing_data * wsd = node->data;
 	int ret = -1;
 	int start = wsd->last_push_child;
-	
 	int i;
 	for(i = (start+1)%node->nchilds; i != start; i = (i+1)%node->nchilds)
 	{
@@ -162,6 +166,7 @@ int _starpu_ws_push_task(struct starpu_task *task)
 	int workerid = starpu_worker_get_id();
 	if(workerid == -1)
 		return _starpu_tree_push_task(task);
+
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_node * node =_starpu_sched_node_worker_get(workerid);
 	while(node->fathers[sched_ctx_id] != NULL)
@@ -179,10 +184,20 @@ int _starpu_ws_push_task(struct starpu_task *task)
 			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
 			int ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
 			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
-			node->childs[i]->available(node->childs[i]);
+			
+			//we need to wake all other workers
+			int j;
+			for(j = 0; j < node->nchilds; j++)
+			{
+				if(j == i)
+					continue;
+				node->childs[j]->available(node->childs[j]);
+			}
+
 			return ret;
 		}
 	}
+
 	STARPU_ASSERT_MSG(0, "there were a problem here, dont know what to do");
 	return _starpu_tree_push_task(task);
 }