
resize using gflops/s criteria + bug fixing

Andra Hugo, 13 years ago
Parent
Current commit
6f35019d20

+ 2 - 2
include/starpu_scheduler.h

@@ -132,13 +132,13 @@ struct starpu_sched_ctx_hypervisor_criteria {
 	void (*reset_idle_time_cb)(unsigned sched_ctx, int worker);
 	void (*working_time_cb)(unsigned sched_ctx, double working_time);
 	void (*pushed_task_cb)(unsigned sched_ctx, int worker);
-	void (*poped_task_cb)(unsigned sched_ctx, int worker);
+	void (*poped_task_cb)(unsigned sched_ctx, int worker, double flops);
 	void (*post_exec_hook_cb)(unsigned sched_ctx, int taskid);
 };
 
 #ifdef STARPU_BUILD_SCHED_CTX_HYPERVISOR
 unsigned starpu_create_sched_ctx_with_criteria(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name, struct starpu_sched_ctx_hypervisor_criteria **criteria);
-void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id);
+void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops);
 void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif //STARPU_BUILD_SCHED_CTX_HYPERVISOR
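
With the extra flops argument, a criteria implementation can now learn how much work each popped task carried. A minimal sketch of a callback matching the new signature (the name my_poped_task_cb and the per-worker accumulator are illustrative, not part of this commit):

	static double ctx_elapsed_flops[STARPU_NMAXWORKERS];

	/* invoked each time a worker pops a task of the monitored context */
	static void my_poped_task_cb(unsigned sched_ctx, int worker, double flops)
	{
		/* accumulate the flops this worker has executed so far */
		ctx_elapsed_flops[worker] += flops;
	}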
 

+ 4 - 1
include/starpu_task.h

@@ -184,6 +184,8 @@ struct starpu_task {
 	unsigned control_task;
 
 	int hypervisor_tag;
+	
+	double flops;
 };
 
 /* It is possible to initialize statically allocated tasks with this value.
@@ -210,7 +212,8 @@ struct starpu_task {
 	.starpu_private = NULL,				\
 	.sched_ctx = 0,					\
 	.control_task = 0,				\
-		.hypervisor_tag = 0			\
+		.hypervisor_tag = 0,			\
+		.flops = 0.0			\
 };
 
 /*

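The new flops field lets the application declare how much computation a task represents; the hypervisor aggregates it to estimate per-context progress. A hedged sketch of filling it in (gemm_codelet and N are application-side assumptions; 2*N^3 is the usual flop estimate for an N x N matrix product):

	struct starpu_task *task = starpu_task_create();
	task->cl = &gemm_codelet;       /* hypothetical codelet */
	task->flops = 2.0 * N * N * N;  /* rough flop count of the task */
	starpu_task_submit(task);
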
+ 2 - 2
sched_ctx_hypervisor/examples/sched_ctx_utils/sched_ctx_utils.c

@@ -250,7 +250,7 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 	p1.ctx = starpu_create_sched_ctx_with_criteria("heft", p1.workers, nworkers1, "sched_ctx1", criteria);
 	p2.the_other_ctx = (int)p1.ctx;
 	p1.nworkers = nworkers1;
-	sched_ctx_hypervisor_handle_ctx(p1.ctx);
+	sched_ctx_hypervisor_handle_ctx(p1.ctx, 0.0);
 	
 	/* sched_ctx_hypervisor_ioctl(p1.ctx, */
 	/* 			   HYPERVISOR_MAX_IDLE, p1.workers, p1.nworkers, 5000.0, */
@@ -285,7 +285,7 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 	p2.ctx = starpu_create_sched_ctx_with_criteria("heft", p2.workers, 0, "sched_ctx2", criteria);
 	p1.the_other_ctx = (int)p2.ctx;
 	p2.nworkers = 0;
-	sched_ctx_hypervisor_handle_ctx(p2.ctx);
+	sched_ctx_hypervisor_handle_ctx(p2.ctx, 0.0);
 	
 	/* sched_ctx_hypervisor_ioctl(p2.ctx, */
 	/* 			   HYPERVISOR_MAX_IDLE, p2.workers, p2.nworkers, 2000.0, */

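The benchmark still registers both contexts with 0.0 total flops, which leaves the flops-left accounting inactive. A context that wants gflops/s-driven resizing would pass its aggregate workload instead, for example (value illustrative):

	double total_flops = 2.0 * N * N * N;  /* e.g. one N x N matrix product per context */
	sched_ctx_hypervisor_handle_ctx(p1.ctx, total_flops);
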
+ 10 - 1
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -24,7 +24,7 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 
 void sched_ctx_hypervisor_shutdown(void);
 
-void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx);
+void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx, double total_flops);
 
 void sched_ctx_hypervisor_ignore_ctx(unsigned sched_ctx);
 
@@ -50,6 +50,14 @@ int sched_ctx_hypervisor_get_nsched_ctxs();
 
 double sched_ctx_hypervisor_get_debit(unsigned sched_ctx);
 
+double sched_ctx_hypervisor_get_exp_end(unsigned sched_ctx);
+
+double sched_ctx_hypervisor_get_flops_left_pct(unsigned sched_ctx);
+
+double sched_ctx_hypervisor_get_idle_time(unsigned sched_ctx, int worker);
+
+double sched_ctx_hypervisor_get_bef_res_exp_end(unsigned sched_ctx);
+
 /* hypervisor policies */
 #define SIMPLE_POLICY 1
 
@@ -61,6 +69,7 @@ struct hypervisor_policy {
 	void* (*ioctl)(unsigned sched_ctx, va_list varg_list, unsigned later);
 	void (*manage_idle_time)(unsigned req_sched_ctx, int worker, double idle_time);
 	void (*manage_task_flux)(unsigned sched_ctx);
+	void (*manage_gflops_rate)(unsigned sched_ctx);
 	unsigned (*resize)(unsigned sched_ctx, int *sched_ctxs, unsigned nsched_ctxs);
 	void (*update_config)(void* old_config, void* new_config);
 };
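
The new getters expose exactly the state that the gflops-rate policy consumes: the extrapolated end time, the fraction of declared flops still to execute, per-worker idle time, and the end time estimated just before the last resize. A small illustrative fragment (assumes ctx is a context already registered with sched_ctx_hypervisor_handle_ctx and that <stdio.h> is included):

	double exp_end = sched_ctx_hypervisor_get_exp_end(ctx);
	double left_pct = sched_ctx_hypervisor_get_flops_left_pct(ctx);
	if(exp_end != -1.0) /* -1.0 means the context has not executed anything yet */
		printf("ctx %u: %.0f%% of its flops left, expected end at %.2lf\n",
		       ctx, left_pct * 100.0, exp_end);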

+ 120 - 13
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -167,10 +167,26 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers)
 					}
 				}
 				
-				if(!considered && (curr_workers[index] < 0 || 
-						   config->priority[worker] <
-						   config->priority[curr_workers[index]]))
-					curr_workers[index] = worker;
+				if(!considered)
+				{
+					/* the first iteration */
+					if(curr_workers[index] < 0)
+						curr_workers[index] = worker;
+					/* the lowest-priority worker is the first to leave the ctx */
+					else if(config->priority[worker] <
+						   config->priority[curr_workers[index]])
+						curr_workers[index] = worker;
+					/* if priorities are equal, prefer the worker
+					   with the biggest idle time */
+					else if(config->priority[worker] ==
+						   config->priority[curr_workers[index]])
+					{
+						double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
+						double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
+						if(worker_idle_time > curr_worker_idle_time)
+							curr_workers[index] = worker;
+					}
+				}
 			}
 		}
 
@@ -245,14 +261,14 @@ static unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
 			else
 				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);	
 		}
-//		printf("nworkers = %d nworkers_to_move = %d max_nworkers=%d\n", nworkers, nworkers_to_move, config->max_nworkers);
+
 		if((nworkers - nworkers_to_move) > config->max_nworkers)
 			nworkers_to_move = nworkers - config->max_nworkers;
 	}
 	return nworkers_to_move;
 }
 
-static int _find_fastest_sched_ctx()
+static int _find_highest_debit_sched_ctx()
 {
 	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
 	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
@@ -273,7 +289,7 @@ static int _find_fastest_sched_ctx()
 	return fastest_sched_ctx;
 }
 
-static int _find_slowest_sched_ctx()
+static int _find_slowest_debit_sched_ctx()
 {
 	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
 	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
@@ -300,7 +316,7 @@ static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sche
 	if(ret != EBUSY)
 	{					
 		unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
-//		printf("nworkers = %d\n", nworkers_to_move);
+
 		if(nworkers_to_move > 0)
 		{
 			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
@@ -311,12 +327,12 @@ static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sche
 				poor_sched_ctx = receiver_sched_ctx;
 				struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(poor_sched_ctx);
 				unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
-      
 				if((nworkers+nworkers_to_move) > config->max_nworkers)
 					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers);
 				if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 			}
-			
+
+
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			{						
 				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move);
@@ -355,8 +371,8 @@ static void simple_manage_task_flux(unsigned curr_sched_ctx)
 {
 	double curr_debit = sched_ctx_hypervisor_get_debit(curr_sched_ctx);
 	
-	int slow_sched_ctx = _find_slowest_sched_ctx();
-	int fast_sched_ctx = _find_fastest_sched_ctx();
+	int slow_sched_ctx = _find_slowest_debit_sched_ctx();
+	int fast_sched_ctx = _find_highest_debit_sched_ctx();
 
 	if(slow_sched_ctx != fast_sched_ctx && slow_sched_ctx != -1 && fast_sched_ctx != -1)
 	{
@@ -378,6 +394,97 @@ static void simple_manage_task_flux(unsigned curr_sched_ctx)
 	}
 }
 
+int _find_fastest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	double first_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[0]);
+	int fastest_sched_ctx = first_exp_end == -1.0  ? -1 : sched_ctxs[0];
+	double curr_exp_end = 0.0;
+	int i;
+	for(i = 1; i < nsched_ctxs; i++)
+	{
+		curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
+		if(first_exp_end > curr_exp_end && curr_exp_end != -1.0)
+		{
+			first_exp_end = curr_exp_end;
+			fastest_sched_ctx = sched_ctxs[i];
+		}
+	}
+
+	return fastest_sched_ctx;
+
+}
+
+int _find_slowest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int slowest_sched_ctx = -1;
+	double curr_exp_end = 0.0;
+	double last_exp_end = -1.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
+		/* if it hasn't started because it has no resources, give it priority */
+		if(curr_exp_end == -1.0)
+			return sched_ctxs[i];
+		if(last_exp_end < curr_exp_end)
+		{
+			slowest_sched_ctx = sched_ctxs[i];
+			last_exp_end = curr_exp_end;
+		}
+	}
+
+	return slowest_sched_ctx;
+
+}
+
+static void simple_manage_gflops_rate(unsigned sched_ctx)
+{
+	double exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctx);
+	double flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(sched_ctx);
+
+	if(flops_left_pct == 0.0f)
+	{
+		int slowest_sched_ctx = _find_slowest_sched_ctx();
+		if(slowest_sched_ctx != -1 && slowest_sched_ctx != sched_ctx)
+		{
+			double slowest_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(slowest_sched_ctx);
+			printf("ctx %d finished & gives away the res to %d; slow_left %lf\n", sched_ctx, slowest_sched_ctx, slowest_flops_left_pct);
+			if(slowest_flops_left_pct != 0.0f)
+			{
+				struct simple_policy_config* config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(sched_ctx);
+				config->min_nworkers = 0;
+				config->max_nworkers = 0;
+				_simple_resize(sched_ctx, slowest_sched_ctx);
+			}
+		}
+	}
+
+	int fastest_sched_ctx = _find_fastest_sched_ctx();
+	int slowest_sched_ctx = _find_slowest_sched_ctx();
+	if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
+	{
+		double fastest_exp_end = sched_ctx_hypervisor_get_exp_end(fastest_sched_ctx);
+		double slowest_exp_end = sched_ctx_hypervisor_get_exp_end(slowest_sched_ctx);
+		double fastest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(fastest_sched_ctx);
+		double slowest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(slowest_sched_ctx);
+//					       (fastest_bef_res_exp_end < slowest_bef_res_exp_end || 
+//						fastest_bef_res_exp_end == 0.0 || slowest_bef_res_exp_end == 0)))
+
+		if((slowest_exp_end == -1.0 && fastest_exp_end != -1.0) || (fastest_exp_end < slowest_exp_end ))
+		{
+			double fast_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(fastest_sched_ctx);
+			if(fast_flops_left_pct < 0.8)
+				_simple_resize(fastest_sched_ctx, slowest_sched_ctx);
+		}
+	}
+}
+
 static void* simple_ioctl(unsigned sched_ctx, va_list varg_list, unsigned later)
 {
 	struct simple_policy_config *config = NULL;
@@ -442,7 +549,6 @@ static void* simple_ioctl(unsigned sched_ctx, va_list varg_list, unsigned later)
 
 		case HYPERVISOR_MAX_WORKERS:
 			config->max_nworkers = va_arg(varg_list, unsigned);
-			if(config->max_nworkers == 0)
 			break;
 
 		case HYPERVISOR_GRANULARITY:
@@ -513,6 +619,7 @@ struct hypervisor_policy simple_policy = {
 	.ioctl = simple_ioctl,
 	.manage_idle_time = simple_manage_idle_time,
 	.manage_task_flux = simple_manage_task_flux,
+	.manage_gflops_rate = simple_manage_gflops_rate,
 	.resize = simple_resize,
 	.update_config = simple_update_config
 };
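
A worked example of the rule in simple_manage_gflops_rate: suppose the fastest context reports exp_end = 1.0e6 and the slowest exp_end = 3.0e6 (in the units of starpu_timing_now()). If the fastest one still has 60% of its flops left (fast_flops_left_pct = 0.6 < 0.8), _simple_resize moves workers from the fastest to the slowest. If instead a context has finished (flops_left_pct == 0.0) while the slowest has work left, the first branch shrinks the finished context to zero workers and donates them all to the slowest one.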

+ 134 - 16
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -7,10 +7,11 @@ extern struct hypervisor_policy simple_policy;
 
 static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time);
 static void pushed_task_cb(unsigned sched_ctx, int worker);
-static void poped_task_cb(unsigned sched_ctx, int worker);
+static void poped_task_cb(unsigned sched_ctx, int worker, double flops);
 static void post_exec_hook_cb(unsigned sched_ctx, int taskid);
 static void reset_idle_time_cb(unsigned sched_ctx, int  worker);
 
+static void _set_elapsed_flops_per_sched_ctx(unsigned sched_ctx, double val);
 static void _load_hypervisor_policy(int type)
 {
 	switch(type)
@@ -25,6 +26,7 @@ static void _load_hypervisor_policy(int type)
 		hypervisor.policy.update_config = simple_policy.update_config;
 		hypervisor.policy.resize = simple_policy.resize;
 		hypervisor.policy.manage_task_flux = simple_policy.manage_task_flux;
+		hypervisor.policy.manage_gflops_rate = simple_policy.manage_gflops_rate;
 		break;
 	}
 }
@@ -44,6 +46,13 @@ struct starpu_sched_ctx_hypervisor_criteria** sched_ctx_hypervisor_init(int type
 		hypervisor.sched_ctx_w[i].config = NULL;
 		hypervisor.sched_ctx_w[i].temp_npushed_tasks = 0;
 		hypervisor.sched_ctx_w[i].temp_npoped_tasks = 0;
+		hypervisor.sched_ctx_w[i].total_flops = 0.0;
+		hypervisor.sched_ctx_w[i].remaining_flops = 0.0;
+		hypervisor.sched_ctx_w[i].start_time = 0.0;
+		hypervisor.sched_ctx_w[i].bef_res_exp_end = 0.0;
+		hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
+		hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
+		hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
 
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
@@ -51,6 +60,8 @@ struct starpu_sched_ctx_hypervisor_criteria** sched_ctx_hypervisor_init(int type
 			hypervisor.sched_ctx_w[i].current_idle_time[j] = 0.0;
 			hypervisor.sched_ctx_w[i].pushed_tasks[j] = 0;
 			hypervisor.sched_ctx_w[i].poped_tasks[j] = 0;
+			hypervisor.sched_ctx_w[i].elapsed_flops[j] = 0.0;
+			hypervisor.sched_ctx_w[i].total_elapsed_flops[j] = 0.0;
 
 		}
 	}
@@ -102,7 +113,7 @@ void sched_ctx_hypervisor_shutdown(void)
 	pthread_mutex_destroy(&act_hypervisor_mutex);
 }
 
-void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx)
+void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx, double total_flops)
 {	
 	hypervisor.configurations[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
 	hypervisor.steal_requests[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
@@ -111,6 +122,9 @@ void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx)
 	hypervisor.policy.add_sched_ctx(sched_ctx);
 	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = sched_ctx;
 	hypervisor.sched_ctxs[hypervisor.nsched_ctxs++] = sched_ctx;
+
+	hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
+	hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
 }
 
 static int _get_first_free_sched_ctx(int *sched_ctxs, unsigned nsched_ctxs)
@@ -260,17 +274,58 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 			printf(" %d", workers_to_move[j]);
 		printf("\n");
 
+		hypervisor.sched_ctx_w[sender_sched_ctx].bef_res_exp_end = sched_ctx_hypervisor_get_exp_end(sender_sched_ctx);
+
 		starpu_remove_workers_from_sched_ctx(workers_to_move, nworkers_to_move, sender_sched_ctx);
 		starpu_add_workers_to_sched_ctx(workers_to_move, nworkers_to_move, receiver_sched_ctx);
 
+		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.receiver_sched_ctx = receiver_sched_ctx;
+		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_move * sizeof(int));
+		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.nmoved_workers = nworkers_to_move;
+
+
 		int i;
 		for(i = 0; i < nworkers_to_move; i++)
+		{
 			hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
+			hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers[i] = workers_to_move[i];	
+		}
+
+		hypervisor.resize[sender_sched_ctx] = 0;
 	}
 
 	return;
 }
 
+unsigned _check_for_resize_ack(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *moved_workers, int nmoved_workers)
+{
+	struct sched_ctx_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
+	struct sched_ctx_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];
+	int i;
+	for(i = 0; i < nmoved_workers; i++)
+	{
+		int worker = moved_workers[i];
+		if(receiver_sc_w->elapsed_flops[worker] == 0.0f)
+			return 0;
+	}
+
+	sender_sc_w->resize_ack.receiver_sched_ctx = -1;
+	sender_sc_w->resize_ack.nmoved_workers = 0;
+	free(sender_sc_w->resize_ack.moved_workers);
+
+	double start_time =  starpu_timing_now();
+	sender_sc_w->start_time = start_time;
+	sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - _get_elapsed_flops_per_sched_ctx(sender_sched_ctx);
+	_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);
+	
+	receiver_sc_w->start_time = start_time;
+	receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - _get_elapsed_flops_per_sched_ctx(receiver_sched_ctx);
+	_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
+	
+	hypervisor.resize[sender_sched_ctx] = 1;
+	return 1;
+}
+
 unsigned sched_ctx_hypervisor_resize(unsigned sched_ctx, int task_tag)
 {
 	if(task_tag == -1)
@@ -357,6 +412,16 @@ void sched_ctx_hypervisor_steal_workers(unsigned sched_ctx, int *workerids, int
 	return ;
 }
 
+double sched_ctx_hypervisor_get_idle_time(unsigned sched_ctx, int worker)
+{
+	return hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker];
+}
+
+double sched_ctx_hypervisor_get_bef_res_exp_end(unsigned sched_ctx)
+{
+	return hypervisor.sched_ctx_w[sched_ctx].bef_res_exp_end;
+}
+
 static void reset_idle_time_cb(unsigned sched_ctx, int worker)
 {
 	if(hypervisor.resize[sched_ctx])
@@ -407,32 +472,85 @@ double sched_ctx_hypervisor_get_debit(unsigned sched_ctx)
 	return 0.0;
 }
 
+static double _get_total_elapsed_flops_per_sched_ctx(unsigned sched_ctx)
+{
+	double ret_val = 0.0;
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		ret_val += hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[i];
+	return ret_val;
+}
+
+static double _get_elapsed_flops_per_sched_ctx(unsigned sched_ctx)
+{
+	double ret_val = 0.0;
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		ret_val += hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[i];
+	return ret_val;
+}
+
+static void _set_elapsed_flops_per_sched_ctx(unsigned sched_ctx, double val)
+{
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[i] = val;
+}
+
+double sched_ctx_hypervisor_get_exp_end(unsigned sched_ctx)
+{	
+	double elapsed_flops = _get_elapsed_flops_per_sched_ctx(sched_ctx);
+
+	if( elapsed_flops != 0.0)
+	{
+		double curr_time = starpu_timing_now();
+		double elapsed_time = curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time;
+		double exp_end = (elapsed_time * hypervisor.sched_ctx_w[sched_ctx].remaining_flops / 
+				  elapsed_flops) + curr_time;
+		return exp_end;
+	}
+	return -1.0;
+}
+
+double sched_ctx_hypervisor_get_flops_left_pct(unsigned sched_ctx)
+{
+	struct sched_ctx_wrapper *wrapper = &hypervisor.sched_ctx_w[sched_ctx];
+	double total_elapsed_flops = _get_total_elapsed_flops_per_sched_ctx(sched_ctx);
+	if(wrapper->total_flops == total_elapsed_flops || total_elapsed_flops > wrapper->total_flops)
+		return 0.0;
+       
+	return (wrapper->total_flops - total_elapsed_flops)/wrapper->total_flops;
+}
+
 static void pushed_task_cb(unsigned sched_ctx, int worker)
 {	
 	hypervisor.sched_ctx_w[sched_ctx].pushed_tasks[worker]++;
+	if(hypervisor.sched_ctx_w[sched_ctx].total_flops != 0.0 && hypervisor.sched_ctx_w[sched_ctx].start_time == 0.0)
+		hypervisor.sched_ctx_w[sched_ctx].start_time = starpu_timing_now();
 	
 	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
 	
-	if(!imposed_resize)
-		hypervisor.resize[sched_ctx] = (ntasks > hypervisor.min_tasks);
+	if(!imposed_resize && ntasks == hypervisor.min_tasks)
+		hypervisor.resize[sched_ctx] = 1;
 }
 
-static void poped_task_cb(unsigned sched_ctx, int worker)
+static void poped_task_cb(unsigned sched_ctx, int worker, double elapsed_flops)
 {
 	hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
-	
+	hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += elapsed_flops;
+	hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += elapsed_flops;
+
 	if(hypervisor.nsched_ctxs > 1)
 	{
-		int npushed_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
-		int npoped_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].poped_tasks);
-		int old_npushed_tasks = hypervisor.sched_ctx_w[sched_ctx].temp_npushed_tasks;
-		int old_npoped_tasks = hypervisor.sched_ctx_w[sched_ctx].temp_npoped_tasks;
-		if((old_npushed_tasks + old_npushed_tasks * 0.2) < npushed_tasks ||
-		    (old_npoped_tasks + old_npoped_tasks * 0.2) < npoped_tasks)
-			hypervisor.policy.manage_task_flux(sched_ctx);
-
-		hypervisor.sched_ctx_w[sched_ctx].temp_npushed_tasks = npushed_tasks;
-		hypervisor.sched_ctx_w[sched_ctx].temp_npoped_tasks = npoped_tasks;			
+		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+		if(hypervisor.resize[sched_ctx] || sched_ctx_hypervisor_get_flops_left_pct(sched_ctx) == 0.0f)
+			hypervisor.policy.manage_gflops_rate(sched_ctx);
+		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
+		{
+			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx, 
+					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
+		}
+
 	}
 }
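
The expected end computed by sched_ctx_hypervisor_get_exp_end is a plain linear extrapolation: if a context executed elapsed_flops during elapsed_time, the remaining_flops should take about elapsed_time * remaining_flops / elapsed_flops more. For example, a context that completed 2 GFlop in 4.0e6 time units with 6 GFlop remaining gets exp_end = 4.0e6 * 6/2 + curr_time = curr_time + 1.2e7.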
 

+ 13 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor_intern.h

@@ -1,6 +1,12 @@
 #include <sched_ctx_hypervisor.h>
 #include <../common/htable32.h>
 
+struct resize_ack{
+	int receiver_sched_ctx;
+	int *moved_workers;
+	int nmoved_workers;
+};
+
 struct sched_ctx_wrapper {
 	unsigned sched_ctx;
 	void *config;
@@ -9,7 +15,13 @@ struct sched_ctx_wrapper {
 	int poped_tasks[STARPU_NMAXWORKERS];
 	int temp_npushed_tasks;
 	int temp_npoped_tasks;
-
+	double total_flops;
+	double total_elapsed_flops[STARPU_NMAXWORKERS];
+	double elapsed_flops[STARPU_NMAXWORKERS];
+	double remaining_flops;
+	double start_time;
+	double bef_res_exp_end;
+	struct resize_ack resize_ack;
 };
 
 struct sched_ctx_hypervisor {

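resize_ack implements a small handshake around a worker move: sched_ctx_hypervisor_move_workers records the receiver and the moved workers and disables further resizing of the sender (hypervisor.resize[sender] = 0); each subsequent poped_task_cb then calls _check_for_resize_ack, which returns 0 until every moved worker has executed at least one task in the receiver (elapsed_flops[worker] != 0.0); only then are the start_time and remaining_flops of both contexts rebased and resizing re-enabled.
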
+ 9 - 5
src/core/sched_ctx.c

@@ -40,7 +40,8 @@ static void change_worker_sched_ctx(unsigned sched_ctx_id)
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 		/* add context to worker */
 		worker->sched_ctx[worker_sched_ctx_id] = sched_ctx;
-		worker->nsched_ctxs++;	}
+		worker->nsched_ctxs++;	
+	}
 	else 
 	{
 		/* remove context from worker */
@@ -278,7 +279,9 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 	int nworkers = config->topology.nworkers;
 
 	if(!(sched_ctx->workers->nworkers == nworkers && sched_ctx->workers->nworkers == inheritor_sched_ctx->workers->nworkers) && sched_ctx->workers->nworkers > 0 && inheritor_sched_ctx_id != STARPU_NMAX_SCHED_CTXS)
+	{
 		starpu_add_workers_to_sched_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, inheritor_sched_ctx_id);
+	}
 	
 	if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
@@ -639,13 +642,13 @@ unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx_id)
 }
 
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id)
+void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops)
 {
 	struct starpu_worker_s *worker =  _starpu_get_worker_struct(workerid);
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	if(sched_ctx != NULL && sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
 		   && *sched_ctx->criteria != NULL)
-		(*sched_ctx->criteria)->poped_task_cb(sched_ctx_id, worker->workerid);
+		(*sched_ctx->criteria)->poped_task_cb(sched_ctx_id, worker->workerid, flops);
 }
 
 void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
@@ -653,8 +656,9 @@ void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
 	struct starpu_worker_s *worker =  _starpu_get_worker_struct(workerid);
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
-	if(sched_ctx != NULL && sched_ctx_id != 0  && *sched_ctx->criteria != NULL)
-		(*sched_ctx->criteria)->pushed_task_cb(sched_ctx_id, workerid);
+	if(sched_ctx != NULL && sched_ctx_id != 0)
+		if(*sched_ctx->criteria != NULL)
+			(*sched_ctx->criteria)->pushed_task_cb(sched_ctx_id, workerid);
 
 }
 

+ 21 - 21
src/core/sched_policy.c

@@ -312,29 +312,29 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	unsigned no_workers = 0;
 	unsigned nworkers = 0; 
 
-	/*if there are workers in the ctx that are not able to execute tasks 
-	  we consider the ctx empty */
 	if(!sched_ctx->is_initial_sched)
-	  nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
-	else
-	  nworkers = sched_ctx->workers->nworkers;
-
-	if(nworkers == 0)
 	{
-		if(workerid == -1)
-		{
-			PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
-			PTHREAD_COND_WAIT(&sched_ctx->no_workers_cond, &sched_ctx->no_workers_mutex);
-			PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);
-			nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
-			if(nworkers == 0) return _starpu_push_task(j, job_is_already_locked);
-		}
-		else
+		/*if there are workers in the ctx that are not able to execute tasks 
+		  we consider the ctx empty */
+		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
+		
+		if(nworkers == 0)
 		{
-			PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
-			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
-			PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
-			return 0;
+			if(workerid == -1)
+			{
+				PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
+				PTHREAD_COND_WAIT(&sched_ctx->no_workers_cond, &sched_ctx->no_workers_mutex);
+				PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);
+				nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
+				if(nworkers == 0) return _starpu_push_task(j, job_is_already_locked);
+			}
+			else
+			{
+				PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+				starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
+				PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+				return 0;
+			}
 		}
 	}
 
@@ -366,7 +366,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 		if(ret == -1)
 		{
 			printf("repush task \n");
-			_starpu_push_task(j, job_is_already_locked);
+			ret = _starpu_push_task(j, job_is_already_locked);
 		}
 	}
 

+ 2 - 0
src/core/task.c

@@ -85,6 +85,8 @@ void starpu_task_init(struct starpu_task *task)
 	task->control_task = 0;
 
 	task->hypervisor_tag = 0;
+	
+	task->flops = 0.0;
 }
 
 /* Free all the ressources allocated for a task, without deallocating the task

+ 1 - 1
src/core/workers.c

@@ -676,7 +676,7 @@ struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
 
 struct starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 {
-	STARPU_ASSERT(id <= STARPU_NMAX_SCHED_CTXS);
+	STARPU_ASSERT(id >= 0 && id <= STARPU_NMAX_SCHED_CTXS);
 	return &config.sched_ctxs[id];
 }
 

+ 1 - 0
src/drivers/cuda/driver_cuda.c

@@ -300,6 +300,7 @@ void *_starpu_cuda_worker(void *arg)
 
 
 		STARPU_ASSERT(task);
+
 		j = _starpu_get_job_associated_to_task(task);
 
 		/* can CUDA do that task ? */

+ 1 - 1
src/sched_policies/heft.c

@@ -145,7 +145,7 @@ static void heft_post_exec_hook(struct starpu_task *task)
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, sched_mutex, sched_cond);
 	}
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_poped_task_cb(workerid, sched_ctx_id);
+	starpu_call_poped_task_cb(workerid, sched_ctx_id, task->flops);
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
 	/* Once we have executed the task, we can update the predicted amount
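
Putting the pieces together, the intended flow after this commit looks roughly like this (a sketch assembled from the example code and the new API; workers, nworkers, N, total_flops and gemm_codelet are application-provided, and the criteria handle follows the definition in sched_ctx_hypervisor.c):

	/* create a context under hypervisor control, declaring its total workload */
	struct starpu_sched_ctx_hypervisor_criteria **criteria =
		sched_ctx_hypervisor_init(SIMPLE_POLICY);
	unsigned ctx = starpu_create_sched_ctx_with_criteria("heft", workers, nworkers,
							     "my_ctx", criteria);
	sched_ctx_hypervisor_handle_ctx(ctx, total_flops);

	/* submit tasks that each carry their flop count */
	struct starpu_task *task = starpu_task_create();
	task->cl = &gemm_codelet;
	task->sched_ctx = ctx;
	task->flops = 2.0 * N * N * N;
	starpu_task_submit(task);

	/* after each execution, heft_post_exec_hook forwards task->flops to the
	   hypervisor, which updates elapsed/remaining flops and may resize */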