Browse Source

merge state_blocked flag and state_busy_in_parallel flag that are always set conjointly

Olivier Aumage 8 years ago
parent
commit
9c211dd53a
4 changed files with 28 additions and 40 deletions
  1. 9 15
      src/core/sched_ctx.c
  2. 4 5
      src/core/workers.c
  3. 3 4
      src/core/workers.h
  4. 12 16
      src/drivers/driver_common/driver_common.c

+ 9 - 15
src/core/sched_ctx.c

@@ -1116,7 +1116,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 		for(w = 0; w < nworkers_ctx; w++)
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(backup_workerids[w]);
-			while (worker->state_busy_in_parallel)
+			while (worker->state_blocked)
 			{
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 			}
@@ -2392,9 +2392,8 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 				&& (current_worker_id == -1 || workerid != current_worker_id)
 				&& !blocked[workers_count])
 		{
-			/* TODO: replace fall_asleep_sem by a condition, in order to be able to avoid unlocking sched_mutex */
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-			while (!worker->state_blocked_in_ctx)
+			while (!worker->state_blocked)
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 		}
 		workers_count++;
@@ -2428,29 +2427,24 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 			 && sched_ctx->parallel_sect[workerid] && (workerid != master || all))
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-			if((current_worker_id == -1 || workerid != current_worker_id) && worker->state_blocked_in_ctx)
+			if((current_worker_id == -1 || workerid != current_worker_id) && worker->state_blocked)
 			{
-				/* TODO: this section seems suspicious since
-				 * busy_in_parallel state may change after
-				 * unlock and before sem_wait, sem should be
-				 * reimplemented using sched_mutex + a
-				 * condition */
 				if (!workers_locked)
 					STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-				if (worker->state_wait_ack__busy_in_parallel)
+				if (worker->state_wait_ack__blocked)
 				{
-					STARPU_ASSERT(worker->state_busy_in_parallel == 1);
-					worker->state_wait_ack__busy_in_parallel = 0;
+					STARPU_ASSERT(worker->state_blocked == 1);
+					worker->state_wait_ack__blocked = 0;
 					/* broadcast is required because sched_cond is shared for multiple purpose */
 					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 				}
-				worker->state_wait_handshake__busy_in_parallel = 1;
+				worker->state_wait_handshake__blocked = 1;
 				do
 				{
 					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 				}
-				while (worker->state_wait_handshake__busy_in_parallel);
-				worker->state_wait_handshake__busy_in_parallel = 0;
+				while (worker->state_wait_handshake__blocked);
+				worker->state_wait_handshake__blocked = 0;
 				if (!workers_locked)
 					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 			}

+ 4 - 5
src/core/workers.c

@@ -581,10 +581,9 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 
 	workerarg->state_sched_op_pending = 0;
 	workerarg->state_changing_ctx_waiting = 0;
-	workerarg->state_busy_in_parallel = 0;
-	workerarg->state_wait_ack__busy_in_parallel = 0;
-	workerarg->state_wait_handshake__busy_in_parallel = 0;
-	workerarg->state_blocked_in_ctx = 0;
+	workerarg->state_blocked = 0;
+	workerarg->state_wait_ack__blocked = 0;
+	workerarg->state_wait_handshake__blocked = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1693,7 +1692,7 @@ unsigned starpu_worker_get_count(void)
 
 unsigned starpu_worker_is_blocked(int workerid)
 {
-	return _starpu_config.workers[workerid].state_blocked_in_ctx;
+	return (unsigned)_starpu_config.workers[workerid].state_blocked;
 }
 
 unsigned starpu_worker_is_slave_somewhere(int workerid)

+ 3 - 4
src/core/workers.h

@@ -84,10 +84,9 @@ LIST_TYPE(_starpu_worker,
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
 	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
 	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
-	int state_busy_in_parallel:1;
-	int state_wait_ack__busy_in_parallel:1;
-	int state_wait_handshake__busy_in_parallel:1;
-	int state_blocked_in_ctx:1; /* worker is blocked in a ctx */
+	int state_blocked:1;
+	int state_wait_ack__blocked:1;
+	int state_wait_handshake__blocked:1;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */

+ 12 - 16
src/drivers/driver_common/driver_common.c

@@ -367,21 +367,19 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 					/* we need it until here bc of the list of ctxs of the workers
 					   that can change in another thread */
 					needed = 0;
-					worker->state_blocked_in_ctx = 1;
+					worker->state_blocked = 1;
+					worker->state_wait_ack__blocked = 1;
 					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					worker->state_busy_in_parallel = 1;
-					worker->state_wait_ack__busy_in_parallel = 1;
 					do
 					{
 						STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 					}
-					while (worker->state_wait_ack__busy_in_parallel);
-					worker->state_busy_in_parallel = 0;
-					worker->state_blocked_in_ctx = 0;
+					while (worker->state_wait_ack__blocked);
+					worker->state_blocked = 0;
 					sched_ctx->parallel_sect[workerid] = 0;
-					if (worker->state_wait_handshake__busy_in_parallel)
+					if (worker->state_wait_handshake__blocked)
 					{
-						worker->state_wait_handshake__busy_in_parallel = 0;
+						worker->state_wait_handshake__blocked = 0;
 						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 					}
 				}
@@ -396,21 +394,19 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 			if(sched_ctx->parallel_sect[workerid])
 			{
 //				needed = 0;
-				worker->state_blocked_in_ctx = 1;
+				worker->state_blocked = 1;
+				worker->state_wait_ack__blocked = 1;
 				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-				worker->state_busy_in_parallel = 1;
-				worker->state_wait_ack__busy_in_parallel = 1;
 				do
 				{
 					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 				}
-				while (worker->state_wait_ack__busy_in_parallel);
-				worker->state_busy_in_parallel = 0;
-				worker->state_blocked_in_ctx = 0;
+				while (worker->state_wait_ack__blocked);
+				worker->state_blocked = 0;
 				sched_ctx->parallel_sect[workerid] = 0;
-				if (worker->state_wait_handshake__busy_in_parallel)
+				if (worker->state_wait_handshake__blocked)
 				{
-					worker->state_wait_handshake__busy_in_parallel = 0;
+					worker->state_wait_handshake__blocked = 0;
 					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 				}
 			}