Bläddra i källkod

Added hypervisor policy using linear programming to compute the number of
workers per context

Andra Hugo 13 år sedan
förälder
incheckning
09a51c9c97

+ 3 - 0
include/starpu_config.h.in

@@ -72,6 +72,9 @@
 #undef STARPU_NMAX_SCHED_CTXS
 #undef STARPU_MAXIMPLEMENTATIONS
 
+/* Define to 1 if you have the <glpk.h> header file. */
+#undef HAVE_GLPK_H
+
 #undef STARPU_HAVE_LIBNUMA
 
 #undef STARPU_HAVE_WINDOWS

+ 2 - 1
sched_ctx_hypervisor/src/Makefile.am

@@ -29,7 +29,8 @@ libsched_ctx_hypervisor_la_SOURCES = 			\
 	hypervisor_policies/policy_utils.c		\
 	hypervisor_policies/idle_policy.c		\
 	hypervisor_policies/app_driven_policy.c		\
-	hypervisor_policies/gflops_rate_policy.c	
+	hypervisor_policies/gflops_rate_policy.c	\
+	hypervisor_policies/lp_policy.c	
 
 noinst_HEADERS = sched_ctx_hypervisor_intern.h		\
 	hypervisor_policies/policy_utils.h

+ 1 - 56
sched_ctx_hypervisor/src/hypervisor_policies/gflops_rate_policy.c

@@ -26,33 +26,6 @@ static double _get_total_elapsed_flops_per_sched_ctx(unsigned sched_ctx)
 	return ret_val;
 }
 
-static double _get_elapsed_flops_per_cpus(struct sched_ctx_wrapper* sc_w, int *ncpus)
-{
-	double ret_val = 0.0;
-	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sc_w->sched_ctx);
-        int worker;
-
-	if(workers->init_cursor)
-                workers->init_cursor(workers);
-
-        while(workers->has_next(workers))
-	{
-                worker = workers->get_next(workers);
-                enum starpu_archtype arch = starpu_worker_get_type(worker);
-                if(arch == STARPU_CPU_WORKER)
-                {
-			ret_val += sc_w->elapsed_flops[worker];
-			(*ncpus)++;
-                }
-        }
-
-	if(workers->init_cursor)
-		workers->deinit_cursor(workers);
-
-	return ret_val;
-}
-
-
 double _get_exp_end(unsigned sched_ctx)
 {
 	struct sched_ctx_wrapper *sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
@@ -68,34 +41,6 @@ double _get_exp_end(unsigned sched_ctx)
 	return -1.0;
 }
 
-double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w)
-{
-        double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
-
-        if( elapsed_flops != 0.0)
-        {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = curr_time - sc_w->start_time;
-                return elapsed_flops/elapsed_time;
-        }
-}
-
-/* compute an average value of the cpu velocity */
-double _get_cpu_velocity(struct sched_ctx_wrapper* sc_w)
-{
-        int ncpus = 0;
-        double elapsed_flops = _get_elapsed_flops_per_cpus(sc_w, &ncpus);
-
-        if( elapsed_flops != 0.0)
-        {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = curr_time - sc_w->start_time;
-                return (elapsed_flops/elapsed_time) / ncpus;
-        }
-
-        return -1.0;
-}
-
 /* computes the instructions left to be executed out of the total instructions to execute */
 double _get_flops_left_pct(unsigned sched_ctx)
 {
@@ -116,7 +61,7 @@ static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sc
         double v_receiver = _get_ctx_velocity(receiver_sc_w);
         double receiver_remainig_flops = receiver_sc_w->remaining_flops;
         double sender_exp_end = _get_exp_end(sender_sched_ctx);
-        double sender_v_cpu = _get_cpu_velocity(sender_sc_w);
+        double sender_v_cpu = _get_velocity_per_worker_type(sender_sc_w, STARPU_CPU_WORKER);
         double v_for_rctx = (receiver_remainig_flops/(sender_exp_end - starpu_timing_now())) - v_receiver;
 
         int nworkers_needed = v_for_rctx/sender_v_cpu;

+ 203 - 0
sched_ctx_hypervisor/src/hypervisor_policies/lp_policy.c

@@ -0,0 +1,203 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "policy_utils.h"
+
+
+/*
+ * GNU Linear Programming Kit backend
+ */
+#ifdef HAVE_GLPK_H
+#include <glpk.h>
+/* Build and solve, with GLPK, the linear program that maximises the overall
+ * speed 1/tmax of the ns contexts over nw worker types.  v[s][w] is the
+ * velocity of one worker of type w in context s, flops[s] the amount of work
+ * of context s.  The optimal number of workers of each type per context is
+ * read back with glp_get_col_prim(); for now the results are only printed
+ * (debug output), nothing is resized yet. */
+static void _glp_resolve(int ns, int nw, double v[ns][nw], double flops[ns])
+{
+	int s, w;
+	glp_prob *lp;
+	
+	/* number of non-zero matrix entries: nw+1 per context row,
+	   ns per worker-type row, +1 because GLPK arrays are 1-based */
+	int ne =
+		ns * (nw+1)     /* worker execution time */
+		+ ns * nw
+		+ 1; /* glp dumbness */
+	int n = 1;
+	int ia[ne], ja[ne];
+	double ar[ne];
+
+	lp = glp_create_prob();
+
+	glp_set_prob_name(lp, "sample");
+	glp_set_obj_dir(lp, GLP_MAX);
+        glp_set_obj_name(lp, "max speed");
+
+
+#define colnum(s, w) ((w)*ns+(s)+1)
+	/* we add nw*ns columns one for each type of worker in each context 
+	   and another column corresponding to the 1/tmax bound (bc 1/tmax is a variable too)*/
+	glp_add_cols(lp, nw*ns+1);
+	/* Z = 1/tmax -> 1/vmax structural variable, nCPUs & nGPUs in ctx are auxiliar variables */
+	glp_set_obj_coef(lp, nw*ns+1, 1.);
+
+	for(s = 0; s < ns; s++)
+	{
+		for(w = 0; w < nw; w++)
+		{
+			char name[32];
+			snprintf(name, sizeof(name), "worker%dctx%d", w, s);
+			glp_set_col_name(lp, colnum(s,w), name);
+			/* a worker count cannot be negative */
+			glp_set_col_bnds(lp, colnum(s,w), GLP_LO, 0.0, 0.0);
+		}
+	}
+	/* 0 <= 1/tmax <= 1 */
+	glp_set_col_bnds(lp, nw*ns+1, GLP_DB, 0.0, 1.0);
+
+
+	/* one row corresponds to one ctx*/
+	glp_add_rows(lp, ns);
+
+	for(s = 0; s < ns; s++)
+	{
+		char name[32];
+		snprintf(name, sizeof(name), "ctx%d", s);
+		glp_set_row_name(lp, s+1, name);
+		/* sum_w v[s][w] * n(s,w) - flops[s] * (1/tmax) >= 0 */
+		glp_set_row_bnds(lp, s+1, GLP_LO, 0., 0.);
+		for(w = 0; w < nw; w++)
+		{
+			ia[n] = s+1;
+			ja[n] = colnum(s, w);
+			ar[n] = v[s][w];
+			printf("v[%d][%d] = %lf\n", s, w, v[s][w]);
+			n++;
+		}
+		/* 1/tmax */
+		ia[n] = s+1;
+		ja[n] = ns*nw+1;
+		ar[n] = (-1) * flops[s];
+		printf("%d: flops %lf\n", s, flops[s]);
+		n++;
+	}
+	
+	/* per-worker-type rows: the workers of each type distributed over the
+	   contexts must add up to the machine totals — hard-coded for now to
+	   3 of type 0 and 9 of type 1; TODO: take the real counts instead */
+	glp_add_rows(lp, nw);
+
+	for(w = 0; w < nw; w++)
+	{
+		char name[32];
+		snprintf(name, sizeof(name), "w%d", w);
+		glp_set_row_name(lp, ns+w+1, name);
+		for(s = 0; s < ns; s++)
+		{
+			ia[n] = ns+w+1;
+			ja[n] = colnum(s,w);
+			ar[n] = 1;
+			n++;
+		}
+		if(w == 0) glp_set_row_bnds(lp, ns+w+1, GLP_FX, 3., 3.);
+		if(w == 1) glp_set_row_bnds(lp, ns+w+1, GLP_FX, 9., 9.);
+	}
+
+
+	STARPU_ASSERT(n == ne);
+
+	glp_load_matrix(lp, ne-1, ia, ja, ar);
+
+	/* NOTE(review): glp_simplex()'s return code is ignored — an infeasible
+	   or failed solve silently leaves garbage in the columns; check it */
+	glp_simplex(lp, NULL);
+	double vmax1 = glp_get_obj_val(lp);
+	printf("vmax1 = %lf \n", vmax1);
+	double res[ne];
+	n = 1;
+	for(w = 0; w < nw; w++)
+	{
+		for(s = 0; s < ns; s++)
+		{
+			res[n] = glp_get_col_prim(lp, colnum(s,w));
+			printf("ctx %d/worker type %d: n = %lf \n", s, w, res[n]);
+			n++;
+		}
+	}
+//	res[n] = glp_get_col_prim(lp, ns*nw+1);
+//	printf("vmax = %lf \n", res[n]);
+
+	glp_delete_prob(lp);
+	return;
+}
+
+/* Check whether there is a big velocity gap between any two contexts.
+ * gap is the ratio slower/faster, in (0,1]: a BIG gap means a SMALL ratio.
+ * Returns 1 when one context runs at less than half the speed of another,
+ * 0 otherwise.  Fix: the original condition (gap > 0.5) was inverted — it
+ * reported a "gap" precisely when the velocities were similar. */
+int _velocity_gap_btw_ctxs()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+	int i = 0, j = 0;
+	struct sched_ctx_wrapper* sc_w;
+	struct sched_ctx_wrapper* other_sc_w;
+	
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
+		double ctx_v = _get_ctx_velocity(sc_w);
+		for(j = 0; j < nsched_ctxs; j++)
+		{
+			if(sched_ctxs[i] != sched_ctxs[j])
+			{
+				other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
+				double other_ctx_v = _get_ctx_velocity(other_sc_w);
+				/* skip contexts whose velocity cannot be measured yet */
+				if(ctx_v <= 0.0 || other_ctx_v <= 0.0)
+					continue;
+				double gap = ctx_v < other_ctx_v ? ctx_v / other_ctx_v : other_ctx_v / ctx_v;
+				if(gap < 0.5)
+					return 1;
+			}
+		}
+
+	}
+	return 0;
+}
+
+/* Hypervisor hook called when a task is popped: when the contexts' velocities
+ * diverge, re-solve the linear program to recompute the number of workers of
+ * each type per context.  The sched_ctx/worker arguments are part of the
+ * hook interface and are currently unused. */
+void lp_handle_poped_task(unsigned sched_ctx, int worker)
+{
+	if(_velocity_gap_btw_ctxs())
+	{
+		int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+		int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+		
+		/* v[i][0] = CUDA velocity, v[i][1] = CPU velocity of ctx i;
+		   hard-coded placeholder values for now, the real measurements
+		   stay commented out until they are reliable */
+		double v[nsched_ctxs][2];
+		double flops[nsched_ctxs];
+		int i = 0;
+		struct sched_ctx_wrapper* sc_w;
+		for(i = 0; i < nsched_ctxs; i++)
+		{
+			sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
+			v[i][0] = 200.0;//_get_velocity_per_worker_type(sc_w, STARPU_CUDA_WORKER);
+			v[i][1] = 20.0;//_get_velocity_per_worker_type(sc_w, STARPU_CPU_WORKER);
+			flops[i] = sc_w->total_flops;
+		}
+                
+		/* pthread_mutex_trylock() returns 0 on success; the original
+		   test (ret != EBUSY) also treated error codes such as EINVAL
+		   as success and would unlock a mutex it does not hold */
+		int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+		if(ret == 0)
+		{
+			_glp_resolve(nsched_ctxs, 2, v, flops);
+			pthread_mutex_unlock(&act_hypervisor_mutex);
+		}
+	}		
+}
+
+/* Policy descriptor: only the poped-task hook is implemented. */
+struct hypervisor_policy lp_policy = {
+	.name = "lp",
+	.custom = 0,
+	.handle_poped_task = lp_handle_poped_task,
+	.handle_pushed_task = NULL,
+	.handle_idle_cycle = NULL,
+	.handle_idle_end = NULL,
+	.handle_post_exec_hook = NULL,
+};
+	
+#endif /* HAVE_GLPK_H */
+

+ 59 - 4
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.c

@@ -1,6 +1,8 @@
 #include <sched_ctx_hypervisor.h>
 #include <pthread.h>
 
+enum starpu_archtype STARPU_ALL;
+
 static int _compute_priority(unsigned sched_ctx)
 {
 	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
@@ -84,7 +86,7 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_arch
 			considered = 0;
 			worker = workers->get_next(workers);
 			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
-			if(arch == -1 || curr_arch == arch)
+			if(arch == STARPU_ALL || curr_arch == arch)
 			{
 
 				if(!config->fixed_workers[worker])
@@ -149,7 +151,7 @@ unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ct
 	{
 		worker = workers->get_next(workers);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
-                if(arch == 0 || curr_arch == arch)
+                if(arch == STARPU_ALL || curr_arch == arch)
                 {
 			if(!config->fixed_workers[worker])
 				potential_workers++;
@@ -170,7 +172,7 @@ unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
 	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
 	unsigned nworkers_to_move = 0;
 	
-	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, 0);
+	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, STARPU_ALL);
 	if(potential_moving_workers > 0)
 	{
 		if(potential_moving_workers <= config->min_nworkers)
@@ -238,7 +240,7 @@ unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigne
 
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			{						
-				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, -1);
+				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ALL);
 				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
 				
 				struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
@@ -262,3 +264,56 @@ unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx)
 	return _resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 0);
 }
 
+/* Sum the elapsed flops of the workers of architecture req_arch inside the
+ * context wrapped by sc_w, and count those workers into *npus (the caller
+ * must initialise *npus; this function only increments it). */
+static double _get_elapsed_flops(struct sched_ctx_wrapper* sc_w, int *npus, enum starpu_archtype req_arch)
+{
+	double ret_val = 0.0;
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sc_w->sched_ctx);
+        int worker;
+
+	if(workers->init_cursor)
+                workers->init_cursor(workers);
+
+        while(workers->has_next(workers))
+	{
+                worker = workers->get_next(workers);
+                enum starpu_archtype arch = starpu_worker_get_type(worker);
+                if(arch == req_arch)
+                {
+			ret_val += sc_w->elapsed_flops[worker];
+			(*npus)++;
+                }
+        }
+
+	/* NOTE(review): guard tests init_cursor but calls deinit_cursor —
+	   presumably the collection sets both together; confirm in the API */
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+
+	return ret_val;
+}
+
+/* Velocity (flops per unit of time) of a whole context since its start time.
+ * Returns -1.0 when no flops have elapsed yet, consistent with
+ * _get_velocity_per_worker_type below; the original fell off the end of the
+ * function in that case, which is undefined behaviour in C. */
+double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w)
+{
+        double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
+
+        if( elapsed_flops != 0.0)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = curr_time - sc_w->start_time;
+                return elapsed_flops/elapsed_time;
+        }
+
+        return -1.0;
+}
+
+/* Average per-unit velocity of the workers of a given architecture inside a
+ * context: the elapsed flops of those workers divided by the elapsed time,
+ * per processing unit.  Returns -1.0 when no flops have been executed yet. */
+double _get_velocity_per_worker_type(struct sched_ctx_wrapper* sc_w, enum starpu_archtype arch)
+{
+        int npus = 0;
+        double elapsed_flops = _get_elapsed_flops(sc_w, &npus, arch);
+
+        if(elapsed_flops == 0.0)
+                return -1.0;
+
+        double elapsed_time = starpu_timing_now() - sc_w->start_time;
+        return (elapsed_flops / elapsed_time) / npus;
+}

+ 4 - 0
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.h

@@ -12,3 +12,7 @@ unsigned _get_nworkers_to_move(unsigned req_sched_ctx);
 unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize);
 
 unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx);
+
+double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w);
+
+double _get_velocity_per_worker_type(struct sched_ctx_wrapper* sc_w, enum starpu_archtype arch);

+ 1 - 1
sched_ctx_hypervisor/src/sched_ctx_config.c

@@ -71,7 +71,7 @@ void _add_config(unsigned sched_ctx)
 {
 	struct policy_config *config = _create_config();
 	config->min_nworkers = 0;
-	config->max_nworkers = 0;	
+	config->max_nworkers = STARPU_NMAXWORKERS;	
 	config->new_workers_max_idle = MAX_IDLE_TIME;
 
 	int i;

+ 7 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -29,10 +29,16 @@ static void notify_idle_end(unsigned sched_ctx, int  worker);
 extern struct hypervisor_policy idle_policy;
 extern struct hypervisor_policy app_driven_policy;
 extern struct hypervisor_policy gflops_rate_policy;
+#ifdef HAVE_GLPK_H
+extern struct hypervisor_policy lp_policy;
+#endif
 
 static struct hypervisor_policy *predefined_policies[] = {
         &idle_policy,
 	&app_driven_policy,
+#ifdef HAVE_GLPK_H
+	&lp_policy,
+#endif
 	&gflops_rate_policy
 };
 
@@ -492,7 +498,7 @@ static void notify_pushed_task(unsigned sched_ctx, int worker)
 	
 	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);
 	
-	if(!(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && ntasks == hypervisor.min_tasks)
+	if(hypervisor.min_tasks == 0 || (!(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && ntasks == hypervisor.min_tasks))
 	{
 		hypervisor.resize[sched_ctx] = 1;
 		if(imposed_resize) imposed_resize = 0;