request_cache.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package streaming
  14. import (
  15. "container/list"
  16. "crypto/rand"
  17. "encoding/base64"
  18. "fmt"
  19. "math"
  20. "sync"
  21. "time"
  22. "k8s.io/apimachinery/pkg/util/clock"
  23. )
  // Tunables for the request cache. They are package variables, not constants,
  // so their values can be reassigned at run time.
  var (
  	// cacheTTL is how long an inserted request (and its token) stays valid.
  	cacheTTL = time.Minute
  	// maxInFlight caps how many requests may sit in the cache at once.
  	maxInFlight = 1000
  	// tokenLen is the number of base64 characters in a generated token.
  	tokenLen = 8
  )
  32. // requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
  33. // random token for their retrieval. The requestCache is used for building streaming URLs without
  34. // the need to encode every request parameter in the URL.
  35. type requestCache struct {
  36. // clock is used to obtain the current time
  37. clock clock.Clock
  38. // tokens maps the generate token to the request for fast retrieval.
  39. tokens map[string]*list.Element
  40. // ll maintains an age-ordered request list for faster garbage collection of expired requests.
  41. ll *list.List
  42. lock sync.Mutex
  43. }
  // request is the opaque value stored in the cache. It stands for an
  // *ExecRequest, *AttachRequest, or *PortForwardRequest.
  type request interface{}

  // cacheEntry is the record stored in each element of the cache's list.
  type cacheEntry struct {
  	// token is the key under which the entry is indexed.
  	token string
  	// req is the cached request returned on a successful lookup.
  	req request
  	// expireTime is the instant after which the entry is no longer valid.
  	expireTime time.Time
  }
  51. func newRequestCache() *requestCache {
  52. return &requestCache{
  53. clock: clock.RealClock{},
  54. ll: list.New(),
  55. tokens: make(map[string]*list.Element),
  56. }
  57. }
  58. // Insert the given request into the cache and returns the token used for fetching it out.
  59. func (c *requestCache) Insert(req request) (token string, err error) {
  60. c.lock.Lock()
  61. defer c.lock.Unlock()
  62. // Remove expired entries.
  63. c.gc()
  64. // If the cache is full, reject the request.
  65. if c.ll.Len() == maxInFlight {
  66. return "", NewErrorTooManyInFlight()
  67. }
  68. token, err = c.uniqueToken()
  69. if err != nil {
  70. return "", err
  71. }
  72. ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)})
  73. c.tokens[token] = ele
  74. return token, nil
  75. }
  76. // Consume the token (remove it from the cache) and return the cached request, if found.
  77. func (c *requestCache) Consume(token string) (req request, found bool) {
  78. c.lock.Lock()
  79. defer c.lock.Unlock()
  80. ele, ok := c.tokens[token]
  81. if !ok {
  82. return nil, false
  83. }
  84. c.ll.Remove(ele)
  85. delete(c.tokens, token)
  86. entry := ele.Value.(*cacheEntry)
  87. if c.clock.Now().After(entry.expireTime) {
  88. // Entry already expired.
  89. return nil, false
  90. }
  91. return entry.req, true
  92. }
  93. // uniqueToken generates a random URL-safe token and ensures uniqueness.
  94. func (c *requestCache) uniqueToken() (string, error) {
  95. const maxTries = 10
  96. // Number of bytes to be tokenLen when base64 encoded.
  97. tokenSize := math.Ceil(float64(tokenLen) * 6 / 8)
  98. rawToken := make([]byte, int(tokenSize))
  99. for i := 0; i < maxTries; i++ {
  100. if _, err := rand.Read(rawToken); err != nil {
  101. return "", err
  102. }
  103. encoded := base64.RawURLEncoding.EncodeToString(rawToken)
  104. token := encoded[:tokenLen]
  105. // If it's unique, return it. Otherwise retry.
  106. if _, exists := c.tokens[encoded]; !exists {
  107. return token, nil
  108. }
  109. }
  110. return "", fmt.Errorf("failed to generate unique token")
  111. }
  112. // Must be write-locked prior to calling.
  113. func (c *requestCache) gc() {
  114. now := c.clock.Now()
  115. for c.ll.Len() > 0 {
  116. oldest := c.ll.Back()
  117. entry := oldest.Value.(*cacheEntry)
  118. if !now.After(entry.expireTime) {
  119. return
  120. }
  121. // Oldest value is expired; remove it.
  122. c.ll.Remove(oldest)
  123. delete(c.tokens, entry.token)
  124. }
  125. }