Sfoglia il codice sorgente

Instead of storing the list of queues associated to each memory node, we
directly store the condition variables.

Cédric Augonnet 15 anni fa
parent
commit
9b34ec2d9c

+ 19 - 60
src/core/workers.c

@@ -82,8 +82,12 @@ static struct starpu_worker_set_s gordon_worker_set;
 static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
 {
 	struct starpu_jobq_s *jobq = workerarg->jobq;
+	pthread_cond_t *cond = &jobq->activity_cond;
+	pthread_mutex_t *mutex = &jobq->activity_mutex;
 
-	_starpu_memory_node_attach_queue(jobq, workerarg->memory_node);
+	unsigned memory_node = workerarg->memory_node;
+
+	_starpu_memory_node_register_condition(cond, mutex, memory_node);
 }
 
 static void _starpu_init_workers(struct starpu_machine_config_s *config)
@@ -365,79 +369,34 @@ typedef enum {
 	UNLOCK
 } queue_op;
 
-static void _starpu_operate_on_all_queues_attached_to_node(unsigned nodeid, queue_op op)
-{
-	unsigned q_id;
-	struct starpu_jobq_s *q;
-
-	starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
-
-	PTHREAD_RWLOCK_RDLOCK(&descr->attached_queues_rwlock);
-
-	unsigned nqueues = descr->queues_count[nodeid];
-
-	for (q_id = 0; q_id < nqueues; q_id++)
-	{
-		q  = descr->attached_queues_per_node[nodeid][q_id];
-		switch (op) {
-			case BROADCAST:
-				PTHREAD_COND_BROADCAST(&q->activity_cond);
-				break;
-			case LOCK:
-				PTHREAD_MUTEX_LOCK(&q->activity_mutex);
-				break;
-			case UNLOCK:
-				PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
-				break;
-		}
-	}
-
-	PTHREAD_RWLOCK_UNLOCK(&descr->attached_queues_rwlock);
-}
-
-inline void _starpu_lock_all_queues_attached_to_node(unsigned node)
-{
-	_starpu_operate_on_all_queues_attached_to_node(node, LOCK);
-}
-
-inline void _starpu_unlock_all_queues_attached_to_node(unsigned node)
-{
-	_starpu_operate_on_all_queues_attached_to_node(node, UNLOCK);
-}
-
-inline void _starpu_broadcast_all_queues_attached_to_node(unsigned node)
-{
-	_starpu_operate_on_all_queues_attached_to_node(node, BROADCAST);
-}
-
-static void _starpu_operate_on_all_queues(queue_op op)
+static void _starpu_operate_on_all_conditions(queue_op op)
 {
-	unsigned q_id;
-	struct starpu_jobq_s *q;
+	unsigned cond_id;
+	struct _cond_and_mutex *condition;
 
 	starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
 
-	PTHREAD_RWLOCK_RDLOCK(&descr->attached_queues_rwlock);
+	PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
 
-	unsigned nqueues = descr->total_queues_count;
+	unsigned nconds = descr->total_condition_count;
 
-	for (q_id = 0; q_id < nqueues; q_id++)
+	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		q  = descr->attached_queues_all[q_id];
+		condition = &descr->conditions_all[cond_id];
 		switch (op) {
 			case BROADCAST:
-				PTHREAD_COND_BROADCAST(&q->activity_cond);
+				PTHREAD_COND_BROADCAST(condition->cond);
 				break;
 			case LOCK:
-				PTHREAD_MUTEX_LOCK(&q->activity_mutex);
+				PTHREAD_MUTEX_LOCK(condition->mutex);
 				break;
 			case UNLOCK:
-				PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+				PTHREAD_MUTEX_UNLOCK(condition->mutex);
 				break;
 		}
 	}
 
-	PTHREAD_RWLOCK_UNLOCK(&descr->attached_queues_rwlock);
+	PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
 }
 
 static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
@@ -449,17 +408,17 @@ static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
 
 	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
 
-	_starpu_operate_on_all_queues(LOCK);
+	_starpu_operate_on_all_conditions(LOCK);
 	PTHREAD_MUTEX_LOCK(&sched->sched_activity_mutex);
 	
 	/* set the flag which will tell workers to stop */
 	config->running = 0;
 
-	_starpu_operate_on_all_queues(BROADCAST);
+	_starpu_operate_on_all_conditions(BROADCAST);
 	PTHREAD_COND_BROADCAST(&sched->sched_activity_cond);
 
 	PTHREAD_MUTEX_UNLOCK(&sched->sched_activity_mutex);
-	_starpu_operate_on_all_queues(UNLOCK);
+	_starpu_operate_on_all_conditions(UNLOCK);
 }
 
 void starpu_shutdown(void)

+ 0 - 4
src/core/workers.h

@@ -144,10 +144,6 @@ uint32_t _starpu_worker_may_execute_task(unsigned workerid, uint32_t where);
 unsigned _starpu_worker_can_block(unsigned memnode);
 void _starpu_block_worker(int workerid, pthread_cond_t *cond, pthread_mutex_t *mutex);
 
-void _starpu_lock_all_queues_attached_to_node(unsigned node);
-void _starpu_unlock_all_queues_attached_to_node(unsigned node);
-void _starpu_broadcast_all_queues_attached_to_node(unsigned node);
-
 void _starpu_set_local_worker_key(struct starpu_worker_s *worker);
 struct starpu_worker_s *_starpu_get_local_worker_key(void);
 

+ 39 - 16
src/datawizard/copy_driver.c

@@ -28,26 +28,36 @@
 
 void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 {
-	/* wake up all queues on that node */
-	unsigned q_id;
+	/* workers may be blocked on the policy's global condition */
+	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
+	pthread_cond_t *sched_cond = &sched->sched_activity_cond;
+	pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
+
+
+	PTHREAD_MUTEX_LOCK(sched_mutex);
+	PTHREAD_COND_BROADCAST(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+	/* wake up all workers on that memory node */
+	unsigned cond_id;
 
 	starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
 
-	PTHREAD_RWLOCK_RDLOCK(&descr->attached_queues_rwlock);
+	PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
 
-	unsigned nqueues = descr->queues_count[nodeid];
-	for (q_id = 0; q_id < nqueues; q_id++)
+	unsigned nconds = descr->condition_count[nodeid];
+	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		struct starpu_jobq_s *q;
-		q  = descr->attached_queues_per_node[nodeid][q_id];
+		struct _cond_and_mutex *condition;
+		condition  = &descr->conditions_attached_to_node[nodeid][cond_id];
 
-		/* wake anybody waiting on that queue */
-		PTHREAD_MUTEX_LOCK(&q->activity_mutex);
-		PTHREAD_COND_BROADCAST(&q->activity_cond);
-		PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+		/* wake anybody waiting on that condition */
+		PTHREAD_MUTEX_LOCK(condition->mutex);
+		PTHREAD_COND_BROADCAST(condition->cond);
+		PTHREAD_MUTEX_UNLOCK(condition->mutex);
 	}
 
-	PTHREAD_RWLOCK_UNLOCK(&descr->attached_queues_rwlock);
+	PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
 }
 
 void starpu_wake_all_blocked_workers(void)
@@ -62,12 +72,25 @@ void starpu_wake_all_blocked_workers(void)
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* workers may be blocked on the various queues' conditions */
-	unsigned node;
-	unsigned nnodes = _starpu_get_memory_nodes_count();
-	for (node = 0; node < nnodes; node++)
+	unsigned cond_id;
+
+	starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
+
+	PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
+
+	unsigned nconds = descr->total_condition_count;
+	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		_starpu_wake_all_blocked_workers_on_node(node);
+		struct _cond_and_mutex *condition;
+		condition  = &descr->conditions_all[cond_id];
+
+		/* wake anybody waiting on that condition */
+		PTHREAD_MUTEX_LOCK(condition->mutex);
+		PTHREAD_COND_BROADCAST(condition->cond);
+		PTHREAD_MUTEX_UNLOCK(condition->mutex);
 	}
+
+	PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
 }
 
 #ifdef STARPU_USE_FXT

+ 29 - 24
src/datawizard/memory_nodes.c

@@ -40,8 +40,8 @@ void _starpu_init_memory_nodes(void)
 	_starpu_init_mem_chunk_lists();
 	_starpu_init_data_request_lists();
 
-	pthread_rwlock_init(&descr.attached_queues_rwlock, NULL);
-	descr.total_queues_count = 0;
+	pthread_rwlock_init(&descr.conditions_rwlock, NULL);
+	descr.total_condition_count = 0;
 }
 
 void _starpu_deinit_memory_nodes(void)
@@ -94,54 +94,59 @@ unsigned _starpu_register_memory_node(starpu_node_kind kind)
 	descr.nodes[nnodes-1] = kind;
 	STARPU_TRACE_NEW_MEM_NODE(nnodes-1);
 
-	/* for now, there is no queue related to that newly created node */
-	descr.queues_count[nnodes-1] = 0;
+	/* for now, there is no condition associated to that newly created node */
+	descr.condition_count[nnodes-1] = 0;
 
 	return (nnodes-1);
 }
 
 /* TODO move in a more appropriate file  !! */
-/* attach a queue to a memory node (if it's not already attached) */
-void _starpu_memory_node_attach_queue(struct starpu_jobq_s *q, unsigned nodeid)
+/* Register the condition variable (and its mutex) of a worker that is itself
+ * attached to a memory node. */
+void _starpu_memory_node_register_condition(pthread_cond_t *cond, pthread_mutex_t *mutex, unsigned nodeid)
 {
-	unsigned queue;
-	unsigned nqueues_total, nqueues;
+	unsigned cond_id;
+	unsigned nconds_total, nconds;
 	
-	pthread_rwlock_wrlock(&descr.attached_queues_rwlock);
+	pthread_rwlock_wrlock(&descr.conditions_rwlock);
 
 	/* we only insert the condition if it's not already in the list */
-	nqueues = descr.queues_count[nodeid];
-	for (queue = 0; queue < nqueues; queue++)
+	nconds = descr.condition_count[nodeid];
+	for (cond_id = 0; cond_id < nconds; cond_id++)
 	{
-		if (descr.attached_queues_per_node[nodeid][queue] == q)
+		if (descr.conditions_attached_to_node[nodeid][cond_id].cond == cond)
 		{
-			/* the queue is already in the list */
-			pthread_rwlock_unlock(&descr.attached_queues_rwlock);
+			STARPU_ASSERT(descr.conditions_attached_to_node[nodeid][cond_id].mutex == mutex);
+
+			/* the condition is already in the list */
+			pthread_rwlock_unlock(&descr.conditions_rwlock);
 			return;
 		}
 	}
 
 	/* it was not found locally */
-	descr.attached_queues_per_node[nodeid][nqueues] = q;
-	descr.queues_count[nodeid]++;
+	descr.conditions_attached_to_node[nodeid][cond_id].cond = cond;
+	descr.conditions_attached_to_node[nodeid][cond_id].mutex = mutex;
+	descr.condition_count[nodeid]++;
 
 	/* do we have to add it in the global list as well ? */
-	nqueues_total = descr.total_queues_count; 
-	for (queue = 0; queue < nqueues_total; queue++)
+	nconds_total = descr.total_condition_count; 
+	for (cond_id = 0; cond_id < nconds_total; cond_id++)
 	{
-		if (descr.attached_queues_all[queue] == q)
+		if (descr.conditions_all[cond_id].cond == cond)
 		{
 			/* the condition is already in the global list */
-			pthread_rwlock_unlock(&descr.attached_queues_rwlock);
+			pthread_rwlock_unlock(&descr.conditions_rwlock);
 			return;
 		}
 	} 
 
-	/* it was not in the global queue either */
-	descr.attached_queues_all[nqueues_total] = q;
-	descr.total_queues_count++;
+	/* it was not in the global list either */
+	descr.conditions_all[nconds_total].cond = cond;
+	descr.conditions_all[nconds_total].mutex = mutex;
+	descr.total_condition_count++;
 
-	pthread_rwlock_unlock(&descr.attached_queues_rwlock);
+	pthread_rwlock_unlock(&descr.conditions_rwlock);
 }
 
 unsigned starpu_worker_get_memory_node(unsigned workerid)

+ 18 - 7
src/datawizard/memory_nodes.h

@@ -36,18 +36,28 @@ typedef starpu_node_kind starpu_memory_node_tuple;
 #define _STARPU_MEMORY_NODE_TUPLE_FIRST(tuple) (tuple & 0x0F)
 #define _STARPU_MEMORY_NODE_TUPLE_SECOND(tuple) (tuple & 0xF0)
 
+struct _cond_and_mutex {
+        pthread_cond_t *cond;
+        pthread_mutex_t *mutex;	
+};
+
 typedef struct {
 	unsigned nnodes;
 	starpu_node_kind nodes[STARPU_MAXNODES];
 
-	/* the list of queues that are attached to a given node */
 	// TODO move these 2 lists outside starpu_mem_node_descr
-	pthread_rwlock_t attached_queues_rwlock;
-	struct starpu_jobq_s *attached_queues_per_node[STARPU_MAXNODES][STARPU_NMAXWORKERS];
-	struct starpu_jobq_s *attached_queues_all[STARPU_MAXNODES*STARPU_NMAXWORKERS];
+	/* Every worker is associated to a condition variable on which the
+	 * worker waits when there is no task available. It is possible that
+	 * multiple workers share the same condition variable, so we maintain a
+	 * list of all these condition variables so that we can wake up all
+	 * workers attached to a memory node that are waiting for a task. */
+	pthread_rwlock_t conditions_rwlock;
+	struct _cond_and_mutex conditions_attached_to_node[STARPU_MAXNODES][STARPU_NMAXWORKERS];
+	struct _cond_and_mutex conditions_all[STARPU_MAXNODES*STARPU_NMAXWORKERS];
 	/* the number of conditions registered in total and attached to each node */
-	unsigned total_queues_count;
-	unsigned queues_count[STARPU_MAXNODES];
+	unsigned total_condition_count;
+	unsigned condition_count[STARPU_MAXNODES];
+
 } starpu_mem_node_descr;
 
 void _starpu_init_memory_nodes(void);
@@ -55,7 +65,8 @@ void _starpu_deinit_memory_nodes(void);
 void _starpu_set_local_memory_node_key(unsigned *node);
 unsigned _starpu_get_local_memory_node(void);
 unsigned _starpu_register_memory_node(starpu_node_kind kind);
-void _starpu_memory_node_attach_queue(struct starpu_jobq_s *q, unsigned nodeid);
+//void _starpu_memory_node_attach_queue(struct starpu_jobq_s *q, unsigned nodeid);
+void _starpu_memory_node_register_condition(pthread_cond_t *cond, pthread_mutex_t *mutex, unsigned memory_node);
 
 starpu_node_kind _starpu_get_node_kind(uint32_t node);
 unsigned _starpu_get_memory_nodes_count(void);