Quellcode durchsuchen

in node_work_stealing.c :
fix missing mutex
use prio_deque

Simon Archipoff vor 12 Jahren
Ursprung
Commit
193f04d5bd
1 geänderte Dateien mit 13 neuen und 13 gelöschten Zeilen
  1. 13 13
      src/sched_policies/node_work_stealing.c

+ 13 - 13
src/sched_policies/node_work_stealing.c

@@ -1,5 +1,5 @@
 #include "node_sched.h"
 #include "node_sched.h"
-#include "fifo_queues.h"
+#include "prio_deque.h"
 #include <starpu_scheduler.h>
 #include <starpu_scheduler.h>
 #include <starpu.h>
 #include <starpu.h>
 
 
@@ -8,12 +8,10 @@ struct _starpu_work_stealing_data
 /* keep track of the work performed from the beginning of the algorithm to make
 /* keep track of the work performed from the beginning of the algorithm to make
  * better decisions about which queue to child when stealing or deferring work
  * better decisions about which queue to child when stealing or deferring work
  */
  */
-	
 	unsigned performed_total;
 	unsigned performed_total;
 	unsigned last_pop_child;
 	unsigned last_pop_child;
 	unsigned last_push_child;
 	unsigned last_push_child;
-	
-	struct _starpu_fifo_taskq ** fifos;
+	struct _starpu_prio_deque * fifos;	
 	starpu_pthread_mutex_t * mutexes;
 	starpu_pthread_mutex_t * mutexes;
 };
 };
 
 
@@ -32,9 +30,9 @@ static struct starpu_task *  steal_task_round_robin(struct _starpu_sched_node *n
 	struct starpu_task * task = NULL;
 	struct starpu_task * task = NULL;
 	while (1)
 	while (1)
 	{
 	{
-		struct _starpu_fifo_taskq * fifo = wsd->fifos[i];
+		struct _starpu_prio_deque * fifo = wsd->fifos + i;
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
-		task = _starpu_fifo_pop_task(fifo, workerid);
+		task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 		if(task)
 		if(task)
 		{
 		{
@@ -48,8 +46,8 @@ static struct starpu_task *  steal_task_round_robin(struct _starpu_sched_node *n
 			return NULL;
 			return NULL;
 		}
 		}
 		i = (i + 1) % node->nchilds;
 		i = (i + 1) % node->nchilds;
+	
 	}
 	}
-
 	return task;
 	return task;
 }
 }
 
 
@@ -106,7 +104,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	STARPU_ASSERT(i < node->nchilds);
 	STARPU_ASSERT(i < node->nchilds);
 	struct _starpu_work_stealing_data * wsd = node->data;
 	struct _starpu_work_stealing_data * wsd = node->data;
 	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
 	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
-	struct starpu_task * task = _starpu_fifo_pop_local_task(wsd->fifos[i]);
+	struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos + i);
 	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 	if(task)
 	if(task)
 		return task;
 		return task;
@@ -114,7 +112,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	if(task)
 	if(task)
 	{
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
-		wsd->fifos[i]->nprocessed++;
+		wsd->fifos[i].nprocessed++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 		return task;
 		return task;
 	}
 	}
@@ -138,7 +136,9 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 		struct _starpu_sched_node * child = node->childs[i];
 		struct _starpu_sched_node * child = node->childs[i];
 		if(_starpu_sched_node_can_execute_task(child,task))
 		if(_starpu_sched_node_can_execute_task(child,task))
 		{
 		{
-			ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
+			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+			ret = _starpu_prio_deque_push_task(wsd->fifos + i, task);
+			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 			break;
 			break;
 		}
 		}
 	}
 	}
@@ -203,12 +203,12 @@ static void init_ws_data(struct _starpu_sched_node *node)
 		STARPU_ASSERT(node->childs[i] != NULL);
 		STARPU_ASSERT(node->childs[i] != NULL);
 	}
 	}
 	int size = node->nchilds;
 	int size = node->nchilds;
-	wsd->fifos = malloc(sizeof(struct _starpu_fifo_taskq*) * size);
+	wsd->fifos = malloc(sizeof(struct _starpu_prio_deque) * size);
 	wsd->mutexes = malloc(sizeof(starpu_pthread_rwlock_t) * size);
 	wsd->mutexes = malloc(sizeof(starpu_pthread_rwlock_t) * size);
 
 
 	for(i = 0; i < size; i++)
 	for(i = 0; i < size; i++)
 	{
 	{
-		wsd->fifos[i] = _starpu_create_fifo();
+		_starpu_prio_deque_init(wsd->fifos + i);
 		STARPU_PTHREAD_MUTEX_INIT(wsd->mutexes + i, NULL);
 		STARPU_PTHREAD_MUTEX_INIT(wsd->mutexes + i, NULL);
 	}
 	}
 }
 }
@@ -220,7 +220,7 @@ static void deinit_ws_data(struct _starpu_sched_node *node)
 	for(i = 0; i < node->nchilds; i++)
 	for(i = 0; i < node->nchilds; i++)
 	{
 	{
 		STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes + i);
 		STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes + i);
-		_starpu_destroy_fifo(wsd->fifos[i]);
+		_starpu_prio_deque_destroy(wsd->fifos + i);
 	}
 	}
 	free(wsd->mutexes);
 	free(wsd->mutexes);
 	free(wsd->fifos);
 	free(wsd->fifos);