瀏覽代碼

refactoring + bug fixing

Andra Hugo 13 年之前
父節點
當前提交
0a6bb3ea83

+ 1 - 3
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -103,7 +103,5 @@ double sched_ctx_hypervisor_get_flops_left(unsigned sched_ctx);
 #define GFLOPS_RATE_POLICY 3
 
 struct hypervisor_policy {
-	void (*manage_idle_time)(unsigned req_sched_ctx, int worker, double idle_time);
-	void (*manage_gflops_rate)(unsigned sched_ctx);
-	unsigned (*resize)(unsigned sched_ctx, int *sched_ctxs, unsigned nsched_ctxs);
+	unsigned (*resize)(unsigned sched_ctx);
 };

+ 9 - 5
sched_ctx_hypervisor/src/Makefile.am

@@ -25,9 +25,13 @@ lib_LTLIBRARIES = libsched_ctx_hypervisor.la
 
 libsched_ctx_hypervisor_la_LIBADD = $(top_builddir)/src/libstarpu.la
 
-libsched_ctx_hypervisor_la_SOURCES = 	\
-	sched_ctx_hypervisor.c		\
-	sched_ctx_config.c		\
-	hypervisor_policies/simple_policy.c
+libsched_ctx_hypervisor_la_SOURCES = 			\
+	sched_ctx_hypervisor.c				\
+	sched_ctx_config.c				\
+	hypervisor_policies/policy_utils.c		\
+	hypervisor_policies/idle_policy.c		\
+	hypervisor_policies/app_driven_policy.c		\
+	hypervisor_policies/gflops_rate_policy.c	
 
-noinst_HEADERS = sched_ctx_hypervisor_intern.h
+noinst_HEADERS = sched_ctx_hypervisor_intern.h		\
+	hypervisor_policies/policy_utils.h

+ 5 - 0
sched_ctx_hypervisor/src/hypervisor_policies/app_driven_policy.c

@@ -0,0 +1,5 @@
+#include "policy_utils.h"
+
+struct hypervisor_policy app_driven_policy = {
+	.resize = _resize_to_unknown_receiver
+};

+ 232 - 0
sched_ctx_hypervisor/src/hypervisor_policies/gflops_rate_policy.c

@@ -0,0 +1,232 @@
+#include "policy_utils.h"
+
+static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *nworkers)
+{
+        int *workers = NULL;
+        double v_receiver = sched_ctx_hypervisor_get_ctx_velocity(receiver_sched_ctx);
+        double receiver_remainig_flops = sched_ctx_hypervisor_get_flops_left(receiver_sched_ctx);
+        double sender_exp_end = sched_ctx_hypervisor_get_exp_end(sender_sched_ctx);
+        double sender_v_cpu = sched_ctx_hypervisor_get_cpu_velocity(sender_sched_ctx);
+//      double v_gcpu = sched_ctx_hypervisor_get_gpu_velocity(sender_sched_ctx);
+        double v_for_rctx = (receiver_remainig_flops/(sender_exp_end - starpu_timing_now())) - v_receiver;
+//      v_for_rctx /= 2;                                                        
+        int nworkers_needed = v_for_rctx/sender_v_cpu;
+/*      printf("%d->%d: v_rec %lf v %lf v_cpu %lf w_needed %d \n", sender_sched_ctx, receiver_sched_ctx, */
+/*             v_receiver, v_for_rctx, sender_v_cpu, nworkers_needed); */
+        if(nworkers_needed > 0)
+        {
+                struct policy_config *sender_config = sched_ctx_hypervisor_get_config(sender_sched_ctx);
+                unsigned potential_moving_cpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CPU_WORKER);
+                unsigned potential_moving_gpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CUDA_WORKER);
+                unsigned sender_nworkers = starpu_get_nworkers_of_sched_ctx(sender_sched_ctx);
+                struct policy_config *config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
+                unsigned nworkers_ctx = starpu_get_nworkers_of_sched_ctx(receiver_sched_ctx);
+
+                if(nworkers_needed < (potential_moving_cpus + 5 * potential_moving_gpus))
+                {
+                        if((sender_nworkers - nworkers_needed) >= sender_config->min_nworkers)
+                        {
+                                if((nworkers_ctx + nworkers_needed) > config->max_nworkers)
+                                        nworkers_needed = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx);
+
+                                if(nworkers_needed > 0)
+                                {
+                                        int ngpus = nworkers_needed / 5;
+                                        int *gpus;
+                                        gpus = _get_first_workers(sender_sched_ctx, &ngpus, STARPU_CUDA_WORKER);
+                                        int ncpus = nworkers_needed - ngpus;
+                                        int *cpus;
+                                        cpus = _get_first_workers(sender_sched_ctx, &ncpus, STARPU_CPU_WORKER);
+                                        workers = (int*)malloc(nworkers_needed*sizeof(int));
+                                        int i;
+					printf("%d: gpus: ", nworkers_needed);
+                                        for(i = 0; i < ngpus; i++)
+					{
+                                                workers[(*nworkers)++] = gpus[i];
+						printf("%d ", gpus[i]);
+					}
+					printf(" cpus:");
+                                        for(i = 0; i < ncpus; i++)
+					{
+                                                workers[(*nworkers)++] = cpus[i];
+						printf("%d ", cpus[i]);
+					}
+					printf("\n");
+                                        free(gpus);
+                                        free(cpus);
+                                }
+                        }
+                }
+		else
+                {
+//			printf("nworkers_needed = %d\n", nworkers_needed);
+                        int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
+
+                        if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
+                        {
+                                unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
+                                if((nworkers_ctx + nworkers_to_move - nshared_workers) > config->max_nworkers)
+                                        nworkers_to_move = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx + nshared_workers);
+
+                                if(nworkers_to_move > 0)
+                                {
+                                        workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, -1);
+                                        *nworkers = nworkers_to_move;
+                                }
+                        }
+                }
+        }
+        return workers;
+}
+
+static unsigned _gflops_rate_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
+{
+        int ret = 1;
+        if(force_resize)
+                pthread_mutex_lock(&act_hypervisor_mutex);
+        else
+                ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+        if(ret != EBUSY)
+        {
+                int nworkers_to_move = 0;
+                int *workers_to_move =  _get_workers_to_move(sender_sched_ctx, receiver_sched_ctx, &nworkers_to_move);
+		if(nworkers_to_move > 0)
+                {
+                        sched_ctx_hypervisor_move_workers(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move);
+
+                        struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
+                        int i;
+                        for(i = 0; i < nworkers_to_move; i++)
+                                new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+
+                        free(workers_to_move);
+                }
+                pthread_mutex_unlock(&act_hypervisor_mutex);
+                return 1;
+        }
+        return 0;
+
+}
+
+static int _find_fastest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	double first_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[0]);
+	int fastest_sched_ctx = first_exp_end == -1.0  ? -1 : sched_ctxs[0];
+	double curr_exp_end = 0.0;
+	int i;
+	for(i = 1; i < nsched_ctxs; i++)
+	{
+		curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
+		if((curr_exp_end < first_exp_end || first_exp_end == -1.0) && curr_exp_end != -1.0)
+		{
+			first_exp_end = curr_exp_end;
+			fastest_sched_ctx = sched_ctxs[i];
+		}
+	}
+
+	return fastest_sched_ctx;
+
+}
+
+static int _find_slowest_sched_ctx()
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int slowest_sched_ctx = -1;
+	double curr_exp_end = 0.0;
+	double last_exp_end = -1.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
+		/* if it hasn't started because of no resources, give it priority */
+		if(curr_exp_end == -1.0)
+			return sched_ctxs[i];
+		if( curr_exp_end > last_exp_end)
+		{
+			slowest_sched_ctx = sched_ctxs[i];
+			last_exp_end = curr_exp_end;
+		}
+	}
+
+	return slowest_sched_ctx;
+
+}
+
+static int _find_slowest_available_sched_ctx(unsigned sched_ctx)
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int slowest_sched_ctx = -1;
+	double curr_exp_end = 0.0;
+	double last_exp_end = -1.0;
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		if(sched_ctxs[i] != sched_ctx)
+		{
+			curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
+			/* if it hasn't started because of no resources, give it priority */
+			if(curr_exp_end == -1.0)
+				return sched_ctxs[i];
+			if(last_exp_end < curr_exp_end)
+			{
+				slowest_sched_ctx = sched_ctxs[i];
+				last_exp_end = curr_exp_end;
+			}
+		}
+	}
+
+	return slowest_sched_ctx;
+
+}
+
+static void gflops_rate_resize(unsigned sched_ctx)
+{
+	double exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctx);
+	double flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(sched_ctx);
+
+	if(flops_left_pct == 0.0f)
+	{
+		int slowest_sched_ctx = _find_slowest_available_sched_ctx(sched_ctx);
+		if(slowest_sched_ctx != -1)
+		{
+			double slowest_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(slowest_sched_ctx);
+			if(slowest_flops_left_pct != 0.0f)
+			{
+				struct policy_config* config = sched_ctx_hypervisor_get_config(sched_ctx);
+				config->min_nworkers = 0;
+				config->max_nworkers = 0;
+				printf("ctx %d finished & gives away the res to %d; slow_left %lf\n", sched_ctx, slowest_sched_ctx, slowest_flops_left_pct);
+				_resize(sched_ctx, slowest_sched_ctx, 1);
+				sched_ctx_hypervisor_stop_resize(slowest_sched_ctx);
+			}
+		}
+	}
+
+	int fastest_sched_ctx = _find_fastest_sched_ctx();
+	int slowest_sched_ctx = _find_slowest_sched_ctx();
+
+//	printf("%d %d \n", fastest_sched_ctx, slowest_sched_ctx);
+	if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
+	{
+		double fastest_exp_end = sched_ctx_hypervisor_get_exp_end(fastest_sched_ctx);
+		double slowest_exp_end = sched_ctx_hypervisor_get_exp_end(slowest_sched_ctx);
+
+		if((slowest_exp_end == -1.0 && fastest_exp_end != -1.0) || ((fastest_exp_end + (fastest_exp_end*0.5)) < slowest_exp_end ))
+		{
+			double fast_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(fastest_sched_ctx);
+			if(fast_flops_left_pct < 0.8)
+				_gflops_rate_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
+		}
+	}
+}
+
+struct hypervisor_policy gflops_rate_policy = {
+	.resize = gflops_rate_resize
+};

+ 5 - 0
sched_ctx_hypervisor/src/hypervisor_policies/idle_policy.c

@@ -0,0 +1,5 @@
+#include "policy_utils.h"
+
+struct hypervisor_policy idle_policy = {
+	.resize = _resize_to_unknown_receiver
+};

+ 257 - 0
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.c

@@ -0,0 +1,257 @@
+#include <sched_ctx_hypervisor.h>
+#include <pthread.h>
+
+static int _compute_priority(unsigned sched_ctx)
+{
+	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
+
+	int total_priority = 0;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int worker;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		total_priority += config->priority[worker];
+	}
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+	return total_priority;
+}
+
+unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
+{
+	int i;
+	int highest_priority = -1;
+	int current_priority = 0;
+	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+
+	struct policy_config *config = NULL;
+
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
+		{
+			unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctxs[i]);
+			config  = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
+			if((nworkers + nworkers_to_move) <= config->max_nworkers)
+			{
+				current_priority = _compute_priority(sched_ctxs[i]);
+				if (highest_priority < current_priority)
+				{
+					highest_priority = current_priority;
+					sched_ctx = sched_ctxs[i];
+				}
+			}
+		}
+	}
+	
+	return sched_ctx;
+}
+
+int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch)
+{
+	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
+
+	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
+	int i;
+	for(i = 0; i < *nworkers; i++)
+		curr_workers[i] = -1;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int index;
+	int worker;
+	int considered = 0;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	for(index = 0; index < *nworkers; index++)
+	{
+		while(workers->has_next(workers))
+		{
+			considered = 0;
+			worker = workers->get_next(workers);
+			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+			if(arch == -1 || curr_arch == arch)
+			{
+
+				if(!config->fixed_workers[worker])
+				{
+					for(i = 0; i < index; i++)
+					{
+						if(curr_workers[i] == worker)
+						{
+							considered = 1;
+							break;
+						}
+					}
+					
+					if(!considered)
+					{
+						/* the first iteration*/
+						if(curr_workers[index] < 0)
+						curr_workers[index] = worker;
+						/* small priority worker is the first to leave the ctx*/
+						else if(config->priority[worker] <
+							config->priority[curr_workers[index]])
+						curr_workers[index] = worker;
+						/* if we don't consider priorities check for the workers
+						   with the biggest idle time */
+						else if(config->priority[worker] ==
+							config->priority[curr_workers[index]])
+						{
+							double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
+							double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
+							if(worker_idle_time > curr_worker_idle_time)
+								curr_workers[index] = worker;
+						}
+					}
+				}
+			}
+		}
+			
+		if(curr_workers[index] < 0)
+		{
+			*nworkers = index;
+			break;
+		}
+	}
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+
+	return curr_workers;
+}
+
+unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
+{
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+
+	unsigned potential_workers = 0;
+	int worker;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+                if(arch == 0 || curr_arch == arch)
+                {
+			if(!config->fixed_workers[worker])
+				potential_workers++;
+		}
+	}
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+	
+	return potential_workers;
+}
+
+unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
+{
+       	struct policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
+	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
+	unsigned nworkers_to_move = 0;
+	
+	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, 0);
+	if(potential_moving_workers > 0)
+	{
+		if(potential_moving_workers <= config->min_nworkers)
+			/* if we have to give more than min better give it all */ 
+			/* => empty ctx will block until having the required workers */
+			
+			nworkers_to_move = potential_moving_workers; 
+		else if(potential_moving_workers > config->max_nworkers)
+		{
+			if((potential_moving_workers - config->granularity) > config->max_nworkers)
+//				nworkers_to_move = config->granularity;
+				nworkers_to_move = potential_moving_workers;
+			else
+				nworkers_to_move = potential_moving_workers - config->max_nworkers;
+ 
+		}
+		else if(potential_moving_workers > config->granularity)
+		{
+			if((nworkers - config->granularity) > config->min_nworkers)	
+				nworkers_to_move = config->granularity;
+			else
+				nworkers_to_move = potential_moving_workers - config->min_nworkers;
+		}
+		else
+		{
+			int nfixed_workers = nworkers - potential_moving_workers;
+			if(nfixed_workers >= config->min_nworkers)
+				nworkers_to_move = potential_moving_workers;
+			else
+				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);	
+		}
+
+		if((nworkers - nworkers_to_move) > config->max_nworkers)
+			nworkers_to_move = nworkers - config->max_nworkers;
+	}
+	return nworkers_to_move;
+}
+
+unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
+{
+	int ret = 1;
+	if(force_resize)
+		pthread_mutex_lock(&act_hypervisor_mutex);
+	else
+		ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
+	{					
+		unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
+		if(nworkers_to_move > 0)
+		{
+			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
+				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
+			else
+			{
+				poor_sched_ctx = receiver_sched_ctx;
+				struct policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
+				unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
+				unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);
+				if((nworkers+nworkers_to_move-nshared_workers) > config->max_nworkers)
+					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers+nshared_workers);
+				if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
+			}
+
+
+			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+			{						
+				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, -1);
+				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
+				
+				struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
+				int i;
+				for(i = 0; i < nworkers_to_move; i++)
+					new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+				
+				free(workers_to_move);
+			}
+		}	
+		pthread_mutex_unlock(&act_hypervisor_mutex);
+		return 1;
+	}
+	return 0;
+
+}
+
+
+unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx)
+{
+	return _resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 0);
+}
+

+ 14 - 0
sched_ctx_hypervisor/src/hypervisor_policies/policy_utils.h

@@ -0,0 +1,14 @@
+#include <sched_ctx_hypervisor.h>
+#include <pthread.h>
+
+unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move);
+
+int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch);
+
+unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx, enum starpu_archtype arch);
+
+unsigned _get_nworkers_to_move(unsigned req_sched_ctx);
+
+unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize);
+
+unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx);

+ 0 - 1
sched_ctx_hypervisor/src/sched_ctx_config.c

@@ -23,7 +23,6 @@ static struct policy_config* _create_config(void)
 
 static void _update_config(struct policy_config *old, struct policy_config* new)
 {
-	printf("new = %d old = %d\n", new->max_nworkers, old->max_nworkers);
 	old->min_nworkers = new->min_nworkers != -1 ? new->min_nworkers : old->min_nworkers ;
 	old->max_nworkers = new->max_nworkers != -1 ? new->max_nworkers : old->max_nworkers ;
 	old->new_workers_max_idle = new->new_workers_max_idle != -1.0 ? new->new_workers_max_idle : old->new_workers_max_idle;

+ 21 - 16
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -33,9 +33,7 @@ static void _load_hypervisor_policy(int type)
 
 	}
 
-	hypervisor.policy.manage_idle_time = policy->manage_idle_time;
 	hypervisor.policy.resize = policy->resize;
-	hypervisor.policy.manage_gflops_rate = policy->manage_gflops_rate;
 }
 
 struct starpu_sched_ctx_hypervisor_criteria** sched_ctx_hypervisor_init(int type)
@@ -225,7 +223,8 @@ static void _get_cpus(int *workers, int nworkers, int *cpus, int *ncpus)
 
 void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
-	if(hypervisor.resize[sender_sched_ctx] || sched_ctx_hypervisor_get_flops_left_pct(sender_sched_ctx) == 0.0f)
+	if((hypervisor.resize[sender_sched_ctx] || sched_ctx_hypervisor_get_flops_left_pct(sender_sched_ctx) == 0.0f) &&
+	   nworkers_to_move > 0)
 	{
 		int j;
 		printf("resize ctx %d with", sender_sched_ctx);
@@ -295,7 +294,7 @@ unsigned sched_ctx_hypervisor_resize(unsigned sched_ctx, int task_tag)
 {
 	if(task_tag == -1)
 	{
-		return hypervisor.policy.resize(sched_ctx, hypervisor.sched_ctxs, hypervisor.nsched_ctxs);
+		return app_driven_policy.resize(sched_ctx);
 	}
 	else
 	{	
@@ -395,20 +394,25 @@ static void reset_idle_time_cb(unsigned sched_ctx, int worker)
 
 static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time)
 {
-	if(hypervisor.resize[sched_ctx] && hypervisor.nsched_ctxs > 1 && hypervisor.policy.manage_idle_time)
+	if(hypervisor.nsched_ctxs > 1 && idle_policy.resize)
 	{
-		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] += idle_time;
-		hypervisor.policy.manage_idle_time(sched_ctx, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
-		
+		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+		sc_w->current_idle_time[worker] += idle_time;
+		if(hypervisor.resize[sched_ctx])
+		{
+			struct policy_config *config = sc_w->config;
+			if(config != NULL &&  sc_w->current_idle_time[worker] > config->max_idle[worker])
+				idle_policy.resize(sched_ctx);
+		}		
+		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
+		{
+			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx,
+					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
+		}
 	}
 	return;
 }
 
-static void working_time_cb(unsigned sched_ctx, int worker, double working_time, unsigned current_nworkers)
-{
-	return;
-}
-
 int* sched_ctx_hypervisor_get_sched_ctxs()
 {
 	return hypervisor.sched_ctxs;
@@ -551,10 +555,10 @@ static void poped_task_cb(unsigned sched_ctx, int worker, double elapsed_flops)
 	{
 		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 		if(hypervisor.resize[sched_ctx] || sched_ctx_hypervisor_get_flops_left_pct(sched_ctx) == 0.0f)
-			hypervisor.policy.manage_gflops_rate(sched_ctx);
+			gflops_rate_policy.resize(sched_ctx);
 		else if(sc_w->resize_ack.receiver_sched_ctx != -1)
 		{
-			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx, 
+			_check_for_resize_ack(sched_ctx, sc_w->resize_ack.receiver_sched_ctx,
 					      sc_w->resize_ack.moved_workers, sc_w->resize_ack.nmoved_workers);
 		}
 	}
@@ -564,7 +568,8 @@ static void post_exec_hook_cb(unsigned sched_ctx, int task_tag)
 {
 	STARPU_ASSERT(task_tag > 0);
 
-	if(hypervisor.nsched_ctxs > 1 && hypervisor.resize[sched_ctx])
+//	if(hypervisor.nsched_ctxs > 1 && hypervisor.resize[sched_ctx])
+	if(hypervisor.nsched_ctxs > 1)
 	{
 		unsigned conf_sched_ctx;
 		int i;

+ 10 - 5
src/core/sched_ctx.c

@@ -166,7 +166,7 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 				  const char *sched_name)
 {
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS - 1);
+	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
 
 	unsigned id = _starpu_get_first_free_sched_ctx(config);
 
@@ -545,9 +545,11 @@ void starpu_set_sched_ctx(unsigned *sched_ctx)
 
 unsigned starpu_get_sched_ctx()
 {
-	unsigned sched_ctx = *(unsigned*)pthread_getspecific(sched_ctx_key);
-	STARPU_ASSERT(sched_ctx >= 0 && sched_ctx < STARPU_NMAX_SCHED_CTXS);
-	return sched_ctx;
+	unsigned *sched_ctx = (unsigned*)pthread_getspecific(sched_ctx_key);
+	if(sched_ctx == NULL)
+		return STARPU_NMAX_SCHED_CTXS;
+	STARPU_ASSERT(*sched_ctx >= 0 && *sched_ctx < STARPU_NMAX_SCHED_CTXS);
+	return *sched_ctx;
 }
 
 unsigned _starpu_get_nsched_ctxs()
@@ -641,7 +643,10 @@ pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
 unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return sched_ctx->workers->nworkers;
+	if(sched_ctx != NULL)
+		return sched_ctx->workers->nworkers;
+	else 
+		return 0;
 
 }
 

+ 25 - 0
src/debug/traces/starpu_fxt.c

@@ -338,6 +338,14 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 		fprintf(out_paje_file, "6       %s       S       %s \"%f %f %f\" \n", name, name, red, green, blue);
 		fprintf(out_paje_file, "6       %s       Ctx1       %s \"255.0 255.0 0.0\" \n", name, name);
 		fprintf(out_paje_file, "6       %s       Ctx2       %s \".0 255.0 .0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx3       %s \"75.0 .0 130.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx4       %s \".0 245.0 255.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx5       %s \".0 .0 .0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx6       %s \".0 .0 128.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx7       %s \"105.0 105.0 105.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx8       %s \"255.0 .0 255.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx9       %s \".0 .0 1.0\" \n", name, name);
+		fprintf(out_paje_file, "6       %s       Ctx10       %s \"154.0 205.0 50.0\" \n", name, name);
 
 	}
 		
@@ -372,6 +380,23 @@ static void handle_start_codelet_body(struct fxt_ev_64 *ev, struct starpu_fxt_op
 		  fprintf(out_paje_file, "10       %f	Ctx1      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
 		else if(sched_ctx == 2)
 		  fprintf(out_paje_file, "10       %f	Ctx2      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 3)
+		  fprintf(out_paje_file, "10       %f	Ctx3      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 4)
+		  fprintf(out_paje_file, "10       %f	Ctx4      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 5)
+		  fprintf(out_paje_file, "10       %f	Ctx5      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 6)
+		  fprintf(out_paje_file, "10       %f	Ctx6      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 7)
+		  fprintf(out_paje_file, "10       %f	Ctx7      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 8)
+		  fprintf(out_paje_file, "10       %f	Ctx8      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 9)
+		  fprintf(out_paje_file, "10       %f	Ctx9      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+		else if(sched_ctx == 10)
+		  fprintf(out_paje_file, "10       %f	Ctx10      %s%"PRIu64"      %s\n", start_codelet_time, prefix, ev->param[2], name);
+
 	}
 	
 }

+ 72 - 0
src/debug/traces/starpu_paje.c

@@ -135,6 +135,14 @@ void starpu_fxt_write_paje_header(FILE *file)
 	3       S       T       \"Thread State\"                        \n \
 	3       Ctx1      T     \"InCtx1\"         		\n \
 	3       Ctx2      T     \"InCtx2\"         		\n \
+	3       Ctx3      T     \"InCtx3\"         		\n \
+	3       Ctx4      T     \"InCtx4\"         		\n \
+	3       Ctx5      T     \"InCtx5\"         		\n \
+	3       Ctx6      T     \"InCtx6\"         		\n \
+	3       Ctx7      T     \"InCtx7\"         		\n \
+	3       Ctx8      T     \"InCtx8\"         		\n \
+	3       Ctx9      T     \"InCtx9\"         		\n \
+	3       Ctx10     T     \"InCtx10\"         		\n \
 	3       MS       Mn       \"Memory Node State\"                        \n \
 	4       ntask    Sc       \"Number of tasks\"                        \n \
 	4       bw      Mn       \"Bandwidth\"                        \n \
@@ -162,6 +170,70 @@ void starpu_fxt_write_paje_header(FILE *file)
 	6       B       Ctx2       Blocked         \".9 .1 .0\"		\n \
 	6       Sl       Ctx2      Sleeping         \".9 .1 .0\"		\n \
 	6       P       Ctx2       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx3      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx3      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx3      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx3      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx3       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx3       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx3      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx3       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx4      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx4      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx4      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx4      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx4       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx4       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx4      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx4       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx5      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx5      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx5      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx5      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx5       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx5       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx5      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx5       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx6      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx6      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx6      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx6      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx6       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx6       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx6      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx6       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx7      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx7      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx7      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx7      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx7       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx7       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx7      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx7       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx8      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx8      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx8      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx8      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx8       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx8       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx8      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx8       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx9      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx9      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx9      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx9      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx9       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx9       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx9      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx9       Progressing         \".4 .1 .6\"		\n \
+	6       I       Ctx10      Initializing       \"0.0 .7 1.0\"            \n \
+	6       D       Ctx10      Deinitializing       \"0.0 .1 .7\"            \n \
+	6       Fi       Ctx10      FetchingInput       \"1.0 .1 1.0\"            \n \
+	6       Po       Ctx10      PushingOutput       \"0.1 1.0 1.0\"            \n \
+	6       C       Ctx10       Callback       \".0 .3 .8\"            \n \
+	6       B       Ctx10       Blocked         \".9 .1 .0\"		\n \
+	6       Sl       Ctx10      Sleeping         \".9 .1 .0\"		\n \
+	6       P       Ctx10       Progressing         \".4 .1 .6\"		\n \
 	6       A       MS      Allocating         \".4 .1 .0\"		\n \
 	6       Ar       MS      AllocatingReuse       \".1 .1 .8\"		\n \
 	6       R       MS      Reclaiming         \".0 .1 .4\"		\n \

+ 0 - 2
src/sched_policies/heft.c

@@ -303,7 +303,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				local_data_penalty[worker_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);
 				local_power[worker_ctx] = starpu_task_expected_power(task, perf_arch, nimpl);
 				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
-				//				printf("%d/%d: task length (%lf) exp_end (%lf) local_data_penalty (%lf)\n", worker, worker_ctx, local_task_length[worker_ctx], (exp_start[worker] + exp_len[worker] + local_task_length[worker_ctx]), local_data_penalty[worker_ctx]);
 			}
 			
 			double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
@@ -468,7 +467,6 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	if(workers->init_cursor)
 		workers->deinit_cursor(workers);
-
 	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 

+ 2 - 2
src/worker_collection/worker_list.c

@@ -7,9 +7,9 @@ static unsigned list_has_next(struct worker_collection *workers)
 
 	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
 
-	unsigned ret = *cursor < nworkers;
+	unsigned ret = cursor ? *cursor < nworkers : 0;
 
-	if(!ret) *cursor = 0;
+	if(!ret && cursor) *cursor = 0;
 
 	return ret;
 }