Browse Source

reenable modular_ws

Olivier Aumage 8 years ago
parent
commit
1a4f99d734
2 changed files with 31 additions and 1 deletions
  1. 1 1
      src/core/sched_policy.c
  2. 30 0
      src/sched_policies/component_work_stealing.c

+ 1 - 1
src/core/sched_policy.c

@@ -53,7 +53,6 @@ int starpu_get_prefetch_flag(void)
 static struct starpu_sched_policy *predefined_policies[] =
 {
 #if 0
-	&_starpu_sched_modular_ws_policy,
 	&_starpu_sched_modular_heft_policy,
 	&_starpu_sched_modular_heft_prio_policy,
 	&_starpu_sched_modular_heft2_policy,
@@ -66,6 +65,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_modular_random_prio_policy,
 	&_starpu_sched_modular_random_prefetching_policy,
 	&_starpu_sched_modular_random_prio_prefetching_policy,
+	&_starpu_sched_modular_ws_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,

+ 30 - 0
src/sched_policies/component_work_stealing.c

@@ -52,11 +52,16 @@ static struct starpu_task *  steal_task_round_robin(struct starpu_sched_componen
 	/* If the worker's queue have no suitable tasks, let's try
 	 * the next ones */
 	struct starpu_task * task = NULL;
+	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	while (1)
 	{
 		struct _starpu_prio_deque * fifo = wsd->fifos[i];
 
+		if (!relaxed_state)
+			_starpu_worker_enter_section_safe_for_observation();
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		if (!relaxed_state)
+			_starpu_worker_leave_section_safe_for_observation();
 		task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
 		if(task && !isnan(task->predicted))
 		{
@@ -134,7 +139,9 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component)
 	}
 	STARPU_ASSERT(i < component->nchildren);
 	struct _starpu_work_stealing_data * wsd = component->data;
+	_starpu_worker_enter_section_safe_for_observation();
 	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+	_starpu_worker_leave_section_safe_for_observation();
 	struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
 	if(task)
 	{
@@ -156,7 +163,9 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component)
 	task  = steal_task(component, workerid);
 	if(task)
 	{
+		_starpu_worker_enter_section_safe_for_observation();
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		_starpu_worker_leave_section_safe_for_observation();
 		wsd->fifos[i]->nprocessed++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 
@@ -186,9 +195,14 @@ double _ws_estimated_end(struct starpu_sched_component * component)
 	double sum_len = 0.0;
 	double sum_start = 0.0;
 	int i;
+	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	for(i = 0; i < component->nchildren; i++)
 	{
+		if (!relaxed_state)
+			_starpu_worker_enter_section_safe_for_observation();
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		if (!relaxed_state)
+			_starpu_worker_leave_section_safe_for_observation();
 		sum_len += wsd->fifos[i]->exp_len;
 		wsd->fifos[i]->exp_start = STARPU_MAX(starpu_timing_now(), wsd->fifos[i]->exp_start);
 		sum_start += wsd->fifos[i]->exp_start;
@@ -206,9 +220,14 @@ double _ws_estimated_load(struct starpu_sched_component * component)
 	struct _starpu_work_stealing_data * wsd = component->data;
 	int ntasks = 0;
 	int i;
+	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	for(i = 0; i < component->nchildren; i++)
 	{
+		if (!relaxed_state)
+			_starpu_worker_enter_section_safe_for_observation();
 		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+		if (!relaxed_state)
+			_starpu_worker_leave_section_safe_for_observation();
 		ntasks += wsd->fifos[i]->ntasks;
 		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
 	}
@@ -230,8 +249,13 @@ static int push_task(struct starpu_sched_component * component, struct starpu_ta
 	int ret;
 	int i = wsd->last_push_child;
 	i = (i+1)%component->nchildren;
+	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 
+	if (!relaxed_state)
+		_starpu_worker_enter_section_safe_for_observation();
 	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+	if (!relaxed_state)
+		_starpu_worker_leave_section_safe_for_observation();
 	_STARPU_TASK_BREAK_ON(task, sched);
 	ret = _starpu_prio_deque_push_task(wsd->fifos[i], task);
 	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
@@ -251,6 +275,7 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_sched_component * component =starpu_sched_component_worker_get(sched_ctx_id, workerid);
+	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	while(component->parents[sched_ctx_id] != NULL)
 	{
 		component = component->parents[sched_ctx_id];
@@ -266,7 +291,11 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 			STARPU_ASSERT(i < component->nchildren);
 
 			struct _starpu_work_stealing_data * wsd = component->data;
+			if (!relaxed_state)
+				_starpu_worker_enter_section_safe_for_observation();
 			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
+			if (!relaxed_state)
+				_starpu_worker_leave_section_safe_for_observation();
 			int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
 			if(ret == 0 && !isnan(task->predicted))
 				wsd->fifos[i]->exp_len += task->predicted;
@@ -278,6 +307,7 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 		}
 	}
 
+	STARPU_ASSERT(0);
 	/* this should not be reached */
 	return starpu_sched_tree_push_task(task);
 }