/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/features"
	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

var (
	cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod stays in the cache before it expires.
// "stop" is the channel whose closure stops the background goroutine.
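//
// A minimal usage sketch (the 30-second TTL is illustrative, not a value this
// package mandates):
//
//	stop := make(chan struct{})
//	defer close(stop)
//	c := New(30*time.Second, stop)
//	// c now expires assumed pods 30 seconds after FinishBinding is called.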
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}

// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
	info *schedulernodeinfo.NodeInfo
	next *nodeInfoListItem
	prev *nodeInfoListItem
}
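
// schedulerCache maintains the scheduler's internal bookkeeping: per-node
// NodeInfo objects, pod states (including pods that are assumed but not yet
// confirmed by the apiserver), and image state shared across nodes.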
type schedulerCache struct {
	stop   <-chan struct{}
	ttl    time.Duration
	period time.Duration

	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex
	// a set of assumed pod keys.
	// The key could further be used to get an entry in podStates.
	assumedPods map[string]bool
	// a map from pod key to podState.
	podStates map[string]*podState
	nodes     map[string]*nodeInfoListItem
	// headNode points to the most recently updated NodeInfo in "nodes". It is the
	// head of the linked list.
	headNode *nodeInfoListItem
	nodeTree *nodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}
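
// podState pairs a cached pod with the bookkeeping used to expire it once it
// has been assumed and its binding has finished.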
type podState struct {
	pod *v1.Pod
	// Used by assumedPod to determine expiration.
	deadline *time.Time
	// Used to block cache from expiring assumedPod if binding still runs.
	bindingFinished bool
}
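
// imageState records the size of a container image and the set of nodes on
// which it is currently present.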
type imageState struct {
	// Size of the image
	size int64
	// A set of node names for nodes having this image present
	nodes sets.String
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulernodeinfo.ImageStateSummary {
	return &schedulernodeinfo.ImageStateSummary{
		Size:     state.size,
		NumNodes: len(state.nodes),
	}
}
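
// newSchedulerCache builds an empty schedulerCache; callers are expected to
// start the expiration loop via run().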
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
	return &schedulerCache{
		ttl:         ttl,
		period:      period,
		stop:        stop,
		nodes:       make(map[string]*nodeInfoListItem),
		nodeTree:    newNodeTree(nil),
		assumedPods: make(map[string]bool),
		podStates:   make(map[string]*podState),
		imageStates: make(map[string]*imageState),
	}
}

// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
	return &nodeInfoListItem{
		info: ni,
	}
}

// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}
	// if the node info list item is already at the head, we are done.
	if ni == cache.headNode {
		return
	}
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
	ni.next = cache.headNode
	ni.prev = nil
	cache.headNode = ni
}

// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// if the removed item was at the head, we must update the head.
	if ni == cache.headNode {
		cache.headNode = ni.next
	}
	delete(cache.nodes, name)
}
// Dump takes a snapshot of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateSnapshot
// function.
// This method is expensive, and should only be used in non-critical paths.
func (cache *schedulerCache) Dump() *Dump {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
	for k, v := range cache.nodes {
		nodes[k] = v.info.Clone()
	}
	assumedPods := make(map[string]bool, len(cache.assumedPods))
	for k, v := range cache.assumedPods {
		assumedPods[k] = v
	}
	return &Dump{
		Nodes:       nodes,
		AssumedPods: assumedPods,
	}
}

// UpdateSnapshot takes a snapshot of the cached NodeInfo map. This is called at the
// beginning of every scheduling cycle.
// This function tracks the generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.generation
	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
	// or removed from the cache.
	updateAllLists := false
	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with affinity to NOT having pods with affinity or the other
	// way around.
	updateNodesHavePodsWithAffinity := false
	// Start from the head of the NodeInfo doubly linked list and update snapshot
	// of NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.GetGeneration() <= snapshotGeneration {
			// all the nodes are updated before the existing snapshot. We are done.
			break
		}
		if balancedVolumesEnabled && node.info.TransientInfo != nil {
			// Transient scheduler info is reset here.
			node.info.TransientInfo.ResetTransientSchedulerInfo()
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				updateAllLists = true
				existing = &schedulernodeinfo.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			}
			clone := node.info.Clone()
			// We track nodes that have pods with affinity, here we check if this node changed its
			// status from having pods with affinity to NOT having pods with affinity or the other
			// way around.
			if (len(existing.PodsWithAffinity()) > 0) != (len(clone.PodsWithAffinity()) > 0) {
				updateNodesHavePodsWithAffinity = true
			}
			// We need to preserve the original pointer of the NodeInfo struct since it
			// is used in the NodeInfoList, which we may not update.
			*existing = *clone
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.GetGeneration()
	}
	if len(nodeSnapshot.nodeInfoMap) > len(cache.nodes) {
		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
		updateAllLists = true
	}
	if updateAllLists || updateNodesHavePodsWithAffinity {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
	}
	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		klog.Error(errMsg)
		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
		// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf("%s", errMsg)
	}
	return nil
}
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
		// Take a snapshot of the nodes order in the tree
		snapshot.nodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
		for i := 0; i < cache.nodeTree.numNodes; i++ {
			nodeName := cache.nodeTree.next()
			if n := snapshot.nodeInfoMap[nodeName]; n != nil {
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, n)
				if len(n.PodsWithAffinity()) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
				}
			} else {
				klog.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
			}
		}
	} else {
		for _, n := range snapshot.nodeInfoList {
			if len(n.PodsWithAffinity()) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
			}
		}
	}
}

// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
	toDelete := len(snapshot.nodeInfoMap) - len(cache.nodes)
	for name := range snapshot.nodeInfoMap {
		if toDelete <= 0 {
			break
		}
		if _, ok := cache.nodes[name]; !ok {
			delete(snapshot.nodeInfoMap, name)
			toDelete--
		}
	}
}
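
// List returns all pods in the cache that match the given label selector.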
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
	alwaysTrue := func(p *v1.Pod) bool { return true }
	return cache.FilteredList(alwaysTrue, selector)
}
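
// FilteredList returns the cached pods that pass both the pod filter and the
// label selector.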
func (cache *schedulerCache) FilteredList(podFilter schedulerlisters.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	// podFilter is expected to return true for most or all of the pods. We
	// can avoid expensive array growth without wasting too much memory by
	// pre-allocating capacity.
	maxSize := 0
	for _, n := range cache.nodes {
		maxSize += len(n.info.Pods())
	}
	pods := make([]*v1.Pod, 0, maxSize)
	for _, n := range cache.nodes {
		for _, pod := range n.info.Pods() {
			if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
				pods = append(pods, pod)
			}
		}
	}
	return pods, nil
}
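
// AssumePod optimistically adds a pod to the cache before its binding is
// confirmed by the apiserver; it fails if the pod is already cached.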
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}
	cache.addPod(pod)
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps
	cache.assumedPods[key] = true
	return nil
}
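
// FinishBinding marks the assumed pod as bound, which makes it eligible for
// TTL-based expiration.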
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests deterministic by injecting now as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
	currState, ok := cache.podStates[key]
	if ok && cache.assumedPods[key] {
		dl := now.Add(cache.ttl)
		currState.bindingFinished = true
		currState.deadline = &dl
	}
	return nil
}
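
// ForgetPod removes an assumed pod from the cache, e.g. when its binding fails.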
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
	}
	switch {
	// Only assumed pod can be forgotten.
	case ok && cache.assumedPods[key]:
		err := cache.removePod(pod)
		if err != nil {
			return err
		}
		delete(cache.assumedPods, key)
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
	}
	return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
		cache.nodes[pod.Spec.NodeName] = n
	}
	n.info.AddPod(pod)
	cache.moveNodeInfoToHead(pod.Spec.NodeName)
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
	cache.addPod(newPod)
	return nil
}

// Assumes that lock is already acquired.
// Removes a pod from the cached node info. When a node is removed, some pod
// deletion events might arrive later. This is not a problem, as the pods in
// the node are assumed to be removed already.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		return nil
	}
	if err := n.info.RemovePod(pod); err != nil {
		return err
	}
	cache.moveNodeInfoToHead(pod.Spec.NodeName)
	return nil
}
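
// AddPod either confirms a previously assumed pod or re-adds a pod whose
// assumed entry has already expired from the cache.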
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods[key]:
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
			// Clean this up.
			if err = cache.removePod(currState.pod); err != nil {
				klog.Errorf("removing pod error: %v", err)
			}
			cache.addPod(pod)
		}
		delete(cache.assumedPods, key)
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		// Pod was expired. We should add it back.
		cache.addPod(pod)
		ps := &podState{
			pod: pod,
		}
		cache.podStates[key] = ps
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}
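
// UpdatePod replaces the cached pod with newPod. Only pods that have been
// added (not merely assumed) can be updated.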
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(oldPod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have Update/Remove event. It needs to have Add event
	// before Update event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods[key]:
		if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
			klog.Errorf("Pod %v updated on a different node than previously added to.", key)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		if err := cache.updatePod(oldPod, newPod); err != nil {
			return err
		}
		currState.pod = newPod
	default:
		return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
	}
	return nil
}
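
// RemovePod removes a confirmed (added, not assumed) pod from the cache.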
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have Delete/Remove event. It needs to have Add event
	// before Remove event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods[key]:
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		err := cache.removePod(currState.pod)
		if err != nil {
			return err
		}
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
	}
	return nil
}
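
// IsAssumedPod reports whether the pod is currently in the assumed set.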
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return false, err
	}
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	b, found := cache.assumedPods[key]
	if !found {
		return false, nil
	}
	return b, nil
}

// GetPod might return a pod whose node has already been deleted from
// the main cache. This is useful to properly process pod update events.
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return nil, err
	}
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	podState, ok := cache.podStates[key]
	if !ok {
		return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
	}
	return podState.pod, nil
}
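
// AddNode adds or refreshes a node in the cache, the node tree, and the
// image state bookkeeping.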
func (cache *schedulerCache) AddNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
		cache.nodes[node.Name] = n
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(node.Name)
	cache.nodeTree.addNode(node)
	cache.addNodeImageStates(node, n.info)
	return n.info.SetNode(node)
}
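
// UpdateNode updates the cached node, its position in the node tree, and its
// image states.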
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	n, ok := cache.nodes[newNode.Name]
	if !ok {
		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
		cache.nodes[newNode.Name] = n
		cache.nodeTree.addNode(newNode)
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(newNode.Name)
	cache.nodeTree.updateNode(oldNode, newNode)
	cache.addNodeImageStates(newNode, n.info)
	return n.info.SetNode(newNode)
}
// RemoveNode removes a node from the cache.
// Some nodes might still have pods because their deletion events didn't arrive
// yet. For most intents and purposes, those pods are removed from the cache,
// since the cached nodes are their source of truth.
// However, some pod information (assumedPods, podStates) persists. These
// caches will be eventually consistent as pod deletion events arrive.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	_, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	cache.removeNodeInfoFromList(node.Name)
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}
	cache.removeNodeImageStates(node)
	return nil
}

// addNodeImageStates adds states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
	newSum := make(map[string]*schedulernodeinfo.ImageStateSummary)
	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			// update the entry in imageStates
			state, ok := cache.imageStates[name]
			if !ok {
				state = &imageState{
					size:  image.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[name] = state
			} else {
				state.nodes.Insert(node.Name)
			}
			// create the imageStateSummary for this image
			if _, ok := newSum[name]; !ok {
				newSum[name] = cache.createImageStateSummary(state)
			}
		}
	}
	nodeInfo.SetImageStates(newSum)
}

// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
	if node == nil {
		return
	}
	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			state, ok := cache.imageStates[name]
			if ok {
				state.nodes.Delete(node.Name)
				if len(state.nodes) == 0 {
					// Remove the unused image to make sure the length of
					// imageStates represents the total number of different
					// images on all nodes
					delete(cache.imageStates, name)
				}
			}
		}
	}
}
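
// run starts the background goroutine that periodically cleans up expired
// assumed pods.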
func (cache *schedulerCache) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists to make tests deterministic by taking the current time as an argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	defer cache.updateMetrics()
	// The size of assumedPods should be small
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		if !ps.bindingFinished {
			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			continue
		}
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}
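
// expirePod removes an expired assumed pod and its bookkeeping from the cache.
// Assumes that the cache lock is already acquired.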
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}

// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	n, ok := cache.nodes[nodeName]
	if !ok {
		return nil, fmt.Errorf("node %q not found in cache", nodeName)
	}
	return n.info.Node(), nil
}

// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
func (cache *schedulerCache) updateMetrics() {
	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
	metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
	metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
}