stateful_set_control_test.go 66 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "reflect"
  19. "runtime"
  20. "sort"
  21. "strconv"
  22. "strings"
  23. "testing"
  24. "time"
  25. apps "k8s.io/api/apps/v1"
  26. "k8s.io/api/core/v1"
  27. apierrors "k8s.io/apimachinery/pkg/api/errors"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/labels"
  30. "k8s.io/client-go/informers"
  31. appsinformers "k8s.io/client-go/informers/apps/v1"
  32. coreinformers "k8s.io/client-go/informers/core/v1"
  33. clientset "k8s.io/client-go/kubernetes"
  34. "k8s.io/client-go/kubernetes/fake"
  35. appslisters "k8s.io/client-go/listers/apps/v1"
  36. corelisters "k8s.io/client-go/listers/core/v1"
  37. "k8s.io/client-go/tools/cache"
  38. "k8s.io/client-go/tools/record"
  39. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  40. "k8s.io/kubernetes/pkg/controller"
  41. "k8s.io/kubernetes/pkg/controller/history"
  42. )
// invariantFunc is the signature of the invariant checks handed to the test
// scenarios in this file. A check receives the StatefulSet under test and the
// fake pod control, and returns a non-nil error that the scenarios report as a
// test failure.
  43. type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  44. func setupController(client clientset.Interface) (*fakeStatefulPodControl, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface, chan struct{}) {
  45. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  46. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets())
  47. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  48. recorder := record.NewFakeRecorder(10)
  49. ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
  50. stop := make(chan struct{})
  51. informerFactory.Start(stop)
  52. cache.WaitForCacheSync(
  53. stop,
  54. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  55. informerFactory.Core().V1().Pods().Informer().HasSynced,
  56. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  57. )
  58. return spc, ssu, ssc, stop
  59. }
  60. func burst(set *apps.StatefulSet) *apps.StatefulSet {
  61. set.Spec.PodManagementPolicy = apps.ParallelPodManagement
  62. return set
  63. }
  64. func TestStatefulSetControl(t *testing.T) {
  65. simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
  66. largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
  67. testCases := []struct {
  68. fn func(*testing.T, *apps.StatefulSet, invariantFunc)
  69. obj func() *apps.StatefulSet
  70. }{
  71. {CreatesPods, simpleSetFn},
  72. {ScalesUp, simpleSetFn},
  73. {ScalesDown, simpleSetFn},
  74. {ReplacesPods, largeSetFn},
  75. {RecreatesFailedPod, simpleSetFn},
  76. {CreatePodFailure, simpleSetFn},
  77. {UpdatePodFailure, simpleSetFn},
  78. {UpdateSetStatusFailure, simpleSetFn},
  79. {PodRecreateDeleteFailure, simpleSetFn},
  80. }
  81. for _, testCase := range testCases {
  82. fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
  83. if i := strings.LastIndex(fnName, "."); i != -1 {
  84. fnName = fnName[i+1:]
  85. }
  86. t.Run(
  87. fmt.Sprintf("%s/Monotonic", fnName),
  88. func(t *testing.T) {
  89. testCase.fn(t, testCase.obj(), assertMonotonicInvariants)
  90. },
  91. )
  92. t.Run(
  93. fmt.Sprintf("%s/Burst", fnName),
  94. func(t *testing.T) {
  95. set := burst(testCase.obj())
  96. testCase.fn(t, set, assertBurstInvariants)
  97. },
  98. )
  99. }
  100. }
  101. func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  102. client := fake.NewSimpleClientset(set)
  103. spc, _, ssc, stop := setupController(client)
  104. defer close(stop)
  105. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  106. t.Errorf("Failed to turn up StatefulSet : %s", err)
  107. }
  108. var err error
  109. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  110. if err != nil {
  111. t.Fatalf("Error getting updated StatefulSet: %v", err)
  112. }
  113. if set.Status.Replicas != 3 {
  114. t.Error("Failed to scale statefulset to 3 replicas")
  115. }
  116. if set.Status.ReadyReplicas != 3 {
  117. t.Error("Failed to set ReadyReplicas correctly")
  118. }
  119. if set.Status.UpdatedReplicas != 3 {
  120. t.Error("Failed to set UpdatedReplicas correctly")
  121. }
  122. }
  123. func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  124. client := fake.NewSimpleClientset(set)
  125. spc, _, ssc, stop := setupController(client)
  126. defer close(stop)
  127. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  128. t.Errorf("Failed to turn up StatefulSet : %s", err)
  129. }
  130. *set.Spec.Replicas = 4
  131. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  132. t.Errorf("Failed to scale StatefulSet : %s", err)
  133. }
  134. var err error
  135. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  136. if err != nil {
  137. t.Fatalf("Error getting updated StatefulSet: %v", err)
  138. }
  139. if set.Status.Replicas != 4 {
  140. t.Error("Failed to scale statefulset to 4 replicas")
  141. }
  142. if set.Status.ReadyReplicas != 4 {
  143. t.Error("Failed to set readyReplicas correctly")
  144. }
  145. if set.Status.UpdatedReplicas != 4 {
  146. t.Error("Failed to set updatedReplicas correctly")
  147. }
  148. }
  149. func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  150. client := fake.NewSimpleClientset(set)
  151. spc, _, ssc, stop := setupController(client)
  152. defer close(stop)
  153. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  154. t.Errorf("Failed to turn up StatefulSet : %s", err)
  155. }
  156. *set.Spec.Replicas = 0
  157. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  158. t.Errorf("Failed to scale StatefulSet : %s", err)
  159. }
  160. if set.Status.Replicas != 0 {
  161. t.Error("Failed to scale statefulset to 0 replicas")
  162. }
  163. if set.Status.ReadyReplicas != 0 {
  164. t.Error("Failed to set readyReplicas correctly")
  165. }
  166. if set.Status.UpdatedReplicas != 0 {
  167. t.Error("Failed to set updatedReplicas correctly")
  168. }
  169. }
  170. func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  171. client := fake.NewSimpleClientset(set)
  172. spc, _, ssc, stop := setupController(client)
  173. defer close(stop)
  174. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  175. t.Errorf("Failed to turn up StatefulSet : %s", err)
  176. }
  177. var err error
  178. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  179. if err != nil {
  180. t.Fatalf("Error getting updated StatefulSet: %v", err)
  181. }
  182. if set.Status.Replicas != 5 {
  183. t.Error("Failed to scale statefulset to 5 replicas")
  184. }
  185. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  186. if err != nil {
  187. t.Error(err)
  188. }
  189. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  190. if err != nil {
  191. t.Error(err)
  192. }
  193. sort.Sort(ascendingOrdinal(pods))
  194. spc.podsIndexer.Delete(pods[0])
  195. spc.podsIndexer.Delete(pods[2])
  196. spc.podsIndexer.Delete(pods[4])
  197. for i := 0; i < 5; i += 2 {
  198. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  199. if err != nil {
  200. t.Error(err)
  201. }
  202. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  203. t.Errorf("Failed to update StatefulSet : %s", err)
  204. }
  205. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  206. if err != nil {
  207. t.Fatalf("Error getting updated StatefulSet: %v", err)
  208. }
  209. if pods, err = spc.setPodRunning(set, i); err != nil {
  210. t.Error(err)
  211. }
  212. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  213. t.Errorf("Failed to update StatefulSet : %s", err)
  214. }
  215. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  216. if err != nil {
  217. t.Fatalf("Error getting updated StatefulSet: %v", err)
  218. }
  219. if pods, err = spc.setPodReady(set, i); err != nil {
  220. t.Error(err)
  221. }
  222. }
  223. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  224. if err != nil {
  225. t.Error(err)
  226. }
  227. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  228. t.Errorf("Failed to update StatefulSet : %s", err)
  229. }
  230. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  231. if err != nil {
  232. t.Fatalf("Error getting updated StatefulSet: %v", err)
  233. }
  234. if e, a := int32(5), set.Status.Replicas; e != a {
  235. t.Errorf("Expected to scale to %d, got %d", e, a)
  236. }
  237. }
  238. func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  239. client := fake.NewSimpleClientset()
  240. spc, _, ssc, stop := setupController(client)
  241. defer close(stop)
  242. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  243. if err != nil {
  244. t.Error(err)
  245. }
  246. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  247. if err != nil {
  248. t.Error(err)
  249. }
  250. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  251. t.Errorf("Error updating StatefulSet %s", err)
  252. }
  253. if err := invariants(set, spc); err != nil {
  254. t.Error(err)
  255. }
  256. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  257. if err != nil {
  258. t.Error(err)
  259. }
  260. pods[0].Status.Phase = v1.PodFailed
  261. spc.podsIndexer.Update(pods[0])
  262. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  263. t.Errorf("Error updating StatefulSet %s", err)
  264. }
  265. if err := invariants(set, spc); err != nil {
  266. t.Error(err)
  267. }
  268. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  269. if err != nil {
  270. t.Error(err)
  271. }
  272. if isCreated(pods[0]) {
  273. t.Error("StatefulSet did not recreate failed Pod")
  274. }
  275. }
  276. func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  277. client := fake.NewSimpleClientset(set)
  278. spc, _, ssc, stop := setupController(client)
  279. defer close(stop)
  280. spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  281. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  282. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  283. }
  284. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  285. t.Errorf("Failed to turn up StatefulSet : %s", err)
  286. }
  287. var err error
  288. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  289. if err != nil {
  290. t.Fatalf("Error getting updated StatefulSet: %v", err)
  291. }
  292. if set.Status.Replicas != 3 {
  293. t.Error("Failed to scale StatefulSet to 3 replicas")
  294. }
  295. if set.Status.ReadyReplicas != 3 {
  296. t.Error("Failed to set readyReplicas correctly")
  297. }
  298. if set.Status.UpdatedReplicas != 3 {
  299. t.Error("Failed to updatedReplicas correctly")
  300. }
  301. }
  302. func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  303. client := fake.NewSimpleClientset(set)
  304. spc, _, ssc, stop := setupController(client)
  305. defer close(stop)
  306. spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  307. // have to have 1 successful loop first
  308. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  309. t.Fatalf("Unexpected error: %v", err)
  310. }
  311. var err error
  312. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  313. if err != nil {
  314. t.Fatalf("Error getting updated StatefulSet: %v", err)
  315. }
  316. if set.Status.Replicas != 3 {
  317. t.Error("Failed to scale StatefulSet to 3 replicas")
  318. }
  319. if set.Status.ReadyReplicas != 3 {
  320. t.Error("Failed to set readyReplicas correctly")
  321. }
  322. if set.Status.UpdatedReplicas != 3 {
  323. t.Error("Failed to set updatedReplicas correctly")
  324. }
  325. // now mutate a pod's identity
  326. pods, err := spc.podsLister.List(labels.Everything())
  327. if err != nil {
  328. t.Fatalf("Error listing pods: %v", err)
  329. }
  330. if len(pods) != 3 {
  331. t.Fatalf("Expected 3 pods, got %d", len(pods))
  332. }
  333. sort.Sort(ascendingOrdinal(pods))
  334. pods[0].Name = "goo-0"
  335. spc.podsIndexer.Update(pods[0])
  336. // now it should fail
  337. if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
  338. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  339. }
  340. }
  341. func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  342. client := fake.NewSimpleClientset(set)
  343. spc, ssu, ssc, stop := setupController(client)
  344. defer close(stop)
  345. ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  346. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  347. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  348. }
  349. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  350. t.Errorf("Failed to turn up StatefulSet : %s", err)
  351. }
  352. var err error
  353. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  354. if err != nil {
  355. t.Fatalf("Error getting updated StatefulSet: %v", err)
  356. }
  357. if set.Status.Replicas != 3 {
  358. t.Error("Failed to scale StatefulSet to 3 replicas")
  359. }
  360. if set.Status.ReadyReplicas != 3 {
  361. t.Error("Failed to set readyReplicas to 3")
  362. }
  363. if set.Status.UpdatedReplicas != 3 {
  364. t.Error("Failed to set updatedReplicas to 3")
  365. }
  366. }
  367. func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  368. client := fake.NewSimpleClientset(set)
  369. spc, _, ssc, stop := setupController(client)
  370. defer close(stop)
  371. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  372. if err != nil {
  373. t.Error(err)
  374. }
  375. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  376. if err != nil {
  377. t.Error(err)
  378. }
  379. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  380. t.Errorf("Error updating StatefulSet %s", err)
  381. }
  382. if err := invariants(set, spc); err != nil {
  383. t.Error(err)
  384. }
  385. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  386. if err != nil {
  387. t.Error(err)
  388. }
  389. pods[0].Status.Phase = v1.PodFailed
  390. spc.podsIndexer.Update(pods[0])
  391. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  392. if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
  393. t.Errorf("StatefulSet failed to %s", err)
  394. }
  395. if err := invariants(set, spc); err != nil {
  396. t.Error(err)
  397. }
  398. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  399. t.Errorf("Error updating StatefulSet %s", err)
  400. }
  401. if err := invariants(set, spc); err != nil {
  402. t.Error(err)
  403. }
  404. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  405. if err != nil {
  406. t.Error(err)
  407. }
  408. if isCreated(pods[0]) {
  409. t.Error("StatefulSet did not recreate failed Pod")
  410. }
  411. }
  412. func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
  413. invariants := assertMonotonicInvariants
  414. set := newStatefulSet(3)
  415. client := fake.NewSimpleClientset(set)
  416. spc, _, ssc, stop := setupController(client)
  417. defer close(stop)
  418. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  419. t.Errorf("Failed to turn up StatefulSet : %s", err)
  420. }
  421. var err error
  422. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  423. if err != nil {
  424. t.Fatalf("Error getting updated StatefulSet: %v", err)
  425. }
  426. *set.Spec.Replicas = 0
  427. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  428. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  429. t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
  430. }
  431. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  432. t.Errorf("Failed to turn down StatefulSet %s", err)
  433. }
  434. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  435. if err != nil {
  436. t.Fatalf("Error getting updated StatefulSet: %v", err)
  437. }
  438. if set.Status.Replicas != 0 {
  439. t.Error("Failed to scale statefulset to 0 replicas")
  440. }
  441. if set.Status.ReadyReplicas != 0 {
  442. t.Error("Failed to set readyReplicas to 0")
  443. }
  444. if set.Status.UpdatedReplicas != 0 {
  445. t.Error("Failed to set updatedReplicas to 0")
  446. }
  447. }
  448. func TestStatefulSetControl_getSetRevisions(t *testing.T) {
  449. type testcase struct {
  450. name string
  451. existing []*apps.ControllerRevision
  452. set *apps.StatefulSet
  453. expectedCount int
  454. expectedCurrent *apps.ControllerRevision
  455. expectedUpdate *apps.ControllerRevision
  456. err bool
  457. }
  458. testFn := func(test *testcase, t *testing.T) {
  459. client := fake.NewSimpleClientset()
  460. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  461. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets())
  462. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  463. recorder := record.NewFakeRecorder(10)
  464. ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
  465. stop := make(chan struct{})
  466. defer close(stop)
  467. informerFactory.Start(stop)
  468. cache.WaitForCacheSync(
  469. stop,
  470. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  471. informerFactory.Core().V1().Pods().Informer().HasSynced,
  472. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  473. )
  474. test.set.Status.CollisionCount = new(int32)
  475. for i := range test.existing {
  476. ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount)
  477. }
  478. revisions, err := ssc.ListRevisions(test.set)
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions)
  483. if err != nil {
  484. t.Fatalf("error getting statefulset revisions:%v", err)
  485. }
  486. revisions, err = ssc.ListRevisions(test.set)
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. if len(revisions) != test.expectedCount {
  491. t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions))
  492. }
  493. if test.err && err == nil {
  494. t.Errorf("%s: expected error", test.name)
  495. }
  496. if !test.err && !history.EqualRevision(current, test.expectedCurrent) {
  497. t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current)
  498. }
  499. if !test.err && !history.EqualRevision(update, test.expectedUpdate) {
  500. t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update)
  501. }
  502. if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision {
  503. t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision)
  504. }
  505. if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision {
  506. t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision)
  507. }
  508. }
  509. updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision {
  510. clone := cr.DeepCopy()
  511. clone.Revision = revision
  512. return clone
  513. }
  514. set := newStatefulSet(3)
  515. set.Status.CollisionCount = new(int32)
  516. rev0 := newRevisionOrDie(set, 1)
  517. set1 := set.DeepCopy()
  518. set1.Spec.Template.Spec.Containers[0].Image = "foo"
  519. set1.Status.CurrentRevision = rev0.Name
  520. set1.Status.CollisionCount = new(int32)
  521. rev1 := newRevisionOrDie(set1, 2)
  522. set2 := set1.DeepCopy()
  523. set2.Spec.Template.Labels["new"] = "label"
  524. set2.Status.CurrentRevision = rev0.Name
  525. set2.Status.CollisionCount = new(int32)
  526. rev2 := newRevisionOrDie(set2, 3)
  527. tests := []testcase{
  528. {
  529. name: "creates initial revision",
  530. existing: nil,
  531. set: set,
  532. expectedCount: 1,
  533. expectedCurrent: rev0,
  534. expectedUpdate: rev0,
  535. err: false,
  536. },
  537. {
  538. name: "creates revision on update",
  539. existing: []*apps.ControllerRevision{rev0},
  540. set: set1,
  541. expectedCount: 2,
  542. expectedCurrent: rev0,
  543. expectedUpdate: rev1,
  544. err: false,
  545. },
  546. {
  547. name: "must not recreate a new revision of same set",
  548. existing: []*apps.ControllerRevision{rev0, rev1},
  549. set: set1,
  550. expectedCount: 2,
  551. expectedCurrent: rev0,
  552. expectedUpdate: rev1,
  553. err: false,
  554. },
  555. {
  556. name: "must rollback to a previous revision",
  557. existing: []*apps.ControllerRevision{rev0, rev1, rev2},
  558. set: set1,
  559. expectedCount: 3,
  560. expectedCurrent: rev0,
  561. expectedUpdate: updateRevision(rev1, 4),
  562. err: false,
  563. },
  564. }
  565. for i := range tests {
  566. testFn(&tests[i], t)
  567. }
  568. }
  569. func TestStatefulSetControlRollingUpdate(t *testing.T) {
  570. type testcase struct {
  571. name string
  572. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  573. initial func() *apps.StatefulSet
  574. update func(set *apps.StatefulSet) *apps.StatefulSet
  575. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  576. }
  577. testFn := func(test *testcase, t *testing.T) {
  578. set := test.initial()
  579. client := fake.NewSimpleClientset(set)
  580. spc, _, ssc, stop := setupController(client)
  581. defer close(stop)
  582. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  583. t.Fatalf("%s: %s", test.name, err)
  584. }
  585. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  586. if err != nil {
  587. t.Fatalf("%s: %s", test.name, err)
  588. }
  589. set = test.update(set)
  590. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  591. t.Fatalf("%s: %s", test.name, err)
  592. }
  593. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  594. if err != nil {
  595. t.Fatalf("%s: %s", test.name, err)
  596. }
  597. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  598. if err != nil {
  599. t.Fatalf("%s: %s", test.name, err)
  600. }
  601. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  602. if err != nil {
  603. t.Fatalf("%s: %s", test.name, err)
  604. }
  605. if err := test.validate(set, pods); err != nil {
  606. t.Fatalf("%s: %s", test.name, err)
  607. }
  608. }
  609. tests := []testcase{
  610. {
  611. name: "monotonic image update",
  612. invariants: assertMonotonicInvariants,
  613. initial: func() *apps.StatefulSet {
  614. return newStatefulSet(3)
  615. },
  616. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  617. set.Spec.Template.Spec.Containers[0].Image = "foo"
  618. return set
  619. },
  620. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  621. sort.Sort(ascendingOrdinal(pods))
  622. for i := range pods {
  623. if pods[i].Spec.Containers[0].Image != "foo" {
  624. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  625. }
  626. }
  627. return nil
  628. },
  629. },
  630. {
  631. name: "monotonic image update and scale up",
  632. invariants: assertMonotonicInvariants,
  633. initial: func() *apps.StatefulSet {
  634. return newStatefulSet(3)
  635. },
  636. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  637. *set.Spec.Replicas = 5
  638. set.Spec.Template.Spec.Containers[0].Image = "foo"
  639. return set
  640. },
  641. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  642. sort.Sort(ascendingOrdinal(pods))
  643. for i := range pods {
  644. if pods[i].Spec.Containers[0].Image != "foo" {
  645. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  646. }
  647. }
  648. return nil
  649. },
  650. },
  651. {
  652. name: "monotonic image update and scale down",
  653. invariants: assertMonotonicInvariants,
  654. initial: func() *apps.StatefulSet {
  655. return newStatefulSet(5)
  656. },
  657. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  658. *set.Spec.Replicas = 3
  659. set.Spec.Template.Spec.Containers[0].Image = "foo"
  660. return set
  661. },
  662. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  663. sort.Sort(ascendingOrdinal(pods))
  664. for i := range pods {
  665. if pods[i].Spec.Containers[0].Image != "foo" {
  666. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  667. }
  668. }
  669. return nil
  670. },
  671. },
  672. {
  673. name: "burst image update",
  674. invariants: assertBurstInvariants,
  675. initial: func() *apps.StatefulSet {
  676. return burst(newStatefulSet(3))
  677. },
  678. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  679. set.Spec.Template.Spec.Containers[0].Image = "foo"
  680. return set
  681. },
  682. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  683. sort.Sort(ascendingOrdinal(pods))
  684. for i := range pods {
  685. if pods[i].Spec.Containers[0].Image != "foo" {
  686. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  687. }
  688. }
  689. return nil
  690. },
  691. },
  692. {
  693. name: "burst image update and scale up",
  694. invariants: assertBurstInvariants,
  695. initial: func() *apps.StatefulSet {
  696. return burst(newStatefulSet(3))
  697. },
  698. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  699. *set.Spec.Replicas = 5
  700. set.Spec.Template.Spec.Containers[0].Image = "foo"
  701. return set
  702. },
  703. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  704. sort.Sort(ascendingOrdinal(pods))
  705. for i := range pods {
  706. if pods[i].Spec.Containers[0].Image != "foo" {
  707. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  708. }
  709. }
  710. return nil
  711. },
  712. },
  713. {
  714. name: "burst image update and scale down",
  715. invariants: assertBurstInvariants,
  716. initial: func() *apps.StatefulSet {
  717. return burst(newStatefulSet(5))
  718. },
  719. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  720. *set.Spec.Replicas = 3
  721. set.Spec.Template.Spec.Containers[0].Image = "foo"
  722. return set
  723. },
  724. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  725. sort.Sort(ascendingOrdinal(pods))
  726. for i := range pods {
  727. if pods[i].Spec.Containers[0].Image != "foo" {
  728. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  729. }
  730. }
  731. return nil
  732. },
  733. },
  734. }
  735. for i := range tests {
  736. testFn(&tests[i], t)
  737. }
  738. }
  739. func TestStatefulSetControlOnDeleteUpdate(t *testing.T) {
  740. type testcase struct {
  741. name string
  742. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  743. initial func() *apps.StatefulSet
  744. update func(set *apps.StatefulSet) *apps.StatefulSet
  745. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  746. validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error
  747. }
  748. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  749. testFn := func(test *testcase, t *testing.T) {
  750. set := test.initial()
  751. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType}
  752. client := fake.NewSimpleClientset(set)
  753. spc, _, ssc, stop := setupController(client)
  754. defer close(stop)
  755. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  756. t.Fatalf("%s: %s", test.name, err)
  757. }
  758. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  759. if err != nil {
  760. t.Fatalf("%s: %s", test.name, err)
  761. }
  762. set = test.update(set)
  763. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  764. t.Fatalf("%s: %s", test.name, err)
  765. }
  766. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  767. if err != nil {
  768. t.Fatalf("%s: %s", test.name, err)
  769. }
  770. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  771. if err != nil {
  772. t.Fatalf("%s: %s", test.name, err)
  773. }
  774. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  775. if err != nil {
  776. t.Fatalf("%s: %s", test.name, err)
  777. }
  778. if err := test.validateUpdate(set, pods); err != nil {
  779. for i := range pods {
  780. t.Log(pods[i].Name)
  781. }
  782. t.Fatalf("%s: %s", test.name, err)
  783. }
  784. replicas := *set.Spec.Replicas
  785. *set.Spec.Replicas = 0
  786. if err := scaleDownStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  787. t.Fatalf("%s: %s", test.name, err)
  788. }
  789. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  790. if err != nil {
  791. t.Fatalf("%s: %s", test.name, err)
  792. }
  793. *set.Spec.Replicas = replicas
  794. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  795. t.Fatalf("%s: %s", test.name, err)
  796. }
  797. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  798. if err != nil {
  799. t.Fatalf("%s: %s", test.name, err)
  800. }
  801. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  802. if err != nil {
  803. t.Fatalf("%s: %s", test.name, err)
  804. }
  805. if err := test.validateRestart(set, pods); err != nil {
  806. t.Fatalf("%s: %s", test.name, err)
  807. }
  808. }
  809. tests := []testcase{
  810. {
  811. name: "monotonic image update",
  812. invariants: assertMonotonicInvariants,
  813. initial: func() *apps.StatefulSet {
  814. return newStatefulSet(3)
  815. },
  816. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  817. set.Spec.Template.Spec.Containers[0].Image = "foo"
  818. return set
  819. },
  820. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  821. sort.Sort(ascendingOrdinal(pods))
  822. for i := range pods {
  823. if pods[i].Spec.Containers[0].Image != originalImage {
  824. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  825. }
  826. }
  827. return nil
  828. },
  829. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  830. sort.Sort(ascendingOrdinal(pods))
  831. for i := range pods {
  832. if pods[i].Spec.Containers[0].Image != "foo" {
  833. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  834. }
  835. }
  836. return nil
  837. },
  838. },
  839. {
  840. name: "monotonic image update and scale up",
  841. invariants: assertMonotonicInvariants,
  842. initial: func() *apps.StatefulSet {
  843. return newStatefulSet(3)
  844. },
  845. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  846. *set.Spec.Replicas = 5
  847. set.Spec.Template.Spec.Containers[0].Image = "foo"
  848. return set
  849. },
  850. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  851. sort.Sort(ascendingOrdinal(pods))
  852. for i := range pods {
  853. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  854. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  855. }
  856. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  857. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  858. }
  859. }
  860. return nil
  861. },
  862. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  863. sort.Sort(ascendingOrdinal(pods))
  864. for i := range pods {
  865. if pods[i].Spec.Containers[0].Image != "foo" {
  866. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  867. }
  868. }
  869. return nil
  870. },
  871. },
  872. {
  873. name: "monotonic image update and scale down",
  874. invariants: assertMonotonicInvariants,
  875. initial: func() *apps.StatefulSet {
  876. return newStatefulSet(5)
  877. },
  878. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  879. *set.Spec.Replicas = 3
  880. set.Spec.Template.Spec.Containers[0].Image = "foo"
  881. return set
  882. },
  883. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  884. sort.Sort(ascendingOrdinal(pods))
  885. for i := range pods {
  886. if pods[i].Spec.Containers[0].Image != originalImage {
  887. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  888. }
  889. }
  890. return nil
  891. },
  892. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  893. sort.Sort(ascendingOrdinal(pods))
  894. for i := range pods {
  895. if pods[i].Spec.Containers[0].Image != "foo" {
  896. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  897. }
  898. }
  899. return nil
  900. },
  901. },
  902. {
  903. name: "burst image update",
  904. invariants: assertBurstInvariants,
  905. initial: func() *apps.StatefulSet {
  906. return burst(newStatefulSet(3))
  907. },
  908. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  909. set.Spec.Template.Spec.Containers[0].Image = "foo"
  910. return set
  911. },
  912. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  913. sort.Sort(ascendingOrdinal(pods))
  914. for i := range pods {
  915. if pods[i].Spec.Containers[0].Image != originalImage {
  916. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  917. }
  918. }
  919. return nil
  920. },
  921. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  922. sort.Sort(ascendingOrdinal(pods))
  923. for i := range pods {
  924. if pods[i].Spec.Containers[0].Image != "foo" {
  925. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  926. }
  927. }
  928. return nil
  929. },
  930. },
  931. {
  932. name: "burst image update and scale up",
  933. invariants: assertBurstInvariants,
  934. initial: func() *apps.StatefulSet {
  935. return burst(newStatefulSet(3))
  936. },
  937. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  938. *set.Spec.Replicas = 5
  939. set.Spec.Template.Spec.Containers[0].Image = "foo"
  940. return set
  941. },
  942. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  943. sort.Sort(ascendingOrdinal(pods))
  944. for i := range pods {
  945. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  946. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  947. }
  948. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  949. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  950. }
  951. }
  952. return nil
  953. },
  954. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  955. sort.Sort(ascendingOrdinal(pods))
  956. for i := range pods {
  957. if pods[i].Spec.Containers[0].Image != "foo" {
  958. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  959. }
  960. }
  961. return nil
  962. },
  963. },
  964. {
  965. name: "burst image update and scale down",
  966. invariants: assertBurstInvariants,
  967. initial: func() *apps.StatefulSet {
  968. return burst(newStatefulSet(5))
  969. },
  970. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  971. *set.Spec.Replicas = 3
  972. set.Spec.Template.Spec.Containers[0].Image = "foo"
  973. return set
  974. },
  975. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  976. sort.Sort(ascendingOrdinal(pods))
  977. for i := range pods {
  978. if pods[i].Spec.Containers[0].Image != originalImage {
  979. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  980. }
  981. }
  982. return nil
  983. },
  984. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  985. sort.Sort(ascendingOrdinal(pods))
  986. for i := range pods {
  987. if pods[i].Spec.Containers[0].Image != "foo" {
  988. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  989. }
  990. }
  991. return nil
  992. },
  993. },
  994. }
  995. for i := range tests {
  996. testFn(&tests[i], t)
  997. }
  998. }
  999. func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
  1000. type testcase struct {
  1001. name string
  1002. partition int32
  1003. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1004. initial func() *apps.StatefulSet
  1005. update func(set *apps.StatefulSet) *apps.StatefulSet
  1006. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1007. }
  1008. testFn := func(test *testcase, t *testing.T) {
  1009. set := test.initial()
  1010. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1011. Type: apps.RollingUpdateStatefulSetStrategyType,
  1012. RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1013. return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition}
  1014. }(),
  1015. }
  1016. client := fake.NewSimpleClientset(set)
  1017. spc, _, ssc, stop := setupController(client)
  1018. defer close(stop)
  1019. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1020. t.Fatalf("%s: %s", test.name, err)
  1021. }
  1022. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1023. if err != nil {
  1024. t.Fatalf("%s: %s", test.name, err)
  1025. }
  1026. set = test.update(set)
  1027. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1028. t.Fatalf("%s: %s", test.name, err)
  1029. }
  1030. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1031. if err != nil {
  1032. t.Fatalf("%s: %s", test.name, err)
  1033. }
  1034. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1035. if err != nil {
  1036. t.Fatalf("%s: %s", test.name, err)
  1037. }
  1038. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1039. if err != nil {
  1040. t.Fatalf("%s: %s", test.name, err)
  1041. }
  1042. if err := test.validate(set, pods); err != nil {
  1043. t.Fatalf("%s: %s", test.name, err)
  1044. }
  1045. }
  1046. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1047. tests := []testcase{
  1048. {
  1049. name: "monotonic image update",
  1050. invariants: assertMonotonicInvariants,
  1051. partition: 2,
  1052. initial: func() *apps.StatefulSet {
  1053. return newStatefulSet(3)
  1054. },
  1055. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1056. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1057. return set
  1058. },
  1059. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1060. sort.Sort(ascendingOrdinal(pods))
  1061. for i := range pods {
  1062. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1063. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1064. }
  1065. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1066. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1067. }
  1068. }
  1069. return nil
  1070. },
  1071. },
  1072. {
  1073. name: "monotonic image update and scale up",
  1074. partition: 2,
  1075. invariants: assertMonotonicInvariants,
  1076. initial: func() *apps.StatefulSet {
  1077. return newStatefulSet(3)
  1078. },
  1079. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1080. *set.Spec.Replicas = 5
  1081. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1082. return set
  1083. },
  1084. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1085. sort.Sort(ascendingOrdinal(pods))
  1086. for i := range pods {
  1087. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1088. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1089. }
  1090. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1091. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1092. }
  1093. }
  1094. return nil
  1095. },
  1096. },
  1097. {
  1098. name: "burst image update",
  1099. partition: 2,
  1100. invariants: assertBurstInvariants,
  1101. initial: func() *apps.StatefulSet {
  1102. return burst(newStatefulSet(3))
  1103. },
  1104. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1105. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1106. return set
  1107. },
  1108. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1109. sort.Sort(ascendingOrdinal(pods))
  1110. for i := range pods {
  1111. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1112. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1113. }
  1114. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1115. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1116. }
  1117. }
  1118. return nil
  1119. },
  1120. },
  1121. {
  1122. name: "burst image update and scale up",
  1123. invariants: assertBurstInvariants,
  1124. partition: 2,
  1125. initial: func() *apps.StatefulSet {
  1126. return burst(newStatefulSet(3))
  1127. },
  1128. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1129. *set.Spec.Replicas = 5
  1130. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1131. return set
  1132. },
  1133. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1134. sort.Sort(ascendingOrdinal(pods))
  1135. for i := range pods {
  1136. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1137. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1138. }
  1139. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1140. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1141. }
  1142. }
  1143. return nil
  1144. },
  1145. },
  1146. }
  1147. for i := range tests {
  1148. testFn(&tests[i], t)
  1149. }
  1150. }
  1151. func TestStatefulSetControlLimitsHistory(t *testing.T) {
  1152. type testcase struct {
  1153. name string
  1154. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1155. initial func() *apps.StatefulSet
  1156. }
  1157. testFn := func(test *testcase, t *testing.T) {
  1158. set := test.initial()
  1159. client := fake.NewSimpleClientset(set)
  1160. spc, _, ssc, stop := setupController(client)
  1161. defer close(stop)
  1162. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1163. t.Fatalf("%s: %s", test.name, err)
  1164. }
  1165. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1166. if err != nil {
  1167. t.Fatalf("%s: %s", test.name, err)
  1168. }
  1169. for i := 0; i < 10; i++ {
  1170. set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1171. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1172. t.Fatalf("%s: %s", test.name, err)
  1173. }
  1174. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1175. if err != nil {
  1176. t.Fatalf("%s: %s", test.name, err)
  1177. }
  1178. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1179. if err != nil {
  1180. t.Fatalf("%s: %s", test.name, err)
  1181. }
  1182. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1183. if err != nil {
  1184. t.Fatalf("%s: %s", test.name, err)
  1185. }
  1186. err = ssc.UpdateStatefulSet(set, pods)
  1187. if err != nil {
  1188. t.Fatalf("%s: %s", test.name, err)
  1189. }
  1190. revisions, err := ssc.ListRevisions(set)
  1191. if err != nil {
  1192. t.Fatalf("%s: %s", test.name, err)
  1193. }
  1194. if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1195. t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit)
  1196. }
  1197. }
  1198. }
  1199. tests := []testcase{
  1200. {
  1201. name: "monotonic update",
  1202. invariants: assertMonotonicInvariants,
  1203. initial: func() *apps.StatefulSet {
  1204. return newStatefulSet(3)
  1205. },
  1206. },
  1207. {
  1208. name: "burst update",
  1209. invariants: assertBurstInvariants,
  1210. initial: func() *apps.StatefulSet {
  1211. return burst(newStatefulSet(3))
  1212. },
  1213. },
  1214. }
  1215. for i := range tests {
  1216. testFn(&tests[i], t)
  1217. }
  1218. }
  1219. func TestStatefulSetControlRollback(t *testing.T) {
  1220. type testcase struct {
  1221. name string
  1222. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1223. initial func() *apps.StatefulSet
  1224. update func(set *apps.StatefulSet) *apps.StatefulSet
  1225. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1226. validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error
  1227. }
  1228. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1229. testFn := func(test *testcase, t *testing.T) {
  1230. set := test.initial()
  1231. client := fake.NewSimpleClientset(set)
  1232. spc, _, ssc, stop := setupController(client)
  1233. defer close(stop)
  1234. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1235. t.Fatalf("%s: %s", test.name, err)
  1236. }
  1237. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1238. if err != nil {
  1239. t.Fatalf("%s: %s", test.name, err)
  1240. }
  1241. set = test.update(set)
  1242. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1243. t.Fatalf("%s: %s", test.name, err)
  1244. }
  1245. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1246. if err != nil {
  1247. t.Fatalf("%s: %s", test.name, err)
  1248. }
  1249. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1250. if err != nil {
  1251. t.Fatalf("%s: %s", test.name, err)
  1252. }
  1253. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1254. if err != nil {
  1255. t.Fatalf("%s: %s", test.name, err)
  1256. }
  1257. if err := test.validateUpdate(set, pods); err != nil {
  1258. t.Fatalf("%s: %s", test.name, err)
  1259. }
  1260. revisions, err := ssc.ListRevisions(set)
  1261. if err != nil {
  1262. t.Fatalf("%s: %s", test.name, err)
  1263. }
  1264. history.SortControllerRevisions(revisions)
  1265. set, err = ApplyRevision(set, revisions[0])
  1266. if err != nil {
  1267. t.Fatalf("%s: %s", test.name, err)
  1268. }
  1269. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1270. t.Fatalf("%s: %s", test.name, err)
  1271. }
  1272. if err != nil {
  1273. t.Fatalf("%s: %s", test.name, err)
  1274. }
  1275. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1276. if err != nil {
  1277. t.Fatalf("%s: %s", test.name, err)
  1278. }
  1279. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1280. if err != nil {
  1281. t.Fatalf("%s: %s", test.name, err)
  1282. }
  1283. if err := test.validateRollback(set, pods); err != nil {
  1284. t.Fatalf("%s: %s", test.name, err)
  1285. }
  1286. }
  1287. tests := []testcase{
  1288. {
  1289. name: "monotonic image update",
  1290. invariants: assertMonotonicInvariants,
  1291. initial: func() *apps.StatefulSet {
  1292. return newStatefulSet(3)
  1293. },
  1294. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1295. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1296. return set
  1297. },
  1298. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1299. sort.Sort(ascendingOrdinal(pods))
  1300. for i := range pods {
  1301. if pods[i].Spec.Containers[0].Image != "foo" {
  1302. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1303. }
  1304. }
  1305. return nil
  1306. },
  1307. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1308. sort.Sort(ascendingOrdinal(pods))
  1309. for i := range pods {
  1310. if pods[i].Spec.Containers[0].Image != originalImage {
  1311. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1312. }
  1313. }
  1314. return nil
  1315. },
  1316. },
  1317. {
  1318. name: "monotonic image update and scale up",
  1319. invariants: assertMonotonicInvariants,
  1320. initial: func() *apps.StatefulSet {
  1321. return newStatefulSet(3)
  1322. },
  1323. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1324. *set.Spec.Replicas = 5
  1325. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1326. return set
  1327. },
  1328. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1329. sort.Sort(ascendingOrdinal(pods))
  1330. for i := range pods {
  1331. if pods[i].Spec.Containers[0].Image != "foo" {
  1332. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1333. }
  1334. }
  1335. return nil
  1336. },
  1337. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1338. sort.Sort(ascendingOrdinal(pods))
  1339. for i := range pods {
  1340. if pods[i].Spec.Containers[0].Image != originalImage {
  1341. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1342. }
  1343. }
  1344. return nil
  1345. },
  1346. },
  1347. {
  1348. name: "monotonic image update and scale down",
  1349. invariants: assertMonotonicInvariants,
  1350. initial: func() *apps.StatefulSet {
  1351. return newStatefulSet(5)
  1352. },
  1353. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1354. *set.Spec.Replicas = 3
  1355. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1356. return set
  1357. },
  1358. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1359. sort.Sort(ascendingOrdinal(pods))
  1360. for i := range pods {
  1361. if pods[i].Spec.Containers[0].Image != "foo" {
  1362. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1363. }
  1364. }
  1365. return nil
  1366. },
  1367. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1368. sort.Sort(ascendingOrdinal(pods))
  1369. for i := range pods {
  1370. if pods[i].Spec.Containers[0].Image != originalImage {
  1371. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1372. }
  1373. }
  1374. return nil
  1375. },
  1376. },
  1377. {
  1378. name: "burst image update",
  1379. invariants: assertBurstInvariants,
  1380. initial: func() *apps.StatefulSet {
  1381. return burst(newStatefulSet(3))
  1382. },
  1383. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1384. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1385. return set
  1386. },
  1387. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1388. sort.Sort(ascendingOrdinal(pods))
  1389. for i := range pods {
  1390. if pods[i].Spec.Containers[0].Image != "foo" {
  1391. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1392. }
  1393. }
  1394. return nil
  1395. },
  1396. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1397. sort.Sort(ascendingOrdinal(pods))
  1398. for i := range pods {
  1399. if pods[i].Spec.Containers[0].Image != originalImage {
  1400. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1401. }
  1402. }
  1403. return nil
  1404. },
  1405. },
  1406. {
  1407. name: "burst image update and scale up",
  1408. invariants: assertBurstInvariants,
  1409. initial: func() *apps.StatefulSet {
  1410. return burst(newStatefulSet(3))
  1411. },
  1412. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1413. *set.Spec.Replicas = 5
  1414. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1415. return set
  1416. },
  1417. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1418. sort.Sort(ascendingOrdinal(pods))
  1419. for i := range pods {
  1420. if pods[i].Spec.Containers[0].Image != "foo" {
  1421. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1422. }
  1423. }
  1424. return nil
  1425. },
  1426. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1427. sort.Sort(ascendingOrdinal(pods))
  1428. for i := range pods {
  1429. if pods[i].Spec.Containers[0].Image != originalImage {
  1430. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1431. }
  1432. }
  1433. return nil
  1434. },
  1435. },
  1436. {
  1437. name: "burst image update and scale down",
  1438. invariants: assertBurstInvariants,
  1439. initial: func() *apps.StatefulSet {
  1440. return burst(newStatefulSet(5))
  1441. },
  1442. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1443. *set.Spec.Replicas = 3
  1444. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1445. return set
  1446. },
  1447. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1448. sort.Sort(ascendingOrdinal(pods))
  1449. for i := range pods {
  1450. if pods[i].Spec.Containers[0].Image != "foo" {
  1451. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1452. }
  1453. }
  1454. return nil
  1455. },
  1456. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1457. sort.Sort(ascendingOrdinal(pods))
  1458. for i := range pods {
  1459. if pods[i].Spec.Containers[0].Image != originalImage {
  1460. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1461. }
  1462. }
  1463. return nil
  1464. },
  1465. },
  1466. }
  1467. for i := range tests {
  1468. testFn(&tests[i], t)
  1469. }
  1470. }
  1471. type requestTracker struct {
  1472. requests int
  1473. err error
  1474. after int
  1475. }
  1476. func (rt *requestTracker) errorReady() bool {
  1477. return rt.err != nil && rt.requests >= rt.after
  1478. }
  1479. func (rt *requestTracker) inc() {
  1480. rt.requests++
  1481. }
  1482. func (rt *requestTracker) reset() {
  1483. rt.err = nil
  1484. rt.after = 0
  1485. }
  1486. type fakeStatefulPodControl struct {
  1487. podsLister corelisters.PodLister
  1488. claimsLister corelisters.PersistentVolumeClaimLister
  1489. setsLister appslisters.StatefulSetLister
  1490. podsIndexer cache.Indexer
  1491. claimsIndexer cache.Indexer
  1492. setsIndexer cache.Indexer
  1493. createPodTracker requestTracker
  1494. updatePodTracker requestTracker
  1495. deletePodTracker requestTracker
  1496. }
  1497. func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer) *fakeStatefulPodControl {
  1498. claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  1499. return &fakeStatefulPodControl{
  1500. podInformer.Lister(),
  1501. corelisters.NewPersistentVolumeClaimLister(claimsIndexer),
  1502. setInformer.Lister(),
  1503. podInformer.Informer().GetIndexer(),
  1504. claimsIndexer,
  1505. setInformer.Informer().GetIndexer(),
  1506. requestTracker{0, nil, 0},
  1507. requestTracker{0, nil, 0},
  1508. requestTracker{0, nil, 0}}
  1509. }
  1510. func (spc *fakeStatefulPodControl) SetCreateStatefulPodError(err error, after int) {
  1511. spc.createPodTracker.err = err
  1512. spc.createPodTracker.after = after
  1513. }
  1514. func (spc *fakeStatefulPodControl) SetUpdateStatefulPodError(err error, after int) {
  1515. spc.updatePodTracker.err = err
  1516. spc.updatePodTracker.after = after
  1517. }
  1518. func (spc *fakeStatefulPodControl) SetDeleteStatefulPodError(err error, after int) {
  1519. spc.deletePodTracker.err = err
  1520. spc.deletePodTracker.after = after
  1521. }
  1522. func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1523. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1524. if err != nil {
  1525. return nil, err
  1526. }
  1527. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1528. if err != nil {
  1529. return nil, err
  1530. }
  1531. if 0 > ordinal || ordinal >= len(pods) {
  1532. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1533. }
  1534. sort.Sort(ascendingOrdinal(pods))
  1535. pod := pods[ordinal].DeepCopy()
  1536. pod.Status.Phase = v1.PodPending
  1537. fakeResourceVersion(pod)
  1538. spc.podsIndexer.Update(pod)
  1539. return spc.podsLister.Pods(set.Namespace).List(selector)
  1540. }
  1541. func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1542. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1543. if err != nil {
  1544. return nil, err
  1545. }
  1546. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1547. if err != nil {
  1548. return nil, err
  1549. }
  1550. if 0 > ordinal || ordinal >= len(pods) {
  1551. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1552. }
  1553. sort.Sort(ascendingOrdinal(pods))
  1554. pod := pods[ordinal].DeepCopy()
  1555. pod.Status.Phase = v1.PodRunning
  1556. fakeResourceVersion(pod)
  1557. spc.podsIndexer.Update(pod)
  1558. return spc.podsLister.Pods(set.Namespace).List(selector)
  1559. }
  1560. func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1561. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1562. if err != nil {
  1563. return nil, err
  1564. }
  1565. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1566. if err != nil {
  1567. return nil, err
  1568. }
  1569. if 0 > ordinal || ordinal >= len(pods) {
  1570. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1571. }
  1572. sort.Sort(ascendingOrdinal(pods))
  1573. pod := pods[ordinal].DeepCopy()
  1574. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1575. podutil.UpdatePodCondition(&pod.Status, &condition)
  1576. fakeResourceVersion(pod)
  1577. spc.podsIndexer.Update(pod)
  1578. return spc.podsLister.Pods(set.Namespace).List(selector)
  1579. }
  1580. func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1581. pod := newStatefulSetPod(set, ordinal)
  1582. pod.Status.Phase = v1.PodRunning
  1583. deleted := metav1.NewTime(time.Now())
  1584. pod.DeletionTimestamp = &deleted
  1585. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1586. fakeResourceVersion(pod)
  1587. podutil.UpdatePodCondition(&pod.Status, &condition)
  1588. spc.podsIndexer.Update(pod)
  1589. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1590. if err != nil {
  1591. return nil, err
  1592. }
  1593. return spc.podsLister.Pods(set.Namespace).List(selector)
  1594. }
  1595. func (spc *fakeStatefulPodControl) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1596. pod := newStatefulSetPod(set, ordinal)
  1597. deleted := metav1.NewTime(time.Now())
  1598. pod.DeletionTimestamp = &deleted
  1599. fakeResourceVersion(pod)
  1600. spc.podsIndexer.Update(pod)
  1601. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1602. if err != nil {
  1603. return nil, err
  1604. }
  1605. return spc.podsLister.Pods(set.Namespace).List(selector)
  1606. }
  1607. func (spc *fakeStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1608. defer spc.createPodTracker.inc()
  1609. if spc.createPodTracker.errorReady() {
  1610. defer spc.createPodTracker.reset()
  1611. return spc.createPodTracker.err
  1612. }
  1613. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1614. spc.claimsIndexer.Update(&claim)
  1615. }
  1616. spc.podsIndexer.Update(pod)
  1617. return nil
  1618. }
  1619. func (spc *fakeStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1620. defer spc.updatePodTracker.inc()
  1621. if spc.updatePodTracker.errorReady() {
  1622. defer spc.updatePodTracker.reset()
  1623. return spc.updatePodTracker.err
  1624. }
  1625. if !identityMatches(set, pod) {
  1626. updateIdentity(set, pod)
  1627. }
  1628. if !storageMatches(set, pod) {
  1629. updateStorage(set, pod)
  1630. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1631. spc.claimsIndexer.Update(&claim)
  1632. }
  1633. }
  1634. spc.podsIndexer.Update(pod)
  1635. return nil
  1636. }
  1637. func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1638. defer spc.deletePodTracker.inc()
  1639. if spc.deletePodTracker.errorReady() {
  1640. defer spc.deletePodTracker.reset()
  1641. return spc.deletePodTracker.err
  1642. }
  1643. if key, err := controller.KeyFunc(pod); err != nil {
  1644. return err
  1645. } else if obj, found, err := spc.podsIndexer.GetByKey(key); err != nil {
  1646. return err
  1647. } else if found {
  1648. spc.podsIndexer.Delete(obj)
  1649. }
  1650. return nil
  1651. }
  1652. var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
  1653. type fakeStatefulSetStatusUpdater struct {
  1654. setsLister appslisters.StatefulSetLister
  1655. setsIndexer cache.Indexer
  1656. updateStatusTracker requestTracker
  1657. }
  1658. func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater {
  1659. return &fakeStatefulSetStatusUpdater{
  1660. setInformer.Lister(),
  1661. setInformer.Informer().GetIndexer(),
  1662. requestTracker{0, nil, 0},
  1663. }
  1664. }
  1665. func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
  1666. defer ssu.updateStatusTracker.inc()
  1667. if ssu.updateStatusTracker.errorReady() {
  1668. defer ssu.updateStatusTracker.reset()
  1669. return ssu.updateStatusTracker.err
  1670. }
  1671. set.Status = *status
  1672. ssu.setsIndexer.Update(set)
  1673. return nil
  1674. }
  1675. func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) {
  1676. ssu.updateStatusTracker.err = err
  1677. ssu.updateStatusTracker.after = after
  1678. }
  1679. var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{}
  1680. func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1681. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1682. if err != nil {
  1683. return err
  1684. }
  1685. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1686. if err != nil {
  1687. return err
  1688. }
  1689. sort.Sort(ascendingOrdinal(pods))
  1690. for ord := 0; ord < len(pods); ord++ {
  1691. if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
  1692. return fmt.Errorf("Successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
  1693. }
  1694. if getOrdinal(pods[ord]) != ord {
  1695. return fmt.Errorf("pods %s deployed in the wrong order %d", pods[ord].Name, ord)
  1696. }
  1697. if !storageMatches(set, pods[ord]) {
  1698. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1699. }
  1700. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1701. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1702. if err != nil {
  1703. return err
  1704. }
  1705. if claim == nil {
  1706. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1707. }
  1708. }
  1709. if !identityMatches(set, pods[ord]) {
  1710. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1711. }
  1712. }
  1713. return nil
  1714. }
  1715. func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1716. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1717. if err != nil {
  1718. return err
  1719. }
  1720. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1721. if err != nil {
  1722. return err
  1723. }
  1724. sort.Sort(ascendingOrdinal(pods))
  1725. for ord := 0; ord < len(pods); ord++ {
  1726. if !storageMatches(set, pods[ord]) {
  1727. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1728. }
  1729. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1730. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1731. if err != nil {
  1732. return err
  1733. }
  1734. if claim == nil {
  1735. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1736. }
  1737. }
  1738. if !identityMatches(set, pods[ord]) {
  1739. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
  1740. pods[ord].Name,
  1741. set.Name)
  1742. }
  1743. }
  1744. return nil
  1745. }
  1746. func assertUpdateInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1747. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1748. if err != nil {
  1749. return err
  1750. }
  1751. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1752. if err != nil {
  1753. return err
  1754. }
  1755. sort.Sort(ascendingOrdinal(pods))
  1756. for ord := 0; ord < len(pods); ord++ {
  1757. if !storageMatches(set, pods[ord]) {
  1758. return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1759. }
  1760. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1761. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1762. if err != nil {
  1763. return err
  1764. }
  1765. if claim == nil {
  1766. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1767. }
  1768. }
  1769. if !identityMatches(set, pods[ord]) {
  1770. return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1771. }
  1772. }
  1773. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
  1774. return nil
  1775. }
  1776. if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType {
  1777. for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ {
  1778. if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got {
  1779. return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got)
  1780. }
  1781. }
  1782. for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 {
  1783. if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got {
  1784. return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got)
  1785. }
  1786. }
  1787. }
  1788. return nil
  1789. }
  1790. func fakeResourceVersion(object interface{}) {
  1791. obj, isObj := object.(metav1.Object)
  1792. if !isObj {
  1793. return
  1794. }
  1795. if version := obj.GetResourceVersion(); version == "" {
  1796. obj.SetResourceVersion("1")
  1797. } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
  1798. obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
  1799. }
  1800. }
  1801. func scaleUpStatefulSetControl(set *apps.StatefulSet,
  1802. ssc StatefulSetControlInterface,
  1803. spc *fakeStatefulPodControl,
  1804. invariants invariantFunc) error {
  1805. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1806. if err != nil {
  1807. return err
  1808. }
  1809. for set.Status.ReadyReplicas < *set.Spec.Replicas {
  1810. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1811. if err != nil {
  1812. return err
  1813. }
  1814. sort.Sort(ascendingOrdinal(pods))
  1815. // ensure all pods are valid (have a phase)
  1816. initialized := false
  1817. for ord, pod := range pods {
  1818. if pod.Status.Phase == "" {
  1819. if pods, err = spc.setPodPending(set, ord); err != nil {
  1820. return err
  1821. }
  1822. break
  1823. }
  1824. }
  1825. if initialized {
  1826. continue
  1827. }
  1828. // select one of the pods and move it forward in status
  1829. if len(pods) > 0 {
  1830. ord := int(rand.Int63n(int64(len(pods))))
  1831. pod := pods[ord]
  1832. switch pod.Status.Phase {
  1833. case v1.PodPending:
  1834. if pods, err = spc.setPodRunning(set, ord); err != nil {
  1835. return err
  1836. }
  1837. case v1.PodRunning:
  1838. if pods, err = spc.setPodReady(set, ord); err != nil {
  1839. return err
  1840. }
  1841. default:
  1842. continue
  1843. }
  1844. }
  1845. // run the controller once and check invariants
  1846. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1847. return err
  1848. }
  1849. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1850. if err != nil {
  1851. return err
  1852. }
  1853. if err := invariants(set, spc); err != nil {
  1854. return err
  1855. }
  1856. }
  1857. return invariants(set, spc)
  1858. }
  1859. func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
  1860. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1861. if err != nil {
  1862. return err
  1863. }
  1864. for set.Status.Replicas > *set.Spec.Replicas {
  1865. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1866. if err != nil {
  1867. return err
  1868. }
  1869. sort.Sort(ascendingOrdinal(pods))
  1870. if ordinal := len(pods) - 1; ordinal >= 0 {
  1871. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1872. return err
  1873. }
  1874. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1875. if err != nil {
  1876. return err
  1877. }
  1878. if pods, err = spc.addTerminatingPod(set, ordinal); err != nil {
  1879. return err
  1880. }
  1881. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1882. return err
  1883. }
  1884. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1885. if err != nil {
  1886. return err
  1887. }
  1888. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1889. if err != nil {
  1890. return err
  1891. }
  1892. sort.Sort(ascendingOrdinal(pods))
  1893. if len(pods) > 0 {
  1894. spc.podsIndexer.Delete(pods[len(pods)-1])
  1895. }
  1896. }
  1897. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1898. return err
  1899. }
  1900. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1901. if err != nil {
  1902. return err
  1903. }
  1904. if err := invariants(set, spc); err != nil {
  1905. return err
  1906. }
  1907. }
  1908. return invariants(set, spc)
  1909. }
  1910. func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool {
  1911. sort.Sort(ascendingOrdinal(pods))
  1912. if len(pods) != int(*set.Spec.Replicas) {
  1913. return false
  1914. }
  1915. if set.Status.ReadyReplicas != *set.Spec.Replicas {
  1916. return false
  1917. }
  1918. switch set.Spec.UpdateStrategy.Type {
  1919. case apps.OnDeleteStatefulSetStrategyType:
  1920. return true
  1921. case apps.RollingUpdateStatefulSetStrategyType:
  1922. if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 {
  1923. if set.Status.CurrentReplicas < *set.Spec.Replicas {
  1924. return false
  1925. }
  1926. for i := range pods {
  1927. if getPodRevision(pods[i]) != set.Status.CurrentRevision {
  1928. return false
  1929. }
  1930. }
  1931. } else {
  1932. partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  1933. if len(pods) < partition {
  1934. return false
  1935. }
  1936. for i := partition; i < len(pods); i++ {
  1937. if getPodRevision(pods[i]) != set.Status.UpdateRevision {
  1938. return false
  1939. }
  1940. }
  1941. }
  1942. }
  1943. return true
  1944. }
  1945. func updateStatefulSetControl(set *apps.StatefulSet,
  1946. ssc StatefulSetControlInterface,
  1947. spc *fakeStatefulPodControl,
  1948. invariants invariantFunc) error {
  1949. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1950. if err != nil {
  1951. return err
  1952. }
  1953. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1954. if err != nil {
  1955. return err
  1956. }
  1957. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1958. return err
  1959. }
  1960. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1961. if err != nil {
  1962. return err
  1963. }
  1964. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1965. if err != nil {
  1966. return err
  1967. }
  1968. for !updateComplete(set, pods) {
  1969. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1970. if err != nil {
  1971. return err
  1972. }
  1973. sort.Sort(ascendingOrdinal(pods))
  1974. initialized := false
  1975. for ord, pod := range pods {
  1976. if pod.Status.Phase == "" {
  1977. if pods, err = spc.setPodPending(set, ord); err != nil {
  1978. return err
  1979. }
  1980. break
  1981. }
  1982. }
  1983. if initialized {
  1984. continue
  1985. }
  1986. if len(pods) > 0 {
  1987. ord := int(rand.Int63n(int64(len(pods))))
  1988. pod := pods[ord]
  1989. switch pod.Status.Phase {
  1990. case v1.PodPending:
  1991. if pods, err = spc.setPodRunning(set, ord); err != nil {
  1992. return err
  1993. }
  1994. case v1.PodRunning:
  1995. if pods, err = spc.setPodReady(set, ord); err != nil {
  1996. return err
  1997. }
  1998. default:
  1999. continue
  2000. }
  2001. }
  2002. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  2003. return err
  2004. }
  2005. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2006. if err != nil {
  2007. return err
  2008. }
  2009. if err := invariants(set, spc); err != nil {
  2010. return err
  2011. }
  2012. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2013. if err != nil {
  2014. return err
  2015. }
  2016. }
  2017. return invariants(set, spc)
  2018. }
  2019. func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision {
  2020. rev, err := newRevision(set, revision, set.Status.CollisionCount)
  2021. if err != nil {
  2022. panic(err)
  2023. }
  2024. return rev
  2025. }