Procházet zdrojové kódy

everyone has to delete the ctxs they created, remove workers does not deletes the corresponding queues in the scheduler,
changed parameteres for delete_sched_ctx (there's still a synchronisation issue, it will be fixed in the next commit)

Andra Hugo před 12 roky
rodič
revize
8c21b0e865

+ 4 - 2
examples/sched_ctx/sched_ctx.c

@@ -101,7 +101,7 @@ int main(int argc, char **argv)
 	   when its corresponding tasks finished executing */
 
 #warning TODO: to be fixed
-//	starpu_sched_ctx_finished_submit(sched_ctx1);
+	starpu_sched_ctx_finished_submit(sched_ctx1);
 
 	for (i = 0; i < ntasks/2; i++)
 	{
@@ -116,11 +116,13 @@ int main(int argc, char **argv)
 	}
 
 #warning TODO: to be fixed
-//	starpu_sched_ctx_finished_submit(sched_ctx2);
+	starpu_sched_ctx_finished_submit(sched_ctx2);
 
 	/* wait for all tasks at the end*/
 	starpu_task_wait_for_all();
 
+	starpu_sched_ctx_delete(sched_ctx1);
+	starpu_sched_ctx_delete(sched_ctx2);
 	printf("tasks executed %d out of %d\n", tasks_executed, ntasks);
 	starpu_shutdown();
 

+ 3 - 1
examples/sched_ctx_utils/sched_ctx_utils.c

@@ -103,7 +103,7 @@ void* start_bench(void *val)
 		pthread_mutex_lock(&mut);
 		if(first)
 		{
-			starpu_sched_ctx_delete(p->ctx, p->the_other_ctx);
+			starpu_sched_ctx_delete(p->ctx);
 		}
 
 		first = 0;
@@ -263,6 +263,8 @@ void construct_contexts(void (*bench)(unsigned, unsigned))
 	p2.ctx = starpu_sched_ctx_create("heft", procs2, nprocs2, "sched_ctx2");
 	p1.the_other_ctx = (int)p2.ctx;
 	p2.procs = procs2;
+	starpu_sched_ctx_set_inheritor(p1.ctx, p2.ctx);
+	starpu_sched_ctx_set_inheritor(p2.ctx, p1.ctx);
 	p2.nprocs = nprocs2;
 }
 

+ 1 - 1
include/starpu_sched_ctx.h

@@ -82,7 +82,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
 						 unsigned allow_overlap);
 
-void starpu_sched_ctx_delete(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id);
+void starpu_sched_ctx_delete(unsigned sched_ctx_id);
 
 void starpu_sched_ctx_add_workers(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx_id);
 

+ 161 - 108
src/core/sched_ctx.c

@@ -31,6 +31,8 @@ static unsigned _starpu_worker_get_first_free_sched_ctx(struct _starpu_worker *w
 
 static unsigned _starpu_worker_get_sched_ctx_id(struct _starpu_worker *worker, unsigned sched_ctx_id);
 
+static unsigned	_get_workers_list(struct starpu_sched_ctx_worker_collection *workers, int **workerids);
+
 static void change_worker_sched_ctx(unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
@@ -44,10 +46,10 @@ static void change_worker_sched_ctx(unsigned sched_ctx_id)
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 		/* add context to worker */
 		worker->sched_ctx[worker_sched_ctx_id] = sched_ctx;
-		worker->nsched_ctxs++;
+		worker->nsched_ctxs++;	
 		worker->active_ctx = sched_ctx_id;
 	}
-	else
+	else 
 	{
 		/* remove context from worker */
 		if(worker->sched_ctx[worker_sched_ctx_id]->sched_policy)
@@ -83,19 +85,20 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 		worker[i] = _starpu_get_worker_struct(workerids[i]);
 
 		/* if the current thread requires resize it's no need
-		   to send itsefl a message in order to change its
+		   to send itsefl a message in order to change its 
 		   sched_ctx info */
 		if(curr_worker && curr_worker == worker[i])
 			change_worker_sched_ctx(sched_ctx_id);
 		else
-		{
+		{			
 			worker[i]->tasks[sched_ctx_id] = starpu_task_create();
 			worker[i]->tasks[sched_ctx_id]->cl = &sched_ctx_info_cl;
 			worker[i]->tasks[sched_ctx_id]->cl_arg = (void*)(uintptr_t)sched_ctx_id;
 			worker[i]->tasks[sched_ctx_id]->execute_on_a_specific_worker = 1;
 			worker[i]->tasks[sched_ctx_id]->workerid = workerids[i];
 			worker[i]->tasks[sched_ctx_id]->destroy = 1;
-			worker[i]->tasks[sched_ctx_id]->control_task = 1;
+//			worker[i]->tasks[sched_ctx_id]->sched_ctx = sched_ctx_id;
+			
 			int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker[i], sched_ctx_id);
 			/* if the ctx is not in the worker's list it means the update concerns the addition of ctxs*/
 			if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
@@ -104,7 +107,7 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 			_starpu_exclude_task_from_dag(worker[i]->tasks[sched_ctx_id]);
 
 			_starpu_task_submit_internally(worker[i]->tasks[sched_ctx_id]);
-		}
+		}		
 	}
 }
 
@@ -114,7 +117,7 @@ void starpu_stop_task_submission()
 	_starpu_task_submit_internally(&stop_submission_task);
 }
 
-static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
+static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, 
 				       int *added_workers, int *n_added_workers)
 {
 	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;
@@ -133,12 +136,12 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			int worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
 			if(worker >= 0)
 			{
-				added_workers[(*n_added_workers)++] = worker;
+				added_workers[(*n_added_workers)++] = worker;		
 			}
 		}
 		else
 		{
-			int worker = (workerids == NULL ? i : workerids[i]);
+			int worker = (workerids == NULL ? i : workerids[i]); 
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
 		}
@@ -147,46 +150,21 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 	if(added_workers)
 	{
 		if(*n_added_workers > 0)
-			sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);
+			sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);	
 	}
 	else
-		sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);
+		sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);		
 
 	return;
 }
 
-static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids,
+static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, 
 						  int nworkers, int *removed_workers, int *n_removed_workers)
 {
 	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;
 
 	int i = 0;
 
-
-	if(nworkers == -1)
-	{
-		int nrem_workers = 0;
-		int rem_workers[STARPU_NMAXWORKERS];
-
-		if(workers->init_cursor)
-			workers->init_cursor(workers);
-
-		int worker = -1;
-		while(workers->has_next(workers))
-		{
-			worker = workers->get_next(workers);
-			if(!starpu_worker_is_combined_worker(worker))
-				rem_workers[nrem_workers++] = worker;
-		}
-
-		if (workers->deinit_cursor)
-			workers->deinit_cursor(workers);
-
-		if(nrem_workers > 0)
-			sched_ctx->sched_policy->remove_workers(sched_ctx->id, rem_workers, nrem_workers);
-		return;
-	}
-
 	for(i = 0; i < nworkers; i++)
 	{
 		if(workers->nworkers > 0)
@@ -195,15 +173,29 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 			if(worker >= 0)
 				removed_workers[(*n_removed_workers)++] = worker;
 		}
-		if(*n_removed_workers)
-			sched_ctx->sched_policy->remove_workers(sched_ctx->id, removed_workers, *n_removed_workers);
+/* 		if(*n_removed_workers) */
+/* 			sched_ctx->sched_policy->remove_workers(sched_ctx->id, removed_workers, *n_removed_workers); */
 	}
 
 	return;
 }
 
+static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
+{
+	unsigned nworkers_ctx = sched_ctx->workers->nworkers;
+	int *workerids = NULL;
+	
+	int is_list =_get_workers_list(sched_ctx->workers, &workerids);
+				
+	if(nworkers_ctx > 0)
+		sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
+	if(!is_list)
+		free(workerids);
+	return;
+
+}
 
-struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerids,
+struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int *workerids, 
 				  int nworkers_ctx, unsigned is_initial_sched,
 				  const char *sched_name)
 {
@@ -217,14 +209,13 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
 	sched_ctx->id = id;
 
-	config->topology.nsched_ctxs++;
+	config->topology.nsched_ctxs++;	
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 
 	int nworkers = config->topology.nworkers;
-
+	
 	STARPU_ASSERT(nworkers_ctx <= nworkers);
-
-	_STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->changing_ctx_mutex, NULL);
+  
 	_STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
 
 	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
@@ -241,7 +232,7 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	sched_ctx->sched_mutex = (_starpu_pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_mutex_t*));
 	sched_ctx->sched_cond = (_starpu_pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_cond_t*));
 
-
+	
 	/*init the strategy structs and the worker_collection of the ressources of the context */
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
 
@@ -258,6 +249,9 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	if(is_initial_sched)
 	{
 		int i;
+		/*initialize the mutexes for all contexts */
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+			_STARPU_PTHREAD_MUTEX_INIT(&changing_ctx_mutex[i], NULL);
 		for(i = 0; i < nworkers; i++)
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(i);
@@ -271,17 +265,17 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	{
 		sched_ctx->pop_counter[w] = 0;
 	}
-
+	
 	return sched_ctx;
 }
 
 static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_archtype arch, unsigned allow_overlap)
 {
 	int pus[max];
-	int npus = 0;
+	int npus = 0; 
 	int i;
 	int n = 0;
-
+		
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	if(config->topology.nsched_ctxs == 1)
 	{
@@ -295,10 +289,10 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 	{
 		unsigned enough_ressources = 0;
 		npus = starpu_worker_get_nids_ctx_free_by_type(arch, pus, max);
-
+       
 		for(i = 0; i < npus; i++)
 			workers[(*nw)++] = pus[i];
-
+		
 		if(npus == max)
 			/*we have enough available resources */
 			enough_ressources = 1;
@@ -337,13 +331,13 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 			if(npus >= min)
 				enough_ressources = 1;
 		}
-
+		
 		if(!enough_ressources)
 		{
-			/* if there is no available workers to satisfy the  minimum required
+			/* if there is no available workers to satisfy the  minimum required 
 			 give them workers proportional to their requirements*/
 			int global_npus = starpu_worker_get_count_by_type(arch);
-
+			
 			int req_npus = 0;
 
 			int s;
@@ -352,7 +346,7 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
 
 			req_npus += min;
-
+			
 			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
 			{
 				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
@@ -362,8 +356,8 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 
 					int _npus = 0;
 					int _pus[STARPU_NMAXWORKERS];
-
-					_npus = starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
+				
+					_npus = starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);					
 					if(needed_npus < (double)_npus)
 					{
 						double npus_to_rem = (double)_npus - needed_npus;
@@ -374,7 +368,7 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 
 						int pus_to_remove[npus_to_remove];
 						int c = 0;
-
+						
 /*TODO: hierarchical ctxs: get npus_to_remove good workers: close to the other ones I already assigned to the ctx */
 						for(i = _npus-1; i >= (_npus - npus_to_remove); i--)
 						{
@@ -391,7 +385,7 @@ static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_ar
 	}
 }
 
-unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_name,
+unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_name, 
 						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
 						 unsigned allow_overlap)
 {
@@ -412,15 +406,15 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	sched_ctx->max_ncpus = max_ncpus;
 	sched_ctx->min_ngpus = min_ngpus;
 	sched_ctx->max_ngpus = max_ngpus;
-
+	
 	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
 	return sched_ctx->id;
-
+	
 }
-unsigned starpu_sched_ctx_create(const char *policy_name, int *workerids,
+unsigned starpu_sched_ctx_create(const char *policy_name, int *workerids, 
 				 int nworkers, const char *sched_name)
 {
 	struct _starpu_sched_ctx *sched_ctx = NULL;
@@ -445,7 +439,8 @@ void starpu_set_perf_counters(unsigned sched_ctx_id, struct starpu_performance_c
 /* free all structures for the context */
 static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 {
-	_starpu_deinit_sched_policy(sched_ctx);
+	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
+	_starpu_deinit_sched_policy(sched_ctx);		
 	free(sched_ctx->sched_policy);
 	free(sched_ctx->sched_mutex);
 	free(sched_ctx->sched_cond);
@@ -454,40 +449,57 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_mutex = NULL;
 	sched_ctx->sched_cond = NULL;
 
-	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->changing_ctx_mutex);
 	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
+	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	config->topology.nsched_ctxs--;
-	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 }
 
-void starpu_sched_ctx_delete(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id)
+void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(inheritor_sched_ctx_id);
-
-	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
-	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
+	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[sched_ctx_id]);
+	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
+	unsigned nworkers_ctx = sched_ctx->workers->nworkers;
+	int *workerids;
+	unsigned is_list = _get_workers_list(sched_ctx->workers, &workerids);
+	_starpu_update_workers(workerids, nworkers_ctx, sched_ctx->id);
+	
+	if(!is_list)
+		free(workerids);
 
 	/*if both of them have all the ressources is pointless*/
 	/*trying to transfer ressources from one ctx to the other*/
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	unsigned nworkers = config->topology.nworkers;
 
-	if(!(sched_ctx->workers->nworkers == nworkers && sched_ctx->workers->nworkers == inheritor_sched_ctx->workers->nworkers) && sched_ctx->workers->nworkers > 0 && inheritor_sched_ctx_id != STARPU_NMAX_SCHED_CTXS)
+	if(nworkers_ctx > 0 && inheritor_sched_ctx_id != STARPU_NMAX_SCHED_CTXS && 
+	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
 	{
-		starpu_sched_ctx_add_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, inheritor_sched_ctx_id);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
+
 	}
+	else 
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id) && !_starpu_wait_for_all_tasks_of_sched_ctx(0))
 	{
+		_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[sched_ctx_id]);
+		/*if btw the mutex release & the mutex lock the context has changed take care to free all 
+		  scheduling data before deleting the context */
+		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 		_starpu_delete_sched_ctx(sched_ctx);
+
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 	}
-	return;
+	return;	
 }
 
 /* called after the workers are terminated so we don't have anything else to do but free the memory*/
@@ -497,27 +509,34 @@ void _starpu_delete_all_sched_ctxs()
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	{
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
+		_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[i]);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
-			_starpu_remove_workers_from_sched_ctx(sched_ctx, NULL, -1, NULL, NULL);
-			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
-			_starpu_delete_sched_ctx(sched_ctx);
+			if(_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx->id) && _starpu_wait_for_all_tasks_of_sched_ctx(0))
+			{
+				_starpu_sched_ctx_free_scheduling_data(sched_ctx);
+				_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
+				_starpu_delete_sched_ctx(sched_ctx);
+			}
 		}
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[i]);
+		
+		_STARPU_PTHREAD_MUTEX_DESTROY(&changing_ctx_mutex[i]);
 	}
 	return;
 }
 
 static void _starpu_check_workers(int *workerids, int nworkers)
 {
-        struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
-        int nworkers_conf = config->topology.nworkers;
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	int nworkers_conf = config->topology.nworkers;
 
 	int i;
 	for(i = 0; i < nworkers; i++)
 	{
 		/* take care the user does not ask for a resource that does not exist */
 		STARPU_ASSERT(workerids[i] >= 0 &&  workerids[i] <= nworkers_conf);
-	}
+	}		
 }
 
 void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
@@ -531,7 +550,7 @@ void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx
 		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
 		unlocked = 1;
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
-
+		
 		if(old_task == &stop_submission_task)
 			break;
 
@@ -554,16 +573,20 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
 	_starpu_check_workers(workers_to_add, nworkers_to_add);
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
-	_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
-
-	if(n_added_workers > 0)
+	_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[sched_ctx_id]);
+	/* if the context has not already been deleted */
+	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
-		_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id);
-	}
-
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 
+		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
+		
+		if(n_added_workers > 0)
+		{
+			_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id);
+		}
+	}
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+	
 	_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
 
 	return;
@@ -577,13 +600,17 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
-	_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
+	_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[sched_ctx_id]);
+	/* if the context has not already been deleted */
+	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
+	{
+		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
 
-	if(n_removed_workers > 0)
-		_starpu_update_workers(removed_workers, n_removed_workers, sched_ctx->id);
+		if(n_removed_workers > 0)
+			_starpu_update_workers(removed_workers, n_removed_workers, sched_ctx->id);
 
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+	}
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);	       
 	return;
 }
 
@@ -660,10 +687,10 @@ static unsigned _starpu_worker_get_sched_ctx_id(struct _starpu_worker *worker, u
 int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-
+	
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
 	  return -EDEADLK;
-
+	
 	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
 }
 
@@ -677,7 +704,16 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 		if(sched_ctx->finished_submit)
 		{
 			_STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
-			starpu_sched_ctx_delete(sched_ctx_id, sched_ctx->inheritor);
+			unsigned nworkers = sched_ctx->workers->nworkers;
+			int *workerids = NULL;
+			unsigned is_list = _get_workers_list(sched_ctx->workers, &workerids);
+			
+			starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
+			starpu_sched_ctx_remove_workers(workerids, nworkers, sched_ctx_id);
+
+			if(!is_list)
+				free(workerids);
+
 			return;
 		}
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
@@ -696,7 +732,7 @@ void starpu_task_set_context(unsigned *sched_ctx)
 	pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
 }
 
-unsigned starpu_task_get_context()
+unsigned starpu_task_get_sched_ctx()
 {
 	unsigned *sched_ctx = (unsigned*)pthread_getspecific(sched_ctx_key);
 	if(sched_ctx == NULL)
@@ -802,13 +838,31 @@ struct starpu_sched_ctx_worker_collection* starpu_sched_ctx_create_worker_collec
 		sched_ctx->workers->deinit = worker_list.deinit;
 		sched_ctx->workers->init_cursor = worker_list.init_cursor;
 		sched_ctx->workers->deinit_cursor = worker_list.deinit_cursor;
-		sched_ctx->workers->type = WORKER_LIST;
+		sched_ctx->workers->type = WORKER_LIST; 
 		break;
 	}
 
 	return sched_ctx->workers;
 }
 
+static unsigned _get_workers_list(struct starpu_sched_ctx_worker_collection *workers, int **workerids)
+{
+	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
+	int worker;
+	int i = 0;
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		(*workerids)[i++] = worker;
+	}
+	
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+	return 0;
+}
 void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -826,15 +880,15 @@ struct starpu_sched_ctx_worker_collection* starpu_sched_ctx_get_worker_collectio
 int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_archtype arch)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-
+	
 	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;
 	int worker;
 
 	int npus = 0;
-
+	
 	if(workers->init_cursor)
 		workers->init_cursor(workers);
-
+	
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
@@ -842,7 +896,7 @@ int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu
 		if(curr_arch == arch)
 			pus[npus++] = worker;
 	}
-
+	
 	if (workers->deinit_cursor)
 		workers->deinit_cursor(workers);
 	return npus;
@@ -850,8 +904,7 @@ int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu
 
 _starpu_pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	return &sched_ctx->changing_ctx_mutex;
+	return &changing_ctx_mutex[sched_ctx_id];
 }
 
 unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
@@ -859,7 +912,7 @@ unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	if(sched_ctx != NULL)
 		return sched_ctx->workers->nworkers;
-	else
+	else 
 		return 0;
 
 }
@@ -936,14 +989,14 @@ void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id)
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	{
 		other_sched_ctx = worker->sched_ctx[i];
-		if(other_sched_ctx != NULL && other_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
+		if(other_sched_ctx != NULL && other_sched_ctx->id != STARPU_NMAX_SCHED_CTXS && 
 		   other_sched_ctx->id != 0 && other_sched_ctx->id != sched_ctx_id)
 		{
 			worker->active_ctx = other_sched_ctx->id;
 			active_sched_ctx = other_sched_ctx;
 			break;
 		}
-	}
+	}		
 
 	if(worker->active_ctx != sched_ctx_id)
 	{
@@ -953,7 +1006,7 @@ void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id)
 
 double starpu_get_max_time_worker_on_ctx(void)
 {
-	return max_time_worker_on_ctx;
+	return max_time_worker_on_ctx;	
 }
 
 void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)

+ 3 - 3
src/core/sched_ctx.h

@@ -28,6 +28,9 @@
 #define REQ_RESIZE 0
 #define DO_RESIZE 1
 
+/* used when changes (delete, modify) are applyed to contexts */
+_starpu_pthread_mutex_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];
+
 struct _starpu_sched_ctx
 {
 	/* id of the context used in user mode*/
@@ -44,9 +47,6 @@ struct _starpu_sched_ctx
 
 	struct starpu_sched_ctx_worker_collection *workers;
 
-	/* mutex for temp_nworkers_in_ctx*/
-	_starpu_pthread_mutex_t changing_ctx_mutex;
-
 	/* we keep an initial sched which we never delete */
 	unsigned is_initial_sched;
 

+ 3 - 3
src/core/sched_policy.c

@@ -222,10 +222,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	{
 		sched_ctx = worker->sched_ctx[i];
 		if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
-		{
 			sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id);
-		}
-
 	}
 
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
@@ -258,6 +255,8 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 			for (i = 0; i < task->cl->nbuffers; i++)
 				task->handles[i]->mf_node = node;
 		}
+//		if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
+			
 		if(task->priority > 0)
 			return _starpu_push_local_task(worker, task, 1);
 		else
@@ -543,6 +542,7 @@ pick:
 				sched_ctx = _starpu_get_initial_sched_ctx();
 			else
 				sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
+
 			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
 				sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);

+ 3 - 2
src/core/workers.c

@@ -830,6 +830,7 @@ void starpu_profiling_init()
 {
 	_starpu_profiling_init();
 }
+
 /*
  * Handle runtime termination
  */
@@ -896,7 +897,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *config)
 #endif
 		}
 
-out:
+out:		
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		_starpu_job_list_delete(worker->terminated_jobs);
 	}
@@ -1183,7 +1184,7 @@ unsigned starpu_worker_is_combined_worker(int id)
 
 struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 {
-        STARPU_ASSERT(id <= STARPU_NMAX_SCHED_CTXS);
+	if (id == STARPU_NMAX_SCHED_CTXS) return NULL;
 	return &config.sched_ctxs[id];
 }
 

+ 12 - 2
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -755,8 +755,13 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		dt->queue_array[workerid] = _starpu_create_fifo();
-		starpu_sched_ctx_init_worker_mutex_and_cond(sched_ctx_id, workerid);
+		/* if the worker has alreadry belonged to this context
+		   the queue and the synchronization variables have been already initialized */
+		if(dt->queue_array[workerid] ==NULL)
+		{
+			dt->queue_array[workerid] = _starpu_create_fifo();
+			starpu_sched_ctx_init_worker_mutex_and_cond(sched_ctx_id, workerid);
+		}
 	}
 }
 
@@ -770,6 +775,7 @@ static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
 	{
 		workerid = workerids[i];
 		_starpu_destroy_fifo(dt->queue_array[workerid]);
+		dt->queue_array[workerid] = NULL;
 		starpu_sched_ctx_deinit_worker_mutex_and_cond(sched_ctx_id, workerid);
 	}
 }
@@ -788,6 +794,10 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 
 	dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));
 
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		dt->queue_array[i] = NULL;
+
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
 		dt->alpha = atof(strval_alpha);