|
@@ -418,6 +418,49 @@ typedef enum {
|
|
|
/* XXX we should use an accessor */
|
|
|
extern mem_node_descr descr;
|
|
|
|
|
|
+static void operate_on_all_queues_attached_to_node(unsigned nodeid, queue_op op)
|
|
|
+{
|
|
|
+ unsigned q_id;
|
|
|
+ struct jobq_s *q;
|
|
|
+
|
|
|
+ take_mutex(&descr.attached_queues_mutex);
|
|
|
+
|
|
|
+ unsigned nqueues = descr.queues_count[nodeid];
|
|
|
+
|
|
|
+ for (q_id = 0; q_id < nqueues; q_id++)
|
|
|
+ {
|
|
|
+ q = descr.attached_queues_per_node[nodeid][q_id];
|
|
|
+ switch (op) {
|
|
|
+ case BROADCAST:
|
|
|
+ pthread_cond_broadcast(&q->activity_cond);
|
|
|
+ break;
|
|
|
+ case LOCK:
|
|
|
+ pthread_mutex_lock(&q->activity_mutex);
|
|
|
+ break;
|
|
|
+ case UNLOCK:
|
|
|
+ pthread_mutex_unlock(&q->activity_mutex);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ release_mutex(&descr.attached_queues_mutex);
|
|
|
+}
|
|
|
+
|
|
|
+inline void lock_all_queues_attached_to_node(unsigned node)
|
|
|
+{
|
|
|
+ operate_on_all_queues_attached_to_node(node, LOCK);
|
|
|
+}
|
|
|
+
|
|
|
+inline void unlock_all_queues_attached_to_node(unsigned node)
|
|
|
+{
|
|
|
+ operate_on_all_queues_attached_to_node(node, UNLOCK);
|
|
|
+}
|
|
|
+
|
|
|
+inline void broadcast_all_queues_attached_to_node(unsigned node)
|
|
|
+{
|
|
|
+ operate_on_all_queues_attached_to_node(node, BROADCAST);
|
|
|
+}
|
|
|
+
|
|
|
static void operate_on_all_queues(queue_op op)
|
|
|
{
|
|
|
unsigned q_id;
|