stateful_pod_control.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "fmt"
  16. "strings"
  17. apps "k8s.io/api/apps/v1"
  18. "k8s.io/api/core/v1"
  19. apierrors "k8s.io/apimachinery/pkg/api/errors"
  20. errorutils "k8s.io/apimachinery/pkg/util/errors"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. clientset "k8s.io/client-go/kubernetes"
  23. appslisters "k8s.io/client-go/listers/apps/v1"
  24. corelisters "k8s.io/client-go/listers/core/v1"
  25. "k8s.io/client-go/tools/record"
  26. "k8s.io/client-go/util/retry"
  27. )
  28. // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
  29. // and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
  30. // implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
  31. // Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
  32. type StatefulPodControlInterface interface {
  33. // CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
  34. // the Pod. If the returned error is nil the Pod and its PVCs have been created.
  35. CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  36. // UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
  37. // storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
  38. // pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
  39. // the create is successful, the returned error is nil.
  40. UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  41. // DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
  42. // the returned error is nil.
  43. DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
  44. }
  45. func NewRealStatefulPodControl(
  46. client clientset.Interface,
  47. setLister appslisters.StatefulSetLister,
  48. podLister corelisters.PodLister,
  49. pvcLister corelisters.PersistentVolumeClaimLister,
  50. recorder record.EventRecorder,
  51. ) StatefulPodControlInterface {
  52. return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
  53. }
  54. // realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
  55. // API server. The struct is package private as the internal details are irrelevant to importing packages.
  56. type realStatefulPodControl struct {
  57. client clientset.Interface
  58. setLister appslisters.StatefulSetLister
  59. podLister corelisters.PodLister
  60. pvcLister corelisters.PersistentVolumeClaimLister
  61. recorder record.EventRecorder
  62. }
  63. func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  64. // Create the Pod's PVCs prior to creating the Pod
  65. if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
  66. spc.recordPodEvent("create", set, pod, err)
  67. return err
  68. }
  69. // If we created the PVCs attempt to create the Pod
  70. _, err := spc.client.CoreV1().Pods(set.Namespace).Create(pod)
  71. // sink already exists errors
  72. if apierrors.IsAlreadyExists(err) {
  73. return err
  74. }
  75. spc.recordPodEvent("create", set, pod, err)
  76. return err
  77. }
  78. func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  79. attemptedUpdate := false
  80. err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  81. // assume the Pod is consistent
  82. consistent := true
  83. // if the Pod does not conform to its identity, update the identity and dirty the Pod
  84. if !identityMatches(set, pod) {
  85. updateIdentity(set, pod)
  86. consistent = false
  87. }
  88. // if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
  89. // dirty the Pod, and create any missing PVCs
  90. if !storageMatches(set, pod) {
  91. updateStorage(set, pod)
  92. consistent = false
  93. if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
  94. spc.recordPodEvent("update", set, pod, err)
  95. return err
  96. }
  97. }
  98. // if the Pod is not dirty, do nothing
  99. if consistent {
  100. return nil
  101. }
  102. attemptedUpdate = true
  103. // commit the update, retrying on conflicts
  104. _, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(pod)
  105. if updateErr == nil {
  106. return nil
  107. }
  108. if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
  109. // make a copy so we don't mutate the shared cache
  110. pod = updated.DeepCopy()
  111. } else {
  112. utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
  113. }
  114. return updateErr
  115. })
  116. if attemptedUpdate {
  117. spc.recordPodEvent("update", set, pod, err)
  118. }
  119. return err
  120. }
  121. func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  122. err := spc.client.CoreV1().Pods(set.Namespace).Delete(pod.Name, nil)
  123. spc.recordPodEvent("delete", set, pod, err)
  124. return err
  125. }
  126. // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
  127. // have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
  128. func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
  129. if err == nil {
  130. reason := fmt.Sprintf("Successful%s", strings.Title(verb))
  131. message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
  132. strings.ToLower(verb), pod.Name, set.Name)
  133. spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
  134. } else {
  135. reason := fmt.Sprintf("Failed%s", strings.Title(verb))
  136. message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
  137. strings.ToLower(verb), pod.Name, set.Name, err)
  138. spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
  139. }
  140. }
  141. // recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
  142. // nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
  143. // reason of v1.EventTypeWarning.
  144. func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
  145. if err == nil {
  146. reason := fmt.Sprintf("Successful%s", strings.Title(verb))
  147. message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
  148. strings.ToLower(verb), claim.Name, pod.Name, set.Name)
  149. spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
  150. } else {
  151. reason := fmt.Sprintf("Failed%s", strings.Title(verb))
  152. message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
  153. strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
  154. spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
  155. }
  156. }
  157. // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
  158. // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
  159. // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
  160. // set's Spec.
  161. func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
  162. var errs []error
  163. for _, claim := range getPersistentVolumeClaims(set, pod) {
  164. _, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
  165. switch {
  166. case apierrors.IsNotFound(err):
  167. _, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(&claim)
  168. if err != nil {
  169. errs = append(errs, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, err))
  170. }
  171. if err == nil || !apierrors.IsAlreadyExists(err) {
  172. spc.recordClaimEvent("create", set, pod, &claim, err)
  173. }
  174. case err != nil:
  175. errs = append(errs, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, err))
  176. spc.recordClaimEvent("create", set, pod, &claim, err)
  177. }
  178. // TODO: Check resource requirements and accessmodes, update if necessary
  179. }
  180. return errorutils.NewAggregate(errs)
  181. }
  182. var _ StatefulPodControlInterface = &realStatefulPodControl{}