rate_limited_queue.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "container/heap"
  16. "sync"
  17. "time"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "k8s.io/client-go/util/flowcontrol"
  20. "k8s.io/klog"
  21. )
  22. const (
  23. // NodeHealthUpdateRetry controls the number of retries of writing
  24. // node health update.
  25. NodeHealthUpdateRetry = 5
  26. // NodeEvictionPeriod controls how often NodeController will try to
  27. // evict Pods from non-responsive Nodes.
  28. NodeEvictionPeriod = 100 * time.Millisecond
  29. // EvictionRateLimiterBurst is the burst value for all eviction rate
  30. // limiters
  31. EvictionRateLimiterBurst = 1
  32. )
  33. // TimedValue is a value that should be processed at a designated time.
  34. type TimedValue struct {
  35. Value string
  36. // UID could be anything that helps identify the value
  37. UID interface{}
  38. AddedAt time.Time
  39. ProcessAt time.Time
  40. }
  41. // now is used to test time
  42. var now = time.Now
  43. // TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
  44. type TimedQueue []*TimedValue
  45. // Len is the length of the queue.
  46. func (h TimedQueue) Len() int { return len(h) }
  47. // Less returns true if queue[i] < queue[j].
  48. func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
  49. // Swap swaps index i and j.
  50. func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  51. // Push a new TimedValue on to the queue.
  52. func (h *TimedQueue) Push(x interface{}) {
  53. *h = append(*h, x.(*TimedValue))
  54. }
  55. // Pop the lowest ProcessAt item.
  56. func (h *TimedQueue) Pop() interface{} {
  57. old := *h
  58. n := len(old)
  59. x := old[n-1]
  60. *h = old[0 : n-1]
  61. return x
  62. }
  63. // UniqueQueue is a FIFO queue which additionally guarantees that any
  64. // element can be added only once until it is removed.
  65. type UniqueQueue struct {
  66. lock sync.Mutex
  67. queue TimedQueue
  68. set sets.String
  69. }
  70. // Add a new value to the queue if it wasn't added before, or was
  71. // explicitly removed by the Remove call. Returns true if new value
  72. // was added.
  73. func (q *UniqueQueue) Add(value TimedValue) bool {
  74. q.lock.Lock()
  75. defer q.lock.Unlock()
  76. if q.set.Has(value.Value) {
  77. return false
  78. }
  79. heap.Push(&q.queue, &value)
  80. q.set.Insert(value.Value)
  81. return true
  82. }
  83. // Replace replaces an existing value in the queue if it already
  84. // exists, otherwise it does nothing. Returns true if the item was
  85. // found.
  86. func (q *UniqueQueue) Replace(value TimedValue) bool {
  87. q.lock.Lock()
  88. defer q.lock.Unlock()
  89. for i := range q.queue {
  90. if q.queue[i].Value != value.Value {
  91. continue
  92. }
  93. heap.Remove(&q.queue, i)
  94. heap.Push(&q.queue, &value)
  95. return true
  96. }
  97. return false
  98. }
  99. // RemoveFromQueue the value from the queue, but keeps it in the set,
  100. // so it won't be added second time. Returns true if something was
  101. // removed.
  102. func (q *UniqueQueue) RemoveFromQueue(value string) bool {
  103. q.lock.Lock()
  104. defer q.lock.Unlock()
  105. if !q.set.Has(value) {
  106. return false
  107. }
  108. for i, val := range q.queue {
  109. if val.Value == value {
  110. heap.Remove(&q.queue, i)
  111. return true
  112. }
  113. }
  114. return false
  115. }
  116. // Remove the value from the queue, so Get() call won't return it, and
  117. // allow subsequent addition of the given value. If the value is not
  118. // present does nothing and returns false.
  119. func (q *UniqueQueue) Remove(value string) bool {
  120. q.lock.Lock()
  121. defer q.lock.Unlock()
  122. if !q.set.Has(value) {
  123. return false
  124. }
  125. q.set.Delete(value)
  126. for i, val := range q.queue {
  127. if val.Value == value {
  128. heap.Remove(&q.queue, i)
  129. return true
  130. }
  131. }
  132. return true
  133. }
  134. // Get returns the oldest added value that wasn't returned yet.
  135. func (q *UniqueQueue) Get() (TimedValue, bool) {
  136. q.lock.Lock()
  137. defer q.lock.Unlock()
  138. if len(q.queue) == 0 {
  139. return TimedValue{}, false
  140. }
  141. result := heap.Pop(&q.queue).(*TimedValue)
  142. q.set.Delete(result.Value)
  143. return *result, true
  144. }
  145. // Head returns the oldest added value that wasn't returned yet
  146. // without removing it.
  147. func (q *UniqueQueue) Head() (TimedValue, bool) {
  148. q.lock.Lock()
  149. defer q.lock.Unlock()
  150. if len(q.queue) == 0 {
  151. return TimedValue{}, false
  152. }
  153. result := q.queue[0]
  154. return *result, true
  155. }
  156. // Clear removes all items from the queue and duplication preventing
  157. // set.
  158. func (q *UniqueQueue) Clear() {
  159. q.lock.Lock()
  160. defer q.lock.Unlock()
  161. if q.queue.Len() > 0 {
  162. q.queue = make(TimedQueue, 0)
  163. }
  164. if len(q.set) > 0 {
  165. q.set = sets.NewString()
  166. }
  167. }
  168. // RateLimitedTimedQueue is a unique item priority queue ordered by
  169. // the expected next time of execution. It is also rate limited.
  170. type RateLimitedTimedQueue struct {
  171. queue UniqueQueue
  172. limiterLock sync.Mutex
  173. limiter flowcontrol.RateLimiter
  174. }
  175. // NewRateLimitedTimedQueue creates new queue which will use given
  176. // RateLimiter to oversee execution.
  177. func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
  178. return &RateLimitedTimedQueue{
  179. queue: UniqueQueue{
  180. queue: TimedQueue{},
  181. set: sets.NewString(),
  182. },
  183. limiter: limiter,
  184. }
  185. }
  186. // ActionFunc takes a timed value and returns false if the item must
  187. // be retried, with an optional time.Duration if some minimum wait
  188. // interval should be used.
  189. type ActionFunc func(TimedValue) (bool, time.Duration)
  190. // Try processes the queue.Ends prematurely if RateLimiter forbids an
  191. // action and leak is true. Otherwise, requeues the item to be
  192. // processed. Each value is processed once if fn returns true,
  193. // otherwise it is added back to the queue. The returned remaining is
  194. // used to identify the minimum time to execute the next item in the
  195. // queue. The same value is processed only once unless Remove is
  196. // explicitly called on it (it's done by the cancelPodEviction
  197. // function in NodeController when Node becomes Ready again) TODO:
  198. // figure out a good way to do garbage collection for all Nodes that
  199. // were removed from the cluster.
  200. func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
  201. val, ok := q.queue.Head()
  202. q.limiterLock.Lock()
  203. defer q.limiterLock.Unlock()
  204. for ok {
  205. // rate limit the queue checking
  206. if !q.limiter.TryAccept() {
  207. klog.V(10).Infof("Try rate limited for value: %v", val)
  208. // Try again later
  209. break
  210. }
  211. now := now()
  212. if now.Before(val.ProcessAt) {
  213. break
  214. }
  215. if ok, wait := fn(val); !ok {
  216. val.ProcessAt = now.Add(wait + 1)
  217. q.queue.Replace(val)
  218. } else {
  219. q.queue.RemoveFromQueue(val.Value)
  220. }
  221. val, ok = q.queue.Head()
  222. }
  223. }
  224. // Add value to the queue to be processed. Won't add the same
  225. // value(comparison by value) a second time if it was already added
  226. // and not removed.
  227. func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
  228. now := now()
  229. return q.queue.Add(TimedValue{
  230. Value: value,
  231. UID: uid,
  232. AddedAt: now,
  233. ProcessAt: now,
  234. })
  235. }
  236. // Remove Node from the Evictor. The Node won't be processed until
  237. // added again.
  238. func (q *RateLimitedTimedQueue) Remove(value string) bool {
  239. return q.queue.Remove(value)
  240. }
  241. // Clear removes all items from the queue
  242. func (q *RateLimitedTimedQueue) Clear() {
  243. q.queue.Clear()
  244. }
  245. // SwapLimiter safely swaps current limiter for this queue with the
  246. // passed one if capacities or qps's differ.
  247. func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
  248. q.limiterLock.Lock()
  249. defer q.limiterLock.Unlock()
  250. if q.limiter.QPS() == newQPS {
  251. return
  252. }
  253. var newLimiter flowcontrol.RateLimiter
  254. if newQPS <= 0 {
  255. newLimiter = flowcontrol.NewFakeNeverRateLimiter()
  256. } else {
  257. newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst)
  258. // If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
  259. // TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
  260. // - saturation (percentage of used tokens)
  261. // - number of used tokens
  262. // - number of available tokens
  263. // - something else
  264. if q.limiter.TryAccept() == false {
  265. newLimiter.TryAccept()
  266. }
  267. }
  268. q.limiter.Stop()
  269. q.limiter = newLimiter
  270. }