Browse Source

hypervisor fixes

Andra Hugo 13 years ago
parent
commit
d8ca57aeef

+ 5 - 1
include/starpu_scheduler.h

@@ -180,7 +180,11 @@ pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id);
 
 void starpu_set_sched_ctx(unsigned *sched_ctx);
 
-unsigned starpu_get_sched_ctx();
+unsigned starpu_get_sched_ctx(void);
+
+void starpu_notify_hypervisor_exists(void);
+
+unsigned starpu_check_if_hypervisor_exists(void);
 
 unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx);
 

+ 3 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -55,6 +55,7 @@ struct resize_ack{
 	int receiver_sched_ctx;
 	int *moved_workers;
 	int nmoved_workers;
+	int *acked_workers;
 };
 
 struct sched_ctx_wrapper {
@@ -113,3 +114,5 @@ int sched_ctx_hypervisor_get_nsched_ctxs();
 struct sched_ctx_wrapper* sched_ctx_hypervisor_get_wrapper(unsigned sched_ctx);
 
 double sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(struct sched_ctx_wrapper* sc_w);
+
+char* sched_ctx_hypervisor_get_policy();

+ 0 - 1
sched_ctx_hypervisor/src/hypervisor_policies/gflops_rate_policy.c

@@ -308,7 +308,6 @@ static void gflops_rate_resize(unsigned sched_ctx)
 	int fastest_sched_ctx = _find_fastest_sched_ctx();
 	int slowest_sched_ctx = _find_slowest_sched_ctx();
 
-//	printf("%d %d \n", fastest_sched_ctx, slowest_sched_ctx);
 	if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
 	{
 		double fastest_exp_end = _get_exp_end(fastest_sched_ctx);

+ 2 - 1
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.c

@@ -222,7 +222,9 @@ unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigne
 		{
 			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
+			{
 				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
+			}
 			else
 			{
 				poor_sched_ctx = receiver_sched_ctx;
@@ -234,7 +236,6 @@ unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigne
 				if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 			}
 
-
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			{						
 				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, -1);

+ 1 - 1
sched_ctx_hypervisor/src/sched_ctx_config.c

@@ -41,7 +41,6 @@ static void _update_config(struct policy_config *old, struct policy_config* new)
 
 void sched_ctx_hypervisor_set_config(unsigned sched_ctx, void *config)
 {
-	printf("%d: ", sched_ctx );
 	if(hypervisor.sched_ctx_w[sched_ctx].config != NULL && config != NULL)
 	{
 		_update_config(hypervisor.sched_ctx_w[sched_ctx].config, config);
@@ -214,6 +213,7 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...)
 
 	/* if config not null => save hypervisor configuration and consider it later */
 	struct policy_config *config = _ioctl(sched_ctx, varg_list, (task_tag > 0));
+
 	if(config != NULL)
 		_starpu_htbl_insert_32(&hypervisor.configurations[sched_ctx], (uint32_t)task_tag, config);
 

+ 119 - 51
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -14,7 +14,6 @@ extern struct hypervisor_policy idle_policy;
 extern struct hypervisor_policy app_driven_policy;
 extern struct hypervisor_policy gflops_rate_policy;
 
-
 static struct hypervisor_policy *predefined_policies[] = {
         &idle_policy,
 	&app_driven_policy,
@@ -31,7 +30,7 @@ static void _load_hypervisor_policy(struct hypervisor_policy *policy)
 		_STARPU_DEBUG("Use %s hypervisor policy \n", policy->name);
         }
 #endif
-
+	hypervisor.policy.name = policy->name;
 	hypervisor.policy.handle_poped_task = policy->handle_poped_task;
 	hypervisor.policy.handle_pushed_task = policy->handle_pushed_task;
 	hypervisor.policy.handle_idle_cycle = policy->handle_idle_cycle;
@@ -102,6 +101,7 @@ struct starpu_performance_counters* sched_ctx_hypervisor_init(struct hypervisor_
 	hypervisor.min_tasks = 0;
 	hypervisor.nsched_ctxs = 0;
 	pthread_mutex_init(&act_hypervisor_mutex, NULL);
+
 	int i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	{
@@ -116,6 +116,8 @@ struct starpu_performance_counters* sched_ctx_hypervisor_init(struct hypervisor_
 		hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
 		hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
 		hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
+		hypervisor.sched_ctx_w[i].resize_ack.acked_workers = NULL;
+
 
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
@@ -138,9 +140,17 @@ struct starpu_performance_counters* sched_ctx_hypervisor_init(struct hypervisor_
 	perf_counters->notify_poped_task = notify_poped_task;
 	perf_counters->notify_post_exec_hook = notify_post_exec_hook;
 	perf_counters->notify_idle_end = notify_idle_end;
+
+	starpu_notify_hypervisor_exists();
+
 	return perf_counters;
 }
 
+char* sched_ctx_hypervisor_get_policy()
+{
+	return hypervisor.policy.name;
+}
+
 /* the user can forbid the resizing process*/
 void sched_ctx_hypervisor_stop_resize(unsigned sched_ctx)
 {
@@ -191,6 +201,8 @@ void sched_ctx_hypervisor_register_ctx(unsigned sched_ctx, double total_flops)
 
 	hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
+	if(strcmp(hypervisor.policy.name, "app_driven") == 0)
+		hypervisor.resize[sched_ctx] = 1;
 }
 
 static int _get_first_free_sched_ctx(int *sched_ctxs, unsigned nsched_ctxs)
@@ -277,7 +289,7 @@ static void _get_cpus(int *workers, int nworkers, int *cpus, int *ncpus)
 /* forbids another resize request before this one is taken into account */
 void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
-	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx])
+	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx] && hypervisor.resize[receiver_sched_ctx])
 	{
 		int j;
 		printf("resize ctx %d with", sender_sched_ctx);
@@ -298,6 +310,7 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.receiver_sched_ctx = receiver_sched_ctx;
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_move * sizeof(int));
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.nmoved_workers = nworkers_to_move;
+		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_move * sizeof(int));
 
 
 		int i;
@@ -305,9 +318,11 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 		{
 			hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
 			hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers[i] = workers_to_move[i];	
+			hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers[i] = 0;	
 		}
 
 		hypervisor.resize[sender_sched_ctx] = 0;
+		hypervisor.resize[receiver_sched_ctx] = 0;
 	}
 
 	return;
@@ -329,33 +344,88 @@ double sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(struct sched_ctx_wra
 	return ret_val;
 }
 
-unsigned _check_for_resize_ack(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *moved_workers, int nmoved_workers)
+static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
 {
-	struct sched_ctx_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
-	struct sched_ctx_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];
-	int i;
-	for(i = 0; i < nmoved_workers; i++)
+	struct resize_ack *resize_ack = NULL;
+	unsigned sender_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+
+	if(hypervisor.nsched_ctxs > 0)
 	{
-		int worker = moved_workers[i];
-		if(receiver_sc_w->elapsed_flops[worker] == 0.0f)
-			return 0;
+		int i;
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		{
+			if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
+			{
+				struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[hypervisor.sched_ctxs[i]];
+				if(sc_w->resize_ack.receiver_sched_ctx != -1 && 
+				   sc_w->resize_ack.receiver_sched_ctx == sched_ctx) 
+				{
+					resize_ack = &sc_w->resize_ack;
+					sender_sched_ctx = hypervisor.sched_ctxs[i];
+					break;
+				}
+			}
+		}
 	}
 
-	sender_sc_w->resize_ack.receiver_sched_ctx = -1;
-	sender_sc_w->resize_ack.nmoved_workers = 0;
-	free(sender_sc_w->resize_ack.moved_workers);
-
-	double start_time =  starpu_timing_now();
-	sender_sc_w->start_time = start_time;
-	sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sender_sc_w);
-	_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);
-	
-	receiver_sc_w->start_time = start_time;
-	receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(receiver_sc_w);
-	_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
-	
-	hypervisor.resize[sender_sched_ctx] = 1;
-	return 1;
+	/* if there is no ctx waiting for its ack, return 1 */
+	if(resize_ack == NULL)
+		return 1;
+	else
+	{
+		int *moved_workers = resize_ack->moved_workers;
+		int nmoved_workers = resize_ack->nmoved_workers;
+		int *acked_workers = resize_ack->acked_workers;
+		int i;
+		
+		if(worker != -1)
+		{
+			for(i = 0; i < nmoved_workers; i++)
+			{
+				int moved_worker = moved_workers[i];
+				if(moved_worker == worker && acked_workers[i] == 0)
+					acked_workers[i] = 1;
+			}
+		}
+		
+		int nacked_workers = 0;
+		for(i = 0; i < nmoved_workers; i++)
+		{
+			nacked_workers += (acked_workers[i] == 1);
+		}
+		
+		unsigned resize_completed = (nacked_workers == nmoved_workers);
+		unsigned receiver_sched_ctx = resize_ack->receiver_sched_ctx;
+		/* if the user has not granted permission to resize, don't do it,
+		   regardless of what the application says */
+		if(resize_completed && !((hypervisor.resize[sched_ctx] == 0 || hypervisor.resize[receiver_sched_ctx] == 0) && imposed_resize) && worker == moved_workers[0])
+		{				
+			/* info concerning only the gflops_rate strategy */
+			struct sched_ctx_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
+			struct sched_ctx_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];
+			
+			double start_time =  starpu_timing_now();
+			sender_sc_w->start_time = start_time;
+			sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sender_sc_w);
+			_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);
+			
+			receiver_sc_w->start_time = start_time;
+			receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(receiver_sc_w);
+			_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
+
+			hypervisor.resize[sender_sched_ctx] = 1;
+			hypervisor.resize[receiver_sched_ctx] = 1;
+			/* if the user allowed resizing, leave the decisions to the application */
+			if(imposed_resize)  imposed_resize = 0;
+
+			resize_ack->receiver_sched_ctx = -1;
+			resize_ack->nmoved_workers = 0;
+			free(resize_ack->moved_workers);
+			free(resize_ack->acked_workers);
+		}
+		return resize_completed;
+	}
+	return 0;
 }
 
 void sched_ctx_hypervisor_resize(unsigned sched_ctx, int task_tag)
@@ -366,11 +436,17 @@ void sched_ctx_hypervisor_resize(unsigned sched_ctx, int task_tag)
 /* notifies the hypervisor that the worker is no longer idle and a new task was pushed on its queue */
 static void notify_idle_end(unsigned sched_ctx, int worker)
 {
-	if(hypervisor.resize[sched_ctx])
-		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;
+	if(hypervisor.nsched_ctxs > 1)
+	{
 
-	if(hypervisor.policy.handle_idle_end)
-		hypervisor.policy.handle_idle_end(sched_ctx, worker);
+		if(hypervisor.resize[sched_ctx])
+			hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;
+		
+		if(hypervisor.policy.handle_idle_end)
+			hypervisor.policy.handle_idle_end(sched_ctx, worker);
+		
+		_ack_resize_completed(sched_ctx, worker);
+	}
 }
 
 /* notifies the hypervisor that the worker spent another cycle in idle time */
@@ -378,19 +454,15 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 {
 	if(hypervisor.nsched_ctxs > 1)
 	{
-		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 		if(hypervisor.resize[sched_ctx])
 		{
+			struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 			sc_w->current_idle_time[worker] += idle_time;
-
 			if(hypervisor.policy.handle_idle_cycle)
 				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
 		}		
-		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
-		{
-			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx,
-					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
-		}
+		else 
+			_ack_resize_completed(sched_ctx, worker);
 	}
 	return;
 }
@@ -404,8 +476,11 @@ static void notify_pushed_task(unsigned sched_ctx, int worker)
 	
 	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
 	
-	if(!imposed_resize && ntasks == hypervisor.min_tasks)
+	if(!(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && ntasks == hypervisor.min_tasks)
+	{
 		hypervisor.resize[sched_ctx] = 1;
+		if(imposed_resize) imposed_resize = 0;
+	}
 
 	if(hypervisor.policy.handle_pushed_task)
 		hypervisor.policy.handle_pushed_task(sched_ctx, worker);
@@ -420,17 +495,13 @@ static void notify_poped_task(unsigned sched_ctx, int worker, double elapsed_flo
 
 	if(hypervisor.nsched_ctxs > 1)
 	{
-		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 		if(hypervisor.resize[sched_ctx])
 		{
 			if(hypervisor.policy.handle_poped_task)
 				hypervisor.policy.handle_poped_task(sched_ctx, worker);
 		}
-		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
-		{
-			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx,
-					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
-		}
+		else 
+			_ack_resize_completed(sched_ctx, worker);
 	}
 }
 
@@ -454,8 +525,10 @@ static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
 				_starpu_htbl_insert_32(&hypervisor.configurations[sched_ctx], (uint32_t)task_tag, NULL);
 			}
 		}	
-				
-		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+
+		/* for the app-driven policy we have to wait for the resize to complete,
+		   because the event is required to be executed at this specific moment */
+		while(!_ack_resize_completed(sched_ctx, -1));
 
 		if(hypervisor.resize[sched_ctx])
 		{
@@ -464,11 +537,6 @@ static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
 			if(hypervisor.policy.handle_post_exec_hook)
 				hypervisor.policy.handle_post_exec_hook(sched_ctx, resize_requests, task_tag);
 		}
-		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
-		{
-			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx,
-					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
-		}
 	}
 }
 struct sched_ctx_wrapper* sched_ctx_hypervisor_get_wrapper(unsigned sched_ctx)

+ 12 - 1
src/core/sched_ctx.c

@@ -21,6 +21,7 @@
 extern struct worker_collection worker_list;
 
 pthread_key_t sched_ctx_key;
+unsigned with_hypervisor = 0;
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 static unsigned _starpu_worker_get_first_free_sched_ctx(struct _starpu_worker *worker);
@@ -33,7 +34,7 @@ static void change_worker_sched_ctx(unsigned sched_ctx_id)
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 
 	int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker, sched_ctx_id);
-	/* if the ctx is not in the worker's list it means the update concerns the addition of ctxs*/
+	/* if the worker is not in the ctx's list it means the update concerns the addition of ctxs*/
 	if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
 	{
 		worker_sched_ctx_id = _starpu_worker_get_first_free_sched_ctx(worker);
@@ -482,6 +483,16 @@ unsigned starpu_get_sched_ctx()
 	return *sched_ctx;
 }
 
+void starpu_notify_hypervisor_exists()
+{
+	with_hypervisor = 1;
+}
+
+unsigned starpu_check_if_hypervisor_exists()
+{
+	return with_hypervisor;
+}
+
 unsigned _starpu_get_nsched_ctxs()
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();