Преглед на файлове

bring worker state management routines together

Olivier Aumage преди 8 години
родител
ревизия
6f3194bf72
променени са 2 файла, в които са добавени 70 реда и са изтрити 54 реда
  1. 5 54
      src/core/sched_ctx.c
  2. 65 0
      src/core/workers.h

+ 5 - 54
src/core/sched_ctx.c

@@ -2347,31 +2347,7 @@ static void _starpu_sched_ctx_block_workers(unsigned sched_ctx_id, unsigned all)
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-			while (worker->state_unblock_req)
-				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-			STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
-			worker->block_ref_count++;
-			if (worker->block_ref_count == 1)
-			{
-				STARPU_ASSERT(!worker->state_blocked);
-				STARPU_ASSERT(!worker->state_block_req);
-				STARPU_ASSERT(!worker->state_block_ack);
-				STARPU_ASSERT(!worker->state_unblock_req);
-				STARPU_ASSERT(!worker->state_unblock_ack);
-
-				worker->state_block_req = 1;
-				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-
-				while (!worker->state_block_ack)
-					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-				STARPU_ASSERT(worker->block_ref_count >= 1);
-				STARPU_ASSERT(worker->state_block_req);
-				STARPU_ASSERT(worker->state_blocked);
-				worker->state_block_req = 0;
-				worker->state_block_ack = 0;
-				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-			}
+			_starpu_worker_request_blocking(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		workers_count++;
@@ -2404,38 +2380,13 @@ static void _starpu_sched_ctx_unblock_workers(unsigned sched_ctx_id, unsigned al
 		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
 			 && (workerid != master || all))
 		{
-			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 			if((current_worker_id == -1 || workerid != current_worker_id))
 			{
-				while (worker->state_block_req)
-					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-				if (worker->state_blocked)
-				{
-					STARPU_ASSERT(worker->block_ref_count > 0);
-					if (worker->block_ref_count == 1)
-					{
-						STARPU_ASSERT(!worker->state_block_req);
-						STARPU_ASSERT(!worker->state_block_ack);
-						STARPU_ASSERT(!worker->state_unblock_req);
-						STARPU_ASSERT(!worker->state_unblock_ack);
-
-						worker->state_unblock_req = 1;
-						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-
-						while (!worker->state_unblock_ack)
-							STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-
-						STARPU_ASSERT(worker->state_unblock_req);
-						STARPU_ASSERT(!worker->state_blocked);
-						worker->state_unblock_req = 0;
-						worker->state_unblock_ack = 0;
-						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					}
-					worker->block_ref_count--;
-				}
+				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+				_starpu_worker_request_unblocking(worker);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 			}
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 	}
 

+ 65 - 0
src/core/workers.h

@@ -614,6 +614,71 @@ void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *
 
 struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
 
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_request_blocking(struct _starpu_worker * const worker)
+{
+	while (worker->state_unblock_req)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+	STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
+	worker->block_ref_count++;
+	if (worker->block_ref_count == 1)
+	{
+		STARPU_ASSERT(!worker->state_blocked);
+		STARPU_ASSERT(!worker->state_block_req);
+		STARPU_ASSERT(!worker->state_block_ack);
+		STARPU_ASSERT(!worker->state_unblock_req);
+		STARPU_ASSERT(!worker->state_unblock_ack);
+
+		worker->state_block_req = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+		while (!worker->state_block_ack)
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+		STARPU_ASSERT(worker->block_ref_count >= 1);
+		STARPU_ASSERT(worker->state_block_req);
+		STARPU_ASSERT(worker->state_blocked);
+		worker->state_block_req = 0;
+		worker->state_block_ack = 0;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	}
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_request_unblocking(struct _starpu_worker * const worker)
+{
+	while (worker->state_block_req)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+	if (worker->state_blocked)
+	{
+		STARPU_ASSERT(worker->block_ref_count > 0);
+		if (worker->block_ref_count == 1)
+		{
+			STARPU_ASSERT(!worker->state_block_req);
+			STARPU_ASSERT(!worker->state_block_ack);
+			STARPU_ASSERT(!worker->state_unblock_req);
+			STARPU_ASSERT(!worker->state_unblock_ack);
+
+			worker->state_unblock_req = 1;
+			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+			while (!worker->state_unblock_ack)
+				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+			STARPU_ASSERT(worker->state_unblock_req);
+			STARPU_ASSERT(!worker->state_blocked);
+			worker->state_unblock_req = 0;
+			worker->state_unblock_ack = 0;
+			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+		}
+		worker->block_ref_count--;
+	}
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
 static inline void _starpu_worker_process_block_requests(struct _starpu_worker * const worker)
 {
 	while (worker->state_block_req)