factory.go 28 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package factory can set up a scheduler. This code is here instead of
  14. // cmd/scheduler for both testability and reuse.
  15. package factory
  16. import (
  17. "fmt"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/fields"
  23. "k8s.io/apimachinery/pkg/labels"
  24. "k8s.io/apimachinery/pkg/runtime/schema"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/apimachinery/pkg/util/sets"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. appsinformers "k8s.io/client-go/informers/apps/v1"
  31. coreinformers "k8s.io/client-go/informers/core/v1"
  32. policyinformers "k8s.io/client-go/informers/policy/v1beta1"
  33. storageinformers "k8s.io/client-go/informers/storage/v1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. appslisters "k8s.io/client-go/listers/apps/v1"
  36. corelisters "k8s.io/client-go/listers/core/v1"
  37. policylisters "k8s.io/client-go/listers/policy/v1beta1"
  38. storagelisters "k8s.io/client-go/listers/storage/v1"
  39. "k8s.io/client-go/tools/cache"
  40. "k8s.io/client-go/tools/record"
  41. "k8s.io/klog"
  42. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  43. "k8s.io/kubernetes/pkg/features"
  44. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  45. "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
  46. "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
  47. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  48. "k8s.io/kubernetes/pkg/scheduler/api/validation"
  49. "k8s.io/kubernetes/pkg/scheduler/apis/config"
  50. "k8s.io/kubernetes/pkg/scheduler/core"
  51. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  52. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  53. cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
  54. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  55. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  56. )
  57. const (
  58. initialGetBackoff = 100 * time.Millisecond
  59. maximalGetBackoff = time.Minute
  60. )
  61. // Binder knows how to write a binding.
  62. type Binder interface {
  63. Bind(binding *v1.Binding) error
  64. }
  65. // PodConditionUpdater updates the condition of a pod based on the passed
  66. // PodCondition
  67. type PodConditionUpdater interface {
  68. Update(pod *v1.Pod, podCondition *v1.PodCondition) error
  69. }
  70. // Config is an implementation of the Scheduler's configured input data.
  71. // TODO over time we should make this struct a hidden implementation detail of the scheduler.
  72. type Config struct {
  73. // It is expected that changes made via SchedulerCache will be observed
  74. // by NodeLister and Algorithm.
  75. SchedulerCache internalcache.Cache
  76. NodeLister algorithm.NodeLister
  77. Algorithm core.ScheduleAlgorithm
  78. GetBinder func(pod *v1.Pod) Binder
  79. // PodConditionUpdater is used only in case of scheduling errors. If we succeed
  80. // with scheduling, PodScheduled condition will be updated in apiserver in /bind
  81. // handler so that binding and setting PodCondition it is atomic.
  82. PodConditionUpdater PodConditionUpdater
  83. // PodPreemptor is used to evict pods and update 'NominatedNode' field of
  84. // the preemptor pod.
  85. PodPreemptor PodPreemptor
  86. // Framework runs scheduler plugins at configured extension points.
  87. Framework framework.Framework
  88. // NextPod should be a function that blocks until the next pod
  89. // is available. We don't use a channel for this, because scheduling
  90. // a pod may take some amount of time and we don't want pods to get
  91. // stale while they sit in a channel.
  92. NextPod func() *v1.Pod
  93. // WaitForCacheSync waits for scheduler cache to populate.
  94. // It returns true if it was successful, false if the controller should shutdown.
  95. WaitForCacheSync func() bool
  96. // Error is called if there is an error. It is passed the pod in
  97. // question, and the error
  98. Error func(*v1.Pod, error)
  99. // Recorder is the EventRecorder to use
  100. Recorder record.EventRecorder
  101. // Close this to shut down the scheduler.
  102. StopEverything <-chan struct{}
  103. // VolumeBinder handles PVC/PV binding for the pod.
  104. VolumeBinder *volumebinder.VolumeBinder
  105. // Disable pod preemption or not.
  106. DisablePreemption bool
  107. // SchedulingQueue holds pods to be scheduled
  108. SchedulingQueue internalqueue.SchedulingQueue
  109. }
  110. // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
  111. // field of the preemptor pod.
  112. type PodPreemptor interface {
  113. GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
  114. DeletePod(pod *v1.Pod) error
  115. SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
  116. RemoveNominatedNodeName(pod *v1.Pod) error
  117. }
  118. // Configurator defines I/O, caching, and other functionality needed to
  119. // construct a new scheduler. An implementation of this can be seen in
  120. // factory.go.
  121. type Configurator interface {
  122. // Exposed for testing
  123. GetHardPodAffinitySymmetricWeight() int32
  124. // Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
  125. GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error)
  126. GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error)
  127. // Needs to be exposed for things like integration tests where we want to make fake nodes.
  128. GetNodeLister() corelisters.NodeLister
  129. // Exposed for testing
  130. GetClient() clientset.Interface
  131. // Exposed for testing
  132. GetScheduledPodLister() corelisters.PodLister
  133. Create() (*Config, error)
  134. CreateFromProvider(providerName string) (*Config, error)
  135. CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
  136. CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
  137. }
  138. // configFactory is the default implementation of the scheduler.Configurator interface.
  139. type configFactory struct {
  140. client clientset.Interface
  141. // a means to list all known scheduled pods.
  142. scheduledPodLister corelisters.PodLister
  143. // a means to list all known scheduled pods and pods assumed to have been scheduled.
  144. podLister algorithm.PodLister
  145. // a means to list all nodes
  146. nodeLister corelisters.NodeLister
  147. // a means to list all PersistentVolumes
  148. pVLister corelisters.PersistentVolumeLister
  149. // a means to list all PersistentVolumeClaims
  150. pVCLister corelisters.PersistentVolumeClaimLister
  151. // a means to list all services
  152. serviceLister corelisters.ServiceLister
  153. // a means to list all controllers
  154. controllerLister corelisters.ReplicationControllerLister
  155. // a means to list all replicasets
  156. replicaSetLister appslisters.ReplicaSetLister
  157. // a means to list all statefulsets
  158. statefulSetLister appslisters.StatefulSetLister
  159. // a means to list all PodDisruptionBudgets
  160. pdbLister policylisters.PodDisruptionBudgetLister
  161. // a means to list all StorageClasses
  162. storageClassLister storagelisters.StorageClassLister
  163. // framework has a set of plugins and the context used for running them.
  164. framework framework.Framework
  165. // Close this to stop all reflectors
  166. StopEverything <-chan struct{}
  167. scheduledPodsHasSynced cache.InformerSynced
  168. schedulerCache internalcache.Cache
  169. // SchedulerName of a scheduler is used to select which pods will be
  170. // processed by this scheduler, based on pods's "spec.schedulerName".
  171. schedulerName string
  172. // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
  173. // corresponding to every RequiredDuringScheduling affinity rule.
  174. // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
  175. hardPodAffinitySymmetricWeight int32
  176. // Handles volume binding decisions
  177. volumeBinder *volumebinder.VolumeBinder
  178. // Always check all predicates even if the middle of one predicate fails.
  179. alwaysCheckAllPredicates bool
  180. // Disable pod preemption or not.
  181. disablePreemption bool
  182. // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
  183. percentageOfNodesToScore int32
  184. bindTimeoutSeconds int64
  185. // queue for pods that need scheduling
  186. podQueue internalqueue.SchedulingQueue
  187. enableNonPreempting bool
  188. }
  189. // ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
  190. type ConfigFactoryArgs struct {
  191. SchedulerName string
  192. Client clientset.Interface
  193. NodeInformer coreinformers.NodeInformer
  194. PodInformer coreinformers.PodInformer
  195. PvInformer coreinformers.PersistentVolumeInformer
  196. PvcInformer coreinformers.PersistentVolumeClaimInformer
  197. ReplicationControllerInformer coreinformers.ReplicationControllerInformer
  198. ReplicaSetInformer appsinformers.ReplicaSetInformer
  199. StatefulSetInformer appsinformers.StatefulSetInformer
  200. ServiceInformer coreinformers.ServiceInformer
  201. PdbInformer policyinformers.PodDisruptionBudgetInformer
  202. StorageClassInformer storageinformers.StorageClassInformer
  203. HardPodAffinitySymmetricWeight int32
  204. DisablePreemption bool
  205. PercentageOfNodesToScore int32
  206. BindTimeoutSeconds int64
  207. StopCh <-chan struct{}
  208. Registry framework.Registry
  209. Plugins *config.Plugins
  210. PluginConfig []config.PluginConfig
  211. }
  212. // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
  213. // return the interface.
  214. func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
  215. stopEverything := args.StopCh
  216. if stopEverything == nil {
  217. stopEverything = wait.NeverStop
  218. }
  219. schedulerCache := internalcache.New(30*time.Second, stopEverything)
  220. framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig)
  221. if err != nil {
  222. klog.Fatalf("error initializing the scheduling framework: %v", err)
  223. }
  224. // storageClassInformer is only enabled through VolumeScheduling feature gate
  225. var storageClassLister storagelisters.StorageClassLister
  226. if args.StorageClassInformer != nil {
  227. storageClassLister = args.StorageClassInformer.Lister()
  228. }
  229. c := &configFactory{
  230. client: args.Client,
  231. podLister: schedulerCache,
  232. podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
  233. nodeLister: args.NodeInformer.Lister(),
  234. pVLister: args.PvInformer.Lister(),
  235. pVCLister: args.PvcInformer.Lister(),
  236. serviceLister: args.ServiceInformer.Lister(),
  237. controllerLister: args.ReplicationControllerInformer.Lister(),
  238. replicaSetLister: args.ReplicaSetInformer.Lister(),
  239. statefulSetLister: args.StatefulSetInformer.Lister(),
  240. pdbLister: args.PdbInformer.Lister(),
  241. storageClassLister: storageClassLister,
  242. framework: framework,
  243. schedulerCache: schedulerCache,
  244. StopEverything: stopEverything,
  245. schedulerName: args.SchedulerName,
  246. hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
  247. disablePreemption: args.DisablePreemption,
  248. percentageOfNodesToScore: args.PercentageOfNodesToScore,
  249. bindTimeoutSeconds: args.BindTimeoutSeconds,
  250. enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
  251. }
  252. // Setup volume binder
  253. c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
  254. c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
  255. // ScheduledPodLister is something we provide to plug-in functions that
  256. // they may need to call.
  257. c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
  258. // Setup cache debugger
  259. debugger := cachedebugger.New(
  260. args.NodeInformer.Lister(),
  261. args.PodInformer.Lister(),
  262. c.schedulerCache,
  263. c.podQueue,
  264. )
  265. debugger.ListenForSignal(c.StopEverything)
  266. go func() {
  267. <-c.StopEverything
  268. c.podQueue.Close()
  269. }()
  270. return c
  271. }
  272. // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
  273. func (c *configFactory) GetNodeLister() corelisters.NodeLister {
  274. return c.nodeLister
  275. }
  276. func (c *configFactory) GetHardPodAffinitySymmetricWeight() int32 {
  277. return c.hardPodAffinitySymmetricWeight
  278. }
  279. func (c *configFactory) GetSchedulerName() string {
  280. return c.schedulerName
  281. }
  282. // GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
  283. func (c *configFactory) GetClient() clientset.Interface {
  284. return c.client
  285. }
  286. // GetScheduledPodLister provides a pod lister, mostly internal use, but may also be called by mock-tests.
  287. func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
  288. return c.scheduledPodLister
  289. }
  290. // Create creates a scheduler with the default algorithm provider.
  291. func (c *configFactory) Create() (*Config, error) {
  292. return c.CreateFromProvider(DefaultProvider)
  293. }
  294. // Creates a scheduler from the name of a registered algorithm provider.
  295. func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
  296. klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
  297. provider, err := GetAlgorithmProvider(providerName)
  298. if err != nil {
  299. return nil, err
  300. }
  301. return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
  302. }
  303. // Creates a scheduler from the configuration file
  304. func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
  305. klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
  306. // validate the policy configuration
  307. if err := validation.ValidatePolicy(policy); err != nil {
  308. return nil, err
  309. }
  310. predicateKeys := sets.NewString()
  311. if policy.Predicates == nil {
  312. klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
  313. provider, err := GetAlgorithmProvider(DefaultProvider)
  314. if err != nil {
  315. return nil, err
  316. }
  317. predicateKeys = provider.FitPredicateKeys
  318. } else {
  319. for _, predicate := range policy.Predicates {
  320. klog.V(2).Infof("Registering predicate: %s", predicate.Name)
  321. predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
  322. }
  323. }
  324. priorityKeys := sets.NewString()
  325. if policy.Priorities == nil {
  326. klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
  327. provider, err := GetAlgorithmProvider(DefaultProvider)
  328. if err != nil {
  329. return nil, err
  330. }
  331. priorityKeys = provider.PriorityFunctionKeys
  332. } else {
  333. for _, priority := range policy.Priorities {
  334. klog.V(2).Infof("Registering priority: %s", priority.Name)
  335. priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
  336. }
  337. }
  338. var extenders []algorithm.SchedulerExtender
  339. if len(policy.ExtenderConfigs) != 0 {
  340. ignoredExtendedResources := sets.NewString()
  341. var ignorableExtenders []algorithm.SchedulerExtender
  342. for ii := range policy.ExtenderConfigs {
  343. klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
  344. extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
  345. if err != nil {
  346. return nil, err
  347. }
  348. if !extender.IsIgnorable() {
  349. extenders = append(extenders, extender)
  350. } else {
  351. ignorableExtenders = append(ignorableExtenders, extender)
  352. }
  353. for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
  354. if r.IgnoredByScheduler {
  355. ignoredExtendedResources.Insert(string(r.Name))
  356. }
  357. }
  358. }
  359. // place ignorable extenders to the tail of extenders
  360. extenders = append(extenders, ignorableExtenders...)
  361. predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
  362. }
  363. // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
  364. // Give it higher precedence than scheduler CLI configuration when it is provided.
  365. if policy.HardPodAffinitySymmetricWeight != 0 {
  366. c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
  367. }
  368. // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
  369. // predicates even after one or more of them fails.
  370. if policy.AlwaysCheckAllPredicates {
  371. c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
  372. }
  373. return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
  374. }
  375. // Creates a scheduler from a set of registered fit predicate keys and priority keys.
  376. func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
  377. klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
  378. if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
  379. return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
  380. }
  381. predicateFuncs, err := c.GetPredicates(predicateKeys)
  382. if err != nil {
  383. return nil, err
  384. }
  385. priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
  386. if err != nil {
  387. return nil, err
  388. }
  389. priorityMetaProducer, err := c.GetPriorityMetadataProducer()
  390. if err != nil {
  391. return nil, err
  392. }
  393. predicateMetaProducer, err := c.GetPredicateMetadataProducer()
  394. if err != nil {
  395. return nil, err
  396. }
  397. algo := core.NewGenericScheduler(
  398. c.schedulerCache,
  399. c.podQueue,
  400. predicateFuncs,
  401. predicateMetaProducer,
  402. priorityConfigs,
  403. priorityMetaProducer,
  404. c.framework,
  405. extenders,
  406. c.volumeBinder,
  407. c.pVCLister,
  408. c.pdbLister,
  409. c.alwaysCheckAllPredicates,
  410. c.disablePreemption,
  411. c.percentageOfNodesToScore,
  412. c.enableNonPreempting,
  413. )
  414. return &Config{
  415. SchedulerCache: c.schedulerCache,
  416. // The scheduler only needs to consider schedulable nodes.
  417. NodeLister: &nodeLister{c.nodeLister},
  418. Algorithm: algo,
  419. GetBinder: getBinderFunc(c.client, extenders),
  420. PodConditionUpdater: &podConditionUpdater{c.client},
  421. PodPreemptor: &podPreemptor{c.client},
  422. Framework: c.framework,
  423. WaitForCacheSync: func() bool {
  424. return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
  425. },
  426. NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
  427. Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
  428. StopEverything: c.StopEverything,
  429. VolumeBinder: c.volumeBinder,
  430. SchedulingQueue: c.podQueue,
  431. }, nil
  432. }
  433. // getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
  434. func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
  435. var extenderBinder algorithm.SchedulerExtender
  436. for i := range extenders {
  437. if extenders[i].IsBinder() {
  438. extenderBinder = extenders[i]
  439. break
  440. }
  441. }
  442. defaultBinder := &binder{client}
  443. return func(pod *v1.Pod) Binder {
  444. if extenderBinder != nil && extenderBinder.IsInterested(pod) {
  445. return extenderBinder
  446. }
  447. return defaultBinder
  448. }
  449. }
  450. type nodeLister struct {
  451. corelisters.NodeLister
  452. }
  453. func (n *nodeLister) List() ([]*v1.Node, error) {
  454. return n.NodeLister.List(labels.Everything())
  455. }
  456. func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
  457. pluginArgs, err := c.getPluginArgs()
  458. if err != nil {
  459. return nil, err
  460. }
  461. return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
  462. }
  463. func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
  464. pluginArgs, err := c.getPluginArgs()
  465. if err != nil {
  466. return nil, err
  467. }
  468. return getPriorityMetadataProducer(*pluginArgs)
  469. }
  470. func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
  471. pluginArgs, err := c.getPluginArgs()
  472. if err != nil {
  473. return nil, err
  474. }
  475. return getPredicateMetadataProducer(*pluginArgs)
  476. }
  477. func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
  478. pluginArgs, err := c.getPluginArgs()
  479. if err != nil {
  480. return nil, err
  481. }
  482. return getFitPredicateFunctions(predicateKeys, *pluginArgs)
  483. }
  484. func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
  485. return &PluginFactoryArgs{
  486. PodLister: c.podLister,
  487. ServiceLister: c.serviceLister,
  488. ControllerLister: c.controllerLister,
  489. ReplicaSetLister: c.replicaSetLister,
  490. StatefulSetLister: c.statefulSetLister,
  491. NodeLister: &nodeLister{c.nodeLister},
  492. PDBLister: c.pdbLister,
  493. NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
  494. PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
  495. PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
  496. StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
  497. VolumeBinder: c.volumeBinder,
  498. HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
  499. }, nil
  500. }
  501. // assignedPodLister filters the pods returned from a PodLister to
  502. // only include those that have a node name set.
  503. type assignedPodLister struct {
  504. corelisters.PodLister
  505. }
  506. // List lists all Pods in the indexer for a given namespace.
  507. func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
  508. list, err := l.PodLister.List(selector)
  509. if err != nil {
  510. return nil, err
  511. }
  512. filtered := make([]*v1.Pod, 0, len(list))
  513. for _, pod := range list {
  514. if len(pod.Spec.NodeName) > 0 {
  515. filtered = append(filtered, pod)
  516. }
  517. }
  518. return filtered, nil
  519. }
  520. // List lists all Pods in the indexer for a given namespace.
  521. func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
  522. return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
  523. }
  524. // assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to
  525. // only include those that have a node name set.
  526. type assignedPodNamespaceLister struct {
  527. corelisters.PodNamespaceLister
  528. }
  529. // List lists all Pods in the indexer for a given namespace.
  530. func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
  531. list, err := l.PodNamespaceLister.List(selector)
  532. if err != nil {
  533. return nil, err
  534. }
  535. filtered := make([]*v1.Pod, 0, len(list))
  536. for _, pod := range list {
  537. if len(pod.Spec.NodeName) > 0 {
  538. filtered = append(filtered, pod)
  539. }
  540. }
  541. return filtered, nil
  542. }
  543. // Get retrieves the Pod from the indexer for a given namespace and name.
  544. func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) {
  545. pod, err := l.PodNamespaceLister.Get(name)
  546. if err != nil {
  547. return nil, err
  548. }
  549. if len(pod.Spec.NodeName) > 0 {
  550. return pod, nil
  551. }
  552. return nil, errors.NewNotFound(schema.GroupResource{Resource: string(v1.ResourcePods)}, name)
  553. }
  554. type podInformer struct {
  555. informer cache.SharedIndexInformer
  556. }
  557. func (i *podInformer) Informer() cache.SharedIndexInformer {
  558. return i.informer
  559. }
  560. func (i *podInformer) Lister() corelisters.PodLister {
  561. return corelisters.NewPodLister(i.informer.GetIndexer())
  562. }
  563. // NewPodInformer creates a shared index informer that returns only non-terminal pods.
  564. func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
  565. selector := fields.ParseSelectorOrDie(
  566. "status.phase!=" + string(v1.PodSucceeded) +
  567. ",status.phase!=" + string(v1.PodFailed))
  568. lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
  569. return &podInformer{
  570. informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
  571. }
  572. }
  573. // MakeDefaultErrorFunc construct a function to handle pod scheduler error
  574. func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
  575. return func(pod *v1.Pod, err error) {
  576. if err == core.ErrNoNodesAvailable {
  577. klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
  578. } else {
  579. if _, ok := err.(*core.FitError); ok {
  580. klog.V(4).Infof("Unable to schedule %v/%v: no fit: %v; waiting", pod.Namespace, pod.Name, err)
  581. } else if errors.IsNotFound(err) {
  582. if errStatus, ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
  583. nodeName := errStatus.Status().Details.Name
  584. // when node is not found, We do not remove the node right away. Trying again to get
  585. // the node and if the node is still not found, then remove it from the scheduler cache.
  586. _, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  587. if err != nil && errors.IsNotFound(err) {
  588. node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
  589. schedulerCache.RemoveNode(&node)
  590. }
  591. }
  592. } else {
  593. klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err)
  594. }
  595. }
  596. podSchedulingCycle := podQueue.SchedulingCycle()
  597. // Retry asynchronously.
  598. // Note that this is extremely rudimentary and we need a more real error handling path.
  599. go func() {
  600. defer runtime.HandleCrash()
  601. podID := types.NamespacedName{
  602. Namespace: pod.Namespace,
  603. Name: pod.Name,
  604. }
  605. // An unschedulable pod will be placed in the unschedulable queue.
  606. // This ensures that if the pod is nominated to run on a node,
  607. // scheduler takes the pod into account when running predicates for the node.
  608. // Get the pod again; it may have changed/been scheduled already.
  609. getBackoff := initialGetBackoff
  610. for {
  611. pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
  612. if err == nil {
  613. if len(pod.Spec.NodeName) == 0 {
  614. if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
  615. klog.Error(err)
  616. }
  617. }
  618. break
  619. }
  620. if errors.IsNotFound(err) {
  621. klog.Warningf("A pod %v no longer exists", podID)
  622. return
  623. }
  624. klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
  625. if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
  626. getBackoff = maximalGetBackoff
  627. }
  628. time.Sleep(getBackoff)
  629. }
  630. }()
  631. }
  632. }
  633. type binder struct {
  634. Client clientset.Interface
  635. }
  636. // Bind just does a POST binding RPC.
  637. func (b *binder) Bind(binding *v1.Binding) error {
  638. klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
  639. return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
  640. }
  641. type podConditionUpdater struct {
  642. Client clientset.Interface
  643. }
  644. func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error {
  645. klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
  646. if podutil.UpdatePodCondition(&pod.Status, condition) {
  647. _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
  648. return err
  649. }
  650. return nil
  651. }
  652. type podPreemptor struct {
  653. Client clientset.Interface
  654. }
  655. func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
  656. return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
  657. }
  658. func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
  659. return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
  660. }
  661. func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
  662. podCopy := pod.DeepCopy()
  663. podCopy.Status.NominatedNodeName = nominatedNodeName
  664. _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
  665. return err
  666. }
  667. func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
  668. if len(pod.Status.NominatedNodeName) == 0 {
  669. return nil
  670. }
  671. return p.SetNominatedNodeName(pod, "")
  672. }