graph_builder.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package garbagecollector
  14. import (
  15. "fmt"
  16. "reflect"
  17. "sync"
  18. "time"
  19. "k8s.io/klog"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime/schema"
  23. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/sets"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. "k8s.io/client-go/tools/cache"
  28. "k8s.io/client-go/util/workqueue"
  29. "k8s.io/kubernetes/pkg/controller"
  30. "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
  31. )
  32. type eventType int
  33. func (e eventType) String() string {
  34. switch e {
  35. case addEvent:
  36. return "add"
  37. case updateEvent:
  38. return "update"
  39. case deleteEvent:
  40. return "delete"
  41. default:
  42. return fmt.Sprintf("unknown(%d)", int(e))
  43. }
  44. }
  45. const (
  46. addEvent eventType = iota
  47. updateEvent
  48. deleteEvent
  49. )
// event is the unit of work flowing from the informer event handlers into the
// graphChanges queue; processGraphChanges consumes it to update the graph.
type event struct {
	eventType eventType
	// obj is the object the informer reported (for deletes it may be the last
	// known state unwrapped from a DeletedFinalStateUnknown).
	obj interface{}
	// the update event comes with an old object, but it's not used by the garbage collector.
	oldObj interface{}
	// gvk is the group/version/kind of the resource the monitor watches; it is
	// recorded here because obj alone may not carry complete type information.
	gvk schema.GroupVersionKind
}
// GraphBuilder: based on the events supplied by the informers, GraphBuilder updates
// uidToNode, a graph that caches the dependencies as we know, and enqueues
// items to the attemptToDelete and attemptToOrphan.
type GraphBuilder struct {
	restMapper meta.RESTMapper
	// each monitor list/watches a resource, the results are funneled to the
	// dependencyGraphBuilder
	monitors    monitors
	monitorLock sync.RWMutex
	// informersStarted is closed after all of the controllers have been initialized and are running.
	// After that it is safe to start them here, before that it is not.
	informersStarted <-chan struct{}
	// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
	// This channel is also protected by monitorLock.
	stopCh <-chan struct{}
	// running tracks whether Run() has been called.
	// it is protected by monitorLock.
	running bool
	// monitors are the producer of the graphChanges queue, graphBuilder alters
	// the in-memory graph according to the changes.
	graphChanges workqueue.RateLimitingInterface
	// uidToNode doesn't require a lock to protect, because only the
	// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
	uidToNode *concurrentUIDToNode
	// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
	attemptToDelete workqueue.RateLimitingInterface
	attemptToOrphan workqueue.RateLimitingInterface
	// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
	// be non-existent are added to the cache.
	absentOwnerCache *UIDCache
	sharedInformers  controller.InformerFactory
	// ignoredResources are never watched; see DefaultIgnoredResources.
	ignoredResources map[schema.GroupResource]struct{}
}
// monitor runs a Controller with a local stop channel.
type monitor struct {
	controller cache.Controller
	store      cache.Store
	// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
	// not yet started.
	stopCh chan struct{}
}

// Run is intended to be called in a goroutine. Multiple calls of this is an
// error.
func (m *monitor) Run() {
	m.controller.Run(m.stopCh)
}

// monitors maps each watched resource to the monitor driving its informer.
type monitors map[schema.GroupVersionResource]*monitor
// controllerFor obtains a shared informer for the given resource and registers
// handlers that translate every add/update/delete notification into an *event
// on gb.graphChanges. It returns the informer's controller and store so the
// caller can track sync state and stop it later.
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
	handlers := cache.ResourceEventHandlerFuncs{
		// add the event to the dependencyGraphBuilder's graphChanges.
		AddFunc: func(obj interface{}) {
			event := &event{
				eventType: addEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// TODO: check if there are differences in the ownerRefs,
			// finalizers, and DeletionTimestamp; if not, ignore the update.
			event := &event{
				eventType: updateEvent,
				obj:       newObj,
				oldObj:    oldObj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		DeleteFunc: func(obj interface{}) {
			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = deletedFinalStateUnknown.Obj
			}
			event := &event{
				eventType: deleteEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
	}
	shared, err := gb.sharedInformers.ForResource(resource)
	if err != nil {
		klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
		return nil, nil, err
	}
	klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
	// need to clone because it's from a shared cache
	shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
	return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}
// syncMonitors rebuilds the monitor set according to the supplied resources,
// creating or deleting monitors as necessary. It will return any error
// encountered, but will make an attempt to create a monitor for each resource
// instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors.
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()
	// Start by assuming every existing monitor is obsolete; monitors still
	// wanted are moved from toRemove into current below.
	toRemove := gb.monitors
	if toRemove == nil {
		toRemove = monitors{}
	}
	current := monitors{}
	errs := []error{}
	kept := 0
	added := 0
	for resource := range resources {
		if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
			continue
		}
		// Reuse an existing monitor for this resource if one is present.
		if m, ok := toRemove[resource]; ok {
			current[resource] = m
			delete(toRemove, resource)
			kept++
			continue
		}
		kind, err := gb.restMapper.KindFor(resource)
		if err != nil {
			// Collect the error but keep going so other resources still get monitors.
			errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
			continue
		}
		c, s, err := gb.controllerFor(resource, kind)
		if err != nil {
			errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
			continue
		}
		// stopCh is left nil: the monitor is "not yet started" until startMonitors runs it.
		current[resource] = &monitor{store: s, controller: c}
		added++
	}
	gb.monitors = current
	// Stop every monitor that is no longer wanted (only those already started
	// have a stop channel to close).
	for _, monitor := range toRemove {
		if monitor.stopCh != nil {
			close(monitor.stopCh)
		}
	}
	klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
	// NewAggregate returns nil if errs is 0-length
	return utilerrors.NewAggregate(errs)
}
// startMonitors ensures the current set of monitors are running. Any newly
// started monitors will also cause shared informers to be started.
//
// If called before Run, startMonitors does nothing (as there is no stop channel
// to support monitor/informer execution).
func (gb *GraphBuilder) startMonitors() {
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()
	if !gb.running {
		return
	}
	// we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
	// that they don't get unexpected events on their work queues.
	// NOTE: this blocks while holding monitorLock until informersStarted is closed.
	<-gb.informersStarted
	monitors := gb.monitors
	started := 0
	for _, monitor := range monitors {
		// A nil stopCh marks a monitor that has not been started yet.
		if monitor.stopCh == nil {
			monitor.stopCh = make(chan struct{})
			gb.sharedInformers.Start(gb.stopCh)
			go monitor.Run()
			started++
		}
	}
	klog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
}
  225. // IsSynced returns true if any monitors exist AND all those monitors'
  226. // controllers HasSynced functions return true. This means IsSynced could return
  227. // true at one time, and then later return false if all monitors were
  228. // reconstructed.
  229. func (gb *GraphBuilder) IsSynced() bool {
  230. gb.monitorLock.Lock()
  231. defer gb.monitorLock.Unlock()
  232. if len(gb.monitors) == 0 {
  233. klog.V(4).Info("garbage controller monitor not synced: no monitors")
  234. return false
  235. }
  236. for resource, monitor := range gb.monitors {
  237. if !monitor.controller.HasSynced() {
  238. klog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
  239. return false
  240. }
  241. }
  242. return true
  243. }
// Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
	klog.Infof("GraphBuilder running")
	defer klog.Infof("GraphBuilder stopping")
	// Set up the stop channel.
	gb.monitorLock.Lock()
	gb.stopCh = stopCh
	gb.running = true
	gb.monitorLock.Unlock()
	// Start monitors and begin change processing until the stop channel is
	// closed.
	gb.startMonitors()
	// Blocks here until stopCh is closed; the 1s period restarts the processor
	// if it ever returns (e.g. after the graphChanges queue is shut down).
	wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
	// Stop any running monitors.
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()
	monitors := gb.monitors
	stopped := 0
	for _, monitor := range monitors {
		// Only monitors that were started have a stop channel to close.
		if monitor.stopCh != nil {
			stopped++
			close(monitor.stopCh)
		}
	}
	// reset monitors so that the graph builder can be safely re-run/synced.
	gb.monitors = nil
	klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}
  273. var ignoredResources = map[schema.GroupResource]struct{}{
  274. {Group: "", Resource: "events"}: {},
  275. }
  276. // DefaultIgnoredResources returns the default set of resources that the garbage collector controller
  277. // should ignore. This is exposed so downstream integrators can have access to the defaults, and add
  278. // to them as necessary when constructing the controller.
  279. func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
  280. return ignoredResources
  281. }
// enqueueVirtualDeleteEvent is used to add a virtual delete event to be processed for virtual nodes
// once it is determined they do not have backing objects in storage.
// The synthesized MetadataOnlyObject carries just enough identity (type,
// namespace, name, UID) for processGraphChanges to locate and remove the node.
func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
	gb.graphChanges.Add(&event{
		eventType: deleteEvent,
		obj: &metaonly.MetadataOnlyObject{
			TypeMeta:   metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
			ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
		},
	})
}
// addDependentToOwners adds n to owners' dependents list. If the owner does not
// exist in the gb.uidToNode yet, a "virtual" node will be created to represent
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
// attemptToDeleteItem() will verify if the owner exists according to the API server.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
	for _, owner := range owners {
		ownerNode, ok := gb.uidToNode.Read(owner.UID)
		if !ok {
			// Create a "virtual" node in the graph for the owner if it doesn't
			// exist in the graph yet.
			ownerNode = &node{
				identity: objectReference{
					OwnerReference: owner,
					// The owner's real namespace is unknown here; assume it matches
					// the dependent's (owner refs cannot cross namespaces).
					Namespace: n.identity.Namespace,
				},
				dependents: make(map[*node]struct{}),
				virtual:    true,
			}
			klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
			gb.uidToNode.Write(ownerNode)
		}
		ownerNode.addDependent(n)
		// ok is still the result of the Read above: false means this iteration
		// created a virtual node that must be verified against the API server.
		if !ok {
			// Enqueue the virtual node into attemptToDelete.
			// The garbage processor will enqueue a virtual delete
			// event to delete it from the graph if API server confirms this
			// owner doesn't exist.
			gb.attemptToDelete.Add(ownerNode)
		}
	}
}
  324. // insertNode insert the node to gb.uidToNode; then it finds all owners as listed
  325. // in n.owners, and adds the node to their dependents list.
  326. func (gb *GraphBuilder) insertNode(n *node) {
  327. gb.uidToNode.Write(n)
  328. gb.addDependentToOwners(n, n.owners)
  329. }
  330. // removeDependentFromOwners remove n from owners' dependents list.
  331. func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
  332. for _, owner := range owners {
  333. ownerNode, ok := gb.uidToNode.Read(owner.UID)
  334. if !ok {
  335. continue
  336. }
  337. ownerNode.deleteDependent(n)
  338. }
  339. }
  340. // removeNode removes the node from gb.uidToNode, then finds all
  341. // owners as listed in n.owners, and removes n from their dependents list.
  342. func (gb *GraphBuilder) removeNode(n *node) {
  343. gb.uidToNode.Delete(n.identity.UID)
  344. gb.removeDependentFromOwners(n, n.owners)
  345. }
  346. type ownerRefPair struct {
  347. oldRef metav1.OwnerReference
  348. newRef metav1.OwnerReference
  349. }
  350. // TODO: profile this function to see if a naive N^2 algorithm performs better
  351. // when the number of references is small.
  352. func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
  353. oldUIDToRef := make(map[string]metav1.OwnerReference)
  354. for _, value := range old {
  355. oldUIDToRef[string(value.UID)] = value
  356. }
  357. oldUIDSet := sets.StringKeySet(oldUIDToRef)
  358. newUIDToRef := make(map[string]metav1.OwnerReference)
  359. for _, value := range new {
  360. newUIDToRef[string(value.UID)] = value
  361. }
  362. newUIDSet := sets.StringKeySet(newUIDToRef)
  363. addedUID := newUIDSet.Difference(oldUIDSet)
  364. removedUID := oldUIDSet.Difference(newUIDSet)
  365. intersection := oldUIDSet.Intersection(newUIDSet)
  366. for uid := range addedUID {
  367. added = append(added, newUIDToRef[uid])
  368. }
  369. for uid := range removedUID {
  370. removed = append(removed, oldUIDToRef[uid])
  371. }
  372. for uid := range intersection {
  373. if !reflect.DeepEqual(oldUIDToRef[uid], newUIDToRef[uid]) {
  374. changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[uid], newRef: newUIDToRef[uid]})
  375. }
  376. }
  377. return added, removed, changed
  378. }
  379. // returns if the object in the event just transitions to "being deleted".
  380. func deletionStarts(oldObj interface{}, newAccessor metav1.Object) bool {
  381. // The delta_fifo may combine the creation and update of the object into one
  382. // event, so if there is no oldObj, we just return if the newObj (via
  383. // newAccessor) is being deleted.
  384. if oldObj == nil {
  385. if newAccessor.GetDeletionTimestamp() == nil {
  386. return false
  387. }
  388. return true
  389. }
  390. oldAccessor, err := meta.Accessor(oldObj)
  391. if err != nil {
  392. utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
  393. return false
  394. }
  395. return beingDeleted(newAccessor) && !beingDeleted(oldAccessor)
  396. }
  397. func beingDeleted(accessor metav1.Object) bool {
  398. return accessor.GetDeletionTimestamp() != nil
  399. }
  400. func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
  401. finalizers := accessor.GetFinalizers()
  402. for _, finalizer := range finalizers {
  403. if finalizer == metav1.FinalizerDeleteDependents {
  404. return true
  405. }
  406. }
  407. return false
  408. }
  409. func hasOrphanFinalizer(accessor metav1.Object) bool {
  410. finalizers := accessor.GetFinalizers()
  411. for _, finalizer := range finalizers {
  412. if finalizer == metav1.FinalizerOrphanDependents {
  413. return true
  414. }
  415. }
  416. return false
  417. }
  418. // this function takes newAccessor directly because the caller already
  419. // instantiates an accessor for the newObj.
  420. func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
  421. return deletionStarts(oldObj, newAccessor) && hasDeleteDependentsFinalizer(newAccessor)
  422. }
  423. // this function takes newAccessor directly because the caller already
  424. // instantiates an accessor for the newObj.
  425. func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
  426. return deletionStarts(oldObj, newAccessor) && hasOrphanFinalizer(newAccessor)
  427. }
  428. // if an blocking ownerReference points to an object gets removed, or gets set to
  429. // "BlockOwnerDeletion=false", add the object to the attemptToDelete queue.
  430. func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
  431. for _, ref := range removed {
  432. if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
  433. node, found := gb.uidToNode.Read(ref.UID)
  434. if !found {
  435. klog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
  436. continue
  437. }
  438. gb.attemptToDelete.Add(node)
  439. }
  440. }
  441. for _, c := range changed {
  442. wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
  443. isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
  444. if wasBlocked && isUnblocked {
  445. node, found := gb.uidToNode.Read(c.newRef.UID)
  446. if !found {
  447. klog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
  448. continue
  449. }
  450. gb.attemptToDelete.Add(node)
  451. }
  452. }
  453. }
// processTransitions routes an observed object to the orphan or delete queues
// when it has just entered deletion with the corresponding finalizer set.
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
	if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
		klog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
		gb.attemptToOrphan.Add(n)
		return
	}
	if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
		klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
		// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
		n.markDeletingDependents()
		// Enqueue every dependent so foreground deletion can proceed, then the
		// owner itself so it is re-examined once dependents are gone.
		for dep := range n.dependents {
			gb.attemptToDelete.Add(dep)
		}
		gb.attemptToDelete.Add(n)
	}
}

// runProcessGraphChanges drains graphChanges until the queue is shut down.
func (gb *GraphBuilder) runProcessGraphChanges() {
	for gb.processGraphChanges() {
	}
}
// Dequeueing an event from graphChanges, updating graph, populating dirty_queue.
// Returns false only when the graphChanges queue has been shut down.
func (gb *GraphBuilder) processGraphChanges() bool {
	item, quit := gb.graphChanges.Get()
	if quit {
		return false
	}
	defer gb.graphChanges.Done(item)
	event, ok := item.(*event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
		return true
	}
	obj := event.obj
	accessor, err := meta.Accessor(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
		return true
	}
	klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
	// Check if the node already exists
	existingNode, found := gb.uidToNode.Read(accessor.GetUID())
	if found {
		// this marks the node as having been observed via an informer event
		// 1. this depends on graphChanges only containing add/update events from the actual informer
		// 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
		existingNode.markObserved()
	}
	switch {
	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
		// First sighting of this UID: insert a new node capturing its identity,
		// owners, and deletion state.
		newNode := &node{
			identity: objectReference{
				OwnerReference: metav1.OwnerReference{
					APIVersion: event.gvk.GroupVersion().String(),
					Kind:       event.gvk.Kind,
					UID:        accessor.GetUID(),
					Name:       accessor.GetName(),
				},
				Namespace: accessor.GetNamespace(),
			},
			dependents:         make(map[*node]struct{}),
			owners:             accessor.GetOwnerReferences(),
			deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
			beingDeleted:       beingDeleted(accessor),
		}
		gb.insertNode(newNode)
		// the underlying delta_fifo may combine a creation and a deletion into
		// one event, so we need to further process the event.
		gb.processTransitions(event.oldObj, accessor, newNode)
	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
		// handle changes in ownerReferences
		added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
		if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
			// check if the changed dependency graph unblock owners that are
			// waiting for the deletion of their dependents.
			gb.addUnblockedOwnersToDeleteQueue(removed, changed)
			// update the node itself
			existingNode.owners = accessor.GetOwnerReferences()
			// Add the node to its new owners' dependent lists.
			gb.addDependentToOwners(existingNode, added)
			// remove the node from the dependent list of node that are no longer in
			// the node's owners list.
			gb.removeDependentFromOwners(existingNode, removed)
		}
		if beingDeleted(accessor) {
			existingNode.markBeingDeleted()
		}
		gb.processTransitions(event.oldObj, accessor, existingNode)
	case event.eventType == deleteEvent:
		if !found {
			klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
			return true
		}
		// removeNode updates the graph
		gb.removeNode(existingNode)
		// Hold the read lock for the rest of this event so the dependents set
		// is stable; the deferred unlock fires when this function returns.
		existingNode.dependentsLock.RLock()
		defer existingNode.dependentsLock.RUnlock()
		if len(existingNode.dependents) > 0 {
			// Remember this owner is gone so dependents don't re-create a
			// virtual node for it.
			gb.absentOwnerCache.Add(accessor.GetUID())
		}
		for dep := range existingNode.dependents {
			gb.attemptToDelete.Add(dep)
		}
		for _, owner := range existingNode.owners {
			ownerNode, found := gb.uidToNode.Read(owner.UID)
			if !found || !ownerNode.isDeletingDependents() {
				continue
			}
			// this is to let attemptToDeleteItem check if all the owner's
			// dependents are deleted, if so, the owner will be deleted.
			gb.attemptToDelete.Add(ownerNode)
		}
	}
	return true
}