Browse Source

add instant speed resizing policy

Andra Hugo 12 years ago
parent
commit
81a4c78ea0

+ 9 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -42,6 +42,8 @@ extern "C"
 #define HYPERVISOR_TIME_TO_APPLY -10
 #define HYPERVISOR_EMPTY_CTX_MAX_IDLE -11
 #define HYPERVISOR_NULL -12
+#define	HYPERVISOR_ISPEED_W_SAMPLE -13
+#define HYPERVISOR_ISPEED_CTX_SAMPLE -14
 
 pthread_mutex_t act_hypervisor_mutex;
 
@@ -78,6 +80,13 @@ struct sched_ctx_hypervisor_policy_config
 
 	/* above this context we allow removing all workers */
 	double empty_ctx_max_idle[STARPU_NMAXWORKERS];
+
+	/* sample used to compute the instant speed per worker*/
+	double ispeed_w_sample[STARPU_NMAXWORKERS];
+
+	/* sample used to compute the instant speed per ctx*/
+	double ispeed_ctx_sample;
+
 };
 
 struct sched_ctx_hypervisor_resize_ack

+ 2 - 1
sched_ctx_hypervisor/src/Makefile.am

@@ -32,7 +32,8 @@ libsched_ctx_hypervisor_la_SOURCES = 			\
 	hypervisor_policies/app_driven_policy.c		\
 	hypervisor_policies/gflops_rate_policy.c	\
 	hypervisor_policies/lp_policy.c			\
-	hypervisor_policies/lp2_policy.c
+	hypervisor_policies/lp2_policy.c		\
+	hypervisor_policies/ispeed_policy.c
 
 noinst_HEADERS = sched_ctx_hypervisor_intern.h		\
 	hypervisor_policies/policy_tools.h		\

+ 191 - 0
sched_ctx_hypervisor/src/hypervisor_policies/ispeed_policy.c

@@ -0,0 +1,191 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "policy_tools.h"
+
+/* return the id of the context with the highest measured instant velocity,
+   or the STARPU_NMAX_SCHED_CTXS sentinel when no context reports a
+   non-zero velocity yet */
+static unsigned _get_fastest_sched_ctx(void)
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int fastest_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	double curr_velocity = 0.0;
+	double biggest_velocity = 0.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		curr_velocity = _get_ctx_velocity(sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]));
+		/* strict '>' keeps the sentinel when every context still reports 0.0 */
+		if( curr_velocity > biggest_velocity)
+		{
+			fastest_sched_ctx = sched_ctxs[i];
+			biggest_velocity = curr_velocity;
+		}
+	}
+
+	return fastest_sched_ctx;
+}
+
+/* return the id of the context with the smallest non-zero instant velocity,
+   or the STARPU_NMAX_SCHED_CTXS sentinel when no context (or no measurable
+   context) exists.  A velocity of 0.0 means the context has not executed
+   enough flops yet to be measured, so it is never selected. */
+static unsigned _get_slowest_sched_ctx(void)
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	/* guard: without any registered context, sched_ctxs[0] below would
+	   read out of bounds */
+	if(nsched_ctxs <= 0)
+		return STARPU_NMAX_SCHED_CTXS;
+
+	double smallest_velocity = _get_ctx_velocity(sched_ctx_hypervisor_get_wrapper(sched_ctxs[0]));
+	unsigned slowest_sched_ctx = smallest_velocity == 0.0  ? STARPU_NMAX_SCHED_CTXS : sched_ctxs[0];
+	double curr_velocity = 0.0;
+	int i;
+	for(i = 1; i < nsched_ctxs; i++)
+	{
+		curr_velocity = _get_ctx_velocity(sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]));
+		/* keep the smallest measurable velocity; ignore 0.0 readings */
+		if((curr_velocity < smallest_velocity || smallest_velocity == 0.0) && curr_velocity != 0.0)
+		{
+			smallest_velocity = curr_velocity;
+			slowest_sched_ctx = sched_ctxs[i];
+		}
+	}
+
+	return slowest_sched_ctx;
+}
+
+
+/* get the first *nworkers workers with the lowest instant velocity in the
+   context, restricted to workers of type 'arch' (or any type when
+   STARPU_ANY_WORKER).  Workers marked fixed in the config are never
+   selected; among candidates, a lower priority wins, then a lower
+   velocity.  On return *nworkers is lowered to the number of workers
+   actually found.  The returned array is malloc'ed: the caller frees it. */
+static int* _get_slowest_workers(unsigned sched_ctx, int *nworkers, enum starpu_archtype arch)
+{
+	struct sched_ctx_hypervisor_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
+	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
+
+	/* NOTE(review): malloc result is used without a NULL check */
+	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
+	int i;
+	for(i = 0; i < *nworkers; i++)
+		curr_workers[i] = -1;
+
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
+	int index;
+	int worker;
+	int considered = 0;
+
+	/* NOTE(review): the iterator is initialized once, yet the inner while
+	   below re-scans the collection for every 'index'; this presumably
+	   relies on has_next()/get_next() wrapping around after exhaustion —
+	   confirm against the worker collection implementation */
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	for(index = 0; index < *nworkers; index++)
+	{
+		while(workers->has_next(workers, &it))
+		{
+			considered = 0;
+			worker = workers->get_next(workers, &it);
+			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+			if(arch == STARPU_ANY_WORKER || curr_arch == arch)
+			{
+
+				/* workers pinned to the ctx by configuration never leave it */
+				if(!config->fixed_workers[worker])
+				{
+					/* skip workers already chosen at a previous index */
+					for(i = 0; i < index; i++)
+					{
+						if(curr_workers[i] == worker)
+						{
+							considered = 1;
+							break;
+						}
+					}
+
+					if(!considered)
+					{
+						/* -1.0 means no significant velocity sample yet */
+						double worker_velocity = _get_velocity_per_worker(sc_w, worker);
+						if(worker_velocity != -1.0)
+						{
+							/* the first iteration*/
+							if(curr_workers[index] < 0)
+								curr_workers[index] = worker;
+							/* small priority worker is the first to leave the ctx*/
+							else if(config->priority[worker] <
+							config->priority[curr_workers[index]])
+								curr_workers[index] = worker;
+							/* equal priority: prefer the slower worker */
+							else if(config->priority[worker] ==
+								config->priority[curr_workers[index]])
+							{
+								double curr_worker_velocity = _get_velocity_per_worker(sc_w, curr_workers[index]);
+								if(worker_velocity < curr_worker_velocity && curr_worker_velocity != -1.0)
+								{
+									curr_workers[index] = worker;
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+
+		/* no further candidate found: shrink the requested count */
+		if(curr_workers[index] < 0)
+		{
+			*nworkers = index;
+			break;
+		}
+	}
+	return curr_workers;
+}
+
+/* resizing hook called each time a task is popped: when the velocity gap
+   between contexts is large enough, move the slowest workers of the
+   fastest context into the slowest context.  'worker' is required by the
+   policy interface but unused here. */
+static void ispeed_handle_poped_task(unsigned sched_ctx, int worker)
+{
+	/* trylock only: resizing is opportunistic, never block the caller */
+	int ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
+	{
+		if(_velocity_gap_btw_ctxs())
+		{
+			unsigned fastest_sched_ctx = _get_fastest_sched_ctx();
+			unsigned slowest_sched_ctx = _get_slowest_sched_ctx();
+			if(fastest_sched_ctx != STARPU_NMAX_SCHED_CTXS && slowest_sched_ctx != STARPU_NMAX_SCHED_CTXS && fastest_sched_ctx != slowest_sched_ctx)
+			{
+				int nworkers_to_move = _get_nworkers_to_move(fastest_sched_ctx);
+				if(nworkers_to_move > 0)
+				{
+					int *workers_to_move = _get_slowest_workers(fastest_sched_ctx, &nworkers_to_move, STARPU_ANY_WORKER);
+					if(nworkers_to_move > 0)
+					{
+						/* cumulated velocity of the candidate workers */
+						double new_speed = 0.0;
+						int i;
+						for(i = 0; i < nworkers_to_move; i++)
+							new_speed += _get_velocity_per_worker(sched_ctx_hypervisor_get_wrapper(fastest_sched_ctx), workers_to_move[i]);
+						double fastest_speed = _get_ctx_velocity(sched_ctx_hypervisor_get_wrapper(fastest_sched_ctx));
+						double slowest_speed = _get_ctx_velocity(sched_ctx_hypervisor_get_wrapper(slowest_sched_ctx));
+						/* move only if the receiver would stay no faster than the sender */
+						if((slowest_speed + new_speed) <= (fastest_speed - new_speed))
+						{
+							sched_ctx_hypervisor_move_workers(fastest_sched_ctx, slowest_sched_ctx, workers_to_move, nworkers_to_move, 0);
+						}
+					}
+
+					free(workers_to_move);
+				}
+
+			}
+		}
+		pthread_mutex_unlock(&act_hypervisor_mutex);
+	}
+}
+
+/* policy descriptor: only the poped-task hook triggers resizing;
+   all other hypervisor hooks are unused by this policy */
+struct sched_ctx_hypervisor_policy ispeed_policy = {
+	.size_ctxs = NULL,
+	.handle_poped_task = ispeed_handle_poped_task,
+	.handle_pushed_task = NULL,
+	.handle_idle_cycle = NULL,
+	.handle_idle_end = NULL,
+	.handle_post_exec_hook = NULL,
+	.handle_submitted_job = NULL,
+	.custom = 0,
+	.name = "ispeed"
+};

+ 21 - 2
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.c

@@ -324,10 +324,12 @@ static double _get_elapsed_flops(struct sched_ctx_hypervisor_wrapper* sc_w, int
 
 double _get_ctx_velocity(struct sched_ctx_hypervisor_wrapper* sc_w)
 {
+	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sc_w->sched_ctx);
         double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
 	double total_elapsed_flops = sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
-	double prc = elapsed_flops/sc_w->total_flops;
-	double redim_sample = elapsed_flops == total_elapsed_flops ? HYPERVISOR_START_REDIM_SAMPLE : HYPERVISOR_REDIM_SAMPLE;
+	double prc = config->ispeed_ctx_sample != 0.0 ? elapsed_flops : elapsed_flops/sc_w->total_flops;
+	double redim_sample = config->ispeed_ctx_sample != 0.0 ? config->ispeed_ctx_sample : 
+		(elapsed_flops == total_elapsed_flops ? HYPERVISOR_START_REDIM_SAMPLE : HYPERVISOR_REDIM_SAMPLE);
 	if(prc >= redim_sample)
         {
                 double curr_time = starpu_timing_now();
@@ -337,6 +339,23 @@ double _get_ctx_velocity(struct sched_ctx_hypervisor_wrapper* sc_w)
 	return 0.0;
 }
 
+/* compute the instant velocity of one worker in the context: elapsed flops
+   divided by the time elapsed since sc_w->start_time.  Returns -1.0 while
+   the worker has executed fewer flops than its ispeed_w_sample threshold
+   (sample not yet significant).
+   NOTE(review): with the default ispeed_w_sample of 0.0 the threshold
+   always passes, yielding a 0.0 velocity for idle workers — confirm this
+   is intended */
+double _get_velocity_per_worker(struct sched_ctx_hypervisor_wrapper *sc_w, unsigned worker)
+{
+        double elapsed_flops = sc_w->elapsed_flops[worker];
+	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sc_w->sched_ctx);
+	double sample = config->ispeed_w_sample[worker];
+
+        if( elapsed_flops >= sample)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = curr_time - sc_w->start_time;
+                return (elapsed_flops/elapsed_time);
+        }
+
+        return -1.0;
+
+}
+
 /* compute an average value of the cpu velocity */
 double _get_velocity_per_worker_type(struct sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch)
 {

+ 2 - 0
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.h

@@ -50,6 +50,8 @@ unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx, unsigned now);
 
 double _get_ctx_velocity(struct sched_ctx_hypervisor_wrapper* sc_w);
 
+double _get_velocity_per_worker(struct sched_ctx_hypervisor_wrapper *sc_w, unsigned worker); 
+
 double _get_velocity_per_worker_type(struct sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch);
 
 int _velocity_gap_btw_ctxs(void);

+ 15 - 0
sched_ctx_hypervisor/src/sched_ctx_config.c

@@ -22,6 +22,7 @@ static struct sched_ctx_hypervisor_policy_config* _create_config(void)
 	config->min_nworkers = -1;
 	config->max_nworkers = -1;
 	config->new_workers_max_idle = -1.0;
+	config->ispeed_ctx_sample = 0.0;
 
 	int i;
 	for(i = 0; i < STARPU_NMAXWORKERS; i++)
@@ -32,6 +33,7 @@ static struct sched_ctx_hypervisor_policy_config* _create_config(void)
 		config->max_idle[i] = -1.0;
 		config->empty_ctx_max_idle[i] = -1.0;
 		config->min_working[i] = -1.0;
+		config->ispeed_w_sample[i] = 0.0;
 	}
 
 	return config;
@@ -180,6 +182,19 @@ static struct sched_ctx_hypervisor_policy_config* _ioctl(unsigned sched_ctx, va_
 			config->new_workers_max_idle = va_arg(varg_list, double);
 			break;
 
+		case HYPERVISOR_ISPEED_W_SAMPLE:
+			workerids = va_arg(varg_list, int*);
+			nworkers = va_arg(varg_list, int);
+			double sample = va_arg(varg_list, double);
+
+			for(i = 0; i < nworkers; i++)
+				config->ispeed_w_sample[workerids[i]] = sample;
+			break;
+
+		case HYPERVISOR_ISPEED_CTX_SAMPLE:
+			config->ispeed_ctx_sample = va_arg(varg_list, double);
+			break;
+
 /* not important for the strateg, needed just to jump these args in the iteration of the args */
 		case HYPERVISOR_TIME_TO_APPLY:
 			va_arg(varg_list, int);

+ 3 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -34,6 +34,8 @@ extern struct sched_ctx_hypervisor_policy gflops_rate_policy;
 #ifdef STARPU_HAVE_GLPK_H
 extern struct sched_ctx_hypervisor_policy lp_policy;
 extern struct sched_ctx_hypervisor_policy lp2_policy;
 #endif // STARPU_HAVE_GLPK_H
+/* ispeed does not depend on GLPK: declare it outside the #ifdef so the
+   &ispeed_policy entry added to predefined_policies still compiles when
+   GLPK is unavailable */
+extern struct sched_ctx_hypervisor_policy ispeed_policy;
 
 
@@ -45,7 +46,8 @@ static struct sched_ctx_hypervisor_policy *predefined_policies[] =
 	&lp_policy,
 	&lp2_policy,
 #endif // STARPU_HAVE_GLPK_H
-	&gflops_rate_policy
+	&gflops_rate_policy,
+	&ispeed_policy
 };
 
 static void _load_hypervisor_policy(struct sched_ctx_hypervisor_policy *policy)

+ 2 - 0
src/core/sched_ctx.c

@@ -601,7 +601,9 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
 		
 		if(n_added_workers > 0)
+		{
 			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
+		}
 
 		_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
 	}