Преглед на файлове

Fix atomicity of exp_start update

Samuel Thibault преди 13 години
родител
ревизия
a5974d8225
променени са 2 файла, в които са добавени 8 реда и са изтрити 8 реда
  1. 2 0
      src/sched_policies/heft.c
  2. 6 8
      src/sched_policies/parallel_heft.c

+ 2 - 0
src/sched_policies/heft.c

@@ -259,10 +259,12 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
 			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
 			exp_end[worker][nimpl] = exp_start[worker] + exp_len[worker];
 			if (exp_end[worker][nimpl] > max_exp_end)
 				max_exp_end = exp_end[worker][nimpl];
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
 
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{

+ 6 - 8
src/sched_policies/parallel_heft.c

@@ -34,8 +34,6 @@
 #define DBL_MAX __DBL_MAX__
 #endif
 
-static pthread_mutex_t big_lock;
-
 static unsigned nworkers, ncombinedworkers;
 //static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
 //static unsigned napplicable_perf_archtypes = 0;
@@ -92,18 +90,18 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	int ret = 0;
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&big_lock);
-
 	if (is_basic_worker)
 	{
 		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
 		/* TODO */
 		task->predicted_transfer = 0;
+		_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
 		worker_exp_len[best_workerid] += task->predicted;
 		worker_exp_end[best_workerid] = exp_end_predicted;
 		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
 
 		ntasks[best_workerid]++;
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
 
 		ret = starpu_push_local_task(best_workerid, task, prio);
 	}
@@ -134,19 +132,19 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			/* TODO */
 			alias->predicted_transfer = 0;
 
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[local_worker]);
 			worker_exp_len[local_worker] += alias->predicted;
 			worker_exp_end[local_worker] = exp_end_predicted;
 			worker_exp_start[local_worker] = exp_end_predicted - worker_exp_len[local_worker];
 
 			ntasks[local_worker]++;
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[local_worker]);
 
 			ret |= starpu_push_local_task(local_worker, alias, prio);
 		}
 
 	}
 
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&big_lock);
-
 	return ret;
 }
 
@@ -239,10 +237,12 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		/* Sometimes workers didn't take the tasks as early as we expected */
+		_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
 		worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
 		worker_exp_end[worker] = worker_exp_start[worker] + worker_exp_len[worker];
 		if (worker_exp_end[worker] > max_exp_end)
 			max_exp_end = worker_exp_end[worker];
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
 	}
 
 	unsigned nimpl;
@@ -432,8 +432,6 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology *topo
 		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
 	}
 
-	_STARPU_PTHREAD_MUTEX_INIT(&big_lock, NULL);
-
 	/* We pre-compute an array of all the perfmodel archs that are applicable */
 	unsigned total_worker_count = nworkers + ncombinedworkers;