heap.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Below is the implementation of a heap. The logic is pretty much the same
  14. // as cache.heap, however, this heap does not perform synchronization. It leaves
  15. // synchronization to the SchedulingQueue.
  16. package util
  17. import (
  18. "container/heap"
  19. "fmt"
  20. "k8s.io/client-go/tools/cache"
  21. "k8s.io/kubernetes/pkg/scheduler/metrics"
  22. )
  23. // KeyFunc is a function type to get the key from an object.
  24. type KeyFunc func(obj interface{}) (string, error)
  25. type heapItem struct {
  26. obj interface{} // The object which is stored in the heap.
  27. index int // The index of the object's key in the Heap.queue.
  28. }
  29. type itemKeyValue struct {
  30. key string
  31. obj interface{}
  32. }
  33. // heapData is an internal struct that implements the standard heap interface
  34. // and keeps the data stored in the heap.
  35. type heapData struct {
  36. // items is a map from key of the objects to the objects and their index.
  37. // We depend on the property that items in the map are in the queue and vice versa.
  38. items map[string]*heapItem
  39. // queue implements a heap data structure and keeps the order of elements
  40. // according to the heap invariant. The queue keeps the keys of objects stored
  41. // in "items".
  42. queue []string
  43. // keyFunc is used to make the key used for queued item insertion and retrieval, and
  44. // should be deterministic.
  45. keyFunc KeyFunc
  46. // lessFunc is used to compare two objects in the heap.
  47. lessFunc LessFunc
  48. }
  49. var (
  50. _ = heap.Interface(&heapData{}) // heapData is a standard heap
  51. )
  52. // Less compares two objects and returns true if the first one should go
  53. // in front of the second one in the heap.
  54. func (h *heapData) Less(i, j int) bool {
  55. if i > len(h.queue) || j > len(h.queue) {
  56. return false
  57. }
  58. itemi, ok := h.items[h.queue[i]]
  59. if !ok {
  60. return false
  61. }
  62. itemj, ok := h.items[h.queue[j]]
  63. if !ok {
  64. return false
  65. }
  66. return h.lessFunc(itemi.obj, itemj.obj)
  67. }
  68. // Len returns the number of items in the Heap.
  69. func (h *heapData) Len() int { return len(h.queue) }
  70. // Swap implements swapping of two elements in the heap. This is a part of standard
  71. // heap interface and should never be called directly.
  72. func (h *heapData) Swap(i, j int) {
  73. h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
  74. item := h.items[h.queue[i]]
  75. item.index = i
  76. item = h.items[h.queue[j]]
  77. item.index = j
  78. }
  79. // Push is supposed to be called by heap.Push only.
  80. func (h *heapData) Push(kv interface{}) {
  81. keyValue := kv.(*itemKeyValue)
  82. n := len(h.queue)
  83. h.items[keyValue.key] = &heapItem{keyValue.obj, n}
  84. h.queue = append(h.queue, keyValue.key)
  85. }
  86. // Pop is supposed to be called by heap.Pop only.
  87. func (h *heapData) Pop() interface{} {
  88. key := h.queue[len(h.queue)-1]
  89. h.queue = h.queue[0 : len(h.queue)-1]
  90. item, ok := h.items[key]
  91. if !ok {
  92. // This is an error
  93. return nil
  94. }
  95. delete(h.items, key)
  96. return item.obj
  97. }
  98. // Peek is supposed to be called by heap.Peek only.
  99. func (h *heapData) Peek() interface{} {
  100. if len(h.queue) > 0 {
  101. return h.items[h.queue[0]].obj
  102. }
  103. return nil
  104. }
  105. // Heap is a producer/consumer queue that implements a heap data structure.
  106. // It can be used to implement priority queues and similar data structures.
  107. type Heap struct {
  108. // data stores objects and has a queue that keeps their ordering according
  109. // to the heap invariant.
  110. data *heapData
  111. // metricRecorder updates the counter when elements of a heap get added or
  112. // removed, and it does nothing if it's nil
  113. metricRecorder metrics.MetricRecorder
  114. }
  115. // Add inserts an item, and puts it in the queue. The item is updated if it
  116. // already exists.
  117. func (h *Heap) Add(obj interface{}) error {
  118. key, err := h.data.keyFunc(obj)
  119. if err != nil {
  120. return cache.KeyError{Obj: obj, Err: err}
  121. }
  122. if _, exists := h.data.items[key]; exists {
  123. h.data.items[key].obj = obj
  124. heap.Fix(h.data, h.data.items[key].index)
  125. } else {
  126. heap.Push(h.data, &itemKeyValue{key, obj})
  127. if h.metricRecorder != nil {
  128. h.metricRecorder.Inc()
  129. }
  130. }
  131. return nil
  132. }
  133. // AddIfNotPresent inserts an item, and puts it in the queue. If an item with
  134. // the key is present in the map, no changes is made to the item.
  135. func (h *Heap) AddIfNotPresent(obj interface{}) error {
  136. key, err := h.data.keyFunc(obj)
  137. if err != nil {
  138. return cache.KeyError{Obj: obj, Err: err}
  139. }
  140. if _, exists := h.data.items[key]; !exists {
  141. heap.Push(h.data, &itemKeyValue{key, obj})
  142. if h.metricRecorder != nil {
  143. h.metricRecorder.Inc()
  144. }
  145. }
  146. return nil
  147. }
  148. // Update is the same as Add in this implementation. When the item does not
  149. // exist, it is added.
  150. func (h *Heap) Update(obj interface{}) error {
  151. return h.Add(obj)
  152. }
  153. // Delete removes an item.
  154. func (h *Heap) Delete(obj interface{}) error {
  155. key, err := h.data.keyFunc(obj)
  156. if err != nil {
  157. return cache.KeyError{Obj: obj, Err: err}
  158. }
  159. if item, ok := h.data.items[key]; ok {
  160. heap.Remove(h.data, item.index)
  161. if h.metricRecorder != nil {
  162. h.metricRecorder.Dec()
  163. }
  164. return nil
  165. }
  166. return fmt.Errorf("object not found")
  167. }
  168. // Peek returns the head of the heap without removing it.
  169. func (h *Heap) Peek() interface{} {
  170. return h.data.Peek()
  171. }
  172. // Pop returns the head of the heap and removes it.
  173. func (h *Heap) Pop() (interface{}, error) {
  174. obj := heap.Pop(h.data)
  175. if obj != nil {
  176. if h.metricRecorder != nil {
  177. h.metricRecorder.Dec()
  178. }
  179. return obj, nil
  180. }
  181. return nil, fmt.Errorf("object was removed from heap data")
  182. }
  183. // Get returns the requested item, or sets exists=false.
  184. func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
  185. key, err := h.data.keyFunc(obj)
  186. if err != nil {
  187. return nil, false, cache.KeyError{Obj: obj, Err: err}
  188. }
  189. return h.GetByKey(key)
  190. }
  191. // GetByKey returns the requested item, or sets exists=false.
  192. func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
  193. item, exists := h.data.items[key]
  194. if !exists {
  195. return nil, false, nil
  196. }
  197. return item.obj, true, nil
  198. }
  199. // List returns a list of all the items.
  200. func (h *Heap) List() []interface{} {
  201. list := make([]interface{}, 0, len(h.data.items))
  202. for _, item := range h.data.items {
  203. list = append(list, item.obj)
  204. }
  205. return list
  206. }
  207. // Len returns the number of items in the heap.
  208. func (h *Heap) Len() int {
  209. return len(h.data.queue)
  210. }
  211. // NewHeap returns a Heap which can be used to queue up items to process.
  212. func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
  213. return NewHeapWithRecorder(keyFn, lessFn, nil)
  214. }
  215. // NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
  216. func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
  217. return &Heap{
  218. data: &heapData{
  219. items: map[string]*heapItem{},
  220. queue: []string{},
  221. keyFunc: keyFn,
  222. lessFunc: lessFn,
  223. },
  224. metricRecorder: metricRecorder,
  225. }
  226. }