Andra Hugo, 13 years ago
commit 5b0e1abd0a

+ 46 - 29
sched_ctx_hypervisor/examples/sched_ctx_utils/sched_ctx_utils.c

@@ -88,10 +88,10 @@ void* start_bench(void *val){
 	{
 		pthread_mutex_lock(&mut);
 		if(first){
-			sched_ctx_hypervisor_ignore_ctx(p->ctx);
+		 	sched_ctx_hypervisor_ignore_ctx(p->ctx);
 			starpu_delete_sched_ctx(p->ctx, p->the_other_ctx);
 		}
-		
+
 		first = 0;
 		pthread_mutex_unlock(&mut);
 	}
@@ -151,6 +151,8 @@ void start_2benchs(void (*bench)(float*, unsigned, unsigned))
 	gettimeofday(&end, NULL);
 
 	pthread_mutex_destroy(&mut);
+//	sched_ctx_hypervisor_ignore_ctx(p1.ctx);
+//	sched_ctx_hypervisor_ignore_ctx(p2.ctx);
 
 	double timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 	timing /= 1000000;
@@ -249,15 +251,15 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 	sched_ctx_hypervisor_handle_ctx(p1.ctx);
 	
 	sched_ctx_hypervisor_ioctl(p1.ctx,
-				   HYPERVISOR_MAX_IDLE, p1.procs, p1.nprocs, 100000.0,
+				   HYPERVISOR_MAX_IDLE, p1.procs, p1.nprocs, 1000000.0,
 				   HYPERVISOR_MAX_IDLE, p1.procs, gpu+gpu1, 100000000.0,
 				   HYPERVISOR_MIN_WORKING, p1.procs, p1.nprocs, 200.0,
 //				   HYPERVISOR_PRIORITY, p1.procs, p1.nprocs, 1,
 				   HYPERVISOR_PRIORITY, p1.procs, gpu+gpu1, 2,
-				   HYPERVISOR_MIN_PROCS, 1,
+				   HYPERVISOR_MIN_PROCS, 0,
 				   HYPERVISOR_MAX_PROCS, 11,
-				   HYPERVISOR_GRANULARITY, 4,
-				   HYPERVISOR_FIXED_PROCS, p1.procs, gpu,
+				   HYPERVISOR_GRANULARITY, 2,
+//				   HYPERVISOR_FIXED_PROCS, p1.procs, gpu,
 				   HYPERVISOR_MIN_TASKS, 10000,
 				   HYPERVISOR_NEW_WORKERS_MAX_IDLE, 1000000.0,
 				   NULL);
@@ -285,9 +287,9 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 				   HYPERVISOR_MIN_WORKING, p2.procs, p2.nprocs, 200.0,
 //				   HYPERVISOR_PRIORITY, p2.procs, p2.nprocs, 1,
 				   HYPERVISOR_PRIORITY, p2.procs, gpu+gpu2, 2,
-				   HYPERVISOR_MIN_PROCS, 1,
+				   HYPERVISOR_MIN_PROCS, 0,
 				   HYPERVISOR_MAX_PROCS, 11,
-				   HYPERVISOR_GRANULARITY, 3,
+				   HYPERVISOR_GRANULARITY, 4,
 				   HYPERVISOR_FIXED_PROCS, p2.procs, gpu,
 				   HYPERVISOR_MIN_TASKS, 10000,
 				   HYPERVISOR_NEW_WORKERS_MAX_IDLE, 100000.0,
@@ -297,34 +299,49 @@ void construct_contexts(void (*bench)(float*, unsigned, unsigned))
 void set_hypervisor_conf(int event, int task_tag)
 {
 	unsigned *id = pthread_getspecific(key);
-	pthread_mutex_lock(&mut);
 	int reset_conf = 1;
-	pthread_mutex_unlock(&mut);
+	pthread_mutex_lock(&mut);
 	reset_conf = first;
+	pthread_mutex_unlock(&mut);
 
 
-	if(*id == 1 && reset_conf)
+	if(reset_conf)
 	{
-		double  max_idle_time_big = 0, max_idle_time_small;
-		if(event == START_BENCH)
-		{
-			max_idle_time_big = 1000.0;
-			max_idle_time_small = 1000000.0;
-		}
-		else
-		{
-			max_idle_time_big = 10000000.0;
-			max_idle_time_small = 1000.0;
+		if(*id == 1)
+		{			
+			if(event == START_BENCH)
+			{
+				//sched_ctx_hypervisor_request(p2.ctx, p2.procs, p2.nprocs, task_tag);
+				/* sched_ctx_hypervisor_ioctl(p2.ctx, */
+				/* 			   HYPERVISOR_MAX_IDLE, p2.procs, p2.nprocs, 10000000.0, */
+				/* 			   HYPERVISOR_TIME_TO_APPLY, task_tag, */
+				/* 			   HYPERVISOR_GRANULARITY, 1, */
+				/* 			   NULL); */
+			}
+		} else {
+
+			if(event == START_BENCH)
+			{
+				int procs[12] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
+				sched_ctx_hypervisor_request(p1.ctx, procs, 12, task_tag);
+				/* sched_ctx_hypervisor_ioctl(p1.ctx, */
+				/* 			   HYPERVISOR_MAX_IDLE, procs, 12, 1000.0, */
+				/* 			   HYPERVISOR_MAX_IDLE, procs, 3, 1000000.0, */
+				/* 			   HYPERVISOR_TIME_TO_APPLY, task_tag, */
+				/* 			   HYPERVISOR_GRANULARITY, 4, */
+				/* 			   NULL); */
+			}
+			else
+			{
+				/* int procs[12] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}; */
+				/* sched_ctx_hypervisor_ioctl(p1.ctx, */
+				/* 			   HYPERVISOR_MAX_IDLE, procs, 12, 1000.0, */
+				/* 			   HYPERVISOR_TIME_TO_APPLY, task_tag, */
+				/* 			   HYPERVISOR_GRANULARITY, 4, */
+				/* 			   NULL); */
+			}
 
 		}
- 		
-
-		sched_ctx_hypervisor_advise(p2.ctx, p2.procs, p2.nprocs, task_tag);
-		sched_ctx_hypervisor_ioctl(p2.ctx,
-					   HYPERVISOR_MAX_IDLE, p2.procs, p2.nprocs, max_idle_time_small,
-					   HYPERVISOR_TIME_TO_APPLY, task_tag,
-					   HYPERVISOR_GRANULARITY, 1,
-					   NULL);
 	}
 }
 

+ 1 - 1
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -39,7 +39,7 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...);
 
 void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workers, int nworkers, int task_tag);
 
-struct sched_ctx_hypervisor_reply* sched_ctx_hypervisor_request(unsigned sched_ctx, int *workers, int nworkers);
+void sched_ctx_hypervisor_request(unsigned sched_ctx, int *workers, int nworkers, int task_tag);
 
 /* hypervisor policies */
 #define SIMPLE_POLICY 1
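
For reference, a minimal usage sketch of the reworked request API (the names `ctx1` and `tag` are placeholders, not from this commit): with a positive task tag the request is recorded and only applied once the task carrying that tag has executed, while task_tag == -1 applies it immediately, as the sched_ctx_hypervisor.c hunks below show.

	/* sketch: ask the hypervisor to hand all 12 workers to ctx1,
	   deferred until the task tagged `tag` has executed */
	int procs[12] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
	sched_ctx_hypervisor_request(ctx1, procs, 12, tag);

	/* sketch: the same request applied right away (no tag to wait for) */
	sched_ctx_hypervisor_request(ctx1, procs, 12, -1);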

+ 18 - 27
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -32,14 +32,6 @@ struct simple_policy_config {
 	double new_workers_max_idle;
 };
 
-static void simple_init(void)
-{
-}
-
-static void simple_deinit(void)
-{
-}
-
 static struct simple_policy_config* _create_config(void)
 {
 	struct simple_policy_config *config = (struct simple_policy_config *)malloc(sizeof(struct simple_policy_config));
@@ -92,7 +84,7 @@ static int _compute_priority(unsigned sched_ctx)
 static unsigned _get_highest_priority_sched_ctx(unsigned req_sched_ctx, int *sched_ctxs, int nsched_ctxs)
 {
 	int i;
-	int highest_priority = 0;
+	int highest_priority = -1;
 	int current_priority = 0;
 	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
 
@@ -198,30 +190,29 @@ static void simple_manage_idle_time(unsigned req_sched_ctx, int *sched_ctxs, int
 			unsigned nworkers_to_move = 0;
 			
 			/* leave at least one */
-			unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx) - 1;
-			
-			if(potential_moving_workers > config->granularity)
-			{
-				if((nworkers - config->granularity) > config->min_nprocs)	
-					nworkers_to_move = config->granularity;
-			}
-			else
+			int potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx) - 1;
+			if(potential_moving_workers > 0)
 			{
-				int nfixed_workers = nworkers - potential_moving_workers;
-				if(nfixed_workers >= config->min_nprocs)
-					nworkers_to_move = potential_moving_workers;
+				if(potential_moving_workers > config->granularity)
+				{
+					if((nworkers - config->granularity) > config->min_nprocs)	
+						nworkers_to_move = config->granularity;
+				}
 				else
-					nworkers_to_move = potential_moving_workers - (config->min_nprocs - nfixed_workers);			
+				{
+					int nfixed_workers = nworkers - potential_moving_workers;
+					if(nfixed_workers >= config->min_nprocs)
+						nworkers_to_move = potential_moving_workers;
+					else
+						nworkers_to_move = potential_moving_workers - (config->min_nprocs - nfixed_workers);			
+				}
 			}
-			
 			if(nworkers_to_move > 0)
 			{
 				unsigned prio_sched_ctx = _get_highest_priority_sched_ctx(req_sched_ctx, sched_ctxs, nsched_ctxs);
 				if(prio_sched_ctx != STARPU_NMAX_SCHED_CTXS)
-				{
-					
+				{					
 					int *workers_to_move = _get_first_workers(req_sched_ctx, nworkers_to_move);
-					
 					sched_ctx_hypervisor_resize(req_sched_ctx, prio_sched_ctx, workers_to_move, nworkers_to_move);
 
 					struct simple_policy_config *prio_config = (struct simple_policy_config*)sched_ctx_hypervisor_get_config(prio_sched_ctx);
@@ -334,8 +325,8 @@ static void simple_remove_sched_ctx(unsigned sched_ctx)
 }
 
 struct hypervisor_policy simple_policy = {
-	.init = simple_init,
-	.deinit = simple_deinit,
+	.init = NULL,
+	.deinit = NULL,
 	.add_sched_ctx = simple_add_sched_ctx,
 	.remove_sched_ctx = simple_remove_sched_ctx,
 	.ioctl = simple_ioctl,
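
With the simple policy now leaving its init/deinit hooks NULL and the hypervisor core (next file) dropping the unconditional calls, a future policy that does provide these hooks would need them invoked behind a NULL check; a hypothetical guard, not part of this commit, could look like:

	/* hypothetical: only call optional policy hooks when they are set */
	if (hypervisor.policy.init)
		hypervisor.policy.init();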

+ 99 - 28
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -47,7 +47,6 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 
 	_load_hypervisor_policy(type);
 
-	hypervisor.policy.init();
 	criteria = (struct starpu_sched_ctx_hypervisor_criteria*)malloc(sizeof(struct starpu_sched_ctx_hypervisor_criteria));
 	criteria->idle_time_cb = idle_time_cb;
 	criteria->pushed_task_cb = pushed_task_cb;
@@ -59,7 +58,6 @@ struct starpu_sched_ctx_hypervisor_criteria* sched_ctx_hypervisor_init(int type)
 void sched_ctx_hypervisor_shutdown(void)
 {
 	hypervisor.resize = 0;
-	hypervisor.policy.deinit();
 	free(criteria);
 	pthread_mutex_destroy(&act_hypervisor_mutex);
 }
@@ -68,6 +66,7 @@ void sched_ctx_hypervisor_handle_ctx(unsigned sched_ctx)
 {	
 	hypervisor.configurations[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
 	hypervisor.advices[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
+	hypervisor.requests[sched_ctx] = (struct starpu_htbl32_node_s*)malloc(sizeof(struct starpu_htbl32_node_s));
 
 	hypervisor.policy.add_sched_ctx(sched_ctx);
 	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = sched_ctx;
@@ -125,6 +124,7 @@ void sched_ctx_hypervisor_ignore_ctx(unsigned sched_ctx)
 	hypervisor.policy.remove_sched_ctx(sched_ctx);
 	free(hypervisor.configurations[sched_ctx]);
 	free(hypervisor.advices[sched_ctx]);
+	free(hypervisor.requests[sched_ctx]);
 }
 
 void sched_ctx_hypervisor_set_config(unsigned sched_ctx, void *config)
@@ -219,11 +219,13 @@ static unsigned check_tasks_of_sched_ctx(unsigned sched_ctx)
 void sched_ctx_hypervisor_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
 	if(hypervisor.resize)
+	{
 		_sched_ctx_hypervisor_resize(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move);
 
-	int i;
-	for(i = 0; i < nworkers_to_move; i++)
-		hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
+		int i;
+		for(i = 0; i < nworkers_to_move; i++)
+			hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
+	}
 
 	return;
 }
@@ -249,34 +251,92 @@ void sched_ctx_hypervisor_advise(unsigned sched_ctx, int *workerids, int nworker
 	}
 	else
 	{
-
-		struct sched_ctx_hypervisor_advice* advice = (struct sched_ctx_hypervisor_advice*)malloc(sizeof(struct sched_ctx_hypervisor_advice));
+		struct sched_ctx_hypervisor_adjustment* adjustment = (struct sched_ctx_hypervisor_adjustment*)malloc(sizeof(struct sched_ctx_hypervisor_adjustment));
 		int i;
 		for(i = 0; i < nworkers; i++)
-			advice->workerids[i] = workerids[i];
-		advice->nworkers = nworkers;
+			adjustment->workerids[i] = workerids[i];
+		adjustment->nworkers = nworkers;
 		
-		_starpu_htbl_insert_32(&hypervisor.advices[sched_ctx], (uint32_t)task_tag, (void*)advice);			
+		_starpu_htbl_insert_32(&hypervisor.advices[sched_ctx], (uint32_t)task_tag, (void*)adjustment);	
 	}
 
 	return;
 }
 
-struct sched_ctx_hypervisor_reply* sched_ctx_hypervisor_request(unsigned sched_ctx, int *workers, int nworkers)
+void get_overage_workers(unsigned sched_ctx, int *workerids, int nworkers, int *overage_workers, int *noverage_workers)
 {
-	if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS)
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+	int worker, i, found = -1;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
 	{
+		worker = workers->get_next(workers);
+		for(i = 0; i < nworkers; i++)
+			if(workerids[i] == worker)
+			{
+				found = worker;
+				break;
+			}
+		if(found == -1)
+			overage_workers[(*noverage_workers)++]  = worker;
+		found = -1;
 	}
-	return NULL;
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
 }
 
-static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time)
+void sched_ctx_hypervisor_request(unsigned sched_ctx, int *workerids, int nworkers, int task_tag)
 {
-	hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] += idle_time;
+	/* do it right now */
+	if(task_tag == -1)	
+	{
+		pthread_mutex_lock(&act_hypervisor_mutex);
+		
+		if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
+			printf("do request\n");
 
-	if(hypervisor.nsched_ctxs > 1 && hypervisor.policy.manage_idle_time)
-		hypervisor.policy.manage_idle_time(sched_ctx, hypervisor.sched_ctxs, hypervisor.nsched_ctxs, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
+			int overage_workers[STARPU_NMAXWORKERS];
+			int noverage_workers = 0;
+			get_overage_workers(sched_ctx, workerids, nworkers, overage_workers, &noverage_workers);
+			starpu_add_workers_to_sched_ctx(workerids, nworkers, sched_ctx);
 
+			if(noverage_workers > 0)
+				starpu_remove_workers_from_sched_ctx(overage_workers, noverage_workers, sched_ctx);
+			
+			int i;
+			for(i = 0; i < hypervisor.nsched_ctxs; i++)
+				if(hypervisor.sched_ctxs[i] != sched_ctx && hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
+					starpu_remove_workers_from_sched_ctx(workerids, nworkers, hypervisor.sched_ctxs[i]);
+		}
+		
+		pthread_mutex_unlock(&act_hypervisor_mutex);
+	}
+	else
+	{
+		struct sched_ctx_hypervisor_adjustment* adjustment = (struct sched_ctx_hypervisor_adjustment*)malloc(sizeof(struct sched_ctx_hypervisor_adjustment));
+		int i;
+		for(i = 0; i < nworkers; i++)
+			adjustment->workerids[i] = workerids[i];
+		adjustment->nworkers = nworkers;
+		
+		_starpu_htbl_insert_32(&hypervisor.requests[sched_ctx], (uint32_t)task_tag, (void*)adjustment);	
+	}
+
+	return ;
+}
+
+static void idle_time_cb(unsigned sched_ctx, int worker, double idle_time)
+{
+	if(hypervisor.resize && hypervisor.nsched_ctxs > 1 && hypervisor.policy.manage_idle_time)
+	{
+		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] += idle_time;
+		hypervisor.policy.manage_idle_time(sched_ctx, hypervisor.sched_ctxs, hypervisor.nsched_ctxs, worker, hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker]);
+	}
 	return;
 }
 
@@ -297,23 +357,34 @@ static void pushed_task_cb(unsigned sched_ctx, int worker)
 
 static void poped_task_cb(unsigned sched_ctx, int worker)
 {
-	hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
-	int npoped_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].poped_tasks);
-       	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].tasks);
-	hypervisor.resize = ((ntasks - npoped_tasks) > 0);
+	/* hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++; */
+	/* int npoped_tasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].poped_tasks); */
+       	/* int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].tasks); */
+	/* hypervisor.resize = ((ntasks - npoped_tasks) > 0); */
 }
 
 static void post_exec_hook_cb(unsigned sched_ctx, int task_tag)
 {
 	STARPU_ASSERT(task_tag > 0);
-	void *config = _starpu_htbl_search_32(hypervisor.configurations[sched_ctx], (uint32_t)task_tag);
-	if(config != NULL)	
-		sched_ctx_hypervisor_set_config(sched_ctx, config);
 
-	struct sched_ctx_hypervisor_advice *advice = (struct sched_ctx_hypervisor_advice*) _starpu_htbl_search_32(hypervisor.advices[sched_ctx], (uint32_t)task_tag);
-	if(advice)
+	if(hypervisor.nsched_ctxs > 1)
 	{
-		sched_ctx_hypervisor_advise(sched_ctx, advice->workerids, advice->nworkers, -1);
-		free(advice);
+		void *config = _starpu_htbl_search_32(hypervisor.configurations[sched_ctx], (uint32_t)task_tag);
+		if(config != NULL)	
+			sched_ctx_hypervisor_set_config(sched_ctx, config);
+		
+		struct sched_ctx_hypervisor_adjustment *adjustment = (struct sched_ctx_hypervisor_adjustment*) _starpu_htbl_search_32(hypervisor.advices[sched_ctx], (uint32_t)task_tag);
+		if(adjustment)
+		{
+			sched_ctx_hypervisor_advise(sched_ctx, adjustment->workerids, adjustment->nworkers, -1);
+			free(adjustment);
+		}
+		
+		adjustment = (struct sched_ctx_hypervisor_adjustment*) _starpu_htbl_search_32(hypervisor.requests[sched_ctx], (uint32_t)task_tag);
+		if(adjustment)
+		{
+			sched_ctx_hypervisor_request(sched_ctx, adjustment->workerids, adjustment->nworkers, -1);
+			free(adjustment);
+		}
 	}
 }

+ 2 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor_intern.h

@@ -17,9 +17,10 @@ struct sched_ctx_hypervisor {
 	struct hypervisor_policy policy;
 	struct starpu_htbl32_node_s *configurations[STARPU_NMAX_SCHED_CTXS];
 	struct starpu_htbl32_node_s *advices[STARPU_NMAX_SCHED_CTXS];
+	struct starpu_htbl32_node_s *requests[STARPU_NMAX_SCHED_CTXS];
 };
 
-struct sched_ctx_hypervisor_advice {
+struct sched_ctx_hypervisor_adjustment {
 	int workerids[STARPU_NMAXWORKERS];
 	int nworkers;
 };

+ 43 - 6
src/core/sched_ctx.c

@@ -102,7 +102,8 @@ static void _starpu_add_workers_to_sched_ctx(struct starpu_sched_ctx *sched_ctx,
 {
 	struct worker_collection *workers = sched_ctx->workers;
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	
+	int init_nworkers = sched_ctx->workers->nworkers;
+
 	int nworkers_to_add = nworkers == -1 ? config->topology.nworkers : nworkers;
 	int workers_to_add[nworkers_to_add];
 
@@ -132,6 +133,7 @@ static void _starpu_add_workers_to_sched_ctx(struct starpu_sched_ctx *sched_ctx,
 	}
 	else
 		sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);		
+
 	return;
 }
 
@@ -143,9 +145,12 @@ static void _starpu_remove_workers_from_sched_ctx(struct starpu_sched_ctx *sched
 	int i = 0;
 	for(i = 0; i < nworkers; i++)
 	{
-		int worker = workers->remove(workers, workerids[i]);
-		if(worker >= 0)
-			removed_workers[(*n_removed_workers)++] = worker;
+		if(workers->nworkers > 0)
+		{
+			int worker = workers->remove(workers, workerids[i]);
+			if(worker >= 0)
+				removed_workers[(*n_removed_workers)++] = worker;
+		}
 	}
 					   
 	return;
@@ -167,6 +172,11 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	STARPU_ASSERT(nworkers_ctx <= nworkers);
   
 	PTHREAD_MUTEX_INIT(&sched_ctx->changing_ctx_mutex, NULL);
+ 	PTHREAD_MUTEX_INIT(&sched_ctx->no_workers_mutex, NULL);
+	PTHREAD_COND_INIT(&sched_ctx->no_workers_cond, NULL);
+	PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
+
+	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
 
 	sched_ctx->sched_policy = (struct starpu_sched_policy_s*)malloc(sizeof(struct starpu_sched_policy_s));
 	sched_ctx->is_initial_sched = is_initial_sched;
@@ -178,6 +188,7 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_mutex_t*));
 	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_cond_t*));
 
+	
 	/*init the strategy structs and the worker_collection of the ressources of the context */
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
 
@@ -243,6 +254,9 @@ static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_mutex = NULL;
 	sched_ctx->sched_cond = NULL;
 
+	PTHREAD_MUTEX_DESTROY(&sched_ctx->no_workers_mutex);
+	PTHREAD_COND_DESTROY(&sched_ctx->no_workers_cond);
+
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	config->topology.nsched_ctxs--;
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -263,12 +277,14 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	int nworkers = config->topology.nworkers;
 
-	if(!(sched_ctx->workers->nworkers == nworkers && sched_ctx->workers->nworkers == inheritor_sched_ctx->workers->nworkers) && sched_ctx->workers->nworkers > 0)
+	if(!(sched_ctx->workers->nworkers == nworkers && sched_ctx->workers->nworkers == inheritor_sched_ctx->workers->nworkers) && sched_ctx->workers->nworkers > 0 && inheritor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		starpu_add_workers_to_sched_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, inheritor_sched_ctx_id);
 	
 	if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
+		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 		free_sched_ctx_mem(sched_ctx);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 	}
 
 	return;	
@@ -309,6 +325,7 @@ static void _starpu_check_workers(int *workerids, int nworkers)
 void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int init_workers = sched_ctx->workers->nworkers;
 	int added_workers[nworkers_to_add];
 	int n_added_workers = 0;
 
@@ -322,7 +339,27 @@ void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, u
 		_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id);
 
 	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
-       
+
+	if(n_added_workers > 0)
+	{
+		PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
+		PTHREAD_COND_BROADCAST(&sched_ctx->no_workers_cond);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);
+	}
+
+	unsigned unlocked = 0;
+	PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
+	{
+		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+		unlocked = 1;
+		starpu_job_t old_j = _starpu_get_job_associated_to_task(old_task);
+		_starpu_push_task(old_j, 1);
+	}
+	if(!unlocked)
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+	
 	return;
 }
 

+ 12 - 0
src/core/sched_ctx.h

@@ -57,6 +57,18 @@ struct starpu_sched_ctx {
 	/* table of sched mutex corresponding to each worker in this ctx */
 	pthread_mutex_t **sched_mutex;
 
+	/* cond to block push when there are no workers in the ctx */
+	pthread_cond_t no_workers_cond;
+
+	/* mutex to block push when there are no workers in the ctx */
+	pthread_mutex_t no_workers_mutex;
+
+	/*ready tasks that couldn't be pushed because the ctx has no workers*/
+	struct starpu_task_list empty_ctx_tasks;
+
+	/* mutex protecting the empty_ctx_tasks list */
+	pthread_mutex_t empty_ctx_mutex;
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	/* a structure containing a series of criteria determining the resize procedure */
 	struct starpu_sched_ctx_hypervisor_criteria *criteria;

+ 26 - 1
src/core/sched_policy.c

@@ -288,6 +288,32 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 {
 	struct starpu_task *task = j->task;
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	int workerid = starpu_worker_get_id();
+	unsigned no_workers = 0;
+	unsigned nworkers; 
+	
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	nworkers = sched_ctx->workers->nworkers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+
+	PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
+	if(nworkers == 0)
+	{
+		no_workers = 1;
+		if(workerid == -1)
+			PTHREAD_COND_WAIT(&sched_ctx->no_workers_cond, &sched_ctx->no_workers_mutex);
+	}
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);
+
+	if(workerid >= 0 && no_workers)
+	{
+		PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+		starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+		return 0;
+	}
+
         _STARPU_LOG_IN();
 
 	task->status = STARPU_TASK_READY;
@@ -310,7 +336,6 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	}
 	else 
 	{
-		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
 
 		ret = sched_ctx->sched_policy->push_task(task);
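
Taken together with the sched_ctx.c hunks above, this adds a park-and-replay path for contexts that momentarily have no workers. A condensed sketch of the two sides, with locking and the blocking no_workers_cond path for the main thread omitted:

	/* push side (_starpu_push_task): a worker thread parks the task
	   instead of handing it to a scheduler that has no workers */
	if (nworkers == 0 && workerid >= 0)
	{
		starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
		return 0;
	}

	/* resize side (starpu_add_workers_to_sched_ctx): once workers arrive,
	   the parked tasks are popped and re-pushed to the scheduler */
	while (!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
		_starpu_push_task(_starpu_get_job_associated_to_task(old_task), 1);
	}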

+ 2 - 2
src/debug/traces/starpu_paje.c

@@ -150,8 +150,8 @@ void starpu_fxt_write_paje_header(FILE *file)
 	6       R       MS      Reclaiming         \".0 .1 .4\"		\n \
 	6       Co       MS     DriverCopy         \".3 .5 .1\"		\n \
 	6       No       MS     Nothing         \".0 .0 .0\"		\n \
-	6       Ctx1       S     InCtx1         \"255.0 255.0 .0\"		\n \
-	6       Ctx2       S     InCtx2         \".0 191.0 255.0\"		\n \
+	6       Ctx1       S     InCtx1         \"255.0 255.0 0.0\"		\n \
+	6       Ctx2       S     InCtx2         \".0 255.0 .0\"		\n \
 	5       MPIL     MPIP	P	P      MPIL\n \
 	5       L       P	Mn	Mn      L\n");