浏览代码

drop wake_up_sem in favor of a sched_cond

Olivier Aumage 8 年之前
父节点
当前提交
0580dc2152
共有 5 个文件被更改,包括 22 次插入21 次删除
  1. 9 13
      src/core/sched_ctx.c
  2. 0 4
      src/core/sched_ctx.h
  3. 1 0
      src/core/workers.c
  4. 1 0
      src/core/workers.h
  5. 11 4
      src/drivers/driver_common/driver_common.c

+ 9 - 13
src/core/sched_ctx.c

@@ -548,8 +548,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	for(w = 0; w < nworkers; w++)
 	{
 		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
-		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
-
 		sched_ctx->parallel_sect[w] = 0;
 	}
 
@@ -2358,14 +2356,6 @@ void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid
 	sem_post(&sched_ctx->fall_asleep_sem[sched_ctx->main_master]);
 }
 
-void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	sem_post(&sched_ctx->wake_up_sem[sched_ctx->main_master]);
-	worker->state_blocked_in_ctx = 0;
-}
-
 static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -2461,12 +2451,18 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 				{
 					STARPU_ASSERT(worker->state_busy_in_parallel == 1);
 					worker->state_wait_ack__busy_in_parallel = 0;
+					/* broadcast is required because sched_cond is shared for multiple purpose */
+					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+				}
+				worker->state_wait_handshake__busy_in_parallel = 1;
+				do
+				{
+					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 				}
-				/* broadcast is required because sched_cond is shared for multiple purpose */
-				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+				while (worker->state_wait_handshake__busy_in_parallel);
+				worker->state_wait_handshake__busy_in_parallel = 0;
 				if (!workers_locked)
 					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-				sem_wait(&sched_ctx->wake_up_sem[master]);
 			}
 			else
 				sched_ctx->parallel_sect[workerid] = 0;

+ 0 - 4
src/core/sched_ctx.h

@@ -135,10 +135,6 @@ struct _starpu_sched_ctx
 	   all blocked and ready to exec the parallel code */
 	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
 
-	/* semaphore that block appl thread until starpu threads are 
-	   all woke up and ready continue appl */
-	sem_t wake_up_sem[STARPU_NMAXWORKERS];
-
 	/* ctx nesting the current ctx */
 	unsigned nesting_sched_ctx;
 

+ 1 - 0
src/core/workers.c

@@ -583,6 +583,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->state_changing_ctx_waiting = 0;
 	workerarg->state_busy_in_parallel = 0;
 	workerarg->state_wait_ack__busy_in_parallel = 0;
+	workerarg->state_wait_handshake__busy_in_parallel = 0;
 	workerarg->state_blocked_in_ctx = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */

+ 1 - 0
src/core/workers.h

@@ -86,6 +86,7 @@ LIST_TYPE(_starpu_worker,
 	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
 	int state_busy_in_parallel:1;
 	int state_wait_ack__busy_in_parallel:1;
+	int state_wait_handshake__busy_in_parallel:1;
 	int state_blocked_in_ctx:1; /* worker is blocked in a ctx */
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */

+ 11 - 4
src/drivers/driver_common/driver_common.c

@@ -376,9 +376,13 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 					}
 					while (worker->state_wait_ack__busy_in_parallel);
 					worker->state_busy_in_parallel = 0;
-					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+					worker->state_blocked_in_ctx = 0;
 					sched_ctx->parallel_sect[workerid] = 0;
+					if (worker->state_wait_handshake__busy_in_parallel)
+					{
+						worker->state_wait_handshake__busy_in_parallel = 0;
+						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+					}
 				}
 			}
 			if(!needed)
@@ -400,9 +404,12 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 				}
 				while (worker->state_wait_ack__busy_in_parallel);
 				worker->state_busy_in_parallel = 0;
-				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
 				sched_ctx->parallel_sect[workerid] = 0;
+				if (worker->state_wait_handshake__busy_in_parallel)
+				{
+					worker->state_wait_handshake__busy_in_parallel = 0;
+					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+				}
 			}
 		}