Browse Source

eager_central_priority policy

Andra Hugo 13 years ago
parent
commit
cb2ae1a386

+ 1 - 1
src/Makefile.am

@@ -158,6 +158,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/fifo_queues.c				\
 	sched_policies/eager_central_policy.c			\
 	sched_policies/deque_modeling_policy_data_aware.c	\
+	sched_policies/eager_central_priority_policy.c		\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
@@ -210,7 +211,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_message_queue.c				\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
-#	sched_policies/eager_central_priority_policy.c		
 #	sched_policies/work_stealing_policy.c			
 #	sched_policies/random_policy.c				
 #	sched_policies/parallel_heft.c				

+ 1 - 1
src/sched_policies/eager_central_policy.c

@@ -32,7 +32,7 @@ typedef struct {
 
 static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers) 
 {
-	eager_center_policy_data *data = (eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
+	eager_center_policy_data *data = (eager_center_policy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	unsigned i;
 	int workerid;
 	for (i = 0; i < nworkers; i++)

+ 54 - 51
src/sched_policies/eager_central_priority_policy.c

@@ -32,7 +32,7 @@
 
 #define NPRIO_LEVELS	(MAX_LEVEL - MIN_LEVEL + 1)
 
-struct starpu_priority_taskq_s
+struct _starpu_priority_taskq
 {
 	/* the actual lists
 	 *	taskq[p] is for priority [p - STARPU_MIN_PRIO] */
@@ -52,11 +52,11 @@ typedef struct eager_central_prio_data{
  * Centralized queue with priorities
  */
 
-static struct starpu_priority_taskq_s *_starpu_create_priority_taskq(void)
+static struct _starpu_priority_taskq *_starpu_create_priority_taskq(void)
 {
-	struct starpu_priority_taskq_s *central_queue;
+	struct _starpu_priority_taskq *central_queue;
 
-	central_queue = (struct starpu_priority_taskq_s *) malloc(sizeof(struct starpu_priority_taskq_s));
+	central_queue = (struct _starpu_priority_taskq *) malloc(sizeof(struct _starpu_priority_taskq));
 	central_queue->total_ntasks = 0;
 
 	unsigned prio;
@@ -69,33 +69,39 @@ static struct starpu_priority_taskq_s *_starpu_create_priority_taskq(void)
 	return central_queue;
 }
 
-static void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *priority_queue)
+static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priority_queue)
 {
 	free(priority_queue);
 }
 
-static void initialize_eager_center_priority_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
+static void eager_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
-
-	unsigned nworkers_ctx = sched_ctx->nworkers;
+	eager_central_prio_data *data = (eager_central_prio_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	unsigned i;
 	int workerid;
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &data->sched_cond;
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
 	}
+}
 
+static void eager_priority_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	unsigned i;
+	int workerid;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
+	}	
 }
 
 static void initialize_eager_center_priority_policy(unsigned sched_ctx_id) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_central_prio_data *data = (struct eager_central_prio_data*)malloc(sizeof(struct eager_central_prio_data));
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+	eager_central_prio_data *data = (eager_central_prio_data*)malloc(sizeof(eager_central_prio_data));
 
 	/* In this policy, we support more than two levels of priority. */
 	starpu_sched_set_min_priority(MIN_LEVEL);
@@ -103,55 +109,51 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 
 	/* only a single queue (even though there are several internaly) */
 	data->taskq = _starpu_create_priority_taskq();
-	sched_ctx->policy_data = (void*) data;
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)data);
 
-	PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	PTHREAD_COND_INIT(&data->sched_cond, NULL);
+	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
 
-	int nworkers = sched_ctx->nworkers;
-	int workerid_ctx;
-	int workerid;
-	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &data->sched_cond;
-	}
 }
 
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
+	eager_central_prio_data *data = (eager_central_prio_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	/* deallocate the task queue */
 	_starpu_destroy_priority_taskq(data->taskq);
 
-	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-        PTHREAD_COND_DESTROY(&data->sched_cond);
+	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+        _STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
 
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
         free(data);
-
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	int workerid;
-	unsigned workerid_ctx;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		sched_ctx->sched_mutex[workerid] = NULL;
-		sched_ctx->sched_cond[workerid] = NULL;
-	}
 	
 }
 
-static int _starpu_priority_push_task(struct starpu_task *task, unsigned sched_ctx_id)
+static int _starpu_priority_push_task(struct starpu_task *task)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
+	unsigned sched_ctx_id = task->sched_ctx;
+	eager_central_prio_data *data = (eager_central_prio_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
-	struct starpu_priority_taskq_s *taskq = data->taskq;
+	struct _starpu_priority_taskq *taskq = data->taskq;
 
+	/* if the context has no workers return */
+	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+        unsigned nworkers;
+        int ret_val = -1;
+
+        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+        nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+        if(nworkers == 0)
+        {
+                _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                return ret_val;
+        }
+
+
+	/*if there are no tasks block */
 	/* wake people waiting for a task */
 	_STARPU_PTHREAD_MUTEX_LOCK(&data->sched_mutex);
 
@@ -166,7 +168,8 @@ static int _starpu_priority_push_task(struct starpu_task *task, unsigned sched_c
 	_STARPU_PTHREAD_COND_SIGNAL(&data->sched_cond);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->sched_mutex);
 
-	return 0;
+        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+        return 0;
 }
 
 static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
@@ -174,13 +177,12 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	/* XXX FIXME: should call starpu_worker_can_execute_task!! */
 	struct starpu_task *task = NULL;
 
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_central_prio_data *data = (struct eager_central_prio_data*)sched_ctx->policy_data;
+	eager_central_prio_data *data = (eager_central_prio_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	
-	struct starpu_priority_taskq_s *taskq = data->taskq;
+	struct _starpu_priority_taskq *taskq = data->taskq;
 
 	/* block until some event happens */
-	PTHREAD_MUTEX_LOCK(&data->sched_mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->sched_mutex);
 
 	if ((taskq->total_ntasks == 0) && _starpu_machine_is_running())
 	{
@@ -218,8 +220,9 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 struct starpu_sched_policy _starpu_sched_prio_policy =
 {
 	.init_sched = initialize_eager_center_priority_policy,
-	.init_sched_for_workers = initialize_eager_center_priority_policy_for_workers,
 	.deinit_sched = deinitialize_eager_center_priority_policy,
+        .add_workers = eager_priority_add_workers,
+        .remove_workers = eager_priority_remove_workers,
 	/* we always use priorities in that policy */
 	.push_task = _starpu_priority_push_task,
 	.pop_task = _starpu_priority_pop_task,