소스 검색

update _starpu_wake_all_blocked_workers*

Olivier Aumage 8 년 전
부모
커밋
5f69e81148
5개의 변경된 파일61개의 추가작업 그리고 43개의 파일을 삭제
  1. 9 6
      src/core/workers.c
  2. 38 26
      src/datawizard/copy_driver.c
  3. 4 4
      src/datawizard/memory_nodes.c
  4. 6 6
      src/datawizard/memory_nodes.h
  5. 4 1
      src/drivers/cpu/driver_cpu.c

+ 9 - 6
src/core/workers.c

@@ -457,14 +457,17 @@ int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_tas
  * Runtime initialization methods
  */
 
-static void _starpu_init_worker_queue(struct _starpu_worker *workerarg)
+static void _starpu_init_worker_queue(struct _starpu_worker *worker)
 {
-	starpu_pthread_cond_t *cond = &workerarg->sched_cond;
-	starpu_pthread_mutex_t *mutex = &workerarg->sched_mutex;
-
-	unsigned memory_node = workerarg->memory_node;
+#if 0
+	starpu_pthread_cond_t *cond = &worker->sched_cond;
+	starpu_pthread_mutex_t *mutex = &worker->sched_mutex;
+	unsigned memory_node = worker->memory_node;
+#else
+#warning TODO remove
+#endif
 
-	_starpu_memory_node_register_condition(cond, mutex, memory_node);
+	_starpu_memory_node_register_condition(worker, &worker->sched_cond, worker->memory_node);
 }
 
 /*

+ 38 - 26
src/datawizard/copy_driver.c

@@ -41,34 +41,40 @@
 void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 {
 	/* wake up all workers on that memory node */
-	unsigned cond_id;
-
 	struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
-
-	starpu_pthread_mutex_t *mymutex = NULL;
-	starpu_pthread_cond_t *mycond = NULL;
-	const int myworkerid = starpu_worker_get_id();
-	if (myworkerid >= 0)
-		starpu_worker_get_sched_condition(myworkerid, &mymutex, &mycond);
+	const int cur_workerid = starpu_worker_get_id();
+	struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
 
 	STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
 
 	unsigned nconds = descr->condition_count[nodeid];
+	unsigned cond_id;
 	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		struct _starpu_cond_and_mutex *condition;
-		condition  = &descr->conditions_attached_to_node[nodeid][cond_id];
+		struct _starpu_cond_and_worker *condition;
+		condition = &descr->conditions_attached_to_node[nodeid][cond_id];
 
-		if (condition->mutex == mymutex)
+		if (condition->worker == cur_worker)
 			/* No need to wake myself, and I might be called from
 			 * the scheduler with mutex locked, through
 			 * starpu_prefetch_task_input_on_node */
 			continue;
 
 		/* wake anybody waiting on that condition */
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(condition->mutex);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
+		if (condition->cond == &condition->worker->sched_cond)
+		{
+			if (condition->worker->status == STATUS_SCHEDULING)
+			{
+				condition->worker->state_keep_awake = 1;
+			}
+			else if (condition->worker->status == STATUS_SLEEPING)
+			{
+				condition->worker->status = STATUS_WAKING_UP;
+			} 
+		} 
 		STARPU_PTHREAD_COND_BROADCAST(condition->cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(condition->mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
@@ -81,34 +87,40 @@ void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 void starpu_wake_all_blocked_workers(void)
 {
 	/* workers may be blocked on the various queues' conditions */
-	unsigned cond_id;
-
 	struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
-
-	starpu_pthread_mutex_t *mymutex = NULL;
-	starpu_pthread_cond_t *mycond = NULL;
-	const int myworkerid = starpu_worker_get_id();
-	if (myworkerid >= 0)
-		starpu_worker_get_sched_condition(myworkerid, &mymutex, &mycond);
+	const int cur_workerid = starpu_worker_get_id();
+	struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
 
 	STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
 
 	unsigned nconds = descr->total_condition_count;
+	unsigned cond_id;
 	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		struct _starpu_cond_and_mutex *condition;
-		condition  = &descr->conditions_all[cond_id];
+		struct _starpu_cond_and_worker *condition;
+		condition = &descr->conditions_all[cond_id];
 
-		if (condition->mutex == mymutex)
+		if (condition->worker == cur_worker)
 			/* No need to wake myself, and I might be called from
 			 * the scheduler with mutex locked, through
 			 * starpu_prefetch_task_input_on_node */
 			continue;
 
 		/* wake anybody waiting on that condition */
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(condition->mutex);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
+		if (condition->cond == &condition->worker->sched_cond)
+		{
+			if (condition->worker->status == STATUS_SCHEDULING)
+			{
+				condition->worker->state_keep_awake = 1;
+			}
+			else if (condition->worker->status == STATUS_SLEEPING)
+			{
+				condition->worker->status = STATUS_WAKING_UP;
+			} 
+		} 
 		STARPU_PTHREAD_COND_BROADCAST(condition->cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(condition->mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);

+ 4 - 4
src/datawizard/memory_nodes.c

@@ -130,7 +130,7 @@ unsigned _starpu_memory_node_register(enum starpu_node_kind kind, int devid)
 /* TODO move in a more appropriate file  !! */
 /* Register a condition variable associated to worker which is associated to a
  * memory node itself. */
-void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex, unsigned nodeid)
+void _starpu_memory_node_register_condition(struct _starpu_worker *worker, starpu_pthread_cond_t *cond, unsigned nodeid)
 {
 	unsigned cond_id;
 	unsigned nconds_total, nconds;
@@ -143,7 +143,7 @@ void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_
 	{
 		if (_starpu_descr.conditions_attached_to_node[nodeid][cond_id].cond == cond)
 		{
-			STARPU_ASSERT(_starpu_descr.conditions_attached_to_node[nodeid][cond_id].mutex == mutex);
+			STARPU_ASSERT(_starpu_descr.conditions_attached_to_node[nodeid][cond_id].worker == worker);
 
 			/* the condition is already in the list */
 			STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_descr.conditions_rwlock);
@@ -153,7 +153,7 @@ void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_
 
 	/* it was not found locally */
 	_starpu_descr.conditions_attached_to_node[nodeid][cond_id].cond = cond;
-	_starpu_descr.conditions_attached_to_node[nodeid][cond_id].mutex = mutex;
+	_starpu_descr.conditions_attached_to_node[nodeid][cond_id].worker = worker;
 	_starpu_descr.condition_count[nodeid]++;
 
 	/* do we have to add it in the global list as well ? */
@@ -170,7 +170,7 @@ void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_
 
 	/* it was not in the global list either */
 	_starpu_descr.conditions_all[nconds_total].cond = cond;
-	_starpu_descr.conditions_all[nconds_total].mutex = mutex;
+	_starpu_descr.conditions_all[nconds_total].worker = worker;
 	_starpu_descr.total_condition_count++;
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_descr.conditions_rwlock);

+ 6 - 6
src/datawizard/memory_nodes.h

@@ -35,10 +35,10 @@
 
 extern char _starpu_worker_drives_memory[STARPU_NMAXWORKERS][STARPU_MAXNODES];
 
-struct _starpu_cond_and_mutex
+struct _starpu_cond_and_worker
 {
-        starpu_pthread_cond_t *cond;
-        starpu_pthread_mutex_t *mutex;
+	starpu_pthread_cond_t *cond;
+	struct _starpu_worker *worker;
 };
 
 struct _starpu_memory_node_descr
@@ -62,8 +62,8 @@ struct _starpu_memory_node_descr
 	 * list of all these condition variables so that we can wake up all
 	 * worker attached to a memory node that are waiting on a task. */
 	starpu_pthread_rwlock_t conditions_rwlock;
-	struct _starpu_cond_and_mutex conditions_attached_to_node[STARPU_MAXNODES][STARPU_NMAXWORKERS];
-	struct _starpu_cond_and_mutex conditions_all[STARPU_MAXNODES*STARPU_NMAXWORKERS];
+	struct _starpu_cond_and_worker conditions_attached_to_node[STARPU_MAXNODES][STARPU_NMAXWORKERS];
+	struct _starpu_cond_and_worker conditions_all[STARPU_MAXNODES*STARPU_NMAXWORKERS];
 	/* the number of queues attached to each node */
 	unsigned total_condition_count;
 	unsigned condition_count[STARPU_MAXNODES];
@@ -109,7 +109,7 @@ static inline msg_host_t _starpu_simgrid_memory_node_get_host(unsigned node)
 #endif
 unsigned _starpu_memory_node_register(enum starpu_node_kind kind, int devid);
 //void _starpu_memory_node_attach_queue(struct starpu_jobq_s *q, unsigned nodeid);
-void _starpu_memory_node_register_condition(starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex, unsigned memory_node);
+void _starpu_memory_node_register_condition(struct _starpu_worker *worker, starpu_pthread_cond_t *cond, unsigned nodeid);
 
 static inline int _starpu_memory_node_get_devid(unsigned node)
 {

+ 4 - 1
src/drivers/cpu/driver_cpu.c

@@ -210,9 +210,12 @@ int _starpu_cpu_driver_init(struct _starpu_worker *cpu_worker)
 
 	_STARPU_TRACE_WORKER_INIT_END(cpu_worker->workerid);
 
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&cpu_worker->sched_mutex);
+	cpu_worker->status = STATUS_UNKNOWN;
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cpu_worker->sched_mutex);
+
 	/* tell the main thread that we are ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&cpu_worker->mutex);
-	cpu_worker->status = STATUS_UNKNOWN;
 	cpu_worker->worker_is_initialized = 1;
 	STARPU_PTHREAD_COND_SIGNAL(&cpu_worker->ready_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_worker->mutex);