瀏覽代碼

last small things

Andra Hugo 13 年之前
父節點
當前提交
3a2ff609dd

+ 22 - 1
examples/sched_ctx_utils/sched_ctx_utils.c

@@ -98,10 +98,12 @@ void start_2benchs(void (*bench)(unsigned, unsigned))
 {
 	p1.bench = bench;
 	p1.size = size1;
+	printf("size %d\n", size1);
 	p1.nblocks = nblocks1;
 	
 	p2.bench = bench;
 	p2.size = size2;
+	printf("size %d\n", size2);
 	p2.nblocks = nblocks2;
 	
 	pthread_t tid[2];
@@ -190,15 +192,24 @@ void construct_contexts(void (*bench)(unsigned, unsigned))
 	int k = 0;
 
 	for(i = 0; i < gpu; i++)
+	{
 		procs[k++] = i;
+		printf("%d ", i);
+	}
 
 	for(i = gpu; i < gpu + gpu1; i++)
+	{
 		procs[k++] = i;
+		printf("%d ", i);
+	}
 
 
 	for(i = n_all_gpus; i < n_all_gpus + cpu1; i++)
+	{
 		procs[k++] = i;
-
+		printf("%d ", i);
+	}
+	printf("\n ");
 
 	p1.ctx = starpu_create_sched_ctx("heft", procs, nprocs1, "sched_ctx1");
 	p2.the_other_ctx = (int)p1.ctx;
@@ -209,13 +220,23 @@ void construct_contexts(void (*bench)(unsigned, unsigned))
 	k = 0;
 
 	for(i = 0; i < gpu; i++)
+	{
 		procs2[k++] = i;
+		printf("%d ", i);
+	}
 
 	for(i = gpu + gpu1; i < gpu + gpu1 + gpu2; i++)
+	{
 		procs2[k++] = i;
+		printf("%d ", i);
+	}
 
 	for(i = n_all_gpus  + cpu1; i < n_all_gpus + cpu1 + cpu2; i++)
+	{
 		procs2[k++] = i;
+		printf("%d ", i);
+	}
+	printf("\n");
 
 	p2.ctx = starpu_create_sched_ctx("heft", procs2, nprocs2, "sched_ctx2");
 	p1.the_other_ctx = (int)p2.ctx;

+ 2 - 0
include/starpu_scheduler.h

@@ -204,6 +204,8 @@ unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx);
 
 unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2);
 
+unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id);
+
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 

+ 2 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -83,9 +83,11 @@ struct sched_ctx_wrapper {
 	double total_flops;
 	double total_elapsed_flops[STARPU_NMAXWORKERS];
 	double elapsed_flops[STARPU_NMAXWORKERS];
+	double submitted_flops;
 	double remaining_flops;
 	double start_time;
 	struct resize_ack resize_ack;
+	pthread_mutex_t mutex;
 };
 
 struct hypervisor_policy {

+ 227 - 88
sched_ctx_hypervisor/src/hypervisor_policies/lp2_policy.c

@@ -76,10 +76,14 @@ static void _starpu_get_tasks_times(int nw, int nt, double times[nw][nt])
 
                         if (isnan(length))
                                 times[w][t] = NAN;
-                        else
+                       else
                                 times[w][t] = length / 1000.;
+			
+//			printf("t%d on worker %d ctx %d: %lf \n", t, w, tp->sched_ctx_id, times[w][t]);
                 }
+//		printf("\n");
         }
+//	printf("\n");
 }
 
 int _get_idx_sched_ctx(int sched_ctx_id)
@@ -98,7 +102,7 @@ int _get_idx_sched_ctx(int sched_ctx_id)
  */
 #ifdef HAVE_GLPK_H
 #include <glpk.h>
-static void _glp_resolve(int ns, int nw, int nt, double res[ns][nw][nt], int integer)
+static void _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt])
 {
 	struct bound_task_pool * tp;
 	int t, w, s;
@@ -133,14 +137,47 @@ static void _glp_resolve(int ns, int nw, int nt, double res[ns][nw][nt], int int
 				char name[32];
 				snprintf(name, sizeof(name), "w%dt%dn", w, t);
 				glp_set_col_name(lp, colnum(w, t), name);
-				if (integer)
-					glp_set_col_kind(lp, colnum(w, t), GLP_IV);
 				glp_set_col_bnds(lp, colnum(w, t), GLP_LO, 0., 0.);
 			}
 		glp_set_col_bnds(lp, nw*nt+1, GLP_LO, 0., 0.);
 
+		int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+
+		/* Number of task * time > 0.3 * tmax */
+		glp_add_rows(lp, nw*ns);
+		for(s = 0; s < ns; s++)
+		{
+			for (w = 0; w < nw; w++)
+			{
+				char name[32], title[64];
+				starpu_worker_get_name(w, name, sizeof(name));
+				snprintf(title, sizeof(title), "worker %x ctx %x limit", w, s);
+				glp_set_row_name(lp, w+(s*nw)+1, title);
+				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+				{
+					if(tp->sched_ctx_id == sched_ctxs[s])
+					{
+						ia[n] = w+(s*nw)+1;
+						ja[n] = colnum(w, t);
+						ar[n] = times[w][t];
+						
+						n++;
+					}
+				}
+
+				/* tmax */
+				ia[n] = w+(s*nw)+1;
+				ja[n] = nw*nt+1;
+				ar[n] = -1;
+				n++;
+
+				glp_set_row_bnds(lp, w+(s*nw)+1, GLP_UP, 0.0, 0.0);
+			}
+ 		}
+
+		int curr_row_idx = nw*ns;
 		/* Total worker execution time */
-		glp_add_rows(lp, nw);
+		glp_add_rows(lp, nw*ns);
 		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
 		{
 			int someone = 0;
@@ -156,26 +193,30 @@ static void _glp_resolve(int ns, int nw, int nt, double res[ns][nw][nt], int int
 		}
 		for (w = 0; w < nw; w++)
 		{
+			
 			char name[32], title[64];
 			starpu_worker_get_name(w, name, sizeof(name));
 			snprintf(title, sizeof(title), "worker %s", name);
-			glp_set_row_name(lp, w+1, title);
+			glp_set_row_name(lp, curr_row_idx+w+1, title);
 			for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
 			{
-				ia[n] = w+1;
+				ia[n] = curr_row_idx+w+1;
 				ja[n] = colnum(w, t);
 				if (isnan(times[w][t]))
 					ar[n] = 1000000000.;
 				else
 					ar[n] = times[w][t];
+				if(starpu_worker_belongs_to_sched_ctx(w, tp->sched_ctx_id))
+					ar[n] = 100000;
+				
 				n++;
 			}
 			/* tmax */
-			ia[n] = w+1;
+			ia[n] = curr_row_idx+w+1;
 			ja[n] = nw*nt+1;
 			ar[n] = -1;
 			n++;
-			glp_set_row_bnds(lp, w+1, GLP_UP, 0, 0);
+			glp_set_row_bnds(lp, curr_row_idx+w+1, GLP_UP, 0, 0);
 		}
 
 		/* Total task completion */
@@ -185,49 +226,19 @@ static void _glp_resolve(int ns, int nw, int nt, double res[ns][nw][nt], int int
 			char name[32], title[64];
 			starpu_worker_get_name(w, name, sizeof(name));
 			snprintf(title, sizeof(title), "task %s key %x", tp->cl->name, (unsigned) tp->footprint);
-			glp_set_row_name(lp, nw+t+1, title);
+			glp_set_row_name(lp, curr_row_idx+nw+t+1, title);
 			for (w = 0; w < nw; w++)
 			{
-				ia[n] = nw+t+1;
+				ia[n] = curr_row_idx+nw+t+1;
 				ja[n] = colnum(w, t);
 				ar[n] = 1;
 				n++;
 			}
-			glp_set_row_bnds(lp, nw+t+1, GLP_FX, tp->n, tp->n);
+			glp_set_row_bnds(lp, curr_row_idx+nw+t+1, GLP_FX, tp->n, tp->n);
 		}
 
-		int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
-		/* Number of task * time > 0.3 * tmax */
-		glp_add_rows(lp, nw*ns);
-		for (w = 0; w < nw; w++)
-		{
-			for(s = 0; s < ns; s++)
-			{
-				char name[32], title[64];
-				starpu_worker_get_name(w, name, sizeof(name));
-				snprintf(title, sizeof(title), "worker %x ctx %x limit", w, s);
-				glp_set_row_name(lp, nw+nt+w+(s*nw)+1, title);
-				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
-				{
-					if(tp->sched_ctx_id == sched_ctxs[s])
-					{
-						ia[n] = nw+nt+w+(s*nw)+1;
-						ja[n] = colnum(w, t);
-						ar[n] = 1;
-						n++;
-					}
-				}
-
-				/* tmax */
-				ia[n] = nw+nt+w+(s*nw)+1;
-				ja[n] = nw*nt+1;
-				ar[n] = -0.3;
-				n++;
-
-				glp_set_row_bnds(lp, nw+nt+w+(s*nw)+1, GLP_UP, 0.0, 0.0);
-			}
-		}
 
+//		printf("n = %d nw*ns  = %d ne = %d\n", n, nw*ns, ne);
 		STARPU_ASSERT(n == ne);
 
 		glp_load_matrix(lp, ne-1, ia, ja, ar);
@@ -244,32 +255,134 @@ static void _glp_resolve(int ns, int nw, int nt, double res[ns][nw][nt], int int
 		return NULL;
 	}
 
-        if (integer)
-        {
-                glp_iocp iocp;
-                glp_init_iocp(&iocp);
-                iocp.msg_lev = GLP_MSG_OFF;
-                glp_intopt(lp, &iocp);
-        }
+	double tmax = glp_get_obj_val(lp);
 
+//        printf("Theoretical minimum execution time: %f ms\n", tmax);
+	for (w = 0; w < nw; w++)
+	{
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			tasks[w][t] = glp_get_col_prim(lp, colnum(w, t));
+//			printf("t%d worker %d ctx %d res %lf \n", t, w, tasks[w][t]);
+		}
+	}
 
-	double tmax = glp_get_obj_val(lp);
+	glp_delete_prob(lp);
+}
 
-        printf("Theoretical minimum execution time: %f ms\n", tmax);
+int _get_worker_having_tasks_of_this_ctx(int worker, int nw, int nt, double tasks[nw][nt], int sched_ctx)
+{
+	int t, w;
+	struct bound_task_pool * tp;
+	for(w = 0; w < nw; w++)
+	{
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+			if(w != worker && tasks[w][t] >= 1.0 && tp->sched_ctx_id == sched_ctx)
+				return w;
+	}
+	return -1;
+}
+int _get_worker_full_of_tasks_of_this_ctx(int worker, int nw, int nt, double tasks[nw][nt], int sched_ctx)
+{
+	int t, w;
+	struct bound_task_pool * tp;
+	for(w = 0; w < nw; w++)
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+			if(w != worker && tasks[w][t] > 0.3 * tp->n && tp->sched_ctx_id == sched_ctx)
+				return w;
+	return -1;
+}
 
-	for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+void _get_tasks_from_busiest_worker(int nw, int nt, double tasks[nw][nt], int worker)
+{
+	int w, t;
+	double tasks_per_worker[nw];
+	double max_tasks = 0.0;
+	int busiest_worker = -1;
+	printf("got inside \n");
+	for(w = 0; w < nw; w++)
 	{
-		for (w = 0; w < nw; w++)
+		if(w != worker)
+		{
+			tasks_per_worker[w] = 0.0;
+			for(t = 0; t < nt; t++)
+			{
+				tasks_per_worker[w] += tasks[w][t];
+			}
+			if(max_tasks < tasks_per_worker[w])
+			{
+				max_tasks = tasks_per_worker[w];
+				busiest_worker = w;
+			}
+		}
+	}
+	for(t = 0; t < nt; t++)
+	{
+		if(tasks_per_worker[busiest_worker] > (max_tasks / 2))
 		{
-			s = _get_idx_sched_ctx(tp->sched_ctx_id);
-			res[s][w][t] = glp_get_col_prim(lp, colnum(w, t));
+			tasks[worker][t] = tasks[busiest_worker][t];
+			tasks_per_worker[busiest_worker] -= tasks[busiest_worker][t];
+			tasks[busiest_worker][t] = 0.0;
+		}
+	}
+}
+void _recompute_resource_distrib(int nw, int nt, double tasks[nw][nt])
+{
+	int w, s, t;
+	struct bound_task_pool * tp;
+	for(w = 0; w < nw; w++)
+	{
+		int no_ctxs = 0;
+		int last_ctx = -1;
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			if(tasks[w][t] >= 1.0)
+			{
+				if(last_ctx != -1 && tp->sched_ctx_id != last_ctx)
+				{
+					enum starpu_archtype arch = starpu_worker_get_type(w);
+					int w2 = -1;
+					if(arch == STARPU_CPU_WORKER)
+						w2 = _get_worker_having_tasks_of_this_ctx(w, nw, nt, tasks, tp->sched_ctx_id);
+					else if(arch == STARPU_CUDA_WORKER && tasks[w][t] < 0.3*tp->n)
+						w2 = _get_worker_full_of_tasks_of_this_ctx(w, nw, nt, tasks, tp->sched_ctx_id);
+					
+					printf("w=%d t=%d tasks=%lf w2=%d\n", w, t, tasks[w][t], w2);
+					if(w2 != -1)
+					{
+						tasks[w2][t] += tasks[w][t];
+						tasks[w][t] = 0.0;
+					}
+				}
+				else
+					last_ctx = tp->sched_ctx_id;
+			}
 		}
 	}
 
-	glp_delete_prob(lp);
+	
+	for(w = 0; w < nw; w++)
+	{
+		unsigned empty = 1;
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			if(tasks[w][t] >= 1.0)
+			{
+				printf("%d: tasks %lf\n", w, tasks[w][t]);
+				empty = 0;
+				break;
+			}
+		}
+		
+		if(empty)
+		{
+			printf("worker having no task %d\n", w);
+			_get_tasks_from_busiest_worker(nw, nt, tasks, w);
+		}
+	}
 }
 
-void _redistribute_resources_in_ctxs2(int ns, int nw, int nt, double res[ns][nw][nt])
+void _redistribute_resources_in_ctxs2(int ns, int nw, int nt, double tasks[nw][nt])
 {
 	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
         struct bound_task_pool * tp;
@@ -293,7 +406,7 @@ void _redistribute_resources_in_ctxs2(int ns, int nw, int nt, double res[ns][nw]
 			{
 				if(tp->sched_ctx_id == sched_ctxs[s])
 				{
-					if(res[s][w][t] >= 1.0)
+					if(tasks[w][t] >= 1.0)
 					{
 						workers_to_add[nadd++] = w;
 						found = 1;
@@ -307,49 +420,75 @@ void _redistribute_resources_in_ctxs2(int ns, int nw, int nt, double res[ns][nw]
 
 		
 		unsigned nworkers_ctx = get_nworkers_ctx(sched_ctxs[s], STARPU_ALL);
-		if(nadd != nworkers_ctx)
+	
+		if(nworkers_ctx > nremove)
+			sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_remove, nremove, sched_ctxs[s]);
+	
+		if(nworkers_ctx != STARPU_NMAXWORKERS)
 		{
-			printf("%d: add %d \n", sched_ctxs[s], nadd);
-			printf("%d: remove %d \n", sched_ctxs[s], nremove);
 			sched_ctx_hypervisor_add_workers_to_sched_ctx(workers_to_add, nadd, sched_ctxs[s]);
-			sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_remove, nremove, sched_ctxs[s]);
-
 			struct policy_config *new_config = sched_ctx_hypervisor_get_config(sched_ctxs[s]);
 			int i;
 			for(i = 0; i < nadd; i++)
-				new_config->max_idle[workers_to_add[i]] = new_config->max_idle[workers_to_add[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_add[i]] :  new_config->new_workers_max_idle;
+				new_config->max_idle[workers_to_add[i]] = new_config->max_idle[workers_to_add[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_add[i]] :  new_config->new_workers_max_idle;
 		}
 	}
+		
 }
-
+int redistrib = 0;
+int done = 0;
 void lp2_handle_poped_task(unsigned sched_ctx, int worker)
 {
-	if(_velocity_gap_btw_ctxs())
+	struct sched_ctx_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
+	
+	int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
 	{
-		int ns = sched_ctx_hypervisor_get_nsched_ctxs();
-		int nw = starpu_worker_get_count(); /* Number of different workers */
-		int nt = 0; /* Number of different kinds of tasks */
-		struct bound_task_pool * tp;
-		for (tp = task_pools; tp; tp = tp->next)
-			nt++;
-		
-       		double res[ns][nw][nt];
+		if(sc_w->submitted_flops >= sc_w->total_flops && !done)
+		{
+			redistrib = 1;
+			done = 1;
+		}
 
-		int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
-		if(ret != EBUSY)
+		if(_velocity_gap_btw_ctxs() && redistrib)
 		{
-			_glp_resolve(ns, nw, nt, res, 0);
-/* 			int i, j, k; */
-/* 			for( i = 0; i < ns; i++) */
-/* 				for(j = 0; j < nw; j++) */
-/* 					for(k = 0; k < nt; k++) */
-/* 					{ */
-/* 						printf("ctx %d/worker %d/task type %d: res = %lf \n", i, j, k, res[i][j][k]); */
-/* 					} */
-		
-			_redistribute_resources_in_ctxs2(ns, nw, nt, res);
-			pthread_mutex_unlock(&act_hypervisor_mutex);
+			redistrib = 0;
+			int ns = sched_ctx_hypervisor_get_nsched_ctxs();
+			int nw = starpu_worker_get_count(); /* Number of different workers */
+			int nt = 0; /* Number of different kinds of tasks */
+			struct bound_task_pool * tp;
+			for (tp = task_pools; tp; tp = tp->next)
+				nt++;
+			
+			double tasks[nw][nt];
+ 			int w,t;
+			for(w = 0; w < nw; w++)
+				for(t = 0; t < nt; t++)
+					tasks[w][t] = 0.0;
+
+			printf("###################################start to resolve \n");
+			_glp_resolve(ns, nw, nt, tasks);
+			for(w = 0; w < nw; w++)
+				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+				{
+					if(tasks[w][t] > 0.0)
+						printf("ctx %d/worker %d/task type %d: res = %lf \n", tp->sched_ctx_id, w, t, tasks[w][t]);
+				}
+			printf("***************************\n");			
+
+			_recompute_resource_distrib(nw, nt, tasks);
+
+			for(w = 0; w < nw; w++)
+				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+				{
+					if(tasks[w][t] > 0.0)
+						printf("ctx %d/worker %d/task type %d: res = %lf \n", tp->sched_ctx_id, w, t, tasks[w][t]);
+				}
+			
+
+			_redistribute_resources_in_ctxs2(ns, nw, nt, tasks);
 		}
+		pthread_mutex_unlock(&act_hypervisor_mutex);
 	}		
 }
 

+ 97 - 29
sched_ctx_hypervisor/src/hypervisor_policies/lp_policy.c

@@ -189,15 +189,21 @@ int _velocity_gap_btw_ctxs()
 	{
 		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
 		double ctx_v = _get_ctx_velocity(sc_w);
-		for(j = 0; j < nsched_ctxs; j++)
+		if(ctx_v != 0.0)
 		{
-			if(sched_ctxs[i] != sched_ctxs[j])
+			for(j = 0; j < nsched_ctxs; j++)
 			{
-				other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
-				double other_ctx_v = _get_ctx_velocity(other_sc_w);
-				double gap = ctx_v < other_ctx_v ? ctx_v / other_ctx_v : other_ctx_v / ctx_v;
-				if(gap > 0.7)
-					return 1;
+				if(sched_ctxs[i] != sched_ctxs[j])
+				{
+					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
+					double other_ctx_v = _get_ctx_velocity(other_sc_w);
+					if(other_ctx_v != 0.0)
+					{
+						double gap = ctx_v < other_ctx_v ? ctx_v / other_ctx_v : other_ctx_v / ctx_v;
+						if(gap > 0.5)
+							return 1;
+					}
+				}
 			}
 		}
 
@@ -261,7 +267,7 @@ void _round_double_to_int(int ns, int nw, double res[ns][nw], int res_rounded[ns
 	}		
 }
 
-void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw])
+void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw], double res[ns][nw])
 {
 	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
 	int s, s2, w;
@@ -273,36 +279,98 @@ void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw])
 			if(w == 0) arch = STARPU_CUDA_WORKER;
 			if(w == 1) arch = STARPU_CPU_WORKER;
 
-			unsigned nworkers_ctx = get_nworkers_ctx(sched_ctxs[s], arch);
-			if(nworkers_ctx > res_rounded[s][w])
+			if(w == 1)
 			{
-				int nworkers_to_move = nworkers_ctx - res_rounded[s][w];
-				int receiving_s = -1;
-				
-				for(s2 = 0; s2 < ns; s2++)
+				unsigned nworkers_ctx = get_nworkers_ctx(sched_ctxs[s], arch);
+				if(nworkers_ctx > res_rounded[s][w])
 				{
-					if(sched_ctxs[s2] != sched_ctxs[s])
+					int nworkers_to_move = nworkers_ctx - res_rounded[s][w];
+					int receiving_s = -1;
+					
+					for(s2 = 0; s2 < ns; s2++)
 					{
-						int nworkers_ctx2 = get_nworkers_ctx(sched_ctxs[s2], arch);
-						if((res_rounded[s2][w] - nworkers_ctx2) == nworkers_to_move)
+						if(sched_ctxs[s2] != sched_ctxs[s])
 						{
-							receiving_s = sched_ctxs[s2];
-							break;
+							int nworkers_ctx2 = get_nworkers_ctx(sched_ctxs[s2], arch);
+							if((res_rounded[s2][w] - nworkers_ctx2) >= nworkers_to_move)
+							{
+								receiving_s = sched_ctxs[s2];
+								break;
+							}
 						}
 					}
+					if(receiving_s != -1)
+					{
+						int *workers_to_move = _get_first_workers(sched_ctxs[s], &nworkers_to_move, arch);
+						sched_ctx_hypervisor_move_workers(sched_ctxs[s], receiving_s, workers_to_move, nworkers_to_move);
+						struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiving_s);
+						int i;
+						for(i = 0; i < nworkers_to_move; i++)
+							new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+						
+						free(workers_to_move);
+					}
 				}
-				if(receiving_s != -1)
+			}
+			else
+			{
+				double nworkers_ctx = get_nworkers_ctx(sched_ctxs[s], arch) * 1.0;
+				if(nworkers_ctx > res[s][w])
 				{
-					int *workers_to_move = _get_first_workers(sched_ctxs[s], &nworkers_to_move, arch);
-					sched_ctx_hypervisor_move_workers(sched_ctxs[s], receiving_s, workers_to_move, nworkers_to_move);
-					struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiving_s);
-					int i;
-					for(i = 0; i < nworkers_to_move; i++)
-						new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
-
-					free(workers_to_move);
+					double nworkers_to_move = nworkers_ctx - res[s][w];
+					int receiving_s = -1;
+					
+					for(s2 = 0; s2 < ns; s2++)
+					{
+						if(sched_ctxs[s2] != sched_ctxs[s])
+						{
+							double nworkers_ctx2 = get_nworkers_ctx(sched_ctxs[s2], arch) * 1.0;
+							if((res[s2][w] - nworkers_ctx2) >= nworkers_to_move)
+							{
+								receiving_s = sched_ctxs[s2];
+								break;
+							}
+						}
+					}
+					if(receiving_s != -1)
+					{
+						int x = floor(nworkers_to_move);
+						double x_double = (double)x;
+						double diff = nworkers_to_move - x_double;
+						if(diff == 0)
+						{
+							int *workers_to_move = _get_first_workers(sched_ctxs[s], &x, arch);
+							sched_ctx_hypervisor_move_workers(sched_ctxs[s], receiving_s, workers_to_move, x);
+							struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiving_s);
+							int i;
+							for(i = 0; i < x; i++)
+								new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+							
+							free(workers_to_move);
+						}
+						else
+						{
+							x+=1;
+							int *workers_to_move = _get_first_workers(sched_ctxs[s], &x, arch);
+							sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_move, x-1, sched_ctxs[s]);
+							if(diff > 0.3)
+								sched_ctx_hypervisor_add_workers_to_sched_ctx(workers_to_move, x, receiving_s);
+							else
+								sched_ctx_hypervisor_add_workers_to_sched_ctx(workers_to_move, x-1, receiving_s);
+
+							struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiving_s);
+							int i;
+							for(i = 0; i < x-1; i++)
+								new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+
+							free(workers_to_move);
+							
+
+						}
+					}
 				}
 			}
+
 		}
 	}
 }
@@ -346,7 +414,7 @@ void lp_handle_poped_task(unsigned sched_ctx, int worker)
 /* 				printf("ctx %d/worker type %d: n = %d \n", i, 1, res_rounded[i][1]); */
 /* 			} */
 			
-			_redistribute_resources_in_ctxs(nsched_ctxs, 2, res_rounded);
+			_redistribute_resources_in_ctxs(nsched_ctxs, 2, res_rounded, res);
 			
 			pthread_mutex_unlock(&act_hypervisor_mutex);
 		}

+ 31 - 18
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -136,13 +136,14 @@ struct starpu_performance_counters* sched_ctx_hypervisor_init(struct hypervisor_
 		hypervisor.sched_ctx_w[i].sched_ctx = STARPU_NMAX_SCHED_CTXS;
 		hypervisor.sched_ctx_w[i].config = NULL;
 		hypervisor.sched_ctx_w[i].total_flops = 0.0;
+		hypervisor.sched_ctx_w[i].submitted_flops = 0.0;
 		hypervisor.sched_ctx_w[i].remaining_flops = 0.0;
 		hypervisor.sched_ctx_w[i].start_time = 0.0;
 		hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
 		hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
 		hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
 		hypervisor.sched_ctx_w[i].resize_ack.acked_workers = NULL;
-
+		pthread_mutex_init(&hypervisor.sched_ctx_w[i].mutex, NULL);
 
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
@@ -201,6 +202,7 @@ void sched_ctx_hypervisor_shutdown(void)
 		{
 			sched_ctx_hypervisor_stop_resize(hypervisor.sched_ctxs[i]);
 			sched_ctx_hypervisor_unregister_ctx(hypervisor.sched_ctxs[i]);
+			pthread_mutex_destroy(&hypervisor.sched_ctx_w[i].mutex);
 		}
 	}
 	perf_counters->notify_idle_cycle = NULL;
@@ -324,7 +326,7 @@ int get_nworkers_ctx(unsigned sched_ctx, enum starpu_archtype arch)
 	{
 		worker = workers->get_next(workers);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
-		if( curr_arch == arch)
+		if(curr_arch == arch || arch == STARPU_ALL)
 			nworkers_ctx++;
 	}
 	return nworkers_ctx;
@@ -334,7 +336,6 @@ int get_nworkers_ctx(unsigned sched_ctx, enum starpu_archtype arch)
 /* forbids another resize request before this one is take into account */
 void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
-	printf("nworkers to move %d resize_sender %d resize_receiver %d\n", nworkers_to_move,  hypervisor.resize[sender_sched_ctx], hypervisor.resize[receiver_sched_ctx]);
 	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx] && hypervisor.resize[receiver_sched_ctx])
 	{
 		int j;
@@ -349,10 +350,12 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 		_get_cpus(workers_to_move, nworkers_to_move, cpus, &ncpus);
 
 //		if(ncpus != 0)
-			starpu_remove_workers_from_sched_ctx(cpus, ncpus, sender_sched_ctx);
+//			starpu_remove_workers_from_sched_ctx(cpus, ncpus, sender_sched_ctx);
 
+		starpu_remove_workers_from_sched_ctx(workers_to_move, nworkers_to_move, sender_sched_ctx);
 		starpu_add_workers_to_sched_ctx(workers_to_move, nworkers_to_move, receiver_sched_ctx);
 
+		pthread_mutex_lock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.receiver_sched_ctx = receiver_sched_ctx;
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_move * sizeof(int));
 		hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.nmoved_workers = nworkers_to_move;
@@ -367,6 +370,8 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 			hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers[i] = 0;	
 		}
 
+		pthread_mutex_lock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
+
 		hypervisor.resize[sender_sched_ctx] = 0;
 		hypervisor.resize[receiver_sched_ctx] = 0;
 	}
@@ -379,13 +384,14 @@ void sched_ctx_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned
 	if(nworkers_to_add > 0 && hypervisor.resize[sched_ctx])
 	{
 		int j;
-		printf("resize ctx %d with", sched_ctx);
+		printf("add to ctx %d:", sched_ctx);
 		for(j = 0; j < nworkers_to_add; j++)
 			printf(" %d", workers_to_add[j]);
 		printf("\n");
 
 		starpu_add_workers_to_sched_ctx(workers_to_add, nworkers_to_add, sched_ctx);
 
+		pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.receiver_sched_ctx = sched_ctx;
 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_add * sizeof(int));
 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.nmoved_workers = nworkers_to_add;
@@ -399,6 +405,7 @@ void sched_ctx_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned
 			hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers[i] = workers_to_add[i];	
 			hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers[i] = 0;	
 		}
+		pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 
 		hypervisor.resize[sched_ctx] = 0;
 	}
@@ -411,28 +418,28 @@ void sched_ctx_hypervisor_remove_workers_from_sched_ctx(int* workers_to_remove,
 	if(nworkers_to_remove > 0 && hypervisor.resize[sched_ctx])
 	{
 		int j;
-		printf("resize ctx %d with", sched_ctx);
+		printf("remove from ctx %d:", sched_ctx);
 		for(j = 0; j < nworkers_to_remove; j++)
 			printf(" %d", workers_to_remove[j]);
 		printf("\n");
 
 		starpu_remove_workers_from_sched_ctx(workers_to_remove, nworkers_to_remove, sched_ctx);
 
-		hypervisor.sched_ctx_w[sched_ctx].resize_ack.receiver_sched_ctx = sched_ctx;
-		hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_remove * sizeof(int));
-		hypervisor.sched_ctx_w[sched_ctx].resize_ack.nmoved_workers = nworkers_to_remove;
-		hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_remove * sizeof(int));
+/* 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.receiver_sched_ctx = sched_ctx; */
+/* 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_remove * sizeof(int)); */
+/* 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.nmoved_workers = nworkers_to_remove; */
+/* 		hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_remove * sizeof(int)); */
 
 
-		int i;
-		for(i = 0; i < nworkers_to_remove; i++)
-		{
-			hypervisor.sched_ctx_w[sched_ctx].current_idle_time[workers_to_remove[i]] = 0.0;
-			hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers[i] = workers_to_remove[i];	
-			hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers[i] = 0;	
-		}
+/* 		int i; */
+/* 		for(i = 0; i < nworkers_to_remove; i++) */
+/* 		{ */
+/* 			hypervisor.sched_ctx_w[sched_ctx].current_idle_time[workers_to_remove[i]] = 0.0; */
+/* 			hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers[i] = workers_to_remove[i];	 */
+/* 			hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers[i] = 0;	 */
+/* 		} */
 
-		hypervisor.resize[sched_ctx] = 0;
+//		hypervisor.resize[sched_ctx] = 0;
 	}
 
 	return;
@@ -528,10 +535,12 @@ static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
 			/* if the user allowed resizing leave the decisions to the application */
 			if(imposed_resize)  imposed_resize = 0;
 
+			pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 			resize_ack->receiver_sched_ctx = -1;
 			resize_ack->nmoved_workers = 0;
 			free(resize_ack->moved_workers);
 			free(resize_ack->acked_workers);
+			pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 		}
 		return resize_completed;
 	}
@@ -653,6 +662,10 @@ static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
 
 static void notify_submitted_job(struct starpu_task *task, unsigned footprint)
 {
+	pthread_mutex_lock(&act_hypervisor_mutex);
+	hypervisor.sched_ctx_w[task->sched_ctx].submitted_flops += task->flops;
+	pthread_mutex_unlock(&act_hypervisor_mutex);
+
 	if(hypervisor.policy.handle_submitted_job)
 		hypervisor.policy.handle_submitted_job(task, footprint);
 }

+ 14 - 1
src/core/sched_ctx.c

@@ -349,9 +349,12 @@ void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, u
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
 	{
+		if(unlocked)
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 		unlocked = 1;
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+
 		struct _starpu_job *old_j = _starpu_get_job_associated_to_task(old_task);
 		_starpu_push_task(old_j);
 	}
@@ -650,6 +653,16 @@ unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id
 	return shared_workers;
 }
 
+unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		if(worker->sched_ctx[i] == sched_ctx_id)
+			return 1;
+	return 0;
+}
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops)
 {

+ 1 - 1
src/core/sched_ctx.h

@@ -42,7 +42,7 @@ struct _starpu_sched_ctx {
 
 	struct worker_collection *workers;
 	
-	/* mutext for temp_nworkers_in_ctx*/
+	/* mutex for temp_nworkers_in_ctx*/
 	pthread_mutex_t changing_ctx_mutex;
 
 	/* we keep an initial sched which we never delete */