123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "regexp"
- "strconv"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/client-go/kubernetes/scheme"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/history"
- )
const (
	// maxUpdateRetries bounds how many times an update is retried after a conflict before the
	// operation is abandoned.
	maxUpdateRetries = 10
)

// updateConflictError reports that maxUpdateRetries attempts against the API server have been made
// without success and that the caller needs to back off.
var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpdateRetries)
- var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
- // overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
- // Generally used to tie break between StatefulSets that have overlapping selectors.
- type overlappingStatefulSets []*apps.StatefulSet
- func (o overlappingStatefulSets) Len() int { return len(o) }
- func (o overlappingStatefulSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o overlappingStatefulSets) Less(i, j int) bool {
- if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
- }
// statefulPodRegex splits a Pod name into two capture groups: everything before the final dash (the
// parent StatefulSet name) and the trailing run of decimal digits (the ordinal).
var statefulPodRegex = regexp.MustCompile(`(.*)-([0-9]+)$`)
- // getParentNameAndOrdinal gets the name of pod's parent StatefulSet and pod's ordinal as extracted from its Name. If
- // the Pod was not created by a StatefulSet, its parent is considered to be empty string, and its ordinal is considered
- // to be -1.
- func getParentNameAndOrdinal(pod *v1.Pod) (string, int) {
- parent := ""
- ordinal := -1
- subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
- if len(subMatches) < 3 {
- return parent, ordinal
- }
- parent = subMatches[1]
- if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
- ordinal = int(i)
- }
- return parent, ordinal
- }
- // getParentName gets the name of pod's parent StatefulSet. If pod has not parent, the empty string is returned.
- func getParentName(pod *v1.Pod) string {
- parent, _ := getParentNameAndOrdinal(pod)
- return parent
- }
- // getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned.
- func getOrdinal(pod *v1.Pod) int {
- _, ordinal := getParentNameAndOrdinal(pod)
- return ordinal
- }
- // getPodName gets the name of set's child Pod with an ordinal index of ordinal
- func getPodName(set *apps.StatefulSet, ordinal int) string {
- return fmt.Sprintf("%s-%d", set.Name, ordinal)
- }
- // getPersistentVolumeClaimName gets the name of PersistentVolumeClaim for a Pod with an ordinal index of ordinal. claim
- // must be a PersistentVolumeClaim from set's VolumeClaims template.
- func getPersistentVolumeClaimName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim, ordinal int) string {
- // NOTE: This name format is used by the heuristics for zone spreading in ChooseZoneForVolume
- return fmt.Sprintf("%s-%s-%d", claim.Name, set.Name, ordinal)
- }
- // isMemberOf tests if pod is a member of set.
- func isMemberOf(set *apps.StatefulSet, pod *v1.Pod) bool {
- return getParentName(pod) == set.Name
- }
- // identityMatches returns true if pod has a valid identity and network identity for a member of set.
- func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
- parent, ordinal := getParentNameAndOrdinal(pod)
- return ordinal >= 0 &&
- set.Name == parent &&
- pod.Name == getPodName(set, ordinal) &&
- pod.Namespace == set.Namespace &&
- pod.Labels[apps.StatefulSetPodNameLabel] == pod.Name
- }
- // storageMatches returns true if pod's Volumes cover the set of PersistentVolumeClaims
- func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
- ordinal := getOrdinal(pod)
- if ordinal < 0 {
- return false
- }
- volumes := make(map[string]v1.Volume, len(pod.Spec.Volumes))
- for _, volume := range pod.Spec.Volumes {
- volumes[volume.Name] = volume
- }
- for _, claim := range set.Spec.VolumeClaimTemplates {
- volume, found := volumes[claim.Name]
- if !found ||
- volume.VolumeSource.PersistentVolumeClaim == nil ||
- volume.VolumeSource.PersistentVolumeClaim.ClaimName !=
- getPersistentVolumeClaimName(set, &claim, ordinal) {
- return false
- }
- }
- return true
- }
- // getPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
- // returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
- // by getPersistentVolumeClaimName.
- func getPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) map[string]v1.PersistentVolumeClaim {
- ordinal := getOrdinal(pod)
- templates := set.Spec.VolumeClaimTemplates
- claims := make(map[string]v1.PersistentVolumeClaim, len(templates))
- for i := range templates {
- claim := templates[i]
- claim.Name = getPersistentVolumeClaimName(set, &claim, ordinal)
- claim.Namespace = set.Namespace
- if claim.Labels != nil {
- for key, value := range set.Spec.Selector.MatchLabels {
- claim.Labels[key] = value
- }
- } else {
- claim.Labels = set.Spec.Selector.MatchLabels
- }
- claims[templates[i].Name] = claim
- }
- return claims
- }
- // updateStorage updates pod's Volumes to conform with the PersistentVolumeClaim of set's templates. If pod has
- // conflicting local Volumes these are replaced with Volumes that conform to the set's templates.
- func updateStorage(set *apps.StatefulSet, pod *v1.Pod) {
- currentVolumes := pod.Spec.Volumes
- claims := getPersistentVolumeClaims(set, pod)
- newVolumes := make([]v1.Volume, 0, len(claims))
- for name, claim := range claims {
- newVolumes = append(newVolumes, v1.Volume{
- Name: name,
- VolumeSource: v1.VolumeSource{
- PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
- ClaimName: claim.Name,
- // TODO: Use source definition to set this value when we have one.
- ReadOnly: false,
- },
- },
- })
- }
- for i := range currentVolumes {
- if _, ok := claims[currentVolumes[i].Name]; !ok {
- newVolumes = append(newVolumes, currentVolumes[i])
- }
- }
- pod.Spec.Volumes = newVolumes
- }
- func initIdentity(set *apps.StatefulSet, pod *v1.Pod) {
- updateIdentity(set, pod)
- // Set these immutable fields only on initial Pod creation, not updates.
- pod.Spec.Hostname = pod.Name
- pod.Spec.Subdomain = set.Spec.ServiceName
- }
- // updateIdentity updates pod's name, hostname, and subdomain, and StatefulSetPodNameLabel to conform to set's name
- // and headless service.
- func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) {
- pod.Name = getPodName(set, getOrdinal(pod))
- pod.Namespace = set.Namespace
- if pod.Labels == nil {
- pod.Labels = make(map[string]string)
- }
- pod.Labels[apps.StatefulSetPodNameLabel] = pod.Name
- }
- // isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
- func isRunningAndReady(pod *v1.Pod) bool {
- return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
- }
- // isCreated returns true if pod has been created and is maintained by the API server
- func isCreated(pod *v1.Pod) bool {
- return pod.Status.Phase != ""
- }
- // isFailed returns true if pod has a Phase of PodFailed
- func isFailed(pod *v1.Pod) bool {
- return pod.Status.Phase == v1.PodFailed
- }
- // isTerminating returns true if pod's DeletionTimestamp has been set
- func isTerminating(pod *v1.Pod) bool {
- return pod.DeletionTimestamp != nil
- }
- // isHealthy returns true if pod is running and ready and has not been terminated
- func isHealthy(pod *v1.Pod) bool {
- return isRunningAndReady(pod) && !isTerminating(pod)
- }
- // allowsBurst is true if the alpha burst annotation is set.
- func allowsBurst(set *apps.StatefulSet) bool {
- return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
- }
- // setPodRevision sets the revision of Pod to revision by adding the StatefulSetRevisionLabel
- func setPodRevision(pod *v1.Pod, revision string) {
- if pod.Labels == nil {
- pod.Labels = make(map[string]string)
- }
- pod.Labels[apps.StatefulSetRevisionLabel] = revision
- }
- // getPodRevision gets the revision of Pod by inspecting the StatefulSetRevisionLabel. If pod has no revision the empty
- // string is returned.
- func getPodRevision(pod *v1.Pod) string {
- if pod.Labels == nil {
- return ""
- }
- return pod.Labels[apps.StatefulSetRevisionLabel]
- }
- // newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
- func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
- pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, metav1.NewControllerRef(set, controllerKind))
- pod.Name = getPodName(set, ordinal)
- initIdentity(set, pod)
- updateStorage(set, pod)
- return pod
- }
- // newVersionedStatefulSetPod creates a new Pod for a StatefulSet. currentSet is the representation of the set at the
- // current revision. updateSet is the representation of the set at the updateRevision. currentRevision is the name of
- // the current revision. updateRevision is the name of the update revision. ordinal is the ordinal of the Pod. If the
- // returned error is nil, the returned Pod is valid.
- func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
- if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
- (currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) ||
- (currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) {
- pod := newStatefulSetPod(currentSet, ordinal)
- setPodRevision(pod, currentRevision)
- return pod
- }
- pod := newStatefulSetPod(updateSet, ordinal)
- setPodRevision(pod, updateRevision)
- return pod
- }
- // Match check if the given StatefulSet's template matches the template stored in the given history.
- func Match(ss *apps.StatefulSet, history *apps.ControllerRevision) (bool, error) {
- patch, err := getPatch(ss)
- if err != nil {
- return false, err
- }
- return bytes.Equal(patch, history.Data.Raw), nil
- }
- // getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a
- // previous version. If the returned error is nil the patch is valid. The current state that we save is just the
- // PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
- // recorded patches.
- func getPatch(set *apps.StatefulSet) ([]byte, error) {
- str, err := runtime.Encode(patchCodec, set)
- if err != nil {
- return nil, err
- }
- var raw map[string]interface{}
- json.Unmarshal([]byte(str), &raw)
- objCopy := make(map[string]interface{})
- specCopy := make(map[string]interface{})
- spec := raw["spec"].(map[string]interface{})
- template := spec["template"].(map[string]interface{})
- specCopy["template"] = template
- template["$patch"] = "replace"
- objCopy["spec"] = specCopy
- patch, err := json.Marshal(objCopy)
- return patch, err
- }
- // newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
- // The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
- // ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set
- // to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet.
- func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
- patch, err := getPatch(set)
- if err != nil {
- return nil, err
- }
- cr, err := history.NewControllerRevision(set,
- controllerKind,
- set.Spec.Template.Labels,
- runtime.RawExtension{Raw: patch},
- revision,
- collisionCount)
- if err != nil {
- return nil, err
- }
- if cr.ObjectMeta.Annotations == nil {
- cr.ObjectMeta.Annotations = make(map[string]string)
- }
- for key, value := range set.Annotations {
- cr.ObjectMeta.Annotations[key] = value
- }
- return cr, nil
- }
- // ApplyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error
- // is nil, the returned StatefulSet is valid.
- func ApplyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
- clone := set.DeepCopy()
- patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone)
- if err != nil {
- return nil, err
- }
- restoredSet := &apps.StatefulSet{}
- err = json.Unmarshal(patched, restoredSet)
- if err != nil {
- return nil, err
- }
- return restoredSet, nil
- }
- // nextRevision finds the next valid revision number based on revisions. If the length of revisions
- // is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
- // assumes that revisions has been sorted by Revision.
- func nextRevision(revisions []*apps.ControllerRevision) int64 {
- count := len(revisions)
- if count <= 0 {
- return 1
- }
- return revisions[count-1].Revision + 1
- }
- // inconsistentStatus returns true if the ObservedGeneration of status is greater than set's
- // Generation or if any of the status's fields do not match those of set's status.
- func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
- return status.ObservedGeneration > set.Status.ObservedGeneration ||
- status.Replicas != set.Status.Replicas ||
- status.CurrentReplicas != set.Status.CurrentReplicas ||
- status.ReadyReplicas != set.Status.ReadyReplicas ||
- status.UpdatedReplicas != set.Status.UpdatedReplicas ||
- status.CurrentRevision != set.Status.CurrentRevision ||
- status.UpdateRevision != set.Status.UpdateRevision
- }
- // completeRollingUpdate completes a rolling update when all of set's replica Pods have been updated
- // to the updateRevision. status's currentRevision is set to updateRevision and its' updateRevision
- // is set to the empty string. status's currentReplicas is set to updateReplicas and its updateReplicas
- // are set to 0.
- func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
- if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
- status.UpdatedReplicas == status.Replicas &&
- status.ReadyReplicas == status.Replicas {
- status.CurrentReplicas = status.UpdatedReplicas
- status.CurrentRevision = status.UpdateRevision
- }
- }
- // ascendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
- // from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
- // to the front of the list.
- type ascendingOrdinal []*v1.Pod
- func (ao ascendingOrdinal) Len() int {
- return len(ao)
- }
- func (ao ascendingOrdinal) Swap(i, j int) {
- ao[i], ao[j] = ao[j], ao[i]
- }
- func (ao ascendingOrdinal) Less(i, j int) bool {
- return getOrdinal(ao[i]) < getOrdinal(ao[j])
- }
|