stateful_set.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "context"
  16. "fmt"
  17. "reflect"
  18. "time"
  19. apps "k8s.io/api/apps/v1"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. appsinformers "k8s.io/client-go/informers/apps/v1"
  27. coreinformers "k8s.io/client-go/informers/core/v1"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/kubernetes/scheme"
  30. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  31. appslisters "k8s.io/client-go/listers/apps/v1"
  32. corelisters "k8s.io/client-go/listers/core/v1"
  33. "k8s.io/client-go/tools/cache"
  34. "k8s.io/client-go/tools/record"
  35. "k8s.io/client-go/util/workqueue"
  36. "k8s.io/kubernetes/pkg/controller"
  37. "k8s.io/kubernetes/pkg/controller/history"
  38. "k8s.io/klog"
  39. )
  // controllerKind contains the schema.GroupVersionKind for this controller type
  // (kind "StatefulSet" in the apps group/version). Within this file its Kind is
  // compared against owner references in resolveControllerRef, and the value is
  // handed to the pod ControllerRefManager in getPodsForStatefulSet.
  var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
  42. // StatefulSetController controls statefulsets.
  43. type StatefulSetController struct {
  44. // client interface
  45. kubeClient clientset.Interface
  46. // control returns an interface capable of syncing a stateful set.
  47. // Abstracted out for testing.
  48. control StatefulSetControlInterface
  49. // podControl is used for patching pods.
  50. podControl controller.PodControlInterface
  51. // podLister is able to list/get pods from a shared informer's store
  52. podLister corelisters.PodLister
  53. // podListerSynced returns true if the pod shared informer has synced at least once
  54. podListerSynced cache.InformerSynced
  55. // setLister is able to list/get stateful sets from a shared informer's store
  56. setLister appslisters.StatefulSetLister
  57. // setListerSynced returns true if the stateful set shared informer has synced at least once
  58. setListerSynced cache.InformerSynced
  59. // pvcListerSynced returns true if the pvc shared informer has synced at least once
  60. pvcListerSynced cache.InformerSynced
  61. // revListerSynced returns true if the rev shared informer has synced at least once
  62. revListerSynced cache.InformerSynced
  63. // StatefulSets that need to be synced.
  64. queue workqueue.RateLimitingInterface
  65. }
  66. // NewStatefulSetController creates a new statefulset controller.
  67. func NewStatefulSetController(
  68. podInformer coreinformers.PodInformer,
  69. setInformer appsinformers.StatefulSetInformer,
  70. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  71. revInformer appsinformers.ControllerRevisionInformer,
  72. kubeClient clientset.Interface,
  73. ) *StatefulSetController {
  74. eventBroadcaster := record.NewBroadcaster()
  75. eventBroadcaster.StartLogging(klog.Infof)
  76. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  77. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
  78. ssc := &StatefulSetController{
  79. kubeClient: kubeClient,
  80. control: NewDefaultStatefulSetControl(
  81. NewRealStatefulPodControl(
  82. kubeClient,
  83. setInformer.Lister(),
  84. podInformer.Lister(),
  85. pvcInformer.Lister(),
  86. recorder),
  87. NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
  88. history.NewHistory(kubeClient, revInformer.Lister()),
  89. recorder,
  90. ),
  91. pvcListerSynced: pvcInformer.Informer().HasSynced,
  92. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
  93. podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
  94. revListerSynced: revInformer.Informer().HasSynced,
  95. }
  96. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  97. // lookup the statefulset and enqueue
  98. AddFunc: ssc.addPod,
  99. // lookup current and old statefulset if labels changed
  100. UpdateFunc: ssc.updatePod,
  101. // lookup statefulset accounting for deletion tombstones
  102. DeleteFunc: ssc.deletePod,
  103. })
  104. ssc.podLister = podInformer.Lister()
  105. ssc.podListerSynced = podInformer.Informer().HasSynced
  106. setInformer.Informer().AddEventHandler(
  107. cache.ResourceEventHandlerFuncs{
  108. AddFunc: ssc.enqueueStatefulSet,
  109. UpdateFunc: func(old, cur interface{}) {
  110. oldPS := old.(*apps.StatefulSet)
  111. curPS := cur.(*apps.StatefulSet)
  112. if oldPS.Status.Replicas != curPS.Status.Replicas {
  113. klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
  114. }
  115. ssc.enqueueStatefulSet(cur)
  116. },
  117. DeleteFunc: ssc.enqueueStatefulSet,
  118. },
  119. )
  120. ssc.setLister = setInformer.Lister()
  121. ssc.setListerSynced = setInformer.Informer().HasSynced
  122. // TODO: Watch volumes
  123. return ssc
  124. }
  125. // Run runs the statefulset controller.
  126. func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
  127. defer utilruntime.HandleCrash()
  128. defer ssc.queue.ShutDown()
  129. klog.Infof("Starting stateful set controller")
  130. defer klog.Infof("Shutting down statefulset controller")
  131. if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
  132. return
  133. }
  134. for i := 0; i < workers; i++ {
  135. go wait.Until(ssc.worker, time.Second, stopCh)
  136. }
  137. <-stopCh
  138. }
  139. // addPod adds the statefulset for the pod to the sync queue
  140. func (ssc *StatefulSetController) addPod(obj interface{}) {
  141. pod := obj.(*v1.Pod)
  142. if pod.DeletionTimestamp != nil {
  143. // on a restart of the controller manager, it's possible a new pod shows up in a state that
  144. // is already pending deletion. Prevent the pod from being a creation observation.
  145. ssc.deletePod(pod)
  146. return
  147. }
  148. // If it has a ControllerRef, that's all that matters.
  149. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
  150. set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
  151. if set == nil {
  152. return
  153. }
  154. klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
  155. ssc.enqueueStatefulSet(set)
  156. return
  157. }
  158. // Otherwise, it's an orphan. Get a list of all matching controllers and sync
  159. // them to see if anyone wants to adopt it.
  160. sets := ssc.getStatefulSetsForPod(pod)
  161. if len(sets) == 0 {
  162. return
  163. }
  164. klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
  165. for _, set := range sets {
  166. ssc.enqueueStatefulSet(set)
  167. }
  168. }
  169. // updatePod adds the statefulset for the current and old pods to the sync queue.
  170. func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
  171. curPod := cur.(*v1.Pod)
  172. oldPod := old.(*v1.Pod)
  173. if curPod.ResourceVersion == oldPod.ResourceVersion {
  174. // In the event of a re-list we may receive update events for all known pods.
  175. // Two different versions of the same pod will always have different RVs.
  176. return
  177. }
  178. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
  179. curControllerRef := metav1.GetControllerOf(curPod)
  180. oldControllerRef := metav1.GetControllerOf(oldPod)
  181. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  182. if controllerRefChanged && oldControllerRef != nil {
  183. // The ControllerRef was changed. Sync the old controller, if any.
  184. if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
  185. ssc.enqueueStatefulSet(set)
  186. }
  187. }
  188. // If it has a ControllerRef, that's all that matters.
  189. if curControllerRef != nil {
  190. set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
  191. if set == nil {
  192. return
  193. }
  194. klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
  195. ssc.enqueueStatefulSet(set)
  196. return
  197. }
  198. // Otherwise, it's an orphan. If anything changed, sync matching controllers
  199. // to see if anyone wants to adopt it now.
  200. if labelChanged || controllerRefChanged {
  201. sets := ssc.getStatefulSetsForPod(curPod)
  202. if len(sets) == 0 {
  203. return
  204. }
  205. klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
  206. for _, set := range sets {
  207. ssc.enqueueStatefulSet(set)
  208. }
  209. }
  210. }
  211. // deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
  212. func (ssc *StatefulSetController) deletePod(obj interface{}) {
  213. pod, ok := obj.(*v1.Pod)
  214. // When a delete is dropped, the relist will notice a pod in the store not
  215. // in the list, leading to the insertion of a tombstone object which contains
  216. // the deleted key/value. Note that this value might be stale.
  217. if !ok {
  218. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  219. if !ok {
  220. utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
  221. return
  222. }
  223. pod, ok = tombstone.Obj.(*v1.Pod)
  224. if !ok {
  225. utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
  226. return
  227. }
  228. }
  229. controllerRef := metav1.GetControllerOf(pod)
  230. if controllerRef == nil {
  231. // No controller should care about orphans being deleted.
  232. return
  233. }
  234. set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
  235. if set == nil {
  236. return
  237. }
  238. klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
  239. ssc.enqueueStatefulSet(set)
  240. }
  241. // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
  242. // It also reconciles ControllerRef by adopting/orphaning.
  243. //
  244. // NOTE: Returned Pods are pointers to objects from the cache.
  245. // If you need to modify one, you need to copy it first.
  246. func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
  247. // List all pods to include the pods that don't match the selector anymore but
  248. // has a ControllerRef pointing to this StatefulSet.
  249. pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
  250. if err != nil {
  251. return nil, err
  252. }
  253. filter := func(pod *v1.Pod) bool {
  254. // Only claim if it matches our StatefulSet name. Otherwise release/ignore.
  255. return isMemberOf(set, pod)
  256. }
  257. cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(set))
  258. return cm.ClaimPods(pods, filter)
  259. }
  260. // If any adoptions are attempted, we should first recheck for deletion with
  261. // an uncached quorum read sometime after listing Pods/ControllerRevisions (see #42639).
  262. func (ssc *StatefulSetController) canAdoptFunc(set *apps.StatefulSet) func() error {
  263. return controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
  264. fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(context.TODO(), set.Name, metav1.GetOptions{})
  265. if err != nil {
  266. return nil, err
  267. }
  268. if fresh.UID != set.UID {
  269. return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
  270. }
  271. return fresh, nil
  272. })
  273. }
  274. // adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
  275. func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
  276. revisions, err := ssc.control.ListRevisions(set)
  277. if err != nil {
  278. return err
  279. }
  280. orphanRevisions := make([]*apps.ControllerRevision, 0)
  281. for i := range revisions {
  282. if metav1.GetControllerOf(revisions[i]) == nil {
  283. orphanRevisions = append(orphanRevisions, revisions[i])
  284. }
  285. }
  286. if len(orphanRevisions) > 0 {
  287. canAdoptErr := ssc.canAdoptFunc(set)()
  288. if canAdoptErr != nil {
  289. return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
  290. }
  291. return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
  292. }
  293. return nil
  294. }
  295. // getStatefulSetsForPod returns a list of StatefulSets that potentially match
  296. // a given pod.
  297. func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
  298. sets, err := ssc.setLister.GetPodStatefulSets(pod)
  299. if err != nil {
  300. return nil
  301. }
  302. // More than one set is selecting the same Pod
  303. if len(sets) > 1 {
  304. // ControllerRef will ensure we don't do anything crazy, but more than one
  305. // item in this list nevertheless constitutes user error.
  306. utilruntime.HandleError(
  307. fmt.Errorf(
  308. "user error: more than one StatefulSet is selecting pods with labels: %+v",
  309. pod.Labels))
  310. }
  311. return sets
  312. }
  313. // resolveControllerRef returns the controller referenced by a ControllerRef,
  314. // or nil if the ControllerRef could not be resolved to a matching controller
  315. // of the correct Kind.
  316. func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
  317. // We can't look up by UID, so look up by Name and then verify UID.
  318. // Don't even try to look up by Name if it's the wrong Kind.
  319. if controllerRef.Kind != controllerKind.Kind {
  320. return nil
  321. }
  322. set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
  323. if err != nil {
  324. return nil
  325. }
  326. if set.UID != controllerRef.UID {
  327. // The controller we found with this Name is not the same one that the
  328. // ControllerRef points to.
  329. return nil
  330. }
  331. return set
  332. }
  333. // enqueueStatefulSet enqueues the given statefulset in the work queue.
  334. func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
  335. key, err := controller.KeyFunc(obj)
  336. if err != nil {
  337. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  338. return
  339. }
  340. ssc.queue.Add(key)
  341. }
  342. // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
  343. // invoked concurrently with the same key.
  344. func (ssc *StatefulSetController) processNextWorkItem() bool {
  345. key, quit := ssc.queue.Get()
  346. if quit {
  347. return false
  348. }
  349. defer ssc.queue.Done(key)
  350. if err := ssc.sync(key.(string)); err != nil {
  351. utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
  352. ssc.queue.AddRateLimited(key)
  353. } else {
  354. ssc.queue.Forget(key)
  355. }
  356. return true
  357. }
  358. // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
  359. func (ssc *StatefulSetController) worker() {
  360. for ssc.processNextWorkItem() {
  361. }
  362. }
  363. // sync syncs the given statefulset.
  364. func (ssc *StatefulSetController) sync(key string) error {
  365. startTime := time.Now()
  366. defer func() {
  367. klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
  368. }()
  369. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  370. if err != nil {
  371. return err
  372. }
  373. set, err := ssc.setLister.StatefulSets(namespace).Get(name)
  374. if errors.IsNotFound(err) {
  375. klog.Infof("StatefulSet has been deleted %v", key)
  376. return nil
  377. }
  378. if err != nil {
  379. utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
  380. return err
  381. }
  382. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  383. if err != nil {
  384. utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
  385. // This is a non-transient error, so don't retry.
  386. return nil
  387. }
  388. if err := ssc.adoptOrphanRevisions(set); err != nil {
  389. return err
  390. }
  391. pods, err := ssc.getPodsForStatefulSet(set, selector)
  392. if err != nil {
  393. return err
  394. }
  395. return ssc.syncStatefulSet(set, pods)
  396. }
  397. // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
  398. func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
  399. klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
  400. // TODO: investigate where we mutate the set during the update as it is not obvious.
  401. if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
  402. return err
  403. }
  404. klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
  405. return nil
  406. }