- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "fmt"
- "sync"
- "time"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- )
- var (
- cleanAssumedPeriod = 1 * time.Second
- )
- // New returns a Cache implementation.
- // It automatically starts a goroutine that manages expiration of assumed pods.
- // "ttl" is how long an assumed pod stays in the cache before it expires.
- // "stop" is the channel whose closure stops the background goroutine.
- func New(ttl time.Duration, stop <-chan struct{}) Cache {
- cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
- cache.run()
- return cache
- }
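- // An illustrative (not exhaustive) flow through the cache: a pod is assumed on
- // the node the scheduler picked, FinishBinding starts the expiration countdown
- // for that assumption, and the informer's Add event later confirms it. If the
- // Add event never arrives, the background cleanup goroutine expires the assumed
- // pod after "ttl". In the sketch below, the 30-second ttl is an arbitrary
- // example value, and "stopCh" and "pod" are assumed to be provided by the caller:
- //
- //	cache := New(30*time.Second, stopCh)
- //	_ = cache.AssumePod(pod)     // pod.Spec.NodeName is the node it was assumed on
- //	_ = cache.FinishBinding(pod) // the assumption may now expire after ttl
- //	_ = cache.AddPod(pod)        // an informer Add event confirms the pod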
- // nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
- // linked list. When a NodeInfo is updated, it goes to the head of the list.
- // The items closer to the head are the most recently updated items.
- type nodeInfoListItem struct {
- info *schedulernodeinfo.NodeInfo
- next *nodeInfoListItem
- prev *nodeInfoListItem
- }
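- // schedulerCache implements the Cache interface returned by New. It tracks
- // assumed and added pods, per-node aggregated information (NodeInfo), and
- // which images are present on which nodes.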
- type schedulerCache struct {
- stop <-chan struct{}
- ttl time.Duration
- period time.Duration
- // This mutex guards all fields within this cache struct.
- mu sync.RWMutex
- // a set of assumed pod keys.
- // The key can further be used to look up an entry in podStates.
- assumedPods map[string]bool
- // a map from pod key to podState.
- podStates map[string]*podState
- nodes map[string]*nodeInfoListItem
- // headNode points to the most recently updated NodeInfo in "nodes". It is the
- // head of the linked list.
- headNode *nodeInfoListItem
- nodeTree *nodeTree
- // A map from image name to its imageState.
- imageStates map[string]*imageState
- }
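- // podState is the cache's bookkeeping entry for a single pod, including the
- // expiration deadline used for assumed pods.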
- type podState struct {
- pod *v1.Pod
- // Used by assumed pods to determine expiration.
- deadline *time.Time
- // Used to block the cache from expiring an assumed pod while its binding is still in progress.
- bindingFinished bool
- }
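- // imageState records the size of an image and the set of nodes it is present on.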
- type imageState struct {
- // Size of the image
- size int64
- // A set of node names for nodes having this image present
- nodes sets.String
- }
- // createImageStateSummary returns a summarizing snapshot of the given image's state.
- func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulernodeinfo.ImageStateSummary {
- return &schedulernodeinfo.ImageStateSummary{
- Size: state.size,
- NumNodes: len(state.nodes),
- }
- }
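- // newSchedulerCache builds a schedulerCache with the given ttl, cleanup period,
- // and stop channel.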
- func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
- return &schedulerCache{
- ttl: ttl,
- period: period,
- stop: stop,
- nodes: make(map[string]*nodeInfoListItem),
- nodeTree: newNodeTree(nil),
- assumedPods: make(map[string]bool),
- podStates: make(map[string]*podState),
- imageStates: make(map[string]*imageState),
- }
- }
- // newNodeInfoListItem initializes a new nodeInfoListItem.
- func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
- return &nodeInfoListItem{
- info: ni,
- }
- }
- // moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
- // linked list. The head is the most recently updated NodeInfo.
- // We assume cache lock is already acquired.
- func (cache *schedulerCache) moveNodeInfoToHead(name string) {
- ni, ok := cache.nodes[name]
- if !ok {
- klog.Errorf("No NodeInfo with name %v found in the cache", name)
- return
- }
- // if the node info list item is already at the head, we are done.
- if ni == cache.headNode {
- return
- }
- if ni.prev != nil {
- ni.prev.next = ni.next
- }
- if ni.next != nil {
- ni.next.prev = ni.prev
- }
- if cache.headNode != nil {
- cache.headNode.prev = ni
- }
- ni.next = cache.headNode
- ni.prev = nil
- cache.headNode = ni
- }
- // removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
- // linked list.
- // We assume cache lock is already acquired.
- func (cache *schedulerCache) removeNodeInfoFromList(name string) {
- ni, ok := cache.nodes[name]
- if !ok {
- klog.Errorf("No NodeInfo with name %v found in the cache", name)
- return
- }
- if ni.prev != nil {
- ni.prev.next = ni.next
- }
- if ni.next != nil {
- ni.next.prev = ni.prev
- }
- // if the removed item was at the head, we must update the head.
- if ni == cache.headNode {
- cache.headNode = ni.next
- }
- delete(cache.nodes, name)
- }
- // Dump produces a dump of the current scheduler cache. This is used for
- // debugging purposes only and shouldn't be confused with the UpdateSnapshot
- // function.
- // This method is expensive, and should only be used in non-critical paths.
- func (cache *schedulerCache) Dump() *Dump {
- cache.mu.RLock()
- defer cache.mu.RUnlock()
- nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
- for k, v := range cache.nodes {
- nodes[k] = v.info.Clone()
- }
- assumedPods := make(map[string]bool, len(cache.assumedPods))
- for k, v := range cache.assumedPods {
- assumedPods[k] = v
- }
- return &Dump{
- Nodes: nodes,
- AssumedPods: assumedPods,
- }
- }
- // UpdateSnapshot takes a snapshot of the cached NodeInfo map. This is called at
- // the beginning of every scheduling cycle.
- // This function tracks the generation number of NodeInfo and updates only the
- // entries of an existing snapshot that have changed after the snapshot was taken.
- func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
- // Get the last generation of the snapshot.
- snapshotGeneration := nodeSnapshot.generation
- // NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
- // or removed from the cache.
- updateAllLists := false
- // HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
- // status from having pods with affinity to NOT having pods with affinity or the other
- // way around.
- updateNodesHavePodsWithAffinity := false
- // Start from the head of the NodeInfo doubly linked list and update snapshot
- // of NodeInfos updated after the last snapshot.
- for node := cache.headNode; node != nil; node = node.next {
- if node.info.GetGeneration() <= snapshotGeneration {
- // all remaining nodes were updated before the existing snapshot was taken. We are done.
- break
- }
- if balancedVolumesEnabled && node.info.TransientInfo != nil {
- // Transient scheduler info is reset here.
- node.info.TransientInfo.ResetTransientSchedulerInfo()
- }
- if np := node.info.Node(); np != nil {
- existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
- if !ok {
- updateAllLists = true
- existing = &schedulernodeinfo.NodeInfo{}
- nodeSnapshot.nodeInfoMap[np.Name] = existing
- }
- clone := node.info.Clone()
- // We track nodes that have pods with affinity, here we check if this node changed its
- // status from having pods with affinity to NOT having pods with affinity or the other
- // way around.
- if (len(existing.PodsWithAffinity()) > 0) != (len(clone.PodsWithAffinity()) > 0) {
- updateNodesHavePodsWithAffinity = true
- }
- // We need to preserve the original pointer of the NodeInfo struct since it
- // is used in the NodeInfoList, which we may not update.
- *existing = *clone
- }
- }
- // Update the snapshot generation with the latest NodeInfo generation.
- if cache.headNode != nil {
- nodeSnapshot.generation = cache.headNode.info.GetGeneration()
- }
- if len(nodeSnapshot.nodeInfoMap) > len(cache.nodes) {
- cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
- updateAllLists = true
- }
- if updateAllLists || updateNodesHavePodsWithAffinity {
- cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
- }
- if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
- errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
- ", length of NodeInfoMap=%v, length of nodes in cache=%v"+
- ", trying to recover",
- len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
- len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
- klog.Error(errMsg)
- // We will try to recover by re-creating the lists for the next scheduling cycle, but we still return an
- // error to surface the problem; the error will likely cause the current scheduling cycle to fail.
- cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
- return fmt.Errorf(errMsg)
- }
- return nil
- }
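- // updateNodeInfoSnapshotList rebuilds the snapshot's node lists. When updateAll
- // is true, nodeInfoList is re-created in nodeTree order; otherwise only
- // havePodsWithAffinityNodeInfoList is rebuilt from the existing nodeInfoList.
- // We assume the cache lock is already acquired.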
- func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
- snapshot.havePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
- if updateAll {
- // Take a snapshot of the nodes order in the tree
- snapshot.nodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
- for i := 0; i < cache.nodeTree.numNodes; i++ {
- nodeName := cache.nodeTree.next()
- if n := snapshot.nodeInfoMap[nodeName]; n != nil {
- snapshot.nodeInfoList = append(snapshot.nodeInfoList, n)
- if len(n.PodsWithAffinity()) > 0 {
- snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
- }
- } else {
- klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
- }
- }
- } else {
- for _, n := range snapshot.nodeInfoList {
- if len(n.PodsWithAffinity()) > 0 {
- snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
- }
- }
- }
- }
- // If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
- func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
- toDelete := len(snapshot.nodeInfoMap) - len(cache.nodes)
- for name := range snapshot.nodeInfoMap {
- if toDelete <= 0 {
- break
- }
- if _, ok := cache.nodes[name]; !ok {
- delete(snapshot.nodeInfoMap, name)
- toDelete--
- }
- }
- }
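- // List returns all pods in the cache that match the given label selector.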
- func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
- alwaysTrue := func(p *v1.Pod) bool { return true }
- return cache.FilteredList(alwaysTrue, selector)
- }
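- // FilteredList returns the cached pods that pass both the given pod filter and
- // the label selector.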
- func (cache *schedulerCache) FilteredList(podFilter schedulerlisters.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
- cache.mu.RLock()
- defer cache.mu.RUnlock()
- // podFilter is expected to return true for most or all of the pods. We
- // can avoid expensive array growth without wasting too much memory by
- // pre-allocating capacity.
- maxSize := 0
- for _, n := range cache.nodes {
- maxSize += len(n.info.Pods())
- }
- pods := make([]*v1.Pod, 0, maxSize)
- for _, n := range cache.nodes {
- for _, pod := range n.info.Pods() {
- if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
- pods = append(pods, pod)
- }
- }
- }
- return pods, nil
- }
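- // AssumePod adds the pod to the cache and marks it as assumed. It returns an
- // error if the pod is already present in the cache.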
- func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return err
- }
- cache.mu.Lock()
- defer cache.mu.Unlock()
- if _, ok := cache.podStates[key]; ok {
- return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
- }
- cache.addPod(pod)
- ps := &podState{
- pod: pod,
- }
- cache.podStates[key] = ps
- cache.assumedPods[key] = true
- return nil
- }
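- // FinishBinding signals that the binding of the given assumed pod is done,
- // allowing it to expire after the cache's ttl.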
- func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
- return cache.finishBinding(pod, time.Now())
- }
- // finishBinding exists to make tests deterministic by injecting "now" as an argument.
- func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return err
- }
- // finishBinding mutates the cached podState, so take the write lock.
- cache.mu.Lock()
- defer cache.mu.Unlock()
- klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
- currState, ok := cache.podStates[key]
- if ok && cache.assumedPods[key] {
- dl := now.Add(cache.ttl)
- currState.bindingFinished = true
- currState.deadline = &dl
- }
- return nil
- }
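- // ForgetPod removes an assumed pod from the cache; pods that are not in the
- // assumed state cannot be forgotten.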
- func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return err
- }
- cache.mu.Lock()
- defer cache.mu.Unlock()
- currState, ok := cache.podStates[key]
- if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
- return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
- }
- switch {
- // Only an assumed pod can be forgotten.
- case ok && cache.assumedPods[key]:
- err := cache.removePod(pod)
- if err != nil {
- return err
- }
- delete(cache.assumedPods, key)
- delete(cache.podStates, key)
- default:
- return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
- }
- return nil
- }
- // Assumes that lock is already acquired.
- func (cache *schedulerCache) addPod(pod *v1.Pod) {
- n, ok := cache.nodes[pod.Spec.NodeName]
- if !ok {
- n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
- cache.nodes[pod.Spec.NodeName] = n
- }
- n.info.AddPod(pod)
- cache.moveNodeInfoToHead(pod.Spec.NodeName)
- }
- // Assumes that lock is already acquired.
- func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
- if err := cache.removePod(oldPod); err != nil {
- return err
- }
- cache.addPod(newPod)
- return nil
- }
- // Assumes that lock is already acquired.
- // Removes a pod from the cached node info. When a node is removed, some pod
- // deletion events might arrive later. This is not a problem, as the pods in
- // the node are assumed to be removed already.
- func (cache *schedulerCache) removePod(pod *v1.Pod) error {
- n, ok := cache.nodes[pod.Spec.NodeName]
- if !ok {
- return nil
- }
- if err := n.info.RemovePod(pod); err != nil {
- return err
- }
- cache.moveNodeInfoToHead(pod.Spec.NodeName)
- return nil
- }
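- // AddPod either confirms a previously assumed pod, clearing its expiration
- // deadline, or adds a pod that is not currently in the cache (for example, one
- // whose assumption already expired). Adding a pod that is already in the added
- // state returns an error.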
- func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return err
- }
- cache.mu.Lock()
- defer cache.mu.Unlock()
- currState, ok := cache.podStates[key]
- switch {
- case ok && cache.assumedPods[key]:
- if currState.pod.Spec.NodeName != pod.Spec.NodeName {
- // The pod was added to a different node than it was assumed to.
- klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
- // Clean this up.
- if err = cache.removePod(currState.pod); err != nil {
- klog.Errorf("removing pod error: %v", err)
- }
- cache.addPod(pod)
- }
- delete(cache.assumedPods, key)
- cache.podStates[key].deadline = nil
- cache.podStates[key].pod = pod
- case !ok:
- // Pod was expired. We should add it back.
- cache.addPod(pod)
- ps := &podState{
- pod: pod,
- }
- cache.podStates[key] = ps
- default:
- return fmt.Errorf("pod %v was already in added state", key)
- }
- return nil
- }
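- // UpdatePod replaces oldPod with newPod in the cache. Only pods in the added
- // (non-assumed) state can be updated.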
- func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
- key, err := schedulernodeinfo.GetPodKey(oldPod)
- if err != nil {
- return err
- }
- cache.mu.Lock()
- defer cache.mu.Unlock()
- currState, ok := cache.podStates[key]
- switch {
- // An assumed pod won't have an Update/Remove event. It needs to have an Add event
- // before an Update event, in which case the state would change from Assumed to Added.
- case ok && !cache.assumedPods[key]:
- if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
- klog.Errorf("Pod %v updated on a different node than previously added to.", key)
- klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
- }
- if err := cache.updatePod(oldPod, newPod); err != nil {
- return err
- }
- currState.pod = newPod
- default:
- return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
- }
- return nil
- }
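- // RemovePod removes a pod in the added state from the cache and from its
- // node's aggregated information.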
- func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return err
- }
- cache.mu.Lock()
- defer cache.mu.Unlock()
- currState, ok := cache.podStates[key]
- switch {
- // An assumed pod won't have a Delete/Remove event. It needs to have an Add event
- // before a Remove event, in which case the state would change from Assumed to Added.
- case ok && !cache.assumedPods[key]:
- if currState.pod.Spec.NodeName != pod.Spec.NodeName {
- klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
- klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
- }
- err := cache.removePod(currState.pod)
- if err != nil {
- return err
- }
- delete(cache.podStates, key)
- default:
- return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
- }
- return nil
- }
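- // IsAssumedPod returns whether the pod is currently in the assumed state.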
- func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return false, err
- }
- cache.mu.RLock()
- defer cache.mu.RUnlock()
- b, found := cache.assumedPods[key]
- if !found {
- return false, nil
- }
- return b, nil
- }
- // GetPod might return a pod for which its node has already been deleted from
- // the main cache. This is useful to properly process pod update events.
- func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
- key, err := schedulernodeinfo.GetPodKey(pod)
- if err != nil {
- return nil, err
- }
- cache.mu.RLock()
- defer cache.mu.RUnlock()
- podState, ok := cache.podStates[key]
- if !ok {
- return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
- }
- return podState.pod, nil
- }
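- // AddNode adds a node to the cache, or updates the existing entry, keeping the
- // node tree and image state bookkeeping in sync.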
- func (cache *schedulerCache) AddNode(node *v1.Node) error {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- n, ok := cache.nodes[node.Name]
- if !ok {
- n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
- cache.nodes[node.Name] = n
- } else {
- cache.removeNodeImageStates(n.info.Node())
- }
- cache.moveNodeInfoToHead(node.Name)
- cache.nodeTree.addNode(node)
- cache.addNodeImageStates(node, n.info)
- return n.info.SetNode(node)
- }
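- // UpdateNode updates the cached node, the node tree, and the image state
- // bookkeeping with the new node object, creating the cache entry if it does
- // not exist.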
- func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- n, ok := cache.nodes[newNode.Name]
- if !ok {
- n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
- cache.nodes[newNode.Name] = n
- cache.nodeTree.addNode(newNode)
- } else {
- cache.removeNodeImageStates(n.info.Node())
- }
- cache.moveNodeInfoToHead(newNode.Name)
- cache.nodeTree.updateNode(oldNode, newNode)
- cache.addNodeImageStates(newNode, n.info)
- return n.info.SetNode(newNode)
- }
- // RemoveNode removes a node from the cache.
- // Some nodes might still have pods because their deletion events didn't arrive
- // yet. For most intents and purposes, those pods are removed from the cache,
- // since the cached nodes are their source of truth.
- // However, some pod information (assumedPods, podStates) persists. These
- // caches will become eventually consistent as pod deletion events arrive.
- func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- _, ok := cache.nodes[node.Name]
- if !ok {
- return fmt.Errorf("node %v is not found", node.Name)
- }
- cache.removeNodeInfoFromList(node.Name)
- if err := cache.nodeTree.removeNode(node); err != nil {
- return err
- }
- cache.removeNodeImageStates(node)
- return nil
- }
- // addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the imageStates in
- // the scheduler cache. This function assumes the lock on the scheduler cache has been acquired.
- func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
- newSum := make(map[string]*schedulernodeinfo.ImageStateSummary)
- for _, image := range node.Status.Images {
- for _, name := range image.Names {
- // update the entry in imageStates
- state, ok := cache.imageStates[name]
- if !ok {
- state = &imageState{
- size: image.SizeBytes,
- nodes: sets.NewString(node.Name),
- }
- cache.imageStates[name] = state
- } else {
- state.nodes.Insert(node.Name)
- }
- // create the imageStateSummary for this image
- if _, ok := newSum[name]; !ok {
- newSum[name] = cache.createImageStateSummary(state)
- }
- }
- }
- nodeInfo.SetImageStates(newSum)
- }
- // removeNodeImageStates removes the given node record from image entries having the node
- // in imageStates cache. After the removal, if any image becomes free, i.e., the image
- // is no longer available on any node, the image entry will be removed from imageStates.
- func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
- if node == nil {
- return
- }
- for _, image := range node.Status.Images {
- for _, name := range image.Names {
- state, ok := cache.imageStates[name]
- if ok {
- state.nodes.Delete(node.Name)
- if len(state.nodes) == 0 {
- // Remove the unused image to make sure the length of
- // imageStates represents the total number of different
- // images on all nodes
- delete(cache.imageStates, name)
- }
- }
- }
- }
- }
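- // run starts the background goroutine that periodically expires assumed pods.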
- func (cache *schedulerCache) run() {
- go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
- }
- func (cache *schedulerCache) cleanupExpiredAssumedPods() {
- cache.cleanupAssumedPods(time.Now())
- }
- // cleanupAssumedPods exists to make tests deterministic by taking the current time as an input argument.
- // It also reports metrics on the cache size for nodes, pods, and assumed pods.
- func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- defer cache.updateMetrics()
- // The size of assumedPods should be small
- for key := range cache.assumedPods {
- ps, ok := cache.podStates[key]
- if !ok {
- klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
- }
- if !ps.bindingFinished {
- klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
- ps.pod.Namespace, ps.pod.Name)
- continue
- }
- if now.After(*ps.deadline) {
- klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
- if err := cache.expirePod(key, ps); err != nil {
- klog.Errorf("ExpirePod failed for %s: %v", key, err)
- }
- }
- }
- }
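- // expirePod removes an expired assumed pod and its bookkeeping from the cache.
- // We assume the cache lock is already acquired.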
- func (cache *schedulerCache) expirePod(key string, ps *podState) error {
- if err := cache.removePod(ps.pod); err != nil {
- return err
- }
- delete(cache.assumedPods, key)
- delete(cache.podStates, key)
- return nil
- }
- // GetNodeInfo returns cached data for the node name.
- func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
- cache.mu.RLock()
- defer cache.mu.RUnlock()
- n, ok := cache.nodes[nodeName]
- if !ok {
- return nil, fmt.Errorf("node %q not found in cache", nodeName)
- }
- return n.info.Node(), nil
- }
- // updateMetrics updates cache size metric values for pods, assumed pods, and nodes
- func (cache *schedulerCache) updateMetrics() {
- metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
- metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
- metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
- }