
correct speed computations for the hypervisor

Andra Hugo 12 years ago
parent
commit 70ee761ed3

+ 1 - 1
include/starpu_sched_ctx.h

@@ -52,7 +52,7 @@ struct starpu_sched_ctx_performance_counters
 	void (*notify_pushed_task)(unsigned sched_ctx_id, int worker);
 	void (*notify_poped_task)(unsigned sched_ctx_id, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint);
 	void (*notify_post_exec_hook)(unsigned sched_ctx_id, int taskid);
-	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint);
+	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint, size_t data_size);
 	void (*notify_delete_context)(unsigned sched_ctx);
 };
 

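The extra size_t parameter threads each submitted task's data size through the performance counters. A minimal sketch of a counter callback matching the new signature; the function name and body are illustrative, not part of this commit:

#include <starpu.h>
#include <inttypes.h>
#include <stdio.h>

/* hypothetical callback wired into notify_submitted_job; only the
   signature is taken from the header above */
static void my_notify_submitted_job(struct starpu_task *task,
				    uint32_t footprint, size_t data_size)
{
	(void)task;
	/* the data size now arrives alongside the footprint, so
	   per-task-type transfer costs can be estimated */
	printf("submitted: footprint %"PRIu32", %zu bytes\n", footprint, data_size);
}
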
+ 2 - 2
sc_hypervisor/include/sc_hypervisor.h

@@ -65,7 +65,7 @@ struct sc_hypervisor_policy
 	void (*handle_post_exec_hook)(unsigned sched_ctx, int task_tag);
 
 	/* the hypervisor takes a decision when a job was submitted in this ctx */
-	void (*handle_submitted_job)(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint);
+	void (*handle_submitted_job)(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size);
 	
 	/* the hypervisor takes a decision when a certain ctx was deleted */
 	void (*end_ctx)(unsigned sched_ctx);
@@ -120,7 +120,7 @@ void sc_hypervisor_free_size_req(void);
 unsigned sc_hypervisor_can_resize(unsigned sched_ctx);
 
 /* indicate the types of tasks a context will execute in order to better decide the sizing of ctxs */
-void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint);
+void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size);
 
 #ifdef __cplusplus
 }

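Callers that pre-declare task types must now pass the data size as well. A hedged usage sketch; my_cl, the footprint value and the size are made-up:

#include <starpu.h>
#include <sc_hypervisor.h>

extern struct starpu_codelet my_cl;	/* hypothetical codelet */

static void declare_task_type(unsigned sched_ctx)
{
	uint32_t footprint = 0x2a;		/* assumed footprint of this task type */
	size_t data_size = 4 * 1024 * 1024;	/* assumed bytes touched per task */
	sc_hypervisor_set_type_of_task(&my_cl, sched_ctx, footprint, data_size);
}
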
+ 3 - 2
sc_hypervisor/include/sc_hypervisor_monitoring.h

@@ -77,8 +77,9 @@ struct sc_hypervisor_wrapper
 	/* nr of tasks executed on each worker in this ctx */
 	int elapsed_tasks[STARPU_NMAXWORKERS];
 
-	/* the average speed of workers when they belonged to this context */
-	double ref_velocity[STARPU_NMAXWORKERS];
+	/* the average speed of each type of worker while it belonged to this context */
+	/* 0 - CUDA, 1 - CPU */
+	double ref_velocity[2];
 
 	/* number of flops submitted to this ctx */
 	double submitted_flops;

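The reference speed now lives in one slot per worker type instead of one per worker. A one-line sketch of the index convention adopted here; velocity_slot is a hypothetical helper, not part of the commit:

#include <starpu.h>

/* slot 0 holds the CUDA reference speed, slot 1 the CPU one */
static inline int velocity_slot(enum starpu_worker_archtype arch)
{
	return arch == STARPU_CUDA_WORKER ? 0 : 1;
}
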
+ 2 - 1
sc_hypervisor/include/sc_hypervisor_policy.h

@@ -37,11 +37,12 @@ struct sc_hypervisor_policy_task_pool
 	uint32_t footprint;
 	unsigned sched_ctx_id;
 	unsigned long n;
+	size_t data_size;
 	struct sc_hypervisor_policy_task_pool *next;
 };
 
 /* add task information to a task wrapper linked list */
-void sc_hypervisor_policy_add_task_to_pool(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, struct sc_hypervisor_policy_task_pool **task_pools);
+void sc_hypervisor_policy_add_task_to_pool(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, struct sc_hypervisor_policy_task_pool **task_pools, size_t data_size);
 
 /* remove task information from a task wrapper linked list */
 void sc_hypervisor_policy_remove_task_from_pool(struct starpu_task *task, uint32_t footprint, struct sc_hypervisor_policy_task_pool **task_pools);

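With data_size stored per task type, a policy can estimate the total traffic a context has pending by walking the pool list. An illustrative walk over the struct declared above; total_pool_bytes is not a library function:

#include <stddef.h>
#include "sc_hypervisor_policy.h"

static size_t total_pool_bytes(struct sc_hypervisor_policy_task_pool *pools)
{
	size_t sum = 0;
	struct sc_hypervisor_policy_task_pool *tp;
	for (tp = pools; tp != NULL; tp = tp->next)
		sum += tp->n * tp->data_size;	/* n tasks of this type */
	return sum;
}
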
+ 10 - 6
sc_hypervisor/src/hypervisor_policies/teft_lp_policy.c

@@ -53,6 +53,7 @@ static double _compute_workers_distrib(int ns, int nw, double final_w_in_s[ns][n
 	double tasks[nw][nt];
 	double times[nw][nt];
 	
+	/* times in ms */
 	sc_hypervisor_get_tasks_times(nw, nt, times, workers, size_ctxs, task_pools);
 
 	double res = 0.0;
@@ -101,7 +102,8 @@ static void _size_ctxs(int *sched_ctxs, int nsched_ctxs , int *workers, int nwor
 
 	/* smallest possible tmax, difficult to obtain as we
 	   compute the nr of flops and not the tasks */
-	double possible_tmax = sc_hypervisor_lp_get_tmax(nw, workers);
+	/* lp computes it in s but it's converted to ms just before return */
+	double possible_tmax = sc_hypervisor_lp_get_tmax(nw, workers);
 	double smallest_tmax = possible_tmax / 3;
 	double tmax = possible_tmax * ns;
 	double tmin = smallest_tmax;
@@ -149,11 +151,11 @@ static void size_if_required()
 	}
 }
 
-static void teft_lp_handle_submitted_job(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint)
+static void teft_lp_handle_submitted_job(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size)
 {
 	/* count the tasks of the same type */
 	starpu_pthread_mutex_lock(&mutex);
-	sc_hypervisor_policy_add_task_to_pool(cl, sched_ctx, footprint, &task_pools);
+	sc_hypervisor_policy_add_task_to_pool(cl, sched_ctx, footprint, &task_pools, data_size);
 	starpu_pthread_mutex_unlock(&mutex);
 
 	size_if_required();
@@ -195,12 +197,14 @@ static void _try_resizing(void)
 	specific_data.tmp_task_pools = tmp_task_pools;
 	specific_data.size_ctxs = 0;
 
-			/* smallest possible tmax, difficult to obtain as we
-			   compute the nr of flops and not the tasks */
+	/* smallest possible tmax, difficult to obtain as we
+	   compute the nr of flops and not the tasks */
+	/* lp computes it in s but it's converted to ms just before return */
 	double possible_tmax = sc_hypervisor_lp_get_tmax(nw, NULL);
-	double smallest_tmax = possible_tmax / 3;
+	double smallest_tmax = 0.0; //possible_tmax / 3;
 	double tmax = possible_tmax * ns;
 	double tmin = smallest_tmax;
+
 	unsigned found_sol = sc_hypervisor_lp_execute_dichotomy(ns, nw, w_in_s, 1, (void*)&specific_data, 
 								tmin, tmax, smallest_tmax, _compute_workers_distrib);
 //			starpu_pthread_mutex_unlock(&mutex);

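The tmin/tmax/smallest_tmax bounds feed a bisection over the deadline handed to the LP. A generic sketch of that dichotomy, assuming feasibility is monotone in t; feasible() stands in for the LP solve and nothing here is the library routine:

/* shrink [tmin, tmax] until the smallest feasible deadline is isolated */
static double dichotomy(double tmin, double tmax, double smallest_tmax,
			int (*feasible)(double t))
{
	/* guard the stopping threshold, since smallest_tmax may be 0.0 */
	double eps = smallest_tmax > 0.0 ? smallest_tmax * 0.01 : 1e-3;
	double best = -1.0;
	while (tmax - tmin > eps)
	{
		double t = (tmin + tmax) / 2.0;
		if (feasible(t))
		{
			best = t;	/* feasible: try a tighter deadline */
			tmax = t;
		}
		else
			tmin = t;	/* infeasible: relax the deadline */
	}
	return best;
}
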
+ 1 - 1
sc_hypervisor/src/policies_utils/lp_programs.c

@@ -241,7 +241,7 @@ double sc_hypervisor_lp_simulate_distrib_tasks(int ns, int nw, int nt, double w_
 				w_in_s[s][w] = (double)glp_mip_col_val(lp, nw*nt+s*nw+w+1);
                         else
 				w_in_s[s][w] = glp_get_col_prim(lp, nw*nt+s*nw+w+1);
-//			printf("w_in_s[%d][%d]=%lf\n", s, w, w_in_s[s][w]);
+//			printf("w %d in ctx %d = %lf\n", w, s, w_in_s[s][w]);
 		}
 //	printf("\n");
 

+ 1 - 1
sc_hypervisor/src/policies_utils/lp_tools.c

@@ -83,7 +83,7 @@ double sc_hypervisor_lp_get_tmax(int nw, int *workers)
 	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();
 
 	double res[nsched_ctxs][ntypes_of_workers];
-	return sc_hypervisor_lp_get_nworkers_per_ctx(nsched_ctxs, ntypes_of_workers, res, total_nw) * 1000;
+	return sc_hypervisor_lp_get_nworkers_per_ctx(nsched_ctxs, ntypes_of_workers, res, total_nw) * 1000.0;
 }
 
 void sc_hypervisor_lp_round_double_to_int(int ns, int nw, double res[ns][nw], int res_rounded[ns][nw])

+ 3 - 2
sc_hypervisor/src/policies_utils/policy_tools.c

@@ -437,7 +437,7 @@ void sc_hypervisor_get_tasks_times(int nw, int nt, double times[nw][nt], int *wo
                                 times[w][t] = NAN;
 			else
 			{
-                                times[w][t] = length / 1000.;
+                                times[w][t] = (length / 1000.);
 
 				double transfer_time = 0.0;
 				enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
@@ -447,7 +447,7 @@ void sc_hypervisor_get_tasks_times(int nw, int nt, double times[nw][nt], int *wo
 					if(!worker_in_ctx && !size_ctxs)
 					{
 						double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker);
-						transfer_time +=  (tp->footprint / transfer_velocity) / 1000. ;
+						transfer_time +=  (tp->data_size / transfer_velocity) / 1000. ;
 					}
 					double latency = starpu_get_latency_RAM_CUDA(worker);
 					transfer_time += latency/1000.;
@@ -456,6 +456,7 @@ void sc_hypervisor_get_tasks_times(int nw, int nt, double times[nw][nt], int *wo
 //				printf("%d/%d %s x %d time = %lf transfer_time = %lf\n", w, tp->sched_ctx_id, tp->cl->model->symbol, tp->n, times[w][t], transfer_time);
 				times[w][t] += transfer_time;
 			}
+//			printf("sc%d w%d task %s nt %d times %lf s\n", tp->sched_ctx_id, w, tp->cl->model->symbol, tp->n, times[w][t]);
                 }
         }
 }

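The hunk swaps the footprint, which is a hash rather than a byte count, for the real data size in the transfer estimate added to each predicted task length. The shape of that estimate in isolation, with bandwidth and latency left as parameters since their units are not restated here:

/* transfer estimate in ms: size over bandwidth, plus latency */
static double estimate_transfer_ms(size_t data_size, double bandwidth,
				   double latency)
{
	return (data_size / bandwidth) / 1000.0 + latency / 1000.0;
}
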
+ 22 - 29
sc_hypervisor/src/policies_utils/speed.c

@@ -84,7 +84,6 @@ double sc_hypervisor_get_velocity_per_worker(struct sc_hypervisor_wrapper *sc_w,
 /* 		} */
 			
                 double vel  = (elapsed_flops/elapsed_time);/* in Gflops/s */
-		sc_w->ref_velocity[worker] = sc_w->ref_velocity[worker] > 1.0 ? (sc_w->ref_velocity[worker] + vel) / 2 : vel; 
                 return vel;
         }
 
@@ -113,42 +112,36 @@ double sc_hypervisor_get_velocity_per_worker_type(struct sc_hypervisor_wrapper*
                 if(arch == req_arch)
                 {
 			double _vel = sc_hypervisor_get_velocity_per_worker(sc_w, worker);
-			if(_vel == -1.0) return -1.0;
-			velocity += _vel;
-			nworkers++;
+			if(_vel > 0.0)
+			{
+				velocity += _vel;
+				nworkers++;
+
+			}
 		}
-	}
-			
+	}
 
-        return (nworkers != 0 ? velocity / nworkers : -1.0);
+	velocity = ((nworkers != 0 && velocity > 0.1) ? velocity / nworkers : -1.0);
+	if(velocity != -1.0)
+	{
+		if(arch == STARPU_CUDA_WORKER)
+			sc_w->ref_velocity[0] = sc_w->ref_velocity[0] > 1.0 ? (sc_w->ref_velocity[0] + velocity) / 2 : velocity; 
+		else
+			sc_w->ref_velocity[1] = sc_w->ref_velocity[1] > 1.0 ? (sc_w->ref_velocity[1] + velocity) / 2 : velocity; 
+	}
+	return velocity;
 }
 
 /* compute an average value of the cpu/cuda old velocity */
 double sc_hypervisor_get_ref_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
 {
-	double ref_velocity = 0.0;
-	unsigned nw = 0;
-
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
-	int worker;
+	if(arch == STARPU_CUDA_WORKER && sc_w->ref_velocity[0] > 0.0)
+		return sc_w->ref_velocity[0];
+	else if(arch == STARPU_CPU_WORKER && sc_w->ref_velocity[1] > 0.0)
+		return sc_w->ref_velocity[1];
 
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-                enum starpu_worker_archtype req_arch = starpu_worker_get_type(worker);
-                if(arch == req_arch)
-                {
-			if(sc_w->ref_velocity[worker] < 1.0) return -1.0;
-			ref_velocity += sc_w->ref_velocity[worker];
-			nw++;
-		}
-	}
-	
-	return (nw != 0 ? ref_velocity / nw : -1.0);
+	return -1.0;
 }
 
 double sc_hypervisor_get_velocity(struct sc_hypervisor_wrapper *sc_w, enum starpu_worker_archtype arch)

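The update rule that used to sit in sc_hypervisor_get_velocity_per_worker now runs once per worker type: seed the reference with the first measurement, then average old and new. Its standalone form:

static double update_ref_velocity(double ref, double vel)
{
	/* below 1.0 the slot is treated as unset (it is initialized to -1.0) */
	return ref > 1.0 ? (ref + vel) / 2.0 : vel;
}
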
+ 2 - 1
sc_hypervisor/src/policies_utils/task_pool.c

@@ -17,7 +17,7 @@
 
 #include "sc_hypervisor_policy.h"
 
-void sc_hypervisor_policy_add_task_to_pool(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, struct sc_hypervisor_policy_task_pool **task_pools)
+void sc_hypervisor_policy_add_task_to_pool(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, struct sc_hypervisor_policy_task_pool **task_pools, size_t data_size)
 {
 	struct sc_hypervisor_policy_task_pool *tp = NULL;
 
@@ -35,6 +35,7 @@ void sc_hypervisor_policy_add_task_to_pool(struct starpu_codelet *cl, unsigned s
 		tp->sched_ctx_id = sched_ctx;
 		tp->n = 0;
 		tp->next = *task_pools;
+		tp->data_size = data_size;
 		*task_pools = tp;
 	}
 

+ 8 - 6
sc_hypervisor/src/sc_hypervisor.c

@@ -28,7 +28,7 @@ static void notify_pushed_task(unsigned sched_ctx, int worker);
 static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint);
 static void notify_post_exec_hook(unsigned sched_ctx, int taskid);
 static void notify_idle_end(unsigned sched_ctx, int  worker);
-static void notify_submitted_job(struct starpu_task *task, unsigned footprint);
+static void notify_submitted_job(struct starpu_task *task, uint32_t footprint, size_t data_size);
 static void notify_delete_context(unsigned sched_ctx);
 
 extern struct sc_hypervisor_policy idle_policy;
@@ -164,6 +164,9 @@ struct starpu_sched_ctx_performance_counters* sc_hypervisor_init(struct sc_hyper
 		starpu_pthread_mutex_init(&hypervisor.sched_ctx_w[i].mutex, NULL);
 		hypervisor.optimal_v[i] = 0.0;
 
+		hypervisor.sched_ctx_w[i].ref_velocity[0] = -1.0;
+		hypervisor.sched_ctx_w[i].ref_velocity[1] = -1.0;
+
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
 		{
@@ -177,7 +180,6 @@ struct starpu_sched_ctx_performance_counters* sc_hypervisor_init(struct sc_hyper
 			hypervisor.sched_ctx_w[i].elapsed_tasks[j] = 0;
 			hypervisor.sched_ctx_w[i].total_elapsed_flops[j] = 0.0;
 			hypervisor.sched_ctx_w[i].worker_to_be_removed[j] = 0;
-			hypervisor.sched_ctx_w[i].ref_velocity[j] = -1.0;
 		}
 	}
 
@@ -866,21 +868,21 @@ static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
 	return;
 }
 
-static void notify_submitted_job(struct starpu_task *task, uint32_t footprint)
+static void notify_submitted_job(struct starpu_task *task, uint32_t footprint, size_t data_size)
 {
 	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
 	hypervisor.sched_ctx_w[task->sched_ctx].submitted_flops += task->flops;
 	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 
 	if(hypervisor.policy.handle_submitted_job && !type_of_tasks_known)
-		hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint);
+		hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint, data_size);
 }
 
-void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint)
+void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size)
 {
 	type_of_tasks_known = 1;
 	if(hypervisor.policy.handle_submitted_job)
-		hypervisor.policy.handle_submitted_job(cl, sched_ctx, footprint);
+		hypervisor.policy.handle_submitted_job(cl, sched_ctx, footprint, data_size);
 }
 
 static void notify_delete_context(unsigned sched_ctx)

+ 1 - 1
src/core/sched_policy.c

@@ -295,7 +295,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 
 static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
 {
-	int worker = -1, nworkers = 0;
+	unsigned worker = 0, nworkers = 0;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 
 	struct starpu_sched_ctx_iterator it;

+ 11 - 2
src/core/task.c

@@ -237,9 +237,18 @@ int _starpu_submit_job(struct _starpu_job *j)
 	   && sched_ctx->perf_counters != NULL)
 	{
 		_starpu_compute_buffers_footprint(j->task->cl->model, STARPU_CPU_DEFAULT, 0, j);
-		sched_ctx->perf_counters->notify_submitted_job(j->task, j->footprint);
+		int i;
+		size_t data_size = 0;
+		for(i = 0; i < STARPU_NMAXBUFS; i++)
+		{
+			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
+			if (handle != NULL)
+				data_size += _starpu_data_get_size(handle);
+		}
+
+		sched_ctx->perf_counters->notify_submitted_job(j->task, j->footprint, data_size);
 	}
-#endif
+#endif //STARPU_USE_SC_HYPERVISOR
 
 	/* We retain handle reference count */
 	if (task->cl)