garbagecollector.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	"k8s.io/klog"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"

	// import known versions
	_ "k8s.io/client-go/kubernetes"
)

// ResourceResyncTime defines the resync period of the garbage collector's informers.
const ResourceResyncTime time.Duration = 0

// GarbageCollector runs reflectors to watch for changes of managed API
// objects, funnels the results to a single-threaded dependencyGraphBuilder,
// which builds a graph caching the dependencies among objects. Triggered by the
// graph changes, the dependencyGraphBuilder enqueues objects that can
// potentially be garbage-collected to the `attemptToDelete` queue, and enqueues
// objects whose dependents need to be orphaned to the `attemptToOrphan` queue.
// The GarbageCollector has workers that consume these two queues and send
// requests to the API server to delete or update the objects accordingly.
// Note that having the dependencyGraphBuilder notify the garbage collector
// ensures that the garbage collector operates on a graph that is at least as
// up to date as the moment the notification was sent.
type GarbageCollector struct {
	restMapper     resettableRESTMapper
	metadataClient metadata.Interface
	// garbage collector attempts to delete the items in the attemptToDelete queue when the time is ripe.
	attemptToDelete workqueue.RateLimitingInterface
	// garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
	attemptToOrphan        workqueue.RateLimitingInterface
	dependencyGraphBuilder *GraphBuilder
	// GC caches the owners that do not exist according to the API server.
	absentOwnerCache *UIDCache

	workerLock sync.RWMutex
}

// NewGarbageCollector creates a new GarbageCollector.
func NewGarbageCollector(
	metadataClient metadata.Interface,
	mapper resettableRESTMapper,
	deletableResources map[schema.GroupVersionResource]struct{},
	ignoredResources map[schema.GroupResource]struct{},
	sharedInformers controller.InformerFactory,
	informersStarted <-chan struct{},
) (*GarbageCollector, error) {
	attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
	attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
	absentOwnerCache := NewUIDCache(500)
	gc := &GarbageCollector{
		metadataClient:   metadataClient,
		restMapper:       mapper,
		attemptToDelete:  attemptToDelete,
		attemptToOrphan:  attemptToOrphan,
		absentOwnerCache: absentOwnerCache,
	}
	gb := &GraphBuilder{
		metadataClient:   metadataClient,
		informersStarted: informersStarted,
		restMapper:       mapper,
		graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
		uidToNode: &concurrentUIDToNode{
			uidToNode: make(map[types.UID]*node),
		},
		attemptToDelete:  attemptToDelete,
		attemptToOrphan:  attemptToOrphan,
		absentOwnerCache: absentOwnerCache,
		sharedInformers:  sharedInformers,
		ignoredResources: ignoredResources,
	}
	if err := gb.syncMonitors(deletableResources); err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
	}
	gc.dependencyGraphBuilder = gb

	return gc, nil
}

// resyncMonitors starts or stops resource monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
	if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
		return err
	}
	gc.dependencyGraphBuilder.startMonitors()
	return nil
}

// Run starts garbage collector workers.
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer gc.attemptToDelete.ShutDown()
	defer gc.attemptToOrphan.ShutDown()
	defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

	klog.Infof("Starting garbage collector controller")
	defer klog.Infof("Shutting down garbage collector controller")

	go gc.dependencyGraphBuilder.Run(stopCh)

	if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
		return
	}

	klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

	// gc workers
	for i := 0; i < workers; i++ {
		go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
		go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
	}

	<-stopCh
}

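// The sketch below is editorial and not part of the upstream file: it shows one way a
// caller (for example, a controller-manager style binary) might wire the collector
// together. The helper name startGarbageCollectorSketch, the worker count, and the sync
// period are illustrative assumptions, not upstream API.
func startGarbageCollectorSketch(
	metadataClient metadata.Interface,
	mapper resettableRESTMapper,
	discoveryClient discovery.ServerResourcesInterface,
	ignoredResources map[schema.GroupResource]struct{},
	sharedInformers controller.InformerFactory,
	informersStarted <-chan struct{},
	stopCh <-chan struct{},
) error {
	// Seed the graph builder with whatever discovery currently reports as deletable.
	deletableResources := GetDeletableResources(discoveryClient)
	gc, err := NewGarbageCollector(metadataClient, mapper, deletableResources, ignoredResources, sharedInformers, informersStarted)
	if err != nil {
		return err
	}
	// Run the workers, and keep the set of monitored resources in step with discovery.
	// Per the doc comment on Sync, discoveryClient must not be the same client that
	// backs mapper, or the mapper's discovery cache gets reset unnecessarily.
	go gc.Run(5, stopCh)                                // worker count is an arbitrary example value
	go gc.Sync(discoveryClient, 30*time.Second, stopCh) // period is an arbitrary example value
	return nil
}
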
// resettableRESTMapper is a RESTMapper which is capable of resetting itself
// from discovery.
type resettableRESTMapper interface {
	meta.RESTMapper
	Reset()
}

// Sync periodically resyncs the garbage collector when new resources are
// observed from discovery. When new resources are detected, Sync will stop all
// GC workers, reset gc.restMapper, and resync the monitors.
//
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
// the mapper's underlying discovery client will be unnecessarily reset during
// the course of detecting new resources.
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
	oldResources := make(map[schema.GroupVersionResource]struct{})
	wait.Until(func() {
		// Get the current resource list from discovery.
		newResources := GetDeletableResources(discoveryClient)

		// This can occur if there is an internal error in GetDeletableResources.
		if len(newResources) == 0 {
			klog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
			return
		}

		// Decide whether discovery has reported a change.
		if reflect.DeepEqual(oldResources, newResources) {
			klog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
			return
		}

		// Ensure workers are paused to avoid processing events before informers
		// have resynced.
		gc.workerLock.Lock()
		defer gc.workerLock.Unlock()

		// Once we get here, we should not unpause workers until we've successfully synced
		attempt := 0
		wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
			attempt++

			// On a reattempt, check if available resources have changed
			if attempt > 1 {
				newResources = GetDeletableResources(discoveryClient)
				if len(newResources) == 0 {
					klog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
					return false, nil
				}
			}

			klog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))

			// Resetting the REST mapper will also invalidate the underlying discovery
			// client. This is a leaky abstraction and assumes behavior about the REST
			// mapper, but we'll deal with it for now.
			gc.restMapper.Reset()
			klog.V(4).Infof("reset restmapper")

			// Perform the monitor resync and wait for controllers to report cache sync.
			//
			// NOTE: It's possible that newResources will diverge from the resources
			// discovered by restMapper during the call to Reset, since they are
			// distinct discovery clients invalidated at different times. For example,
			// newResources may contain resources not returned in the restMapper's
			// discovery call if the resources appeared in-between the calls. In that
			// case, the restMapper will fail to map some of newResources until the next
			// attempt.
			if err := gc.resyncMonitors(newResources); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
				return false, nil
			}
			klog.V(4).Infof("resynced monitors")

			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
			// note that workers stay paused until we successfully resync.
			if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
				return false, nil
			}

			// success, break out of the loop
			return true, nil
		}, stopCh)

		// Finally, keep track of our new state. Do this after all preceding steps
		// have succeeded to ensure we'll retry on subsequent syncs if an error
		// occurred.
		oldResources = newResources
		klog.V(2).Infof("synced garbage collector")
	}, period, stopCh)
}

// printDiff returns a human-readable summary of what resources were added and removed
func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
	removed := sets.NewString()
	for oldResource := range oldResources {
		if _, ok := newResources[oldResource]; !ok {
			removed.Insert(fmt.Sprintf("%+v", oldResource))
		}
	}
	added := sets.NewString()
	for newResource := range newResources {
		if _, ok := oldResources[newResource]; !ok {
			added.Insert(fmt.Sprintf("%+v", newResource))
		}
	}
	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
}

// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
		close(stopChWithTimeout)
	}()
	return stopChWithTimeout
}

// IsSynced returns true if dependencyGraphBuilder is synced.
func (gc *GarbageCollector) IsSynced() bool {
	return gc.dependencyGraphBuilder.IsSynced()
}

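// runAttemptToDeleteWorker is a long-running loop that processes items from the
// attemptToDelete queue until the queue is shut down.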
func (gc *GarbageCollector) runAttemptToDeleteWorker() {
	for gc.attemptToDeleteWorker() {
	}
}

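// attemptToDeleteWorker dequeues one item from the attemptToDelete queue and tries to
// delete it. It holds a read lock on workerLock so that Sync can pause all workers while
// resyncing monitors. Items are requeued (rate limited) if deletion fails or if the item
// has not yet been observed via an informer event. It returns false once the queue has
// been shut down.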
func (gc *GarbageCollector) attemptToDeleteWorker() bool {
	item, quit := gc.attemptToDelete.Get()
	gc.workerLock.RLock()
	defer gc.workerLock.RUnlock()
	if quit {
		return false
	}
	defer gc.attemptToDelete.Done(item)
	n, ok := item.(*node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
		return true
	}
	err := gc.attemptToDeleteItem(n)
	if err != nil {
		if _, ok := err.(*restMappingError); ok {
			// There are at least two ways this can happen:
			// 1. The reference is to an object of a custom type that has not yet been
			//    recognized by gc.restMapper (this is a transient error).
			// 2. The reference is to an invalid group/version. We don't currently
			//    have a way to distinguish this from a valid type we will recognize
			//    after the next discovery sync.
			// For now, record the error and retry.
			klog.V(5).Infof("error syncing item %s: %v", n, err)
		} else {
			utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
		}
		// retry if garbage collection of an object failed.
		gc.attemptToDelete.AddRateLimited(item)
	} else if !n.isObserved() {
		// requeue if item hasn't been observed via an informer event yet.
		// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never be removed.
		// see https://issue.k8s.io/56121
		klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
		gc.attemptToDelete.AddRateLimited(item)
	}
	return true
}

// isDangling checks whether a reference points to an object that doesn't exist.
// If isDangling looks up the referenced object at the API server, it also
// returns the object's latest state.
func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *node) (
	dangling bool, owner *metav1.PartialObjectMetadata, err error) {
	if gc.absentOwnerCache.Has(reference.UID) {
		klog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
		return true, nil, nil
	}
	// TODO: we need to verify the reference resource is supported by the
	// system. If it's not a valid resource, the garbage collector should i)
	// ignore the reference when deciding whether the object should be deleted, and
	// ii) update the object to remove such references. This is to
	// prevent objects having references to an old resource from being
	// deleted during a cluster upgrade.
	resource, namespaced, err := gc.apiResource(reference.APIVersion, reference.Kind)
	if err != nil {
		return false, nil, err
	}
	// TODO: It's only necessary to talk to the API server if the owner node
	// is a "virtual" node. The local graph could lag behind the real
	// status, but in practice, the difference is small.
	owner, err = gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		gc.absentOwnerCache.Add(reference.UID)
		klog.V(5).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
		return true, nil, nil
	case err != nil:
		return false, nil, err
	}

	if owner.GetUID() != reference.UID {
		klog.V(5).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
		gc.absentOwnerCache.Add(reference.UID)
		return true, nil, nil
	}
	return false, owner, nil
}

// classifyReferences classifies the latestReferences into three categories:
// solid: the owner exists, and is not "waitingForDependentsDeletion"
// dangling: the owner does not exist
// waitingForDependentsDeletion: the owner exists, its deletionTimestamp is non-nil, and it
// has FinalizerDeletingDependents
// This function communicates with the server.
func (gc *GarbageCollector) classifyReferences(item *node, latestReferences []metav1.OwnerReference) (
	solid, dangling, waitingForDependentsDeletion []metav1.OwnerReference, err error) {
	for _, reference := range latestReferences {
		isDangling, owner, err := gc.isDangling(reference, item)
		if err != nil {
			return nil, nil, nil, err
		}
		if isDangling {
			dangling = append(dangling, reference)
			continue
		}

		ownerAccessor, err := meta.Accessor(owner)
		if err != nil {
			return nil, nil, nil, err
		}
		if ownerAccessor.GetDeletionTimestamp() != nil && hasDeleteDependentsFinalizer(ownerAccessor) {
			waitingForDependentsDeletion = append(waitingForDependentsDeletion, reference)
		} else {
			solid = append(solid, reference)
		}
	}
	return solid, dangling, waitingForDependentsDeletion, nil
}

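// ownerRefsToUIDs extracts the UIDs from a list of owner references.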
func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID {
	var ret []types.UID
	for _, ref := range refs {
		ret = append(ret, ref.UID)
	}
	return ret
}

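// attemptToDeleteItem fetches the latest state of the item from the API server, emits a
// virtual delete event if the object no longer exists, and otherwise classifies the
// item's owner references and either patches the object (to drop dangling or blocking
// references) or deletes it with the appropriate propagation policy.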
func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
	klog.V(2).Infof("processing item %s", item.identity)
	// "being deleted" is a one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
	if item.isBeingDeleted() && !item.isDeletingDependents() {
		klog.V(5).Infof("processing item %s returned at once, because its DeletionTimestamp is non-nil", item.identity)
		return nil
	}
	// TODO: It's only necessary to talk to the API server if this is a
	// "virtual" node. The local graph could lag behind the real status, but in
	// practice, the difference is small.
	latest, err := gc.getObject(item.identity)
	switch {
	case errors.IsNotFound(err):
		// the GraphBuilder can add "virtual" nodes for owners that don't
		// exist yet, so we need to enqueue a virtual Delete event to remove
		// the virtual node from GraphBuilder.uidToNode.
		klog.V(5).Infof("item %v not found, generating a virtual delete event", item.identity)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		// since we're manually inserting a delete event to remove this node,
		// we don't need to keep tracking it as a virtual node and requeueing in attemptToDelete
		item.markObserved()
		return nil
	case err != nil:
		return err
	}

	if latest.GetUID() != item.identity.UID {
		klog.V(5).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		// since we're manually inserting a delete event to remove this node,
		// we don't need to keep tracking it as a virtual node and requeueing in attemptToDelete
		item.markObserved()
		return nil
	}

	// TODO: attemptToOrphanWorker() routine is similar. Consider merging
	// attemptToOrphanWorker() into attemptToDeleteItem() as well.
	if item.isDeletingDependents() {
		return gc.processDeletingDependentsItem(item)
	}

	// compute if we should delete the item
	ownerReferences := latest.GetOwnerReferences()
	if len(ownerReferences) == 0 {
		klog.V(2).Infof("object %s doesn't have an owner, continue on next item", item.identity)
		return nil
	}

	solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(item, ownerReferences)
	if err != nil {
		return err
	}
	klog.V(5).Infof("classify references of %s.\nsolid: %#v\ndangling: %#v\nwaitingForDependentsDeletion: %#v\n", item.identity, solid, dangling, waitingForDependentsDeletion)

	switch {
	case len(solid) != 0:
		klog.V(2).Infof("object %#v has at least one existing owner: %#v, will not garbage collect", item.identity, solid)
		if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
			return nil
		}
		klog.V(2).Infof("remove dangling references %#v and waiting references %#v for object %s", dangling, waitingForDependentsDeletion, item.identity)
		// waitingForDependentsDeletion needs to be deleted from the
		// ownerReferences, otherwise the referenced objects will be stuck with
		// the FinalizerDeletingDependents and never get deleted.
		ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
		patch := deleteOwnerRefStrategicMergePatch(item.identity.UID, ownerUIDs...)
		_, err = gc.patch(item, patch, func(n *node) ([]byte, error) {
			return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
		})
		return err
	case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
		deps := item.getDependents()
		for _, dep := range deps {
			if dep.isDeletingDependents() {
				// this circle detection has false positives; we need to
				// apply a more rigorous detection if this turns out to be a
				// problem.
				// multiple workers run attemptToDeleteItem in parallel, so the
				// circle detection can fail in a race condition.
				klog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
				patch, err := item.unblockOwnerReferencesStrategicMergePatch()
				if err != nil {
					return err
				}
				if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
					return err
				}
				break
			}
		}
		klog.V(2).Infof("at least one owner of object %s has FinalizerDeletingDependents, and the object itself has dependents, so it is going to be deleted in Foreground", item.identity)
		// the deletion event will be observed by the graphBuilder, so the item
		// will be processed again in processDeletingDependentsItem. If it
		// doesn't have dependents, the function will remove the
		// FinalizerDeletingDependents from the item, resulting in the final
		// deletion of the item.
		policy := metav1.DeletePropagationForeground
		return gc.deleteObject(item.identity, &policy)
	default:
		// item doesn't have any solid owner, so it needs to be garbage
		// collected. Also, none of item's owners is waiting for the deletion of
		// the dependents, so set propagationPolicy based on existing finalizers.
		var policy metav1.DeletionPropagation
		switch {
		case hasOrphanFinalizer(latest):
			// if an existing orphan finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationOrphan
		case hasDeleteDependentsFinalizer(latest):
			// if an existing foreground finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationForeground
		default:
			// otherwise, default to background.
			policy = metav1.DeletePropagationBackground
		}
		klog.V(2).Infof("delete object %s with propagation policy %s", item.identity, policy)
		return gc.deleteObject(item.identity, &policy)
	}
}

// processDeletingDependentsItem processes an item that is waiting for its dependents to be deleted.
func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
	blockingDependents := item.blockingDependents()
	if len(blockingDependents) == 0 {
		klog.V(2).Infof("remove DeleteDependents finalizer for item %s", item.identity)
		return gc.removeFinalizer(item, metav1.FinalizerDeleteDependents)
	}
	for _, dep := range blockingDependents {
		if !dep.isDeletingDependents() {
			klog.V(2).Infof("adding %s to attemptToDelete, because its owner %s is deletingDependents", dep.identity, item.identity)
			gc.attemptToDelete.Add(dep)
		}
	}
	return nil
}

// orphanDependents removes the owner from the OwnerReferences of each dependent.
// The dependents are copies of pointers to the owner's dependents, so they don't need to be locked.
func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error {
	errCh := make(chan error, len(dependents))
	wg := sync.WaitGroup{}
	wg.Add(len(dependents))
	for i := range dependents {
		go func(dependent *node) {
			defer wg.Done()
			// the dependent.identity.UID is used as precondition
			patch := deleteOwnerRefStrategicMergePatch(dependent.identity.UID, owner.UID)
			_, err := gc.patch(dependent, patch, func(n *node) ([]byte, error) {
				return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
			})
			// note that if the target ownerReference doesn't exist in the
			// dependent, strategic merge patch will NOT return an error.
			if err != nil && !errors.IsNotFound(err) {
				errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err)
			}
		}(dependents[i])
	}
	wg.Wait()
	close(errCh)

	var errorsSlice []error
	for e := range errCh {
		errorsSlice = append(errorsSlice, e)
	}

	if len(errorsSlice) != 0 {
		return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
	}
	klog.V(5).Infof("successfully updated all dependents of owner %s", owner)
	return nil
}

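// runAttemptToOrphanWorker is a long-running loop that processes items from the
// attemptToOrphan queue until the queue is shut down.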
func (gc *GarbageCollector) runAttemptToOrphanWorker() {
	for gc.attemptToOrphanWorker() {
	}
}

// attemptToOrphanWorker dequeues a node from the attemptToOrphan queue, finds its
// dependents based on the graph maintained by the GC, removes the node from the
// OwnerReferences of its dependents, and finally updates the owner to remove
// the "Orphan" finalizer. The node is added back into the attemptToOrphan queue if any of
// these steps fail.
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
	item, quit := gc.attemptToOrphan.Get()
	gc.workerLock.RLock()
	defer gc.workerLock.RUnlock()
	if quit {
		return false
	}
	defer gc.attemptToOrphan.Done(item)
	owner, ok := item.(*node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
		return true
	}
	// we don't need to lock each element, because they never get updated
	owner.dependentsLock.RLock()
	dependents := make([]*node, 0, len(owner.dependents))
	for dependent := range owner.dependents {
		dependents = append(dependents, dependent)
	}
	owner.dependentsLock.RUnlock()

	err := gc.orphanDependents(owner.identity, dependents)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
		gc.attemptToOrphan.AddRateLimited(item)
		return true
	}
	// update the owner, remove the orphan finalizer (FinalizerOrphanDependents) from its finalizers list
	err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
		gc.attemptToOrphan.AddRateLimited(item)
	}
	return true
}

// GraphHasUID returns whether the GraphBuilder has a particular UID stored in its
// uidToNode graph. It's useful for debugging.
//
// *FOR TEST USE ONLY* This method is used by integration tests.
func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
	_, ok := gc.dependencyGraphBuilder.uidToNode.Read(u)
	return ok
}

// GetDeletableResources returns all resources from discoveryClient that the
// garbage collector should recognize and work with. More specifically, all
// preferred resources which support the 'delete', 'list', and 'watch' verbs.
//
// All discovery errors are considered temporary. Upon encountering any error,
// GetDeletableResources will log and return any discovered resources it was
// able to process (which may be none).
func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
	preferredResources, err := discoveryClient.ServerPreferredResources()
	if err != nil {
		if discovery.IsGroupDiscoveryFailedError(err) {
			klog.Warningf("failed to discover some groups: %v", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
		} else {
			klog.Warningf("failed to discover preferred resources: %v", err)
		}
	}
	if preferredResources == nil {
		return map[schema.GroupVersionResource]struct{}{}
	}

	// This is extracted from discovery.GroupVersionResources to allow tolerating
	// failures on a per-resource basis.
	deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferredResources)
	deletableGroupVersionResources := map[schema.GroupVersionResource]struct{}{}
	for _, rl := range deletableResources {
		gv, err := schema.ParseGroupVersion(rl.GroupVersion)
		if err != nil {
			klog.Warningf("ignoring invalid discovered resource %q: %v", rl.GroupVersion, err)
			continue
		}
		for i := range rl.APIResources {
			deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{}
		}
	}

	return deletableGroupVersionResources
}
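
// The sketch below is editorial and not part of the upstream file: it illustrates the
// verb filtering that GetDeletableResources relies on. Only resources advertising all of
// "delete", "list", and "watch" survive discovery.FilteredBy; the sample resource list
// and the helper name deletableFilterSketch are made-up examples.
func deletableFilterSketch() []*metav1.APIResourceList {
	input := []*metav1.APIResourceList{
		{
			GroupVersion: "apps/v1",
			APIResources: []metav1.APIResource{
				// kept: supports delete, list, and watch
				{Name: "deployments", Verbs: metav1.Verbs{"create", "delete", "get", "list", "watch"}},
				// dropped: does not support delete/list/watch
				{Name: "deployments/status", Verbs: metav1.Verbs{"get", "patch", "update"}},
			},
		},
	}
	// After filtering, only "deployments" remains in the returned lists.
	return discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, input)
}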