Browse files

* eager: wake up workers when we add them to a context
* pop tasks: fix the round robin for overlapping contexts
* hypervisor: update the linear program to better take into account the max number of workers per context

Andra Hugo 11 years ago
parent
commit
52459c8508

+ 21 - 75
sc_hypervisor/src/policies_utils/lp_programs.c

@@ -255,8 +255,7 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 	int s, w;
 	glp_prob *lp;
 
-	int ne = //ns * (nw*ns + 1) +
-		(ns*nw+1)*(2*ns+nw)
+	int ne = (ns*nw+1)*(ns+nw)
 		+ 1; /* glp dumbness */
 	int n = 1;
 	int ia[ne], ja[ne];
@@ -276,16 +275,26 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 	{
 		for(w = 0; w < nw; w++)
 		{
+			struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sched_ctxs[s]);
 			char name[32];
 			snprintf(name, sizeof(name), "worker%dctx%d", w, s);
 			glp_set_col_name(lp, n, name);
 			if (integer)
 			{
 				glp_set_col_kind(lp, n, GLP_IV);
-				glp_set_col_bnds(lp, n, GLP_LO, 0, 0);
+				printf("ctx %d idx %d min %d max %d \n", sched_ctxs[s], s, config->min_nworkers, config->max_nworkers);
+				if(config->max_nworkers == 0)
+					glp_set_col_bnds(lp, n, GLP_FX, config->min_nworkers, config->max_nworkers);
+				else
+					glp_set_col_bnds(lp, n, GLP_DB, config->min_nworkers, config->max_nworkers);
 			}
 			else
-				glp_set_col_bnds(lp, n, GLP_LO, 0.0, 0.0);
+			{
+				if(config->max_nworkers == 0)
+					glp_set_col_bnds(lp, n, GLP_FX, config->min_nworkers*1.0, config->max_nworkers*1.0);
+				else
+					glp_set_col_bnds(lp, n, GLP_DB, config->min_nworkers*1.0, config->max_nworkers*1.0);
+			}
 			n++;
 		}
 	}
@@ -338,69 +347,6 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 		n++;
 	}
 
-	/* one row corresponds to one ctx*/
-	glp_add_rows(lp, ns);
-
-	for(s = 0; s < ns; s++)
-	{
-		struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sched_ctxs[s]);
-		char name[32];
-		snprintf(name, sizeof(name), "ctx%d", s);
-		glp_set_row_name(lp, ns+s+1, name);
-		glp_set_row_bnds(lp, ns+s+1, GLP_LO, 0., 0.);
-		
-
-		int s2;
-		for(s2 = 0; s2 < ns; s2++)
-		{
-			if(s2 == s)
-			{
-
-				for(w = 0; w < nw; w++)
-				{
-					/* only for CPUs for now */
-					if(w == 0)
-					{
-						ia[n] = ns+s+1;
-						ja[n] = w+s2*nw + 1;
-						ar[n] = 1.0;
-//					printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
-					}
-					else
-					{
-						ia[n] = ns+s+1;
-						ja[n] = w+s2*nw + 1;
-						ar[n] = 0.0;
-//					printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
-
-					}
-					n++;
-				}
-			}
-			else
-			{
-				for(w = 0; w < nw; w++)
-				{
-
-					ia[n] = ns+s+1;
-					ja[n] = w+s2*nw + 1;
-					ar[n] = 0.0;
-//					printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
-					n++;
-				}
-				
-			}
-				
-		}
-		ia[n] = ns+s+1;
-		ja[n] = ns*nw+1;
-		ar[n] = 0.0;
-		n++;
-		
-		glp_set_row_bnds(lp, ns+s+1, GLP_UP, config->min_nworkers, config->max_nworkers);
-
-	}
-
 	/*we add another linear constraint : sum(all cpus) = 9 and sum(all gpus) = 3 */
 	glp_add_rows(lp, nw);
 
@@ -408,7 +354,7 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 	{
 		char name[32];
 		snprintf(name, sizeof(name), "w%d", w);
-		glp_set_row_name(lp, 2*ns+w+1, name);
+		glp_set_row_name(lp, ns+w+1, name);
 		for(s = 0; s < ns; s++)
 		{
 			int w2;
@@ -416,14 +362,14 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 			{
 				if(w2 == w)
 				{
-					ia[n] = 2*ns+w+1;
+					ia[n] = ns+w+1;
 					ja[n] = w2+s*nw + 1;
 					ar[n] = 1.0;
 //					printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
 				}
 				else
 				{
-					ia[n] = 2*ns+w+1;
+					ia[n] = ns+w+1;
 					ja[n] = w2+s*nw + 1;
 					ar[n] = 0.0;
 //					printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
@@ -432,7 +378,7 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 			}
 		}
 		/* 1/tmax */
-		ia[n] = 2*ns+w+1;
+		ia[n] = ns+w+1;
 		ja[n] = ns*nw+1;
 		ar[n] = 0.0;
 //		printf("ia[%d]=%d ja[%d]=%d ar[%d]=%lf\n", n, ia[n], n, ja[n], n, ar[n]);
@@ -440,11 +386,11 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 
 		/*sum(all gpus) = 3*/
 		if(w == 0)
-			glp_set_row_bnds(lp, 2*ns+w+1, GLP_FX, total_nw[0], total_nw[0]);
+			glp_set_row_bnds(lp, ns+w+1, GLP_FX, total_nw[0], total_nw[0]);
 
 		/*sum(all cpus) = 9*/
 		if(w == 1)
-			glp_set_row_bnds(lp, 2*ns+w+1, GLP_FX, total_nw[1], total_nw[1]);
+			glp_set_row_bnds(lp, ns+w+1, GLP_FX, total_nw[1], total_nw[1]);
 	}
 
 	STARPU_ASSERT(n == ne);
@@ -493,7 +439,7 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
 
 	double vmax = glp_get_obj_val(lp);
 
-//	printf("vmax = %lf \n", vmax);
+	printf("vmax = %lf \n", vmax);
 	n = 1;
 	for(s = 0; s < ns; s++)
 	{
@@ -503,7 +449,7 @@ double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw],
                                 res[s][w] = (double)glp_mip_col_val(lp, n);
 			else
 				res[s][w] = glp_get_col_prim(lp, n);
-//			printf("%d/%d: res %lf flops = %lf v = %lf\n", w,s, res[s][w], flops[s], v[s][w]);
+			printf("%d/%d: res %lf flops = %lf v = %lf\n", w,s, res[s][w], flops[s], v[s][w]);
 			n++;
 		}
 	}
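
With the ns per-context rows removed, the constraint matrix loses their entries, so the preallocated entry count drops to ne = (ns*nw+1)*(ns+nw) + 1 and the remaining per-worker rows shift from index 2*ns+w+1 to ns+w+1. For the new column bounds, here is a minimal self-contained sketch of the GLPK calls used above (the column index and the min/max values are placeholders; in the patch they come from sc_hypervisor_get_config()):

/* Sketch: bound one LP column per (context, worker-type) pair.
   GLP_FX pins the column to a single value (used when the context
   may not receive workers), GLP_DB bounds it inside [min, max]. */
#include <glpk.h>

static void bound_worker_column(glp_prob *lp, int col, int integer,
				int min_nw, int max_nw)
{
	if (integer)
		glp_set_col_kind(lp, col, GLP_IV); /* integer variable, as in the patch */

	if (max_nw == 0)
		/* this context may not take workers: fix the column
		   (the patch passes the configured min/max here; 0 shown
		   for clarity) */
		glp_set_col_bnds(lp, col, GLP_FX, 0.0, 0.0);
	else
		/* double-bounded: min_nworkers <= x <= max_nworkers */
		glp_set_col_bnds(lp, col, GLP_DB,
				 (double)min_nw, (double)max_nw);
}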

+ 2 - 1
sc_hypervisor/src/policies_utils/lp_tools.c

@@ -89,7 +89,8 @@ we give a worker (in shared mode) to the context in order to leave him
 finish its work = we give -1.0 value instead of 0.0 and further on in
 the distribution function we take this into account and revert the variable
 to its 0.0 value */ 
-		if(no_workers && (flops[i] != 0.0 || sc_w->nready_tasks > 0))
+//		if(no_workers && (flops[i] != 0.0 || sc_w->nready_tasks > 0))
+		if(no_workers)
 		{
 			for(w = 0; w < nw; w++)
 				res[i][w] = -1.0;
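
A compact restatement of the sentinel convention described in the comment above (the helper name is illustrative): every entry of a worker-less context's row is set to -1.0 rather than 0.0, so the distribution step can tell "lend this context a shared worker" apart from "allocate nothing", and revert the value to 0.0 afterwards.

/* Illustrative sketch: -1.0 marks "context i currently has no workers,
   give it one in shared mode"; reverted to 0.0 during distribution. */
static void mark_workerless_ctx(double *res_row, int nw)
{
	int w;
	for (w = 0; w < nw; w++)
		res_row[w] = -1.0; /* sentinel, not a real allocation */
}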

+ 3 - 3
sc_hypervisor/src/policies_utils/policy_tools.c

@@ -465,10 +465,10 @@ unsigned sc_hypervisor_check_idle(unsigned sched_ctx, int worker)
 	struct sc_hypervisor_policy_config *config = sc_w->config;
 	if(config != NULL)
 	{
-		printf("w%d/ctx%d: current idle %lf max_idle %lf\n", worker, sched_ctx, sc_w->current_idle_time[worker], config->max_idle[worker]);
-		if(sc_w->current_idle_time[worker] > config->max_idle[worker])
+		printf("w%d/ctx%d: current idle %lf max_idle %lf\n", worker, sched_ctx, sc_w->idle_time[worker], config->max_idle[worker]);
+		if(sc_w->idle_time[worker] > config->max_idle[worker])
 		{
-			sc_w->current_idle_time[worker] = 0.0;
+//			sc_w->current_idle_time[worker] = 0.0;
 			return 1;
 		}
 	}

+ 13 - 32
sc_hypervisor/src/sc_hypervisor.c

@@ -819,7 +819,19 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
 		double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time) / 1000000.0; /* in seconds */
 		double norm_idle_time = max_workers_idle_time[i] / elapsed_time;
 
-		config->max_nworkers = 	workers->nworkers - lrint(norm_idle_time) + hypervisor.sched_ctx_w[sched_ctx].nready_tasks;
+		if(lrint(norm_idle_time) >= 1)
+		{
+			config->max_nworkers = 	workers->nworkers - lrint(norm_idle_time);
+			if(config->max_nworkers > hypervisor.sched_ctx_w[sched_ctx].nready_tasks)
+				config->max_nworkers = hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1;
+		}
+		else
+		{
+			if(max_workers_idle_time[i] < 0.000001)
+				config->max_nworkers = 	workers->nworkers + hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1;
+			else
+				config->max_nworkers = workers->nworkers;
+		}
 		
 		if(config->max_nworkers < 0)
 			config->max_nworkers = 0;
@@ -829,37 +841,6 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
 		printf("%d: ready tasks  %d idle for long %lf norm_idle_time %lf elapsed_time %lf nworkers %d max %d \n", 
 		       sched_ctx, hypervisor.sched_ctx_w[sched_ctx].nready_tasks, max_workers_idle_time[i], norm_idle_time, elapsed_time, workers->nworkers, config->max_nworkers);
 
-/* 		if(max_workers_idle_time[i] > 0.000002) */
-/* 		{ */
-/* 			double curr_time = starpu_timing_now(); */
-/* 			double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time) / 1000000.0; /\* in seconds *\/ */
-/* 			double norm_idle_time = max_workers_idle_time[i] / elapsed_time; */
-
-/* 			config->max_nworkers = 	workers->nworkers - lrint(norm_idle_time); */
-/* 			if(config->max_nworkers < 0) */
-/* 				config->max_nworkers = 0; */
-			
-/* 			printf("%d: ready tasks  %d idle for long %lf norm_idle_time %lf elapsed_time %lf nworkers %d decr %d \n",  */
-/* 			       sched_ctx, hypervisor.sched_ctx_w[sched_ctx].nready_tasks, max_workers_idle_time[i], norm_idle_time, elapsed_time, workers->nworkers, config->max_nworkers); */
-
-/* 		} */
-/* 		else */
-/* 		{ */
-/* 			double curr_time = starpu_timing_now(); */
-/* 			double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time) / 1000000.0; /\* in seconds *\/ */
-/* 			double norm_idle_time = max_workers_idle_time[i] / elapsed_time; */
-			
-/* 			if(workers->nworkers == 0 && hypervisor.sched_ctx_w[sched_ctx].nready_tasks == 1) */
-/* 				config->max_nworkers = 0; */
-/* 			else */
-/* 			{ */
-/* 				config->max_nworkers = (hypervisor.sched_ctx_w[sched_ctx].nready_tasks > max_cpus)  */
-/* 					? max_cpus : hypervisor.sched_ctx_w[sched_ctx].nready_tasks; */
-/* 				config->max_nworkers = workers->nworkers > config->max_nworkers ? workers->nworkers : config->max_nworkers; */
-/* 			} */
-/* 			printf("%d: ready tasks  %d not idle %lf norm_idle_time %lf elapsed_time %lf nworkers %d incr %d \n",  */
-/* 			       sched_ctx, hypervisor.sched_ctx_w[sched_ctx].nready_tasks, max_workers_idle_time[i], norm_idle_time, elapsed_time, workers->nworkers, config->max_nworkers); */
-/* 		} */
 
 		total_max_nworkers += config->max_nworkers;
 		configured = 1;
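
A condensed sketch of the new resize heuristic above, with the branch structure pulled into one function (the helper name is illustrative; the logic mirrors the patch): a context whose most idle worker was idle for at least one "normalized second" shrinks, capped below its ready-task count, while a context with virtually no idle time may grow by up to nready_tasks - 1 extra workers.

#include <math.h>

/* norm_idle_time is the idle time of the most idle worker divided by
   the elapsed wall-clock time, both in seconds. */
static int compute_max_nworkers(int nworkers, double max_idle_time,
				double elapsed_time, int nready_tasks)
{
	double norm_idle_time = max_idle_time / elapsed_time;
	int max_nw;

	if (lrint(norm_idle_time) >= 1)
	{
		/* noticeably idle: shrink, and stay below the ready tasks */
		max_nw = nworkers - (int)lrint(norm_idle_time);
		if (max_nw > nready_tasks)
			max_nw = nready_tasks - 1;
	}
	else if (max_idle_time < 0.000001)
		/* essentially no idle time: allow the context to grow */
		max_nw = nworkers + nready_tasks - 1;
	else
		max_nw = nworkers;

	return max_nw < 0 ? 0 : max_nw;
}

For example, nworkers = 6, norm_idle_time ≈ 2.3 and nready_tasks = 3 give 6 - 2 = 4, clamped to 3 - 1 = 2.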

+ 30 - 0
src/core/sched_ctx.c

@@ -1116,6 +1116,36 @@ int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
 	return 0;
 }
 
+unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
+{
+	struct _starpu_sched_ctx_list *l = NULL;
+        for (l = worker->sched_ctx_list; l; l = l->next)
+        {
+		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+		
+		unsigned last_worker_awake = 1;
+		struct starpu_worker_collection *workers = sched_ctx->workers;
+		struct starpu_sched_ctx_iterator it;
+
+		int workerid;
+		if(workers->init_iterator)
+			workers->init_iterator(workers, &it);
+		
+		while(workers->has_next(workers, &it))
+		{
+			workerid = workers->get_next(workers, &it);
+			if(workerid != worker->workerid && _starpu_worker_get_status(workerid) != STATUS_SLEEPING)
+			{
+				last_worker_awake = 0;
+				break;
+			}
+		}
+		if(last_worker_awake)
+			return 1;
+	}
+	return 0;
+}
+
 static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

+ 4 - 0
src/core/sched_ctx.h

@@ -161,6 +161,10 @@ unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_
 /* mutex synchronising several simultaneous modifications of a context */
 starpu_pthread_mutex_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id);
 
+/* indicates whether this worker should go to sleep or not
+   (if it is the last one awake in a context it had better stay awake) */
+unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
+
 /*rebind each thread on its cpu after finishing a parallel code */
 void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid);
 

+ 29 - 14
src/core/sched_policy.c

@@ -566,8 +566,8 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 	for (l = worker->sched_ctx_list; l; l = l->next)
 	{
 		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
-/* 		if(worker->removed_from_ctx[sched_ctx->id]) */
-/* 			return sched_ctx; */
+		if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
+			return sched_ctx;
 		if(sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs &&
 		   smallest_counter > sched_ctx->pop_counter[worker->workerid])
 		{
@@ -622,7 +622,22 @@ pick:
 			if(worker->nsched_ctxs == 1)
 				sched_ctx = _starpu_get_initial_sched_ctx();
 			else
-				sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
+			{
+				while(1)
+				{
+					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
+					
+					if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
+					{
+						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
+						worker->removed_from_ctx[sched_ctx->id] = 0;
+						sched_ctx = NULL;
+					}
+					else
+						break;
+				}
+			}
+
 
 			if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
@@ -631,30 +646,30 @@ pick:
 					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
 				}
 			}
-
+			
 			if(!task)
 			{
-				if(sched_ctx && worker->removed_from_ctx[sched_ctx->id])
+				/* it doesn't matter whether it shares a tasks list in the scheduler:
+				   if it has no task to pop just get it out of here */
+				/* however, if it shares a tasks list it will be removed as soon as it
+				   finishes this job (in handle_job_termination) */
+				if(worker->removed_from_ctx[sched_ctx->id])
 				{
 					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
 					worker->removed_from_ctx[sched_ctx->id] = 0;
-				} 
-#ifdef STARPU_USE_SC_HYPERVISOR
-				else 
-				{
-					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
-					if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle)
-						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
 				}
+#ifdef STARPU_USE_SC_HYPERVISOR
+				struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
+				if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle)
+					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
 #endif //STARPU_USE_SC_HYPERVISOR
-					
+				
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 				if((sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
 					break;
 				been_here[sched_ctx->id] = 1;
 #endif
 			}
-			
 			sched_ctx->pop_counter[worker->workerid]++;
 		}
 	  }
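
The two hunks above cooperate: _get_next_sched_ctx_to_pop_into() now returns early with any context the worker has been removed from while still sharing its tasks list, and the pop loop drains those stale memberships before letting the round robin pick a live context. A condensed sketch of the retry loop, with stand-in types in place of the StarPU internals:

/* Stale contexts are left immediately and the round robin retried,
   so their pop_counter no longer skews the rotation. */
struct ctx { int id; };
struct wrk
{
	int removed_from_ctx[16];
	int shares_tasks_lists[16];
};
extern struct ctx *next_ctx_round_robin(struct wrk *w); /* stand-in */
extern void leave_ctx(int ctx_id, struct wrk *w);       /* stand-in */

static struct ctx *pick_ctx(struct wrk *w)
{
	struct ctx *c;
	while (1)
	{
		c = next_ctx_round_robin(w);
		if (w->removed_from_ctx[c->id] && w->shares_tasks_lists[c->id])
		{
			/* drain the stale membership, then retry */
			leave_ctx(c->id, w);
			w->removed_from_ctx[c->id] = 0;
		}
		else
			break;
	}
	return c;
}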

+ 2 - 1
src/drivers/driver_common/driver_common.c

@@ -22,6 +22,7 @@
 #include <profiling/profiling.h>
 #include <common/utils.h>
 #include <core/debug.h>
+#include <core/sched_ctx.h>
 #include <drivers/driver_common/driver_common.h>
 #include <starpu_top.h>
 #include <core/sched_policy.h>
@@ -218,7 +219,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 
 		_starpu_worker_set_status_sleeping(workerid);
 
-		if (_starpu_worker_can_block(memnode))
+		if (_starpu_worker_can_block(memnode) && !_starpu_sched_ctx_last_worker_awake(args))
 		{
 			STARPU_PTHREAD_COND_WAIT(&args->sched_cond, &args->sched_mutex);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
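
In plain pthreads, the sleep gate added above amounts to the sketch below (the worker type and the two predicates are stand-ins for _starpu_worker_can_block() and _starpu_sched_ctx_last_worker_awake(); STARPU_PTHREAD_COND_WAIT wraps pthread_cond_wait): a worker only blocks when blocking is allowed for its memory node and it is not the last awake worker of any of its contexts.

#include <pthread.h>

struct worker
{
	pthread_mutex_t sched_mutex;
	pthread_cond_t sched_cond;
};
extern int can_block(struct worker *w);         /* stand-in */
extern int last_worker_awake(struct worker *w); /* stand-in */

static void maybe_sleep(struct worker *w)
{
	pthread_mutex_lock(&w->sched_mutex);
	if (can_block(w) && !last_worker_awake(w))
		/* every context this worker belongs to still has another
		   awake worker, so it is safe to block here */
		pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
	pthread_mutex_unlock(&w->sched_mutex);
}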

+ 3 - 3
src/sched_policies/eager_central_policy.c

@@ -144,13 +144,13 @@ static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
-
 		starpu_pthread_mutex_t *sched_mutex;
 		starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-
 		starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
+
+		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
+
 	}
 }
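
The reordering above wakes the worker before registering it on the context's shared task list. In pthread terms, waking a worker is essentially the sketch below (hedged: starpu_wakeup_worker's actual implementation may do more bookkeeping than this):

/* Sketch: signal the worker's scheduling condition under its mutex so
   a worker blocked in the sleep gate shown earlier wakes up again. */
#include <pthread.h>

static void wakeup_worker(pthread_mutex_t *sched_mutex,
			  pthread_cond_t *sched_cond)
{
	pthread_mutex_lock(sched_mutex);
	pthread_cond_signal(sched_cond);
	pthread_mutex_unlock(sched_mutex);
}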