123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package spdystream
- import (
- "container/heap"
- "sync"
- "github.com/docker/spdystream/spdy"
- )
- type prioritizedFrame struct {
- frame spdy.Frame
- priority uint8
- insertId uint64
- }
- type frameQueue []*prioritizedFrame
- func (fq frameQueue) Len() int {
- return len(fq)
- }
- func (fq frameQueue) Less(i, j int) bool {
- if fq[i].priority == fq[j].priority {
- return fq[i].insertId < fq[j].insertId
- }
- return fq[i].priority < fq[j].priority
- }
- func (fq frameQueue) Swap(i, j int) {
- fq[i], fq[j] = fq[j], fq[i]
- }
- func (fq *frameQueue) Push(x interface{}) {
- *fq = append(*fq, x.(*prioritizedFrame))
- }
- func (fq *frameQueue) Pop() interface{} {
- old := *fq
- n := len(old)
- *fq = old[0 : n-1]
- return old[n-1]
- }
- type PriorityFrameQueue struct {
- queue *frameQueue
- c *sync.Cond
- size int
- nextInsertId uint64
- drain bool
- }
- func NewPriorityFrameQueue(size int) *PriorityFrameQueue {
- queue := make(frameQueue, 0, size)
- heap.Init(&queue)
- return &PriorityFrameQueue{
- queue: &queue,
- size: size,
- c: sync.NewCond(&sync.Mutex{}),
- }
- }
- func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
- q.c.L.Lock()
- defer q.c.L.Unlock()
- for q.queue.Len() >= q.size {
- q.c.Wait()
- }
- pFrame := &prioritizedFrame{
- frame: frame,
- priority: priority,
- insertId: q.nextInsertId,
- }
- q.nextInsertId = q.nextInsertId + 1
- heap.Push(q.queue, pFrame)
- q.c.Signal()
- }
- func (q *PriorityFrameQueue) Pop() spdy.Frame {
- q.c.L.Lock()
- defer q.c.L.Unlock()
- for q.queue.Len() == 0 {
- if q.drain {
- return nil
- }
- q.c.Wait()
- }
- frame := heap.Pop(q.queue).(*prioritizedFrame).frame
- q.c.Signal()
- return frame
- }
- func (q *PriorityFrameQueue) Drain() {
- q.c.L.Lock()
- defer q.c.L.Unlock()
- q.drain = true
- q.c.Broadcast()
- }
|