Bläddra i källkod

consider the flops of ready tasks when resizing and compute the speed for a sample of time too

Andra Hugo 12 år sedan
förälder
incheckning
39cfbbcb7f

+ 2 - 0
include/starpu_sched_ctx_hypervisor.h

@@ -32,6 +32,8 @@ struct starpu_sched_ctx_performance_counters
 	void (*notify_poped_task)(unsigned sched_ctx_id, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint);
 	void (*notify_post_exec_hook)(unsigned sched_ctx_id, int taskid);
 	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint, size_t data_size);
+	void (*notify_ready_task)(unsigned sched_ctx_id, struct starpu_task *task);
+	void (*notify_empty_ctx)(unsigned sched_ctx_id, struct starpu_task *task);
 	void (*notify_delete_context)(unsigned sched_ctx);
 };
 

+ 10 - 0
sc_hypervisor/include/sc_hypervisor_monitoring.h

@@ -90,6 +90,9 @@ struct sc_hypervisor_wrapper
 	/* number of flops that still have to be executed in this ctx */
 	double remaining_flops;
 	
+	/* number of flops corresponding to the ready tasks in this ctx */
+	double ready_flops;
+
 	/* the start time of the resizing sample of this context*/
 	double start_time;
 
@@ -102,6 +105,13 @@ struct sc_hypervisor_wrapper
 
 	/* mutex to protect the ack of workers */
 	starpu_pthread_mutex_t mutex;
+
+	/* boolean indicating whether the resizing strategy can see the
+	   flops of the entire execution or not */
+	unsigned total_flops_available;
+
+	/* the number of ready tasks submitted to a ctx */
+	int nready_tasks;
 };
 
 /* return the wrapper of context that saves its monitoring information */

+ 59 - 13
sc_hypervisor/src/policies_utils/lp_tools.c

@@ -31,6 +31,7 @@ double sc_hypervisor_lp_get_nworkers_per_ctx(int nsched_ctxs, int ntypes_of_work
 	int nw = tw->nw;
 	int i = 0;
 	struct sc_hypervisor_wrapper* sc_w;
+
 	for(i = 0; i < nsched_ctxs; i++)
 	{
 		sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
@@ -38,8 +39,18 @@ double sc_hypervisor_lp_get_nworkers_per_ctx(int nsched_ctxs, int ntypes_of_work
 		for(w = 0; w < nw; w++)
 			v[i][w] = sc_hypervisor_get_speed(sc_w, sc_hypervisor_get_arch_for_index(w, tw)); 
 		
-		flops[i] = sc_w->remaining_flops < 0.0 ? 0.0 : sc_w->remaining_flops/1000000000; /* in gflops*/
-//		printf("%d: flops %lf\n", sched_ctxs[i], flops[i]);
+//		flops[i] = sc_w->ready_flops/1000000000.0; /* in gflops*/
+		if(sc_w->remaining_flops < 0.0)
+			flops[i] = sc_w->ready_flops/1000000000.0; /* in gflops*/
+		else
+		{
+			if((sc_w->ready_flops/1000000000.0) < 0.5)
+				flops[i] = 0.0;
+			else
+				flops[i] = sc_w->remaining_flops/1000000000.0; /* in gflops*/
+		}
+/* 		printf("%d: flops %lf remaining flops %lf ready flops %lf nready_tasks %d\n",  */
+/* 		       sched_ctxs[i], flops[i], sc_w->remaining_flops/1000000000, sc_w->ready_flops/1000000000, sc_w->nready_tasks); */
 	}
 
 	double vmax = 1/sc_hypervisor_lp_simulate_distrib_flops(nsched_ctxs, ntypes_of_workers, v, flops, res, total_nw);
@@ -51,7 +62,29 @@ double sc_hypervisor_lp_get_nworkers_per_ctx(int nsched_ctxs, int ntypes_of_work
 #else
 		optimal_v = res[i][0] * v[i][0];
 #endif //STARPU_USE_CUDA
-//				printf("%d: set opt %lf\n", i, optimal_v[i]);
+		int w;
+		unsigned no_workers = 1;
+		for(w = 0; w < nw; w++)
+			if(res[i][w] != 0.0)
+			{
+				no_workers = 0;
+				break;
+			}
+
+		sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
+
+/* if the hypervisor gave 0 workers to a context but the context still
+has some last flops, or a ready task that does not even have any flops,
+we give a worker (in shared mode) to the context in order to let it
+finish its work: we assign the value -1.0 instead of 0.0, and further on
+in the distribution function we take this into account and revert the
+variable to its 0.0 value */
+		if(no_workers && (flops[i] != 0.0 || sc_w->nready_tasks > 0))
+		{
+			for(w = 0; w < nw; w++)
+				res[i][w] = -1.0;
+		}
+
 		if(optimal_v != 0.0)
 			_set_optimal_v(i, optimal_v);
 	}
@@ -135,14 +168,17 @@ void sc_hypervisor_lp_round_double_to_int(int ns, int nw, double res[ns][nw], in
 }
 
 void _lp_find_workers_to_give_away(int nw, int ns, unsigned sched_ctx, int sched_ctx_idx, 
-				  int tmp_nw_move[nw], int tmp_workers_move[nw][STARPU_NMAXWORKERS], 
-				  int tmp_nw_add[nw], int tmp_workers_add[nw][STARPU_NMAXWORKERS],
+				   int tmp_nw_move[nw], int tmp_workers_move[nw][STARPU_NMAXWORKERS], 
+				   int tmp_nw_add[nw], int tmp_workers_add[nw][STARPU_NMAXWORKERS],
 				   int res_rounded[ns][nw], double res[ns][nw], struct types_of_workers *tw)
 {
 	int w;
 	double target_res = 0.0;
 	for(w = 0; w < nw; w++)
+	{
 		target_res += res[sched_ctx_idx][w];
+		if(res[sched_ctx_idx][w] == -1.0) res[sched_ctx_idx][w] = 0.0;
+	}
 
 	for(w = 0; w < nw; w++)
 	{
@@ -156,7 +192,7 @@ void _lp_find_workers_to_give_away(int nw, int ns, unsigned sched_ctx, int sched
 				int nworkers_to_move = nworkers_ctx - res_rounded[sched_ctx_idx][w];
 				int *workers_to_move = sc_hypervisor_get_idlest_workers(sched_ctx, &nworkers_to_move, arch);
 				int i;
-				if(target_res == 0.0 && nworkers_to_move > 0)
+				if(target_res < 0.0 && nworkers_to_move > 0)
 				{
 					tmp_workers_add[w][tmp_nw_add[w]++] = workers_to_move[0];
 					for(i = 1; i < nworkers_to_move; i++)
@@ -324,7 +360,8 @@ void sc_hypervisor_lp_redistribute_resources_in_ctxs(int ns, int nw, int res_rou
 		/* find workers that ctx s has to give away */
 		_lp_find_workers_to_give_away(nw, ns, sched_ctxs[s], s, 
 					      tmp_nw_move, tmp_workers_move, 
-					      tmp_nw_add, tmp_workers_add, res_rounded, res, tw);
+					      tmp_nw_add, tmp_workers_add, res_rounded, 
+					      res, tw);
 		for(s2 = 0; s2 < ns; s2++)
 		{
 			if(sched_ctxs[s2] != sched_ctxs[s])
@@ -399,7 +436,8 @@ int _lp_get_unwanted_workers(int *workers_add, int nw_add, unsigned sched_ctx, i
 	return nw_remove;
 }
 
-void sc_hypervisor_lp_distribute_resources_in_ctxs(unsigned* sched_ctxs, int ns, int nw, int res_rounded[ns][nw], double res[ns][nw], int *workers, int nworkers, struct types_of_workers *tw)
+void sc_hypervisor_lp_distribute_resources_in_ctxs(unsigned* sched_ctxs, int ns, int nw, int res_rounded[ns][nw], 
+						   double res[ns][nw], int *workers, int nworkers, struct types_of_workers *tw)
 {
 	int s, w;
 	int start[nw];
@@ -411,7 +449,10 @@ void sc_hypervisor_lp_distribute_resources_in_ctxs(unsigned* sched_ctxs, int ns,
                 int nw_add = 0;
 		double target_res = 0.0;
 		for(w = 0; w < nw; w++)
+		{
 			target_res += res[s][w];
+			if(res[s][w] == -1.0) res[s][w] = 0.0;
+		}
 
 		for(w = 0; w < nw; w++)
 		{
@@ -420,15 +461,19 @@ void sc_hypervisor_lp_distribute_resources_in_ctxs(unsigned* sched_ctxs, int ns,
 			if(arch == STARPU_CPU_WORKER) 
 			{
 				int nworkers_to_add = res_rounded[s][w];
-				if(target_res == 0.0)
+				if(target_res < 0.0)
 				{
 					nworkers_to_add=1;
 					int old_start = start[w];
+					if(start[w] == nworkers)
+						start[w]--;
 					int *workers_to_add = sc_hypervisor_get_idlest_workers_in_list(&start[w], workers, nworkers, &nworkers_to_add, arch);
 					start[w] = old_start;
 					int i;
 					for(i = 0; i < nworkers_to_add; i++)
+					{
 						workers_add[nw_add++] = workers_to_add[i];
+					}
 					free(workers_to_add);
 				}
 				else
@@ -473,11 +518,12 @@ void sc_hypervisor_lp_distribute_resources_in_ctxs(unsigned* sched_ctxs, int ns,
 		if(nw_add > 0)
 		{
 			sc_hypervisor_add_workers_to_sched_ctx(workers_add, nw_add, sched_ctxs[s]);
-			int workers_remove[STARPU_NMAXWORKERS];
-			int nw_remove = _lp_get_unwanted_workers(workers_add, nw_add, sched_ctxs[s], workers_remove);
-			sc_hypervisor_remove_workers_from_sched_ctx(workers_remove, nw_remove, sched_ctxs[s], !(_sc_hypervisor_use_lazy_resize()));
-			sc_hypervisor_start_resize(sched_ctxs[s]);
 		}
+		int workers_remove[STARPU_NMAXWORKERS];
+		int nw_remove = _lp_get_unwanted_workers(workers_add, nw_add, sched_ctxs[s], workers_remove);
+		sc_hypervisor_remove_workers_from_sched_ctx(workers_remove, nw_remove, sched_ctxs[s], !(_sc_hypervisor_use_lazy_resize()));
+		sc_hypervisor_start_resize(sched_ctxs[s]);
+
 
 //		sc_hypervisor_stop_resize(current_sched_ctxs[s]);
 	}

+ 39 - 9
sc_hypervisor/src/policies_utils/speed.c

@@ -34,10 +34,24 @@ double sc_hypervisor_get_ctx_speed(struct sc_hypervisor_wrapper* sc_w)
 	double start_sample = start_sample_prc > 0.0 ? (start_sample_prc / 100) * total_flops : sample;
 	double redim_sample = elapsed_flops == total_elapsed_flops ? (start_sample > 0.0 ? start_sample : sample) : sample;
 
-	if(elapsed_flops >= redim_sample)
+	double curr_time = starpu_timing_now();
+	double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
+	
+	unsigned can_compute_speed = 0;
+	char *speed_sample_criteria = getenv("SC_HYPERVISOR_SAMPLE_CRITERIA");
+	if(speed_sample_criteria && (strcmp(speed_sample_criteria, "time") == 0))
+	{
+		int n_all_cpus = starpu_cpu_worker_get_count();
+		int n_all_cuda = starpu_cuda_worker_get_count();
+		double th_speed = SC_HYPERVISOR_DEFAULT_CPU_SPEED * n_all_cpus + SC_HYPERVISOR_DEFAULT_CUDA_SPEED * n_all_cuda;
+		double time_sample = 0.1 * ((total_flops/1000000000.0) / th_speed);
+		can_compute_speed = elapsed_time >= time_sample;
+	}
+	else
+		can_compute_speed = elapsed_flops >= redim_sample;
+
+	if(can_compute_speed)
         {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
                 return (elapsed_flops/1000000000.0)/elapsed_time;/* in Gflops/s */
         }
 	return -1.0;
@@ -100,8 +114,28 @@ double sc_hypervisor_get_speed_per_worker_type(struct sc_hypervisor_wrapper* sc_
 
 	double ctx_elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
 	double ctx_sample = config->ispeed_ctx_sample;
-	if(ctx_elapsed_flops > ctx_sample)
+
+	double curr_time = starpu_timing_now();
+	double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
+	
+	unsigned can_compute_speed = 0;
+	char *speed_sample_criteria = getenv("SC_HYPERVISOR_SAMPLE_CRITERIA");
+	if(speed_sample_criteria && (strcmp(speed_sample_criteria, "time") == 0))
 	{
+		int n_all_cpus = starpu_cpu_worker_get_count();
+		int n_all_cuda = starpu_cuda_worker_get_count();
+		double th_speed = SC_HYPERVISOR_DEFAULT_CPU_SPEED * n_all_cpus + SC_HYPERVISOR_DEFAULT_CUDA_SPEED * n_all_cuda;
+		double total_flops = sc_w->total_flops;
+		double time_sample = 0.1 * ((total_flops/1000000000.0) / th_speed);
+		can_compute_speed = elapsed_time >= time_sample;
+	}
+	else
+		can_compute_speed = ctx_elapsed_flops > ctx_sample;
+
+	if(can_compute_speed)
+        {
+		if(ctx_elapsed_flops == 0.0) return -1.0;
+
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
 		int worker;
 		
@@ -128,11 +162,7 @@ double sc_hypervisor_get_speed_per_worker_type(struct sc_hypervisor_wrapper* sc_
 		
 		if(nworkers != 0)
 		{
-			double curr_time = starpu_timing_now();
-			
-			/* compute speed for the last frame */
-			double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
-			elapsed_time -= max_workers_idle_time;
+//			elapsed_time -= max_workers_idle_time;
 			speed = (all_workers_flops / elapsed_time) / nworkers;
 		}
 		else

+ 26 - 10
sc_hypervisor/src/sc_hypervisor.c

@@ -28,6 +28,8 @@ static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task
 static void notify_post_exec_hook(unsigned sched_ctx, int taskid);
 static void notify_idle_end(unsigned sched_ctx, int  worker);
 static void notify_submitted_job(struct starpu_task *task, unsigned footprint, size_t data_size);
+static void notify_ready_task(unsigned sched_ctx, struct starpu_task *task);
+static void notify_empty_ctx(unsigned sched_ctx, struct starpu_task *task);
 static void notify_delete_context(unsigned sched_ctx);
 
 extern struct sc_hypervisor_policy idle_policy;
@@ -188,6 +190,9 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
 
 		hypervisor.sched_ctx_w[i].ref_speed[0] = -1.0;
 		hypervisor.sched_ctx_w[i].ref_speed[1] = -1.0;
+		hypervisor.sched_ctx_w[i].ready_flops = 0.0;
+		hypervisor.sched_ctx_w[i].total_flops_available = 0;
+		hypervisor.sched_ctx_w[i].nready_tasks = 0;
 
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
@@ -215,6 +220,8 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
 	perf_counters->notify_post_exec_hook = notify_post_exec_hook;
 	perf_counters->notify_idle_end = notify_idle_end;
 	perf_counters->notify_submitted_job = notify_submitted_job;
+	perf_counters->notify_ready_task = notify_ready_task;
+	perf_counters->notify_empty_ctx = notify_empty_ctx;
 	perf_counters->notify_delete_context = notify_delete_context;
 
 	starpu_sched_ctx_notify_hypervisor_exists();
@@ -827,13 +834,13 @@ static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task
 	hypervisor.sched_ctx_w[sched_ctx].elapsed_data[worker] += data_size ;
 	hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[worker]++ ;
 	hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += task->flops;
+
 	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= task->flops;
-/* 	if(hypervisor.sched_ctx_w[sched_ctx].remaining_flops < 0.0) */
-/* 		hypervisor.sched_ctx_w[sched_ctx].remaining_flops = 0.0; */
-//	double ctx_elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(&hypervisor.sched_ctx_w[sched_ctx]);
-/* 	printf("*****************STARPU_STARPU_STARPU: decrement %lf flops  remaining flops %lf total flops %lf elapseed flops %lf in ctx %d \n", */
-/* 	       task->flops, hypervisor.sched_ctx_w[sched_ctx].remaining_flops,  hypervisor.sched_ctx_w[sched_ctx].total_flops, ctx_elapsed_flops, sched_ctx); */
+	hypervisor.sched_ctx_w[sched_ctx].nready_tasks--;
+	hypervisor.sched_ctx_w[sched_ctx].ready_flops -= task->flops;
+	if(hypervisor.sched_ctx_w[sched_ctx].ready_flops < 0.0)
+		hypervisor.sched_ctx_w[sched_ctx].ready_flops = 0.0;
 	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 
 	if(hypervisor.resize[sched_ctx])
@@ -909,6 +916,19 @@ static void notify_submitted_job(struct starpu_task *task, uint32_t footprint, s
 		hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint, data_size);
 }
 
+static void notify_ready_task(unsigned sched_ctx_id, struct starpu_task *task)
+{
+	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
+	hypervisor.sched_ctx_w[sched_ctx_id].nready_tasks++;
+	hypervisor.sched_ctx_w[sched_ctx_id].ready_flops += task->flops;
+	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+}
+
+static void notify_empty_ctx(unsigned sched_ctx_id, struct starpu_task *task)
+{
+	sc_hypervisor_resize_ctxs(NULL, -1 , NULL, -1);
+}
+
 void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size)
 {
 	type_of_tasks_known = 1;
@@ -956,7 +976,7 @@ int sc_hypervisor_get_nsched_ctxs()
 int _sc_hypervisor_use_lazy_resize(void)
 {
 	char* lazy = getenv("SC_HYPERVISOR_LAZY_RESIZE");
-	return lazy ? atof(lazy)  : 1;
+	return lazy ? atoi(lazy)  : 1;
 }
 
 void sc_hypervisor_save_size_req(unsigned *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
@@ -1029,13 +1049,9 @@ struct types_of_workers* sc_hypervisor_get_types_of_workers(int *workers, unsign
 
 void sc_hypervisor_update_diff_total_flops(unsigned sched_ctx, double diff_total_flops)
 {
-//	double diff = total_flops - hypervisor.sched_ctx_w[sched_ctx].total_flops;
-//	printf("*****************STARPU_STARPU_STARPU: update diff flops %lf to ctx %d \n", diff_total_flops, sched_ctx);
 	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
 	hypervisor.sched_ctx_w[sched_ctx].total_flops += diff_total_flops;
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops += diff_total_flops;	
-/* 	printf("*****************STARPU_STARPU_STARPU: total flops %lf remaining flops %lf in ctx %d \n", */
-/* 	       hypervisor.sched_ctx_w[sched_ctx].total_flops, hypervisor.sched_ctx_w[sched_ctx].remaining_flops, sched_ctx); */
 	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 }
 

+ 17 - 0
src/core/sched_policy.c

@@ -323,6 +323,12 @@ int _starpu_push_task(struct _starpu_job *j)
 	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
 	_starpu_increment_nready_tasks();
 	task->status = STARPU_TASK_READY;
+#ifdef STARPU_USE_SC_HYPERVISOR
+	if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL 
+	   && sched_ctx->perf_counters->notify_ready_task)
+		sched_ctx->perf_counters->notify_ready_task(sched_ctx->id, task);
+#endif //STARPU_USE_SC_HYPERVISOR
+
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event)
 	{
@@ -342,6 +348,11 @@ int _starpu_push_task(struct _starpu_job *j)
 			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+#ifdef STARPU_USE_SC_HYPERVISOR
+			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL 
+			   && sched_ctx->perf_counters->notify_empty_ctx)
+				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
+#endif
 			return 0;
 		}
 	}
@@ -381,6 +392,12 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+#ifdef STARPU_USE_SC_HYPERVISOR
+			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL 
+			   && sched_ctx->perf_counters->notify_empty_ctx)
+				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
+#endif
+
 			return -EAGAIN;
 		}
 	}