scheduler.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "os"
  18. "time"
  19. "k8s.io/klog"
  20. v1 "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. appsinformers "k8s.io/client-go/informers/apps/v1"
  25. coreinformers "k8s.io/client-go/informers/core/v1"
  26. policyinformers "k8s.io/client-go/informers/policy/v1beta1"
  27. storageinformers "k8s.io/client-go/informers/storage/v1"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/tools/record"
  30. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  31. latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
  32. kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
  33. "k8s.io/kubernetes/pkg/scheduler/core"
  34. // "k8s.io/kubernetes/pkg/scheduler/customcache"
  35. // "github.com/iwita/kube-scheduler/customcache"
  36. "k8s.io/kubernetes/pkg/scheduler/factory"
  37. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  38. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  39. "k8s.io/kubernetes/pkg/scheduler/metrics"
  40. "k8s.io/kubernetes/pkg/scheduler/util"
  41. )
  42. const (
  43. // BindTimeoutSeconds defines the default bind timeout
  44. BindTimeoutSeconds = 100
  45. // SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
  46. SchedulerError = "SchedulerError"
  47. )
  48. // Scheduler watches for new unscheduled pods. It attempts to find
  49. // nodes that they fit on and writes bindings back to the api server.
  50. type Scheduler struct {
  51. config *factory.Config
  52. }
  53. // Cache returns the cache in scheduler for test to check the data in scheduler.
  54. func (sched *Scheduler) Cache() internalcache.Cache {
  55. return sched.config.SchedulerCache
  56. }
  57. type schedulerOptions struct {
  58. schedulerName string
  59. hardPodAffinitySymmetricWeight int32
  60. disablePreemption bool
  61. percentageOfNodesToScore int32
  62. bindTimeoutSeconds int64
  63. }
  64. // Option configures a Scheduler
  65. type Option func(*schedulerOptions)
  66. // WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler
  67. func WithName(schedulerName string) Option {
  68. return func(o *schedulerOptions) {
  69. o.schedulerName = schedulerName
  70. }
  71. }
  72. // WithHardPodAffinitySymmetricWeight sets hardPodAffinitySymmetricWeight for Scheduler, the default value is 1
  73. func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Option {
  74. return func(o *schedulerOptions) {
  75. o.hardPodAffinitySymmetricWeight = hardPodAffinitySymmetricWeight
  76. }
  77. }
  78. // WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false
  79. func WithPreemptionDisabled(disablePreemption bool) Option {
  80. return func(o *schedulerOptions) {
  81. o.disablePreemption = disablePreemption
  82. }
  83. }
  84. // WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
  85. func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
  86. return func(o *schedulerOptions) {
  87. o.percentageOfNodesToScore = percentageOfNodesToScore
  88. }
  89. }
  90. // WithBindTimeoutSeconds sets bindTimeoutSeconds for Scheduler, the default value is 100
  91. func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
  92. return func(o *schedulerOptions) {
  93. o.bindTimeoutSeconds = bindTimeoutSeconds
  94. }
  95. }
  96. var defaultSchedulerOptions = schedulerOptions{
  97. schedulerName: v1.DefaultSchedulerName,
  98. hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
  99. disablePreemption: false,
  100. percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  101. bindTimeoutSeconds: BindTimeoutSeconds,
  102. }
  103. // New returns a Scheduler
  104. func New(client clientset.Interface,
  105. nodeInformer coreinformers.NodeInformer,
  106. podInformer coreinformers.PodInformer,
  107. pvInformer coreinformers.PersistentVolumeInformer,
  108. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  109. replicationControllerInformer coreinformers.ReplicationControllerInformer,
  110. replicaSetInformer appsinformers.ReplicaSetInformer,
  111. statefulSetInformer appsinformers.StatefulSetInformer,
  112. serviceInformer coreinformers.ServiceInformer,
  113. pdbInformer policyinformers.PodDisruptionBudgetInformer,
  114. storageClassInformer storageinformers.StorageClassInformer,
  115. recorder record.EventRecorder,
  116. schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
  117. stopCh <-chan struct{},
  118. registry framework.Registry,
  119. plugins *kubeschedulerconfig.Plugins,
  120. pluginConfig []kubeschedulerconfig.PluginConfig,
  121. opts ...func(o *schedulerOptions)) (*Scheduler, error) {
  122. options := defaultSchedulerOptions
  123. for _, opt := range opts {
  124. opt(&options)
  125. }
  126. // Set up the configurator which can create schedulers from configs.
  127. configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
  128. SchedulerName: options.schedulerName,
  129. Client: client,
  130. NodeInformer: nodeInformer,
  131. PodInformer: podInformer,
  132. PvInformer: pvInformer,
  133. PvcInformer: pvcInformer,
  134. ReplicationControllerInformer: replicationControllerInformer,
  135. ReplicaSetInformer: replicaSetInformer,
  136. StatefulSetInformer: statefulSetInformer,
  137. ServiceInformer: serviceInformer,
  138. PdbInformer: pdbInformer,
  139. StorageClassInformer: storageClassInformer,
  140. HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
  141. DisablePreemption: options.disablePreemption,
  142. PercentageOfNodesToScore: options.percentageOfNodesToScore,
  143. BindTimeoutSeconds: options.bindTimeoutSeconds,
  144. Registry: registry,
  145. Plugins: plugins,
  146. PluginConfig: pluginConfig,
  147. })
  148. var config *factory.Config
  149. source := schedulerAlgorithmSource
  150. switch {
  151. case source.Provider != nil:
  152. // Create the config from a named algorithm provider.
  153. sc, err := configurator.CreateFromProvider(*source.Provider)
  154. if err != nil {
  155. return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
  156. }
  157. config = sc
  158. case source.Policy != nil:
  159. // Create the config from a user specified policy source.
  160. policy := &schedulerapi.Policy{}
  161. switch {
  162. case source.Policy.File != nil:
  163. if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
  164. return nil, err
  165. }
  166. case source.Policy.ConfigMap != nil:
  167. if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
  168. return nil, err
  169. }
  170. }
  171. sc, err := configurator.CreateFromConfig(*policy)
  172. if err != nil {
  173. return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
  174. }
  175. config = sc
  176. default:
  177. return nil, fmt.Errorf("unsupported algorithm source: %v", source)
  178. }
  179. // Additional tweaks to the config produced by the configurator.
  180. config.Recorder = recorder
  181. config.DisablePreemption = options.disablePreemption
  182. config.StopEverything = stopCh
  183. // Create the scheduler.
  184. sched := NewFromConfig(config)
  185. AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer)
  186. return sched, nil
  187. }
  188. // initPolicyFromFile initialize policy from file
  189. func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
  190. // Use a policy serialized in a file.
  191. _, err := os.Stat(policyFile)
  192. if err != nil {
  193. return fmt.Errorf("missing policy config file %s", policyFile)
  194. }
  195. data, err := ioutil.ReadFile(policyFile)
  196. if err != nil {
  197. return fmt.Errorf("couldn't read policy config: %v", err)
  198. }
  199. err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
  200. if err != nil {
  201. return fmt.Errorf("invalid policy: %v", err)
  202. }
  203. return nil
  204. }
  205. // initPolicyFromConfigMap initialize policy from configMap
  206. func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedulerconfig.SchedulerPolicyConfigMapSource, policy *schedulerapi.Policy) error {
  207. // Use a policy serialized in a config map value.
  208. policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
  209. if err != nil {
  210. return fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
  211. }
  212. data, found := policyConfigMap.Data[kubeschedulerconfig.SchedulerPolicyConfigMapKey]
  213. if !found {
  214. return fmt.Errorf("missing policy config map value at key %q", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
  215. }
  216. err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
  217. if err != nil {
  218. return fmt.Errorf("invalid policy: %v", err)
  219. }
  220. return nil
  221. }
  222. // NewFromConfig returns a new scheduler using the provided Config.
  223. func NewFromConfig(config *factory.Config) *Scheduler {
  224. metrics.Register()
  225. return &Scheduler{
  226. config: config,
  227. }
  228. }
  229. //var timeout *time.Ticker
  230. // Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
  231. func (sched *Scheduler) Run() {
  232. if !sched.config.WaitForCacheSync() {
  233. return
  234. }
  235. //customcache.Timeout := time.NewTicker(time.Duration(10 * time.Second))
  236. go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
  237. }
  238. // Config returns scheduler's config pointer. It is exposed for testing purposes.
  239. func (sched *Scheduler) Config() *factory.Config {
  240. return sched.config
  241. }
  242. // recordFailedSchedulingEvent records an event for the pod that indicates the
  243. // pod has failed to schedule.
  244. // NOTE: This function modifies "pod". "pod" should be copied before being passed.
  245. func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
  246. sched.config.Error(pod, err)
  247. sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
  248. sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
  249. Type: v1.PodScheduled,
  250. Status: v1.ConditionFalse,
  251. Reason: reason,
  252. Message: err.Error(),
  253. })
  254. }
  255. // schedule implements the scheduling algorithm and returns the suggested result(host,
  256. // evaluated nodes number,feasible nodes number).
  257. func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
  258. result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
  259. if err != nil {
  260. pod = pod.DeepCopy()
  261. sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
  262. return core.ScheduleResult{}, err
  263. }
  264. return result, err
  265. }
  266. // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
  267. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
  268. // It returns the node name and an error if any.
  269. func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
  270. preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
  271. if err != nil {
  272. klog.Errorf("Error getting the updated preemptor pod object: %v", err)
  273. return "", err
  274. }
  275. node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
  276. if err != nil {
  277. klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
  278. return "", err
  279. }
  280. var nodeName = ""
  281. if node != nil {
  282. nodeName = node.Name
  283. // Update the scheduling queue with the nominated pod information. Without
  284. // this, there would be a race condition between the next scheduling cycle
  285. // and the time the scheduler receives a Pod Update for the nominated pod.
  286. sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
  287. // Make a call to update nominated node name of the pod on the API server.
  288. err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
  289. if err != nil {
  290. klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
  291. sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
  292. return "", err
  293. }
  294. for _, victim := range victims {
  295. if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
  296. klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
  297. return "", err
  298. }
  299. sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
  300. }
  301. metrics.PreemptionVictims.Set(float64(len(victims)))
  302. }
  303. // Clearing nominated pods should happen outside of "if node != nil". Node could
  304. // be nil when a pod with nominated node name is eligible to preempt again,
  305. // but preemption logic does not find any node for it. In that case Preempt()
  306. // function of generic_scheduler.go returns the pod itself for removal of
  307. // the 'NominatedPod' field.
  308. for _, p := range nominatedPodsToClear {
  309. rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
  310. if rErr != nil {
  311. klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
  312. // We do not return as this error is not critical.
  313. }
  314. }
  315. return nodeName, err
  316. }
  317. // assumeVolumes will update the volume cache with the chosen bindings
  318. //
  319. // This function modifies assumed if volume binding is required.
  320. func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
  321. allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
  322. if err != nil {
  323. sched.recordSchedulingFailure(assumed, err, SchedulerError,
  324. fmt.Sprintf("AssumePodVolumes failed: %v", err))
  325. }
  326. return
  327. }
  328. // bindVolumes will make the API update with the assumed bindings and wait until
  329. // the PV controller has completely finished the binding operation.
  330. //
  331. // If binding errors, times out or gets undone, then an error will be returned to
  332. // retry scheduling.
  333. func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
  334. klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
  335. err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
  336. if err != nil {
  337. klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
  338. // Unassume the Pod and retry scheduling
  339. if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
  340. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  341. }
  342. sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
  343. return err
  344. }
  345. klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
  346. return nil
  347. }
  348. // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
  349. // assume modifies `assumed`.
  350. func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
  351. // Optimistically assume that the binding will succeed and send it to apiserver
  352. // in the background.
  353. // If the binding fails, scheduler will release resources allocated to assumed pod
  354. // immediately.
  355. assumed.Spec.NodeName = host
  356. if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
  357. klog.Errorf("scheduler cache AssumePod failed: %v", err)
  358. // This is most probably result of a BUG in retrying logic.
  359. // We report an error here so that pod scheduling can be retried.
  360. // This relies on the fact that Error will check if the pod has been bound
  361. // to a node and if so will not add it back to the unscheduled pods queue
  362. // (otherwise this would cause an infinite loop).
  363. sched.recordSchedulingFailure(assumed, err, SchedulerError,
  364. fmt.Sprintf("AssumePod failed: %v", err))
  365. return err
  366. }
  367. // if "assumed" is a nominated pod, we should remove it from internal cache
  368. if sched.config.SchedulingQueue != nil {
  369. sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
  370. }
  371. return nil
  372. }
  373. // bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
  374. // handle binding metrics internally.
  375. func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
  376. bindingStart := time.Now()
  377. // If binding succeeded then PodScheduled condition will be updated in apiserver so that
  378. // it's atomic with setting host.
  379. err := sched.config.GetBinder(assumed).Bind(b)
  380. if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
  381. klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
  382. }
  383. if err != nil {
  384. klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
  385. if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
  386. klog.Errorf("scheduler cache ForgetPod failed: %v", err)
  387. }
  388. sched.recordSchedulingFailure(assumed, err, SchedulerError,
  389. fmt.Sprintf("Binding rejected: %v", err))
  390. return err
  391. }
  392. metrics.BindingLatency.Observe(metrics.SinceInSeconds(bindingStart))
  393. metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
  394. metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
  395. metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
  396. sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
  397. return nil
  398. }
  399. // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
  400. func (sched *Scheduler) scheduleOne() {
  401. // // Check if the cache needs update
  402. // select {
  403. // // clean the cache if 10 seconds are passed
  404. // case <-customcache.LabCache.Timeout.C:
  405. // klog.Infof("Time to erase")
  406. // customcache.LabCache.CleanCache()
  407. // default:
  408. // }
  409. //Check if the cache needs update
  410. //klog.Infof("The value of the Ticker: %v", customcache.LabCache.Timeout.C)
  411. // select {
  412. // // clean the cache if 10 seconds are passed
  413. // case <-customcache.LabCache.Timeout.C:
  414. // klog.Infof("Time to erase: %v", time.Now())
  415. // //customcache.LabCache.Timeout.Stop()
  416. // customcache.LabCache.CleanCache()
  417. // klog.Infof("Cache: %v", customcache.LabCache.Cache)
  418. // default:
  419. // klog.Infof("Cache is Valid, Time: %v", customcache.LabCache.Timeout.C)
  420. // }
  421. fwk := sched.config.Framework
  422. pod := sched.config.NextPod()
  423. // pod could be nil when schedulerQueue is closed
  424. if pod == nil {
  425. return
  426. }
  427. if pod.DeletionTimestamp != nil {
  428. sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  429. klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  430. return
  431. }
  432. klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
  433. // Synchronously attempt to find a fit for the pod.
  434. start := time.Now()
  435. pluginContext := framework.NewPluginContext()
  436. scheduleResult, err := sched.schedule(pod)
  437. if err != nil {
  438. // schedule() may have failed because the pod would not fit on any host, so we try to
  439. // preempt, with the expectation that the next time the pod is tried for scheduling it
  440. // will fit due to the preemption. It is also possible that a different pod will schedule
  441. // into the resources that were preempted, but this is harmless.
  442. if fitError, ok := err.(*core.FitError); ok {
  443. if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
  444. klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
  445. " No preemption is performed.")
  446. } else {
  447. preemptionStartTime := time.Now()
  448. sched.preempt(pod, fitError)
  449. metrics.PreemptionAttempts.Inc()
  450. metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
  451. metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
  452. metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
  453. metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
  454. }
  455. // Pod did not fit anywhere, so it is counted as a failure. If preemption
  456. // succeeds, the pod should get counted as a success the next time we try to
  457. // schedule it. (hopefully)
  458. metrics.PodScheduleFailures.Inc()
  459. } else {
  460. klog.Errorf("error selecting node for pod: %v", err)
  461. metrics.PodScheduleErrors.Inc()
  462. }
  463. return
  464. }
  465. metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
  466. metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
  467. // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
  468. // This allows us to keep scheduling without waiting on binding to occur.
  469. assumedPod := pod.DeepCopy()
  470. // Assume volumes first before assuming the pod.
  471. //
  472. // If all volumes are completely bound, then allBound is true and binding will be skipped.
  473. //
  474. // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
  475. //
  476. // This function modifies 'assumedPod' if volume binding is required.
  477. allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
  478. if err != nil {
  479. klog.Errorf("error assuming volumes: %v", err)
  480. metrics.PodScheduleErrors.Inc()
  481. return
  482. }
  483. // Run "reserve" plugins.
  484. if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
  485. sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message())
  486. metrics.PodScheduleErrors.Inc()
  487. return
  488. }
  489. // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
  490. err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
  491. if err != nil {
  492. klog.Errorf("error assuming pod: %v", err)
  493. metrics.PodScheduleErrors.Inc()
  494. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  495. fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  496. return
  497. }
  498. // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
  499. go func() {
  500. // Bind volumes first before Pod
  501. if !allBound {
  502. err := sched.bindVolumes(assumedPod)
  503. if err != nil {
  504. klog.Errorf("error binding volumes: %v", err)
  505. metrics.PodScheduleErrors.Inc()
  506. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  507. fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  508. return
  509. }
  510. }
  511. // Run "permit" plugins.
  512. permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  513. if !permitStatus.IsSuccess() {
  514. var reason string
  515. if permitStatus.Code() == framework.Unschedulable {
  516. reason = v1.PodReasonUnschedulable
  517. } else {
  518. metrics.PodScheduleErrors.Inc()
  519. reason = SchedulerError
  520. }
  521. if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
  522. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  523. }
  524. sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message())
  525. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  526. fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  527. return
  528. }
  529. // Run "prebind" plugins.
  530. prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  531. if !prebindStatus.IsSuccess() {
  532. var reason string
  533. if prebindStatus.Code() == framework.Unschedulable {
  534. reason = v1.PodReasonUnschedulable
  535. } else {
  536. metrics.PodScheduleErrors.Inc()
  537. reason = SchedulerError
  538. }
  539. if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
  540. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  541. }
  542. sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
  543. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  544. fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  545. return
  546. }
  547. err := sched.bind(assumedPod, &v1.Binding{
  548. ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
  549. Target: v1.ObjectReference{
  550. Kind: "Node",
  551. Name: scheduleResult.SuggestedHost,
  552. },
  553. })
  554. metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
  555. metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
  556. if err != nil {
  557. klog.Errorf("error binding pod: %v", err)
  558. metrics.PodScheduleErrors.Inc()
  559. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  560. fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  561. } else {
  562. klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
  563. metrics.PodScheduleSuccesses.Inc()
  564. // Run "postbind" plugins.
  565. fwk.RunPostbindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  566. }
  567. }()
  568. }