Browse Source

Apply the stored resizing advice when the tagged task is executed

Andra Hugo 13 years ago
parent
commit
a77b636cda

+ 6 - 7
sched_ctx_hypervisor/examples/sched_ctx_utils/sched_ctx_utils.c

@@ -284,13 +284,12 @@ void set_hypervisor_conf(int event, int task_tag)
 		}
  		
 
-		sched_ctx_hypervisor_advise(p2.ctx, p2.procs, p2.nprocs, &reply2[task_tag]);
-		if(reply2[task_tag].procs)
-			sched_ctx_hypervisor_ioctl(p2.ctx,
-						   HYPERVISOR_MAX_IDLE, reply2[task_tag].procs, reply2[task_tag].nprocs, max_idle_time_small,
-						   HYPERVISOR_TIME_TO_APPLY, task_tag,
-						   HYPERVISOR_GRANULARITY, 1,
-						   NULL);
+		sched_ctx_hypervisor_advise(p2.ctx, p2.procs, p2.nprocs, task_tag);
+		/* sched_ctx_hypervisor_ioctl(p2.ctx, */
+		/* 			   HYPERVISOR_MAX_IDLE, p2.procs, p2.nprocs, max_idle_time_small, */
+		/* 			   HYPERVISOR_TIME_TO_APPLY, task_tag, */
+		/* 			   HYPERVISOR_GRANULARITY, 1, */
+		/* 			   NULL); */
 	}
 }
 

+ 3 - 3
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -31,13 +31,13 @@ void sched_ctx_hypervisor_ignore_ctx(unsigned sched_ctx);
 
 void sched_ctx_hypervisor_resize(unsigned sender_sched_ctx, unsigned receier_sched_ctx, int *workers_to_move, unsigned nworkers_to_movex);
 
-void sched_ctx_hypervisor_set_data(unsigned sched_ctx, void *data);
+void sched_ctx_hypervisor_set_config(unsigned sched_ctx, void *config);
 
-void* sched_ctx_hypervisor_get_data(unsigned sched_ctx);
+void* sched_ctx_hypervisor_get_config(unsigned sched_ctx);
 
 void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...);
 
-void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workers, int nworkers, struct sched_ctx_hypervisor_reply *reply);
+void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workers, int nworkers, int task_tag);
 
 struct sched_ctx_hypervisor_reply* sched_ctx_hypervisor_request(unsigned sched_ctx, int *workers, int nworkers);
 

+ 343 - 0
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -0,0 +1,343 @@
#include <sched_ctx_hypervisor.h>
#include <pthread.h>

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
+
+#define MAX_IDLE_TIME 5000000000
+#define MIN_WORKING_TIME 500
+
+/* Per-context tuning knobs of the "simple" resizing policy; one instance
+ * is attached to each hypervisor-managed scheduling context. */
+struct simple_policy_config {
+	/* underneath this limit we cannot resize (workers always kept) */
+	unsigned min_nprocs;
+
+	/* above this limit we cannot resize */
+	unsigned max_nprocs;
+	
+	/* resize granularity: how many workers one resize step moves */
+	unsigned granularity;
+
+	/* priority for a worker to stay in this context */
+	/* the smaller the priority the faster it will be moved */
+	/* to another context */
+	int priority[STARPU_NMAXWORKERS];
+
+	/* a worker idling longer than this triggers a resize attempt
+	 * (see simple_manage_idle_time); units follow the idle-time
+	 * callback -- presumably ns, TODO confirm */
+	double max_idle[STARPU_NMAXWORKERS];
+
+	/* underneath this limit the priority of the worker is reduced */
+	double min_working[STARPU_NMAXWORKERS];
+
+	/* workers that will not move (non-zero = pinned here) */
+	unsigned fixed_procs[STARPU_NMAXWORKERS];
+
+	/* max idle assigned to workers freshly added by a resize */
+	double new_workers_max_idle;
+};
+
+/* Policy hook run when the policy is selected; the simple policy keeps
+ * no global state, so there is nothing to set up. */
+static void simple_init(void)
+{
+}
+
+/* Policy hook run at shutdown; nothing was allocated globally, so
+ * nothing to release. */
+static void simple_deinit(void)
+{
+}
+
+static struct simple_policy_config* _create_config(void)
+{
+	struct simple_policy_config *config = (struct simple_policy_config *)malloc(sizeof(struct simple_policy_config));
+	config->min_nprocs = 0;
+	config->max_nprocs = 0;	
+	config->new_workers_max_idle = MAX_IDLE_TIME;
+
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	{
+		config->granularity = 1;
+		config->priority[i] = 0;
+		config->fixed_procs[i] = 0;
+		config->max_idle[i] = MAX_IDLE_TIME;
+		config->min_working[i] = MIN_WORKING_TIME;
+	}
+	
+	return config;
+}
+
/* Policy hook: a context comes under hypervisor control -> attach a
 * default configuration to it. */
static void simple_add_sched_ctx(unsigned sched_ctx)
{
	sched_ctx_hypervisor_set_config(sched_ctx, _create_config());
}
+
+/* Sum the configured per-worker priorities over every worker currently
+ * inside sched_ctx; used to rank contexts as resize targets. */
+static int _compute_priority(unsigned sched_ctx)
+{
+	struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(sched_ctx);
+
+	int total_priority = 0;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int worker;
+
+	/* some collection implementations need an explicit cursor */
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		total_priority += config->priority[worker];
+	}
+
+	/* NOTE(review): guard tests init_cursor but calls deinit_cursor;
+	 * presumably both are set together -- confirm in the collection API */
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+	return total_priority;
+}
+
+static unsigned _get_highest_priority_sched_ctx(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs)
+{
+	int i;
+	int highest_priority = 0;
+	int current_priority = 0;
+	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
+
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
+		{
+			current_priority = _compute_priority(sched_ctxs[i]);
+			if (highest_priority < current_priority)
+			{
+				highest_priority = current_priority;
+				sched_ctx = sched_ctxs[i];
+			}
+		}
+	}
+	
+	return sched_ctx;
+}
+
+int* _get_first_workers(unsigned sched_ctx, int nworkers)
+{
+	struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(sched_ctx);
+
+	int *curr_workers = (int*)malloc(nworkers * sizeof(int));
+	int i;
+	for(i = 0; i < nworkers; i++)
+		curr_workers[i] = -1;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int index;
+	int worker;
+	int considered = 0;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	for(index = 0; index < nworkers; index++)
+	{
+		while(workers->has_next(workers))
+		{
+			considered = 0;
+			worker = workers->get_next(workers);
+			if(!config->fixed_procs[worker])
+			{
+				for(i = 0; i < index; i++)
+				{
+					if(curr_workers[i] == worker)
+					{
+						considered = 1;
+						break;
+					}
+				}
+				
+				if(!considered && (curr_workers[index] < 0 || 
+						   config->priority[worker] <
+						   config->priority[curr_workers[index]]))
+					curr_workers[index] = worker;
+			}
+		}
+
+		if(curr_workers[index] < 0)
+			break;
+	}
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+
+	return curr_workers;
+}
+
+static unsigned _get_potential_nworkers(struct simple_policy_config *config, unsigned sched_ctx)
+{
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+
+	unsigned potential_workers = 0;
+	int worker;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		if(!config->fixed_procs[worker])
+			potential_workers++;
+	}
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+	
+	return potential_workers;
+}
+
+static void simple_manage_idle_time(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs, int worker, double idle_time)
+{
+       	struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(req_sched_ctx);
+
+	if(config && idle_time > config->max_idle[worker])
+	{
+		int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+		if(ret != EBUSY)
+		{					
+			
+			unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
+			unsigned nworkers_to_move = 0;
+			
+			/* leave at least one */
+			unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx) - 1;
+			
+			if(potential_moving_workers > config->granularity)
+			{
+				if((nworkers - config->granularity) > config->min_nprocs)	
+					nworkers_to_move = config->granularity;
+			}
+			else
+			{
+				int nfixed_workers = nworkers - potential_moving_workers;
+				if(nfixed_workers >= config->min_nprocs)
+					nworkers_to_move = potential_moving_workers;
+				else
+					nworkers_to_move = potential_moving_workers - (config->min_nprocs - nfixed_workers);			
+			}
+			
+			if(nworkers_to_move > 0)
+			{
+				unsigned prio_sched_ctx = _get_highest_priority_sched_ctx(req_sched_ctx, sched_ctxs, nsched_ctxs);
+				if(prio_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+				{
+					
+					int *workers_to_move = _get_first_workers(req_sched_ctx, nworkers_to_move);
+					
+					sched_ctx_hypervisor_resize(req_sched_ctx, prio_sched_ctx, workers_to_move, nworkers_to_move);
+
+					struct simple_policy_config *prio_config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(prio_sched_ctx);
+					int i;
+					for(i = 0; i < nworkers_to_move; i++)
+						prio_config->max_idle[workers_to_move[i]] = prio_config->new_workers_max_idle;
+					
+					free(workers_to_move);
+				}
+			}	
+			pthread_mutex_unlock(&act_hypervisor_mutex);
+		}
+	}
+}
+
+static void* simple_ioctl(unsigned sched_ctx, va_list varg_list, unsigned later)
+{
+	struct simple_policy_config *config = NULL;
+
+	if(later)
+		config = _create_config();
+	else
+		config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(sched_ctx);
+
+	assert(config != NULL);
+
+	int arg_type;
+	int i;
+	int *workerids;
+	int nworkers;
+	int it = 0;
+
+	while ((arg_type = va_arg(varg_list, int)) != 0) 
+	{
+		switch(arg_type)
+		{
+		case HYPERVISOR_MAX_IDLE:
+			workerids = va_arg(varg_list, int*);
+			nworkers = va_arg(varg_list, int);
+			double max_idle = va_arg(varg_list, double);
+			
+			for(i = 0; i < nworkers; i++)
+				config->max_idle[workerids[i]] = max_idle;
+
+			break;
+
+		case HYPERVISOR_MIN_WORKING:
+			workerids = va_arg(varg_list, int*);
+			nworkers = va_arg(varg_list, int);
+			double min_working = va_arg(varg_list, double);
+
+			for(i = 0; i < nworkers; i++)
+				config->min_working[workerids[i]] = min_working;
+
+			break;
+
+		case HYPERVISOR_PRIORITY:
+			workerids = va_arg(varg_list, int*);
+			nworkers = va_arg(varg_list, int);
+			int priority = va_arg(varg_list, int);
+	
+			for(i = 0; i < nworkers; i++)
+				config->priority[workerids[i]] = priority;
+			break;
+
+		case HYPERVISOR_MIN_PROCS:
+			config->min_nprocs = va_arg(varg_list, unsigned);
+			break;
+
+		case HYPERVISOR_MAX_PROCS:
+			config->max_nprocs = va_arg(varg_list, unsigned);
+			break;
+
+		case HYPERVISOR_GRANULARITY:
+			config->granularity = va_arg(varg_list, unsigned);
+			break;
+
+		case HYPERVISOR_FIXED_PROCS:
+			workerids = va_arg(varg_list, int*);
+			nworkers = va_arg(varg_list, int);
+
+			for(i = 0; i < nworkers; i++)
+				config->fixed_procs[workerids[i]] = 1;
+			break;
+
+		case HYPERVISOR_NEW_WORKERS_MAX_IDLE:
+			config->new_workers_max_idle = va_arg(varg_list, double);
+			break;
+
+/* not important for the strateg, needed just to jump these args in the iteration of the args */			
+		case HYPERVISOR_TIME_TO_APPLY:
+			va_arg(varg_list, int);
+			break;
+
+		case HYPERVISOR_MIN_TASKS:
+			va_arg(varg_list, int);
+			break;
+
+		}
+	}
+
+	va_end(varg_list);
+
+	return later ? (void*)config : NULL;
+}
+
+static void simple_remove_sched_ctx(unsigned sched_ctx)
+{
+	sched_ctx_hypervisor_set_config(sched_ctx, NULL);
+}
+
+/* Entry points of the "simple" resizing policy as registered with the
+ * hypervisor core; manage_idle_time drives the actual resizing. */
+struct hypervisor_policy simple_policy = {
+	.init = simple_init,
+	.deinit = simple_deinit,
+	.add_sched_ctx = simple_add_sched_ctx,
+	.remove_sched_ctx = simple_remove_sched_ctx,
+	.ioctl = simple_ioctl,
+	.manage_idle_time = simple_manage_idle_time
+};

+ 51 - 36
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -35,7 +35,7 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 		hypervisor.configurations[i] = NULL;
 		hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
 		hypervisor.sched_ctx_w[i].sched_ctx = STARPU_NMAX_SCHED_CTXS;
-		hypervisor.sched_ctx_w[i].data = NULL;
+		hypervisor.sched_ctx_w[i].config = NULL;
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
 		{
@@ -67,6 +67,8 @@ void sched_ctx_hypervisor_shutdown(void)
 void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx)
 {	
 	hypervisor.configurations[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
+	hypervisor.advices[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
+
 	hypervisor.policy.add_sched_ctx(sched_ctx);
 	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = sched_ctx;
 	hypervisor.sched_ctxs[hypervisor.nsched_ctxs++] = sched_ctx;
@@ -122,23 +124,27 @@ void sched_ctx_hypervisor_ignore_ctx(unsigned sched_ctx)
 	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = STARPU_NMAX_SCHED_CTXS;
 	hypervisor.policy.remove_sched_ctx(sched_ctx);
 	free(hypervisor.configurations[sched_ctx]);
+	free(hypervisor.advices[sched_ctx]);
 }
 
-void sched_ctx_hypervisor_set_data(unsigned sched_ctx, void *data)
+void sched_ctx_hypervisor_set_config(unsigned sched_ctx, void *config)
 {
 	pthread_mutex_lock(&act_hypervisor_mutex);
 
-	if(hypervisor.sched_ctx_w[sched_ctx].data != NULL)
-		free(hypervisor.sched_ctx_w[sched_ctx].data);
+	if(hypervisor.sched_ctx_w[sched_ctx].config != NULL)
+	{
+		free(hypervisor.sched_ctx_w[sched_ctx].config);
+		hypervisor.sched_ctx_w[sched_ctx].config = NULL;
+	}
 
-	hypervisor.sched_ctx_w[sched_ctx].data = data;
+	hypervisor.sched_ctx_w[sched_ctx].config = config;
 	pthread_mutex_unlock(&act_hypervisor_mutex);
 	return;
 }
 
-void* sched_ctx_hypervisor_get_data(unsigned sched_ctx)
+void* sched_ctx_hypervisor_get_config(unsigned sched_ctx)
 {
-	return hypervisor.sched_ctx_w[sched_ctx].data;
+	return hypervisor.sched_ctx_w[sched_ctx].config;
 }
 
 void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...)
@@ -162,6 +168,7 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...)
 		case HYPERVISOR_MIN_TASKS:
 			hypervisor.min_tasks = va_arg(varg_list, int);
 			break;
+
 		}
 		if(stop) break;
 	}
@@ -170,9 +177,9 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...)
 	va_start(varg_list, sched_ctx);
 
 	/* hypervisor configuration to be considered later */
-	void *data = hypervisor.policy.ioctl(sched_ctx, varg_list, (task_tag > 0));
-	if(data != NULL)
-		_starpu_htbl_insert_32(&hypervisor.configurations[sched_ctx], (uint32_t)task_tag, data);
+	void *config = hypervisor.policy.ioctl(sched_ctx, varg_list, (task_tag > 0));
+	if(config != NULL)
+		_starpu_htbl_insert_32(&hypervisor.configurations[sched_ctx], (uint32_t)task_tag, config);
 
 	return;
 }
@@ -221,35 +228,37 @@ void sched_ctx_hypervisor_resize(unsigned sender_sched_ctx, unsigned receiver_sc
 	return;
 }
 
-void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workerids, int nworkers, struct sched_ctx_hypervisor_reply *reply)
+void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workerids, int nworkers, int task_tag)
 {
-	pthread_mutex_lock(&act_hypervisor_mutex);
-	
-	if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS)
+	/* do it right now */
+	if(task_tag == -1)	
 	{
-		starpu_add_workers_to_sched_ctx(workerids, nworkers, sched_ctx);
-		
-		struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
-		
-		int i = 0;
-		
-		if(workers->init_cursor)
-			workers->init_cursor(workers);
-		
-		while(workers->has_next(workers))
-			reply->procs[i++] = workers->get_next(workers);
+		pthread_mutex_lock(&act_hypervisor_mutex);
 		
-		if(workers->init_cursor)
-			workers->deinit_cursor(workers);
+		if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
+			printf("do advice\n");
+			starpu_add_workers_to_sched_ctx(workerids, nworkers, sched_ctx);
+			
+			sched_ctx_hypervisor_ioctl(sched_ctx, 
+						   HYPERVISOR_PRIORITY, workerids, nworkers, 1,
+						   NULL);		
+		}
 		
-		reply->nprocs = i;
-		sched_ctx_hypervisor_ioctl(sched_ctx, 
-					   HYPERVISOR_PRIORITY, workerids, nworkers, 1,
-					   NULL);
+		pthread_mutex_unlock(&act_hypervisor_mutex);
+	}
+	else
+	{
+
+		struct sched_ctx_hypervisor_advice* advice = (struct sched_ctx_hypervisor_advice*)malloc(sizeof(struct sched_ctx_hypervisor_advice));
+		int i;
+		for(i = 0; i < nworkers; i++)
+			advice->workerids[i] = workerids[i];
+		advice->nworkers = nworkers;
 		
+		_starpu_htbl_insert_32(&hypervisor.advices[sched_ctx], (uint32_t)task_tag, (void*)advice);			
 	}
 
-	pthread_mutex_unlock(&act_hypervisor_mutex);
 	return;
 }
 
@@ -267,7 +276,6 @@ static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time)
 
 	if(hypervisor.nsched_ctxs > 1 && hypervisor.policy.manage_idle_time)
 		hypervisor.policy.manage_idle_time(sched_ctx, hypervisor.sched_ctxs, hypervisor.nsched_ctxs, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
-		
 
 	return;
 }
@@ -298,7 +306,14 @@ static void poped_task_cb(unsigned sched_ctx, int worker)
 static void post_exec_hook_cb(unsigned sched_ctx, int task_tag)
 {
 	STARPU_ASSERT(task_tag > 0);
-	void *data = _starpu_htbl_search_32(hypervisor.configurations[sched_ctx], (uint32_t)task_tag);
-	if(data != NULL)	
-		sched_ctx_hypervisor_set_data(sched_ctx, data);
+	void *config = _starpu_htbl_search_32(hypervisor.configurations[sched_ctx], (uint32_t)task_tag);
+	if(config != NULL)	
+		sched_ctx_hypervisor_set_config(sched_ctx, config);
+
+	struct sched_ctx_hypervisor_advice *advice = (struct sched_ctx_hypervisor_advice*) _starpu_htbl_search_32(hypervisor.advices[sched_ctx], (uint32_t)task_tag);
+	if(advice)
+	{
+		sched_ctx_hypervisor_advise(sched_ctx, advice->workerids, advice->nworkers, -1);
+		free(advice);
+	}
 }

+ 7 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor_intern.h

@@ -2,7 +2,7 @@
 
 struct sched_ctx_wrapper {
 	unsigned sched_ctx;
-	void *data;
+	void *config;
 	double current_idle_time[STARPU_NMAXWORKERS];
 	int tasks[STARPU_NMAXWORKERS];
 	int poped_tasks[STARPU_NMAXWORKERS];
@@ -16,6 +16,12 @@ struct sched_ctx_hypervisor {
 	int min_tasks;
 	struct hypervisor_policy policy;
 	struct starpu_htbl32_node_s *configurations[STARPU_NMAX_SCHED_CTXS];
+	struct starpu_htbl32_node_s *advices[STARPU_NMAX_SCHED_CTXS];
+};
+
+struct sched_ctx_hypervisor_advice {
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers;
 };
 
 struct sched_ctx_hypervisor hypervisor;