Andra Hugo 13 vuotta sitten
vanhempi
commit
ecd77fea1c

+ 2 - 2
include/starpu_scheduler.h

@@ -138,8 +138,8 @@ struct starpu_sched_ctx_hypervisor_criteria {
 
 #ifdef STARPU_BUILD_SCHED_CTX_HYPERVISOR
 unsigned starpu_create_sched_ctx_with_criteria(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name, struct starpu_sched_ctx_hypervisor_criteria *criteria);
-void starpu_call_poped_task_cb(int workerid);
-void starpu_call_pushed_task_cb(int workerid);
+void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id);
+void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif //STARPU_BUILD_SCHED_CTX_HYPERVISOR
 
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name);

+ 35 - 16
sched_ctx_hypervisor/examples/sched_ctx_utils/sched_ctx_utils.c

@@ -86,6 +86,17 @@ void* start_bench(void *val){
 	for(i = 0; i < NSAMPLES; i++)
 		p->bench(p->mat[i], p->size, p->nblocks);
 	
+	/* if(p->ctx != 0) */
+	/* { */
+	/* 	pthread_mutex_lock(&mut); */
+	/* 	if(first){ */
+	/* 		sched_ctx_hypervisor_ignore_ctx(p->ctx); */
+	/* 		starpu_delete_sched_ctx(p->ctx, p->the_other_ctx); */
+	/* 	} */
+		
+	/* 	first = 0; */
+	/* 	pthread_mutex_unlock(&mut); */
+	/* } */
 	sched_ctx_hypervisor_stop_resize(p->the_other_ctx);
 	rv[p->id].flops /= NSAMPLES;
 	rv[p->id].avg_timing /= NSAMPLES;
@@ -255,7 +266,7 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 	sched_ctx_hypervisor_ioctl(p1.ctx,
 				   HYPERVISOR_GRANULARITY, 2,
 				   HYPERVISOR_MIN_TASKS, 1000,
-				   HYPERVISOR_MIN_WORKERS, 12,
+				   HYPERVISOR_MIN_WORKERS, 6,
 				   HYPERVISOR_MAX_WORKERS, 12,
 				   NULL);
 
@@ -291,7 +302,7 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 				   HYPERVISOR_GRANULARITY, 2,
 				   HYPERVISOR_MIN_TASKS, 500,
 				   HYPERVISOR_MIN_WORKERS, 0,
-				   HYPERVISOR_MAX_WORKERS, 0,
+				   HYPERVISOR_MAX_WORKERS, 6,
 				   NULL);
 
 }
@@ -311,26 +322,29 @@ void set_hypervisor_conf(int event, int task_tag)
 							   HYPERVISOR_TIME_TO_APPLY, task_tag,
 							   NULL);
 
+				printf("%d: set max %d for tag %d\n", p2.ctx, 4, task_tag);
 				sched_ctx_hypervisor_ioctl(p1.ctx,
 							   HYPERVISOR_MIN_WORKERS, 6,
 							   HYPERVISOR_MAX_WORKERS, 8,
 							   HYPERVISOR_TIME_TO_APPLY, task_tag,
 							   NULL);
+				printf("%d: set max %d for tag %d\n", p1.ctx, 8, task_tag);
 				sched_ctx_hypervisor_resize(p1.ctx, task_tag);
 			}
 			if(it == 2)
 			{
 				sched_ctx_hypervisor_ioctl(p2.ctx,
-							   HYPERVISOR_MIN_WORKERS, 10,
+							   HYPERVISOR_MIN_WORKERS, 12,
 							   HYPERVISOR_MAX_WORKERS, 12,
 							   HYPERVISOR_TIME_TO_APPLY, task_tag,
 							   NULL);
-
+				printf("%d: set max %d for tag %d\n", p2.ctx, 12, task_tag);
 				sched_ctx_hypervisor_ioctl(p1.ctx,
 							   HYPERVISOR_MIN_WORKERS, 0,
 							   HYPERVISOR_MAX_WORKERS, 0,
 							   HYPERVISOR_TIME_TO_APPLY, task_tag,
 							   NULL);
+				printf("%d: set max %d for tag %d\n", p1.ctx, 0, task_tag);
 				sched_ctx_hypervisor_resize(p1.ctx, task_tag);
 			}
 			it++;
@@ -341,18 +355,23 @@ void set_hypervisor_conf(int event, int task_tag)
 	{
 		if(event == END_BENCH)
 		{
-			sched_ctx_hypervisor_ioctl(p1.ctx,
-						   HYPERVISOR_MIN_WORKERS, 10,
-						   HYPERVISOR_MAX_WORKERS, 12,
-						   HYPERVISOR_TIME_TO_APPLY, task_tag,
-						   NULL);
-			
-			sched_ctx_hypervisor_ioctl(p2.ctx,
-						   HYPERVISOR_MIN_WORKERS, 0,
-						   HYPERVISOR_MAX_WORKERS, 0,
-						   HYPERVISOR_TIME_TO_APPLY, task_tag,
-						   NULL);
-			sched_ctx_hypervisor_resize(p2.ctx, task_tag);
+			if(it2 < 3)
+			{
+				sched_ctx_hypervisor_ioctl(p1.ctx,
+							   HYPERVISOR_MIN_WORKERS, 6,
+							   HYPERVISOR_MAX_WORKERS, 12,
+							   HYPERVISOR_TIME_TO_APPLY, task_tag,
+							   NULL);
+				printf("%d: set max %d for tag %d\n", p1.ctx, 12, task_tag);
+				sched_ctx_hypervisor_ioctl(p2.ctx,
+							   HYPERVISOR_MIN_WORKERS, 0,
+							   HYPERVISOR_MAX_WORKERS, 0,
+							   HYPERVISOR_TIME_TO_APPLY, task_tag,
+							   NULL);
+				printf("%d: set max %d for tag %d\n", p2.ctx, 0, task_tag);
+				sched_ctx_hypervisor_resize(p2.ctx, task_tag);
+			}
+			it2++;
 		}
 	}
 

+ 8 - 1
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -46,6 +46,12 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...);
 
 void sched_ctx_hypervisor_steal_workers(unsigned sched_ctx, int *workers, int nworkers, int task_tag);
 
+int* sched_ctx_hypervisor_get_sched_ctxs();
+
+int sched_ctx_hypervisor_get_nsched_ctxs();
+
+double sched_ctx_hypervisor_get_debit(unsigned sched_ctx);
+
 /* hypervisor policies */
 #define SIMPLE_POLICY 1
 
@@ -55,7 +61,8 @@ struct hypervisor_policy {
 	void (*add_sched_ctx)(unsigned sched_ctx);
 	void(*remove_sched_ctx)(unsigned sched_ctx);
 	void* (*ioctl)(unsigned sched_ctx, va_list varg_list, unsigned later);
-	void (*manage_idle_time)(unsigned req_sched_ctx, int *sched_ctxs, unsigned nsched_ctxs, int worker, double idle_time);
+	void (*manage_idle_time)(unsigned req_sched_ctx, int worker, double idle_time);
+	void (*manage_task_flux)(unsigned sched_ctx);
 	unsigned (*resize)(unsigned sched_ctx, int *sched_ctxs, unsigned nsched_ctxs);
 	void (*update_config)(void* old_config, void* new_config);
 };

+ 105 - 10
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -100,12 +100,15 @@ static int _compute_priority(unsigned sched_ctx)
 	return total_priority;
 }
 
-static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs, int nworkers_to_move)
+static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
 {
 	int i;
 	int highest_priority = -1;
 	int current_priority = 0;
 	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
 
 	struct simple_policy_config *config = NULL;
 
@@ -130,7 +133,7 @@ static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int *sched_ctxs, in
 	return sched_ctx;
 }
 
-int* _get_first_workers(unsigned sched_ctx, int *nworkers)
+int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers)
 {
 	struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(sched_ctx);
 
@@ -242,26 +245,83 @@ static unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
 			else
 				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);	
 		}
+		printf("nworkers = %d nworkers_to_move = %d max_nworkers=%d\n", nworkers, nworkers_to_move, config->max_nworkers);
 		if((nworkers - nworkers_to_move) > config->max_nworkers)
 			nworkers_to_move = nworkers - config->max_nworkers;
 	}
 	return nworkers_to_move;
 }
 
-static unsigned simple_resize(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs)
+static int _find_fastest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int fastest_sched_ctx = -1;
+	double fastest_debit = -1.0, curr_debit = 0.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		curr_debit = sched_ctx_hypervisor_get_debit(sched_ctxs[i]);
+		if(fastest_debit <= curr_debit)
+		{
+			fastest_debit = curr_debit;
+			fastest_sched_ctx = sched_ctxs[i];
+		}
+	}
+
+	return fastest_sched_ctx;
+}
+
+static int _find_slowest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int slowest_sched_ctx = -1;
+	double slowest_debit = 1.0, curr_debit = 0.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		curr_debit = sched_ctx_hypervisor_get_debit(sched_ctxs[i]);
+		if(slowest_debit >= curr_debit)
+		{
+			slowest_debit = curr_debit;
+			slowest_sched_ctx = sched_ctxs[i];
+		}
+	}
+
+	return slowest_sched_ctx;
+}
+
+static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx)
 {
 	int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
 	if(ret != EBUSY)
 	{					
-		unsigned nworkers_to_move = _get_nworkers_to_move(req_sched_ctx);
+		unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
 		
+		if(sender_sched_ctx == 2)
+			printf("try to resize with nworkers = %d\n", nworkers_to_move);
 		if(nworkers_to_move > 0)
 		{
-			unsigned poor_sched_ctx = _find_poor_sched_ctx(req_sched_ctx, sched_ctxs, nsched_ctxs, nworkers_to_move);
+			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
+				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
+			else
+			{
+				poor_sched_ctx = receiver_sched_ctx;
+				struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(poor_sched_ctx);
+				unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
+				if((nworkers+nworkers_to_move) > config->max_nworkers)
+					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers);
+				if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+			}
+			
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
-			{					
-				int *workers_to_move = _get_first_workers(req_sched_ctx, &nworkers_to_move);
-				sched_ctx_hypervisor_move_workers(req_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
+			{						
+				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move);
+				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
 				
 				struct simple_policy_config *new_config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(poor_sched_ctx);
 				int i;
@@ -278,15 +338,46 @@ static unsigned simple_resize(unsigned req_sched_ctx, int *sched_ctxs, int nsche
 
 }
 
-static void simple_manage_idle_time(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs, int worker, double idle_time)
+static unsigned simple_resize(unsigned sender_sched_ctx)
+{
+	return _simple_resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS);
+}
+
+static void simple_manage_idle_time(unsigned req_sched_ctx, int worker, double idle_time)
 {
        	struct simple_policy_config *config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(req_sched_ctx);
 
 	if(config != NULL && idle_time > config->max_idle[worker])
-		simple_resize(req_sched_ctx, sched_ctxs, nsched_ctxs);
+		simple_resize(req_sched_ctx);
 	return;
 }
 
+static void simple_manage_task_flux(unsigned curr_sched_ctx)
+{
+	double curr_debit = sched_ctx_hypervisor_get_debit(curr_sched_ctx);
+	
+	int slow_sched_ctx = _find_slowest_sched_ctx();
+	int fast_sched_ctx = _find_fastest_sched_ctx();
+	if(slow_sched_ctx != fast_sched_ctx && slow_sched_ctx != -1 && fast_sched_ctx != -1)
+	{
+		if(curr_sched_ctx == slow_sched_ctx)
+		{
+			double debit_fast = sched_ctx_hypervisor_get_debit(fast_sched_ctx);
+			/* only if the fastest context's debit is more than 10 % above the current one */
+			if(debit_fast != 0.0 && debit_fast > (curr_debit + curr_debit * 0.1))
+				_simple_resize(fast_sched_ctx, curr_sched_ctx);
+		}
+		
+		if(curr_sched_ctx == fast_sched_ctx)
+		{
+			double debit_slow = sched_ctx_hypervisor_get_debit(slow_sched_ctx);
+			/* only if the current debit is more than 10 % above the slowest context's */
+			if(curr_debit != 0.0 && (debit_slow + debit_slow *0.1) < curr_debit)
+				_simple_resize(curr_sched_ctx, slow_sched_ctx);
+		}
+	}
+}
+
 static void* simple_ioctl(unsigned sched_ctx, va_list varg_list, unsigned later)
 {
 	struct simple_policy_config *config = NULL;
@@ -352,6 +443,8 @@ static void* simple_ioctl(unsigned sched_ctx, va_list varg_list, unsigned later)
 
 		case HYPERVISOR_MAX_WORKERS:
 			config->max_nworkers = va_arg(varg_list, unsigned);
+			if(config->max_nworkers == 0)
+			  printf("%d: max nworkers = 0\n", sched_ctx);
 			break;
 
 		case HYPERVISOR_GRANULARITY:
@@ -392,6 +485,7 @@ static void simple_update_config(void *old_config, void* config)
 	struct simple_policy_config *old = (struct simple_policy_config*)old_config;
 	struct simple_policy_config *new = (struct simple_policy_config*)config;
 
+	printf("new = %d old = %d\n", new->max_nworkers, old->max_nworkers);
 	old->min_nworkers = new->min_nworkers != -1 ? new->min_nworkers : old->min_nworkers ;
 	old->max_nworkers = new->max_nworkers != -1 ? new->max_nworkers : old->max_nworkers ;
 	old->new_workers_max_idle = new->new_workers_max_idle != -1.0 ? new->new_workers_max_idle : old->new_workers_max_idle;
@@ -420,6 +514,7 @@ struct hypervisor_policy simple_policy = {
 	.remove_sched_ctx = simple_remove_sched_ctx,
 	.ioctl = simple_ioctl,
 	.manage_idle_time = simple_manage_idle_time,
+	.manage_task_flux = simple_manage_task_flux,
 	.resize = simple_resize,
 	.update_config = simple_update_config
 };

+ 61 - 15
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -24,6 +24,7 @@ static void _load_hypervisor_policy(int type)
 		hypervisor.policy.manage_idle_time = simple_policy.manage_idle_time;
 		hypervisor.policy.update_config = simple_policy.update_config;
 		hypervisor.policy.resize = simple_policy.resize;
+		hypervisor.policy.manage_task_flux = simple_policy.manage_task_flux;
 		break;
 	}
 }
@@ -41,12 +42,16 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 		hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
 		hypervisor.sched_ctx_w[i].sched_ctx = STARPU_NMAX_SCHED_CTXS;
 		hypervisor.sched_ctx_w[i].config = NULL;
+		hypervisor.sched_ctx_w[i].temp_npushed_tasks = 0;
+		hypervisor.sched_ctx_w[i].temp_npoped_tasks = 0;
+
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
 		{
 			hypervisor.sched_ctx_w[i].current_idle_time[j] = 0.0;
-			hypervisor.sched_ctx_w[i].tasks[j] = 0;
+			hypervisor.sched_ctx_w[i].pushed_tasks[j] = 0;
 			hypervisor.sched_ctx_w[i].poped_tasks[j] = 0;
+
 		}
 	}
 
@@ -63,19 +68,14 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 
 void sched_ctx_hypervisor_stop_resize(unsigned sched_ctx)
 {
-	pthread_mutex_lock(&act_hypervisor_mutex);
 	imposed_resize = 1;
 	hypervisor.resize[sched_ctx] = 0;
-	pthread_mutex_unlock(&act_hypervisor_mutex);
-			
 }
 
 void sched_ctx_hypervisor_start_resize(unsigned sched_ctx)
 {
-	pthread_mutex_lock(&act_hypervisor_mutex);	
 	imposed_resize = 1;
 	hypervisor.resize[sched_ctx] = 1;
-	pthread_mutex_unlock(&act_hypervisor_mutex);
 }
 
 void sched_ctx_hypervisor_shutdown(void)
@@ -158,8 +158,11 @@ void sched_ctx_hypervisor_ignore_ctx(unsigned sched_ctx)
 
 void sched_ctx_hypervisor_set_config(unsigned sched_ctx, void *config)
 {
+	    printf("%d: ", sched_ctx );
 	if(hypervisor.sched_ctx_w[sched_ctx].config != NULL && config != NULL)
+	  {
 		hypervisor.policy.update_config(hypervisor.sched_ctx_w[sched_ctx].config, config);
+	  }
 	else
 		hypervisor.sched_ctx_w[sched_ctx].config = config;
 
@@ -219,9 +222,19 @@ static int get_ntasks( int *tasks)
 	return ntasks;
 }
 
+static void reset_ntasks( int *tasks)
+{
+	int j;
+	for(j = 0; j < STARPU_NMAXWORKERS; j++)
+	{
+		tasks[j] = 0;
+	}
+	return;
+}
+
 static unsigned check_tasks_of_sched_ctx(unsigned sched_ctx)
 {
-	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].tasks);
+	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
 	
 	return ntasks > hypervisor.min_tasks;
 }
@@ -344,7 +357,7 @@ static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time)
 	if(hypervisor.resize[sched_ctx] && hypervisor.nsched_ctxs > 1 && hypervisor.policy.manage_idle_time)
 	{
 		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] += idle_time;
-		hypervisor.policy.manage_idle_time(sched_ctx, hypervisor.sched_ctxs, hypervisor.nsched_ctxs, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
+		hypervisor.policy.manage_idle_time(sched_ctx, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
 
 	}
 	return;
@@ -355,12 +368,45 @@ static void working_time_cb(unsigned sched_ctx, int worker, double working_time,
 	return;
 }
 
+int* sched_ctx_hypervisor_get_sched_ctxs()
+{
+	return hypervisor.sched_ctxs;
+}
+
+int sched_ctx_hypervisor_get_nsched_ctxs()
+{
+	return hypervisor.nsched_ctxs;
+}
+
+double sched_ctx_hypervisor_get_debit(unsigned sched_ctx)
+{
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx);
+	if(nworkers == 0)
+		return 0.0;
+
+	int npushed_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
+	int npoped_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].poped_tasks);
+	if(hypervisor.sched_ctx_w[sched_ctx].temp_npushed_tasks != npushed_tasks || hypervisor.sched_ctx_w[sched_ctx].temp_npoped_tasks!= npoped_tasks)
+	{
+		hypervisor.sched_ctx_w[sched_ctx].temp_npushed_tasks = npushed_tasks;
+		hypervisor.sched_ctx_w[sched_ctx].temp_npoped_tasks = npoped_tasks;
+		
+		STARPU_ASSERT(npoped_tasks <= npushed_tasks);
+		if(npushed_tasks > 0 && npoped_tasks > 0)
+		{
+			double debit = (((double)npoped_tasks)*1.0)/((double)npushed_tasks * 1.0);
+			return debit;
+		}
+	}
+
+	return 0.0;
+}
 
 static void pushed_task_cb(unsigned sched_ctx, int worker)
 {	
-	hypervisor.sched_ctx_w[sched_ctx].tasks[worker]++;
+	hypervisor.sched_ctx_w[sched_ctx].pushed_tasks[worker]++;
        
-	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].tasks);
+	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
 	
 	if(!imposed_resize)
 		hypervisor.resize[sched_ctx] = (ntasks > hypervisor.min_tasks);
@@ -368,17 +414,17 @@ static void pushed_task_cb(unsigned sched_ctx, int worker)
 
 static void poped_task_cb(unsigned sched_ctx, int worker)
 {
-	/* hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++; */
-	/* int npoped_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].poped_tasks); */
-       	/* int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].tasks); */
-	/* hypervisor.resize = ((ntasks - npoped_tasks) > 0); */
+	hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
+	
+	/* if(hypervisor.nsched_ctxs > 1) */
+	/* 	hypervisor.policy.manage_task_flux(sched_ctx); */
 }
 
 static void post_exec_hook_cb(unsigned sched_ctx, int task_tag)
 {
 	STARPU_ASSERT(task_tag > 0);
 
-	if(hypervisor.nsched_ctxs > 1)
+	if(hypervisor.nsched_ctxs > 1 && hypervisor.resize[sched_ctx])
 	{
 		unsigned conf_sched_ctx;
 		int i;

+ 4 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor_intern.h

@@ -4,8 +4,11 @@ struct sched_ctx_wrapper {
 	unsigned sched_ctx;
 	void *config;
 	double current_idle_time[STARPU_NMAXWORKERS];
-	int tasks[STARPU_NMAXWORKERS];
+	int pushed_tasks[STARPU_NMAXWORKERS];
 	int poped_tasks[STARPU_NMAXWORKERS];
+	int temp_npushed_tasks;
+	int temp_npoped_tasks;
+
 };
 
 struct sched_ctx_hypervisor {

+ 10 - 19
src/core/sched_ctx.c

@@ -640,32 +640,23 @@ unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx_id)
 }
 
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-void starpu_call_poped_task_cb(int workerid)
+void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id)
 {
 	struct starpu_worker_s *worker =  _starpu_get_worker_struct(workerid);
-	unsigned i;
-	struct starpu_sched_ctx *sched_ctx = NULL;
-	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	{
-		sched_ctx = worker->sched_ctx[i];
-		if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->id != STARPU_NMAX_SCHED_CTXS
-		   && sched_ctx->criteria != NULL)
-			sched_ctx->criteria->poped_task_cb(sched_ctx->id, worker->workerid);
-	}
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	
+	if(sched_ctx != NULL && sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
+		   && sched_ctx->criteria != NULL)
+		sched_ctx->criteria->poped_task_cb(sched_ctx_id, worker->workerid);
 }
 
-void starpu_call_pushed_task_cb(int workerid)
+void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
 {
 	struct starpu_worker_s *worker =  _starpu_get_worker_struct(workerid);
-	unsigned i;
-	struct starpu_sched_ctx *sched_ctx = NULL;
-	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	{
-		sched_ctx = worker->sched_ctx[i];
-		if(sched_ctx != NULL && sched_ctx->id != 0  && sched_ctx->criteria != NULL)
-			sched_ctx->criteria->pushed_task_cb(sched_ctx->id, workerid);
-	}
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+
+	if(sched_ctx != NULL && sched_ctx_id != 0  && sched_ctx->criteria != NULL)
+			sched_ctx->criteria->pushed_task_cb(sched_ctx_id, workerid);
 
 }
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR

+ 11 - 14
src/sched_policies/heft.c

@@ -144,14 +144,13 @@ static void heft_post_exec_hook(struct starpu_task *task)
 		sched_cond = &workerarg->sched_cond;
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, sched_mutex, sched_cond);
 	}
-	/* Once we have executed the task, we can update the predicted amount
-	 * of work. */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_poped_task_cb(workerid);
+	starpu_call_poped_task_cb(workerid, sched_ctx_id);
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
+	/* Once we have executed the task, we can update the predicted amount
+	 * of work. */
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	exp_len[workerid] -= model;
 	exp_start[workerid] = starpu_timing_now() + model;
 	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
@@ -180,13 +179,12 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, sched_mutex, sched_cond);
 	}
 
-	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_pushed_task_cb(workerid);
+	starpu_call_pushed_task_cb(workerid, sched_ctx_id);
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
+	/* Update the predictions */
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
 	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
@@ -223,11 +221,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_worker_set_sched_condition(sched_ctx_id, best_workerid, sched_mutex, sched_cond);
 	}
 
-	PTHREAD_MUTEX_LOCK(sched_mutex);
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_pushed_task_cb(best_workerid);
+	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	exp_end[best_workerid] += predicted;
 	exp_len[best_workerid] += predicted;
 	ntasks[best_workerid]++;
@@ -305,7 +303,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				local_data_penalty[worker_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);
 				local_power[worker_ctx] = starpu_task_expected_power(task, perf_arch, nimpl);
 				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
-				//			printf("%d: task_len = %lf task_pen = %lf\n", worker, local_task_length[worker_ctx], local_data_penalty[worker_ctx]);
 			}
 			
 			double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
@@ -477,7 +474,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 static int heft_push_task(struct starpu_task *task)
 {
-	unsigned sched_ctx_id = task->sched_ctx;;
+	unsigned sched_ctx_id = task->sched_ctx;
 	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
 	unsigned nworkers; 
 	int ret_val = -1;
@@ -490,7 +487,7 @@ static int heft_push_task(struct starpu_task *task)
 			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 			return ret_val;
 		}
-			
+
 		ret_val = _heft_push_task(task, 1, sched_ctx_id);
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		return ret_val;