12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package factory can set up a scheduler. This code is here instead of
- // cmd/scheduler for both testability and reuse.
- package factory
- import (
- "fmt"
- "time"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- appsinformers "k8s.io/client-go/informers/apps/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- policyinformers "k8s.io/client-go/informers/policy/v1beta1"
- storageinformers "k8s.io/client-go/informers/storage/v1"
- clientset "k8s.io/client-go/kubernetes"
- appslisters "k8s.io/client-go/listers/apps/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- policylisters "k8s.io/client-go/listers/policy/v1beta1"
- storagelisters "k8s.io/client-go/listers/storage/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/klog"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/scheduler/algorithm"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- "k8s.io/kubernetes/pkg/scheduler/api/validation"
- "k8s.io/kubernetes/pkg/scheduler/apis/config"
- "k8s.io/kubernetes/pkg/scheduler/core"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
- cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
- internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
- "k8s.io/kubernetes/pkg/scheduler/volumebinder"
- )
// Bounds for the exponential backoff used by MakeDefaultErrorFunc while it
// re-fetches a failed pod from the API server.
const (
	initialGetBackoff = 100 * time.Millisecond
	maximalGetBackoff = 60 * time.Second
)
- // Binder knows how to write a binding.
- type Binder interface {
- Bind(binding *v1.Binding) error
- }
- // PodConditionUpdater updates the condition of a pod based on the passed
- // PodCondition
- type PodConditionUpdater interface {
- Update(pod *v1.Pod, podCondition *v1.PodCondition) error
- }
- // Config is an implementation of the Scheduler's configured input data.
- // TODO over time we should make this struct a hidden implementation detail of the scheduler.
- type Config struct {
- // It is expected that changes made via SchedulerCache will be observed
- // by NodeLister and Algorithm.
- SchedulerCache internalcache.Cache
- NodeLister algorithm.NodeLister
- Algorithm core.ScheduleAlgorithm
- GetBinder func(pod *v1.Pod) Binder
- // PodConditionUpdater is used only in case of scheduling errors. If we succeed
- // with scheduling, PodScheduled condition will be updated in apiserver in /bind
- // handler so that binding and setting PodCondition it is atomic.
- PodConditionUpdater PodConditionUpdater
- // PodPreemptor is used to evict pods and update 'NominatedNode' field of
- // the preemptor pod.
- PodPreemptor PodPreemptor
- // Framework runs scheduler plugins at configured extension points.
- Framework framework.Framework
- // NextPod should be a function that blocks until the next pod
- // is available. We don't use a channel for this, because scheduling
- // a pod may take some amount of time and we don't want pods to get
- // stale while they sit in a channel.
- NextPod func() *v1.Pod
- // WaitForCacheSync waits for scheduler cache to populate.
- // It returns true if it was successful, false if the controller should shutdown.
- WaitForCacheSync func() bool
- // Error is called if there is an error. It is passed the pod in
- // question, and the error
- Error func(*v1.Pod, error)
- // Recorder is the EventRecorder to use
- Recorder record.EventRecorder
- // Close this to shut down the scheduler.
- StopEverything <-chan struct{}
- // VolumeBinder handles PVC/PV binding for the pod.
- VolumeBinder *volumebinder.VolumeBinder
- // Disable pod preemption or not.
- DisablePreemption bool
- // SchedulingQueue holds pods to be scheduled
- SchedulingQueue internalqueue.SchedulingQueue
- }
- // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
- // field of the preemptor pod.
- type PodPreemptor interface {
- GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
- DeletePod(pod *v1.Pod) error
- SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
- RemoveNominatedNodeName(pod *v1.Pod) error
- }
- // Configurator defines I/O, caching, and other functionality needed to
- // construct a new scheduler. An implementation of this can be seen in
- // factory.go.
- type Configurator interface {
- // Exposed for testing
- GetHardPodAffinitySymmetricWeight() int32
- // Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
- GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error)
- GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error)
- // Needs to be exposed for things like integration tests where we want to make fake nodes.
- GetNodeLister() corelisters.NodeLister
- // Exposed for testing
- GetClient() clientset.Interface
- // Exposed for testing
- GetScheduledPodLister() corelisters.PodLister
- Create() (*Config, error)
- CreateFromProvider(providerName string) (*Config, error)
- CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
- CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
- }
- // configFactory is the default implementation of the scheduler.Configurator interface.
- type configFactory struct {
- client clientset.Interface
- // a means to list all known scheduled pods.
- scheduledPodLister corelisters.PodLister
- // a means to list all known scheduled pods and pods assumed to have been scheduled.
- podLister algorithm.PodLister
- // a means to list all nodes
- nodeLister corelisters.NodeLister
- // a means to list all PersistentVolumes
- pVLister corelisters.PersistentVolumeLister
- // a means to list all PersistentVolumeClaims
- pVCLister corelisters.PersistentVolumeClaimLister
- // a means to list all services
- serviceLister corelisters.ServiceLister
- // a means to list all controllers
- controllerLister corelisters.ReplicationControllerLister
- // a means to list all replicasets
- replicaSetLister appslisters.ReplicaSetLister
- // a means to list all statefulsets
- statefulSetLister appslisters.StatefulSetLister
- // a means to list all PodDisruptionBudgets
- pdbLister policylisters.PodDisruptionBudgetLister
- // a means to list all StorageClasses
- storageClassLister storagelisters.StorageClassLister
- // framework has a set of plugins and the context used for running them.
- framework framework.Framework
- // Close this to stop all reflectors
- StopEverything <-chan struct{}
- scheduledPodsHasSynced cache.InformerSynced
- schedulerCache internalcache.Cache
- // SchedulerName of a scheduler is used to select which pods will be
- // processed by this scheduler, based on pods's "spec.schedulerName".
- schedulerName string
- // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
- // corresponding to every RequiredDuringScheduling affinity rule.
- // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
- hardPodAffinitySymmetricWeight int32
- // Handles volume binding decisions
- volumeBinder *volumebinder.VolumeBinder
- // Always check all predicates even if the middle of one predicate fails.
- alwaysCheckAllPredicates bool
- // Disable pod preemption or not.
- disablePreemption bool
- // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
- percentageOfNodesToScore int32
- bindTimeoutSeconds int64
- // queue for pods that need scheduling
- podQueue internalqueue.SchedulingQueue
- enableNonPreempting bool
- }
- // ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
- type ConfigFactoryArgs struct {
- SchedulerName string
- Client clientset.Interface
- NodeInformer coreinformers.NodeInformer
- PodInformer coreinformers.PodInformer
- PvInformer coreinformers.PersistentVolumeInformer
- PvcInformer coreinformers.PersistentVolumeClaimInformer
- ReplicationControllerInformer coreinformers.ReplicationControllerInformer
- ReplicaSetInformer appsinformers.ReplicaSetInformer
- StatefulSetInformer appsinformers.StatefulSetInformer
- ServiceInformer coreinformers.ServiceInformer
- PdbInformer policyinformers.PodDisruptionBudgetInformer
- StorageClassInformer storageinformers.StorageClassInformer
- HardPodAffinitySymmetricWeight int32
- DisablePreemption bool
- PercentageOfNodesToScore int32
- BindTimeoutSeconds int64
- StopCh <-chan struct{}
- Registry framework.Registry
- Plugins *config.Plugins
- PluginConfig []config.PluginConfig
- }
- // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
- // return the interface.
- func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
- stopEverything := args.StopCh
- if stopEverything == nil {
- stopEverything = wait.NeverStop
- }
- schedulerCache := internalcache.New(30*time.Second, stopEverything)
- framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig)
- if err != nil {
- klog.Fatalf("error initializing the scheduling framework: %v", err)
- }
- // storageClassInformer is only enabled through VolumeScheduling feature gate
- var storageClassLister storagelisters.StorageClassLister
- if args.StorageClassInformer != nil {
- storageClassLister = args.StorageClassInformer.Lister()
- }
- c := &configFactory{
- client: args.Client,
- podLister: schedulerCache,
- podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
- nodeLister: args.NodeInformer.Lister(),
- pVLister: args.PvInformer.Lister(),
- pVCLister: args.PvcInformer.Lister(),
- serviceLister: args.ServiceInformer.Lister(),
- controllerLister: args.ReplicationControllerInformer.Lister(),
- replicaSetLister: args.ReplicaSetInformer.Lister(),
- statefulSetLister: args.StatefulSetInformer.Lister(),
- pdbLister: args.PdbInformer.Lister(),
- storageClassLister: storageClassLister,
- framework: framework,
- schedulerCache: schedulerCache,
- StopEverything: stopEverything,
- schedulerName: args.SchedulerName,
- hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
- disablePreemption: args.DisablePreemption,
- percentageOfNodesToScore: args.PercentageOfNodesToScore,
- bindTimeoutSeconds: args.BindTimeoutSeconds,
- enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
- }
- // Setup volume binder
- c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
- c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
- // ScheduledPodLister is something we provide to plug-in functions that
- // they may need to call.
- c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
- // Setup cache debugger
- debugger := cachedebugger.New(
- args.NodeInformer.Lister(),
- args.PodInformer.Lister(),
- c.schedulerCache,
- c.podQueue,
- )
- debugger.ListenForSignal(c.StopEverything)
- go func() {
- <-c.StopEverything
- c.podQueue.Close()
- }()
- return c
- }
- // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
- func (c *configFactory) GetNodeLister() corelisters.NodeLister {
- return c.nodeLister
- }
- func (c *configFactory) GetHardPodAffinitySymmetricWeight() int32 {
- return c.hardPodAffinitySymmetricWeight
- }
- func (c *configFactory) GetSchedulerName() string {
- return c.schedulerName
- }
- // GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
- func (c *configFactory) GetClient() clientset.Interface {
- return c.client
- }
- // GetScheduledPodLister provides a pod lister, mostly internal use, but may also be called by mock-tests.
- func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
- return c.scheduledPodLister
- }
- // Create creates a scheduler with the default algorithm provider.
- func (c *configFactory) Create() (*Config, error) {
- return c.CreateFromProvider(DefaultProvider)
- }
- // Creates a scheduler from the name of a registered algorithm provider.
- func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
- klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
- provider, err := GetAlgorithmProvider(providerName)
- if err != nil {
- return nil, err
- }
- return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
- }
- // Creates a scheduler from the configuration file
- func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
- klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
- // validate the policy configuration
- if err := validation.ValidatePolicy(policy); err != nil {
- return nil, err
- }
- predicateKeys := sets.NewString()
- if policy.Predicates == nil {
- klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
- provider, err := GetAlgorithmProvider(DefaultProvider)
- if err != nil {
- return nil, err
- }
- predicateKeys = provider.FitPredicateKeys
- } else {
- for _, predicate := range policy.Predicates {
- klog.V(2).Infof("Registering predicate: %s", predicate.Name)
- predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
- }
- }
- priorityKeys := sets.NewString()
- if policy.Priorities == nil {
- klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
- provider, err := GetAlgorithmProvider(DefaultProvider)
- if err != nil {
- return nil, err
- }
- priorityKeys = provider.PriorityFunctionKeys
- } else {
- for _, priority := range policy.Priorities {
- klog.V(2).Infof("Registering priority: %s", priority.Name)
- priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
- }
- }
- var extenders []algorithm.SchedulerExtender
- if len(policy.ExtenderConfigs) != 0 {
- ignoredExtendedResources := sets.NewString()
- var ignorableExtenders []algorithm.SchedulerExtender
- for ii := range policy.ExtenderConfigs {
- klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
- extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
- if err != nil {
- return nil, err
- }
- if !extender.IsIgnorable() {
- extenders = append(extenders, extender)
- } else {
- ignorableExtenders = append(ignorableExtenders, extender)
- }
- for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
- if r.IgnoredByScheduler {
- ignoredExtendedResources.Insert(string(r.Name))
- }
- }
- }
- // place ignorable extenders to the tail of extenders
- extenders = append(extenders, ignorableExtenders...)
- predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
- }
- // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
- // Give it higher precedence than scheduler CLI configuration when it is provided.
- if policy.HardPodAffinitySymmetricWeight != 0 {
- c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
- }
- // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
- // predicates even after one or more of them fails.
- if policy.AlwaysCheckAllPredicates {
- c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
- }
- return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
- }
- // Creates a scheduler from a set of registered fit predicate keys and priority keys.
- func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
- klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
- if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
- return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
- }
- predicateFuncs, err := c.GetPredicates(predicateKeys)
- if err != nil {
- return nil, err
- }
- priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
- if err != nil {
- return nil, err
- }
- priorityMetaProducer, err := c.GetPriorityMetadataProducer()
- if err != nil {
- return nil, err
- }
- predicateMetaProducer, err := c.GetPredicateMetadataProducer()
- if err != nil {
- return nil, err
- }
- algo := core.NewGenericScheduler(
- c.schedulerCache,
- c.podQueue,
- predicateFuncs,
- predicateMetaProducer,
- priorityConfigs,
- priorityMetaProducer,
- c.framework,
- extenders,
- c.volumeBinder,
- c.pVCLister,
- c.pdbLister,
- c.alwaysCheckAllPredicates,
- c.disablePreemption,
- c.percentageOfNodesToScore,
- c.enableNonPreempting,
- )
- return &Config{
- SchedulerCache: c.schedulerCache,
- // The scheduler only needs to consider schedulable nodes.
- NodeLister: &nodeLister{c.nodeLister},
- Algorithm: algo,
- GetBinder: getBinderFunc(c.client, extenders),
- PodConditionUpdater: &podConditionUpdater{c.client},
- PodPreemptor: &podPreemptor{c.client},
- Framework: c.framework,
- WaitForCacheSync: func() bool {
- return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
- },
- NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
- Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
- StopEverything: c.StopEverything,
- VolumeBinder: c.volumeBinder,
- SchedulingQueue: c.podQueue,
- }, nil
- }
- // getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
- func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
- var extenderBinder algorithm.SchedulerExtender
- for i := range extenders {
- if extenders[i].IsBinder() {
- extenderBinder = extenders[i]
- break
- }
- }
- defaultBinder := &binder{client}
- return func(pod *v1.Pod) Binder {
- if extenderBinder != nil && extenderBinder.IsInterested(pod) {
- return extenderBinder
- }
- return defaultBinder
- }
- }
- type nodeLister struct {
- corelisters.NodeLister
- }
- func (n *nodeLister) List() ([]*v1.Node, error) {
- return n.NodeLister.List(labels.Everything())
- }
- func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
- pluginArgs, err := c.getPluginArgs()
- if err != nil {
- return nil, err
- }
- return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
- }
- func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
- pluginArgs, err := c.getPluginArgs()
- if err != nil {
- return nil, err
- }
- return getPriorityMetadataProducer(*pluginArgs)
- }
- func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
- pluginArgs, err := c.getPluginArgs()
- if err != nil {
- return nil, err
- }
- return getPredicateMetadataProducer(*pluginArgs)
- }
- func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
- pluginArgs, err := c.getPluginArgs()
- if err != nil {
- return nil, err
- }
- return getFitPredicateFunctions(predicateKeys, *pluginArgs)
- }
- func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
- return &PluginFactoryArgs{
- PodLister: c.podLister,
- ServiceLister: c.serviceLister,
- ControllerLister: c.controllerLister,
- ReplicaSetLister: c.replicaSetLister,
- StatefulSetLister: c.statefulSetLister,
- NodeLister: &nodeLister{c.nodeLister},
- PDBLister: c.pdbLister,
- NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
- PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
- PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
- StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
- VolumeBinder: c.volumeBinder,
- HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
- }, nil
- }
- // assignedPodLister filters the pods returned from a PodLister to
- // only include those that have a node name set.
- type assignedPodLister struct {
- corelisters.PodLister
- }
- // List lists all Pods in the indexer for a given namespace.
- func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
- list, err := l.PodLister.List(selector)
- if err != nil {
- return nil, err
- }
- filtered := make([]*v1.Pod, 0, len(list))
- for _, pod := range list {
- if len(pod.Spec.NodeName) > 0 {
- filtered = append(filtered, pod)
- }
- }
- return filtered, nil
- }
- // List lists all Pods in the indexer for a given namespace.
- func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
- return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
- }
- // assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to
- // only include those that have a node name set.
- type assignedPodNamespaceLister struct {
- corelisters.PodNamespaceLister
- }
- // List lists all Pods in the indexer for a given namespace.
- func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
- list, err := l.PodNamespaceLister.List(selector)
- if err != nil {
- return nil, err
- }
- filtered := make([]*v1.Pod, 0, len(list))
- for _, pod := range list {
- if len(pod.Spec.NodeName) > 0 {
- filtered = append(filtered, pod)
- }
- }
- return filtered, nil
- }
- // Get retrieves the Pod from the indexer for a given namespace and name.
- func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) {
- pod, err := l.PodNamespaceLister.Get(name)
- if err != nil {
- return nil, err
- }
- if len(pod.Spec.NodeName) > 0 {
- return pod, nil
- }
- return nil, errors.NewNotFound(schema.GroupResource{Resource: string(v1.ResourcePods)}, name)
- }
- type podInformer struct {
- informer cache.SharedIndexInformer
- }
- func (i *podInformer) Informer() cache.SharedIndexInformer {
- return i.informer
- }
- func (i *podInformer) Lister() corelisters.PodLister {
- return corelisters.NewPodLister(i.informer.GetIndexer())
- }
- // NewPodInformer creates a shared index informer that returns only non-terminal pods.
- func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
- selector := fields.ParseSelectorOrDie(
- "status.phase!=" + string(v1.PodSucceeded) +
- ",status.phase!=" + string(v1.PodFailed))
- lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
- return &podInformer{
- informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
- }
- }
- // MakeDefaultErrorFunc construct a function to handle pod scheduler error
- func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
- return func(pod *v1.Pod, err error) {
- if err == core.ErrNoNodesAvailable {
- klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
- } else {
- if _, ok := err.(*core.FitError); ok {
- klog.V(4).Infof("Unable to schedule %v/%v: no fit: %v; waiting", pod.Namespace, pod.Name, err)
- } else if errors.IsNotFound(err) {
- if errStatus, ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
- nodeName := errStatus.Status().Details.Name
- // when node is not found, We do not remove the node right away. Trying again to get
- // the node and if the node is still not found, then remove it from the scheduler cache.
- _, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
- if err != nil && errors.IsNotFound(err) {
- node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
- schedulerCache.RemoveNode(&node)
- }
- }
- } else {
- klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err)
- }
- }
- podSchedulingCycle := podQueue.SchedulingCycle()
- // Retry asynchronously.
- // Note that this is extremely rudimentary and we need a more real error handling path.
- go func() {
- defer runtime.HandleCrash()
- podID := types.NamespacedName{
- Namespace: pod.Namespace,
- Name: pod.Name,
- }
- // An unschedulable pod will be placed in the unschedulable queue.
- // This ensures that if the pod is nominated to run on a node,
- // scheduler takes the pod into account when running predicates for the node.
- // Get the pod again; it may have changed/been scheduled already.
- getBackoff := initialGetBackoff
- for {
- pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
- if err == nil {
- if len(pod.Spec.NodeName) == 0 {
- if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
- klog.Error(err)
- }
- }
- break
- }
- if errors.IsNotFound(err) {
- klog.Warningf("A pod %v no longer exists", podID)
- return
- }
- klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
- if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
- getBackoff = maximalGetBackoff
- }
- time.Sleep(getBackoff)
- }
- }()
- }
- }
- type binder struct {
- Client clientset.Interface
- }
- // Bind just does a POST binding RPC.
- func (b *binder) Bind(binding *v1.Binding) error {
- klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
- return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
- }
- type podConditionUpdater struct {
- Client clientset.Interface
- }
- func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error {
- klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
- if podutil.UpdatePodCondition(&pod.Status, condition) {
- _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
- return err
- }
- return nil
- }
- type podPreemptor struct {
- Client clientset.Interface
- }
- func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
- return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
- }
- func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
- return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
- }
- func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
- podCopy := pod.DeepCopy()
- podCopy.Status.NominatedNodeName = nominatedNodeName
- _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
- return err
- }
- func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
- if len(pod.Status.NominatedNodeName) == 0 {
- return nil
- }
- return p.SetNominatedNodeName(pod, "")
- }
|