- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package garbagecollector
- import (
- "fmt"
- "reflect"
- "sync"
- "time"
- "k8s.io/klog"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime/schema"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/metadata"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
- )
- type eventType int
- func (e eventType) String() string {
- switch e {
- case addEvent:
- return "add"
- case updateEvent:
- return "update"
- case deleteEvent:
- return "delete"
- default:
- return fmt.Sprintf("unknown(%d)", int(e))
- }
- }
- const (
- addEvent eventType = iota
- updateEvent
- deleteEvent
- )
- type event struct {
- eventType eventType
- obj interface{}
- // the update event comes with an old object; it is only consulted to detect deletion/finalizer transitions.
- oldObj interface{}
- gvk schema.GroupVersionKind
- }
- // GraphBuilder processes events supplied by the informers, updates uidToNode,
- // a graph that caches the dependencies as we know them, and enqueues
- // items to the attemptToDelete and attemptToOrphan queues.
- type GraphBuilder struct {
- restMapper meta.RESTMapper
- // each monitor lists/watches a resource; the results are funneled to the
- // dependencyGraphBuilder
- monitors monitors
- monitorLock sync.RWMutex
- // informersStarted is closed after all of the controllers have been initialized and are running.
- // After that it is safe to start them here, before that it is not.
- informersStarted <-chan struct{}
- // stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
- // This channel is also protected by monitorLock.
- stopCh <-chan struct{}
- // running tracks whether Run() has been called.
- // it is protected by monitorLock.
- running bool
- metadataClient metadata.Interface
- // monitors are the producers of the graphChanges queue; the graphBuilder alters
- // the in-memory graph according to the changes.
- graphChanges workqueue.RateLimitingInterface
- // uidToNode doesn't require a lock to protect it, because only the
- // single-threaded GraphBuilder.processGraphChanges() reads/writes it.
- uidToNode *concurrentUIDToNode
- // GraphBuilder is the producer of attemptToDelete and attemptToOrphan; GC is the consumer.
- attemptToDelete workqueue.RateLimitingInterface
- attemptToOrphan workqueue.RateLimitingInterface
- // GraphBuilder and GC share the absentOwnerCache. Objects that are known to
- // be non-existent are added to the cache.
- absentOwnerCache *UIDCache
- sharedInformers controller.InformerFactory
- ignoredResources map[schema.GroupResource]struct{}
- }
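- // Illustrative sketch (not part of the original file): the exact wiring below is an assumption,
- // but the garbage collector constructs a GraphBuilder roughly along these lines and then runs it
- // in its own goroutine; NewUIDCache and DefaultIgnoredResources are from this package, and the
- // workqueue helpers come from k8s.io/client-go/util/workqueue:
- //
- //	gb := &GraphBuilder{
- //		metadataClient:   metadataClient,
- //		informersStarted: informersStarted,
- //		restMapper:       restMapper,
- //		graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
- //		attemptToDelete:  attemptToDelete,
- //		attemptToOrphan:  attemptToOrphan,
- //		absentOwnerCache: NewUIDCache(500),
- //		sharedInformers:  sharedInformers,
- //		ignoredResources: DefaultIgnoredResources(),
- //		// uidToNode is initialized to an empty concurrentUIDToNode before Run is called (omitted here).
- //	}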
- // monitor runs a Controller with a local stop channel.
- type monitor struct {
- controller cache.Controller
- store cache.Store
- // stopCh stops Controller. If stopCh is nil, the monitor is considered to be
- // not yet started.
- stopCh chan struct{}
- }
- // Run is intended to be called in a goroutine. Calling it more than once is an
- // error.
- func (m *monitor) Run() {
- m.controller.Run(m.stopCh)
- }
- type monitors map[schema.GroupVersionResource]*monitor
- func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
- handlers := cache.ResourceEventHandlerFuncs{
- // add the event to the dependencyGraphBuilder's graphChanges.
- AddFunc: func(obj interface{}) {
- event := &event{
- eventType: addEvent,
- obj: obj,
- gvk: kind,
- }
- gb.graphChanges.Add(event)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- // TODO: check if there are differences in the ownerRefs,
- // finalizers, and DeletionTimestamp; if not, ignore the update.
- event := &event{
- eventType: updateEvent,
- obj: newObj,
- oldObj: oldObj,
- gvk: kind,
- }
- gb.graphChanges.Add(event)
- },
- DeleteFunc: func(obj interface{}) {
- // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
- if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
- obj = deletedFinalStateUnknown.Obj
- }
- event := &event{
- eventType: deleteEvent,
- obj: obj,
- gvk: kind,
- }
- gb.graphChanges.Add(event)
- },
- }
- shared, err := gb.sharedInformers.ForResource(resource)
- if err != nil {
- klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
- return nil, nil, err
- }
- klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
- // the handlers receive objects from a shared cache; callers must clone before mutating them.
- shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
- return shared.Informer().GetController(), shared.Informer().GetStore(), nil
- }
- // syncMonitors rebuilds the monitor set according to the supplied resources,
- // creating or deleting monitors as necessary. It will return any error
- // encountered, but will make an attempt to create a monitor for each resource
- // instead of immediately exiting on an error. It may be called before or after
- // Run. Monitors are NOT started as part of the sync. To ensure all existing
- // monitors are started, call startMonitors.
- func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
- gb.monitorLock.Lock()
- defer gb.monitorLock.Unlock()
- toRemove := gb.monitors
- if toRemove == nil {
- toRemove = monitors{}
- }
- current := monitors{}
- errs := []error{}
- kept := 0
- added := 0
- for resource := range resources {
- if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
- continue
- }
- if m, ok := toRemove[resource]; ok {
- current[resource] = m
- delete(toRemove, resource)
- kept++
- continue
- }
- kind, err := gb.restMapper.KindFor(resource)
- if err != nil {
- errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
- continue
- }
- c, s, err := gb.controllerFor(resource, kind)
- if err != nil {
- errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
- continue
- }
- current[resource] = &monitor{store: s, controller: c}
- added++
- }
- gb.monitors = current
- for _, monitor := range toRemove {
- if monitor.stopCh != nil {
- close(monitor.stopCh)
- }
- }
- klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
- // NewAggregate returns nil if errs is 0-length
- return utilerrors.NewAggregate(errs)
- }
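- // Illustrative usage sketch (assumption, not from this file): the garbage collector's resync
- // logic computes the set of deletable resources, hands it to syncMonitors, and then starts any
- // newly created monitors:
- //
- //	resources := map[schema.GroupVersionResource]struct{}{
- //		{Version: "v1", Resource: "pods"}:                       {},
- //		{Group: "apps", Version: "v1", Resource: "deployments"}: {},
- //	}
- //	if err := gb.syncMonitors(resources); err != nil {
- //		utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
- //	}
- //	gb.startMonitors()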
- // startMonitors ensures the current set of monitors are running. Any newly
- // started monitors will also cause shared informers to be started.
- //
- // If called before Run, startMonitors does nothing (as there is no stop channel
- // to support monitor/informer execution).
- func (gb *GraphBuilder) startMonitors() {
- gb.monitorLock.Lock()
- defer gb.monitorLock.Unlock()
- if !gb.running {
- return
- }
- // we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
- // that they don't get unexpected events on their work queues.
- <-gb.informersStarted
- monitors := gb.monitors
- started := 0
- for _, monitor := range monitors {
- if monitor.stopCh == nil {
- monitor.stopCh = make(chan struct{})
- gb.sharedInformers.Start(gb.stopCh)
- go monitor.Run()
- started++
- }
- }
- klog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
- }
- // IsSynced returns true if any monitors exist AND all those monitors'
- // controllers HasSynced functions return true. This means IsSynced could return
- // true at one time, and then later return false if all monitors were
- // reconstructed.
- func (gb *GraphBuilder) IsSynced() bool {
- gb.monitorLock.Lock()
- defer gb.monitorLock.Unlock()
- if len(gb.monitors) == 0 {
- klog.V(4).Info("garbage controller monitor not synced: no monitors")
- return false
- }
- for resource, monitor := range gb.monitors {
- if !monitor.controller.HasSynced() {
- klog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
- return false
- }
- }
- return true
- }
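- // Illustrative sketch (assumption): IsSynced satisfies cache.InformerSynced, so callers can gate
- // further work on it with cache.WaitForCacheSync:
- //
- //	if !cache.WaitForCacheSync(stopCh, gb.IsSynced) {
- //		utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph to sync"))
- //		return
- //	}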
- // Run sets the stop channel and starts monitor execution until stopCh is
- // closed. Any running monitors will be stopped before Run returns.
- func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
- klog.Infof("GraphBuilder running")
- defer klog.Infof("GraphBuilder stopping")
- // Set up the stop channel.
- gb.monitorLock.Lock()
- gb.stopCh = stopCh
- gb.running = true
- gb.monitorLock.Unlock()
- // Start monitors and begin change processing until the stop channel is
- // closed.
- gb.startMonitors()
- wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
- // Stop any running monitors.
- gb.monitorLock.Lock()
- defer gb.monitorLock.Unlock()
- monitors := gb.monitors
- stopped := 0
- for _, monitor := range monitors {
- if monitor.stopCh != nil {
- stopped++
- close(monitor.stopCh)
- }
- }
- // reset monitors so that the graph builder can be safely re-run/synced.
- gb.monitors = nil
- klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
- }
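- // Illustrative sketch (assumption): Run blocks until stopCh is closed, so it is started in its
- // own goroutine, typically alongside the garbage collector workers:
- //
- //	stopCh := make(chan struct{})
- //	go gb.Run(stopCh)
- //	// ... later, closing stopCh stops change processing and all running monitors:
- //	close(stopCh)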
- var ignoredResources = map[schema.GroupResource]struct{}{
- {Group: "", Resource: "events"}: {},
- }
- // DefaultIgnoredResources returns the default set of resources that the garbage collector controller
- // should ignore. This is exposed so downstream integrators can have access to the defaults, and add
- // to them as necessary when constructing the controller.
- func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
- return ignoredResources
- }
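- // Illustrative usage sketch (assumption): DefaultIgnoredResources returns the package-level map
- // itself, so integrators who want to extend the set should copy it before adding entries:
- //
- //	ignored := map[schema.GroupResource]struct{}{}
- //	for gr := range DefaultIgnoredResources() {
- //		ignored[gr] = struct{}{}
- //	}
- //	// "widgets.example.io" is a hypothetical resource used only for illustration.
- //	ignored[schema.GroupResource{Group: "example.io", Resource: "widgets"}] = struct{}{}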
- // enqueueVirtualDeleteEvent is used to add a virtual delete event to be processed for virtual nodes
- // once it is determined they do not have backing objects in storage.
- func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
- gb.graphChanges.Add(&event{
- eventType: deleteEvent,
- obj: &metaonly.MetadataOnlyObject{
- TypeMeta: metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
- ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
- },
- })
- }
- // addDependentToOwners adds n to owners' dependents list. If the owner does not
- // exist in the gb.uidToNode yet, a "virtual" node will be created to represent
- // the owner. The "virtual" node will be enqueued to the attemptToDelete queue, so that
- // attemptToDeleteItem() will verify whether the owner exists according to the API server.
- func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
- for _, owner := range owners {
- ownerNode, ok := gb.uidToNode.Read(owner.UID)
- if !ok {
- // Create a "virtual" node in the graph for the owner if it doesn't
- // exist in the graph yet.
- ownerNode = &node{
- identity: objectReference{
- OwnerReference: owner,
- Namespace: n.identity.Namespace,
- },
- dependents: make(map[*node]struct{}),
- virtual: true,
- }
- klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
- gb.uidToNode.Write(ownerNode)
- }
- ownerNode.addDependent(n)
- if !ok {
- // Enqueue the virtual node into attemptToDelete.
- // The garbage processor will enqueue a virtual delete
- // event to delete it from the graph if the API server confirms this
- // owner doesn't exist.
- gb.attemptToDelete.Add(ownerNode)
- }
- }
- }
- // insertNode inserts the node into gb.uidToNode; it then finds all owners as listed
- // in n.owners, and adds the node to their dependents lists.
- func (gb *GraphBuilder) insertNode(n *node) {
- gb.uidToNode.Write(n)
- gb.addDependentToOwners(n, n.owners)
- }
- // removeDependentFromOwners removes n from each owner's dependents list.
- func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
- for _, owner := range owners {
- ownerNode, ok := gb.uidToNode.Read(owner.UID)
- if !ok {
- continue
- }
- ownerNode.deleteDependent(n)
- }
- }
- // removeNode removes the node from gb.uidToNode, then finds all
- // owners as listed in n.owners, and removes n from their dependents list.
- func (gb *GraphBuilder) removeNode(n *node) {
- gb.uidToNode.Delete(n.identity.UID)
- gb.removeDependentFromOwners(n, n.owners)
- }
- type ownerRefPair struct {
- oldRef metav1.OwnerReference
- newRef metav1.OwnerReference
- }
- // TODO: profile this function to see if a naive N^2 algorithm performs better
- // when the number of references is small.
- func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
- oldUIDToRef := make(map[string]metav1.OwnerReference)
- for _, value := range old {
- oldUIDToRef[string(value.UID)] = value
- }
- oldUIDSet := sets.StringKeySet(oldUIDToRef)
- for _, value := range new {
- newUID := string(value.UID)
- if oldUIDSet.Has(newUID) {
- if !reflect.DeepEqual(oldUIDToRef[newUID], value) {
- changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[newUID], newRef: value})
- }
- oldUIDSet.Delete(newUID)
- } else {
- added = append(added, value)
- }
- }
- for oldUID := range oldUIDSet {
- removed = append(removed, oldUIDToRef[oldUID])
- }
- return added, removed, changed
- }
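- // Worked example (illustration only, with made-up UIDs): if owner "a" flips BlockOwnerDeletion
- // from true to false and owner "b" appears for the first time, referencesDiffs reports "b" as
- // added and "a" as changed, with nothing removed:
- //
- //	blockTrue, blockFalse := true, false
- //	oldRefs := []metav1.OwnerReference{{UID: "a", BlockOwnerDeletion: &blockTrue}}
- //	newRefs := []metav1.OwnerReference{{UID: "a", BlockOwnerDeletion: &blockFalse}, {UID: "b"}}
- //	added, removed, changed := referencesDiffs(oldRefs, newRefs)
- //	// added:   [{UID: "b"}]
- //	// removed: []
- //	// changed: [{oldRef: a (block=true), newRef: a (block=false)}]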
- func deletionStartsWithFinalizer(oldObj interface{}, newAccessor metav1.Object, matchingFinalizer string) bool {
- // if the new object isn't being deleted, or doesn't have the finalizer we're interested in, return false
- if !beingDeleted(newAccessor) || !hasFinalizer(newAccessor, matchingFinalizer) {
- return false
- }
- // if the old object is nil, or wasn't being deleted, or didn't have the finalizer, return true
- if oldObj == nil {
- return true
- }
- oldAccessor, err := meta.Accessor(oldObj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
- return false
- }
- return !beingDeleted(oldAccessor) || !hasFinalizer(oldAccessor, matchingFinalizer)
- }
- func beingDeleted(accessor metav1.Object) bool {
- return accessor.GetDeletionTimestamp() != nil
- }
- func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
- return hasFinalizer(accessor, metav1.FinalizerDeleteDependents)
- }
- func hasOrphanFinalizer(accessor metav1.Object) bool {
- return hasFinalizer(accessor, metav1.FinalizerOrphanDependents)
- }
- func hasFinalizer(accessor metav1.Object, matchingFinalizer string) bool {
- finalizers := accessor.GetFinalizers()
- for _, finalizer := range finalizers {
- if finalizer == matchingFinalizer {
- return true
- }
- }
- return false
- }
- // this function takes newAccessor directly because the caller already
- // instantiates an accessor for the newObj.
- func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
- return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerDeleteDependents)
- }
- // this function takes newAccessor directly because the caller already
- // instantiates an accessor for the newObj.
- func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
- return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerOrphanDependents)
- }
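- // For reference: metav1.FinalizerDeleteDependents is "foregroundDeletion" and
- // metav1.FinalizerOrphanDependents is "orphan". These finalizers are added when a delete request
- // uses the Foreground or Orphan deletion propagation policy, which is the transition the two
- // helpers above detect.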
- // if a blocking ownerReference pointing to an object is removed, or is changed to
- // "BlockOwnerDeletion=false", add the referenced owner to the attemptToDelete queue.
- func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
- for _, ref := range removed {
- if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
- node, found := gb.uidToNode.Read(ref.UID)
- if !found {
- klog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
- continue
- }
- gb.attemptToDelete.Add(node)
- }
- }
- for _, c := range changed {
- wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
- isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
- if wasBlocked && isUnblocked {
- node, found := gb.uidToNode.Read(c.newRef.UID)
- if !found {
- klog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
- continue
- }
- gb.attemptToDelete.Add(node)
- }
- }
- }
- func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
- if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
- klog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
- gb.attemptToOrphan.Add(n)
- return
- }
- if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
- klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
- // if n was added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
- n.markDeletingDependents()
- for dep := range n.dependents {
- gb.attemptToDelete.Add(dep)
- }
- gb.attemptToDelete.Add(n)
- }
- }
- func (gb *GraphBuilder) runProcessGraphChanges() {
- for gb.processGraphChanges() {
- }
- }
- // processGraphChanges dequeues an event from graphChanges, updates the graph, and enqueues
- // affected nodes to the attemptToDelete and attemptToOrphan queues. It returns false only when
- // graphChanges is shutting down.
- func (gb *GraphBuilder) processGraphChanges() bool {
- item, quit := gb.graphChanges.Get()
- if quit {
- return false
- }
- defer gb.graphChanges.Done(item)
- event, ok := item.(*event)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
- return true
- }
- obj := event.obj
- accessor, err := meta.Accessor(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
- return true
- }
- klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
- // Check if the node already exists
- existingNode, found := gb.uidToNode.Read(accessor.GetUID())
- if found {
- // this marks the node as having been observed via an informer event
- // 1. this depends on graphChanges only containing add/update events from the actual informer
- // 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
- existingNode.markObserved()
- }
- switch {
- case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
- newNode := &node{
- identity: objectReference{
- OwnerReference: metav1.OwnerReference{
- APIVersion: event.gvk.GroupVersion().String(),
- Kind: event.gvk.Kind,
- UID: accessor.GetUID(),
- Name: accessor.GetName(),
- },
- Namespace: accessor.GetNamespace(),
- },
- dependents: make(map[*node]struct{}),
- owners: accessor.GetOwnerReferences(),
- deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
- beingDeleted: beingDeleted(accessor),
- }
- gb.insertNode(newNode)
- // the underlying delta_fifo may combine a creation and a deletion into
- // one event, so we need to further process the event.
- gb.processTransitions(event.oldObj, accessor, newNode)
- case (event.eventType == addEvent || event.eventType == updateEvent) && found:
- // handle changes in ownerReferences
- added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
- if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
- // check if the changed dependency graph unblocks owners that are
- // waiting for the deletion of their dependents.
- gb.addUnblockedOwnersToDeleteQueue(removed, changed)
- // update the node itself
- existingNode.owners = accessor.GetOwnerReferences()
- // Add the node to its new owners' dependent lists.
- gb.addDependentToOwners(existingNode, added)
- // remove the node from the dependents list of owners that are no longer in
- // the node's owners list.
- gb.removeDependentFromOwners(existingNode, removed)
- }
- if beingDeleted(accessor) {
- existingNode.markBeingDeleted()
- }
- gb.processTransitions(event.oldObj, accessor, existingNode)
- case event.eventType == deleteEvent:
- if !found {
- klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
- return true
- }
- // removeNode updates the graph
- gb.removeNode(existingNode)
- existingNode.dependentsLock.RLock()
- defer existingNode.dependentsLock.RUnlock()
- if len(existingNode.dependents) > 0 {
- gb.absentOwnerCache.Add(accessor.GetUID())
- }
- for dep := range existingNode.dependents {
- gb.attemptToDelete.Add(dep)
- }
- for _, owner := range existingNode.owners {
- ownerNode, found := gb.uidToNode.Read(owner.UID)
- if !found || !ownerNode.isDeletingDependents() {
- continue
- }
- // this lets attemptToDeleteItem check whether all the owner's
- // dependents are deleted; if so, the owner will be deleted.
- gb.attemptToDelete.Add(ownerNode)
- }
- }
- return true
- }