priority.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package spdystream
  2. import (
  3. "container/heap"
  4. "sync"
  5. "github.com/docker/spdystream/spdy"
  6. )
  7. type prioritizedFrame struct {
  8. frame spdy.Frame
  9. priority uint8
  10. insertId uint64
  11. }
  12. type frameQueue []*prioritizedFrame
  13. func (fq frameQueue) Len() int {
  14. return len(fq)
  15. }
  16. func (fq frameQueue) Less(i, j int) bool {
  17. if fq[i].priority == fq[j].priority {
  18. return fq[i].insertId < fq[j].insertId
  19. }
  20. return fq[i].priority < fq[j].priority
  21. }
  22. func (fq frameQueue) Swap(i, j int) {
  23. fq[i], fq[j] = fq[j], fq[i]
  24. }
  25. func (fq *frameQueue) Push(x interface{}) {
  26. *fq = append(*fq, x.(*prioritizedFrame))
  27. }
  28. func (fq *frameQueue) Pop() interface{} {
  29. old := *fq
  30. n := len(old)
  31. *fq = old[0 : n-1]
  32. return old[n-1]
  33. }
  34. type PriorityFrameQueue struct {
  35. queue *frameQueue
  36. c *sync.Cond
  37. size int
  38. nextInsertId uint64
  39. drain bool
  40. }
  41. func NewPriorityFrameQueue(size int) *PriorityFrameQueue {
  42. queue := make(frameQueue, 0, size)
  43. heap.Init(&queue)
  44. return &PriorityFrameQueue{
  45. queue: &queue,
  46. size: size,
  47. c: sync.NewCond(&sync.Mutex{}),
  48. }
  49. }
  50. func (q *PriorityFrameQueue) Push(frame spdy.Frame, priority uint8) {
  51. q.c.L.Lock()
  52. defer q.c.L.Unlock()
  53. for q.queue.Len() >= q.size {
  54. q.c.Wait()
  55. }
  56. pFrame := &prioritizedFrame{
  57. frame: frame,
  58. priority: priority,
  59. insertId: q.nextInsertId,
  60. }
  61. q.nextInsertId = q.nextInsertId + 1
  62. heap.Push(q.queue, pFrame)
  63. q.c.Signal()
  64. }
  65. func (q *PriorityFrameQueue) Pop() spdy.Frame {
  66. q.c.L.Lock()
  67. defer q.c.L.Unlock()
  68. for q.queue.Len() == 0 {
  69. if q.drain {
  70. return nil
  71. }
  72. q.c.Wait()
  73. }
  74. frame := heap.Pop(q.queue).(*prioritizedFrame).frame
  75. q.c.Signal()
  76. return frame
  77. }
  78. func (q *PriorityFrameQueue) Drain() {
  79. q.c.L.Lock()
  80. defer q.c.L.Unlock()
  81. q.drain = true
  82. q.c.Broadcast()
  83. }