eventhandlers.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"fmt"
	"reflect"

	"k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog"
)

func (sched *Scheduler) onPvAdd(obj interface{}) {
	// Pods created when there are no PVs available will be stuck in the
	// unschedulable queue. But unbound PVs created for static provisioning
	// with a delay-binding storage class are skipped by the PV controller's
	// dynamic provisioning and binding process, so they will not trigger
	// events to schedule the pod again. Therefore we need to move pods to
	// the active queue on PV add for this scenario.
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvUpdate(old, new interface{}) {
	// Scheduler.bindVolumesWorker may fail to update the assumed pod's volume
	// bindings due to conflicts if PVs are updated by the PV controller or
	// other parties, in which case the scheduler adds the pod back to the
	// unschedulable queue. We need to move pods to the active queue on PV
	// update for this scenario.
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvcAdd(obj interface{}) {
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
	sc, ok := obj.(*storagev1.StorageClass)
	if !ok {
		klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
		return
	}

	// CheckVolumeBindingPred fails if the pod has unbound immediate PVCs. If
	// those PVCs specify a StorageClass name, creating the StorageClass
	// objects with late binding will cause the predicate to pass, so we need
	// to move pods to the active queue.
	// We don't need to invalidate cached results because results will not be
	// cached for a pod that has unbound immediate PVCs.
	if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
		sched.config.SchedulingQueue.MoveAllToActiveQueue()
	}
}

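// As an illustrative sketch (not part of the original file), a late-binding
// class that would trigger the requeue above could look like the following;
// the name and provisioner are placeholders, and metav1 refers to
// k8s.io/apimachinery/pkg/apis/meta/v1:
//
//	mode := storagev1.VolumeBindingWaitForFirstConsumer
//	sc := &storagev1.StorageClass{
//		ObjectMeta:        metav1.ObjectMeta{Name: "local-storage"},
//		Provisioner:       "kubernetes.io/no-provisioner",
//		VolumeBindingMode: &mode,
//	}
//
// Creating such an object moves all unschedulable pods back to the active
// queue so that CheckVolumeBindingPred can be re-evaluated for them.
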
func (sched *Scheduler) onServiceAdd(obj interface{}) {
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onServiceDelete(obj interface{}) {
	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) addNodeToCache(obj interface{}) {
	node, ok := obj.(*v1.Node)
	if !ok {
		klog.Errorf("cannot convert to *v1.Node: %v", obj)
		return
	}

	if err := sched.config.SchedulerCache.AddNode(node); err != nil {
		klog.Errorf("scheduler cache AddNode failed: %v", err)
	}

	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
	oldNode, ok := oldObj.(*v1.Node)
	if !ok {
		klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
		return
	}
	newNode, ok := newObj.(*v1.Node)
	if !ok {
		klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
		return
	}

	if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
		klog.Errorf("scheduler cache UpdateNode failed: %v", err)
	}

	// Only activate unschedulable pods if the node became more schedulable.
	// We skip the node property comparison when there are no unschedulable
	// pods in the queue, to save processing cycles. We still trigger a move to
	// the active queue in that case, to cover a pod that is currently being
	// processed by the scheduler and is determined unschedulable; we want such
	// a pod to be reevaluated when a change in the cluster happens.
	if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
		sched.config.SchedulingQueue.MoveAllToActiveQueue()
	}
}

func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
	var node *v1.Node
	switch t := obj.(type) {
	case *v1.Node:
		node = t
	case cache.DeletedFinalStateUnknown:
		// The informer delivers a tombstone when the delete event was missed;
		// it wraps the last known state of the node.
		var ok bool
		node, ok = t.Obj.(*v1.Node)
		if !ok {
			klog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
			return
		}
	default:
		klog.Errorf("cannot convert to *v1.Node: %v", t)
		return
	}

	// NOTE: Updates must be written to scheduler cache before invalidating
	// equivalence cache, because we could snapshot equivalence cache after the
	// invalidation and then snapshot the cache itself. If the cache is
	// snapshotted before updates are written, we would update equivalence
	// cache with stale information which is based on snapshot of old cache.
	if err := sched.config.SchedulerCache.RemoveNode(node); err != nil {
		klog.Errorf("scheduler cache RemoveNode failed: %v", err)
	}
}

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
	if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}

func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
	pod := newObj.(*v1.Pod)
	if sched.skipPodUpdate(pod) {
		return
	}
	if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
	}
}

func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
	var pod *v1.Pod
	switch t := obj.(type) {
	case *v1.Pod:
		pod = t
	case cache.DeletedFinalStateUnknown:
		var ok bool
		pod, ok = t.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
			return
		}
	default:
		utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
		return
	}
	if err := sched.config.SchedulingQueue.Delete(pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
	}
	if sched.config.VolumeBinder != nil {
		// Volume binder only wants to keep unassigned pods
		sched.config.VolumeBinder.DeletePodBindings(pod)
	}
}

func (sched *Scheduler) addPodToCache(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert to *v1.Pod: %v", obj)
		return
	}

	if err := sched.config.SchedulerCache.AddPod(pod); err != nil {
		klog.Errorf("scheduler cache AddPod failed: %v", err)
	}

	sched.config.SchedulingQueue.AssignedPodAdded(pod)
}

func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
		return
	}
	newPod, ok := newObj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
		return
	}

	// NOTE: Updates must be written to scheduler cache before invalidating
	// equivalence cache, because we could snapshot equivalence cache after the
	// invalidation and then snapshot the cache itself. If the cache is
	// snapshotted before updates are written, we would update equivalence
	// cache with stale information which is based on snapshot of old cache.
	if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
		klog.Errorf("scheduler cache UpdatePod failed: %v", err)
	}

	sched.config.SchedulingQueue.AssignedPodUpdated(newPod)
}

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
	var pod *v1.Pod
	switch t := obj.(type) {
	case *v1.Pod:
		pod = t
	case cache.DeletedFinalStateUnknown:
		var ok bool
		pod, ok = t.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
			return
		}
	default:
		klog.Errorf("cannot convert to *v1.Pod: %v", t)
		return
	}

	// NOTE: Updates must be written to scheduler cache before invalidating
	// equivalence cache, because we could snapshot equivalence cache after the
	// invalidation and then snapshot the cache itself. If the cache is
	// snapshotted before updates are written, we would update equivalence
	// cache with stale information which is based on snapshot of old cache.
	if err := sched.config.SchedulerCache.RemovePod(pod); err != nil {
		klog.Errorf("scheduler cache RemovePod failed: %v", err)
	}

	sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
	return len(pod.Spec.NodeName) != 0
}

// responsibleForPod returns true if the pod has asked to be scheduled by the
// given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
	return schedulerName == pod.Spec.SchedulerName
}

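// For example (an illustrative note, not from the original file): a pod whose
// spec.schedulerName was left empty is defaulted by the API server to
// "default-scheduler" and is therefore picked up by the default scheduler,
// while a pod that sets spec.schedulerName to some other value is filtered
// out here and only handled by a scheduler started under that name.
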
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - the pod has already been assumed, AND
//   - the pod has only its ResourceVersion, Spec.NodeName and/or Annotations
//     updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
	// Non-assumed pods should never be skipped.
	isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	if !isAssumed {
		return false
	}

	// Gets the assumed pod from the cache.
	assumedPod, err := sched.config.SchedulerCache.GetPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
		return false
	}

	// Compares the assumed pod in the cache with the pod update. If they are
	// equal (with certain fields excluded), this pod update will be skipped.
	f := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
		// ResourceVersion must be excluded because each object update will
		// have a new resource version.
		p.ResourceVersion = ""
		// Spec.NodeName must be excluded because the pod assumed in the cache
		// is expected to have a node assigned while the pod update may or may
		// not have this field set.
		p.Spec.NodeName = ""
		// Annotations must be excluded for the reasons described in
		// https://github.com/kubernetes/kubernetes/issues/52914.
		p.Annotations = nil
		return p
	}
	assumedPodCopy, podCopy := f(assumedPod), f(pod)
	if !reflect.DeepEqual(assumedPodCopy, podCopy) {
		return false
	}
	klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
	return true
}

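// To illustrate (an assumed example, not from the original file): if the
// incoming update differs from the assumed pod only in the fields stripped
// above, it is skipped:
//
//	updated := assumedPod.DeepCopy()
//	updated.ResourceVersion = "12346"                                 // bumped on every write
//	updated.Spec.NodeName = "node-1"                                  // set by the binding
//	updated.Annotations = map[string]string{"example.com/hint": "x"}  // hypothetical annotation
//	// sched.skipPodUpdate(updated) == true
//
// Any other difference, such as a changed container image or label, makes
// skipPodUpdate return false and the update is processed normally.
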
// AddAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
	sched *Scheduler,
	schedulerName string,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	serviceInformer coreinformers.ServiceInformer,
	storageClassInformer storageinformers.StorageClassInformer,
) {
	// scheduled pod cache
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return assignedPod(t)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return assignedPod(pod)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToCache,
				UpdateFunc: sched.updatePodInCache,
				DeleteFunc: sched.deletePodFromCache,
			},
		},
	)

	// unscheduled pod queue
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return !assignedPod(t) && responsibleForPod(t, schedulerName)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	)

	nodeInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addNodeToCache,
			UpdateFunc: sched.updateNodeInCache,
			DeleteFunc: sched.deleteNodeFromCache,
		},
	)

	// Adding or updating PVs affects equivalence cache items related to
	// persistent volumes.
	pvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PVs.
			AddFunc:    sched.onPvAdd,
			UpdateFunc: sched.onPvUpdate,
		},
	)

	// This is for MaxPDVolumeCountPredicate: adding or updating a PVC will
	// affect the counts of PVs once it is bound.
	pvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.onPvcAdd,
			UpdateFunc: sched.onPvcUpdate,
		},
	)

	// This is for ServiceAffinity: the predicate result is affected when the
	// selector of a service is updated. Also, if a new service is added, the
	// equivalence cache becomes invalid since existing pods may be "captured"
	// by this service, changing the predicate result.
	serviceInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.onServiceAdd,
			UpdateFunc: sched.onServiceUpdate,
			DeleteFunc: sched.onServiceDelete,
		},
	)

	storageClassInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: sched.onStorageClassAdd,
		},
	)
}

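// A minimal wiring sketch (assumed usage, not part of the original file): with
// a client-go shared informer factory, and assuming client, sched and stopCh
// already exist, the handlers above would typically be registered like this:
//
//	informerFactory := informers.NewSharedInformerFactory(client, 0) // k8s.io/client-go/informers
//	AddAllEventHandlers(
//		sched,
//		v1.DefaultSchedulerName,
//		informerFactory.Core().V1().Nodes(),
//		informerFactory.Core().V1().Pods(),
//		informerFactory.Core().V1().PersistentVolumes(),
//		informerFactory.Core().V1().PersistentVolumeClaims(),
//		informerFactory.Core().V1().Services(),
//		informerFactory.Storage().V1().StorageClasses(),
//	)
//	informerFactory.Start(stopCh)
//	informerFactory.WaitForCacheSync(stopCh)
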
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	if nodeSpecUnschedulableChanged(newNode, oldNode) {
		return true
	}
	if nodeAllocatableChanged(newNode, oldNode) {
		return true
	}
	if nodeLabelsChanged(newNode, oldNode) {
		return true
	}
	if nodeTaintsChanged(newNode, oldNode) {
		return true
	}
	if nodeConditionsChanged(newNode, oldNode) {
		return true
	}

	return false
}

func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}

func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}

func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
		conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
		for i := range conditions {
			conditionStatuses[conditions[i].Type] = conditions[i].Status
		}
		return conditionStatuses
	}
	return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}

// nodeSpecUnschedulableChanged returns true only when the node's Unschedulable
// flag flips from true to false, i.e. the node becomes schedulable again.
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
}
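
// Illustrative example (not part of the original file): cordoning and then
// uncordoning a node is a typical trigger for the checks above.
//
//	oldNode := &v1.Node{Spec: v1.NodeSpec{Unschedulable: true}}
//	newNode := oldNode.DeepCopy()
//	newNode.Spec.Unschedulable = false
//	nodeSchedulingPropertiesChanged(newNode, oldNode) // true: unschedulable pods get requeued
//
// Changes to allocatable resources, labels, taints, or condition statuses are
// treated the same way, while a pure heartbeat update that only bumps
// condition timestamps does not move anything back to the active queue.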