Browse Source

comment new blocking request synchronization scheme
tweak names and functions layout a little bit

Olivier Aumage 8 years ago
parent
commit
f00b8bcaf7
3 changed files with 91 additions and 39 deletions
  1. 3 4
      src/core/sched_ctx.c
  2. 82 29
      src/core/workers.h
  3. 6 6
      src/drivers/driver_common/driver_common.c

+ 3 - 4
src/core/sched_ctx.c

@@ -55,8 +55,7 @@ static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, c
 	{
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-		_starpu_worker_set_changing_ctx_notice(worker);
-		_starpu_worker_wait_for_transient_sched_op_completion(worker);
+		_starpu_worker_enter_changing_ctx_op(worker);
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 }
@@ -68,7 +67,7 @@ static void notify_workers_about_changing_ctx_done(const unsigned nworkers, cons
 	{
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-		_starpu_worker_clear_changing_ctx_notice(worker);
+		_starpu_worker_leave_changing_ctx_op(worker);
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 }
@@ -1108,7 +1107,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 
-		/* announce upcoming context changes, then wait for transient unlocked operations to
+		/* announce upcoming context changes, then wait for sched_op operations to
 		 * complete before altering the sched_ctx under sched_mutex protection */
 		_starpu_update_notified_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);

+ 82 - 29
src/core/workers.h

@@ -80,16 +80,25 @@ LIST_TYPE(_starpu_worker,
 	starpu_pthread_cond_t started_cond; /* indicate when the worker is ready */
 	starpu_pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is the worker associated with ? */
-	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
+	/* condition variable used for passive waiting operations on worker.
+	 * STARPU_PTHREAD_COND_BROADCAST must be used instead of STARPU_PTHREAD_COND_SIGNAL,
+	 * since the condition is shared for multiple purposes */
+	starpu_pthread_cond_t sched_cond;
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
-	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
-	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new transient scheduling operations */
-	int state_blocked:1;
-	int state_block_req:1;
-	int state_block_ack:1;
-	int state_unblock_req:1;
-	int state_unblock_ack:1;
+	int state_changing_ctx_waiting:1; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
+	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
+	int state_blocked:1; /* worker is currently blocked */
+	int state_block_req:1; /* a request for state transition from unblocked to blocked is pending */
+	int state_block_ack:1; /* a block request has been honored */
+	int state_unblock_req:1; /* a request for state transition from blocked to unblocked is pending */
+	int state_unblock_ack:1; /* an unblock request has been honored */
	 /* cumulative blocking depth
	  * - =0  worker unblocked
	  * - >0  worker blocked
	  * - transition from 0 to 1 triggers a block_req
	  * - transition from 1 to 0 triggers an unblock_req
	  */
 	unsigned block_ref_count;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
@@ -618,29 +627,41 @@ struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid
  */
 static inline void _starpu_worker_request_blocking(struct _starpu_worker * const worker)
 {
+	/* flush pending requests to start on a fresh transaction epoch */
 	while (worker->state_unblock_req)
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+	/* announce blocking intent */
 	STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
 	worker->block_ref_count++;
+
 	if (worker->block_ref_count == 1)
 	{
+		/* only the transition from 0 to 1 triggers the block_req */
+
 		STARPU_ASSERT(!worker->state_blocked);
 		STARPU_ASSERT(!worker->state_block_req);
 		STARPU_ASSERT(!worker->state_block_ack);
 		STARPU_ASSERT(!worker->state_unblock_req);
 		STARPU_ASSERT(!worker->state_unblock_ack);
 
+		/* trigger the block_req */
 		worker->state_block_req = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
+		/* wait for block_req to be processed */
 		while (!worker->state_block_ack)
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
 		STARPU_ASSERT(worker->block_ref_count >= 1);
 		STARPU_ASSERT(worker->state_block_req);
 		STARPU_ASSERT(worker->state_blocked);
+
+		/* reset block_req state flags */
 		worker->state_block_req = 0;
 		worker->state_block_ack = 0;
+
+		/* broadcast block_req state flags reset */
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	}
 }
@@ -649,30 +670,44 @@ static inline void _starpu_worker_request_blocking(struct _starpu_worker * const
  */
 static inline void _starpu_worker_request_unblocking(struct _starpu_worker * const worker)
 {
+	/* flush pending requests to start on a fresh transaction epoch */
 	while (worker->state_block_req)
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+	/* unblocking may be requested unconditionally
+	 * thus, check if unblocking is really needed */
 	if (worker->state_blocked)
 	{
-		STARPU_ASSERT(worker->block_ref_count > 0);
 		if (worker->block_ref_count == 1)
 		{
+			/* only the transition from 1 to 0 triggers the unblock_req */
+
 			STARPU_ASSERT(!worker->state_block_req);
 			STARPU_ASSERT(!worker->state_block_ack);
 			STARPU_ASSERT(!worker->state_unblock_req);
 			STARPU_ASSERT(!worker->state_unblock_ack);
 
+			/* trigger the unblock_req */
 			worker->state_unblock_req = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
+			/* wait for the unblock_req to be processed */
 			while (!worker->state_unblock_ack)
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
 			STARPU_ASSERT(worker->state_unblock_req);
 			STARPU_ASSERT(!worker->state_blocked);
+
+			/* reset unblock_req state flags */
 			worker->state_unblock_req = 0;
 			worker->state_unblock_ack = 0;
+
+			/* broadcast unblock_req state flags reset */
 			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 		}
+
+		/* announce unblocking complete */
+		STARPU_ASSERT(worker->block_ref_count > 0);
 		worker->block_ref_count--;
 	}
 }
@@ -689,10 +724,14 @@ static inline void _starpu_worker_process_block_requests(struct _starpu_worker *
 		STARPU_ASSERT(!worker->state_unblock_ack);
 		STARPU_ASSERT(worker->block_ref_count > 0);
 		
+		/* enter effective blocked state */
 		worker->state_blocked = 1;
+
+		/* notify block_req processing */
 		worker->state_block_ack = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
+		/* block */
 		while (!worker->state_unblock_req)
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
@@ -702,7 +741,10 @@ static inline void _starpu_worker_process_block_requests(struct _starpu_worker *
 		STARPU_ASSERT(!worker->state_unblock_ack);
 		STARPU_ASSERT(worker->block_ref_count > 0);
 
+		/* leave effective blocked state */
 		worker->state_blocked = 0;
+
+		/* notify unblock_req processing */
 		worker->state_unblock_ack = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	}
@@ -712,14 +754,21 @@ static inline void _starpu_worker_process_block_requests(struct _starpu_worker *
  * Mark the beginning of a scheduling operation during which the sched_mutex
  * lock may be temporarily released, but the scheduling context of the worker
  * should not be modified */
-static inline void _starpu_worker_enter_transient_sched_op(struct _starpu_worker * const worker)
+static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
 {
+	/* process pending block requests before entering a sched_op region */
 	_starpu_worker_process_block_requests(worker);
 	while (worker->state_changing_ctx_notice)
 	{
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+		/* new block requests may have been triggered during the wait,
+		 * need to check again */
 		_starpu_worker_process_block_requests(worker);
 	}
+
+	/* no block request and no ctx change ahead,
+	 * enter sched_op */
 	worker->state_sched_op_pending = 1;
 }
 
@@ -727,49 +776,53 @@ static inline void _starpu_worker_enter_transient_sched_op(struct _starpu_worker
  * Mark the end of a scheduling operation, and notify potential waiters that
  * scheduling context changes can safely be performed again.
  */
-static inline void  _starpu_worker_leave_transient_sched_op(struct _starpu_worker * const worker)
+static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
 {
 	worker->state_sched_op_pending = 0;
 	if (worker->state_changing_ctx_waiting)
-		/* cond_broadcast is required over cond_signal since
-		 * the condition is share for multiple purpose */
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 }
 
 /* Must be called with worker's sched_mutex held.
  */
-static inline void _starpu_worker_set_changing_ctx_notice(struct _starpu_worker * const worker)
+static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
 {
-	/* another ctx change might be under way, wait for the way to be cleared */
+	/* flush pending requests to start on a fresh transaction epoch */
 	while (worker->state_changing_ctx_notice)
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-	worker->state_changing_ctx_notice = 1;
-}
 
-/* Must be called with worker's sched_mutex held.
- */
-static inline void _starpu_worker_clear_changing_ctx_notice(struct _starpu_worker * const worker)
-{
-	worker->state_changing_ctx_notice = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-}
+	/* announce changing_ctx intent
+	 *
+	 * - an already started sched_op is allowed to complete
+	 * - no new sched_op may be started
+	 */
+	worker->state_changing_ctx_notice = 1;
 
-/* Must be called with worker's sched_mutex held.
- * Passively wait until state_sched_op_pending is cleared.
- */
-static inline void _starpu_worker_wait_for_transient_sched_op_completion(struct _starpu_worker * const worker)
-{
+	/* allow for an already started sched_op to complete */
 	if (worker->state_sched_op_pending)
 	{
+		/* request sched_op to broadcast when way is cleared */
 		worker->state_changing_ctx_waiting = 1;
+
+		/* wait for sched_op completion */
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 		do
 		{
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 		}
 		while (worker->state_sched_op_pending);
+
+		/* reset flag so other sched_ops won't have to broadcast state */
 		worker->state_changing_ctx_waiting = 0;
 	}
 }
 
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
+{
+	worker->state_changing_ctx_notice = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+}
+
 #endif // __WORKERS_H__

+ 6 - 6
src/drivers/driver_common/driver_common.c

@@ -344,7 +344,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
 
 	_starpu_worker_set_status_scheduling(workerid);
-	_starpu_worker_enter_transient_sched_op(worker);
+	_starpu_worker_enter_sched_op(worker);
 	if ((worker->pipeline_length == 0 && worker->current_task)
 		|| (worker->pipeline_length != 0 && worker->ntasks))
 		/* This worker is executing something */
@@ -377,7 +377,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		if (_starpu_worker_can_block(memnode, worker)
 			&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
-			_starpu_worker_leave_transient_sched_op(worker);
+			_starpu_worker_leave_sched_op(worker);
 			do
 			{
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
@@ -387,7 +387,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		}
 		else
 		{
-			_starpu_worker_leave_transient_sched_op(worker);
+			_starpu_worker_leave_sched_op(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
 				_starpu_exponential_backoff(worker);
@@ -408,7 +408,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	}
 	worker->spinning_backoff = BACKOFF_MIN;
 
-	_starpu_worker_leave_transient_sched_op(worker);
+	_starpu_worker_leave_sched_op(worker);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 
 
@@ -458,9 +458,9 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 #endif
 			_starpu_worker_set_status_scheduling(workers[i].workerid);
 			_starpu_set_local_worker_key(&workers[i]);
-			_starpu_worker_enter_transient_sched_op(&workers[i]);
+			_starpu_worker_enter_sched_op(&workers[i]);
 			tasks[i] = _starpu_pop_task(&workers[i]);
-			_starpu_worker_leave_transient_sched_op(&workers[i]);
+			_starpu_worker_leave_sched_op(&workers[i]);
 			if(tasks[i] != NULL)
 			{
 				_starpu_worker_set_status_scheduling_done(workers[i].workerid);