cache.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache
import (
    "fmt"
    "sync"
    "time"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "k8s.io/kubernetes/pkg/features"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
    "k8s.io/klog"
)

var (
    cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod stays in the cache before it expires.
// "stop" is the channel that signals the background goroutine to stop.
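// An illustrative usage sketch (the ttl value and the pod variable are hypothetical):
//
//	stop := make(chan struct{})
//	defer close(stop)
//	schedCache := New(30*time.Second, stop)
//	if err := schedCache.AssumePod(pod); err != nil {
//		// handle the error
//	}
//	_ = schedCache.FinishBinding(pod)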
func New(ttl time.Duration, stop <-chan struct{}) Cache {
    cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
    cache.run()
    return cache
}

// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
    info *schedulernodeinfo.NodeInfo
    next *nodeInfoListItem
    prev *nodeInfoListItem
}

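// schedulerCache is the concrete Cache implementation. It holds per-node
// aggregated information, the set of assumed pods, and image state, all
// guarded by a single read-write mutex.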
type schedulerCache struct {
    stop   <-chan struct{}
    ttl    time.Duration
    period time.Duration

    // This mutex guards all fields within this cache struct.
    mu sync.RWMutex
    // a set of assumed pod keys.
    // The key could further be used to get an entry in podStates.
    assumedPods map[string]bool
    // a map from pod key to podState.
    podStates map[string]*podState
    nodes     map[string]*nodeInfoListItem
    // headNode points to the most recently updated NodeInfo in "nodes". It is the
    // head of the linked list.
    headNode *nodeInfoListItem
    nodeTree *NodeTree
    // A map from image name to its imageState.
    imageStates map[string]*imageState
}

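// podState wraps a pod with the bookkeeping used for assumed-pod expiration.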
type podState struct {
    pod *v1.Pod
    // Used for assumed pods to determine expiration.
    deadline *time.Time
    // Used to block the cache from expiring an assumed pod while its binding is still in progress.
    bindingFinished bool
}

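// imageState tracks a container image's size and the nodes on which it is present.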
type imageState struct {
    // Size of the image
    size int64
    // A set of node names for nodes having this image present
    nodes sets.String
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulernodeinfo.ImageStateSummary {
    return &schedulernodeinfo.ImageStateSummary{
        Size:     state.size,
        NumNodes: len(state.nodes),
    }
}

func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
    return &schedulerCache{
        ttl:    ttl,
        period: period,
        stop:   stop,

        nodes:       make(map[string]*nodeInfoListItem),
        nodeTree:    newNodeTree(nil),
        assumedPods: make(map[string]bool),
        podStates:   make(map[string]*podState),
        imageStates: make(map[string]*imageState),
    }
}

// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
    return &nodeInfoListItem{
        info: ni,
    }
}

// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
func NewNodeInfoSnapshot() *NodeInfoSnapshot {
    return &NodeInfoSnapshot{
        NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
    }
}

// moveNodeInfoToHead moves a NodeInfo to the head of the "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume the cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
    ni, ok := cache.nodes[name]
    if !ok {
        klog.Errorf("No NodeInfo with name %v found in the cache", name)
        return
    }
    // If the node info list item is already at the head, we are done.
    if ni == cache.headNode {
        return
    }

    if ni.prev != nil {
        ni.prev.next = ni.next
    }
    if ni.next != nil {
        ni.next.prev = ni.prev
    }
    if cache.headNode != nil {
        cache.headNode.prev = ni
    }
    ni.next = cache.headNode
    ni.prev = nil
    cache.headNode = ni
}

// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume the cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
    ni, ok := cache.nodes[name]
    if !ok {
        klog.Errorf("No NodeInfo with name %v found in the cache", name)
        return
    }

    if ni.prev != nil {
        ni.prev.next = ni.next
    }
    if ni.next != nil {
        ni.next.prev = ni.prev
    }
    // If the removed item was at the head, we must update the head.
    if ni == cache.headNode {
        cache.headNode = ni.next
    }
    delete(cache.nodes, name)
}

// Snapshot takes a snapshot of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateNodeInfoSnapshot
// function.
// This method is expensive, and should only be used in non-critical paths.
func (cache *schedulerCache) Snapshot() *Snapshot {
    cache.mu.RLock()
    defer cache.mu.RUnlock()

    nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
    for k, v := range cache.nodes {
        nodes[k] = v.info.Clone()
    }

    assumedPods := make(map[string]bool, len(cache.assumedPods))
    for k, v := range cache.assumedPods {
        assumedPods[k] = v
    }

    return &Snapshot{
        Nodes:       nodes,
        AssumedPods: assumedPods,
    }
}

// UpdateNodeInfoSnapshot takes a snapshot of the cached NodeInfo map. This is called at the
// beginning of every scheduling cycle.
// This function tracks the generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

    // Get the last generation of the snapshot.
    snapshotGeneration := nodeSnapshot.Generation

    // Start from the head of the NodeInfo doubly linked list and update the snapshot
    // of NodeInfos updated after the last snapshot.
    for node := cache.headNode; node != nil; node = node.next {
        if node.info.GetGeneration() <= snapshotGeneration {
            // All the remaining nodes were updated before the existing snapshot. We are done.
            break
        }
        if balancedVolumesEnabled && node.info.TransientInfo != nil {
            // Transient scheduler info is reset here.
            node.info.TransientInfo.ResetTransientSchedulerInfo()
        }
        if np := node.info.Node(); np != nil {
            nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
        }
    }
    // Update the snapshot generation with the latest NodeInfo generation.
    if cache.headNode != nil {
        nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
    }

    // If the snapshot contains more nodes than the cache, some nodes were removed
    // from the cache after the last snapshot; drop them from the snapshot as well.
    if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
        for name := range nodeSnapshot.NodeInfoMap {
            if _, ok := cache.nodes[name]; !ok {
                delete(nodeSnapshot.NodeInfoMap, name)
            }
        }
    }
    return nil
}

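// List returns all cached pods matching the given label selector.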
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
    alwaysTrue := func(p *v1.Pod) bool { return true }
    return cache.FilteredList(alwaysTrue, selector)
}

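// FilteredList returns the cached pods that pass the given pod filter and match the label selector.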
func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
    cache.mu.RLock()
    defer cache.mu.RUnlock()
    // podFilter is expected to return true for most or all of the pods. We
    // can avoid expensive array growth without wasting too much memory by
    // pre-allocating capacity.
    maxSize := 0
    for _, n := range cache.nodes {
        maxSize += len(n.info.Pods())
    }
    pods := make([]*v1.Pod, 0, maxSize)
    for _, n := range cache.nodes {
        for _, pod := range n.info.Pods() {
            if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
                pods = append(pods, pod)
            }
        }
    }
    return pods, nil
}

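// AssumePod adds a pod to the cache optimistically, before its binding is
// confirmed, so that subsequent scheduling decisions see its resource usage.
// The pod must not already be in the cache.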
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods[key] = true
    return nil
}

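// FinishBinding marks the assumed pod's binding as finished, which allows the
// assumed pod to expire after the cache's ttl.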
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
    return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests deterministic by injecting "now" as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.RLock()
    defer cache.mu.RUnlock()

    klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
    currState, ok := cache.podStates[key]
    if ok && cache.assumedPods[key] {
        dl := now.Add(cache.ttl)
        currState.bindingFinished = true
        currState.deadline = &dl
    }
    return nil
}

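// ForgetPod removes an assumed pod from the cache, e.g. when its binding fails.
// Only assumed pods can be forgotten.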
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
        return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
    }

    switch {
    // Only assumed pod can be forgotten.
    case ok && cache.assumedPods[key]:
        err := cache.removePod(pod)
        if err != nil {
            return err
        }
        delete(cache.assumedPods, key)
        delete(cache.podStates, key)
    default:
        return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
    }
    return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
        cache.nodes[pod.Spec.NodeName] = n
    }
    n.info.AddPod(pod)
    cache.moveNodeInfoToHead(pod.Spec.NodeName)
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
    if err := cache.removePod(oldPod); err != nil {
        return err
    }
    cache.addPod(newPod)
    return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
    }
    if err := n.info.RemovePod(pod); err != nil {
        return err
    }
    if len(n.info.Pods()) == 0 && n.info.Node() == nil {
        cache.removeNodeInfoFromList(pod.Spec.NodeName)
    } else {
        cache.moveNodeInfoToHead(pod.Spec.NodeName)
    }
    return nil
}

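// AddPod is called when a pod Add event is observed. It either confirms a
// previously assumed pod or re-adds a pod that has already expired from the cache.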
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    case ok && cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            // The pod was added to a different node than it was assumed to.
            klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
            // Clean this up.
            cache.removePod(currState.pod)
            cache.addPod(pod)
        }
        delete(cache.assumedPods, key)
        cache.podStates[key].deadline = nil
        cache.podStates[key].pod = pod
    case !ok:
        // Pod was expired. We should add it back.
        cache.addPod(pod)
        ps := &podState{
            pod: pod,
        }
        cache.podStates[key] = ps
    default:
        return fmt.Errorf("pod %v was already in added state", key)
    }
    return nil
}

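// UpdatePod is called when a pod Update event is observed. It updates the
// cached copy of a pod that has already been added (not merely assumed).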
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
    key, err := schedulernodeinfo.GetPodKey(oldPod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    // An assumed pod won't have an Update/Remove event. It needs to have an Add event
    // before an Update event, in which case the state would change from Assumed to Added.
    case ok && !cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
            klog.Errorf("Pod %v updated on a different node than previously added to.", key)
            klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
        }
        if err := cache.updatePod(oldPod, newPod); err != nil {
            return err
        }
        currState.pod = newPod
    default:
        return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
    }
    return nil
}

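// RemovePod is called when a pod Delete event is observed. It removes an added
// (not assumed) pod from the cache.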
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    // An assumed pod won't have a Delete/Remove event. It needs to have an Add event
    // before a Remove event, in which case the state would change from Assumed to Added.
    case ok && !cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
            klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
        }
        err := cache.removePod(currState.pod)
        if err != nil {
            return err
        }
        delete(cache.podStates, key)
    default:
        return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
    }
    return nil
}

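// IsAssumedPod returns whether the given pod is currently assumed in the cache.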
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return false, err
    }

    cache.mu.RLock()
    defer cache.mu.RUnlock()

    b, found := cache.assumedPods[key]
    if !found {
        return false, nil
    }
    return b, nil
}

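// GetPod returns the cached version of the given pod, or an error if the pod
// is not in the cache.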
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
    key, err := schedulernodeinfo.GetPodKey(pod)
    if err != nil {
        return nil, err
    }

    cache.mu.RLock()
    defer cache.mu.RUnlock()

    podState, ok := cache.podStates[key]
    if !ok {
        return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
    }
    return podState.pod, nil
}

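// AddNode adds or overwrites node information in the cache and in the node tree.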
func (cache *schedulerCache) AddNode(node *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[node.Name]
    if !ok {
        n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
        cache.nodes[node.Name] = n
    } else {
        cache.removeNodeImageStates(n.info.Node())
    }
    cache.moveNodeInfoToHead(node.Name)

    cache.nodeTree.AddNode(node)
    cache.addNodeImageStates(node, n.info)
    return n.info.SetNode(node)
}

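// UpdateNode updates the cached node information and the node tree for the given node.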
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[newNode.Name]
    if !ok {
        n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
        cache.nodes[newNode.Name] = n
    } else {
        cache.removeNodeImageStates(n.info.Node())
    }
    cache.moveNodeInfoToHead(newNode.Name)

    cache.nodeTree.UpdateNode(oldNode, newNode)
    cache.addNodeImageStates(newNode, n.info)
    return n.info.SetNode(newNode)
}

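// RemoveNode removes the node from the cache and the node tree. The NodeInfo
// itself is kept until its pods are gone, because pod deletions may be
// observed later than the node deletion.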
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[node.Name]
    if !ok {
        return fmt.Errorf("node %v is not found", node.Name)
    }
    if err := n.info.RemoveNode(node); err != nil {
        return err
    }
    // We remove NodeInfo for this node only if there aren't any pods on this node.
    // We can't do it unconditionally, because notifications about pods are delivered
    // in a different watch, and thus can potentially be observed later, even though
    // they happened before node removal.
    if len(n.info.Pods()) == 0 && n.info.Node() == nil {
        cache.removeNodeInfoFromList(node.Name)
    } else {
        cache.moveNodeInfoToHead(node.Name)
    }

    cache.nodeTree.RemoveNode(node)
    cache.removeNodeImageStates(node)
    return nil
}

// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
    newSum := make(map[string]*schedulernodeinfo.ImageStateSummary)

    for _, image := range node.Status.Images {
        for _, name := range image.Names {
            // update the entry in imageStates
            state, ok := cache.imageStates[name]
            if !ok {
                state = &imageState{
                    size:  image.SizeBytes,
                    nodes: sets.NewString(node.Name),
                }
                cache.imageStates[name] = state
            } else {
                state.nodes.Insert(node.Name)
            }
            // create the imageStateSummary for this image
            if _, ok := newSum[name]; !ok {
                newSum[name] = cache.createImageStateSummary(state)
            }
        }
    }
    nodeInfo.SetImageStates(newSum)
}

// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
    if node == nil {
        return
    }

    for _, image := range node.Status.Images {
        for _, name := range image.Names {
            state, ok := cache.imageStates[name]
            if ok {
                state.nodes.Delete(node.Name)
                if len(state.nodes) == 0 {
                    // Remove the unused image to make sure the length of
                    // imageStates represents the total number of different
                    // images on all nodes
                    delete(cache.imageStates, name)
                }
            }
        }
    }
}

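// run starts the background goroutine that periodically cleans up expired assumed pods.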
func (cache *schedulerCache) run() {
    go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

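// cleanupExpiredAssumedPods expires assumed pods whose binding has finished and
// whose deadline has passed, based on the current time.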
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
    cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists for making tests deterministic by taking the time as an input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    // The size of assumedPods should be small
    for key := range cache.assumedPods {
        ps, ok := cache.podStates[key]
        if !ok {
            panic("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        if !ps.bindingFinished {
            klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
                ps.pod.Namespace, ps.pod.Name)
            continue
        }
        if now.After(*ps.deadline) {
            klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
            if err := cache.expirePod(key, ps); err != nil {
                klog.Errorf("ExpirePod failed for %s: %v", key, err)
            }
        }
    }
}

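// expirePod removes an expired assumed pod from the cache.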
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
    if err := cache.removePod(ps.pod); err != nil {
        return err
    }
    delete(cache.assumedPods, key)
    delete(cache.podStates, key)
    return nil
}

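// NodeTree returns the node tree maintained by the cache.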
func (cache *schedulerCache) NodeTree() *NodeTree {
    return cache.nodeTree
}