Explorar o código

Merge from trunk @11714:11788

Marc Sergent hai 11 anos
pai
achega
e998e008c6

+ 5 - 0
sc_hypervisor/include/sc_hypervisor_monitoring.h

@@ -127,6 +127,11 @@ struct sc_hypervisor_wrapper
 	/* boolean indicating if we add the idle of this worker to 
 	   the idle of the context */
 	unsigned compute_idle[STARPU_NMAXWORKERS];
+
+	/* boolean indicating if we add the entire idle of this 
+	   worker to the idle of the context or just half */
+	unsigned compute_partial_idle[STARPU_NMAXWORKERS];
+
 };
 
 /* return the wrapper of context that saves its monitoring information */

+ 6 - 3
sc_hypervisor/src/hypervisor_policies/feft_lp_policy.c

@@ -28,6 +28,9 @@ static void _try_resizing(unsigned *sched_ctxs, int nsched_ctxs, int *workers, i
 	printf("resize_no = %d\n", resize_no);
 	starpu_trace_user_event(resize_no++);
 	int ns = sched_ctxs == NULL ? sc_hypervisor_get_nsched_ctxs() : nsched_ctxs;
+
+	if(ns <= 1) return;
+
 	unsigned *curr_sched_ctxs = sched_ctxs == NULL ? sc_hypervisor_get_sched_ctxs() : sched_ctxs;
 	unsigned curr_nworkers = nworkers == -1 ? starpu_worker_get_count() : (unsigned)nworkers;
 	
@@ -154,13 +157,13 @@ static void feft_lp_size_ctxs(unsigned *sched_ctxs, int nsched_ctxs, int *worker
 static void feft_lp_handle_idle_cycle(unsigned sched_ctx, int worker)
 {
 	unsigned criteria = sc_hypervisor_get_resize_criteria();
-	if(criteria != SC_NOTHING && criteria == SC_IDLE)
+	if(criteria != SC_NOTHING)// && criteria == SC_IDLE)
 	{
 		int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
 		if(ret != EBUSY)
 		{
-			if(sc_hypervisor_check_idle(sched_ctx, worker))
-				_try_resizing(NULL, -1, NULL, -1);
+			printf("trigger idle \n");
+			_try_resizing(NULL, -1, NULL, -1);
 			starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 		}
 	}

+ 1 - 2
sc_hypervisor/src/policies_utils/policy_tools.c

@@ -465,10 +465,9 @@ unsigned sc_hypervisor_check_idle(unsigned sched_ctx, int worker)
 	struct sc_hypervisor_policy_config *config = sc_w->config;
 	if(config != NULL)
 	{
-		printf("w%d/ctx%d: current idle %lf max_idle %lf\n", worker, sched_ctx, sc_w->idle_time[worker], config->max_idle[worker]);
 		if(sc_w->idle_time[worker] > config->max_idle[worker])
 		{
-//			sc_w->current_idle_time[worker] = 0.0;
+//			printf("w%d/ctx%d: current idle %lf all idle %lf max_idle %lf\n", worker, sched_ctx, idle, idle_time, config->max_idle[worker]);
 			return 1;
 		}
 	}

+ 59 - 33
sc_hypervisor/src/sc_hypervisor.c

@@ -209,6 +209,7 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
 			hypervisor.sched_ctx_w[i].total_elapsed_flops[j] = 0.0;
 			hypervisor.sched_ctx_w[i].worker_to_be_removed[j] = 0;
 			hypervisor.sched_ctx_w[i].compute_idle[j] = 1;
+			hypervisor.sched_ctx_w[i].compute_partial_idle[j] = 0;
 		}
 	}
 
@@ -313,7 +314,6 @@ void sc_hypervisor_register_ctx(unsigned sched_ctx, double total_flops)
 	hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
 	hypervisor.resize[sched_ctx] = 1;
-	hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
 	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 }
 
@@ -831,7 +831,7 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
 					elapsed_time_worker[worker] = 0.0;
 				else
 					elapsed_time_worker[worker] = (end_time - hypervisor.sched_ctx_w[sched_ctx].start_time_w[worker]) / 1000000.0;
-				
+
 				if(hypervisor.sched_ctx_w[sched_ctx].idle_start_time[worker] == 0.0)
 				{
 					idle_time = hypervisor.sched_ctx_w[sched_ctx].idle_time[worker]; /* in seconds */
@@ -948,21 +948,27 @@ static void notify_pushed_task(unsigned sched_ctx, int worker)
 /* notifies the hypervisor that the worker spent another cycle in idle time */
 static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 {
-	if(hypervisor.resize[sched_ctx])
+	struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+	sc_w->current_idle_time[worker] += idle_time;
+	
+	if(sc_w->idle_start_time[worker] == 0.0 && sc_w->hyp_react_start_time != 0.0)
+		sc_w->idle_start_time[worker] = starpu_timing_now();
+	
+
+	if(sc_w->idle_start_time[worker] > 0.0)
 	{
-		struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
-		sc_w->current_idle_time[worker] += idle_time;
+		double end_time  = starpu_timing_now();
+		sc_w->idle_time[worker] += (end_time - sc_w->idle_start_time[worker]) / 1000000.0; /* in seconds */
+	}
+	
+	hypervisor.sched_ctx_w[sched_ctx].idle_start_time[worker] = starpu_timing_now();
 
-		if(sc_w->idle_start_time[worker] == 0.0)
-			sc_w->idle_start_time[worker] = starpu_timing_now();
-		
-		if(hypervisor.policy.handle_idle_cycle)
+	if(hypervisor.resize[sched_ctx] && hypervisor.policy.handle_idle_cycle)
+	{
+		if(sc_w->sched_ctx != STARPU_NMAX_SCHED_CTXS && sc_w->hyp_react_start_time != 0.0)
 		{
-			double curr_time = starpu_timing_now();
-			double elapsed_time = (curr_time - sc_w->hyp_react_start_time) / 1000000.0; /* in seconds */
-			if(sc_w->sched_ctx != STARPU_NMAX_SCHED_CTXS && elapsed_time > sc_w->config->time_sample)
+			if(sc_hypervisor_check_idle(sched_ctx, worker))
 			{
-				sc_w->hyp_react_start_time = starpu_timing_now();
 				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
 			}
 		}
@@ -970,6 +976,7 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 	return;
 }
 
+
 /* notifies the hypervisor that the worker is no longer idle and a new task was pushed on its queue */
 static void notify_poped_task(unsigned sched_ctx, int worker)
 {
@@ -978,18 +985,33 @@ static void notify_poped_task(unsigned sched_ctx, int worker)
 
 	hypervisor.sched_ctx_w[sched_ctx].exec_start_time[worker] = starpu_timing_now();
 
-	if(hypervisor.resize[sched_ctx])
-		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;
-
-	struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
-
-	if(sc_w->idle_start_time[worker] > 0.0)
+	if(hypervisor.sched_ctx_w[sched_ctx].idle_start_time[worker] > 0.0)
 	{
+		int ns = hypervisor.nsched_ctxs;
+		int j;
+		for(j = 0; j < ns; j++)
+		{
+			if(hypervisor.sched_ctxs[j] != sched_ctx)
+			{
+				if(hypervisor.sched_ctx_w[hypervisor.sched_ctxs[j]].idle_start_time[worker] > 0.0)
+					hypervisor.sched_ctx_w[hypervisor.sched_ctxs[j]].compute_partial_idle[worker] = 1;
+			}
+		}
 		double end_time  = starpu_timing_now();
-		sc_w->idle_time[worker] += (end_time - sc_w->idle_start_time[worker]) / 1000000.0; /* in seconds */
-		sc_w->idle_start_time[worker] = 0.0;
-	}
+		double idle = (end_time - hypervisor.sched_ctx_w[sched_ctx].idle_start_time[worker]) / 1000000.0; /* in seconds */
 			
+		if(hypervisor.sched_ctx_w[sched_ctx].compute_partial_idle[worker])
+			hypervisor.sched_ctx_w[sched_ctx].idle_time[worker] += idle / 2.0;
+		else
+			hypervisor.sched_ctx_w[sched_ctx].idle_time[worker] += idle;
+
+		hypervisor.sched_ctx_w[sched_ctx].compute_partial_idle[worker] = 0;
+		hypervisor.sched_ctx_w[sched_ctx].idle_start_time[worker] = 0.0;
+	}
+
+	if(hypervisor.resize[sched_ctx])
+		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;
+				
 	if(hypervisor.policy.handle_idle_end)
 		hypervisor.policy.handle_idle_end(sched_ctx, worker);
 
@@ -1016,23 +1038,26 @@ static void notify_post_exec_task(struct starpu_task *task, size_t data_size, ui
 	hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[worker]++ ;
 	hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += flops;
 
-	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
+	starpu_pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= flops;
 	if(_sc_hypervisor_use_lazy_resize())
 		_ack_resize_completed(sched_ctx, worker);
-	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+	starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 
 	
 	if(hypervisor.resize[sched_ctx])
 	{	
 		if(hypervisor.policy.handle_poped_task)
-		{
+		{	
+			if(hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time == 0.0)
+				hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
+
 			double curr_time = starpu_timing_now();
 			double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time) / 1000000.0; /* in seconds */
 			if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS && elapsed_time > hypervisor.sched_ctx_w[sched_ctx].config->time_sample)
 			{
-				hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
 				hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
+				hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
 			}
 		}
 	}
@@ -1093,9 +1118,10 @@ static void notify_post_exec_task(struct starpu_task *task, size_t data_size, ui
 
 static void notify_submitted_job(struct starpu_task *task, uint32_t footprint, size_t data_size)
 {
-	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
-	hypervisor.sched_ctx_w[task->sched_ctx].submitted_flops += task->flops;
-	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+	unsigned sched_ctx = task->sched_ctx;
+	starpu_pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
+	hypervisor.sched_ctx_w[sched_ctx].submitted_flops += task->flops;
+	starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 
 	if(hypervisor.policy.handle_submitted_job && !type_of_tasks_known)
 		hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint, data_size);
@@ -1226,10 +1252,10 @@ struct types_of_workers* sc_hypervisor_get_types_of_workers(int *workers, unsign
 
 void sc_hypervisor_update_diff_total_flops(unsigned sched_ctx, double diff_total_flops)
 {
-	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
+	starpu_pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 	hypervisor.sched_ctx_w[sched_ctx].total_flops += diff_total_flops;
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops += diff_total_flops;	
-	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+	starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 }
 
 void sc_hypervisor_update_diff_elapsed_flops(unsigned sched_ctx, double diff_elapsed_flops)
@@ -1237,9 +1263,9 @@ void sc_hypervisor_update_diff_elapsed_flops(unsigned sched_ctx, double diff_ela
 	int workerid = starpu_worker_get_id();
 	if(workerid != -1)
 	{
-		starpu_pthread_mutex_lock(&act_hypervisor_mutex);
+//		starpu_pthread_mutex_lock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 		hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[workerid] += diff_elapsed_flops;
 		hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[workerid] += diff_elapsed_flops;
-		starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+//		starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 	}
 }

+ 16 - 12
src/core/sched_ctx.c

@@ -134,14 +134,15 @@ void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int curr_workerid = starpu_worker_get_id();
 	/* if is the initial sched_ctx no point in taking the mutex, the workers are
-	   not launched yet */
-	if(!sched_ctx->is_initial_sched)
+	   not launched yet, or if the current worker is calling this */
+	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
 		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
 	worker->shares_tasks_lists[sched_ctx_id] = 1;
 
-	if(!sched_ctx->is_initial_sched)
+	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
 		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 }
 
@@ -166,10 +167,15 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 				added_workers[(*n_added_workers)++] = worker;
 			else
 			{
+				int curr_workerid = starpu_worker_get_id();
 				struct _starpu_worker *worker_str = _starpu_get_worker_struct(workerids[i]);
-				STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+				if(curr_workerid != workerids[i])
+					STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+
 				worker_str->removed_from_ctx[sched_ctx->id] = 0;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
+
+				if(curr_workerid != workerids[i])
+					STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
 			}
 		}
 		else
@@ -818,7 +824,8 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 		config->watchdog_ok = 1;
 
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int finished = _starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
+	int reached = _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
+	int finished = reached == 1;
         /* when finished decrementing the tasks if the user signaled he will not submit tasks anymore
            we can move all its workers to the inheritor context */
 	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
@@ -828,8 +835,6 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 		{
 			STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
 
-			/* take care the context is not deleted or changed at the same time */
-			STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
 			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
 				if(sched_ctx->close_callback)
@@ -844,8 +849,7 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 					free(workerids);
 				}
 			}
-			STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
-
+			_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
 			return;
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
@@ -861,13 +865,11 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
 	if(config->submitting == 0)
 	{
-		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
 			if(sched_ctx->close_callback)
 				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
 		}
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
 		ANNOTATE_HAPPENS_AFTER(&config->running);
 		config->running = 0;
@@ -883,6 +885,8 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
 
+	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
+
 	return;
 }
 

+ 1 - 1
src/profiling/bound.c

@@ -198,7 +198,7 @@ static double** initialize_arch_duration(int maxdevid, unsigned* maxncore_table)
 			maxncore = maxncore_table[devid];
 		else
 			maxncore = 1;
-		arch_model[devid] = malloc(sizeof(*arch_model[devid])*(maxncore+1));
+		arch_model[devid] = calloc(maxncore+1,sizeof(*arch_model[devid]));
 	}
 	return arch_model;
 }

+ 8 - 5
src/sched_policies/eager_central_policy.c

@@ -175,13 +175,16 @@ static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-		starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
+		int curr_workerid = starpu_worker_get_id();
+		if(workerid != curr_workerid)
+		{
+			starpu_pthread_mutex_t *sched_mutex;
+			starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+			starpu_wakeup_worker(workerid, sched_cond, sched_mutex);
+		}
 
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
-
 	}
 }