
update pheft with new synchro scheme
re-enable corresponding tests and examples

Olivier Aumage 8 years ago
parent
commit
3953060972

+ 0 - 2
examples/openmp/vector_scal_omp.c

@@ -98,8 +98,6 @@ int main(int argc, char **argv)
 	 * sections, so only enable one combined worker at a time.  */
 	conf.single_combined_worker = 1;
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 
 	ret = starpu_init(&conf);
 	if (ret == -ENODEV) return 77;

+ 0 - 2
examples/spmd/vector_scal_spmd.c

@@ -112,8 +112,6 @@ int main(int argc, char **argv)
 	starpu_conf_init(&conf);
 	conf.single_combined_worker = 1;
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 
 	ret = starpu_init(&conf);
 	if (ret == -ENODEV) return 77;
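
In both examples, the removed lines used to abort before starpu_init() with exit code 77, the automake convention for a skipped test, back when pheft had not yet been ported to the new synchronization scheme. With pheft updated, only the legitimate skip remains: returning 77 when starpu_init() reports -ENODEV. The four tests under tests/parallel_tasks below are re-enabled the same way. A minimal sketch of the convention (stand-alone illustration, not code from the tree):

	struct starpu_conf conf;
	starpu_conf_init(&conf);
	conf.sched_policy_name = "pheft";
	int ret = starpu_init(&conf);
	if (ret == -ENODEV)
		return 77; /* automake reads exit code 77 as "test skipped" */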

+ 2 - 1
src/core/sched_ctx_list.c

@@ -253,7 +253,8 @@ void _starpu_sched_ctx_list_remove_elt(struct _starpu_sched_ctx_list **list,
 		if (parent->prev == NULL)
 		{
 			*list = parent->next;
-			parent->next->prev = NULL;
+			if (parent->next != NULL)
+				parent->next->prev = NULL;
 		}
 		else
 		{

+ 1 - 1
src/core/sched_policy.c

@@ -70,7 +70,6 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_dmda_ready_policy,
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_dmda_sorted_decision_policy,
-	&_starpu_sched_parallel_heft_policy,
 #else
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
@@ -79,6 +78,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_ws_policy,
 	&_starpu_sched_lws_policy,
 	&_starpu_sched_heteroprio_policy,
+	&_starpu_sched_parallel_heft_policy,
 	&_starpu_sched_graph_test_policy,
 #warning TODO: update sched policies with new synchro scheme
 #endif
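
The pheft policy is re-enabled simply by moving _starpu_sched_parallel_heft_policy into the branch of the predefined_policies[] array that is actually compiled, alongside the other policies already converted to the new synchronization scheme. Registration is nothing more than membership in that array; a hedged sketch of how a name such as "pheft" is presumably resolved at starpu_init() time (assuming a NULL-terminated array and a policy_name field, as in StarPU's sources; find_policy is an illustrative name):

	static struct starpu_sched_policy *find_policy(const char *name)
	{
		unsigned i;
		for (i = 0; predefined_policies[i] != NULL; i++)
			if (strcmp(predefined_policies[i]->policy_name, name) == 0)
				return predefined_policies[i];
		return NULL; /* unknown policy name */
	}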

+ 20 - 4
src/core/workers.h

@@ -837,9 +837,9 @@ static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker *
 static inline void _starpu_worker_lock_for_observation_relax(int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	int cur_workerid = starpu_worker_get_id();
 	STARPU_ASSERT(worker != NULL);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	int cur_workerid = starpu_worker_get_id();
 	if (workerid != cur_workerid)
 	{
 		struct _starpu_worker *cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
@@ -877,11 +877,11 @@ static inline void _starpu_worker_lock_for_observation_no_relax(int workerid)
 static inline int _starpu_worker_trylock_for_observation(int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	int cur_workerid = starpu_worker_get_id();
 	STARPU_ASSERT(worker != NULL);
 	int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
 	if (ret)
 		return ret;
+	int cur_workerid = starpu_worker_get_id();
 	if (workerid != cur_workerid) {
 		ret = !worker->state_safe_for_observation;
 		if (ret)
@@ -901,7 +901,7 @@ static inline void _starpu_worker_unlock_for_observation(int workerid)
  * but the scheduling has not yet been made or is already done */
 static inline void _starpu_worker_enter_section_safe_for_observation(void)
 {
-	int workerid = starpu_worker_get_id();
+	int workerid = starpu_worker_get_id_check();
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	STARPU_ASSERT(worker != NULL);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
@@ -913,7 +913,7 @@ static inline void _starpu_worker_enter_section_safe_for_observation(void)
 
 static inline void _starpu_worker_leave_section_safe_for_observation(void)
 {
-	int workerid = starpu_worker_get_id();
+	int workerid = starpu_worker_get_id_check();
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	STARPU_ASSERT(worker != NULL);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
@@ -922,4 +922,20 @@ static inline void _starpu_worker_leave_section_safe_for_observation(void)
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 }
 
+static inline void _starpu_worker_lock_self(void)
+{
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+}
+
+static inline void _starpu_worker_unlock_self(void)
+{
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
 #endif // __WORKERS_H__
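
Three related changes here: the current worker id is now fetched only after the (try)lock succeeds, which in particular spares the lookup when _starpu_worker_trylock_for_observation fails; the observation-section helpers switch to starpu_worker_get_id_check(), which asserts that the caller really is a worker thread; and the new _starpu_worker_lock_self()/_starpu_worker_unlock_self() pair factors out the lock-own-sched-mutex idiom that schedulers previously spelled out by hand. A before/after sketch of that idiom, as seen in the parallel_heft.c hunks below:

	/* before: fetch this worker's sched mutex explicitly, then lock it */
	starpu_pthread_mutex_t *sched_mutex;
	starpu_pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
	/* ... update this worker's bookkeeping ... */
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);

	/* after: the helpers resolve the calling worker themselves */
	_starpu_worker_lock_self();
	/* ... update this worker's bookkeeping ... */
	_starpu_worker_unlock_self();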

+ 41 - 54
src/sched_policies/parallel_heft.c

@@ -88,17 +88,14 @@ static void parallel_heft_pre_exec_hook(struct starpu_task *task, unsigned sched
 	if (isnan(transfer_model))
 		transfer_model = 0.0;
 
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_self();
 	worker_exp_len[workerid] -= model + transfer_model;
 	worker_exp_start[workerid] = starpu_timing_now() + model;
 	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_self();
 }
 
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, unsigned sched_ctx_id)
@@ -119,11 +116,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	if (!starpu_worker_is_combined_worker(best_workerid))
 	{
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(best_workerid, &sched_mutex, &sched_cond);
-
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+		_starpu_worker_lock_for_observation_relax(best_workerid);
 		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
 		/* TODO */
 		task->predicted_transfer = 0;
@@ -132,7 +125,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
 
 		ntasks[best_workerid]++;
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+		_starpu_worker_unlock_for_observation(best_workerid);
 
 		/* We don't want it to interlace its task with a combined
 		 * worker's one */
@@ -163,24 +156,21 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		for (i = 0; i < worker_size; i++)
 		{
 			struct starpu_task *alias = starpu_task_dup(task);
-			int local_worker = combined_workerid[i];
+			int local_combined_workerid = combined_workerid[i];
 
-			alias->predicted = exp_end_predicted - worker_exp_end[local_worker];
+			alias->predicted = exp_end_predicted - worker_exp_end[local_combined_workerid];
 			/* TODO */
 			alias->predicted_transfer = 0;
 			alias->destroy = 1;
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(local_worker, &sched_mutex, &sched_cond);
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-			worker_exp_len[local_worker] += alias->predicted;
-			worker_exp_end[local_worker] = exp_end_predicted;
-			worker_exp_start[local_worker] = exp_end_predicted - worker_exp_len[local_worker];
-
-			ntasks[local_worker]++;
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
-
-			ret |= starpu_push_local_task(local_worker, alias, prio);
+			_starpu_worker_lock_for_observation_relax(local_combined_workerid);
+			worker_exp_len[local_combined_workerid] += alias->predicted;
+			worker_exp_end[local_combined_workerid] = exp_end_predicted;
+			worker_exp_start[local_combined_workerid] = exp_end_predicted - worker_exp_len[local_combined_workerid];
+
+			ntasks[local_combined_workerid]++;
+			_starpu_worker_unlock_for_observation(local_combined_workerid);
+
+			ret |= starpu_push_local_task(local_combined_workerid, alias, prio);
 		}
 
 		STARPU_PTHREAD_MUTEX_UNLOCK(&hd->global_push_mutex);
@@ -284,10 +274,10 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	unsigned nworkers_ctx = workers->nworkers;
 
-	unsigned worker, worker_ctx = 0;
+	unsigned workerid, worker_ctx = 0;
 	int best = -1, best_id_ctx = -1;
 
-	/* this flag is set if the corresponding worker is selected because
+	/* this flag is set if the corresponding workerid is selected because
 	   there is no performance prediction available yet */
 	int forced_best = -1, forced_best_ctx = -1, forced_nimpl = -1;
 
@@ -317,20 +307,17 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
-		worker = workers->get_next(workers, &it);
+		workerid = workers->get_next(workers, &it);
 
-		if(!starpu_worker_is_combined_worker(worker))
+		if(!starpu_worker_is_combined_worker(workerid))
 		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 			/* Sometimes workers didn't take the tasks as early as we expected */
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-			worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
-			worker_exp_end[worker] = worker_exp_start[worker] + worker_exp_len[worker];
-			if (worker_exp_end[worker] > max_exp_end)
-				max_exp_end = worker_exp_end[worker];
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+			_starpu_worker_lock_for_observation_relax(workerid);
+			worker_exp_start[workerid] = STARPU_MAX(worker_exp_start[workerid], starpu_timing_now());
+			worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
+			if (worker_exp_end[workerid] > max_exp_end)
+				max_exp_end = worker_exp_end[workerid];
+			_starpu_worker_unlock_for_observation(workerid);
 		}
 	}
 
@@ -338,11 +325,11 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 	worker_ctx = 0;
 	while(workers->has_next(workers, &it))
 	{
-                worker = workers->get_next(workers, &it);
+                workerid = workers->get_next(workers, &it);
 
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			if (!starpu_combined_worker_can_execute_task(worker, task, nimpl))
+			if (!starpu_combined_worker_can_execute_task(workerid, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
 				skip_worker[worker_ctx][nimpl] = 1;
@@ -354,23 +341,23 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			}
 
 
-			struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
+			struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, sched_ctx_id);
 
 			local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch,nimpl);
 
-			unsigned memory_node = starpu_worker_get_memory_node(worker);
+			unsigned memory_node = starpu_worker_get_memory_node(workerid);
 			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
 
-			double ntasks_end = compute_ntasks_end(worker, sched_ctx_id);
+			double ntasks_end = compute_ntasks_end(workerid, sched_ctx_id);
 
 			if (ntasks_best == -1
 			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
-			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
-			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
+			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this workerid is being calibrated */
+			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this workerid with other non-calibrated */
 					)
 			{
 				ntasks_best_end = ntasks_end;
-				ntasks_best = worker;
+				ntasks_best = workerid;
 				ntasks_best_ctx = worker_ctx;
 				nimpl_best = nimpl;
 			}
@@ -381,7 +368,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 				if (!warned)
 				{
 					warned = 1;
-					_STARPU_DISP("Warning: performance model for %s not finished calibrating on %u, using a dumb scheduling heuristic for now\n", starpu_task_get_name(task), worker);
+					_STARPU_DISP("Warning: performance model for %s not finished calibrating on %u, using a dumb scheduling heuristic for now\n", starpu_task_get_name(task), workerid);
 				}
 				/* we are calibrating, we want to speed-up calibration time
 				 * so we privilege non-calibrated tasks (but still
@@ -398,9 +385,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			if (unknown)
 				continue;
 
-			local_exp_end[worker_ctx][nimpl] = compute_expected_end(worker, local_task_length[worker_ctx][nimpl]);
+			local_exp_end[worker_ctx][nimpl] = compute_expected_end(workerid, local_task_length[worker_ctx][nimpl]);
 
-			//fprintf(stderr, "WORKER %d -> length %e end %e\n", worker, local_task_length[worker_ctx][nimpl], local_exp_end[worker][nimpl]);
+			//fprintf(stderr, "WORKER %d -> length %e end %e\n", workerid, local_task_length[worker_ctx][nimpl], local_exp_end[workerid][nimpl]);
 
 			if (local_exp_end[worker_ctx][nimpl] < best_exp_end)
 			{
@@ -411,7 +398,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 
 			local_energy[worker_ctx][nimpl] = starpu_task_expected_energy(task, perf_arch,nimpl);
-			//_STARPU_DEBUG("Scheduler parallel heft: task length (%lf) local energy (%lf) worker (%u) kernel (%u) \n", local_task_length[worker],local_energy[worker],worker,nimpl);
+			//_STARPU_DEBUG("Scheduler parallel heft: task length (%lf) local energy (%lf) workerid (%u) kernel (%u) \n", local_task_length[workerid],local_energy[workerid],workerid,nimpl);
 
 			if (isnan(local_energy[worker_ctx][nimpl]))
 				local_energy[worker_ctx][nimpl] = 0.;
@@ -434,7 +421,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 		worker_ctx = 0;
 		while(workers->has_next(workers, &it))
 		{
-			worker = workers->get_next(workers, &it);
+			workerid = workers->get_next(workers, &it);
 
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
@@ -458,12 +445,12 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 				{
 					/* we found a better solution */
 					best_fitness = fitness[worker_ctx][nimpl];
-					best = worker;
+					best = workerid;
 					best_id_ctx = worker_ctx;
 					nimpl_best = nimpl;
 				}
 
-			//	fprintf(stderr, "FITNESS worker %d -> %e local_exp_end %e - local_data_penalty %e\n", worker, fitness[worker][nimpl], local_exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl]);
+			//	fprintf(stderr, "FITNESS workerid %d -> %e local_exp_end %e - local_data_penalty %e\n", workerid, fitness[workerid][nimpl], local_exp_end[workerid][nimpl] - best_exp_end, local_data_penalty[workerid][nimpl]);
 			}
 			worker_ctx++;
 		}
@@ -490,7 +477,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 	//_STARPU_DEBUG("Scheduler parallel heft: kernel (%u)\n", nimpl_best);
 	starpu_task_set_implementation(task, nimpl_best);
-	/* we should now have the best worker in variable "best" */
+	/* we should now have the best workerid in variable "best" */
 	_STARPU_TASK_BREAK_ON(task, sched);
 	return push_task_on_best_worker(task, best, best_exp_end, prio, sched_ctx_id);
 }
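
Beyond the systematic worker-to-workerid rename, the substantive change in this file is the locking discipline: whenever pheft updates the prediction arrays of a worker other than the calling one, the raw sched-mutex lock/unlock pair is replaced with _starpu_worker_lock_for_observation_relax()/_starpu_worker_unlock_for_observation(), so the target worker is only touched while it is in a state safe for observation; updates to the caller's own state go through _starpu_worker_lock_self() instead. A condensed sketch of the remote-update pattern (variables as in the hunks above):

	_starpu_worker_lock_for_observation_relax(best_workerid);
	worker_exp_len[best_workerid] += task->predicted;
	worker_exp_end[best_workerid] = exp_end_predicted;
	worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
	ntasks[best_workerid]++;
	_starpu_worker_unlock_for_observation(best_workerid);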

+ 0 - 2
tests/parallel_tasks/cuda_only.c

@@ -66,8 +66,6 @@ int main(int argc, char **argv)
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 
 	ret = starpu_init(&conf);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;

+ 0 - 2
tests/parallel_tasks/explicit_combined_worker.c

@@ -69,8 +69,6 @@ int main(int argc, char **argv)
 	ret = starpu_conf_init(&conf);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_conf_init");
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 	conf.calibrate = 1;
 
 	ret = starpu_init(&conf);

+ 0 - 2
tests/parallel_tasks/parallel_kernels.c

@@ -75,8 +75,6 @@ int main(int argc, char **argv)
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 	conf.calibrate = 1;
 
 	ret = starpu_init(&conf);

+ 0 - 2
tests/parallel_tasks/parallel_kernels_spmd.c

@@ -77,8 +77,6 @@ int main(int argc, char **argv)
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
 	conf.sched_policy_name = "pheft";
-#warning "pheft needs update with new synchro scheme"
-	return 77;
 	conf.calibrate = 1;
 
 	ret = starpu_init(&conf);