Browse Source

bug fixing

Andra Hugo 13 years ago
parent
commit
f0eb6f74d4

+ 4 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -135,6 +135,8 @@ struct sched_ctx_wrapper* sched_ctx_hypervisor_get_wrapper(unsigned sched_ctx);
 
 double sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(struct sched_ctx_wrapper* sc_w);
 
+double sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(struct sched_ctx_wrapper* sc_w);
+
 char* sched_ctx_hypervisor_get_policy();
 
 void sched_ctx_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx);
@@ -148,3 +150,5 @@ unsigned sched_ctx_hypervisor_get_size_req(int **sched_ctxs, int* nsched_ctxs, i
 void sched_ctx_hypervisor_save_size_req(int *sched_ctxs, int nsched_ctxs, int *workers, int nworkers);	
 
 void sched_ctx_hypervisor_free_size_req(void);
+
+unsigned sched_ctx_hypervisor_can_resize(unsigned sched_ctx);

+ 77 - 19
sched_ctx_hypervisor/src/hypervisor_policies/lp3_policy.c

@@ -23,7 +23,6 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static void _size_ctxs(int *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
 {
-	printf("~~~~~~~~~~~~~~~~~~~~~~~~size\n");
 	int ns = sched_ctxs == NULL ? sched_ctx_hypervisor_get_nsched_ctxs() : nsched_ctxs;
 	int nw = workers == NULL ? starpu_worker_get_count() : nworkers; /* Number of different workers */
 	int nt = 0; /* Number of different kinds of tasks */
@@ -293,7 +292,11 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 
 	for(s = 0; s < ns; s++)
 		for(w = 0; w < nw; w++)
+		{
 			w_in_s[s][w] = glp_get_col_prim(lp, nw*nt+s*nw+w+1);
+/* 			if(w_in_s[s][w]) */
+/* 				printf("%d in %d %lf \n",w, s, w_in_s[s][w]); */
+		}
 
 	glp_delete_prob(lp);
 	return res;
@@ -308,6 +311,8 @@ static void _redistribute_resources_in_ctxs(int ns, int nw, int nt, double w_in_
 	for(s = 0; s < ns; s++)
 	{
 		int workers_to_add[nw], workers_to_remove[nw];
+		int destination_ctx[nw][ns];
+
 		for(w = 0; w < nw; w++)
 		{
 			workers_to_add[w] = -1;
@@ -318,20 +323,78 @@ static void _redistribute_resources_in_ctxs(int ns, int nw, int nt, double w_in_
 
 		for(w = 0; w < nw; w++)
 		{
-			if(w_in_s[s][w] >= 0.5)
-				workers_to_add[nadd++] = workers == NULL ? w : workers[w];
+			enum starpu_perf_archtype arch = workers == NULL ? starpu_worker_get_type(w) :
+				starpu_worker_get_type(workers[w]);
+
+			if(arch == STARPU_CPU_WORKER)
+			{
+				if(w_in_s[s][w] >= 0.5)
+				{
+//					printf("add %d to ctx %d\n", w, s);
+					workers_to_add[nadd++] = workers == NULL ? w : workers[w];
+				}
+				else
+				{
+//					printf("remove %d from ctx %d\n", w, s);
+					workers_to_remove[nremove++] = workers == NULL ? w : workers[w];
+					for(s2 = 0; s2 < ns; s2++)
+						if(s2 != s && w_in_s[s2][w] >= 0.5)
+							destination_ctx[w][s2] = 1;
+						else
+							destination_ctx[w][s2] = 0;
+					
+					
+				}
+			}
 			else
-				workers_to_remove[nremove++] = workers == NULL ? w : workers[w];
+			{
+				if(w_in_s[s][w] >= 0.3)
+				{
+	//				printf("add %d to ctx %d\n", w, s);
+					workers_to_add[nadd++] = workers == NULL ? w : workers[w];
+				}
+				else
+				{
+//					printf("remove %d from ctx %d\n", w, s);
+					workers_to_remove[nremove++] = workers == NULL ? w : workers[w];
+					for(s2 = 0; s2 < ns; s2++)
+						if(s2 != s && w_in_s[s2][w] >= 0.3)
+							destination_ctx[w][s2] = 1;
+						else
+							destination_ctx[w][s2] = 0;
+				}
+			}
+	
 		}
 		
 		if(!first_time)
 		{
-			printf("********resize \n");
-			sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_remove, nremove, sched_ctxs[s]);
+			/* do not remove workers if they can't go anywhere */
+			int w2;
+			unsigned found_one_dest[nremove];
+			unsigned all_have_dest = 1;
+			for(w2 = 0; w2 < nremove; w2++)
+				found_one_dest[w2] = 0;
+
+			for(w2 = 0; w2 < nremove; w2++)
+				for(s2 = 0; s2 < ns; s2++)
+					if(destination_ctx[w2][s2] && sched_ctx_hypervisor_can_resize(sched_ctxs[s2]))
+					{
+						found_one_dest[w2] = 1;
+						break;
+					}
+			for(w2 = 0; w2 < nremove; w2++)
+			{
+				if(!found_one_dest[w2])
+				{
+					all_have_dest = 0;
+					break;
+				}
+			}
+			if(all_have_dest)
+				sched_ctx_hypervisor_remove_workers_from_sched_ctx(workers_to_remove, nremove, sched_ctxs[s]);
 		}
-		else
-			printf("*********size \n");
-	
+
 		sched_ctx_hypervisor_add_workers_to_sched_ctx(workers_to_add, nadd, sched_ctxs[s]);
 		struct policy_config *new_config = sched_ctx_hypervisor_get_config(sched_ctxs[s]);
 		int i;
@@ -371,13 +434,12 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 	   compute the nr of flops and not the tasks */
 	double smallest_tmax = _lp_get_tmax(nw, workers);
 	double tmax = smallest_tmax * ns;
-	printf("tmax = %lf\n", tmax);
 	
 	double res = 1.0;
 	unsigned has_sol = 0;
 	double tmin = 0.0;
 	double old_tmax = 0.0;
-	unsigned found_sol;
+	unsigned found_sol = 0;
 	/* we fix tmax and we do not treat it as an unknown
 	   we just vary by dichotomy its values*/
 	while(tmax > 1.0)
@@ -404,14 +466,12 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 		   bigger than the old min */
 		if(has_sol)
 		{
-			printf("%lf: has sol\n", tmax);
 			if(old_tmax != 0.0 && (old_tmax - tmax) < 0.5)
 				break;
 			old_tmax = tmax;
 		}
 		else /*else try a bigger one but smaller than the old tmax */
 		{
-			printf("%lf: no sol\n", tmax);
 			tmin = tmax;
 			if(old_tmax != 0.0)
 				tmax = old_tmax;
@@ -421,14 +481,14 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 		
 		if(tmax < smallest_tmax)
 		{
-			found_sol = 0;
-			break;
+			tmax = old_tmax;
+			tmin = smallest_tmax;
+			tmax = _find_tmax(tmin, tmax);
 		}
 	}
 	return found_sol;
 }
 
-static int done = 0;
 static void lp3_handle_poped_task(unsigned sched_ctx, int worker)
 {
 	struct sched_ctx_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
@@ -442,7 +502,7 @@ static void lp3_handle_poped_task(unsigned sched_ctx, int worker)
 			return;
 		}
 
-		if(_velocity_gap_btw_ctxs() && !done)
+		if(_velocity_gap_btw_ctxs())
 		{
 			int ns = sched_ctx_hypervisor_get_nsched_ctxs();
 			int nw = starpu_worker_get_count(); /* Number of different workers */
@@ -457,7 +517,6 @@ static void lp3_handle_poped_task(unsigned sched_ctx, int worker)
 			/* if we did find at least one solution redistribute the resources */
 			if(found_sol)
 			{
-				done = 1;
 				_redistribute_resources_in_ctxs(ns, nw, nt, w_in_s, 0, NULL, NULL);
 			}
 		}
@@ -468,7 +527,6 @@ static void lp3_handle_poped_task(unsigned sched_ctx, int worker)
 
 static void lp3_size_ctxs(int *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
 {
-	printf("require size !!!!!!!!!!!!!!\n");
 	sched_ctx_hypervisor_save_size_req(sched_ctxs, nsched_ctxs, workers, nworkers);
 }
 

+ 5 - 3
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.c

@@ -315,8 +315,10 @@ static double _get_elapsed_flops(struct sched_ctx_wrapper* sc_w, int *npus, enum
 double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w)
 {
         double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
-
-        if( elapsed_flops >= 1.0)
+	double total_elapsed_flops = sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
+	double prc = elapsed_flops/sc_w->total_flops;
+	double prc_valid_velocity = elapsed_flops == total_elapsed_flops ? 0.05 : 0.2;
+        if( prc >= prc_valid_velocity)
         {
                 double curr_time = starpu_timing_now();
                 double elapsed_time = curr_time - sc_w->start_time;
@@ -367,7 +369,7 @@ int _velocity_gap_btw_ctxs()
 					{
 						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v ;
 //						printf("gap = %lf\n", gap);
-						if(gap > 8)
+						if(gap > 2)
 							return 1;
 					}
 				}

+ 76 - 28
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -417,6 +417,11 @@ void sched_ctx_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned
 	return;
 }
 
+unsigned sched_ctx_hypervisor_can_resize(unsigned sched_ctx)
+{
+	return hypervisor.resize[sched_ctx];
+}
+
 void sched_ctx_hypervisor_remove_workers_from_sched_ctx(int* workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx)
 {
 	if(nworkers_to_remove > 0 && hypervisor.resize[sched_ctx])
@@ -464,6 +469,15 @@ double sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(struct sched_ctx_wra
 	return ret_val;
 }
 
+double sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(struct sched_ctx_wrapper* sc_w)
+{
+	double ret_val = 0.0;
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		ret_val += sc_w->total_elapsed_flops[i];
+	return ret_val;
+}
+
 static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
 {
 	struct resize_ack *resize_ack = NULL;
@@ -516,36 +530,69 @@ static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
 		
 		unsigned resize_completed = (nacked_workers == nmoved_workers);
 		unsigned receiver_sched_ctx = resize_ack->receiver_sched_ctx;
-		/* if the permission to resize is not allowed by the user don't do it
-		   whatever the application says */
-		if(resize_completed && !((hypervisor.resize[sched_ctx] == 0 || hypervisor.resize[receiver_sched_ctx] == 0) && imposed_resize) && worker == moved_workers[0])
-		{				
-			/* info concerning only the gflops_rate strateg */
-			struct sched_ctx_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
-			struct sched_ctx_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];
+		unsigned unknown_sender = receiver_sched_ctx == sched_ctx;
+		if(!unknown_sender)
+		{
+			/* if the permission to resize is not allowed by the user don't do it
+			   whatever the application says */
+			if(resize_completed && !((hypervisor.resize[sched_ctx] == 0 || hypervisor.resize[receiver_sched_ctx] == 0) && imposed_resize) && worker == moved_workers[0])
+			{				
+				/* info concerning only the gflops_rate strateg */
+				struct sched_ctx_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
+				struct sched_ctx_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];
+				
+				double start_time =  starpu_timing_now();
+				sender_sc_w->start_time = start_time;
+				sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sender_sc_w);
+				_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);
+				
+				receiver_sc_w->start_time = start_time;
+				receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(receiver_sc_w);
+				_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
+				
+				hypervisor.resize[sender_sched_ctx] = 1;
+				hypervisor.resize[receiver_sched_ctx] = 1;
+				/* if the user allowed resizing leave the decisions to the application */
+				if(imposed_resize)  imposed_resize = 0;
+				
+				pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+				resize_ack->receiver_sched_ctx = -1;
+				resize_ack->nmoved_workers = 0;
+				free(resize_ack->moved_workers);
+				free(resize_ack->acked_workers);
+				pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+			}
 			
-			double start_time =  starpu_timing_now();
-			sender_sc_w->start_time = start_time;
-			sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sender_sc_w);
-			_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);
+			return resize_completed;
+		}
+		else
+		{
+			/* if the permission to resize is not allowed by the user don't do it
+			   whatever the application says */
+			if(resize_completed && !(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && worker == moved_workers[0])
+			{				
+				/* info concerning only the gflops_rate strateg */
+				struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+				
+				double start_time =  starpu_timing_now();
+				sc_w->start_time = start_time;
+				sc_w->remaining_flops = sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
+				_set_elapsed_flops_per_sched_ctx(sched_ctx, 0.0);
+								
+				hypervisor.resize[sched_ctx] = 1;
+				/* if the user allowed resizing leave the decisions to the application */
+				if(imposed_resize)  imposed_resize = 0;
+				
+				pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+				resize_ack->receiver_sched_ctx = -1;
+				resize_ack->nmoved_workers = 0;
+				free(resize_ack->moved_workers);
+				free(resize_ack->acked_workers);
+				pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+			}
 			
-			receiver_sc_w->start_time = start_time;
-			receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(receiver_sc_w);
-			_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
-
-			hypervisor.resize[sender_sched_ctx] = 1;
-			hypervisor.resize[receiver_sched_ctx] = 1;
-			/* if the user allowed resizing leave the decisions to the application */
-			if(imposed_resize)  imposed_resize = 0;
-
-			pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
-			resize_ack->receiver_sched_ctx = -1;
-			resize_ack->nmoved_workers = 0;
-			free(resize_ack->moved_workers);
-			free(resize_ack->acked_workers);
-			pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+			return resize_completed;
 		}
-		return resize_completed;
 	}
 	return 0;
 }
@@ -567,7 +614,8 @@ static void notify_idle_end(unsigned sched_ctx, int worker)
 		if(hypervisor.policy.handle_idle_end)
 			hypervisor.policy.handle_idle_end(sched_ctx, worker);
 		
-		_ack_resize_completed(sched_ctx, worker);
+		if(!hypervisor.resize[sched_ctx])
+			_ack_resize_completed(sched_ctx, worker);
 	}
 }
 

+ 13 - 7
src/core/sched_ctx.c

@@ -90,13 +90,15 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 			worker[i]->tasks[sched_ctx_id]->execute_on_a_specific_worker = 1;
 			worker[i]->tasks[sched_ctx_id]->workerid = workerids[i];
 			worker[i]->tasks[sched_ctx_id]->destroy = 1;
+			worker[i]->tasks[sched_ctx_id]->control_task = 1;
 			int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker[i], sched_ctx_id);
                         /* if the ctx is not in the worker's list it means the update concerns the addition of ctxs*/
                         if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
                                 worker[i]->tasks[sched_ctx_id]->priority = 1;
 
 			_starpu_exclude_task_from_dag(worker[i]->tasks[sched_ctx_id]);
-			
+
+//			starpu_push_local_task(workerids[i], worker[i]->tasks[sched_ctx_id], 1);
 			_starpu_task_submit_internally(worker[i]->tasks[sched_ctx_id]);
 		}		
 	}
@@ -121,7 +123,9 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 		{
 			int worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
 			if(worker >= 0)
+			{
 				added_workers[(*n_added_workers)++] = worker;		
+			}
 		}
 		else
 		{
@@ -334,16 +338,18 @@ void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, u
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
 
 	if(n_added_workers > 0)
+	{
 		_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id);
+	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 
-	if(n_added_workers > 0)
-	{
-		_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
-		_STARPU_PTHREAD_COND_BROADCAST(&sched_ctx->no_workers_cond);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);
-	}
+/* 	if(n_added_workers > 0) */
+/* 	{ */
+/* 		_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex); */
+/* 		_STARPU_PTHREAD_COND_BROADCAST(&sched_ctx->no_workers_cond); */
+/* 		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex); */
+/* 	} */
 
 	unsigned unlocked = 0;
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);

+ 4 - 1
src/core/sched_policy.c

@@ -272,7 +272,10 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 			for (i = 0; i < task->cl->nbuffers; i++)
 				task->handles[i]->mf_node = node;
 		}
-		return _starpu_push_local_task(worker, task, 0);
+		if(task->priority > 0)
+			return _starpu_push_local_task(worker, task, 1);
+		else
+			return _starpu_push_local_task(worker, task, 0);
 	}
 	else
 	{

+ 6 - 3
src/worker_collection/worker_list.c

@@ -35,8 +35,10 @@ static unsigned _worker_belongs_to_ctx(struct worker_collection *workers, int wo
 	
 	int i;
 	for(i = 0; i < nworkers; i++)
-	  if(workerids[i] == workerid)
-		  return 1;
+	{
+		if(workerids[i] == workerid)
+			return 1;
+	}
 	return 0;
 }
 
@@ -106,7 +108,8 @@ static int list_remove(struct worker_collection *workers, int worker)
 	}
 
 	_rearange_workerids(workerids, nworkers);
-	workers->nworkers--;
+	if(found_worker != -1)
+		workers->nworkers--;
 
 	return found_worker;
 }