
re-enable modular_eager_prefetching
relax synchronization on blocked state observation

Olivier Aumage, 8 years ago
commit 98f48e3fd4
3 changed files with 52 additions and 21 deletions:
  1. src/core/sched_policy.c (+1, -1)
  2. src/core/workers.c (+28, -2)
  3. src/core/workers.h (+23, -18)

src/core/sched_policy.c (+1, -1)

@@ -53,7 +53,6 @@ int starpu_get_prefetch_flag(void)
 static struct starpu_sched_policy *predefined_policies[] =
 {
 #if 0
-	&_starpu_sched_modular_eager_prefetching_policy,
 	&_starpu_sched_modular_prio_policy,
 	&_starpu_sched_modular_prio_prefetching_policy,
 	&_starpu_sched_modular_random_policy,
@@ -66,6 +65,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_modular_heft2_policy,
 #else
 	&_starpu_sched_modular_eager_policy,
+	&_starpu_sched_modular_eager_prefetching_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,

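With the policy pointer moved from the disabled #if 0 branch back into the active #else branch of predefined_policies[], the scheduler can again be selected like any other built-in policy. A minimal sketch, assuming the policy is registered under the name "modular-eager-prefetching" (StarPU's usual naming scheme for modular policies; setting STARPU_SCHED in the environment is the usual alternative to filling in starpu_conf):

#include <starpu.h>

int main(void)
{
	struct starpu_conf conf;
	starpu_conf_init(&conf);
	/* Assumed policy name; check STARPU_SCHED=help (or the policy list in
	 * sched_policy.c) for the exact string registered by the policy. */
	conf.sched_policy_name = "modular-eager-prefetching";

	if (starpu_init(&conf) != 0)
		return 1;

	/* ... submit tasks as usual ... */

	starpu_shutdown();
	return 0;
}
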
src/core/workers.c (+28, -2)

@@ -582,6 +582,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->state_sched_op_pending = 0;
 	workerarg->state_changing_ctx_waiting = 0;
 	workerarg->state_changing_ctx_notice = 0;
+	workerarg->state_blocked_in_parallel_observed = 0;
 	workerarg->state_blocked_in_parallel = 0;
 	workerarg->state_block_in_parallel_req = 0;
 	workerarg->state_block_in_parallel_ack = 0;
@@ -1692,9 +1693,34 @@ unsigned starpu_worker_get_count(void)
 
 unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 {
-	_starpu_worker_lock_for_observation_no_relax(workerid);
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid != cur_workerid)
+	{
+		struct _starpu_worker *cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
+		int relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_safe_for_observation == 0);
+		if (relax_own_observation_state && !worker->state_safe_for_observation)
+		{
+			cur_worker->state_safe_for_observation = 1;
+			STARPU_PTHREAD_COND_BROADCAST(&cur_worker->sched_cond);
+		}
+		while (!worker->state_safe_for_observation
+				|| worker->state_block_in_parallel_req
+				|| worker->state_unblock_in_parallel_req
+				|| worker->state_changing_ctx_notice)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+		if (relax_own_observation_state)
+		{
+			cur_worker->state_safe_for_observation = 0;
+		}
+	}
 	unsigned ret = _starpu_config.workers[workerid].state_blocked_in_parallel;
-	_starpu_worker_unlock_for_observation(workerid);
+	worker->state_blocked_in_parallel_observed = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	return ret;
 }
 

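The new body of starpu_worker_is_blocked_in_parallel() replaces the old lock-for-observation helpers with an explicit handshake on the target worker's sched_mutex. The following is a reduced, standalone pthreads model of that handshake, not StarPU code: struct worker_model and observe_blocked_state() are hypothetical stand-ins that only mirror the control flow of the hunk above.

#include <pthread.h>

struct worker_model
{
	pthread_mutex_t sched_mutex;
	pthread_cond_t sched_cond;
	unsigned state_safe_for_observation;
	unsigned state_block_in_parallel_req;
	unsigned state_unblock_in_parallel_req;
	unsigned state_changing_ctx_notice;
	unsigned state_blocked_in_parallel;
	unsigned state_blocked_in_parallel_observed;
};

/* self: the observing worker, or NULL when the caller is not a worker;
 * target: the worker whose blocked state is being read. */
unsigned observe_blocked_state(struct worker_model *self, struct worker_model *target)
{
	pthread_mutex_lock(&target->sched_mutex);

	/* As in the patch: if the observer is itself a worker currently outside
	 * a safe window, it temporarily declares itself observable, which lets a
	 * worker that is in turn waiting to observe us make progress. */
	int relax_own_state = (self != NULL) && !self->state_safe_for_observation;
	if (relax_own_state && !target->state_safe_for_observation)
	{
		self->state_safe_for_observation = 1;
		pthread_cond_broadcast(&self->sched_cond);
	}

	/* Wait for a quiescent window on the target: observable, with no pending
	 * block/unblock request and no context change in progress. */
	while (!target->state_safe_for_observation
			|| target->state_block_in_parallel_req
			|| target->state_unblock_in_parallel_req
			|| target->state_changing_ctx_notice)
		pthread_cond_wait(&target->sched_cond, &target->sched_mutex);

	if (relax_own_state)
		self->state_safe_for_observation = 0;

	unsigned blocked = target->state_blocked_in_parallel;
	/* Record that the value was read in a relaxed section; the target uses
	 * this flag in enter/leave_sched_op (see the workers.h hunk below). */
	target->state_blocked_in_parallel_observed = 1;

	pthread_mutex_unlock(&target->sched_mutex);
	return blocked;
}
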
src/core/workers.h (+23, -18)

@@ -87,15 +87,16 @@ LIST_TYPE(_starpu_worker,
 	 * since the condition is shared for multiple purpose */
 	starpu_pthread_cond_t sched_cond;
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
-	int state_safe_for_observation:1; /* mark scheduling sections where other workers can safely access the worker state */
-	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
-	int state_changing_ctx_waiting:1; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
-	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
-	int state_blocked_in_parallel:1; /* worker is currently blocked on a parallel section */
-	int state_block_in_parallel_req:1; /* a request for state transition from unblocked to blocked is pending */
-	int state_block_in_parallel_ack:1; /* a block request has been honored */
-	int state_unblock_in_parallel_req:1; /* a request for state transition from blocked to unblocked is pending */
-	int state_unblock_in_parallel_ack:1; /* an unblock request has been honored */
+	unsigned state_safe_for_observation; /* mark scheduling sections where other workers can safely access the worker state */
+	unsigned state_sched_op_pending; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
+	unsigned state_changing_ctx_waiting; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
+	unsigned state_changing_ctx_notice; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
+	unsigned state_blocked_in_parallel; /* worker is currently blocked on a parallel section */
+	unsigned state_blocked_in_parallel_observed; /* the blocked state of the worker has been observed by another worker during a relaxed section */
+	unsigned state_block_in_parallel_req; /* a request for state transition from unblocked to blocked is pending */
+	unsigned state_block_in_parallel_ack; /* a block request has been honored */
+	unsigned state_unblock_in_parallel_req; /* a request for state transition from blocked to unblocked is pending */
+	unsigned state_unblock_in_parallel_ack; /* an unblock request has been honored */
 	 /* cumulative blocking depth
 	  * - =0  worker unblocked
 	  * - >0  worker blocked
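Note the switch from one-bit int bit-fields to plain unsigned fields. The commit message does not spell out the motivation, but two well-known properties of bit-fields plausibly matter under the relaxed observation scheme: adjacent bit-fields share a single storage unit, so one flag cannot be updated without a read-modify-write touching its neighbours, and a plain int one-bit field is commonly signed, holding only 0 and -1. A small self-contained illustration of the second point (the struct names are made up for the example):

#include <stdio.h>

struct flags_bitfield
{
	int a:1; /* plain 'int' bit-field: commonly signed, so it holds 0 or -1 */
	int b:1; /* shares a storage unit with 'a' */
};

struct flags_plain
{
	unsigned a; /* separate full-width objects, as in the patched workers.h */
	unsigned b;
};

int main(void)
{
	struct flags_bitfield f = { 0, 0 };
	f.a = 1;
	/* On typical compilers (GCC/Clang) this prints -1, not 1. */
	printf("bit-field a after assigning 1: %d\n", f.a);

	struct flags_plain g = { 0, 0 };
	g.a = 1;
	printf("plain unsigned a after assigning 1: %u\n", g.a);
	return 0;
}
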
@@ -759,21 +760,25 @@ static inline void _starpu_worker_process_block_in_parallel_requests(struct _sta
  * should not be modified */
 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
 {
-	/* process pending block requests before entering a sched_op region */
-	_starpu_worker_process_block_in_parallel_requests(worker);
-	while (worker->state_changing_ctx_notice)
+	if (!worker->state_blocked_in_parallel_observed)
 	{
-		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-		/* new block requests may have been triggered during the wait,
-		 * need to check again */
+		/* process pending block requests before entering a sched_op region */
 		_starpu_worker_process_block_in_parallel_requests(worker);
+		while (worker->state_changing_ctx_notice)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+			/* new block requests may have been triggered during the wait,
+			 * need to check again */
+			_starpu_worker_process_block_in_parallel_requests(worker);
+		}
 	}
 
 	/* no block request and no ctx change ahead,
 	 * enter sched_op */
-	worker->state_safe_for_observation = 0;
 	worker->state_sched_op_pending = 1;
+	worker->state_blocked_in_parallel_observed = 0;
+	worker->state_safe_for_observation = 0;
 }
 
 /* Must be called with worker's sched_mutex held.
@@ -784,7 +789,7 @@ static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const
 {
 	worker->state_safe_for_observation = 1;
 	worker->state_sched_op_pending = 0;
-	if (worker->state_changing_ctx_waiting)
+	if (!worker->state_blocked_in_parallel_observed && worker->state_changing_ctx_waiting)
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 }