Bladeren bron

update dm* policies with new synchro scheme

Olivier Aumage 8 jaren geleden
bovenliggende
commit
eaebdeebf1
3 gewijzigde bestanden met toevoegingen van 33 en 56 verwijderingen
  1. 0 2
      examples/fortran90/marshalling.c
  2. 5 5
      src/core/sched_policy.c
  3. 28 49
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 0 - 2
examples/fortran90/marshalling.c

@@ -159,8 +159,6 @@ int starpu_my_init_c()
 	struct starpu_conf conf;
 	starpu_conf_init(&conf);
 	conf.sched_policy_name = "dmda";
-#warning "dmda needs update with new synchro scheme"
-	exit(77);
 
 	ret = starpu_init(&conf);
 	/*     int ret = starpu_init(NULL); */

+ 5 - 5
src/core/sched_policy.c

@@ -65,11 +65,6 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_modular_heft_policy,
 	&_starpu_sched_modular_heft_prio_policy,
 	&_starpu_sched_modular_heft2_policy,
-	&_starpu_sched_dm_policy,
-	&_starpu_sched_dmda_policy,
-	&_starpu_sched_dmda_ready_policy,
-	&_starpu_sched_dmda_sorted_policy,
-	&_starpu_sched_dmda_sorted_decision_policy,
 #else
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
@@ -79,6 +74,11 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_lws_policy,
 	&_starpu_sched_heteroprio_policy,
 	&_starpu_sched_parallel_heft_policy,
+	&_starpu_sched_dm_policy,
+	&_starpu_sched_dmda_policy,
+	&_starpu_sched_dmda_ready_policy,
+	&_starpu_sched_dmda_sorted_policy,
+	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_graph_test_policy,
 #warning TODO: update sched policies with new synchro scheme
 #endif

+ 28 - 49
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -325,12 +325,9 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_self();
 	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_self();
 
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
@@ -358,15 +355,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
 
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(best_workerid, &sched_mutex, &sched_cond);
-
 #ifdef STARPU_USE_SC_HYPERVISOR
 	starpu_sched_ctx_call_pushed_task_cb(best_workerid, sched_ctx_id);
 #endif //STARPU_USE_SC_HYPERVISOR
 
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_for_observation_relax(best_workerid);
 
         /* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, starpu_timing_now());
@@ -412,7 +405,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_for_observation(best_workerid);
 
 	task->predicted = predicted;
 	task->predicted_transfer = predicted_transfer;
@@ -440,7 +433,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	int ret = 0;
 	if (prio)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+		_starpu_worker_lock_for_observation_relax(best_workerid);
 		ret =_starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
 		if(dt->num_priorities != -1)
 		{
@@ -455,11 +448,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_wake_worker_locked(best_workerid);
 #endif
 		starpu_push_task_end(task);
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+		_starpu_worker_unlock_for_observation(best_workerid);
 	}
 	else
 	{
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+		_starpu_worker_lock_for_observation_relax(best_workerid);
 		starpu_task_list_push_back (&dt->queue_array[best_workerid]->taskq, task);
 		dt->queue_array[best_workerid]->ntasks++;
 		dt->queue_array[best_workerid]->nprocessed++;
@@ -467,7 +460,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_wake_worker_locked(best_workerid);
 #endif
 		starpu_push_task_end(task);
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+		_starpu_worker_unlock_for_observation(best_workerid);
 	}
 
 	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, best_workerid);
@@ -660,15 +653,15 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	{
 		unsigned nimpl;
 		unsigned impl_mask;
-		unsigned worker = workers->get_next(workers, &it);
-		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
-		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
-		unsigned memory_node = starpu_worker_get_memory_node(worker);
+		unsigned workerid = workers->get_next(workers, &it);
+		struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, sched_ctx_id);
+		unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 
-		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
+		if (!starpu_worker_can_execute_task_impl(workerid, task, &impl_mask))
 			continue;
 
 		for (nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
@@ -678,11 +671,11 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				/* no one on that queue may execute this task */
 				continue;
 			}
-			STARPU_ASSERT_MSG(fifo != NULL, "worker %u ctx %u\n", worker, sched_ctx_id);
+			STARPU_ASSERT_MSG(fifo != NULL, "workerid %u ctx %u\n", workerid, sched_ctx_id);
 
 			int fifo_ntasks = fifo->ntasks;
 			double prev_exp_len = fifo->exp_len;
-			/* consider the priority of the task when deciding on which worker to schedule,
+			/* consider the priority of the task when deciding on which workerid to schedule,
 			   compute the expected_end of the task if it is inserted before other tasks already scheduled */
 			if(sorted_decision)
 			{
@@ -693,12 +686,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				}
 				else
 				{
-					starpu_pthread_mutex_t *sched_mutex;
-					starpu_pthread_cond_t *sched_cond;
-					starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-					STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-					prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, worker, nimpl, &fifo_ntasks);
-					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+					_starpu_worker_lock_for_observation_relax(workerid);
+					prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, workerid, nimpl, &fifo_ntasks);
+					_starpu_worker_unlock_for_observation(workerid);
 				}
 			}
 
@@ -706,7 +696,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 				max_exp_end = exp_end[worker_ctx][nimpl];
 
-			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
+			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) workerid (%u) kernel (%u) \n", local_task_length[workerid][nimpl],workerid,nimpl);
 
 			if (bundle)
 			{
@@ -747,18 +737,18 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			    || (!calibrating && ntasks_end < ntasks_best_end)
 
 			    /* The performance model of this task is not
-			     * calibrated on this worker, try to run it there
+			     * calibrated on this workerid, try to run it there
 			     * to calibrate it there. */
 			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl]))
 
 			    /* the performance model of this task is not
-			     * calibrated on this worker either, rather run it
+			     * calibrated on this workerid either, rather run it
 			     * there if this one is low on scheduled tasks. */
 			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end)
 				)
 			{
 				ntasks_best_end = ntasks_end;
-				ntasks_best = worker;
+				ntasks_best = workerid;
 				nimpl_best = nimpl;
 			}
 
@@ -1118,13 +1108,9 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-
 	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_self();
 
 	_starpu_fifo_task_started(fifo, task, dt->num_priorities);
 
@@ -1132,7 +1118,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	fifo->exp_start = STARPU_MAX(starpu_timing_now() + fifo->pipeline_len, fifo->exp_start);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_self();
 }
 
 static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
@@ -1147,13 +1133,9 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 						       starpu_task_get_implementation(task));
 
 	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-
 
 	/* Update the predictions */
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_for_observation_relax(workerid);
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
@@ -1211,7 +1193,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 
 	fifo->ntasks++;
 
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_for_observation(workerid);
 }
 
 static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
@@ -1219,12 +1201,9 @@ static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
+	_starpu_worker_lock_self();
 	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
+	_starpu_worker_unlock_self();
 }
 
 struct starpu_sched_policy _starpu_sched_dm_policy =