Преглед изворни кода

fix resize for lp2 (compute correctly the bw && the latency)

Andra Hugo пре 12 година
родитељ
комит
5c1a97e5c7
1 измењених фајлова са 60 додато и 23 уклоњено
  1. 60 23
      sched_ctx_hypervisor/src/hypervisor_policies/lp2_policy.c

+ 60 - 23
sched_ctx_hypervisor/src/hypervisor_policies/lp2_policy.c

@@ -21,8 +21,10 @@
 static struct bound_task_pool *task_pools = NULL;
 
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double tmax, double w_in_s[ns][nw], int *in_sched_ctxs, int *workers, unsigned interger);
-static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, double w_in_s[ns][nw], double tasks[nw][nt], int *sched_ctxs, int *workers)
+static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double tmax, double w_in_s[ns][nw], int *in_sched_ctxs, int *workers, unsigned interger,
+			   struct bound_task_pool *tmp_task_pools, unsigned size_ctxs);
+static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, double w_in_s[ns][nw], double tasks[nw][nt], 
+						     int *sched_ctxs, int *workers, struct bound_task_pool *tmp_task_pools, unsigned size_ctxs)
 {
 	double draft_tasks[nw][nt];
 	double draft_w_in_s[ns][nw];
@@ -53,6 +55,7 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 	double old_tmax = 0.0;
 	unsigned found_sol = 0;
 
+//	printf("tmin = %lf tmax = %lf \n", tmin, tmax);
 	struct timeval start_time;
 	struct timeval end_time;
 	int nd = 0;
@@ -65,7 +68,7 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 		/* find solution and save the values in draft tables
 		   only if there is a solution for the system we save them
 		   in the proper table */
-		res = _glp_resolve(ns, nw, nt, draft_tasks, tmax, draft_w_in_s, sched_ctxs, workers, 1);
+		res = _glp_resolve(ns, nw, nt, draft_tasks, tmax, draft_w_in_s, sched_ctxs, workers, 1, tmp_task_pools, size_ctxs);
 		if(res != 0.0)
 		{
 			for(w = 0; w < nw; w++)
@@ -129,7 +132,7 @@ static void _size_ctxs(int *sched_ctxs, int nsched_ctxs , int *workers, int nwor
 
 	double w_in_s[ns][nw];
 	double tasks[nw][nt];
-	unsigned found_sol = _compute_task_distribution_over_ctxs(ns, nw, nt, w_in_s, tasks, sched_ctxs, workers);
+	unsigned found_sol = _compute_task_distribution_over_ctxs(ns, nw, nt, w_in_s, tasks, sched_ctxs, workers, task_pools, 1);
 	pthread_mutex_unlock(&mutex);
 	/* if we did find at least one solution redistribute the resources */
 	if(found_sol)
@@ -237,7 +240,7 @@ static void _remove_task_from_pool(struct starpu_task *task, uint32_t footprint)
 	}
 }
 
-static void _get_tasks_times(int nw, int nt, double times[nw][nt], int *workers)
+static void _get_tasks_times(int nw, int nt, double times[nw][nt], int *workers, unsigned size_ctxs)
 {
         struct bound_task_pool *tp;
         int w, t;
@@ -256,20 +259,21 @@ static void _get_tasks_times(int nw, int nt, double times[nw][nt], int *workers)
                                 times[w][t] = length / 1000.;
 
 				double transfer_time = 0.0;
-				unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, tp->sched_ctx_id);
-				if(!worker_in_ctx)
+				enum starpu_archtype arch = starpu_worker_get_type(worker);
+				if(arch == STARPU_CUDA_WORKER)
 				{
-					enum starpu_archtype arch = starpu_worker_get_type(worker);
-					if(arch == STARPU_CUDA_WORKER)
+					unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, tp->sched_ctx_id);
+					if(!worker_in_ctx && !size_ctxs)
 					{
 						double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker);
-						transfer_time =  (tp->footprint / transfer_velocity) / 1000000 ;
-						double latency = starpu_get_latency_RAM_CUDA(worker);
-						transfer_time += (tp->n * latency)/1000000;
+						transfer_time +=  (tp->footprint / transfer_velocity) / 1000. ;
 					}
-					times[w][t] += transfer_time;
+					double latency = starpu_get_latency_RAM_CUDA(worker);
+					transfer_time += latency/1000.;
+
 				}
 //				printf("%d/%d %s x %d time = %lf transfer_time = %lf\n", w, tp->sched_ctx_id, tp->cl->model->symbol, tp->n, times[w][t], transfer_time);
+				times[w][t] += transfer_time;
 			}
                 }
         }
@@ -280,9 +284,10 @@ static void _get_tasks_times(int nw, int nt, double times[nw][nt], int *workers)
  */
 #ifdef STARPU_HAVE_GLPK_H
 #include <glpk.h>
-static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double tmax, double w_in_s[ns][nw], int *in_sched_ctxs, int *workers, unsigned integer)
+static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double tmax, double w_in_s[ns][nw], int *in_sched_ctxs, int *workers, unsigned integer,
+			   struct bound_task_pool *tmp_task_pools, unsigned size_ctxs)
 {
-	if(task_pools == NULL)
+	if(tmp_task_pools == NULL)
 		return 0.0;
 	struct bound_task_pool * tp;
 	int t, w, s;
@@ -303,7 +308,7 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 		int ia[ne], ja[ne];
 		double ar[ne];
 
-		_get_tasks_times(nw, nt, times, workers);
+		_get_tasks_times(nw, nt, times, workers, size_ctxs);
 
 		/* Variables: number of tasks i assigned to worker j, and tmax */
 		glp_add_cols(lp, nw*nt+ns*nw);
@@ -369,7 +374,7 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 				starpu_worker_get_name(w, name, sizeof(name));
 				snprintf(title, sizeof(title), "worker %s", name);
 				glp_set_row_name(lp, curr_row_idx+s*nw+w+1, title);
-				for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+				for (t = 0, tp = tmp_task_pools; tp; t++, tp = tp->next)
 				{
 					if((int)tp->sched_ctx_id == sched_ctxs[s])
 					{
@@ -395,7 +400,7 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 
 		/* Total task completion */
 		glp_add_rows(lp, nt);
-		for (t = 0, tp = task_pools; tp; t++, tp = tp->next)
+		for (t = 0, tp = tmp_task_pools; tp; t++, tp = tp->next)
 		{
 			char name[32], title[64];
 			starpu_worker_get_name(w, name, sizeof(name));
@@ -444,6 +449,12 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 	glp_init_smcp(&parm);
 	parm.msg_lev = GLP_MSG_OFF;
 	int ret = glp_simplex(lp, &parm);
+
+/* 	char str[50]; */
+/* 	sprintf(str, "outpu_lp_%g", tmax); */
+
+/* 	glp_print_sol(lp, str); */
+
 	if (ret)
 	{
 		printf("error in simplex\n");
@@ -504,6 +515,16 @@ static double _glp_resolve(int ns, int nw, int nt, double tasks[nw][nt], double
 	return res;
 }
 
+static struct bound_task_pool* _clone_linked_list(struct bound_task_pool *tp)
+{
+	if(tp == NULL) return NULL;
+
+	struct bound_task_pool *tmp_tp = (struct bound_task_pool*)malloc(sizeof(struct bound_task_pool));
+	memcpy(tmp_tp, tp, sizeof(struct bound_task_pool));
+	tmp_tp->next = _clone_linked_list(tp->next);
+	return tmp_tp;
+}
+
 static void lp2_handle_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, uint32_t footprint)
 {
 	struct sched_ctx_hypervisor_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
@@ -525,27 +546,43 @@ static void lp2_handle_poped_task(unsigned sched_ctx, int worker, struct starpu_
 
 //			pthread_mutex_lock(&mutex);
 
-			struct bound_task_pool * tp;
+			/* we don't take the mutex bc a correct value of the number of tasks is
+			   not required but we do a copy in order to be sure
+			   that the linear progr won't segfault if the list of 
+			   submitted task will change during the exec */
+
+			struct bound_task_pool *tp = NULL;
+			struct bound_task_pool *tmp_task_pools = _clone_linked_list(task_pools);
+
 			for (tp = task_pools; tp; tp = tp->next)
 				nt++;
 
+
 			double w_in_s[ns][nw];
 			double tasks_per_worker[nw][nt];
 
-			unsigned found_sol = _compute_task_distribution_over_ctxs(ns, nw, nt, w_in_s, tasks_per_worker, NULL, NULL);
+			unsigned found_sol = _compute_task_distribution_over_ctxs(ns, nw, nt, w_in_s, tasks_per_worker, NULL, NULL, tmp_task_pools, 0);
 //			pthread_mutex_unlock(&mutex);
 
 			/* if we did find at least one solution redistribute the resources */
 			if(found_sol)
 				_lp_place_resources_in_ctx(ns, nw, w_in_s, NULL, NULL, 0);
 
+			struct bound_task_pool *next = NULL;
+			struct bound_task_pool *tmp_tp = tmp_task_pools;
+			while(tmp_task_pools)
+			{
+				next = tmp_tp->next;
+				free(tmp_tp);
+				tmp_tp = next;
+				tmp_task_pools = next;
+			}
+			
 
 		}
 		pthread_mutex_unlock(&act_hypervisor_mutex);
 	}
-	/* TODO: dangerous issue not taking this mutex... but too expensive to do it */
-	/* correct value of the number of tasks is not required but the linear progr
-	   can segfault if this changes during the exec */
+	/* too expensive to take this mutex and correct value of the number of tasks is not compulsory */
 //	pthread_mutex_lock(&mutex);
 	_remove_task_from_pool(task, footprint);
 //	pthread_mutex_unlock(&mutex);