stateful_pod_control.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. apps "k8s.io/api/apps/v1"
  19. "k8s.io/api/core/v1"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. errorutils "k8s.io/apimachinery/pkg/util/errors"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. clientset "k8s.io/client-go/kubernetes"
  25. appslisters "k8s.io/client-go/listers/apps/v1"
  26. corelisters "k8s.io/client-go/listers/core/v1"
  27. "k8s.io/client-go/tools/record"
  28. "k8s.io/client-go/util/retry"
  29. )
  30. // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
  31. // and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
  32. // implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
  33. // Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
  34. type StatefulPodControlInterface interface {
  35. // CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
  36. // the Pod. If the returned error is nil the Pod and its PVCs have been created.
  37. CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  38. // UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
  39. // storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
  40. // pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
  41. // the create is successful, the returned error is nil.
  42. UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  43. // DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
  44. // the returned error is nil.
  45. DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  46. }
  47. func NewRealStatefulPodControl(
  48. client clientset.Interface,
  49. setLister appslisters.StatefulSetLister,
  50. podLister corelisters.PodLister,
  51. pvcLister corelisters.PersistentVolumeClaimLister,
  52. recorder record.EventRecorder,
  53. ) StatefulPodControlInterface {
  54. return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
  55. }
  56. // realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
  57. // API server. The struct is package private as the internal details are irrelevant to importing packages.
  58. type realStatefulPodControl struct {
  59. client clientset.Interface
  60. setLister appslisters.StatefulSetLister
  61. podLister corelisters.PodLister
  62. pvcLister corelisters.PersistentVolumeClaimLister
  63. recorder record.EventRecorder
  64. }
  65. func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  66. // Create the Pod's PVCs prior to creating the Pod
  67. if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
  68. spc.recordPodEvent("create", set, pod, err)
  69. return err
  70. }
  71. // If we created the PVCs attempt to create the Pod
  72. _, err := spc.client.CoreV1().Pods(set.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
  73. // sink already exists errors
  74. if apierrors.IsAlreadyExists(err) {
  75. return err
  76. }
  77. spc.recordPodEvent("create", set, pod, err)
  78. return err
  79. }
  80. func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  81. attemptedUpdate := false
  82. err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  83. // assume the Pod is consistent
  84. consistent := true
  85. // if the Pod does not conform to its identity, update the identity and dirty the Pod
  86. if !identityMatches(set, pod) {
  87. updateIdentity(set, pod)
  88. consistent = false
  89. }
  90. // if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
  91. // dirty the Pod, and create any missing PVCs
  92. if !storageMatches(set, pod) {
  93. updateStorage(set, pod)
  94. consistent = false
  95. if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
  96. spc.recordPodEvent("update", set, pod, err)
  97. return err
  98. }
  99. }
  100. // if the Pod is not dirty, do nothing
  101. if consistent {
  102. return nil
  103. }
  104. attemptedUpdate = true
  105. // commit the update, retrying on conflicts
  106. _, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
  107. if updateErr == nil {
  108. return nil
  109. }
  110. if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
  111. // make a copy so we don't mutate the shared cache
  112. pod = updated.DeepCopy()
  113. } else {
  114. utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
  115. }
  116. return updateErr
  117. })
  118. if attemptedUpdate {
  119. spc.recordPodEvent("update", set, pod, err)
  120. }
  121. return err
  122. }
  123. func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  124. err := spc.client.CoreV1().Pods(set.Namespace).Delete(context.TODO(), pod.Name, nil)
  125. spc.recordPodEvent("delete", set, pod, err)
  126. return err
  127. }
  128. // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
  129. // have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
  130. func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
  131. if err == nil {
  132. reason := fmt.Sprintf("Successful%s", strings.Title(verb))
  133. message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
  134. strings.ToLower(verb), pod.Name, set.Name)
  135. spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
  136. } else {
  137. reason := fmt.Sprintf("Failed%s", strings.Title(verb))
  138. message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
  139. strings.ToLower(verb), pod.Name, set.Name, err)
  140. spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
  141. }
  142. }
  143. // recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
  144. // nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
  145. // reason of v1.EventTypeWarning.
  146. func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
  147. if err == nil {
  148. reason := fmt.Sprintf("Successful%s", strings.Title(verb))
  149. message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
  150. strings.ToLower(verb), claim.Name, pod.Name, set.Name)
  151. spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
  152. } else {
  153. reason := fmt.Sprintf("Failed%s", strings.Title(verb))
  154. message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
  155. strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
  156. spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
  157. }
  158. }
  159. // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
  160. // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
  161. // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
  162. // set's Spec.
  163. func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
  164. var errs []error
  165. for _, claim := range getPersistentVolumeClaims(set, pod) {
  166. _, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
  167. switch {
  168. case apierrors.IsNotFound(err):
  169. _, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), &claim, metav1.CreateOptions{})
  170. if err != nil {
  171. errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
  172. }
  173. if err == nil || !apierrors.IsAlreadyExists(err) {
  174. spc.recordClaimEvent("create", set, pod, &claim, err)
  175. }
  176. case err != nil:
  177. errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err))
  178. spc.recordClaimEvent("create", set, pod, &claim, err)
  179. }
  180. // TODO: Check resource requirements and accessmodes, update if necessary
  181. }
  182. return errorutils.NewAggregate(errs)
  183. }
  184. var _ StatefulPodControlInterface = &realStatefulPodControl{}