eventhandlers.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "fmt"
  16. "reflect"
  17. "k8s.io/klog"
  18. v1 "k8s.io/api/core/v1"
  19. storagev1 "k8s.io/api/storage/v1"
  20. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  21. utilfeature "k8s.io/apiserver/pkg/util/feature"
  22. "k8s.io/client-go/informers"
  23. coreinformers "k8s.io/client-go/informers/core/v1"
  24. "k8s.io/client-go/tools/cache"
  25. "k8s.io/kubernetes/pkg/features"
  26. "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  27. )
  28. func (sched *Scheduler) onPvAdd(obj interface{}) {
  29. // Pods created when there are no PVs available will be stuck in
  30. // unschedulable queue. But unbound PVs created for static provisioning and
  31. // delay binding storage class are skipped in PV controller dynamic
  32. // provisioning and binding process, will not trigger events to schedule pod
  33. // again. So we need to move pods to active queue on PV add for this
  34. // scenario.
  35. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd)
  36. }
  37. func (sched *Scheduler) onPvUpdate(old, new interface{}) {
  38. // Scheduler.bindVolumesWorker may fail to update assumed pod volume
  39. // bindings due to conflicts if PVs are updated by PV controller or other
  40. // parties, then scheduler will add pod back to unschedulable queue. We
  41. // need to move pods to active queue on PV update for this scenario.
  42. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate)
  43. }
  44. func (sched *Scheduler) onPvcAdd(obj interface{}) {
  45. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd)
  46. }
  47. func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
  48. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate)
  49. }
  50. func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
  51. sc, ok := obj.(*storagev1.StorageClass)
  52. if !ok {
  53. klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
  54. return
  55. }
  56. // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
  57. // PVCs have specified StorageClass name, creating StorageClass objects
  58. // with late binding will cause predicates to pass, so we need to move pods
  59. // to active queue.
  60. // We don't need to invalidate cached results because results will not be
  61. // cached for pod that has unbound immediate PVCs.
  62. if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
  63. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd)
  64. }
  65. }
  66. func (sched *Scheduler) onServiceAdd(obj interface{}) {
  67. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd)
  68. }
  69. func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
  70. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate)
  71. }
  72. func (sched *Scheduler) onServiceDelete(obj interface{}) {
  73. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete)
  74. }
  75. func (sched *Scheduler) addNodeToCache(obj interface{}) {
  76. node, ok := obj.(*v1.Node)
  77. if !ok {
  78. klog.Errorf("cannot convert to *v1.Node: %v", obj)
  79. return
  80. }
  81. if err := sched.SchedulerCache.AddNode(node); err != nil {
  82. klog.Errorf("scheduler cache AddNode failed: %v", err)
  83. }
  84. klog.V(3).Infof("add event for node %q", node.Name)
  85. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd)
  86. }
  87. func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
  88. oldNode, ok := oldObj.(*v1.Node)
  89. if !ok {
  90. klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
  91. return
  92. }
  93. newNode, ok := newObj.(*v1.Node)
  94. if !ok {
  95. klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
  96. return
  97. }
  98. if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
  99. klog.Errorf("scheduler cache UpdateNode failed: %v", err)
  100. }
  101. // Only activate unschedulable pods if the node became more schedulable.
  102. // We skip the node property comparison when there is no unschedulable pods in the queue
  103. // to save processing cycles. We still trigger a move to active queue to cover the case
  104. // that a pod being processed by the scheduler is determined unschedulable. We want this
  105. // pod to be reevaluated when a change in the cluster happens.
  106. if sched.SchedulingQueue.NumUnschedulablePods() == 0 {
  107. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.Unknown)
  108. } else if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" {
  109. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event)
  110. }
  111. }
  112. func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
  113. var node *v1.Node
  114. switch t := obj.(type) {
  115. case *v1.Node:
  116. node = t
  117. case cache.DeletedFinalStateUnknown:
  118. var ok bool
  119. node, ok = t.Obj.(*v1.Node)
  120. if !ok {
  121. klog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
  122. return
  123. }
  124. default:
  125. klog.Errorf("cannot convert to *v1.Node: %v", t)
  126. return
  127. }
  128. klog.V(3).Infof("delete event for node %q", node.Name)
  129. // NOTE: Updates must be written to scheduler cache before invalidating
  130. // equivalence cache, because we could snapshot equivalence cache after the
  131. // invalidation and then snapshot the cache itself. If the cache is
  132. // snapshotted before updates are written, we would update equivalence
  133. // cache with stale information which is based on snapshot of old cache.
  134. if err := sched.SchedulerCache.RemoveNode(node); err != nil {
  135. klog.Errorf("scheduler cache RemoveNode failed: %v", err)
  136. }
  137. }
  138. func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
  139. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
  140. }
  141. func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
  142. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
  143. }
  144. func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
  145. pod := obj.(*v1.Pod)
  146. klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
  147. if err := sched.SchedulingQueue.Add(pod); err != nil {
  148. utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
  149. }
  150. }
  151. func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
  152. pod := newObj.(*v1.Pod)
  153. if sched.skipPodUpdate(pod) {
  154. return
  155. }
  156. if err := sched.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
  157. utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
  158. }
  159. }
  160. func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
  161. var pod *v1.Pod
  162. switch t := obj.(type) {
  163. case *v1.Pod:
  164. pod = obj.(*v1.Pod)
  165. case cache.DeletedFinalStateUnknown:
  166. var ok bool
  167. pod, ok = t.Obj.(*v1.Pod)
  168. if !ok {
  169. utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
  170. return
  171. }
  172. default:
  173. utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
  174. return
  175. }
  176. klog.V(3).Infof("delete event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
  177. if err := sched.SchedulingQueue.Delete(pod); err != nil {
  178. utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
  179. }
  180. if sched.VolumeBinder != nil {
  181. // Volume binder only wants to keep unassigned pods
  182. sched.VolumeBinder.DeletePodBindings(pod)
  183. }
  184. sched.Framework.RejectWaitingPod(pod.UID)
  185. }
  186. func (sched *Scheduler) addPodToCache(obj interface{}) {
  187. pod, ok := obj.(*v1.Pod)
  188. if !ok {
  189. klog.Errorf("cannot convert to *v1.Pod: %v", obj)
  190. return
  191. }
  192. klog.V(3).Infof("add event for scheduled pod %s/%s ", pod.Namespace, pod.Name)
  193. if err := sched.SchedulerCache.AddPod(pod); err != nil {
  194. klog.Errorf("scheduler cache AddPod failed: %v", err)
  195. }
  196. sched.SchedulingQueue.AssignedPodAdded(pod)
  197. }
  198. func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
  199. oldPod, ok := oldObj.(*v1.Pod)
  200. if !ok {
  201. klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
  202. return
  203. }
  204. newPod, ok := newObj.(*v1.Pod)
  205. if !ok {
  206. klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
  207. return
  208. }
  209. // NOTE: Updates must be written to scheduler cache before invalidating
  210. // equivalence cache, because we could snapshot equivalence cache after the
  211. // invalidation and then snapshot the cache itself. If the cache is
  212. // snapshotted before updates are written, we would update equivalence
  213. // cache with stale information which is based on snapshot of old cache.
  214. if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
  215. klog.Errorf("scheduler cache UpdatePod failed: %v", err)
  216. }
  217. sched.SchedulingQueue.AssignedPodUpdated(newPod)
  218. }
  219. func (sched *Scheduler) deletePodFromCache(obj interface{}) {
  220. var pod *v1.Pod
  221. switch t := obj.(type) {
  222. case *v1.Pod:
  223. pod = t
  224. case cache.DeletedFinalStateUnknown:
  225. var ok bool
  226. pod, ok = t.Obj.(*v1.Pod)
  227. if !ok {
  228. klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
  229. return
  230. }
  231. default:
  232. klog.Errorf("cannot convert to *v1.Pod: %v", t)
  233. return
  234. }
  235. klog.V(3).Infof("delete event for scheduled pod %s/%s ", pod.Namespace, pod.Name)
  236. // NOTE: Updates must be written to scheduler cache before invalidating
  237. // equivalence cache, because we could snapshot equivalence cache after the
  238. // invalidation and then snapshot the cache itself. If the cache is
  239. // snapshotted before updates are written, we would update equivalence
  240. // cache with stale information which is based on snapshot of old cache.
  241. if err := sched.SchedulerCache.RemovePod(pod); err != nil {
  242. klog.Errorf("scheduler cache RemovePod failed: %v", err)
  243. }
  244. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete)
  245. }
  246. // assignedPod selects pods that are assigned (scheduled and running).
  247. func assignedPod(pod *v1.Pod) bool {
  248. return len(pod.Spec.NodeName) != 0
  249. }
  250. // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
  251. func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
  252. return schedulerName == pod.Spec.SchedulerName
  253. }
  254. // skipPodUpdate checks whether the specified pod update should be ignored.
  255. // This function will return true if
  256. // - The pod has already been assumed, AND
  257. // - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
  258. // updated.
  259. func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
  260. // Non-assumed pods should never be skipped.
  261. isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
  262. if err != nil {
  263. utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
  264. return false
  265. }
  266. if !isAssumed {
  267. return false
  268. }
  269. // Gets the assumed pod from the cache.
  270. assumedPod, err := sched.SchedulerCache.GetPod(pod)
  271. if err != nil {
  272. utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
  273. return false
  274. }
  275. // Compares the assumed pod in the cache with the pod update. If they are
  276. // equal (with certain fields excluded), this pod update will be skipped.
  277. f := func(pod *v1.Pod) *v1.Pod {
  278. p := pod.DeepCopy()
  279. // ResourceVersion must be excluded because each object update will
  280. // have a new resource version.
  281. p.ResourceVersion = ""
  282. // Spec.NodeName must be excluded because the pod assumed in the cache
  283. // is expected to have a node assigned while the pod update may nor may
  284. // not have this field set.
  285. p.Spec.NodeName = ""
  286. // Annotations must be excluded for the reasons described in
  287. // https://github.com/kubernetes/kubernetes/issues/52914.
  288. p.Annotations = nil
  289. return p
  290. }
  291. assumedPodCopy, podCopy := f(assumedPod), f(pod)
  292. if !reflect.DeepEqual(assumedPodCopy, podCopy) {
  293. return false
  294. }
  295. klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
  296. return true
  297. }
  298. // AddAllEventHandlers is a helper function used in tests and in Scheduler
  299. // to add event handlers for various informers.
  300. func AddAllEventHandlers(
  301. sched *Scheduler,
  302. schedulerName string,
  303. informerFactory informers.SharedInformerFactory,
  304. podInformer coreinformers.PodInformer,
  305. ) {
  306. // scheduled pod cache
  307. podInformer.Informer().AddEventHandler(
  308. cache.FilteringResourceEventHandler{
  309. FilterFunc: func(obj interface{}) bool {
  310. switch t := obj.(type) {
  311. case *v1.Pod:
  312. return assignedPod(t)
  313. case cache.DeletedFinalStateUnknown:
  314. if pod, ok := t.Obj.(*v1.Pod); ok {
  315. return assignedPod(pod)
  316. }
  317. utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
  318. return false
  319. default:
  320. utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
  321. return false
  322. }
  323. },
  324. Handler: cache.ResourceEventHandlerFuncs{
  325. AddFunc: sched.addPodToCache,
  326. UpdateFunc: sched.updatePodInCache,
  327. DeleteFunc: sched.deletePodFromCache,
  328. },
  329. },
  330. )
  331. // unscheduled pod queue
  332. podInformer.Informer().AddEventHandler(
  333. cache.FilteringResourceEventHandler{
  334. FilterFunc: func(obj interface{}) bool {
  335. switch t := obj.(type) {
  336. case *v1.Pod:
  337. return !assignedPod(t) && responsibleForPod(t, schedulerName)
  338. case cache.DeletedFinalStateUnknown:
  339. if pod, ok := t.Obj.(*v1.Pod); ok {
  340. return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
  341. }
  342. utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
  343. return false
  344. default:
  345. utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
  346. return false
  347. }
  348. },
  349. Handler: cache.ResourceEventHandlerFuncs{
  350. AddFunc: sched.addPodToSchedulingQueue,
  351. UpdateFunc: sched.updatePodInSchedulingQueue,
  352. DeleteFunc: sched.deletePodFromSchedulingQueue,
  353. },
  354. },
  355. )
  356. informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
  357. cache.ResourceEventHandlerFuncs{
  358. AddFunc: sched.addNodeToCache,
  359. UpdateFunc: sched.updateNodeInCache,
  360. DeleteFunc: sched.deleteNodeFromCache,
  361. },
  362. )
  363. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  364. informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
  365. cache.ResourceEventHandlerFuncs{
  366. AddFunc: sched.onCSINodeAdd,
  367. UpdateFunc: sched.onCSINodeUpdate,
  368. },
  369. )
  370. }
  371. // On add and delete of PVs, it will affect equivalence cache items
  372. // related to persistent volume
  373. informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
  374. cache.ResourceEventHandlerFuncs{
  375. // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
  376. AddFunc: sched.onPvAdd,
  377. UpdateFunc: sched.onPvUpdate,
  378. },
  379. )
  380. // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
  381. informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
  382. cache.ResourceEventHandlerFuncs{
  383. AddFunc: sched.onPvcAdd,
  384. UpdateFunc: sched.onPvcUpdate,
  385. },
  386. )
  387. // This is for ServiceAffinity: affected by the selector of the service is updated.
  388. // Also, if new service is added, equivalence cache will also become invalid since
  389. // existing pods may be "captured" by this service and change this predicate result.
  390. informerFactory.Core().V1().Services().Informer().AddEventHandler(
  391. cache.ResourceEventHandlerFuncs{
  392. AddFunc: sched.onServiceAdd,
  393. UpdateFunc: sched.onServiceUpdate,
  394. DeleteFunc: sched.onServiceDelete,
  395. },
  396. )
  397. informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
  398. cache.ResourceEventHandlerFuncs{
  399. AddFunc: sched.onStorageClassAdd,
  400. },
  401. )
  402. }
  403. func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {
  404. if nodeSpecUnschedulableChanged(newNode, oldNode) {
  405. return queue.NodeSpecUnschedulableChange
  406. }
  407. if nodeAllocatableChanged(newNode, oldNode) {
  408. return queue.NodeAllocatableChange
  409. }
  410. if nodeLabelsChanged(newNode, oldNode) {
  411. return queue.NodeLabelChange
  412. }
  413. if nodeTaintsChanged(newNode, oldNode) {
  414. return queue.NodeTaintChange
  415. }
  416. if nodeConditionsChanged(newNode, oldNode) {
  417. return queue.NodeConditionChange
  418. }
  419. return ""
  420. }
  421. func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
  422. return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
  423. }
  424. func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
  425. return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
  426. }
  427. func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
  428. return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
  429. }
  430. func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
  431. strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
  432. conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
  433. for i := range conditions {
  434. conditionStatuses[conditions[i].Type] = conditions[i].Status
  435. }
  436. return conditionStatuses
  437. }
  438. return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
  439. }
  440. func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
  441. return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
  442. }