Andra Hugo %!s(int64=13) %!d(string=hai) anos
pai
achega
d4cbeb3dfd

+ 2 - 0
include/starpu_scheduler.h

@@ -171,6 +171,8 @@ void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
 void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
 
 void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
+	
+void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id); 
 
 struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id);
 

+ 1 - 1
src/Makefile.am

@@ -156,6 +156,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/stack_queues.c				\
 	sched_policies/deque_queues.c				\
 	sched_policies/fifo_queues.c				\
+	sched_policies/eager_central_policy.c			\
 	sched_policies/deque_modeling_policy_data_aware.c	\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
@@ -209,7 +210,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_message_queue.c				\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
-#	sched_policies/eager_central_policy.c			
 #	sched_policies/eager_central_priority_policy.c		
 #	sched_policies/work_stealing_policy.c			
 #	sched_policies/random_policy.c				

+ 8 - 4
src/core/sched_ctx.c

@@ -246,14 +246,10 @@ unsigned starpu_create_sched_ctx_with_perf_counters(const char *policy_name, int
 /* free all structures for the context */
 static void free_sched_ctx_mem(struct _starpu_sched_ctx *sched_ctx)
 {
-	sched_ctx->workers->deinit(sched_ctx->workers);
-
-	free(sched_ctx->workers);
 	free(sched_ctx->sched_policy);
 	free(sched_ctx->sched_mutex);
 	free(sched_ctx->sched_cond);
 
-	sched_ctx->workers = NULL;
 	sched_ctx->sched_policy = NULL;
 	sched_ctx->sched_mutex = NULL;
 	sched_ctx->sched_cond = NULL;
@@ -637,6 +633,14 @@ void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int wo
 	return;
 }
 
+void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->workers->deinit(sched_ctx->workers);
+
+	free(sched_ctx->workers);
+}
+
 struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

+ 1 - 1
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -738,10 +738,10 @@ static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
 
 static void deinitialize_dmda_policy(unsigned sched_ctx_id) 
 {
-
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	free(dt->queue_array);
 	free(dt);
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
 
 	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);
 }

+ 78 - 69
src/sched_policies/eager_central_policy.c

@@ -24,123 +24,132 @@
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 
-typedef struct eager_center_policy_data {
+typedef struct {
 	struct _starpu_fifo_taskq *fifo;
 	pthread_mutex_t sched_mutex;
 	pthread_cond_t sched_cond;
 } eager_center_policy_data;
 
-static void initialize_eager_center_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers) 
+static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
+	eager_center_policy_data *data = (eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
+	unsigned i;
+	int workerid;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
+	}
+}
 
+static void eager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
 	unsigned i;
 	int workerid;
-	for (i = 0; i < nnew_workers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &data->sched_cond;
+		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
 	}
 }
 
 static void initialize_eager_center_policy(unsigned sched_ctx_id) 
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
 
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
+	eager_center_policy_data *data = (eager_center_policy_data*)malloc(sizeof(eager_center_policy_data));
 
 	/* there is only a single queue in that trivial design */
 	data->fifo =  _starpu_create_fifo();
 
-	PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	PTHREAD_COND_INIT(&data->sched_cond, NULL);
-
-	sched_ctx->policy_data = (void*)data;
+	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
 
-	int workerid;
-	unsigned workerid_ctx;
-	int nworkers_ctx = sched_ctx->nworkers;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++){
-		workerid = sched_ctx->workerids[workerid_ctx];
-		sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-		sched_ctx->sched_cond[workerid] = &data->sched_cond;
-	}
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)data);
 }
 
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
 
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
-
+	eager_center_policy_data *data = (eager_center_policy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(data->fifo);
 
-	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-	PTHREAD_COND_DESTROY(&data->sched_cond);
+	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+	_STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
 	
-	free(data);	
-	
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	int workerid;
-	unsigned workerid_ctx;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		sched_ctx->sched_mutex[workerid] = NULL;
-		sched_ctx->sched_cond[workerid] = NULL;
-	}
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
 
+	free(data);	
 }
 
-static int push_task_eager_policy(struct starpu_task *task, unsigned sched_ctx_id)
+static int push_task_eager_policy(struct starpu_task *task)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
-
-	int i;
-	int workerid;
-	for(i = 0; i < sched_ctx->nworkers; i++){
-		workerid = sched_ctx->workerids[i]; 
-		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
+	unsigned sched_ctx_id = task->sched_ctx;
+	eager_center_policy_data *data = (eager_center_policy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+        pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+        unsigned nworkers;
+        int ret_val = -1;
+
+        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+        nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+        if(nworkers == 0)
+        {
+                _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                return ret_val;
+        }
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+	int worker;
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+        while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		_starpu_increment_nsubmitted_tasks_of_worker(worker);
 	}
 
-	struct _starpu_fifo_taskq *fifo = data->fifo;
-	return _starpu_fifo_push_task(fifo, &data->sched_mutex, &data->sched_cond, task);
+	if(workers->init_cursor)
+                workers->deinit_cursor(workers);
+
+        ret_val = _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
+        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+        return ret_val;
 }
 
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
-
-	static struct _starpu_fifo_taskq *fifo = data->fifo;
-	return _starpu_fifo_pop_every_task(fifo, &data->sched_mutex, starpu_worker_get_id());
+	eager_center_policy_data *data = (eager_center_policy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	return _starpu_fifo_pop_every_task(data->fifo, &data->sched_mutex, starpu_worker_get_id());
 }
 
 static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 {
-    unsigned workerid = starpu_worker_get_id();
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct eager_center_policy_data *data = (struct eager_center_policy_data*)sched_ctx->policy_data;
-
-	static struct _starpu_fifo_taskq *fifo = data->fifo;
-	struct starpu_task *task =  _starpu_fifo_pop_task(fifo, workerid);
-
+	unsigned workerid = starpu_worker_get_id();
+	eager_center_policy_data *data = (eager_center_policy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	
+	struct starpu_task *task =  _starpu_fifo_pop_task(data->fifo, workerid);
+	
 	if(task)
-	  {
-		int i;
-		for(i = 0; i <sched_ctx->nworkers; i++)
-		  {
-			workerid = sched_ctx->workerids[i]; 
+	{
+		struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+		int worker;
+		if(workers->init_cursor)
+			workers->init_cursor(workers);
+		
+		while(workers->has_next(workers))
+		{
+			worker = workers->get_next(workers);
 			_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
-		  }
-	  }
-
+		}
+		
+		if(workers->init_cursor)
+			workers->deinit_cursor(workers);
+	}
+	
 	return task;
 }
 
@@ -148,7 +157,7 @@ struct starpu_sched_policy _starpu_sched_eager_policy =
 {
 	.init_sched = initialize_eager_center_policy,
 	.deinit_sched = deinitialize_eager_center_policy,
-	.add_workers = eager_add_workers ,
+	.add_workers = eager_add_workers,
 	.remove_workers = eager_remove_workers,
 	.push_task = push_task_eager_policy,
 	.pop_task = pop_task_eager_policy,

+ 1 - 0
src/sched_policies/heft.c

@@ -616,6 +616,7 @@ static void heft_deinit(unsigned sched_ctx_id)
 {
 	heft_data *ht = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 	free(ht);
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
 }
 
 struct starpu_sched_policy heft_policy =