123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Below is the implementation of a heap. The logic is pretty much the same
- // as cache.heap; however, this heap does not perform synchronization. It leaves
- // synchronization to the SchedulingQueue.
- package util
- import (
- "container/heap"
- "fmt"
- "k8s.io/client-go/tools/cache"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- )
// KeyFunc derives the string key for obj. It returns a non-nil error when no
// key can be produced for the given object.
type KeyFunc func(obj interface{}) (string, error)
// heapItem pairs a stored object with its current position in the queue.
type heapItem struct {
	// obj is the object stored in the heap.
	obj interface{}
	// index is the position of the object's key in heapData.queue.
	index int
}
// itemKeyValue carries an object together with its already-computed key. It is
// the value handed to heapData.Push.
type itemKeyValue struct {
	// key is the key under which obj is stored.
	key string
	// obj is the object to store.
	obj interface{}
}
- // heapData is an internal struct that implements the standard heap interface
- // and keeps the data stored in the heap.
- type heapData struct {
- // items is a map from key of the objects to the objects and their index.
- // We depend on the property that items in the map are in the queue and vice versa.
- items map[string]*heapItem
- // queue implements a heap data structure and keeps the order of elements
- // according to the heap invariant. The queue keeps the keys of objects stored
- // in "items".
- queue []string
- // keyFunc is used to make the key used for queued item insertion and retrieval, and
- // should be deterministic.
- keyFunc KeyFunc
- // lessFunc is used to compare two objects in the heap.
- lessFunc LessFunc
- }
- var (
- _ = heap.Interface(&heapData{}) // heapData is a standard heap
- )
- // Less compares two objects and returns true if the first one should go
- // in front of the second one in the heap.
- func (h *heapData) Less(i, j int) bool {
- if i > len(h.queue) || j > len(h.queue) {
- return false
- }
- itemi, ok := h.items[h.queue[i]]
- if !ok {
- return false
- }
- itemj, ok := h.items[h.queue[j]]
- if !ok {
- return false
- }
- return h.lessFunc(itemi.obj, itemj.obj)
- }
- // Len returns the number of items in the Heap.
- func (h *heapData) Len() int { return len(h.queue) }
- // Swap implements swapping of two elements in the heap. This is a part of standard
- // heap interface and should never be called directly.
- func (h *heapData) Swap(i, j int) {
- h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
- item := h.items[h.queue[i]]
- item.index = i
- item = h.items[h.queue[j]]
- item.index = j
- }
- // Push is supposed to be called by heap.Push only.
- func (h *heapData) Push(kv interface{}) {
- keyValue := kv.(*itemKeyValue)
- n := len(h.queue)
- h.items[keyValue.key] = &heapItem{keyValue.obj, n}
- h.queue = append(h.queue, keyValue.key)
- }
- // Pop is supposed to be called by heap.Pop only.
- func (h *heapData) Pop() interface{} {
- key := h.queue[len(h.queue)-1]
- h.queue = h.queue[0 : len(h.queue)-1]
- item, ok := h.items[key]
- if !ok {
- // This is an error
- return nil
- }
- delete(h.items, key)
- return item.obj
- }
- // Peek is supposed to be called by heap.Peek only.
- func (h *heapData) Peek() interface{} {
- if len(h.queue) > 0 {
- return h.items[h.queue[0]].obj
- }
- return nil
- }
- // Heap is a producer/consumer queue that implements a heap data structure.
- // It can be used to implement priority queues and similar data structures.
- type Heap struct {
- // data stores objects and has a queue that keeps their ordering according
- // to the heap invariant.
- data *heapData
- // metricRecorder updates the counter when elements of a heap get added or
- // removed, and it does nothing if it's nil
- metricRecorder metrics.MetricRecorder
- }
- // Add inserts an item, and puts it in the queue. The item is updated if it
- // already exists.
- func (h *Heap) Add(obj interface{}) error {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return cache.KeyError{Obj: obj, Err: err}
- }
- if _, exists := h.data.items[key]; exists {
- h.data.items[key].obj = obj
- heap.Fix(h.data, h.data.items[key].index)
- } else {
- heap.Push(h.data, &itemKeyValue{key, obj})
- if h.metricRecorder != nil {
- h.metricRecorder.Inc()
- }
- }
- return nil
- }
- // AddIfNotPresent inserts an item, and puts it in the queue. If an item with
- // the key is present in the map, no changes is made to the item.
- func (h *Heap) AddIfNotPresent(obj interface{}) error {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return cache.KeyError{Obj: obj, Err: err}
- }
- if _, exists := h.data.items[key]; !exists {
- heap.Push(h.data, &itemKeyValue{key, obj})
- if h.metricRecorder != nil {
- h.metricRecorder.Inc()
- }
- }
- return nil
- }
- // Update is the same as Add in this implementation. When the item does not
- // exist, it is added.
- func (h *Heap) Update(obj interface{}) error {
- return h.Add(obj)
- }
- // Delete removes an item.
- func (h *Heap) Delete(obj interface{}) error {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return cache.KeyError{Obj: obj, Err: err}
- }
- if item, ok := h.data.items[key]; ok {
- heap.Remove(h.data, item.index)
- if h.metricRecorder != nil {
- h.metricRecorder.Dec()
- }
- return nil
- }
- return fmt.Errorf("object not found")
- }
- // Peek returns the head of the heap without removing it.
- func (h *Heap) Peek() interface{} {
- return h.data.Peek()
- }
- // Pop returns the head of the heap and removes it.
- func (h *Heap) Pop() (interface{}, error) {
- obj := heap.Pop(h.data)
- if obj != nil {
- if h.metricRecorder != nil {
- h.metricRecorder.Dec()
- }
- return obj, nil
- }
- return nil, fmt.Errorf("object was removed from heap data")
- }
- // Get returns the requested item, or sets exists=false.
- func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
- key, err := h.data.keyFunc(obj)
- if err != nil {
- return nil, false, cache.KeyError{Obj: obj, Err: err}
- }
- return h.GetByKey(key)
- }
- // GetByKey returns the requested item, or sets exists=false.
- func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
- item, exists := h.data.items[key]
- if !exists {
- return nil, false, nil
- }
- return item.obj, true, nil
- }
- // List returns a list of all the items.
- func (h *Heap) List() []interface{} {
- list := make([]interface{}, 0, len(h.data.items))
- for _, item := range h.data.items {
- list = append(list, item.obj)
- }
- return list
- }
- // Len returns the number of items in the heap.
- func (h *Heap) Len() int {
- return len(h.data.queue)
- }
- // NewHeap returns a Heap which can be used to queue up items to process.
- func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
- return NewHeapWithRecorder(keyFn, lessFn, nil)
- }
- // NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
- func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
- return &Heap{
- data: &heapData{
- items: map[string]*heapItem{},
- queue: []string{},
- keyFunc: keyFn,
- lessFunc: lessFn,
- },
- metricRecorder: metricRecorder,
- }
- }
|