123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package scheduler
- import (
- "fmt"
- "k8s.io/klog"
- "reflect"
- "k8s.io/api/core/v1"
- storagev1 "k8s.io/api/storage/v1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- coreinformers "k8s.io/client-go/informers/core/v1"
- storageinformers "k8s.io/client-go/informers/storage/v1"
- "k8s.io/client-go/tools/cache"
- )
- func (sched *Scheduler) onPvAdd(obj interface{}) {
- // Pods created when there are no PVs available will be stuck in
- // unschedulable queue. But unbound PVs created for static provisioning and
- // delay binding storage class are skipped in PV controller dynamic
- // provisioning and binding process, will not trigger events to schedule pod
- // again. So we need to move pods to active queue on PV add for this
- // scenario.
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onPvUpdate(old, new interface{}) {
- // Scheduler.bindVolumesWorker may fail to update assumed pod volume
- // bindings due to conflicts if PVs are updated by PV controller or other
- // parties, then scheduler will add pod back to unschedulable queue. We
- // need to move pods to active queue on PV update for this scenario.
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onPvcAdd(obj interface{}) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
- sc, ok := obj.(*storagev1.StorageClass)
- if !ok {
- klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
- return
- }
- // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
- // PVCs have specified StorageClass name, creating StorageClass objects
- // with late binding will cause predicates to pass, so we need to move pods
- // to active queue.
- // We don't need to invalidate cached results because results will not be
- // cached for pod that has unbound immediate PVCs.
- if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- }
- func (sched *Scheduler) onServiceAdd(obj interface{}) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) onServiceDelete(obj interface{}) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) addNodeToCache(obj interface{}) {
- node, ok := obj.(*v1.Node)
- if !ok {
- klog.Errorf("cannot convert to *v1.Node: %v", obj)
- return
- }
- if err := sched.config.SchedulerCache.AddNode(node); err != nil {
- klog.Errorf("scheduler cache AddNode failed: %v", err)
- }
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
- oldNode, ok := oldObj.(*v1.Node)
- if !ok {
- klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
- return
- }
- newNode, ok := newObj.(*v1.Node)
- if !ok {
- klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
- return
- }
- if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
- klog.Errorf("scheduler cache UpdateNode failed: %v", err)
- }
- // Only activate unschedulable pods if the node became more schedulable.
- // We skip the node property comparison when there is no unschedulable pods in the queue
- // to save processing cycles. We still trigger a move to active queue to cover the case
- // that a pod being processed by the scheduler is determined unschedulable. We want this
- // pod to be reevaluated when a change in the cluster happens.
- if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- }
- func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
- var node *v1.Node
- switch t := obj.(type) {
- case *v1.Node:
- node = t
- case cache.DeletedFinalStateUnknown:
- var ok bool
- node, ok = t.Obj.(*v1.Node)
- if !ok {
- klog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
- return
- }
- default:
- klog.Errorf("cannot convert to *v1.Node: %v", t)
- return
- }
- // NOTE: Updates must be written to scheduler cache before invalidating
- // equivalence cache, because we could snapshot equivalence cache after the
- // invalidation and then snapshot the cache itself. If the cache is
- // snapshotted before updates are written, we would update equivalence
- // cache with stale information which is based on snapshot of old cache.
- if err := sched.config.SchedulerCache.RemoveNode(node); err != nil {
- klog.Errorf("scheduler cache RemoveNode failed: %v", err)
- }
- }
- func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
- if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
- }
- }
- func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
- pod := newObj.(*v1.Pod)
- if sched.skipPodUpdate(pod) {
- return
- }
- if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
- }
- }
- func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
- var pod *v1.Pod
- switch t := obj.(type) {
- case *v1.Pod:
- pod = obj.(*v1.Pod)
- case cache.DeletedFinalStateUnknown:
- var ok bool
- pod, ok = t.Obj.(*v1.Pod)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
- return
- }
- default:
- utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
- return
- }
- if err := sched.config.SchedulingQueue.Delete(pod); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
- }
- if sched.config.VolumeBinder != nil {
- // Volume binder only wants to keep unassigned pods
- sched.config.VolumeBinder.DeletePodBindings(pod)
- }
- }
- func (sched *Scheduler) addPodToCache(obj interface{}) {
- pod, ok := obj.(*v1.Pod)
- if !ok {
- klog.Errorf("cannot convert to *v1.Pod: %v", obj)
- return
- }
- if err := sched.config.SchedulerCache.AddPod(pod); err != nil {
- klog.Errorf("scheduler cache AddPod failed: %v", err)
- }
- sched.config.SchedulingQueue.AssignedPodAdded(pod)
- }
- func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
- oldPod, ok := oldObj.(*v1.Pod)
- if !ok {
- klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
- return
- }
- newPod, ok := newObj.(*v1.Pod)
- if !ok {
- klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
- return
- }
- // NOTE: Updates must be written to scheduler cache before invalidating
- // equivalence cache, because we could snapshot equivalence cache after the
- // invalidation and then snapshot the cache itself. If the cache is
- // snapshotted before updates are written, we would update equivalence
- // cache with stale information which is based on snapshot of old cache.
- if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
- klog.Errorf("scheduler cache UpdatePod failed: %v", err)
- }
- sched.config.SchedulingQueue.AssignedPodUpdated(newPod)
- }
- func (sched *Scheduler) deletePodFromCache(obj interface{}) {
- var pod *v1.Pod
- switch t := obj.(type) {
- case *v1.Pod:
- pod = t
- case cache.DeletedFinalStateUnknown:
- var ok bool
- pod, ok = t.Obj.(*v1.Pod)
- if !ok {
- klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
- return
- }
- default:
- klog.Errorf("cannot convert to *v1.Pod: %v", t)
- return
- }
- // NOTE: Updates must be written to scheduler cache before invalidating
- // equivalence cache, because we could snapshot equivalence cache after the
- // invalidation and then snapshot the cache itself. If the cache is
- // snapshotted before updates are written, we would update equivalence
- // cache with stale information which is based on snapshot of old cache.
- if err := sched.config.SchedulerCache.RemovePod(pod); err != nil {
- klog.Errorf("scheduler cache RemovePod failed: %v", err)
- }
- sched.config.SchedulingQueue.MoveAllToActiveQueue()
- }
- // assignedPod selects pods that are assigned (scheduled and running).
- func assignedPod(pod *v1.Pod) bool {
- return len(pod.Spec.NodeName) != 0
- }
- // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
- func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
- return schedulerName == pod.Spec.SchedulerName
- }
- // skipPodUpdate checks whether the specified pod update should be ignored.
- // This function will return true if
- // - The pod has already been assumed, AND
- // - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
- // updated.
- func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
- // Non-assumed pods should never be skipped.
- isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
- return false
- }
- if !isAssumed {
- return false
- }
- // Gets the assumed pod from the cache.
- assumedPod, err := sched.config.SchedulerCache.GetPod(pod)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
- return false
- }
- // Compares the assumed pod in the cache with the pod update. If they are
- // equal (with certain fields excluded), this pod update will be skipped.
- f := func(pod *v1.Pod) *v1.Pod {
- p := pod.DeepCopy()
- // ResourceVersion must be excluded because each object update will
- // have a new resource version.
- p.ResourceVersion = ""
- // Spec.NodeName must be excluded because the pod assumed in the cache
- // is expected to have a node assigned while the pod update may nor may
- // not have this field set.
- p.Spec.NodeName = ""
- // Annotations must be excluded for the reasons described in
- // https://github.com/kubernetes/kubernetes/issues/52914.
- p.Annotations = nil
- return p
- }
- assumedPodCopy, podCopy := f(assumedPod), f(pod)
- if !reflect.DeepEqual(assumedPodCopy, podCopy) {
- return false
- }
- klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
- return true
- }
- // AddAllEventHandlers is a helper function used in tests and in Scheduler
- // to add event handlers for various informers.
- func AddAllEventHandlers(
- sched *Scheduler,
- schedulerName string,
- nodeInformer coreinformers.NodeInformer,
- podInformer coreinformers.PodInformer,
- pvInformer coreinformers.PersistentVolumeInformer,
- pvcInformer coreinformers.PersistentVolumeClaimInformer,
- serviceInformer coreinformers.ServiceInformer,
- storageClassInformer storageinformers.StorageClassInformer,
- ) {
- // scheduled pod cache
- podInformer.Informer().AddEventHandler(
- cache.FilteringResourceEventHandler{
- FilterFunc: func(obj interface{}) bool {
- switch t := obj.(type) {
- case *v1.Pod:
- return assignedPod(t)
- case cache.DeletedFinalStateUnknown:
- if pod, ok := t.Obj.(*v1.Pod); ok {
- return assignedPod(pod)
- }
- utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
- return false
- default:
- utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
- return false
- }
- },
- Handler: cache.ResourceEventHandlerFuncs{
- AddFunc: sched.addPodToCache,
- UpdateFunc: sched.updatePodInCache,
- DeleteFunc: sched.deletePodFromCache,
- },
- },
- )
- // unscheduled pod queue
- podInformer.Informer().AddEventHandler(
- cache.FilteringResourceEventHandler{
- FilterFunc: func(obj interface{}) bool {
- switch t := obj.(type) {
- case *v1.Pod:
- return !assignedPod(t) && responsibleForPod(t, schedulerName)
- case cache.DeletedFinalStateUnknown:
- if pod, ok := t.Obj.(*v1.Pod); ok {
- return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
- }
- utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
- return false
- default:
- utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
- return false
- }
- },
- Handler: cache.ResourceEventHandlerFuncs{
- AddFunc: sched.addPodToSchedulingQueue,
- UpdateFunc: sched.updatePodInSchedulingQueue,
- DeleteFunc: sched.deletePodFromSchedulingQueue,
- },
- },
- )
- nodeInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: sched.addNodeToCache,
- UpdateFunc: sched.updateNodeInCache,
- DeleteFunc: sched.deleteNodeFromCache,
- },
- )
- // On add and delete of PVs, it will affect equivalence cache items
- // related to persistent volume
- pvInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
- AddFunc: sched.onPvAdd,
- UpdateFunc: sched.onPvUpdate,
- },
- )
- // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
- pvcInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: sched.onPvcAdd,
- UpdateFunc: sched.onPvcUpdate,
- },
- )
- // This is for ServiceAffinity: affected by the selector of the service is updated.
- // Also, if new service is added, equivalence cache will also become invalid since
- // existing pods may be "captured" by this service and change this predicate result.
- serviceInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: sched.onServiceAdd,
- UpdateFunc: sched.onServiceUpdate,
- DeleteFunc: sched.onServiceDelete,
- },
- )
- storageClassInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: sched.onStorageClassAdd,
- },
- )
- }
- func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- if nodeSpecUnschedulableChanged(newNode, oldNode) {
- return true
- }
- if nodeAllocatableChanged(newNode, oldNode) {
- return true
- }
- if nodeLabelsChanged(newNode, oldNode) {
- return true
- }
- if nodeTaintsChanged(newNode, oldNode) {
- return true
- }
- if nodeConditionsChanged(newNode, oldNode) {
- return true
- }
- return false
- }
- func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
- }
- func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
- }
- func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
- }
- func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
- conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
- for i := range conditions {
- conditionStatuses[conditions[i].Type] = conditions[i].Status
- }
- return conditionStatuses
- }
- return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
- }
- func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
- return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
- }
|