Ver código fonte

new fifo with deque
firsts lines for bitmap

Simon Archipoff 12 anos atrás
pai
commit
becd3d35a2

+ 4 - 2
src/Makefile.am

@@ -120,7 +120,8 @@ noinst_HEADERS = 						\
 	top/starpu_top_message_queue.h				\
 	top/starpu_top_connection.h				\
 	top/starpu_top_core.h					\
-	sched_policies/node_sched.h
+	sched_policies/node_sched.h				\
+	sched_policies/prio_deque.h
 
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	common/barrier.c					\
@@ -228,7 +229,8 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/node_work_stealing.c			\
 	sched_policies/node_fifo.c 				\
 	sched_policies/node_heft.c				\
-	sched_policies/hierarchical_heft.c
+	sched_policies/hierarchical_heft.c			\
+	sched_policies/prio_deque.c
 if STARPU_USE_CPU
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/cpu/driver_cpu.c
 endif

+ 69 - 0
src/sched_policies/bitmap.c

@@ -0,0 +1,69 @@
+
+#include <limits.h>
+
+struct _starpu_bitmap{
+	unsigned long * bits;
+	int size;
+};
+
+struct _starpu_bitmap * _starpu_bitmap_create(void)
+{
+	struct _starpu_bitmap * b = malloc(sizeof(*b));
+	memset(b,0,sizeof(*b));
+	return b;
+}
+void _starpu_bitmap_destroy(struct _starpu_bitmap * b)
+{
+	free(b->bits);
+	free(b);
+}
+
+
+struct tuple {
+	int nb_longs;
+	int nb_bits;
+};
+
+static inline struct tuple get_tuple(int e)
+{
+	int i;
+	for(i = 0; e > LONG_BIT; e -= LONG_BIT, i++)
+		;
+	struct tuple t = {i, e};
+	return t;
+}
+
+void _starpu_bitmap_set(struct _starpu_bitmap * b, int e)
+{
+	struct tuple t = get_tuple(e);
+	if(t.nb_long + 1 > b->size)
+	{
+		b->bits = realloc(b->bits, sizeof(unsigned long) * (t.nb_long + 1));
+		b->size = t.nb_long + 1;
+	}
+	b->bits[t.nb_long] |= (1ul << t.bn_bits);
+}
+void _starpu_bitmap_unset(struct _starpu_bitmap *b, int e)
+{
+	struct tuple t = get_tuple(e);
+	if(e / LONG_BIT > b->size)
+		return;
+	b->bits[t.nb_long] ^= ~(1ul << t.bn_bits);
+}
+
+int _starpu_bitmap_get(struct _starpu_bitmap * b, int e)
+{
+	if(e / LONG_BIT > b->size)
+		return 0;
+	int i;
+	struct tuple t = get_tuple(e);
+	return b->bits[t.nb_longs] & (1 << t.nb_bits);
+}
+
+
+int _starpu_bitmap_and_get(struct _starpu_bitmap * b1, struct _starpu_bitmap * b2, int e)
+{
+	return _starpu_bitmap_get(b1,e) && _starpu_bitmap_get(b2,e);
+}
+
+

+ 21 - 0
src/sched_policies/bitmap.h

@@ -0,0 +1,21 @@
+#ifndef __BITMAP_H__
+#define __BITMAP_H__
+
+struct _starpu_bitmap;
+
+struct _starpu_bitmap * _starpu_bitmap_create(void);
+void _starpu_bitmap_destroy(struct _starpu_bitmap *);
+
+void _starpu_bitmap_set(struct _starpu_bitmap *, int);
+void _starpu_bitmap_unset(struct _starpu_bitmap *, int);
+
+int _starpu_bitmap_get(struct _starpu_bitmap *, int);
+
+//return 1 iff e set in b1 AND e set in b2
+int _starpu_bitmap_and_get(struct _starpu_bitmap * b1,
+			   struct _starpu_bitmap * b2,
+			   int e);
+
+
+
+#endif

+ 2 - 2
src/sched_policies/hierarchical_heft.c

@@ -92,13 +92,13 @@ static void remove_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned
 		struct _starpu_sched_node * node = _starpu_sched_tree_remove_worker(t, workerid, sched_ctx_id);
 		if(node)
 		{
-			if(_starpu_sched_node_is_fifo(node))
+			/*if(_starpu_sched_node_is_fifo(node))
 			{
 				STARPU_ASSERT(_starpu_sched_node_is_fifo(node));
 				struct starpu_task_list list = _starpu_sched_node_fifo_get_non_executable_tasks(node);
 				int res = _starpu_sched_node_push_tasks_to_firsts_suitable_parent(node, &list, sched_ctx_id);
 				STARPU_ASSERT(!res); (void) res;
-			}
+				}*/
 		}
 		_starpu_node_destroy_rec(node, sched_ctx_id);
 	}

+ 14 - 12
src/sched_policies/node_fifo.c

@@ -1,11 +1,11 @@
 #include "node_sched.h"
-#include "fifo_queues.h"
+#include "prio_deque.h"
 #include <starpu_scheduler.h>
 
 
 struct _starpu_fifo_data
 {
-	struct _starpu_fifo_taskq * fifo;
+	struct _starpu_prio_deque fifo;
 	starpu_pthread_mutex_t mutex;
 };
 
@@ -24,7 +24,7 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
 	if(preds.state == PERF_MODEL)
 	{
 		struct _starpu_fifo_data * data = node->data;
-		struct _starpu_fifo_taskq * fifo = data->fifo;
+		struct _starpu_prio_deque * fifo = &data->fifo;
 		starpu_pthread_mutex_t * mutex = &data->mutex;
 		STARPU_PTHREAD_MUTEX_LOCK(mutex);
 		preds.expected_finish_time = _starpu_compute_expected_time(fifo->exp_start,
@@ -45,7 +45,7 @@ static double estimated_load(struct _starpu_sched_node * node)
 	for(i = 0; i < nworkers; i++)
 		relative_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(node->workerids[i]));
 	relative_speedup /= nworkers;
-	struct _starpu_fifo_taskq * fifo = node->data;
+	struct _starpu_prio_deque * fifo = node->data;
 	STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
 	double load = fifo->ntasks / relative_speedup; 
 	for(i = 0; i < node->nchilds; i++)
@@ -60,13 +60,13 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 {
 	STARPU_ASSERT(node->nworkers > 0);
 	struct _starpu_fifo_data * data = node->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_prio_deque * fifo = &data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	STARPU_PTHREAD_MUTEX_LOCK(mutex);
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
-	int ret = _starpu_fifo_push_sorted_task(fifo, task);
+	int ret = _starpu_prio_deque_push_task(fifo, task);
 	if(!isnan(task->predicted))
 	{
 		fifo->exp_len += task->predicted/node->nworkers;
@@ -84,13 +84,15 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
 	struct _starpu_fifo_data * data = node->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_prio_deque * fifo = &data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	STARPU_PTHREAD_MUTEX_LOCK(mutex);
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
-	struct starpu_task * task  = _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
+	struct starpu_task * task  = node->is_homogeneous ?
+		_starpu_prio_deque_pop_task(fifo):
+		_starpu_prio_deque_pop_task_for_worker(fifo, starpu_worker_get_id());
 	if(task)
 	{
 		fifo->exp_start = starpu_timing_now();
@@ -110,13 +112,13 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 		return father->pop_task(father,sched_ctx_id);
 	return NULL;
 }
-
+/*
 struct starpu_task_list  _starpu_sched_node_fifo_get_non_executable_tasks(struct _starpu_sched_node * node)
 {
 	struct starpu_task_list list;
 	starpu_task_list_init(&list);
 	struct _starpu_fifo_data * data = node->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_prio_deque * fifo = data->fifo;
 	struct starpu_task * task;
 	for (task  = starpu_task_list_begin(&fifo->taskq);
 	     task != starpu_task_list_end(&fifo->taskq);
@@ -132,7 +134,7 @@ struct starpu_task_list  _starpu_sched_node_fifo_get_non_executable_tasks(struct
 	}
 	return list;
 }
-
+*/
 int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node)
 {
 	return node->estimated_execute_preds == estimated_execute_preds
@@ -145,7 +147,7 @@ struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	struct _starpu_fifo_data * data = malloc(sizeof(*data));
-	data->fifo = _starpu_create_fifo();
+	_starpu_prio_deque_init(&data->fifo);
 	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
 	node->data = data;
 	node->estimated_execute_preds = estimated_execute_preds;

+ 1 - 1
src/sched_policies/node_sched.h

@@ -108,7 +108,7 @@ int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_no
 
 struct _starpu_sched_node * _starpu_sched_node_fifo_create(void);
 int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node);
-struct starpu_task_list  _starpu_sched_node_fifo_get_non_executable_tasks(struct _starpu_sched_node * fifo_node);
+//struct starpu_task_list  _starpu_sched_node_fifo_get_non_executable_tasks(struct _starpu_sched_node * fifo_node);
 
 /* struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void); */
 int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node);

+ 8 - 9
src/sched_policies/node_worker.c

@@ -3,10 +3,13 @@
 #include <float.h>
 
 static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
-static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid);
+
+static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid);
+
 struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
 {
 	STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
+	/* we may need to take a mutex here */
 	if(_worker_nodes[workerid])
 		return _worker_nodes[workerid];
 	else
@@ -21,12 +24,6 @@ int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct
 	
 	int ret = _starpu_push_local_task(node->data, task, task->priority);
 	return ret;
-/*	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-	int ret_val = _starpu_fifo_push_sorted_task(node->fifo, task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
-	node->available(node);
-	return ret_val;
-*/
 }
 
 struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_node *node,unsigned sched_ctx_id)
@@ -34,8 +31,10 @@ struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_nod
 	struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
 	if(father == NULL)
 		return NULL;
-	else
-		return father->pop_task(father,sched_ctx_id);
+	struct starpu_task * task = father->pop_task(father,sched_ctx_id);
+	if(task)
+		starpu_push_task_end(task);
+	return task;
 }
 void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
 {

+ 147 - 0
src/sched_policies/prio_deque.c

@@ -0,0 +1,147 @@
+#include "prio_deque.h"
+#include <core/workers.h>
+
+
+void _starpu_prio_deque_init(struct _starpu_prio_deque * pdeque)
+{
+	memset(pdeque,0,sizeof(*pdeque));
+}
+void _starpu_prio_deque_destroy(struct _starpu_prio_deque * pdeque)
+{
+	while(pdeque->list)
+	{
+		struct _starpu_prio_list * l = pdeque->list;
+		pdeque->list = l->next;
+		STARPU_ASSERT(starpu_task_list_empty(&l->list));
+		free(l);
+	}
+}
+
+int _starpu_prio_deque_is_empty(struct _starpu_prio_deque * pdeque)
+{
+	
+	return pdeque->ntasks == 0;
+/*
+	struct _starpu_prio_list * l = pdeque->list;
+	while(l)
+	{
+		if(!starpu_task_list_empty(&l->list))
+			return 0;
+		l = l->next;
+	}
+	return 1;
+*/
+}
+
+
+static struct _starpu_prio_list * _starpu_prio_list_create(int prio)
+{
+	struct _starpu_prio_list * l = malloc(sizeof(*l));
+	memset(l, 0, sizeof(*l));
+	l->prio = prio;
+	return l;
+}
+
+
+
+int _starpu_prio_deque_push_task(struct _starpu_prio_deque * pdeque, struct starpu_task * task)
+{
+	STARPU_ASSERT(task != NULL);
+        struct _starpu_prio_list * l;
+	if(pdeque->list == NULL)
+		pdeque->list =  l = _starpu_prio_list_create(task->priority);
+	else
+	{
+		struct _starpu_prio_list * current = pdeque->list;
+		struct _starpu_prio_list * prev  = NULL;
+		while(current)
+		{
+			if(current->prio <= task->priority)
+				break;
+			prev = current;
+			current = current->next;
+		}
+		if(!current)
+			prev->next = current = l = _starpu_prio_list_create(task->priority);
+		if(current->prio == task->priority)
+			l = current;
+		if(prev == NULL)
+		{
+			l = pdeque->list;
+			pdeque->list = _starpu_prio_list_create(task->priority);
+			pdeque->list->next = l;
+			l = pdeque->list;
+		}
+		else
+		{
+			l = _starpu_prio_list_create(task->priority);
+			l->next = current;
+			prev->next = l;
+		}
+	}
+	
+	starpu_task_list_push_back(&l->list, task);
+	pdeque->ntasks++;
+	return 0;
+}
+
+
+static inline int pred_true(struct starpu_task * t STARPU_ATTRIBUTE_UNUSED, void * v STARPU_ATTRIBUTE_UNUSED)
+{
+	return 1;
+}
+
+static inline int pred_can_execute(struct starpu_task * t, void * pworkerid)
+{
+	int i;
+	for(i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
+		if(starpu_worker_can_execute_task(*(int*)pworkerid, t,i))
+			return 1;
+	return 0;
+}
+
+
+#define REMOVE_TASK(pdeque, first_task_field, next_task_field, predicate, parg)	\
+	({								\
+		struct _starpu_prio_list * l = pdeque->list;		\
+		struct starpu_task * t = NULL;				\
+		while(l)						\
+		{							\
+			t = l->list.first_task_field;			\
+			while(t && !predicate(t,parg))			\
+				t = t->next_task_field;			\
+			if(t)						\
+			{						\
+				starpu_task_list_erase(&l->list, t);	\
+				l = NULL;				\
+			}						\
+			if(l)						\
+				l = l->next;				\
+		}							\
+		if(t)							\
+		{							\
+			pdeque->ntasks--;				\
+		}							\
+		t;							\
+	})
+
+struct starpu_task * _starpu_prio_deque_pop_task(struct _starpu_prio_deque * pdeque)
+{
+	struct starpu_task * t = REMOVE_TASK(pdeque, head, prev, pred_true, STARPU_POISON_PTR);
+	return t;
+}
+struct starpu_task * _starpu_prio_deque_pop_task_for_worker(struct _starpu_prio_deque * pdeque, int workerid)
+{
+	return REMOVE_TASK(pdeque, head, prev, pred_can_execute, &workerid);
+}
+
+// deque a task of the higher priority available
+struct starpu_task * _starpu_prio_deque_deque_task(struct _starpu_prio_deque * pdeque)
+{
+	return REMOVE_TASK(pdeque, tail, next, pred_true, STARPU_POISON_PTR);
+}
+
+struct starpu_task * _starpu_prio_deque_deque_task_for_worker(struct _starpu_prio_deque * pdeque, int workerid)
+{
+	return REMOVE_TASK(pdeque, tail, next, pred_can_execute, &workerid);
+}

+ 36 - 0
src/sched_policies/prio_deque.h

@@ -0,0 +1,36 @@
+#ifndef __PRIO_DEQUE_H__
+#define __PRIO_DEQUE_H__
+#include <starpu.h>
+#include <starpu_task_list.h>
+
+
+struct _starpu_prio_list
+{
+	int prio;
+	struct starpu_task_list list;
+	struct _starpu_prio_list * next;
+};
+
+struct _starpu_prio_deque
+{
+	struct _starpu_prio_list * list;
+	unsigned ntasks;
+	unsigned nprocessed;
+	double exp_start, exp_end, exp_len;
+};
+
+void _starpu_prio_deque_init(struct _starpu_prio_deque *);
+void _starpu_prio_deque_destroy(struct _starpu_prio_deque *);
+
+int _starpu_prio_deque_is_empty(struct _starpu_prio_deque *);
+
+int _starpu_prio_deque_push_task(struct _starpu_prio_deque *, struct starpu_task*);
+
+struct starpu_task * _starpu_prio_deque_pop_task(struct _starpu_prio_deque*);
+struct starpu_task * _starpu_prio_deque_pop_task_for_worker(struct _starpu_prio_deque*, int workerid);
+
+// deque a task of the higher priority available
+struct starpu_task * _starpu_prio_deque_deque_task(struct _starpu_prio_deque *);
+struct starpu_task * _starpu_prio_deque_deque_task_for_worker(struct _starpu_prio_deque *, int workerid);
+
+#endif // __PRIO_DEQUE_H__