123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "fmt"
- "reflect"
- "time"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- appsinformers "k8s.io/client-go/informers/apps/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- appslisters "k8s.io/client-go/listers/apps/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/history"
- "k8s.io/klog"
- )
- // controllerKind contains the schema.GroupVersionKind for this controller type.
- var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
- // StatefulSetController controls statefulsets.
- type StatefulSetController struct {
- // client interface
- kubeClient clientset.Interface
- // control returns an interface capable of syncing a stateful set.
- // Abstracted out for testing.
- control StatefulSetControlInterface
- // podControl is used for patching pods.
- podControl controller.PodControlInterface
- // podLister is able to list/get pods from a shared informer's store
- podLister corelisters.PodLister
- // podListerSynced returns true if the pod shared informer has synced at least once
- podListerSynced cache.InformerSynced
- // setLister is able to list/get stateful sets from a shared informer's store
- setLister appslisters.StatefulSetLister
- // setListerSynced returns true if the stateful set shared informer has synced at least once
- setListerSynced cache.InformerSynced
- // pvcListerSynced returns true if the pvc shared informer has synced at least once
- pvcListerSynced cache.InformerSynced
- // revListerSynced returns true if the rev shared informer has synced at least once
- revListerSynced cache.InformerSynced
- // StatefulSets that need to be synced.
- queue workqueue.RateLimitingInterface
- }
- // NewStatefulSetController creates a new statefulset controller.
- func NewStatefulSetController(
- podInformer coreinformers.PodInformer,
- setInformer appsinformers.StatefulSetInformer,
- pvcInformer coreinformers.PersistentVolumeClaimInformer,
- revInformer appsinformers.ControllerRevisionInformer,
- kubeClient clientset.Interface,
- ) *StatefulSetController {
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
- ssc := &StatefulSetController{
- kubeClient: kubeClient,
- control: NewDefaultStatefulSetControl(
- NewRealStatefulPodControl(
- kubeClient,
- setInformer.Lister(),
- podInformer.Lister(),
- pvcInformer.Lister(),
- recorder),
- NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
- history.NewHistory(kubeClient, revInformer.Lister()),
- recorder,
- ),
- pvcListerSynced: pvcInformer.Informer().HasSynced,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
- podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
- revListerSynced: revInformer.Informer().HasSynced,
- }
- podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- // lookup the statefulset and enqueue
- AddFunc: ssc.addPod,
- // lookup current and old statefulset if labels changed
- UpdateFunc: ssc.updatePod,
- // lookup statefulset accounting for deletion tombstones
- DeleteFunc: ssc.deletePod,
- })
- ssc.podLister = podInformer.Lister()
- ssc.podListerSynced = podInformer.Informer().HasSynced
- setInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: ssc.enqueueStatefulSet,
- UpdateFunc: func(old, cur interface{}) {
- oldPS := old.(*apps.StatefulSet)
- curPS := cur.(*apps.StatefulSet)
- if oldPS.Status.Replicas != curPS.Status.Replicas {
- klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
- }
- ssc.enqueueStatefulSet(cur)
- },
- DeleteFunc: ssc.enqueueStatefulSet,
- },
- )
- ssc.setLister = setInformer.Lister()
- ssc.setListerSynced = setInformer.Informer().HasSynced
- // TODO: Watch volumes
- return ssc
- }
- // Run runs the statefulset controller.
- func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer ssc.queue.ShutDown()
- klog.Infof("Starting stateful set controller")
- defer klog.Infof("Shutting down statefulset controller")
- if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
- return
- }
- for i := 0; i < workers; i++ {
- go wait.Until(ssc.worker, time.Second, stopCh)
- }
- <-stopCh
- }
- // addPod adds the statefulset for the pod to the sync queue
- func (ssc *StatefulSetController) addPod(obj interface{}) {
- pod := obj.(*v1.Pod)
- if pod.DeletionTimestamp != nil {
- // on a restart of the controller manager, it's possible a new pod shows up in a state that
- // is already pending deletion. Prevent the pod from being a creation observation.
- ssc.deletePod(pod)
- return
- }
- // If it has a ControllerRef, that's all that matters.
- if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
- set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
- if set == nil {
- return
- }
- klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
- ssc.enqueueStatefulSet(set)
- return
- }
- // Otherwise, it's an orphan. Get a list of all matching controllers and sync
- // them to see if anyone wants to adopt it.
- sets := ssc.getStatefulSetsForPod(pod)
- if len(sets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
- for _, set := range sets {
- ssc.enqueueStatefulSet(set)
- }
- }
- // updatePod adds the statefulset for the current and old pods to the sync queue.
- func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
- curPod := cur.(*v1.Pod)
- oldPod := old.(*v1.Pod)
- if curPod.ResourceVersion == oldPod.ResourceVersion {
- // In the event of a re-list we may receive update events for all known pods.
- // Two different versions of the same pod will always have different RVs.
- return
- }
- labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
- curControllerRef := metav1.GetControllerOf(curPod)
- oldControllerRef := metav1.GetControllerOf(oldPod)
- controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
- if controllerRefChanged && oldControllerRef != nil {
- // The ControllerRef was changed. Sync the old controller, if any.
- if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
- ssc.enqueueStatefulSet(set)
- }
- }
- // If it has a ControllerRef, that's all that matters.
- if curControllerRef != nil {
- set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
- if set == nil {
- return
- }
- klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
- ssc.enqueueStatefulSet(set)
- return
- }
- // Otherwise, it's an orphan. If anything changed, sync matching controllers
- // to see if anyone wants to adopt it now.
- if labelChanged || controllerRefChanged {
- sets := ssc.getStatefulSetsForPod(curPod)
- if len(sets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
- for _, set := range sets {
- ssc.enqueueStatefulSet(set)
- }
- }
- }
- // deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
- func (ssc *StatefulSetController) deletePod(obj interface{}) {
- pod, ok := obj.(*v1.Pod)
- // When a delete is dropped, the relist will notice a pod in the store not
- // in the list, leading to the insertion of a tombstone object which contains
- // the deleted key/value. Note that this value might be stale.
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
- return
- }
- pod, ok = tombstone.Obj.(*v1.Pod)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
- return
- }
- }
- controllerRef := metav1.GetControllerOf(pod)
- if controllerRef == nil {
- // No controller should care about orphans being deleted.
- return
- }
- set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
- if set == nil {
- return
- }
- klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
- ssc.enqueueStatefulSet(set)
- }
- // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
- // It also reconciles ControllerRef by adopting/orphaning.
- //
- // NOTE: Returned Pods are pointers to objects from the cache.
- // If you need to modify one, you need to copy it first.
- func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
- // List all pods to include the pods that don't match the selector anymore but
- // has a ControllerRef pointing to this StatefulSet.
- pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
- if err != nil {
- return nil, err
- }
- filter := func(pod *v1.Pod) bool {
- // Only claim if it matches our StatefulSet name. Otherwise release/ignore.
- return isMemberOf(set, pod)
- }
- // If any adoptions are attempted, we should first recheck for deletion with
- // an uncached quorum read sometime after listing Pods (see #42639).
- canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
- fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
- if fresh.UID != set.UID {
- return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
- }
- return fresh, nil
- })
- cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc)
- return cm.ClaimPods(pods, filter)
- }
- // adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
- func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
- revisions, err := ssc.control.ListRevisions(set)
- if err != nil {
- return err
- }
- hasOrphans := false
- for i := range revisions {
- if metav1.GetControllerOf(revisions[i]) == nil {
- hasOrphans = true
- break
- }
- }
- if hasOrphans {
- fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
- if err != nil {
- return err
- }
- if fresh.UID != set.UID {
- return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
- }
- return ssc.control.AdoptOrphanRevisions(set, revisions)
- }
- return nil
- }
- // getStatefulSetsForPod returns a list of StatefulSets that potentially match
- // a given pod.
- func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
- sets, err := ssc.setLister.GetPodStatefulSets(pod)
- if err != nil {
- return nil
- }
- // More than one set is selecting the same Pod
- if len(sets) > 1 {
- // ControllerRef will ensure we don't do anything crazy, but more than one
- // item in this list nevertheless constitutes user error.
- utilruntime.HandleError(
- fmt.Errorf(
- "user error: more than one StatefulSet is selecting pods with labels: %+v",
- pod.Labels))
- }
- return sets
- }
- // resolveControllerRef returns the controller referenced by a ControllerRef,
- // or nil if the ControllerRef could not be resolved to a matching controller
- // of the correct Kind.
- func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
- // We can't look up by UID, so look up by Name and then verify UID.
- // Don't even try to look up by Name if it's the wrong Kind.
- if controllerRef.Kind != controllerKind.Kind {
- return nil
- }
- set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
- if err != nil {
- return nil
- }
- if set.UID != controllerRef.UID {
- // The controller we found with this Name is not the same one that the
- // ControllerRef points to.
- return nil
- }
- return set
- }
- // enqueueStatefulSet enqueues the given statefulset in the work queue.
- func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
- return
- }
- ssc.queue.Add(key)
- }
- // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
- // invoked concurrently with the same key.
- func (ssc *StatefulSetController) processNextWorkItem() bool {
- key, quit := ssc.queue.Get()
- if quit {
- return false
- }
- defer ssc.queue.Done(key)
- if err := ssc.sync(key.(string)); err != nil {
- utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
- ssc.queue.AddRateLimited(key)
- } else {
- ssc.queue.Forget(key)
- }
- return true
- }
- // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
- func (ssc *StatefulSetController) worker() {
- for ssc.processNextWorkItem() {
- }
- }
- // sync syncs the given statefulset.
- func (ssc *StatefulSetController) sync(key string) error {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
- }()
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- set, err := ssc.setLister.StatefulSets(namespace).Get(name)
- if errors.IsNotFound(err) {
- klog.Infof("StatefulSet has been deleted %v", key)
- return nil
- }
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
- return err
- }
- selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
- // This is a non-transient error, so don't retry.
- return nil
- }
- if err := ssc.adoptOrphanRevisions(set); err != nil {
- return err
- }
- pods, err := ssc.getPodsForStatefulSet(set, selector)
- if err != nil {
- return err
- }
- return ssc.syncStatefulSet(set, pods)
- }
- // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
- func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
- klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
- // TODO: investigate where we mutate the set during the update as it is not obvious.
- if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
- return err
- }
- klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
- return nil
- }
|