stateful_set.go 16 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "fmt"
  16. "reflect"
  17. "time"
  18. apps "k8s.io/api/apps/v1"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/labels"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. appsinformers "k8s.io/client-go/informers/apps/v1"
  26. coreinformers "k8s.io/client-go/informers/core/v1"
  27. clientset "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/kubernetes/scheme"
  29. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  30. appslisters "k8s.io/client-go/listers/apps/v1"
  31. corelisters "k8s.io/client-go/listers/core/v1"
  32. "k8s.io/client-go/tools/cache"
  33. "k8s.io/client-go/tools/record"
  34. "k8s.io/client-go/util/workqueue"
  35. "k8s.io/kubernetes/pkg/controller"
  36. "k8s.io/kubernetes/pkg/controller/history"
  37. "k8s.io/klog"
  38. )
  39. // controllerKind contains the schema.GroupVersionKind for this controller type.
  40. var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
  41. // StatefulSetController controls statefulsets.
  42. type StatefulSetController struct {
  43. // client interface
  44. kubeClient clientset.Interface
  45. // control returns an interface capable of syncing a stateful set.
  46. // Abstracted out for testing.
  47. control StatefulSetControlInterface
  48. // podControl is used for patching pods.
  49. podControl controller.PodControlInterface
  50. // podLister is able to list/get pods from a shared informer's store
  51. podLister corelisters.PodLister
  52. // podListerSynced returns true if the pod shared informer has synced at least once
  53. podListerSynced cache.InformerSynced
  54. // setLister is able to list/get stateful sets from a shared informer's store
  55. setLister appslisters.StatefulSetLister
  56. // setListerSynced returns true if the stateful set shared informer has synced at least once
  57. setListerSynced cache.InformerSynced
  58. // pvcListerSynced returns true if the pvc shared informer has synced at least once
  59. pvcListerSynced cache.InformerSynced
  60. // revListerSynced returns true if the rev shared informer has synced at least once
  61. revListerSynced cache.InformerSynced
  62. // StatefulSets that need to be synced.
  63. queue workqueue.RateLimitingInterface
  64. }
  65. // NewStatefulSetController creates a new statefulset controller.
  66. func NewStatefulSetController(
  67. podInformer coreinformers.PodInformer,
  68. setInformer appsinformers.StatefulSetInformer,
  69. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  70. revInformer appsinformers.ControllerRevisionInformer,
  71. kubeClient clientset.Interface,
  72. ) *StatefulSetController {
  73. eventBroadcaster := record.NewBroadcaster()
  74. eventBroadcaster.StartLogging(klog.Infof)
  75. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  76. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
  77. ssc := &StatefulSetController{
  78. kubeClient: kubeClient,
  79. control: NewDefaultStatefulSetControl(
  80. NewRealStatefulPodControl(
  81. kubeClient,
  82. setInformer.Lister(),
  83. podInformer.Lister(),
  84. pvcInformer.Lister(),
  85. recorder),
  86. NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
  87. history.NewHistory(kubeClient, revInformer.Lister()),
  88. recorder,
  89. ),
  90. pvcListerSynced: pvcInformer.Informer().HasSynced,
  91. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
  92. podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
  93. revListerSynced: revInformer.Informer().HasSynced,
  94. }
  95. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  96. // lookup the statefulset and enqueue
  97. AddFunc: ssc.addPod,
  98. // lookup current and old statefulset if labels changed
  99. UpdateFunc: ssc.updatePod,
  100. // lookup statefulset accounting for deletion tombstones
  101. DeleteFunc: ssc.deletePod,
  102. })
  103. ssc.podLister = podInformer.Lister()
  104. ssc.podListerSynced = podInformer.Informer().HasSynced
  105. setInformer.Informer().AddEventHandler(
  106. cache.ResourceEventHandlerFuncs{
  107. AddFunc: ssc.enqueueStatefulSet,
  108. UpdateFunc: func(old, cur interface{}) {
  109. oldPS := old.(*apps.StatefulSet)
  110. curPS := cur.(*apps.StatefulSet)
  111. if oldPS.Status.Replicas != curPS.Status.Replicas {
  112. klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
  113. }
  114. ssc.enqueueStatefulSet(cur)
  115. },
  116. DeleteFunc: ssc.enqueueStatefulSet,
  117. },
  118. )
  119. ssc.setLister = setInformer.Lister()
  120. ssc.setListerSynced = setInformer.Informer().HasSynced
  121. // TODO: Watch volumes
  122. return ssc
  123. }
  124. // Run runs the statefulset controller.
  125. func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
  126. defer utilruntime.HandleCrash()
  127. defer ssc.queue.ShutDown()
  128. klog.Infof("Starting stateful set controller")
  129. defer klog.Infof("Shutting down statefulset controller")
  130. if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
  131. return
  132. }
  133. for i := 0; i < workers; i++ {
  134. go wait.Until(ssc.worker, time.Second, stopCh)
  135. }
  136. <-stopCh
  137. }
  138. // addPod adds the statefulset for the pod to the sync queue
  139. func (ssc *StatefulSetController) addPod(obj interface{}) {
  140. pod := obj.(*v1.Pod)
  141. if pod.DeletionTimestamp != nil {
  142. // on a restart of the controller manager, it's possible a new pod shows up in a state that
  143. // is already pending deletion. Prevent the pod from being a creation observation.
  144. ssc.deletePod(pod)
  145. return
  146. }
  147. // If it has a ControllerRef, that's all that matters.
  148. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
  149. set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
  150. if set == nil {
  151. return
  152. }
  153. klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
  154. ssc.enqueueStatefulSet(set)
  155. return
  156. }
  157. // Otherwise, it's an orphan. Get a list of all matching controllers and sync
  158. // them to see if anyone wants to adopt it.
  159. sets := ssc.getStatefulSetsForPod(pod)
  160. if len(sets) == 0 {
  161. return
  162. }
  163. klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
  164. for _, set := range sets {
  165. ssc.enqueueStatefulSet(set)
  166. }
  167. }
  168. // updatePod adds the statefulset for the current and old pods to the sync queue.
  169. func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
  170. curPod := cur.(*v1.Pod)
  171. oldPod := old.(*v1.Pod)
  172. if curPod.ResourceVersion == oldPod.ResourceVersion {
  173. // In the event of a re-list we may receive update events for all known pods.
  174. // Two different versions of the same pod will always have different RVs.
  175. return
  176. }
  177. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
  178. curControllerRef := metav1.GetControllerOf(curPod)
  179. oldControllerRef := metav1.GetControllerOf(oldPod)
  180. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  181. if controllerRefChanged && oldControllerRef != nil {
  182. // The ControllerRef was changed. Sync the old controller, if any.
  183. if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
  184. ssc.enqueueStatefulSet(set)
  185. }
  186. }
  187. // If it has a ControllerRef, that's all that matters.
  188. if curControllerRef != nil {
  189. set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
  190. if set == nil {
  191. return
  192. }
  193. klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
  194. ssc.enqueueStatefulSet(set)
  195. return
  196. }
  197. // Otherwise, it's an orphan. If anything changed, sync matching controllers
  198. // to see if anyone wants to adopt it now.
  199. if labelChanged || controllerRefChanged {
  200. sets := ssc.getStatefulSetsForPod(curPod)
  201. if len(sets) == 0 {
  202. return
  203. }
  204. klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
  205. for _, set := range sets {
  206. ssc.enqueueStatefulSet(set)
  207. }
  208. }
  209. }
  210. // deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
  211. func (ssc *StatefulSetController) deletePod(obj interface{}) {
  212. pod, ok := obj.(*v1.Pod)
  213. // When a delete is dropped, the relist will notice a pod in the store not
  214. // in the list, leading to the insertion of a tombstone object which contains
  215. // the deleted key/value. Note that this value might be stale.
  216. if !ok {
  217. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  218. if !ok {
  219. utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
  220. return
  221. }
  222. pod, ok = tombstone.Obj.(*v1.Pod)
  223. if !ok {
  224. utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
  225. return
  226. }
  227. }
  228. controllerRef := metav1.GetControllerOf(pod)
  229. if controllerRef == nil {
  230. // No controller should care about orphans being deleted.
  231. return
  232. }
  233. set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
  234. if set == nil {
  235. return
  236. }
  237. klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
  238. ssc.enqueueStatefulSet(set)
  239. }
  240. // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
  241. // It also reconciles ControllerRef by adopting/orphaning.
  242. //
  243. // NOTE: Returned Pods are pointers to objects from the cache.
  244. // If you need to modify one, you need to copy it first.
  245. func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
  246. // List all pods to include the pods that don't match the selector anymore but
  247. // has a ControllerRef pointing to this StatefulSet.
  248. pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
  249. if err != nil {
  250. return nil, err
  251. }
  252. filter := func(pod *v1.Pod) bool {
  253. // Only claim if it matches our StatefulSet name. Otherwise release/ignore.
  254. return isMemberOf(set, pod)
  255. }
  256. // If any adoptions are attempted, we should first recheck for deletion with
  257. // an uncached quorum read sometime after listing Pods (see #42639).
  258. canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
  259. fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
  260. if err != nil {
  261. return nil, err
  262. }
  263. if fresh.UID != set.UID {
  264. return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
  265. }
  266. return fresh, nil
  267. })
  268. cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc)
  269. return cm.ClaimPods(pods, filter)
  270. }
  271. // adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
  272. func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
  273. revisions, err := ssc.control.ListRevisions(set)
  274. if err != nil {
  275. return err
  276. }
  277. hasOrphans := false
  278. for i := range revisions {
  279. if metav1.GetControllerOf(revisions[i]) == nil {
  280. hasOrphans = true
  281. break
  282. }
  283. }
  284. if hasOrphans {
  285. fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
  286. if err != nil {
  287. return err
  288. }
  289. if fresh.UID != set.UID {
  290. return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
  291. }
  292. return ssc.control.AdoptOrphanRevisions(set, revisions)
  293. }
  294. return nil
  295. }
  296. // getStatefulSetsForPod returns a list of StatefulSets that potentially match
  297. // a given pod.
  298. func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
  299. sets, err := ssc.setLister.GetPodStatefulSets(pod)
  300. if err != nil {
  301. return nil
  302. }
  303. // More than one set is selecting the same Pod
  304. if len(sets) > 1 {
  305. // ControllerRef will ensure we don't do anything crazy, but more than one
  306. // item in this list nevertheless constitutes user error.
  307. utilruntime.HandleError(
  308. fmt.Errorf(
  309. "user error: more than one StatefulSet is selecting pods with labels: %+v",
  310. pod.Labels))
  311. }
  312. return sets
  313. }
  314. // resolveControllerRef returns the controller referenced by a ControllerRef,
  315. // or nil if the ControllerRef could not be resolved to a matching controller
  316. // of the correct Kind.
  317. func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
  318. // We can't look up by UID, so look up by Name and then verify UID.
  319. // Don't even try to look up by Name if it's the wrong Kind.
  320. if controllerRef.Kind != controllerKind.Kind {
  321. return nil
  322. }
  323. set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
  324. if err != nil {
  325. return nil
  326. }
  327. if set.UID != controllerRef.UID {
  328. // The controller we found with this Name is not the same one that the
  329. // ControllerRef points to.
  330. return nil
  331. }
  332. return set
  333. }
  334. // enqueueStatefulSet enqueues the given statefulset in the work queue.
  335. func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
  336. key, err := controller.KeyFunc(obj)
  337. if err != nil {
  338. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  339. return
  340. }
  341. ssc.queue.Add(key)
  342. }
  343. // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
  344. // invoked concurrently with the same key.
  345. func (ssc *StatefulSetController) processNextWorkItem() bool {
  346. key, quit := ssc.queue.Get()
  347. if quit {
  348. return false
  349. }
  350. defer ssc.queue.Done(key)
  351. if err := ssc.sync(key.(string)); err != nil {
  352. utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
  353. ssc.queue.AddRateLimited(key)
  354. } else {
  355. ssc.queue.Forget(key)
  356. }
  357. return true
  358. }
  359. // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
  360. func (ssc *StatefulSetController) worker() {
  361. for ssc.processNextWorkItem() {
  362. }
  363. }
  364. // sync syncs the given statefulset.
  365. func (ssc *StatefulSetController) sync(key string) error {
  366. startTime := time.Now()
  367. defer func() {
  368. klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
  369. }()
  370. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  371. if err != nil {
  372. return err
  373. }
  374. set, err := ssc.setLister.StatefulSets(namespace).Get(name)
  375. if errors.IsNotFound(err) {
  376. klog.Infof("StatefulSet has been deleted %v", key)
  377. return nil
  378. }
  379. if err != nil {
  380. utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
  381. return err
  382. }
  383. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  384. if err != nil {
  385. utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
  386. // This is a non-transient error, so don't retry.
  387. return nil
  388. }
  389. if err := ssc.adoptOrphanRevisions(set); err != nil {
  390. return err
  391. }
  392. pods, err := ssc.getPodsForStatefulSet(set, selector)
  393. if err != nil {
  394. return err
  395. }
  396. return ssc.syncStatefulSet(set, pods)
  397. }
  398. // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
  399. func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
  400. klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
  401. // TODO: investigate where we mutate the set during the update as it is not obvious.
  402. if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
  403. return err
  404. }
  405. klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
  406. return nil
  407. }