
bug fixing + new strategy

Andra Hugo 13 years ago
parent
commit
3b0a936840

+ 2 - 1
include/starpu.h

@@ -163,7 +163,8 @@ void starpu_worker_get_name(int id, char *dst, size_t maxlen);
  *  identifier (as returned by the starpu_worker_get_id() function)
  */
 int starpu_worker_get_devid(int id);
-
+void starpu_profiling_init(void);
+void starpu_display_stats(void);
 #ifdef __cplusplus
 }
 #endif
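
A minimal usage sketch, not part of this patch: since starpu_init() no longer calls _starpu_profiling_init() itself (see src/core/workers.c below), an application presumably enables profiling and prints the summaries through the two new entry points; the call sites below are an assumption, only the prototypes come from the patch.

#include <starpu.h>

int main(void)
{
	starpu_init(NULL);
	starpu_profiling_init();   /* enable profiling explicitly; starpu_init() no longer does it */

	/* ... create and submit tasks ... */
	starpu_task_wait_for_all();

	starpu_display_stats();    /* prints summaries when STARPU_BUS_STATS / STARPU_WORKER_STATS are set */
	starpu_shutdown();
	return 0;
}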

+ 3 - 0
include/starpu_task.h

@@ -301,6 +301,7 @@ struct starpu_task *starpu_task_create(void);
  * allocated task results in an undefined behaviour. */
 void starpu_task_destroy(struct starpu_task *task);
 int starpu_task_submit(struct starpu_task *task);// STARPU_WARN_UNUSED_RESULT;
+int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id);
 
 /* This function blocks until the task was executed. It is not possible to
  * synchronize with a task more than once. It is not possible to wait
@@ -313,6 +314,8 @@ int starpu_task_wait(struct starpu_task *task);// STARPU_WARN_UNUSED_RESULT;
  * been executed. */
 int starpu_task_wait_for_all(void);
 
+int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx);
+
 /* This function waits until there is no more ready task. */
 int starpu_task_wait_for_no_ready(void);
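
A usage sketch for the two per-context calls added above, not part of this patch; ctx is assumed to be a scheduling context identifier and cl a codelet created elsewhere by the application.

struct starpu_task *task = starpu_task_create();
task->cl = &cl;                        /* some codelet defined by the application */
starpu_task_submit_to_ctx(task, ctx);  /* sets task->sched_ctx = ctx, then submits */
starpu_task_wait_for_all_in_ctx(ctx);  /* blocks until the tasks of this context are done */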
 

+ 2 - 1
sched_ctx_hypervisor/src/Makefile.am

@@ -31,7 +31,8 @@ libsched_ctx_hypervisor_la_SOURCES = 			\
 	hypervisor_policies/app_driven_policy.c		\
 	hypervisor_policies/gflops_rate_policy.c	\
 	hypervisor_policies/lp_policy.c			\
-	hypervisor_policies/lp2_policy.c	
+	hypervisor_policies/lp2_policy.c		\
+	hypervisor_policies/lp3_policy.c		
 
 noinst_HEADERS = sched_ctx_hypervisor_intern.h		\
 	hypervisor_policies/policy_utils.h

+ 10 - 4
sched_ctx_hypervisor/src/hypervisor_policies/gflops_rate_policy.c

@@ -31,7 +31,7 @@ double _get_exp_end(unsigned sched_ctx)
 	struct sched_ctx_wrapper *sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
 	double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
 
-	if( elapsed_flops != 0.0)
+	if( elapsed_flops >= 1.0)
 	{
 		double curr_time = starpu_timing_now();
 		double elapsed_time = curr_time - sc_w->start_time;
@@ -116,7 +116,7 @@ static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sc
 			/*if the needed number of workers is too big we only move the number of workers 
 			  corresponding to the granularity set by the user */
                         int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
-
+			
                         if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
                         {
                                 unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
@@ -125,7 +125,7 @@ static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sc
 
                                 if(nworkers_to_move > 0)
                                 {
-                                        workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, -1);
+                                        workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ALL);
                                         *nworkers = nworkers_to_move;
                                 }
                         }
@@ -278,7 +278,13 @@ static void gflops_rate_resize(unsigned sched_ctx)
 		{
 			double fast_flops_left_pct = _get_flops_left_pct(fastest_sched_ctx);
 			if(fast_flops_left_pct < 0.8)
-				_gflops_rate_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
+			{
+
+				struct sched_ctx_wrapper *sc_w = sched_ctx_hypervisor_get_wrapper(slowest_sched_ctx);
+				double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
+				if((elapsed_flops/sc_w->total_flops) > 0.1)
+					_gflops_rate_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
+			}
 		}
 	}
 }

+ 1 - 15
sched_ctx_hypervisor/src/hypervisor_policies/lp2_policy.c

@@ -16,20 +16,6 @@
 
 #include "policy_utils.h"
 #include <math.h>
-struct bound_task_pool
-{
-	/* Which codelet has been executed */
-	struct starpu_codelet *cl;
-	/* Task footprint key */
-	uint32_t footprint;
-	/* Context the task belongs to */
-	unsigned sched_ctx_id;
-	/* Number of tasks of this kind */
-	unsigned long n;
-	/* Other task kinds */
-	struct bound_task_pool *next;
-};
-
 
 static struct bound_task_pool *task_pools, *last;
 
@@ -143,7 +129,7 @@ static void _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt])
 
 		int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
 
-		/* Number of task * time > 0.3 * tmax */
+		/* ntasks_per_worker*t_tasks < tmax */
 		glp_add_rows(lp, nw*ns);
 		for(s = 0; s < ns; s++)
 		{

+ 388 - 0
sched_ctx_hypervisor/src/hypervisor_policies/lp3_policy.c

@@ -0,0 +1,388 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "policy_utils.h"
+#include <math.h>
+
+static struct bound_task_pool *task_pools, *last;
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void lp3_handle_submitted_job(struct starpu_task *task, unsigned footprint)
+{
+	pthread_mutex_lock(&mutex);
+	struct bound_task_pool *tp;
+	
+	if (last && last->cl == task->cl && last->footprint == footprint && last->sched_ctx_id == task->sched_ctx)
+		tp = last;
+	else
+		for (tp = task_pools; tp; tp = tp->next)
+			if (tp->cl == task->cl && tp->footprint == footprint && tp->sched_ctx_id == task->sched_ctx)
+					break;
+	
+	if (!tp)
+	{
+		tp = (struct bound_task_pool *) malloc(sizeof(*tp));
+		tp->cl = task->cl;
+		tp->footprint = footprint;
+		tp->sched_ctx_id = task->sched_ctx;
+		tp->n = 0;
+		tp->next = task_pools;
+		task_pools = tp;
+	}
+	
+	/* One more task of this kind */
+	tp->n++;
+	pthread_mutex_unlock(&mutex);
+}
+
+static void _starpu_get_tasks_times(int nw, int nt, double times[nw][nt])
+{
+        struct bound_task_pool *tp;
+        int w, t;
+        for (w = 0; w < nw; w++)
+        {
+                for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+                {
+                        enum starpu_perf_archtype arch = starpu_worker_get_perf_archtype(w);
+                        double length = starpu_history_based_job_expected_perf(tp->cl->model, arch, tp->footprint);
+
+                        if (isnan(length))
+                                times[w][t] = NAN;
+                       else
+                                times[w][t] = length / 1000.;
+			
+//			printf("t%d on worker %d ctx %d: %lf \n", t, w, tp->sched_ctx_id, times[w][t]);
+                }
+//		printf("\n");
+        }
+//	printf("\n");
+}
+
+/*
+ * GNU Linear Programming Kit backend
+ */
+#ifdef HAVE_GLPK_H
+#include <glpk.h>
+static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double tmax, double w_in_s[ns][nw])
+{
+	struct bound_task_pool * tp;
+	int t, w, s;
+	glp_prob *lp;
+
+	lp = glp_create_prob();
+	glp_set_prob_name(lp, "StarPU theoretical bound");
+	glp_set_obj_dir(lp, GLP_MAX);
+	glp_set_obj_name(lp, "total execution time");
+
+	{
+		double times[nw][nt];
+		int ne =
+//			nw * (nt+1)	/* worker execution time */
+			+ nt * nw
+			+ nw * (nt+ns)
+			+ nw * ns
+			+ 1; /* glp dumbness */
+		int n = 1;
+		int ia[ne], ja[ne];
+		double ar[ne];
+
+		_starpu_get_tasks_times(nw, nt, times);
+
+		/* Variables: n[w][t] = number of tasks of kind t assigned to worker w, and x[s][w] = does worker w belong to context s */
+		glp_add_cols(lp, nw*nt+ns*nw);
+#define colnum(w, t) ((t)*nw+(w)+1)
+		for(s = 0; s < ns; s++)
+			for(w = 0; w < nw; w++)
+				glp_set_obj_coef(lp, nw*nt+s*nw+w+1, 1.);
+
+		for (w = 0; w < nw; w++)
+			for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+			{
+				char name[32];
+				snprintf(name, sizeof(name), "w%dt%dn", w, t);
+				glp_set_col_name(lp, colnum(w, t), name);
+				glp_set_col_bnds(lp, colnum(w, t), GLP_LO, 0., 0.);
+			}
+		for(s = 0; s < ns; s++)
+			for(w = 0; w < nw; w++)
+			{
+				char name[32];
+				snprintf(name, sizeof(name), "w%ds%dn", w, s);
+				glp_set_col_name(lp, nw*nt+s*nw+w+1, name);	
+				glp_set_col_bnds(lp, nw*nt+s*nw+w+1, GLP_LO, 0., 0.);
+			}
+
+		int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+
+		int curr_row_idx = 0;
+		/* Total worker execution time */
+		glp_add_rows(lp, nw*ns);
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			int someone = 0;
+			for (w = 0; w < nw; w++)
+				if (!isnan(times[w][t]))
+					someone = 1;
+			if (!someone)
+			{
+				/* This task does not have any performance model at all, abort */
+				glp_delete_prob(lp);
+				return 0.0;
+			}
+		}
+		for(s = 0; s < ns; s++)
+		{
+			for (w = 0; w < nw; w++)
+			{
+				char name[32], title[64];
+				starpu_worker_get_name(w, name, sizeof(name));
+				snprintf(title, sizeof(title), "worker %s", name);
+				glp_set_row_name(lp, curr_row_idx+s*nw+w+1, title);
+				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+				{
+					if(tp->sched_ctx_id == sched_ctxs[s])
+					{
+						ia[n] = curr_row_idx+s*nw+w+1;
+						ja[n] = colnum(w, t);
+						if (isnan(times[w][t]))
+							ar[n] = 1000000000.;
+						else
+							ar[n] = times[w][t];
+						n++;
+					}
+				}
+				/* x[s][w] = 1 | 0 */
+				ia[n] = curr_row_idx+s*nw+w+1;
+				ja[n] = nw*nt+s*nw+w+1;
+				ar[n] = (-1) * tmax;
+				n++;
+				glp_set_row_bnds(lp, curr_row_idx+s*nw+w+1, GLP_UP, 0, 0);
+			}
+		}
+
+		curr_row_idx += nw*ns;
+
+		/* Total task completion */
+		glp_add_rows(lp, nt);
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			char title[64];
+			snprintf(title, sizeof(title), "task %s key %x", tp->cl->name, (unsigned) tp->footprint);
+			glp_set_row_name(lp, curr_row_idx+t+1, title);
+			for (w = 0; w < nw; w++)
+			{
+				ia[n] = curr_row_idx+t+1;
+				ja[n] = colnum(w, t);
+				ar[n] = 1;
+				n++;
+			}
+			glp_set_row_bnds(lp, curr_row_idx+t+1, GLP_FX, tp->n, tp->n);
+		}
+
+		curr_row_idx += nt;
+
+		/* sum(x[s][w]) = 1 for each worker w */
+		glp_add_rows(lp, nw);
+		for (w = 0; w < nw; w++)
+		{
+			char name[32], title[64];
+			starpu_worker_get_name(w, name, sizeof(name));
+			snprintf(title, sizeof(title), "w%x", w);
+			glp_set_row_name(lp, curr_row_idx+w+1, title);
+			for(s = 0; s < ns; s++)
+			{
+				ia[n] = curr_row_idx+w+1;
+				ja[n] = nw*nt+s*nw+w+1;
+				ar[n] = 1;
+				n++;
+			}
+
+			glp_set_row_bnds(lp, curr_row_idx+w+1, GLP_FX, 1, 1);
+		}
+
+//		printf("n = %d nw*ns  = %d ne = %d\n", n, nw*ns, ne);
+		STARPU_ASSERT(n == ne);
+
+		glp_load_matrix(lp, ne-1, ia, ja, ar);
+	}
+
+	glp_smcp parm;
+	glp_init_smcp(&parm);
+	parm.msg_lev = GLP_MSG_OFF;
+	int ret = glp_simplex(lp, &parm);
+	if (ret)
+	{
+		glp_delete_prob(lp);
+		lp = NULL;
+		return 0.0;
+	}
+
+	double res = glp_get_obj_val(lp);
+
+	printf("Z: %f (must be eq to nw %d)\n", res, nw);
+	for (w = 0; w < nw; w++)
+	{
+		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		{
+			tasks[w][t] = glp_get_col_prim(lp, colnum(w, t));
+//			printf("t%d worker %d ctx %d res %lf \n", t, w, tasks[w][t]);
+		}
+	}
+
+	for(s = 0; s < ns; s++)
+		for(w = 0; w < nw; w++)
+		{
+			w_in_s[s][w] = glp_get_col_prim(lp, nw*nt+s*nw+w+1);
+			printf("worker %d ctx %d res %lf \n", w, s, w_in_s[s][w]);
+		}
+
+	glp_delete_prob(lp);
+	return res;
+}
+
+static void _redistribute_resources_in_ctxs(int ns, int nw, int nt, double w_in_s[ns][nw])
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+        struct bound_task_pool * tp;
+	int s, s2, w, t;
+
+	for(s = 0; s < ns; s++)
+	{
+		int workers_to_add[nw], workers_to_remove[nw];
+		for(w = 0; w < nw; w++)
+		{
+			workers_to_add[w] = -1;
+			workers_to_remove[w] = -1;
+		}
+
+		int nadd = 0, nremove = 0;
+
+		for(w = 0; w < nw; w++)
+		{
+			if(w_in_s[s][w] >= 0.5)
+				workers_to_add[nadd++] = w;
+			else
+				workers_to_remove[nremove++] = w;
+		}
+		
+		sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_remove, nremove, sched_ctxs[s]);
+	
+		sched_ctx_hypervisor_add_workers_to_sched_ctx(workers_to_add, nadd, sched_ctxs[s]);
+		struct policy_config *new_config = sched_ctx_hypervisor_get_config(sched_ctxs[s]);
+		int i;
+		for(i = 0; i < nadd; i++)
+			new_config->max_idle[workers_to_add[i]] = new_config->max_idle[workers_to_add[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_add[i]] :  new_config->new_workers_max_idle;
+	}
+
+}
+
+static int done = 0;
+static void lp3_handle_poped_task(unsigned sched_ctx, int worker)
+{
+	struct sched_ctx_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
+	
+	int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
+	{
+		if(sc_w->submitted_flops < sc_w->total_flops)
+		{
+			pthread_mutex_unlock(&act_hypervisor_mutex);
+			return;
+		}
+
+		if(_velocity_gap_btw_ctxs() && !done)
+		{
+			done = 1;
+			int ns = sched_ctx_hypervisor_get_nsched_ctxs();
+			int nw = starpu_worker_get_count(); /* Number of different workers */
+			int nt = 0; /* Number of different kinds of tasks */
+			struct bound_task_pool * tp;
+			for (tp = task_pools; tp; tp = tp->next)
+				nt++;
+			
+			double tasks[nw][nt];
+			double draft_tasks[nw][nt];
+			double w_in_s[ns][nw];
+			double draft_w_in_s[ns][nw];
+
+ 			int w,t, s;
+			for(w = 0; w < nw; w++)
+				for(t = 0; t < nt; t++)
+				{
+					tasks[w][t] = 0.0;
+					draft_tasks[w][t] = 0.0;
+				}
+
+			for(s = 0; s < ns; s++)
+				for(w = 0; w < nw; w++)
+				{
+					w_in_s[s][w] = 0.0;
+					draft_w_in_s[s][w] = 0.0;
+				}
+
+			double tmax = 30000;
+			double res = 1.0;
+			while(tmax >= 1.0)
+			{
+				printf("resolve for tmax = %lf\n", tmax);
+				res = _glp_resolve(ns, nw, nt, draft_tasks, tmax, draft_w_in_s);
+				if(res == (double)nw)
+				{
+					for(w = 0; w < nw; w++)
+						for(t = 0; t < nt; t++)
+							tasks[w][t] = draft_tasks[w][t];
+					for(s = 0; s < ns; s++)
+						for(w = 0; w < nw; w++)
+							w_in_s[s][w] = draft_w_in_s[s][w];
+
+				}
+				else
+				{
+					
+					printf("break\n");
+					break;
+				}
+				tmax /= 2;
+			}
+
+/* 			for(w = 0; w < nw; w++) */
+/* 				for (t = 0, tp = task_pools; tp; t++, tp = tp->next) */
+/* 				{ */
+/* 					if(tasks[w][t] > 0.0) */
+/* 						printf("ctx %d/worker %d/task type %d: res = %lf \n", tp->sched_ctx_id, w, t, tasks[w][t]); */
+/* 				} */
+
+			_redistribute_resources_in_ctxs(ns, nw, nt, w_in_s);
+		}
+		pthread_mutex_unlock(&act_hypervisor_mutex);
+	}		
+}
+
+struct hypervisor_policy lp3_policy = {
+	.handle_poped_task = lp3_handle_poped_task,
+	.handle_pushed_task = NULL,
+	.handle_idle_cycle = NULL,
+	.handle_idle_end = NULL,
+	.handle_post_exec_hook = NULL,
+	.handle_submitted_job = lp3_handle_submitted_job,
+	.custom = 0,
+	.name = "lp3"
+};
+	
+#endif /* HAVE_GLPK_H */
+
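Reconstruction of the linear program set up above (not text from the patch), in the code's own notation: for a fixed tmax, _glp_resolve maximizes sum over (s,w) of x[s][w] subject to

  sum over tasks t of context s of times[w][t]*n[w][t] - tmax*x[s][w] <= 0   for every (s,w)
  sum over workers w of n[w][t] = tp->n                                      for every task kind t
  sum over contexts s of x[s][w] = 1                                         for every worker w
  n[w][t] >= 0, x[s][w] >= 0

where n[w][t] (column colnum(w,t)) is the number of tasks of kind t given to worker w and x[s][w] tells whether worker w belongs to context s. Because of the per-worker equality constraint, the objective equals nw exactly when the problem is feasible; lp3_handle_poped_task therefore halves tmax until the LP becomes infeasible, keeps the last feasible w_in_s, and _redistribute_resources_in_ctxs turns it into add/remove requests with a 0.5 threshold.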

+ 15 - 49
sched_ctx_hypervisor/src/hypervisor_policies/lp_policy.c

@@ -160,7 +160,7 @@ static void _glp_resolve(int ns, int nw, double v[ns][nw], double flops[ns], dou
 //	glp_simplex(lp, NULL);
 	
 	double vmax1 = glp_get_obj_val(lp);
-//	printf("vmax1 = %lf \n", vmax1);
+	printf("vmax1 = %lf \n", vmax1);
 
 	n = 1;
 	for(s = 0; s < ns; s++)
@@ -176,42 +176,7 @@ static void _glp_resolve(int ns, int nw, double v[ns][nw], double flops[ns], dou
 	return;
 }
 
-/* check if there is a big velocity gap between the contexts */
-int _velocity_gap_btw_ctxs()
-{
-	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
-	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
-	int i = 0, j = 0;
-	struct sched_ctx_wrapper* sc_w;
-	struct sched_ctx_wrapper* other_sc_w;
-	
-	for(i = 0; i < nsched_ctxs; i++)
-	{
-		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
-		double ctx_v = _get_ctx_velocity(sc_w);
-		if(ctx_v != 0.0)
-		{
-			for(j = 0; j < nsched_ctxs; j++)
-			{
-				if(sched_ctxs[i] != sched_ctxs[j])
-				{
-					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
-					double other_ctx_v = _get_ctx_velocity(other_sc_w);
-					if(other_ctx_v != 0.0)
-					{
-						double gap = ctx_v < other_ctx_v ? ctx_v / other_ctx_v : other_ctx_v / ctx_v;
-						if(gap > 0.5)
-							return 1;
-					}
-				}
-			}
-		}
-
-	}
-	return 0;
-}
-
-void _round_double_to_int(int ns, int nw, double res[ns][nw], int res_rounded[ns][nw])
+static void _round_double_to_int(int ns, int nw, double res[ns][nw], int res_rounded[ns][nw])
 {
 	int s, w;
 	double left_res[nw];
@@ -267,7 +232,7 @@ void _round_double_to_int(int ns, int nw, double res[ns][nw], int res_rounded[ns
 	}		
 }
 
-void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw], double res[ns][nw])
+static void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw], double res[ns][nw])
 {
 	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
 	int s, s2, w;
@@ -375,7 +340,7 @@ void _redistribute_resources_in_ctxs(int ns, int nw, int res_rounded[ns][nw], do
 	}
 }
 
-void lp_handle_poped_task(unsigned sched_ctx, int worker)
+static void lp_handle_poped_task(unsigned sched_ctx, int worker)
 {
 	if(_velocity_gap_btw_ctxs())
 	{
@@ -395,24 +360,25 @@ void lp_handle_poped_task(unsigned sched_ctx, int worker)
 			v[i][0] = 200.0;//_get_velocity_per_worker_type(sc_w, STARPU_CUDA_WORKER);
 			v[i][1] = 20.0;//_get_velocity_per_worker_type(sc_w, STARPU_CPU_WORKER);
 			flops[i] = sc_w->remaining_flops/1000000000; //sc_w->total_flops/1000000000; /* in gflops*/
+			printf("%d: flops %lf\n", sched_ctxs[i], flops[i]);
 		}
                 
 		int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
 		if(ret != EBUSY)
 		{
 			_glp_resolve(nsched_ctxs, 2, v, flops, res);
-/* 			for( i = 0; i < nsched_ctxs; i++) */
-/* 			{ */
-/* 				printf("ctx %d/worker type %d: n = %lf \n", i, 0, res[i][0]); */
-/* 				printf("ctx %d/worker type %d: n = %lf \n", i, 1, res[i][1]); */
-/* 			} */
+			for( i = 0; i < nsched_ctxs; i++)
+			{
+				printf("ctx %d/worker type %d: n = %lf \n", i, 0, res[i][0]);
+				printf("ctx %d/worker type %d: n = %lf \n", i, 1, res[i][1]);
+			}
 			int res_rounded[nsched_ctxs][2];
 			_round_double_to_int(nsched_ctxs, 2, res, res_rounded);
-/* 			for( i = 0; i < nsched_ctxs; i++) */
-/* 			{ */
-/* 				printf("ctx %d/worker type %d: n = %d \n", i, 0, res_rounded[i][0]); */
-/* 				printf("ctx %d/worker type %d: n = %d \n", i, 1, res_rounded[i][1]); */
-/* 			} */
+			for( i = 0; i < nsched_ctxs; i++)
+			{
+				printf("ctx %d/worker type %d: n = %d \n", i, 0, res_rounded[i][0]);
+				printf("ctx %d/worker type %d: n = %d \n", i, 1, res_rounded[i][1]);
+			}
 			
 			_redistribute_resources_in_ctxs(nsched_ctxs, 2, res_rounded, res);
 			

+ 38 - 1
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.c

@@ -294,7 +294,7 @@ double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w)
 {
         double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
 
-        if( elapsed_flops != 0.0)
+        if( elapsed_flops >= 1.0)
         {
                 double curr_time = starpu_timing_now();
                 double elapsed_time = curr_time - sc_w->start_time;
@@ -318,3 +318,40 @@ double _get_velocity_per_worker_type(struct sched_ctx_wrapper* sc_w, enum starpu
 
         return -1.0;
 }
+
+
+/* check if there is a big velocity gap between the contexts */
+int _velocity_gap_btw_ctxs()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+	int i = 0, j = 0;
+	struct sched_ctx_wrapper* sc_w;
+	struct sched_ctx_wrapper* other_sc_w;
+	
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
+		double ctx_v = _get_ctx_velocity(sc_w);
+		if(ctx_v != 0.0)
+		{
+			for(j = 0; j < nsched_ctxs; j++)
+			{
+				if(sched_ctxs[i] != sched_ctxs[j])
+				{
+					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
+					double other_ctx_v = _get_ctx_velocity(other_sc_w);
+					if(other_ctx_v != 0.0)
+					{
+						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v ;
+//						printf("gap = %lf\n", gap);
+						if(gap > 5)
+							return 1;
+					}
+				}
+			}
+		}
+
+	}
+	return 0;
+}
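A worked illustration with made-up numbers: if _get_ctx_velocity() reports 12 for one context and 2 for another, the loop above computes gap = 12/2 = 6; since 6 > 5 the function returns 1 and the calling policy is allowed to trigger a resize.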

+ 16 - 0
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.h

@@ -1,6 +1,20 @@
 #include <sched_ctx_hypervisor.h>
 #include <pthread.h>
 
+struct bound_task_pool
+{
+	/* Which codelet has been executed */
+	struct starpu_codelet *cl;
+	/* Task footprint key */
+	uint32_t footprint;
+	/* Context the task belongs to */
+	unsigned sched_ctx_id;
+	/* Number of tasks of this kind */
+	unsigned long n;
+	/* Other task kinds */
+	struct bound_task_pool *next;
+};
+
 unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move);
 
 int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch);
@@ -16,3 +30,5 @@ unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx);
 double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w);
 
 double _get_velocity_per_worker_type(struct sched_ctx_wrapper* sc_w, enum starpu_archtype arch);
+
+int _velocity_gap_btw_ctxs(void);

+ 5 - 3
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -32,6 +32,7 @@ extern struct hypervisor_policy gflops_rate_policy;
 #ifdef HAVE_GLPK_H
 extern struct hypervisor_policy lp_policy;
 extern struct hypervisor_policy lp2_policy;
+extern struct hypervisor_policy lp3_policy;
 #endif
 
 static struct hypervisor_policy *predefined_policies[] = {
@@ -40,6 +41,7 @@ static struct hypervisor_policy *predefined_policies[] = {
 #ifdef HAVE_GLPK_H
 	&lp_policy,
 	&lp2_policy,
+	&lp3_policy,
 #endif
 	&gflops_rate_policy
 };
@@ -336,7 +338,7 @@ int get_nworkers_ctx(unsigned sched_ctx, enum starpu_archtype arch)
 /* forbids another resize request before this one is taken into account */
 void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
-	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx] && hypervisor.resize[receiver_sched_ctx])
+	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx])// && hypervisor.resize[receiver_sched_ctx])
 	{
 		int j;
 		printf("resize ctx %d with", sender_sched_ctx);
@@ -370,7 +372,7 @@ void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned recei
 			hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers[i] = 0;	
 		}
 
-		pthread_mutex_lock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
+		pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
 
 		hypervisor.resize[sender_sched_ctx] = 0;
 		hypervisor.resize[receiver_sched_ctx] = 0;
@@ -611,7 +613,7 @@ static void notify_poped_task(unsigned sched_ctx, int worker, double elapsed_flo
 	hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
 	hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += elapsed_flops;
 	hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += elapsed_flops;
-	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(&hypervisor.sched_ctx_w[sched_ctx]);
+	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= elapsed_flops; //sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(&hypervisor.sched_ctx_w[sched_ctx]);
 
 	if(hypervisor.nsched_ctxs > 1)
 	{

+ 16 - 2
src/core/task.c

@@ -361,8 +361,10 @@ int starpu_task_submit(struct starpu_task *task)
 	STARPU_ASSERT(task->magic == 42);
 	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
 
-	task->sched_ctx = (nsched_ctxs == 1 || task->control_task) ? 
-		0 : starpu_get_sched_ctx();
+	if(task->sched_ctx == 0 && nsched_ctxs != 1 && !task->control_task)
+		task->sched_ctx = starpu_get_sched_ctx();
+//	task->sched_ctx = (nsched_ctxs == 1 || task->control_task) ? 
+//	   0 : starpu_get_sched_ctx());
 	int ret;
 	unsigned is_sync = task->synchronous;
         _STARPU_LOG_IN();
@@ -459,6 +461,13 @@ int _starpu_task_submit_internally(struct starpu_task *task)
 	return starpu_task_submit(task);
 }
 
+/* applications can use this function to submit a task directly to a given scheduling context */
+int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	task->sched_ctx = sched_ctx_id;
+	return starpu_task_submit(task);
+}
+
 /* The StarPU core can submit tasks directly to the scheduler or a worker,
  * skipping dependencies completely (when it knows what it is doing).  */
 int _starpu_task_submit_nodeps(struct starpu_task *task)
@@ -594,6 +603,11 @@ int starpu_task_wait_for_all(void)
 	return 0;
 }
 
+int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx)
+{
+	_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
+	return 0;
+}
 /*
  * We wait until there is no ready task any more (i.e. StarPU will not be able
  * to progress any more).

+ 14 - 1
src/core/workers.c

@@ -482,7 +482,7 @@ int starpu_init(struct starpu_conf *user_conf)
 
 	_starpu_timing_init();
 
-	_starpu_profiling_init();
+//	_starpu_profiling_init();
 
 	_starpu_load_bus_performance_files();
 
@@ -542,6 +542,10 @@ int starpu_init(struct starpu_conf *user_conf)
 	return 0;
 }
 
+void starpu_profiling_init()
+{
+	_starpu_profiling_init();
+}
 /*
  * Handle runtime termination
  */
@@ -635,6 +639,15 @@ static void _starpu_kill_all_workers(struct _starpu_machine_config *config)
 	starpu_wake_all_blocked_workers();
 }
 
+void starpu_display_stats()
+{
+	const char *stats;
+	if ((stats = getenv("STARPU_BUS_STATS")) && atoi(stats))
+		starpu_bus_profiling_helper_display_summary();
+
+	if ((stats = getenv("STARPU_WORKER_STATS")) && atoi(stats))
+		starpu_worker_profiling_helper_display_summary();
+}
 void starpu_shutdown(void)
 {
 	const char *stats;

+ 21 - 1
src/drivers/cpu/driver_cpu.c

@@ -137,7 +137,8 @@ void *_starpu_cpu_worker(void *arg)
 
 	pthread_cond_t *sched_cond = &cpu_arg->sched_cond;
 	pthread_mutex_t *sched_mutex = &cpu_arg->sched_mutex;
-
+	struct timespec start_time, end_time;
+	unsigned idle = 0;
 	while (_starpu_machine_is_running())
 	{
 		_STARPU_TRACE_START_PROGRESS(memnode);
@@ -153,11 +154,30 @@ void *_starpu_cpu_worker(void *arg)
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, sched_cond, sched_mutex);
+			else
+			{
+				_starpu_clock_gettime(&start_time);
+				_starpu_worker_register_sleeping_start_date(workerid, &start_time);
+				idle = 1;
 
+			}
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			continue;
 		};
 
+		if(idle)
+		{
+			_starpu_clock_gettime(&end_time);
+			
+			int profiling = starpu_profiling_status_get();
+			if (profiling)
+			{
+				struct timespec sleeping_time;
+				starpu_timespec_sub(&end_time, &start_time, &sleeping_time);
+				_starpu_worker_update_profiling_info_sleeping(workerid, &start_time, &end_time);
+			}
+			idle = 0;
+		}
 
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 21 - 1
src/drivers/cuda/driver_cuda.c

@@ -309,7 +309,8 @@ void *_starpu_cuda_worker(void *arg)
 
 	pthread_cond_t *sched_cond = &args->sched_cond;
 	pthread_mutex_t *sched_mutex = &args->sched_mutex;
-
+	struct timespec start_time, end_time;
+	unsigned idle = 0;
 	while (_starpu_machine_is_running())
 	{
 		_STARPU_TRACE_START_PROGRESS(memnode);
@@ -323,6 +324,12 @@ void *_starpu_cuda_worker(void *arg)
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, sched_cond, sched_mutex);
+			else
+			{
+				_starpu_clock_gettime(&start_time);
+				_starpu_worker_register_sleeping_start_date(workerid, &start_time);
+				idle = 1;
+			}
 		  
 
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
@@ -330,6 +337,19 @@ void *_starpu_cuda_worker(void *arg)
 			continue;
 		};
 
+		if(idle)
+		{
+			_starpu_clock_gettime(&end_time);
+			
+			int profiling = starpu_profiling_status_get();
+			if (profiling)
+			{
+				struct timespec sleeping_time;
+				starpu_timespec_sub(&end_time, &start_time, &sleeping_time);
+				_starpu_worker_update_profiling_info_sleeping(workerid, &start_time, &end_time);
+			}
+			idle = 0;
+		}
 
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 8 - 0
src/profiling/profiling_helpers.c

@@ -59,6 +59,9 @@ void starpu_worker_profiling_helper_display_summary(void)
 
 	int workerid;
 	int worker_cnt = starpu_worker_get_count();
+	double all_total_time = 0.0;
+	double all_exec_time = 0.0;
+	double all_sleeping_time = 0.0;
 	for (workerid = 0; workerid < worker_cnt; workerid++)
 	{
 		struct starpu_worker_profiling_info info;
@@ -72,6 +75,9 @@ void starpu_worker_profiling_helper_display_summary(void)
 			double total_time = starpu_timing_timespec_to_us(&info.total_time) / 1000.;
 			double executing_time = starpu_timing_timespec_to_us(&info.executing_time) / 1000.;
 			double sleeping_time = starpu_timing_timespec_to_us(&info.sleeping_time) / 1000.;
+			all_total_time+=total_time;
+			all_exec_time += executing_time;
+			all_sleeping_time += sleeping_time;
 			if (total_time > overall_time)
 				overall_time = total_time;
 
@@ -89,6 +95,8 @@ void starpu_worker_profiling_helper_display_summary(void)
 
 		sum_consumed += info.power_consumed;
 	}
+	fprintf(stderr, "\t total: %.2lf ms executing: %.2lf ms sleeping: %.2lf ms\n", all_total_time, all_exec_time, all_sleeping_time);
+	fprintf(stderr, "\t executing: %.2lf %% of total time, sleeping: %.2lf %% of total time\n", (all_exec_time/all_total_time)*100, (all_sleeping_time/all_total_time)*100);
 
 	if (profiling)
 	{