
bug fixing + granularity computation

Andra Hugo 13 years ago
parent
commit
463b9da491

+ 2 - 0
include/starpu_scheduler.h

@@ -174,6 +174,8 @@ unsigned starpu_get_sched_ctx();
 
 unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx);
 
+unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2);
+
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 

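The new starpu_get_nshared_workers() counts the workers that belong to both contexts; the hypervisor uses it further down so that workers already shared between sender and receiver are not counted twice against max_nworkers. A minimal usage sketch (the context ids, the helper name and the printf are illustrative, not part of this commit):

#include <stdio.h>
#include <starpu.h>
#include <starpu_scheduler.h>

/* Illustrative only: report how many workers two contexts already share.
 * ctx1 and ctx2 are assumed to be valid, previously created context ids. */
static void report_shared_workers(unsigned ctx1, unsigned ctx2)
{
	unsigned shared = starpu_get_nshared_workers(ctx1, ctx2);
	unsigned nworkers2 = starpu_get_nworkers_of_sched_ctx(ctx2);

	/* workers already present in ctx2 do not enlarge it when moved from ctx1 */
	printf("contexts %u and %u share %u workers (ctx %u currently has %u)\n",
	       ctx1, ctx2, shared, ctx2, nworkers2);
}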
+ 6 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -91,6 +91,12 @@ double sched_ctx_hypervisor_get_idle_time(unsigned sched_ctx, int worker);
 
 double sched_ctx_hypervisor_get_bef_res_exp_end(unsigned sched_ctx);
 
+double sched_ctx_hypervisor_get_ctx_velocity(unsigned sched_ctx);
+
+double sched_ctx_hypervisor_get_cpu_velocity(unsigned sched_ctx);
+
+double sched_ctx_hypervisor_get_flops_left(unsigned sched_ctx);
+
 /* hypervisor policies */
 #define IDLE_POLICY 1
 #define APP_DRIVEN_POLICY 2

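The three new getters feed the granularity computation added to simple_policy.c further down: the receiver's current velocity (elapsed flops per unit of time), its remaining flops and the sender's per-CPU velocity together determine how many workers are worth moving. A rough sketch of that computation (the helper name and the cast are illustrative; the real logic lives in _get_workers_to_move()):

#include <starpu.h>
#include <sched_ctx_hypervisor.h>

/* Sketch: how many of the sender's CPU workers the receiver needs in order
 * to finish its remaining flops by the sender's expected end time. */
static int workers_needed_for(unsigned sender, unsigned receiver)
{
	double now          = starpu_timing_now();
	double v_receiver   = sched_ctx_hypervisor_get_ctx_velocity(receiver);
	double flops_left   = sched_ctx_hypervisor_get_flops_left(receiver);
	double sender_end   = sched_ctx_hypervisor_get_exp_end(sender);
	double v_cpu_sender = sched_ctx_hypervisor_get_cpu_velocity(sender);

	/* extra velocity the receiver is missing to meet the sender's deadline */
	double v_missing = flops_left / (sender_end - now) - v_receiver;

	/* one sender CPU contributes roughly v_cpu_sender of that velocity */
	return (int)(v_missing / v_cpu_sender);
}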
+ 144 - 32
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -57,7 +57,7 @@ static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_mov
 	return sched_ctx;
 }
 
-int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers)
+int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch)
 {
 	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
 
@@ -80,40 +80,45 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers)
 		{
 			considered = 0;
 			worker = workers->get_next(workers);
-			if(!config->fixed_workers[worker])
+			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+			if(arch == 0 || curr_arch == arch)
 			{
-				for(i = 0; i < index; i++)
+
+				if(!config->fixed_workers[worker])
 				{
-					if(curr_workers[i] == worker)
+					for(i = 0; i < index; i++)
 					{
-						considered = 1;
-						break;
+						if(curr_workers[i] == worker)
+						{
+							considered = 1;
+							break;
+						}
 					}
-				}
-				
-				if(!considered)
-				{
-					/* the first iteration*/
-					if(curr_workers[index] < 0)
+					
+					if(!considered)
+					{
+						/* the first iteration*/
+						if(curr_workers[index] < 0)
 						curr_workers[index] = worker;
-					/* small priority worker is the first to leave the ctx*/
-					else if(config->priority[worker] <
-						   config->priority[curr_workers[index]])
+						/* small priority worker is the first to leave the ctx*/
+						else if(config->priority[worker] <
+							config->priority[curr_workers[index]])
 						curr_workers[index] = worker;
-					/* if we don't consider priorities check for the workers
-					 with the biggest idle time */
-					else if(config->priority[worker] ==
-						   config->priority[curr_workers[index]])
-					{
-						double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
-						double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
-						if(worker_idle_time > curr_worker_idle_time)
-							curr_workers[index] = worker;
+						/* if we don't consider priorities check for the workers
+						   with the biggest idle time */
+						else if(config->priority[worker] ==
+							config->priority[curr_workers[index]])
+						{
+							double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
+							double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
+							if(worker_idle_time > curr_worker_idle_time)
+								curr_workers[index] = worker;
+						}
 					}
 				}
 			}
 		}
-
+			
 		if(curr_workers[index] < 0)
 		{
 			*nworkers = index;
@@ -127,7 +132,7 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers)
 	return curr_workers;
 }
 
-static unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx)
+static unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
 {
 	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
 
@@ -139,8 +144,12 @@ static unsigned _get_potential_nworkers(struct policy_config *config, unsigned s
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		if(!config->fixed_workers[worker])
-			potential_workers++;
+		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+                if(arch == 0 || curr_arch == arch)
+                {
+			if(!config->fixed_workers[worker])
+				potential_workers++;
+		}
 	}
 	if(workers->init_cursor)
 		workers->deinit_cursor(workers);
@@ -154,7 +163,7 @@ static unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
 	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
 	unsigned nworkers_to_move = 0;
 	
-	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx);
+	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, 0);
 	if(potential_moving_workers > 0)
 	{
 		if(potential_moving_workers <= config->min_nworkers)
@@ -213,15 +222,16 @@ static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sche
 				poor_sched_ctx = receiver_sched_ctx;
 				struct policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
 				unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
-				if((nworkers+nworkers_to_move) > config->max_nworkers)
-					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers);
+				unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);
+				if((nworkers+nworkers_to_move-nshared_workers) > config->max_nworkers)
+					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers+nshared_workers);
 				if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 			}
 
 
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			{
-				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move);
+				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
 				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
 				
 				struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
@@ -239,6 +249,108 @@ static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sche
 
 }
 
+static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *nworkers)
+{
+        int *workers = NULL;
+        double v_receiver = sched_ctx_hypervisor_get_ctx_velocity(receiver_sched_ctx);
+        double receiver_remaining_flops = sched_ctx_hypervisor_get_flops_left(receiver_sched_ctx);
+        double sender_exp_end = sched_ctx_hypervisor_get_exp_end(sender_sched_ctx);
+        double sender_v_cpu = sched_ctx_hypervisor_get_cpu_velocity(sender_sched_ctx);
+//      double v_gcpu = sched_ctx_hypervisor_get_gpu_velocity(sender_sched_ctx);
+
+        double v_for_rctx = (receiver_remaining_flops/(sender_exp_end - starpu_timing_now())) - v_receiver;
+//      v_for_rctx /= 2;
+
+        int nworkers_needed = v_for_rctx/sender_v_cpu;
+/*      printf("%d->%d: v_rec %lf v %lf v_cpu %lf w_needed %d \n", sender_sched_ctx, receiver_sched_ctx, */
+/*             v_receiver, v_for_rctx, sender_v_cpu, nworkers_needed); */
+        if(nworkers_needed > 0)
+        {
+                struct policy_config *sender_config = sched_ctx_hypervisor_get_config(sender_sched_ctx);
+                unsigned potential_moving_cpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CPU_WORKER);
+                unsigned potential_moving_gpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CUDA_WORKER);
+                unsigned sender_nworkers = starpu_get_nworkers_of_sched_ctx(sender_sched_ctx);
+                struct policy_config *config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
+                unsigned nworkers_ctx = starpu_get_nworkers_of_sched_ctx(receiver_sched_ctx);
+
+                if(nworkers_needed < (potential_moving_cpus + 5 * potential_moving_gpus))
+                {
+                        if((sender_nworkers - nworkers_needed) >= sender_config->min_nworkers)
+                        {
+                                if((nworkers_ctx + nworkers_needed) > config->max_nworkers)
+                                        nworkers_needed = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx);
+
+                                if(nworkers_needed > 0)
+                                {
+                                        int ngpus = nworkers_needed / 5;
+                                        int *gpus;
+                                        gpus = _get_first_workers(sender_sched_ctx, &ngpus, STARPU_CUDA_WORKER);
+                                        int ncpus = nworkers_needed - ngpus;
+                                        int *cpus;
+                                        cpus = _get_first_workers(sender_sched_ctx, &ncpus, STARPU_CPU_WORKER);
+                                        workers = (int*)malloc(nworkers_needed*sizeof(int));
+                                        int i;
+                                        for(i = 0; i < ngpus; i++)
+                                                workers[(*nworkers)++] = gpus[i];
+
+                                        for(i = 0; i < ncpus; i++)
+                                                workers[(*nworkers)++] = cpus[i];
+
+                                        free(gpus);
+                                        free(cpus);
+                                }
+                        }
+                }
+		else
+                {
+                        int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
+
+                        if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
+                        {
+                                unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
+                                if((nworkers_ctx + nworkers_to_move - nshared_workers) > config->max_nworkers)
+                                        nworkers_to_move = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx + nshared_workers);
+
+                                if(nworkers_to_move > 0)
+                                {
+                                        workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
+                                        *nworkers = nworkers_to_move;
+                                }
+                        }
+                }
+        }
+        return workers;
+}
+
+static unsigned _simple_resize2(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
+{
+        int ret = 1;
+        if(force_resize)
+                pthread_mutex_lock(&act_hypervisor_mutex);
+        else
+                ret = pthread_mutex_trylock(&act_hypervisor_mutex);
+        if(ret != EBUSY)
+        {
+                int nworkers_to_move = 0;
+                int *workers_to_move =  _get_workers_to_move(sender_sched_ctx, receiver_sched_ctx, &nworkers_to_move);
+		if(nworkers_to_move > 0)
+                {
+                        sched_ctx_hypervisor_move_workers(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move);
+
+                        struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
+                        int i;
+                        for(i = 0; i < nworkers_to_move; i++)
+                                new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] :  new_config->new_workers_max_idle;
+
+                        free(workers_to_move);
+                }
+                pthread_mutex_unlock(&act_hypervisor_mutex);
+                return 1;
+        }
+        return 0;
+
+}
+
 static unsigned simple_resize(unsigned sender_sched_ctx)
 {
 	return _simple_resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 1);

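When the required number of workers is small enough, _get_workers_to_move() above splits the request between CUDA and CPU workers with a fixed weighting: one GPU is counted as roughly five CPUs (a hard-coded heuristic of this policy, not a measured ratio). A worked sketch of that split, with illustrative numbers:

#include <stdio.h>

int main(void)
{
	/* "one GPU counts as roughly five CPUs": hard-coded weighting of the
	 * policy above. The numbers below are purely illustrative. */
	int nworkers_needed = 12;            /* workers the receiver is missing */
	int ngpus = nworkers_needed / 5;     /* -> request 2 CUDA workers       */
	int ncpus = nworkers_needed - ngpus; /* -> request 10 CPU workers       */

	printf("move %d GPU(s) and %d CPU(s)\n", ngpus, ncpus);
	return 0;
}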
+ 61 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -225,7 +225,7 @@ static void _get_cpus(int *workers, int nworkers, int *cpus, int *ncpus)
 
 void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move)
 {
-	if(hypervisor.resize[sender_sched_ctx])
+	if(hypervisor.resize[sender_sched_ctx] || sched_ctx_hypervisor_get_flops_left_pct(sender_sched_ctx) == 0.0f)
 	{
 		int j;
 		printf("resize ctx %d with", sender_sched_ctx);
@@ -437,6 +437,34 @@ static double _get_elapsed_flops_per_sched_ctx(unsigned sched_ctx)
 	return ret_val;
 }
 
+static double _get_elapsed_flops_per_cpus(unsigned sched_ctx, int *ncpus)
+{
+	double ret_val = 0.0;
+
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
+        int worker;
+
+	if(workers->init_cursor)
+                workers->init_cursor(workers);
+
+        while(workers->has_next(workers))
+	{
+                worker = workers->get_next(workers);
+                enum starpu_archtype arch = starpu_worker_get_type(worker);
+                if(arch == STARPU_CPU_WORKER)
+                {
+			ret_val += hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker];
+			(*ncpus)++;
+                }
+        }
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+
+	return ret_val;
+}
+
+
 static void _set_elapsed_flops_per_sched_ctx(unsigned sched_ctx, double val)
 {
 	int i;
@@ -459,6 +487,38 @@ double sched_ctx_hypervisor_get_exp_end(unsigned sched_ctx)
 	return -1.0;
 }
 
+double sched_ctx_hypervisor_get_ctx_velocity(unsigned sched_ctx)
+{
+        double elapsed_flops = _get_elapsed_flops_per_sched_ctx(sched_ctx);
+
+        if( elapsed_flops != 0.0)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time;
+                return elapsed_flops/elapsed_time;
+        }
+
+        /* no flops executed yet in this context */
+        return 0.0;
+}
+
+double sched_ctx_hypervisor_get_cpu_velocity(unsigned sched_ctx)
+{
+        int ncpus = 0;
+        double elapsed_flops = _get_elapsed_flops_per_cpus(sched_ctx, &ncpus);
+
+        if( elapsed_flops != 0.0)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time;
+                return (elapsed_flops/elapsed_time) / ncpus;
+        }
+
+        return -1.0;
+}
+
+double sched_ctx_hypervisor_get_flops_left(unsigned sched_ctx)
+{
+        return hypervisor.sched_ctx_w[sched_ctx].remaining_flops;
+}
+
 double sched_ctx_hypervisor_get_flops_left_pct(unsigned sched_ctx)
 {
 	struct sched_ctx_wrapper *wrapper = &hypervisor.sched_ctx_w[sched_ctx];

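The velocity returned by these helpers is elapsed flops divided by elapsed wall-clock time, so assuming starpu_timing_now() reports microseconds the result is in flops per microsecond. A small worked example with made-up numbers:

#include <stdio.h>

int main(void)
{
	/* Made-up numbers, only to pin down the units of the velocity getters,
	 * assuming starpu_timing_now() reports wall-clock time in microseconds. */
	double elapsed_flops = 2e9;                          /* flops since start_time */
	double elapsed_time  = 5e5;                          /* 0.5 s, in microseconds */
	double ctx_velocity  = elapsed_flops / elapsed_time; /* 4000 flops/us          */
	int ncpus = 8;                                       /* CPU workers in the ctx */

	printf("ctx velocity %.0f flops/us, per-CPU velocity %.0f flops/us\n",
	       ctx_velocity, ctx_velocity / ncpus);
	return 0;
}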
+ 40 - 1
src/core/sched_ctx.c

@@ -89,7 +89,10 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 			worker[i]->tasks[sched_ctx_id]->execute_on_a_specific_worker = 1;
 			worker[i]->tasks[sched_ctx_id]->workerid = workerids[i];
 			worker[i]->tasks[sched_ctx_id]->destroy = 1;
-			worker[i]->tasks[sched_ctx_id]->priority = 1;
+			int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker[i], sched_ctx_id);
+                        /* if the ctx is not in the worker's list it means the update concerns the addition of ctxs*/
+                        if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
+                                worker[i]->tasks[sched_ctx_id]->priority = 1;
 
 			_starpu_exclude_task_from_dag(worker[i]->tasks[sched_ctx_id]);
 			
@@ -642,6 +645,42 @@ unsigned starpu_get_nworkers_of_sched_ctx(unsigned sched_ctx_id)
 
 }
 
+unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2)
+{
+        struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+        struct starpu_sched_ctx *sched_ctx2 = _starpu_get_sched_ctx_struct(sched_ctx_id2);
+
+        struct worker_collection *workers = sched_ctx->workers;
+        struct worker_collection *workers2 = sched_ctx2->workers;
+        int worker, worker2;
+        int shared_workers = 0;
+
+        if(workers->init_cursor)
+                workers->init_cursor(workers);
+
+        while(workers->has_next(workers))
+        {
+                worker = workers->get_next(workers);
+                /* restart the cursor of the second context for each worker of the first one */
+                if(workers2->init_cursor)
+                        workers2->init_cursor(workers2);
+                while(workers2->has_next(workers2))
+                {
+                        worker2 = workers2->get_next(workers2);
+                        if(worker == worker2)
+                                shared_workers++;
+                }
+        }
+
+        if(workers->init_cursor)
+                workers->deinit_cursor(workers);
+
+        if(workers2->init_cursor)
+                workers2->deinit_cursor(workers2);
+
+	return shared_workers;
+}
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops)
 {