reader.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. /*
  2. Copyright (c) 2014-2015 VMware, Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package progress
  14. import (
  15. "container/list"
  16. "context"
  17. "fmt"
  18. "io"
  19. "sync/atomic"
  20. "time"
  21. )
  22. type readerReport struct {
  23. pos int64 // Keep first to ensure 64-bit alignment
  24. size int64 // Keep first to ensure 64-bit alignment
  25. bps *uint64 // Keep first to ensure 64-bit alignment
  26. t time.Time
  27. err error
  28. }
  29. func (r readerReport) Percentage() float32 {
  30. return 100.0 * float32(r.pos) / float32(r.size)
  31. }
  32. func (r readerReport) Detail() string {
  33. const (
  34. KiB = 1024
  35. MiB = 1024 * KiB
  36. GiB = 1024 * MiB
  37. )
  38. // Use the reader's bps field, so this report returns an up-to-date number.
  39. //
  40. // For example: if there hasn't been progress for the last 5 seconds, the
  41. // most recent report should return "0B/s".
  42. //
  43. bps := atomic.LoadUint64(r.bps)
  44. switch {
  45. case bps >= GiB:
  46. return fmt.Sprintf("%.1fGiB/s", float32(bps)/float32(GiB))
  47. case bps >= MiB:
  48. return fmt.Sprintf("%.1fMiB/s", float32(bps)/float32(MiB))
  49. case bps >= KiB:
  50. return fmt.Sprintf("%.1fKiB/s", float32(bps)/float32(KiB))
  51. default:
  52. return fmt.Sprintf("%dB/s", bps)
  53. }
  54. }
  55. func (p readerReport) Error() error {
  56. return p.err
  57. }
  58. // reader wraps an io.Reader and sends a progress report over a channel for
  59. // every read it handles.
  60. type reader struct {
  61. r io.Reader
  62. pos int64
  63. size int64
  64. bps uint64
  65. ch chan<- Report
  66. ctx context.Context
  67. }
  68. func NewReader(ctx context.Context, s Sinker, r io.Reader, size int64) *reader {
  69. pr := reader{
  70. r: r,
  71. ctx: ctx,
  72. size: size,
  73. }
  74. // Reports must be sent downstream and to the bps computation loop.
  75. pr.ch = Tee(s, newBpsLoop(&pr.bps)).Sink()
  76. return &pr
  77. }
  78. // Read calls the Read function on the underlying io.Reader. Additionally,
  79. // every read causes a progress report to be sent to the progress reader's
  80. // underlying channel.
  81. func (r *reader) Read(b []byte) (int, error) {
  82. n, err := r.r.Read(b)
  83. r.pos += int64(n)
  84. if err != nil && err != io.EOF {
  85. return n, err
  86. }
  87. q := readerReport{
  88. t: time.Now(),
  89. pos: r.pos,
  90. size: r.size,
  91. bps: &r.bps,
  92. }
  93. select {
  94. case r.ch <- q:
  95. case <-r.ctx.Done():
  96. }
  97. return n, err
  98. }
  99. // Done marks the progress reader as done, optionally including an error in the
  100. // progress report. After sending it, the underlying channel is closed.
  101. func (r *reader) Done(err error) {
  102. q := readerReport{
  103. t: time.Now(),
  104. pos: r.pos,
  105. size: r.size,
  106. bps: &r.bps,
  107. err: err,
  108. }
  109. select {
  110. case r.ch <- q:
  111. close(r.ch)
  112. case <-r.ctx.Done():
  113. }
  114. }
  115. // newBpsLoop returns a sink that monitors and stores throughput.
  116. func newBpsLoop(dst *uint64) SinkFunc {
  117. fn := func() chan<- Report {
  118. sink := make(chan Report)
  119. go bpsLoop(sink, dst)
  120. return sink
  121. }
  122. return fn
  123. }
  124. func bpsLoop(ch <-chan Report, dst *uint64) {
  125. l := list.New()
  126. for {
  127. var tch <-chan time.Time
  128. // Setup timer for front of list to become stale.
  129. if e := l.Front(); e != nil {
  130. dt := time.Second - time.Now().Sub(e.Value.(readerReport).t)
  131. tch = time.After(dt)
  132. }
  133. select {
  134. case q, ok := <-ch:
  135. if !ok {
  136. return
  137. }
  138. l.PushBack(q)
  139. case <-tch:
  140. l.Remove(l.Front())
  141. }
  142. // Compute new bps
  143. if l.Len() == 0 {
  144. atomic.StoreUint64(dst, 0)
  145. } else {
  146. f := l.Front().Value.(readerReport)
  147. b := l.Back().Value.(readerReport)
  148. atomic.StoreUint64(dst, uint64(b.pos-f.pos))
  149. }
  150. }
  151. }