scheduler.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "context"
  16. "fmt"
  17. "io/ioutil"
  18. "math/rand"
  19. "os"
  20. "time"
  21. v1 "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. "k8s.io/client-go/informers"
  27. coreinformers "k8s.io/client-go/informers/core/v1"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/tools/cache"
  30. "k8s.io/client-go/tools/events"
  31. "k8s.io/klog"
  32. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  33. kubefeatures "k8s.io/kubernetes/pkg/features"
  34. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  35. "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
  36. "k8s.io/kubernetes/pkg/scheduler/core"
  37. frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
  38. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  39. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  40. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  41. "k8s.io/kubernetes/pkg/scheduler/metrics"
  42. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  43. )
  const (
  	// BindTimeoutSeconds is the bind timeout, in seconds, that the scheduler
  	// uses when no explicit value is configured.
  	BindTimeoutSeconds = 100
  	// SchedulerError is the reason attached to a pod's failure record when
  	// scheduling aborted because of an error.
  	SchedulerError = "SchedulerError"
  	// pluginMetricsSamplePercent is the share (out of 100) of scheduling
  	// cycles for which plugin metrics are recorded.
  	pluginMetricsSamplePercent = 10
  )
  52. // podConditionUpdater updates the condition of a pod based on the passed
  53. // PodCondition
  54. // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
  55. type podConditionUpdater interface {
  56. update(pod *v1.Pod, podCondition *v1.PodCondition) error
  57. }
  58. // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
  59. // field of the preemptor pod.
  60. // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
  61. type podPreemptor interface {
  62. getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
  63. deletePod(pod *v1.Pod) error
  64. setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
  65. removeNominatedNodeName(pod *v1.Pod) error
  66. }
  67. // Scheduler watches for new unscheduled pods. It attempts to find
  68. // nodes that they fit on and writes bindings back to the api server.
  69. type Scheduler struct {
  70. // It is expected that changes made via SchedulerCache will be observed
  71. // by NodeLister and Algorithm.
  72. SchedulerCache internalcache.Cache
  73. Algorithm core.ScheduleAlgorithm
  74. // PodConditionUpdater is used only in case of scheduling errors. If we succeed
  75. // with scheduling, PodScheduled condition will be updated in apiserver in /bind
  76. // handler so that binding and setting PodCondition it is atomic.
  77. podConditionUpdater podConditionUpdater
  78. // PodPreemptor is used to evict pods and update 'NominatedNode' field of
  79. // the preemptor pod.
  80. podPreemptor podPreemptor
  81. // Framework runs scheduler plugins at configured extension points.
  82. Framework framework.Framework
  83. // NextPod should be a function that blocks until the next pod
  84. // is available. We don't use a channel for this, because scheduling
  85. // a pod may take some amount of time and we don't want pods to get
  86. // stale while they sit in a channel.
  87. NextPod func() *framework.PodInfo
  88. // Error is called if there is an error. It is passed the pod in
  89. // question, and the error
  90. Error func(*framework.PodInfo, error)
  91. // Recorder is the EventRecorder to use
  92. Recorder events.EventRecorder
  93. // Close this to shut down the scheduler.
  94. StopEverything <-chan struct{}
  95. // VolumeBinder handles PVC/PV binding for the pod.
  96. VolumeBinder *volumebinder.VolumeBinder
  97. // Disable pod preemption or not.
  98. DisablePreemption bool
  99. // SchedulingQueue holds pods to be scheduled
  100. SchedulingQueue internalqueue.SchedulingQueue
  101. scheduledPodsHasSynced func() bool
  102. }
  103. // Cache returns the cache in scheduler for test to check the data in scheduler.
  104. func (sched *Scheduler) Cache() internalcache.Cache {
  105. return sched.SchedulerCache
  106. }
  107. type schedulerOptions struct {
  108. schedulerName string
  109. schedulerAlgorithmSource schedulerapi.SchedulerAlgorithmSource
  110. disablePreemption bool
  111. percentageOfNodesToScore int32
  112. bindTimeoutSeconds int64
  113. podInitialBackoffSeconds int64
  114. podMaxBackoffSeconds int64
  115. // Contains out-of-tree plugins to be merged with the in-tree registry.
  116. frameworkOutOfTreeRegistry framework.Registry
  117. // Plugins and PluginConfig set from ComponentConfig.
  118. frameworkPlugins *schedulerapi.Plugins
  119. frameworkPluginConfig []schedulerapi.PluginConfig
  120. }
  121. // Option configures a Scheduler
  122. type Option func(*schedulerOptions)
  123. // WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler
  124. func WithName(schedulerName string) Option {
  125. return func(o *schedulerOptions) {
  126. o.schedulerName = schedulerName
  127. }
  128. }
  129. // WithAlgorithmSource sets schedulerAlgorithmSource for Scheduler, the default is a source with DefaultProvider.
  130. func WithAlgorithmSource(source schedulerapi.SchedulerAlgorithmSource) Option {
  131. return func(o *schedulerOptions) {
  132. o.schedulerAlgorithmSource = source
  133. }
  134. }
  135. // WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false
  136. func WithPreemptionDisabled(disablePreemption bool) Option {
  137. return func(o *schedulerOptions) {
  138. o.disablePreemption = disablePreemption
  139. }
  140. }
  141. // WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
  142. func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
  143. return func(o *schedulerOptions) {
  144. o.percentageOfNodesToScore = percentageOfNodesToScore
  145. }
  146. }
  147. // WithBindTimeoutSeconds sets bindTimeoutSeconds for Scheduler, the default value is 100
  148. func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
  149. return func(o *schedulerOptions) {
  150. o.bindTimeoutSeconds = bindTimeoutSeconds
  151. }
  152. }
  153. // WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
  154. // will be appended to the default registry.
  155. func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
  156. return func(o *schedulerOptions) {
  157. o.frameworkOutOfTreeRegistry = registry
  158. }
  159. }
  160. // WithFrameworkPlugins sets the plugins that the framework should be configured with.
  161. func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option {
  162. return func(o *schedulerOptions) {
  163. o.frameworkPlugins = plugins
  164. }
  165. }
  166. // WithFrameworkPluginConfig sets the PluginConfig slice that the framework should be configured with.
  167. func WithFrameworkPluginConfig(pluginConfig []schedulerapi.PluginConfig) Option {
  168. return func(o *schedulerOptions) {
  169. o.frameworkPluginConfig = pluginConfig
  170. }
  171. }
  172. // WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
  173. func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
  174. return func(o *schedulerOptions) {
  175. o.podInitialBackoffSeconds = podInitialBackoffSeconds
  176. }
  177. }
  178. // WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10
  179. func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
  180. return func(o *schedulerOptions) {
  181. o.podMaxBackoffSeconds = podMaxBackoffSeconds
  182. }
  183. }
  184. var defaultSchedulerOptions = schedulerOptions{
  185. schedulerName: v1.DefaultSchedulerName,
  186. schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
  187. Provider: defaultAlgorithmSourceProviderName(),
  188. },
  189. disablePreemption: false,
  190. percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  191. bindTimeoutSeconds: BindTimeoutSeconds,
  192. podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
  193. podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
  194. }
  195. // New returns a Scheduler
  196. func New(client clientset.Interface,
  197. informerFactory informers.SharedInformerFactory,
  198. podInformer coreinformers.PodInformer,
  199. recorder events.EventRecorder,
  200. stopCh <-chan struct{},
  201. opts ...Option) (*Scheduler, error) {
  202. stopEverything := stopCh
  203. if stopEverything == nil {
  204. stopEverything = wait.NeverStop
  205. }
  206. options := defaultSchedulerOptions
  207. for _, opt := range opts {
  208. opt(&options)
  209. }
  210. schedulerCache := internalcache.New(30*time.Second, stopEverything)
  211. volumeBinder := volumebinder.NewVolumeBinder(
  212. client,
  213. informerFactory.Core().V1().Nodes(),
  214. informerFactory.Storage().V1().CSINodes(),
  215. informerFactory.Core().V1().PersistentVolumeClaims(),
  216. informerFactory.Core().V1().PersistentVolumes(),
  217. informerFactory.Storage().V1().StorageClasses(),
  218. time.Duration(options.bindTimeoutSeconds)*time.Second,
  219. )
  220. registry := frameworkplugins.NewInTreeRegistry()
  221. if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
  222. return nil, err
  223. }
  224. snapshot := internalcache.NewEmptySnapshot()
  225. configurator := &Configurator{
  226. client: client,
  227. informerFactory: informerFactory,
  228. podInformer: podInformer,
  229. volumeBinder: volumeBinder,
  230. schedulerCache: schedulerCache,
  231. StopEverything: stopEverything,
  232. disablePreemption: options.disablePreemption,
  233. percentageOfNodesToScore: options.percentageOfNodesToScore,
  234. bindTimeoutSeconds: options.bindTimeoutSeconds,
  235. podInitialBackoffSeconds: options.podInitialBackoffSeconds,
  236. podMaxBackoffSeconds: options.podMaxBackoffSeconds,
  237. enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
  238. registry: registry,
  239. plugins: options.frameworkPlugins,
  240. pluginConfig: options.frameworkPluginConfig,
  241. nodeInfoSnapshot: snapshot,
  242. }
  243. metrics.Register()
  244. var sched *Scheduler
  245. source := options.schedulerAlgorithmSource
  246. switch {
  247. case source.Provider != nil:
  248. // Create the config from a named algorithm provider.
  249. sc, err := configurator.createFromProvider(*source.Provider)
  250. if err != nil {
  251. return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
  252. }
  253. sched = sc
  254. case source.Policy != nil:
  255. // Create the config from a user specified policy source.
  256. policy := &schedulerapi.Policy{}
  257. switch {
  258. case source.Policy.File != nil:
  259. if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
  260. return nil, err
  261. }
  262. case source.Policy.ConfigMap != nil:
  263. if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
  264. return nil, err
  265. }
  266. }
  267. sc, err := configurator.createFromConfig(*policy)
  268. if err != nil {
  269. return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
  270. }
  271. sched = sc
  272. default:
  273. return nil, fmt.Errorf("unsupported algorithm source: %v", source)
  274. }
  275. // Additional tweaks to the config produced by the configurator.
  276. sched.Recorder = recorder
  277. sched.DisablePreemption = options.disablePreemption
  278. sched.StopEverything = stopEverything
  279. sched.podConditionUpdater = &podConditionUpdaterImpl{client}
  280. sched.podPreemptor = &podPreemptorImpl{client}
  281. sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
  282. AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
  283. return sched, nil
  284. }
  285. // initPolicyFromFile initialize policy from file
  286. func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
  287. // Use a policy serialized in a file.
  288. _, err := os.Stat(policyFile)
  289. if err != nil {
  290. return fmt.Errorf("missing policy config file %s", policyFile)
  291. }
  292. data, err := ioutil.ReadFile(policyFile)
  293. if err != nil {
  294. return fmt.Errorf("couldn't read policy config: %v", err)
  295. }
  296. err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
  297. if err != nil {
  298. return fmt.Errorf("invalid policy: %v", err)
  299. }
  300. return nil
  301. }
  302. // initPolicyFromConfigMap initialize policy from configMap
  303. func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi.SchedulerPolicyConfigMapSource, policy *schedulerapi.Policy) error {
  304. // Use a policy serialized in a config map value.
  305. policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(context.TODO(), policyRef.Name, metav1.GetOptions{})
  306. if err != nil {
  307. return fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
  308. }
  309. data, found := policyConfigMap.Data[schedulerapi.SchedulerPolicyConfigMapKey]
  310. if !found {
  311. return fmt.Errorf("missing policy config map value at key %q", schedulerapi.SchedulerPolicyConfigMapKey)
  312. }
  313. err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
  314. if err != nil {
  315. return fmt.Errorf("invalid policy: %v", err)
  316. }
  317. return nil
  318. }
  319. // Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
  320. func (sched *Scheduler) Run(ctx context.Context) {
  321. if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
  322. return
  323. }
  324. sched.SchedulingQueue.Run()
  325. wait.UntilWithContext(ctx, sched.scheduleOne, 0)
  326. sched.SchedulingQueue.Close()
  327. }
  328. // recordFailedSchedulingEvent records an event for the pod that indicates the
  329. // pod has failed to schedule.
  330. // NOTE: This function modifies "pod". "pod" should be copied before being passed.
  331. func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err error, reason string, message string) {
  332. sched.Error(podInfo, err)
  333. pod := podInfo.Pod
  334. sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
  335. if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
  336. Type: v1.PodScheduled,
  337. Status: v1.ConditionFalse,
  338. Reason: reason,
  339. Message: err.Error(),
  340. }); err != nil {
  341. klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
  342. }
  343. }
  344. // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
  345. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
  346. // It returns the node name and an error if any.
  347. func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
  348. preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
  349. if err != nil {
  350. klog.Errorf("Error getting the updated preemptor pod object: %v", err)
  351. return "", err
  352. }
  353. node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
  354. if err != nil {
  355. klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
  356. return "", err
  357. }
  358. var nodeName = ""
  359. if node != nil {
  360. nodeName = node.Name
  361. // Update the scheduling queue with the nominated pod information. Without
  362. // this, there would be a race condition between the next scheduling cycle
  363. // and the time the scheduler receives a Pod Update for the nominated pod.
  364. sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
  365. // Make a call to update nominated node name of the pod on the API server.
  366. err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
  367. if err != nil {
  368. klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
  369. sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
  370. return "", err
  371. }
  372. for _, victim := range victims {
  373. if err := sched.podPreemptor.deletePod(victim); err != nil {
  374. klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
  375. return "", err
  376. }
  377. // If the victim is a WaitingPod, send a reject message to the PermitPlugin
  378. if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
  379. waitingPod.Reject("preempted")
  380. }
  381. sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
  382. }
  383. metrics.PreemptionVictims.Observe(float64(len(victims)))
  384. }
  385. // Clearing nominated pods should happen outside of "if node != nil". Node could
  386. // be nil when a pod with nominated node name is eligible to preempt again,
  387. // but preemption logic does not find any node for it. In that case Preempt()
  388. // function of generic_scheduler.go returns the pod itself for removal of
  389. // the 'NominatedPod' field.
  390. for _, p := range nominatedPodsToClear {
  391. rErr := sched.podPreemptor.removeNominatedNodeName(p)
  392. if rErr != nil {
  393. klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
  394. // We do not return as this error is not critical.
  395. }
  396. }
  397. return nodeName, err
  398. }
  399. // bindVolumes will make the API update with the assumed bindings and wait until
  400. // the PV controller has completely finished the binding operation.
  401. //
  402. // If binding errors, times out or gets undone, then an error will be returned to
  403. // retry scheduling.
  404. func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
  405. klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
  406. err := sched.VolumeBinder.Binder.BindPodVolumes(assumed)
  407. if err != nil {
  408. klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
  409. // Unassume the Pod and retry scheduling
  410. if forgetErr := sched.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
  411. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  412. }
  413. return err
  414. }
  415. klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
  416. return nil
  417. }
  418. // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
  419. // assume modifies `assumed`.
  420. func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
  421. // Optimistically assume that the binding will succeed and send it to apiserver
  422. // in the background.
  423. // If the binding fails, scheduler will release resources allocated to assumed pod
  424. // immediately.
  425. assumed.Spec.NodeName = host
  426. if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
  427. klog.Errorf("scheduler cache AssumePod failed: %v", err)
  428. return err
  429. }
  430. // if "assumed" is a nominated pod, we should remove it from internal cache
  431. if sched.SchedulingQueue != nil {
  432. sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
  433. }
  434. return nil
  435. }
  436. // bind binds a pod to a given node defined in a binding object.
  437. // The precedence for binding is: (1) extenders and (2) framework plugins.
  438. // We expect this to run asynchronously, so we handle binding metrics internally.
  439. func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
  440. start := time.Now()
  441. defer func() {
  442. sched.finishBinding(assumed, targetNode, start, err)
  443. }()
  444. bound, err := sched.extendersBinding(assumed, targetNode)
  445. if bound {
  446. return err
  447. }
  448. bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode)
  449. if bindStatus.IsSuccess() {
  450. return nil
  451. }
  452. if bindStatus.Code() == framework.Error {
  453. return bindStatus.AsError()
  454. }
  455. return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
  456. }
  457. // TODO(#87159): Move this to a Plugin.
  458. func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
  459. for _, extender := range sched.Algorithm.Extenders() {
  460. if !extender.IsBinder() || !extender.IsInterested(pod) {
  461. continue
  462. }
  463. return true, extender.Bind(&v1.Binding{
  464. ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
  465. Target: v1.ObjectReference{Kind: "Node", Name: node},
  466. })
  467. }
  468. return false, nil
  469. }
  470. func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start time.Time, err error) {
  471. if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
  472. klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
  473. }
  474. if err != nil {
  475. klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
  476. if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
  477. klog.Errorf("scheduler cache ForgetPod failed: %v", err)
  478. }
  479. return
  480. }
  481. metrics.BindingLatency.Observe(metrics.SinceInSeconds(start))
  482. metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(start))
  483. sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
  484. }
  485. // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
  486. func (sched *Scheduler) scheduleOne(ctx context.Context) {
  487. fwk := sched.Framework
  488. podInfo := sched.NextPod()
  489. // pod could be nil when schedulerQueue is closed
  490. if podInfo == nil || podInfo.Pod == nil {
  491. return
  492. }
  493. pod := podInfo.Pod
  494. if sched.skipPodSchedule(pod) {
  495. return
  496. }
  497. klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
  498. // Synchronously attempt to find a fit for the pod.
  499. start := time.Now()
  500. state := framework.NewCycleState()
  501. state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
  502. schedulingCycleCtx, cancel := context.WithCancel(ctx)
  503. defer cancel()
  504. scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
  505. if err != nil {
  506. sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
  507. // Schedule() may have failed because the pod would not fit on any host, so we try to
  508. // preempt, with the expectation that the next time the pod is tried for scheduling it
  509. // will fit due to the preemption. It is also possible that a different pod will schedule
  510. // into the resources that were preempted, but this is harmless.
  511. if fitError, ok := err.(*core.FitError); ok {
  512. if sched.DisablePreemption {
  513. klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
  514. " No preemption is performed.")
  515. } else {
  516. preemptionStartTime := time.Now()
  517. sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
  518. metrics.PreemptionAttempts.Inc()
  519. metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
  520. metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
  521. }
  522. // Pod did not fit anywhere, so it is counted as a failure. If preemption
  523. // succeeds, the pod should get counted as a success the next time we try to
  524. // schedule it. (hopefully)
  525. metrics.PodScheduleFailures.Inc()
  526. } else {
  527. klog.Errorf("error selecting node for pod: %v", err)
  528. metrics.PodScheduleErrors.Inc()
  529. }
  530. return
  531. }
  532. metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
  533. // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
  534. // This allows us to keep scheduling without waiting on binding to occur.
  535. assumedPodInfo := podInfo.DeepCopy()
  536. assumedPod := assumedPodInfo.Pod
  537. // Assume volumes first before assuming the pod.
  538. //
  539. // If all volumes are completely bound, then allBound is true and binding will be skipped.
  540. //
  541. // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
  542. //
  543. // This function modifies 'assumedPod' if volume binding is required.
  544. allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
  545. if err != nil {
  546. sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError,
  547. fmt.Sprintf("AssumePodVolumes failed: %v", err))
  548. metrics.PodScheduleErrors.Inc()
  549. return
  550. }
  551. // Run "reserve" plugins.
  552. if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
  553. sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
  554. metrics.PodScheduleErrors.Inc()
  555. return
  556. }
  557. // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
  558. err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
  559. if err != nil {
  560. // This is most probably result of a BUG in retrying logic.
  561. // We report an error here so that pod scheduling can be retried.
  562. // This relies on the fact that Error will check if the pod has been bound
  563. // to a node and if so will not add it back to the unscheduled pods queue
  564. // (otherwise this would cause an infinite loop).
  565. sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
  566. metrics.PodScheduleErrors.Inc()
  567. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  568. fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  569. return
  570. }
  571. // Run "permit" plugins.
  572. runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  573. if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
  574. var reason string
  575. if runPermitStatus.IsUnschedulable() {
  576. metrics.PodScheduleFailures.Inc()
  577. reason = v1.PodReasonUnschedulable
  578. } else {
  579. metrics.PodScheduleErrors.Inc()
  580. reason = SchedulerError
  581. }
  582. if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
  583. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  584. }
  585. // One of the plugins returned status different than success or wait.
  586. fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  587. sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
  588. return
  589. }
  590. // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
  591. go func() {
  592. bindingCycleCtx, cancel := context.WithCancel(ctx)
  593. defer cancel()
  594. metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
  595. defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
  596. waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
  597. if !waitOnPermitStatus.IsSuccess() {
  598. var reason string
  599. if waitOnPermitStatus.IsUnschedulable() {
  600. metrics.PodScheduleFailures.Inc()
  601. reason = v1.PodReasonUnschedulable
  602. } else {
  603. metrics.PodScheduleErrors.Inc()
  604. reason = SchedulerError
  605. }
  606. if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
  607. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  608. }
  609. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  610. fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  611. sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
  612. return
  613. }
  614. // Bind volumes first before Pod
  615. if !allBound {
  616. err := sched.bindVolumes(assumedPod)
  617. if err != nil {
  618. sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
  619. metrics.PodScheduleErrors.Inc()
  620. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  621. fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  622. return
  623. }
  624. }
  625. // Run "prebind" plugins.
  626. preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  627. if !preBindStatus.IsSuccess() {
  628. var reason string
  629. metrics.PodScheduleErrors.Inc()
  630. reason = SchedulerError
  631. if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
  632. klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
  633. }
  634. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  635. fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  636. sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
  637. return
  638. }
  639. err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
  640. metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
  641. if err != nil {
  642. metrics.PodScheduleErrors.Inc()
  643. // trigger un-reserve plugins to clean up state associated with the reserved Pod
  644. fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  645. sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
  646. } else {
  647. // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
  648. if klog.V(2) {
  649. klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
  650. }
  651. metrics.PodScheduleSuccesses.Inc()
  652. metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
  653. metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
  654. // Run "postbind" plugins.
  655. fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  656. }
  657. }()
  658. }
  659. // skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
  660. func (sched *Scheduler) skipPodSchedule(pod *v1.Pod) bool {
  661. // Case 1: pod is being deleted.
  662. if pod.DeletionTimestamp != nil {
  663. sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  664. klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  665. return true
  666. }
  667. // Case 2: pod has been assumed and pod updates could be skipped.
  668. // An assumed pod can be added again to the scheduling queue if it got an update event
  669. // during its previous scheduling cycle but before getting assumed.
  670. if sched.skipPodUpdate(pod) {
  671. return true
  672. }
  673. return false
  674. }
  675. type podConditionUpdaterImpl struct {
  676. Client clientset.Interface
  677. }
  678. func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
  679. klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
  680. if podutil.UpdatePodCondition(&pod.Status, condition) {
  681. _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
  682. return err
  683. }
  684. return nil
  685. }
  686. type podPreemptorImpl struct {
  687. Client clientset.Interface
  688. }
  689. func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
  690. return p.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
  691. }
  692. func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
  693. return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, &metav1.DeleteOptions{})
  694. }
  695. func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
  696. podCopy := pod.DeepCopy()
  697. podCopy.Status.NominatedNodeName = nominatedNodeName
  698. _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), podCopy, metav1.UpdateOptions{})
  699. return err
  700. }
  701. func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
  702. if len(pod.Status.NominatedNodeName) == 0 {
  703. return nil
  704. }
  705. return p.setNominatedNodeName(pod, "")
  706. }
  707. func defaultAlgorithmSourceProviderName() *string {
  708. provider := schedulerapi.SchedulerDefaultProviderName
  709. return &provider
  710. }