123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "math"
- "sort"
- "k8s.io/klog"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/tools/record"
- "k8s.io/kubernetes/pkg/controller/history"
- )
- // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
- // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
- type StatefulSetControlInterface interface {
- // UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
- // persistent volume creation, update, and deletion.
- // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
- // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
- // exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
- UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
- // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
- // error is nil, the returns slice of ControllerRevisions is valid.
- ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
- // AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
- // successful the returned error is nil.
- AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
- }
- // NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
- // implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
- // and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
- // to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
- // scenario other than testing.
- func NewDefaultStatefulSetControl(
- podControl StatefulPodControlInterface,
- statusUpdater StatefulSetStatusUpdaterInterface,
- controllerHistory history.Interface,
- recorder record.EventRecorder) StatefulSetControlInterface {
- return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
- }
- type defaultStatefulSetControl struct {
- podControl StatefulPodControlInterface
- statusUpdater StatefulSetStatusUpdaterInterface
- controllerHistory history.Interface
- recorder record.EventRecorder
- }
- // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
- // consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
- // is created while any pod is unhealthy, and pods are terminated in descending order. The burst
- // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
- // in no particular order. Clients using the burst strategy should be careful to ensure they
- // understand the consistency implications of having unpredictable numbers of pods available.
- func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
- // list all revisions and sort them
- revisions, err := ssc.ListRevisions(set)
- if err != nil {
- return err
- }
- history.SortControllerRevisions(revisions)
- // get the current, and update revisions
- currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
- if err != nil {
- return err
- }
- // perform the main update function and get the status
- status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
- if err != nil {
- return err
- }
- // update the set's status
- err = ssc.updateStatefulSetStatus(set, status)
- if err != nil {
- return err
- }
- klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
- set.Namespace,
- set.Name,
- status.Replicas,
- status.ReadyReplicas,
- status.CurrentReplicas,
- status.UpdatedReplicas)
- klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
- set.Namespace,
- set.Name,
- status.CurrentRevision,
- status.UpdateRevision)
- // maintain the set's revision history limit
- return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
- }
- func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
- selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
- if err != nil {
- return nil, err
- }
- return ssc.controllerHistory.ListControllerRevisions(set, selector)
- }
- func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
- set *apps.StatefulSet,
- revisions []*apps.ControllerRevision) error {
- for i := range revisions {
- adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
- if err != nil {
- return err
- }
- revisions[i] = adopted
- }
- return nil
- }
- // truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
- // CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
- // considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
- // only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
- // expects that revisions is sorted when supplied.
- func (ssc *defaultStatefulSetControl) truncateHistory(
- set *apps.StatefulSet,
- pods []*v1.Pod,
- revisions []*apps.ControllerRevision,
- current *apps.ControllerRevision,
- update *apps.ControllerRevision) error {
- history := make([]*apps.ControllerRevision, 0, len(revisions))
- // mark all live revisions
- live := map[string]bool{current.Name: true, update.Name: true}
- for i := range pods {
- live[getPodRevision(pods[i])] = true
- }
- // collect live revisions and historic revisions
- for i := range revisions {
- if !live[revisions[i].Name] {
- history = append(history, revisions[i])
- }
- }
- historyLen := len(history)
- historyLimit := int(*set.Spec.RevisionHistoryLimit)
- if historyLen <= historyLimit {
- return nil
- }
- // delete any non-live history to maintain the revision limit.
- history = history[:(historyLen - historyLimit)]
- for i := 0; i < len(history); i++ {
- if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
- return err
- }
- }
- return nil
- }
- // getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also
- // returns a collision count that records the number of name collisions set saw when creating
- // new ControllerRevisions. This count is incremented on every name collision and is used in
- // building the ControllerRevision names for name collision avoidance. This method may create
- // a new revision, or modify the Revision of an existing revision if an update to set is detected.
- // This method expects that revisions is sorted when supplied.
- func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
- set *apps.StatefulSet,
- revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
- var currentRevision, updateRevision *apps.ControllerRevision
- revisionCount := len(revisions)
- history.SortControllerRevisions(revisions)
- // Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
- // This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
- var collisionCount int32
- if set.Status.CollisionCount != nil {
- collisionCount = *set.Status.CollisionCount
- }
- // create a new revision from the current set
- updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
- if err != nil {
- return nil, nil, collisionCount, err
- }
- // find any equivalent revisions
- equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
- equalCount := len(equalRevisions)
- if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
- // if the equivalent revision is immediately prior the update revision has not changed
- updateRevision = revisions[revisionCount-1]
- } else if equalCount > 0 {
- // if the equivalent revision is not immediately prior we will roll back by incrementing the
- // Revision of the equivalent revision
- updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
- equalRevisions[equalCount-1],
- updateRevision.Revision)
- if err != nil {
- return nil, nil, collisionCount, err
- }
- } else {
- //if there is no equivalent revision we create a new one
- updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
- if err != nil {
- return nil, nil, collisionCount, err
- }
- }
- // attempt to find the revision that corresponds to the current revision
- for i := range revisions {
- if revisions[i].Name == set.Status.CurrentRevision {
- currentRevision = revisions[i]
- }
- }
- // if the current revision is nil we initialize the history by setting it to the update revision
- if currentRevision == nil {
- currentRevision = updateRevision
- }
- return currentRevision, updateRevision, collisionCount, nil
- }
- // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
- // the set in order to conform the system to the target state for the set. The target state always contains
- // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
- // RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
- // If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
- // the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
- // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
- // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
- // update must be recorded. If the error is not nil, the method should be retried until successful.
- func (ssc *defaultStatefulSetControl) updateStatefulSet(
- set *apps.StatefulSet,
- currentRevision *apps.ControllerRevision,
- updateRevision *apps.ControllerRevision,
- collisionCount int32,
- pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
- // get the current and update revisions of the set.
- currentSet, err := ApplyRevision(set, currentRevision)
- if err != nil {
- return nil, err
- }
- updateSet, err := ApplyRevision(set, updateRevision)
- if err != nil {
- return nil, err
- }
- // set the generation, and revisions in the returned status
- status := apps.StatefulSetStatus{}
- status.ObservedGeneration = set.Generation
- status.CurrentRevision = currentRevision.Name
- status.UpdateRevision = updateRevision.Name
- status.CollisionCount = new(int32)
- *status.CollisionCount = collisionCount
- replicaCount := int(*set.Spec.Replicas)
- // slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
- replicas := make([]*v1.Pod, replicaCount)
- // slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
- condemned := make([]*v1.Pod, 0, len(pods))
- unhealthy := 0
- firstUnhealthyOrdinal := math.MaxInt32
- var firstUnhealthyPod *v1.Pod
- // First we partition pods into two lists valid replicas and condemned Pods
- for i := range pods {
- status.Replicas++
- // count the number of running and ready replicas
- if isRunningAndReady(pods[i]) {
- status.ReadyReplicas++
- }
- // count the number of current and update replicas
- if isCreated(pods[i]) && !isTerminating(pods[i]) {
- if getPodRevision(pods[i]) == currentRevision.Name {
- status.CurrentReplicas++
- }
- if getPodRevision(pods[i]) == updateRevision.Name {
- status.UpdatedReplicas++
- }
- }
- if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
- // if the ordinal of the pod is within the range of the current number of replicas,
- // insert it at the indirection of its ordinal
- replicas[ord] = pods[i]
- } else if ord >= replicaCount {
- // if the ordinal is greater than the number of replicas add it to the condemned list
- condemned = append(condemned, pods[i])
- }
- // If the ordinal could not be parsed (ord < 0), ignore the Pod.
- }
- // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
- for ord := 0; ord < replicaCount; ord++ {
- if replicas[ord] == nil {
- replicas[ord] = newVersionedStatefulSetPod(
- currentSet,
- updateSet,
- currentRevision.Name,
- updateRevision.Name, ord)
- }
- }
- // sort the condemned Pods by their ordinals
- sort.Sort(ascendingOrdinal(condemned))
- // find the first unhealthy Pod
- for i := range replicas {
- if !isHealthy(replicas[i]) {
- unhealthy++
- if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
- firstUnhealthyOrdinal = ord
- firstUnhealthyPod = replicas[i]
- }
- }
- }
- for i := range condemned {
- if !isHealthy(condemned[i]) {
- unhealthy++
- if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
- firstUnhealthyOrdinal = ord
- firstUnhealthyPod = condemned[i]
- }
- }
- }
- if unhealthy > 0 {
- klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
- set.Namespace,
- set.Name,
- unhealthy,
- firstUnhealthyPod.Name)
- }
- // If the StatefulSet is being deleted, don't do anything other than updating
- // status.
- if set.DeletionTimestamp != nil {
- return &status, nil
- }
- monotonic := !allowsBurst(set)
- // Examine each replica with respect to its ordinal
- for i := range replicas {
- // delete and recreate failed pods
- if isFailed(replicas[i]) {
- ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
- "StatefulSet %s/%s is recreating failed Pod %s",
- set.Namespace,
- set.Name,
- replicas[i].Name)
- if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
- return &status, err
- }
- if getPodRevision(replicas[i]) == currentRevision.Name {
- status.CurrentReplicas--
- }
- if getPodRevision(replicas[i]) == updateRevision.Name {
- status.UpdatedReplicas--
- }
- status.Replicas--
- replicas[i] = newVersionedStatefulSetPod(
- currentSet,
- updateSet,
- currentRevision.Name,
- updateRevision.Name,
- i)
- }
- // If we find a Pod that has not been created we create the Pod
- if !isCreated(replicas[i]) {
- if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
- return &status, err
- }
- status.Replicas++
- if getPodRevision(replicas[i]) == currentRevision.Name {
- status.CurrentReplicas++
- }
- if getPodRevision(replicas[i]) == updateRevision.Name {
- status.UpdatedReplicas++
- }
- // if the set does not allow bursting, return immediately
- if monotonic {
- return &status, nil
- }
- // pod created, no more work possible for this round
- continue
- }
- // If we find a Pod that is currently terminating, we must wait until graceful deletion
- // completes before we continue to make progress.
- if isTerminating(replicas[i]) && monotonic {
- klog.V(4).Infof(
- "StatefulSet %s/%s is waiting for Pod %s to Terminate",
- set.Namespace,
- set.Name,
- replicas[i].Name)
- return &status, nil
- }
- // If we have a Pod that has been created but is not running and ready we can not make progress.
- // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
- // ordinal, are Running and Ready.
- if !isRunningAndReady(replicas[i]) && monotonic {
- klog.V(4).Infof(
- "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
- set.Namespace,
- set.Name,
- replicas[i].Name)
- return &status, nil
- }
- // Enforce the StatefulSet invariants
- if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
- continue
- }
- // Make a deep copy so we don't mutate the shared cache
- replica := replicas[i].DeepCopy()
- if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
- return &status, err
- }
- }
- // At this point, all of the current Replicas are Running and Ready, we can consider termination.
- // We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
- // We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
- // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
- // updates.
- for target := len(condemned) - 1; target >= 0; target-- {
- // wait for terminating pods to expire
- if isTerminating(condemned[target]) {
- klog.V(4).Infof(
- "StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
- set.Namespace,
- set.Name,
- condemned[target].Name)
- // block if we are in monotonic mode
- if monotonic {
- return &status, nil
- }
- continue
- }
- // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
- if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
- klog.V(4).Infof(
- "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
- set.Namespace,
- set.Name,
- firstUnhealthyPod.Name)
- return &status, nil
- }
- klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
- set.Namespace,
- set.Name,
- condemned[target].Name)
- if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
- return &status, err
- }
- if getPodRevision(condemned[target]) == currentRevision.Name {
- status.CurrentReplicas--
- }
- if getPodRevision(condemned[target]) == updateRevision.Name {
- status.UpdatedReplicas--
- }
- if monotonic {
- return &status, nil
- }
- }
- // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
- if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
- return &status, nil
- }
- // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
- updateMin := 0
- if set.Spec.UpdateStrategy.RollingUpdate != nil {
- updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
- }
- // we terminate the Pod with the largest ordinal that does not match the update revision.
- for target := len(replicas) - 1; target >= updateMin; target-- {
- // delete the Pod if it is not already terminating and does not match the update revision.
- if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
- klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
- set.Namespace,
- set.Name,
- replicas[target].Name)
- err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
- status.CurrentReplicas--
- return &status, err
- }
- // wait for unhealthy Pods on update
- if !isHealthy(replicas[target]) {
- klog.V(4).Infof(
- "StatefulSet %s/%s is waiting for Pod %s to update",
- set.Namespace,
- set.Name,
- replicas[target].Name)
- return &status, nil
- }
- }
- return &status, nil
- }
- // updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
- // mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
- // returned error is nil, the update is successful.
- func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
- set *apps.StatefulSet,
- status *apps.StatefulSetStatus) error {
- // complete any in progress rolling update if necessary
- completeRollingUpdate(set, status)
- // if the status is not inconsistent do not perform an update
- if !inconsistentStatus(set, status) {
- return nil
- }
- // copy set and update its status
- set = set.DeepCopy()
- if err := ssc.statusUpdater.UpdateStatefulSetStatus(set, status); err != nil {
- return err
- }
- return nil
- }
- var _ StatefulSetControlInterface = &defaultStatefulSetControl{}
|