stateful_set_control_test.go 67 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "reflect"
  19. "runtime"
  20. "sort"
  21. "strconv"
  22. "strings"
  23. "testing"
  24. "time"
  25. apps "k8s.io/api/apps/v1"
  26. "k8s.io/api/core/v1"
  27. apierrors "k8s.io/apimachinery/pkg/api/errors"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/labels"
  30. "k8s.io/client-go/informers"
  31. appsinformers "k8s.io/client-go/informers/apps/v1"
  32. coreinformers "k8s.io/client-go/informers/core/v1"
  33. clientset "k8s.io/client-go/kubernetes"
  34. "k8s.io/client-go/kubernetes/fake"
  35. appslisters "k8s.io/client-go/listers/apps/v1"
  36. corelisters "k8s.io/client-go/listers/core/v1"
  37. "k8s.io/client-go/tools/cache"
  38. "k8s.io/client-go/tools/record"
  39. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  40. "k8s.io/kubernetes/pkg/controller"
  41. "k8s.io/kubernetes/pkg/controller/history"
  42. )
  43. type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  44. func setupController(client clientset.Interface) (*fakeStatefulPodControl, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface, chan struct{}) {
  45. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  46. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions())
  47. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  48. recorder := record.NewFakeRecorder(10)
  49. ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
  50. stop := make(chan struct{})
  51. informerFactory.Start(stop)
  52. cache.WaitForCacheSync(
  53. stop,
  54. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  55. informerFactory.Core().V1().Pods().Informer().HasSynced,
  56. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  57. )
  58. return spc, ssu, ssc, stop
  59. }
  60. func burst(set *apps.StatefulSet) *apps.StatefulSet {
  61. set.Spec.PodManagementPolicy = apps.ParallelPodManagement
  62. return set
  63. }
  64. func TestStatefulSetControl(t *testing.T) {
  65. simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
  66. largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
  67. testCases := []struct {
  68. fn func(*testing.T, *apps.StatefulSet, invariantFunc)
  69. obj func() *apps.StatefulSet
  70. }{
  71. {CreatesPods, simpleSetFn},
  72. {ScalesUp, simpleSetFn},
  73. {ScalesDown, simpleSetFn},
  74. {ReplacesPods, largeSetFn},
  75. {RecreatesFailedPod, simpleSetFn},
  76. {CreatePodFailure, simpleSetFn},
  77. {UpdatePodFailure, simpleSetFn},
  78. {UpdateSetStatusFailure, simpleSetFn},
  79. {PodRecreateDeleteFailure, simpleSetFn},
  80. }
  81. for _, testCase := range testCases {
  82. fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
  83. if i := strings.LastIndex(fnName, "."); i != -1 {
  84. fnName = fnName[i+1:]
  85. }
  86. t.Run(
  87. fmt.Sprintf("%s/Monotonic", fnName),
  88. func(t *testing.T) {
  89. testCase.fn(t, testCase.obj(), assertMonotonicInvariants)
  90. },
  91. )
  92. t.Run(
  93. fmt.Sprintf("%s/Burst", fnName),
  94. func(t *testing.T) {
  95. set := burst(testCase.obj())
  96. testCase.fn(t, set, assertBurstInvariants)
  97. },
  98. )
  99. }
  100. }
  101. func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  102. client := fake.NewSimpleClientset(set)
  103. spc, _, ssc, stop := setupController(client)
  104. defer close(stop)
  105. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  106. t.Errorf("Failed to turn up StatefulSet : %s", err)
  107. }
  108. var err error
  109. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  110. if err != nil {
  111. t.Fatalf("Error getting updated StatefulSet: %v", err)
  112. }
  113. if set.Status.Replicas != 3 {
  114. t.Error("Failed to scale statefulset to 3 replicas")
  115. }
  116. if set.Status.ReadyReplicas != 3 {
  117. t.Error("Failed to set ReadyReplicas correctly")
  118. }
  119. if set.Status.UpdatedReplicas != 3 {
  120. t.Error("Failed to set UpdatedReplicas correctly")
  121. }
  122. }
  123. func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  124. client := fake.NewSimpleClientset(set)
  125. spc, _, ssc, stop := setupController(client)
  126. defer close(stop)
  127. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  128. t.Errorf("Failed to turn up StatefulSet : %s", err)
  129. }
  130. *set.Spec.Replicas = 4
  131. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  132. t.Errorf("Failed to scale StatefulSet : %s", err)
  133. }
  134. var err error
  135. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  136. if err != nil {
  137. t.Fatalf("Error getting updated StatefulSet: %v", err)
  138. }
  139. if set.Status.Replicas != 4 {
  140. t.Error("Failed to scale statefulset to 4 replicas")
  141. }
  142. if set.Status.ReadyReplicas != 4 {
  143. t.Error("Failed to set readyReplicas correctly")
  144. }
  145. if set.Status.UpdatedReplicas != 4 {
  146. t.Error("Failed to set updatedReplicas correctly")
  147. }
  148. }
  149. func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  150. client := fake.NewSimpleClientset(set)
  151. spc, _, ssc, stop := setupController(client)
  152. defer close(stop)
  153. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  154. t.Errorf("Failed to turn up StatefulSet : %s", err)
  155. }
  156. *set.Spec.Replicas = 0
  157. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  158. t.Errorf("Failed to scale StatefulSet : %s", err)
  159. }
  160. if set.Status.Replicas != 0 {
  161. t.Error("Failed to scale statefulset to 0 replicas")
  162. }
  163. if set.Status.ReadyReplicas != 0 {
  164. t.Error("Failed to set readyReplicas correctly")
  165. }
  166. if set.Status.UpdatedReplicas != 0 {
  167. t.Error("Failed to set updatedReplicas correctly")
  168. }
  169. }
  170. func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  171. client := fake.NewSimpleClientset(set)
  172. spc, _, ssc, stop := setupController(client)
  173. defer close(stop)
  174. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  175. t.Errorf("Failed to turn up StatefulSet : %s", err)
  176. }
  177. var err error
  178. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  179. if err != nil {
  180. t.Fatalf("Error getting updated StatefulSet: %v", err)
  181. }
  182. if set.Status.Replicas != 5 {
  183. t.Error("Failed to scale statefulset to 5 replicas")
  184. }
  185. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  186. if err != nil {
  187. t.Error(err)
  188. }
  189. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  190. if err != nil {
  191. t.Error(err)
  192. }
  193. sort.Sort(ascendingOrdinal(pods))
  194. spc.podsIndexer.Delete(pods[0])
  195. spc.podsIndexer.Delete(pods[2])
  196. spc.podsIndexer.Delete(pods[4])
  197. for i := 0; i < 5; i += 2 {
  198. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  199. if err != nil {
  200. t.Error(err)
  201. }
  202. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  203. t.Errorf("Failed to update StatefulSet : %s", err)
  204. }
  205. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  206. if err != nil {
  207. t.Fatalf("Error getting updated StatefulSet: %v", err)
  208. }
  209. if pods, err = spc.setPodRunning(set, i); err != nil {
  210. t.Error(err)
  211. }
  212. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  213. t.Errorf("Failed to update StatefulSet : %s", err)
  214. }
  215. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  216. if err != nil {
  217. t.Fatalf("Error getting updated StatefulSet: %v", err)
  218. }
  219. if pods, err = spc.setPodReady(set, i); err != nil {
  220. t.Error(err)
  221. }
  222. }
  223. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  224. if err != nil {
  225. t.Error(err)
  226. }
  227. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  228. t.Errorf("Failed to update StatefulSet : %s", err)
  229. }
  230. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  231. if err != nil {
  232. t.Fatalf("Error getting updated StatefulSet: %v", err)
  233. }
  234. if e, a := int32(5), set.Status.Replicas; e != a {
  235. t.Errorf("Expected to scale to %d, got %d", e, a)
  236. }
  237. }
  238. func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  239. client := fake.NewSimpleClientset()
  240. spc, _, ssc, stop := setupController(client)
  241. defer close(stop)
  242. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  243. if err != nil {
  244. t.Error(err)
  245. }
  246. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  247. if err != nil {
  248. t.Error(err)
  249. }
  250. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  251. t.Errorf("Error updating StatefulSet %s", err)
  252. }
  253. if err := invariants(set, spc); err != nil {
  254. t.Error(err)
  255. }
  256. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  257. if err != nil {
  258. t.Error(err)
  259. }
  260. pods[0].Status.Phase = v1.PodFailed
  261. spc.podsIndexer.Update(pods[0])
  262. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  263. t.Errorf("Error updating StatefulSet %s", err)
  264. }
  265. if err := invariants(set, spc); err != nil {
  266. t.Error(err)
  267. }
  268. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  269. if err != nil {
  270. t.Error(err)
  271. }
  272. if isCreated(pods[0]) {
  273. t.Error("StatefulSet did not recreate failed Pod")
  274. }
  275. }
  276. func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  277. client := fake.NewSimpleClientset(set)
  278. spc, _, ssc, stop := setupController(client)
  279. defer close(stop)
  280. spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  281. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  282. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  283. }
  284. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  285. t.Errorf("Failed to turn up StatefulSet : %s", err)
  286. }
  287. var err error
  288. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  289. if err != nil {
  290. t.Fatalf("Error getting updated StatefulSet: %v", err)
  291. }
  292. if set.Status.Replicas != 3 {
  293. t.Error("Failed to scale StatefulSet to 3 replicas")
  294. }
  295. if set.Status.ReadyReplicas != 3 {
  296. t.Error("Failed to set readyReplicas correctly")
  297. }
  298. if set.Status.UpdatedReplicas != 3 {
  299. t.Error("Failed to updatedReplicas correctly")
  300. }
  301. }
  302. func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  303. client := fake.NewSimpleClientset(set)
  304. spc, _, ssc, stop := setupController(client)
  305. defer close(stop)
  306. spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  307. // have to have 1 successful loop first
  308. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  309. t.Fatalf("Unexpected error: %v", err)
  310. }
  311. var err error
  312. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  313. if err != nil {
  314. t.Fatalf("Error getting updated StatefulSet: %v", err)
  315. }
  316. if set.Status.Replicas != 3 {
  317. t.Error("Failed to scale StatefulSet to 3 replicas")
  318. }
  319. if set.Status.ReadyReplicas != 3 {
  320. t.Error("Failed to set readyReplicas correctly")
  321. }
  322. if set.Status.UpdatedReplicas != 3 {
  323. t.Error("Failed to set updatedReplicas correctly")
  324. }
  325. // now mutate a pod's identity
  326. pods, err := spc.podsLister.List(labels.Everything())
  327. if err != nil {
  328. t.Fatalf("Error listing pods: %v", err)
  329. }
  330. if len(pods) != 3 {
  331. t.Fatalf("Expected 3 pods, got %d", len(pods))
  332. }
  333. sort.Sort(ascendingOrdinal(pods))
  334. pods[0].Name = "goo-0"
  335. spc.podsIndexer.Update(pods[0])
  336. // now it should fail
  337. if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
  338. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  339. }
  340. }
  341. func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  342. client := fake.NewSimpleClientset(set)
  343. spc, ssu, ssc, stop := setupController(client)
  344. defer close(stop)
  345. ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  346. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  347. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  348. }
  349. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  350. t.Errorf("Failed to turn up StatefulSet : %s", err)
  351. }
  352. var err error
  353. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  354. if err != nil {
  355. t.Fatalf("Error getting updated StatefulSet: %v", err)
  356. }
  357. if set.Status.Replicas != 3 {
  358. t.Error("Failed to scale StatefulSet to 3 replicas")
  359. }
  360. if set.Status.ReadyReplicas != 3 {
  361. t.Error("Failed to set readyReplicas to 3")
  362. }
  363. if set.Status.UpdatedReplicas != 3 {
  364. t.Error("Failed to set updatedReplicas to 3")
  365. }
  366. }
  367. func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  368. client := fake.NewSimpleClientset(set)
  369. spc, _, ssc, stop := setupController(client)
  370. defer close(stop)
  371. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  372. if err != nil {
  373. t.Error(err)
  374. }
  375. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  376. if err != nil {
  377. t.Error(err)
  378. }
  379. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  380. t.Errorf("Error updating StatefulSet %s", err)
  381. }
  382. if err := invariants(set, spc); err != nil {
  383. t.Error(err)
  384. }
  385. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  386. if err != nil {
  387. t.Error(err)
  388. }
  389. pods[0].Status.Phase = v1.PodFailed
  390. spc.podsIndexer.Update(pods[0])
  391. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  392. if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
  393. t.Errorf("StatefulSet failed to %s", err)
  394. }
  395. if err := invariants(set, spc); err != nil {
  396. t.Error(err)
  397. }
  398. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  399. t.Errorf("Error updating StatefulSet %s", err)
  400. }
  401. if err := invariants(set, spc); err != nil {
  402. t.Error(err)
  403. }
  404. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  405. if err != nil {
  406. t.Error(err)
  407. }
  408. if isCreated(pods[0]) {
  409. t.Error("StatefulSet did not recreate failed Pod")
  410. }
  411. }
  412. func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
  413. invariants := assertMonotonicInvariants
  414. set := newStatefulSet(3)
  415. client := fake.NewSimpleClientset(set)
  416. spc, _, ssc, stop := setupController(client)
  417. defer close(stop)
  418. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  419. t.Errorf("Failed to turn up StatefulSet : %s", err)
  420. }
  421. var err error
  422. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  423. if err != nil {
  424. t.Fatalf("Error getting updated StatefulSet: %v", err)
  425. }
  426. *set.Spec.Replicas = 0
  427. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  428. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
  429. t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
  430. }
  431. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  432. t.Errorf("Failed to turn down StatefulSet %s", err)
  433. }
  434. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  435. if err != nil {
  436. t.Fatalf("Error getting updated StatefulSet: %v", err)
  437. }
  438. if set.Status.Replicas != 0 {
  439. t.Error("Failed to scale statefulset to 0 replicas")
  440. }
  441. if set.Status.ReadyReplicas != 0 {
  442. t.Error("Failed to set readyReplicas to 0")
  443. }
  444. if set.Status.UpdatedReplicas != 0 {
  445. t.Error("Failed to set updatedReplicas to 0")
  446. }
  447. }
  448. func TestStatefulSetControl_getSetRevisions(t *testing.T) {
  449. type testcase struct {
  450. name string
  451. existing []*apps.ControllerRevision
  452. set *apps.StatefulSet
  453. expectedCount int
  454. expectedCurrent *apps.ControllerRevision
  455. expectedUpdate *apps.ControllerRevision
  456. err bool
  457. }
  458. testFn := func(test *testcase, t *testing.T) {
  459. client := fake.NewSimpleClientset()
  460. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  461. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions())
  462. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  463. recorder := record.NewFakeRecorder(10)
  464. ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
  465. stop := make(chan struct{})
  466. defer close(stop)
  467. informerFactory.Start(stop)
  468. cache.WaitForCacheSync(
  469. stop,
  470. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  471. informerFactory.Core().V1().Pods().Informer().HasSynced,
  472. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  473. )
  474. test.set.Status.CollisionCount = new(int32)
  475. for i := range test.existing {
  476. ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount)
  477. }
  478. revisions, err := ssc.ListRevisions(test.set)
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions)
  483. if err != nil {
  484. t.Fatalf("error getting statefulset revisions:%v", err)
  485. }
  486. revisions, err = ssc.ListRevisions(test.set)
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. if len(revisions) != test.expectedCount {
  491. t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions))
  492. }
  493. if test.err && err == nil {
  494. t.Errorf("%s: expected error", test.name)
  495. }
  496. if !test.err && !history.EqualRevision(current, test.expectedCurrent) {
  497. t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current)
  498. }
  499. if !test.err && !history.EqualRevision(update, test.expectedUpdate) {
  500. t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update)
  501. }
  502. if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision {
  503. t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision)
  504. }
  505. if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision {
  506. t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision)
  507. }
  508. }
  509. updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision {
  510. clone := cr.DeepCopy()
  511. clone.Revision = revision
  512. return clone
  513. }
  514. set := newStatefulSet(3)
  515. set.Status.CollisionCount = new(int32)
  516. rev0 := newRevisionOrDie(set, 1)
  517. set1 := set.DeepCopy()
  518. set1.Spec.Template.Spec.Containers[0].Image = "foo"
  519. set1.Status.CurrentRevision = rev0.Name
  520. set1.Status.CollisionCount = new(int32)
  521. rev1 := newRevisionOrDie(set1, 2)
  522. set2 := set1.DeepCopy()
  523. set2.Spec.Template.Labels["new"] = "label"
  524. set2.Status.CurrentRevision = rev0.Name
  525. set2.Status.CollisionCount = new(int32)
  526. rev2 := newRevisionOrDie(set2, 3)
  527. tests := []testcase{
  528. {
  529. name: "creates initial revision",
  530. existing: nil,
  531. set: set,
  532. expectedCount: 1,
  533. expectedCurrent: rev0,
  534. expectedUpdate: rev0,
  535. err: false,
  536. },
  537. {
  538. name: "creates revision on update",
  539. existing: []*apps.ControllerRevision{rev0},
  540. set: set1,
  541. expectedCount: 2,
  542. expectedCurrent: rev0,
  543. expectedUpdate: rev1,
  544. err: false,
  545. },
  546. {
  547. name: "must not recreate a new revision of same set",
  548. existing: []*apps.ControllerRevision{rev0, rev1},
  549. set: set1,
  550. expectedCount: 2,
  551. expectedCurrent: rev0,
  552. expectedUpdate: rev1,
  553. err: false,
  554. },
  555. {
  556. name: "must rollback to a previous revision",
  557. existing: []*apps.ControllerRevision{rev0, rev1, rev2},
  558. set: set1,
  559. expectedCount: 3,
  560. expectedCurrent: rev0,
  561. expectedUpdate: updateRevision(rev1, 4),
  562. err: false,
  563. },
  564. }
  565. for i := range tests {
  566. testFn(&tests[i], t)
  567. }
  568. }
  569. func TestStatefulSetControlRollingUpdate(t *testing.T) {
  570. type testcase struct {
  571. name string
  572. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  573. initial func() *apps.StatefulSet
  574. update func(set *apps.StatefulSet) *apps.StatefulSet
  575. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  576. }
  577. testFn := func(test *testcase, t *testing.T) {
  578. set := test.initial()
  579. client := fake.NewSimpleClientset(set)
  580. spc, _, ssc, stop := setupController(client)
  581. defer close(stop)
  582. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  583. t.Fatalf("%s: %s", test.name, err)
  584. }
  585. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  586. if err != nil {
  587. t.Fatalf("%s: %s", test.name, err)
  588. }
  589. set = test.update(set)
  590. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  591. t.Fatalf("%s: %s", test.name, err)
  592. }
  593. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  594. if err != nil {
  595. t.Fatalf("%s: %s", test.name, err)
  596. }
  597. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  598. if err != nil {
  599. t.Fatalf("%s: %s", test.name, err)
  600. }
  601. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  602. if err != nil {
  603. t.Fatalf("%s: %s", test.name, err)
  604. }
  605. if err := test.validate(set, pods); err != nil {
  606. t.Fatalf("%s: %s", test.name, err)
  607. }
  608. }
  609. tests := []testcase{
  610. {
  611. name: "monotonic image update",
  612. invariants: assertMonotonicInvariants,
  613. initial: func() *apps.StatefulSet {
  614. return newStatefulSet(3)
  615. },
  616. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  617. set.Spec.Template.Spec.Containers[0].Image = "foo"
  618. return set
  619. },
  620. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  621. sort.Sort(ascendingOrdinal(pods))
  622. for i := range pods {
  623. if pods[i].Spec.Containers[0].Image != "foo" {
  624. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  625. }
  626. }
  627. return nil
  628. },
  629. },
  630. {
  631. name: "monotonic image update and scale up",
  632. invariants: assertMonotonicInvariants,
  633. initial: func() *apps.StatefulSet {
  634. return newStatefulSet(3)
  635. },
  636. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  637. *set.Spec.Replicas = 5
  638. set.Spec.Template.Spec.Containers[0].Image = "foo"
  639. return set
  640. },
  641. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  642. sort.Sort(ascendingOrdinal(pods))
  643. for i := range pods {
  644. if pods[i].Spec.Containers[0].Image != "foo" {
  645. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  646. }
  647. }
  648. return nil
  649. },
  650. },
  651. {
  652. name: "monotonic image update and scale down",
  653. invariants: assertMonotonicInvariants,
  654. initial: func() *apps.StatefulSet {
  655. return newStatefulSet(5)
  656. },
  657. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  658. *set.Spec.Replicas = 3
  659. set.Spec.Template.Spec.Containers[0].Image = "foo"
  660. return set
  661. },
  662. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  663. sort.Sort(ascendingOrdinal(pods))
  664. for i := range pods {
  665. if pods[i].Spec.Containers[0].Image != "foo" {
  666. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  667. }
  668. }
  669. return nil
  670. },
  671. },
  672. {
  673. name: "burst image update",
  674. invariants: assertBurstInvariants,
  675. initial: func() *apps.StatefulSet {
  676. return burst(newStatefulSet(3))
  677. },
  678. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  679. set.Spec.Template.Spec.Containers[0].Image = "foo"
  680. return set
  681. },
  682. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  683. sort.Sort(ascendingOrdinal(pods))
  684. for i := range pods {
  685. if pods[i].Spec.Containers[0].Image != "foo" {
  686. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  687. }
  688. }
  689. return nil
  690. },
  691. },
  692. {
  693. name: "burst image update and scale up",
  694. invariants: assertBurstInvariants,
  695. initial: func() *apps.StatefulSet {
  696. return burst(newStatefulSet(3))
  697. },
  698. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  699. *set.Spec.Replicas = 5
  700. set.Spec.Template.Spec.Containers[0].Image = "foo"
  701. return set
  702. },
  703. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  704. sort.Sort(ascendingOrdinal(pods))
  705. for i := range pods {
  706. if pods[i].Spec.Containers[0].Image != "foo" {
  707. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  708. }
  709. }
  710. return nil
  711. },
  712. },
  713. {
  714. name: "burst image update and scale down",
  715. invariants: assertBurstInvariants,
  716. initial: func() *apps.StatefulSet {
  717. return burst(newStatefulSet(5))
  718. },
  719. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  720. *set.Spec.Replicas = 3
  721. set.Spec.Template.Spec.Containers[0].Image = "foo"
  722. return set
  723. },
  724. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  725. sort.Sort(ascendingOrdinal(pods))
  726. for i := range pods {
  727. if pods[i].Spec.Containers[0].Image != "foo" {
  728. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  729. }
  730. }
  731. return nil
  732. },
  733. },
  734. }
  735. for i := range tests {
  736. testFn(&tests[i], t)
  737. }
  738. }
  739. func TestStatefulSetControlOnDeleteUpdate(t *testing.T) {
  740. type testcase struct {
  741. name string
  742. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  743. initial func() *apps.StatefulSet
  744. update func(set *apps.StatefulSet) *apps.StatefulSet
  745. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  746. validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error
  747. }
  748. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  749. testFn := func(test *testcase, t *testing.T) {
  750. set := test.initial()
  751. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType}
  752. client := fake.NewSimpleClientset(set)
  753. spc, _, ssc, stop := setupController(client)
  754. defer close(stop)
  755. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  756. t.Fatalf("%s: %s", test.name, err)
  757. }
  758. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  759. if err != nil {
  760. t.Fatalf("%s: %s", test.name, err)
  761. }
  762. set = test.update(set)
  763. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  764. t.Fatalf("%s: %s", test.name, err)
  765. }
  766. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  767. if err != nil {
  768. t.Fatalf("%s: %s", test.name, err)
  769. }
  770. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  771. if err != nil {
  772. t.Fatalf("%s: %s", test.name, err)
  773. }
  774. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  775. if err != nil {
  776. t.Fatalf("%s: %s", test.name, err)
  777. }
  778. if err := test.validateUpdate(set, pods); err != nil {
  779. for i := range pods {
  780. t.Log(pods[i].Name)
  781. }
  782. t.Fatalf("%s: %s", test.name, err)
  783. }
  784. replicas := *set.Spec.Replicas
  785. *set.Spec.Replicas = 0
  786. if err := scaleDownStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  787. t.Fatalf("%s: %s", test.name, err)
  788. }
  789. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  790. if err != nil {
  791. t.Fatalf("%s: %s", test.name, err)
  792. }
  793. *set.Spec.Replicas = replicas
  794. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  795. t.Fatalf("%s: %s", test.name, err)
  796. }
  797. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  798. if err != nil {
  799. t.Fatalf("%s: %s", test.name, err)
  800. }
  801. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  802. if err != nil {
  803. t.Fatalf("%s: %s", test.name, err)
  804. }
  805. if err := test.validateRestart(set, pods); err != nil {
  806. t.Fatalf("%s: %s", test.name, err)
  807. }
  808. }
  809. tests := []testcase{
  810. {
  811. name: "monotonic image update",
  812. invariants: assertMonotonicInvariants,
  813. initial: func() *apps.StatefulSet {
  814. return newStatefulSet(3)
  815. },
  816. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  817. set.Spec.Template.Spec.Containers[0].Image = "foo"
  818. return set
  819. },
  820. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  821. sort.Sort(ascendingOrdinal(pods))
  822. for i := range pods {
  823. if pods[i].Spec.Containers[0].Image != originalImage {
  824. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  825. }
  826. }
  827. return nil
  828. },
  829. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  830. sort.Sort(ascendingOrdinal(pods))
  831. for i := range pods {
  832. if pods[i].Spec.Containers[0].Image != "foo" {
  833. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  834. }
  835. }
  836. return nil
  837. },
  838. },
  839. {
  840. name: "monotonic image update and scale up",
  841. invariants: assertMonotonicInvariants,
  842. initial: func() *apps.StatefulSet {
  843. return newStatefulSet(3)
  844. },
  845. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  846. *set.Spec.Replicas = 5
  847. set.Spec.Template.Spec.Containers[0].Image = "foo"
  848. return set
  849. },
  850. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  851. sort.Sort(ascendingOrdinal(pods))
  852. for i := range pods {
  853. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  854. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  855. }
  856. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  857. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  858. }
  859. }
  860. return nil
  861. },
  862. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  863. sort.Sort(ascendingOrdinal(pods))
  864. for i := range pods {
  865. if pods[i].Spec.Containers[0].Image != "foo" {
  866. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  867. }
  868. }
  869. return nil
  870. },
  871. },
  872. {
  873. name: "monotonic image update and scale down",
  874. invariants: assertMonotonicInvariants,
  875. initial: func() *apps.StatefulSet {
  876. return newStatefulSet(5)
  877. },
  878. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  879. *set.Spec.Replicas = 3
  880. set.Spec.Template.Spec.Containers[0].Image = "foo"
  881. return set
  882. },
  883. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  884. sort.Sort(ascendingOrdinal(pods))
  885. for i := range pods {
  886. if pods[i].Spec.Containers[0].Image != originalImage {
  887. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  888. }
  889. }
  890. return nil
  891. },
  892. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  893. sort.Sort(ascendingOrdinal(pods))
  894. for i := range pods {
  895. if pods[i].Spec.Containers[0].Image != "foo" {
  896. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  897. }
  898. }
  899. return nil
  900. },
  901. },
  902. {
  903. name: "burst image update",
  904. invariants: assertBurstInvariants,
  905. initial: func() *apps.StatefulSet {
  906. return burst(newStatefulSet(3))
  907. },
  908. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  909. set.Spec.Template.Spec.Containers[0].Image = "foo"
  910. return set
  911. },
  912. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  913. sort.Sort(ascendingOrdinal(pods))
  914. for i := range pods {
  915. if pods[i].Spec.Containers[0].Image != originalImage {
  916. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  917. }
  918. }
  919. return nil
  920. },
  921. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  922. sort.Sort(ascendingOrdinal(pods))
  923. for i := range pods {
  924. if pods[i].Spec.Containers[0].Image != "foo" {
  925. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  926. }
  927. }
  928. return nil
  929. },
  930. },
  931. {
  932. name: "burst image update and scale up",
  933. invariants: assertBurstInvariants,
  934. initial: func() *apps.StatefulSet {
  935. return burst(newStatefulSet(3))
  936. },
  937. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  938. *set.Spec.Replicas = 5
  939. set.Spec.Template.Spec.Containers[0].Image = "foo"
  940. return set
  941. },
  942. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  943. sort.Sort(ascendingOrdinal(pods))
  944. for i := range pods {
  945. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  946. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  947. }
  948. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  949. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  950. }
  951. }
  952. return nil
  953. },
  954. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  955. sort.Sort(ascendingOrdinal(pods))
  956. for i := range pods {
  957. if pods[i].Spec.Containers[0].Image != "foo" {
  958. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  959. }
  960. }
  961. return nil
  962. },
  963. },
  964. {
  965. name: "burst image update and scale down",
  966. invariants: assertBurstInvariants,
  967. initial: func() *apps.StatefulSet {
  968. return burst(newStatefulSet(5))
  969. },
  970. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  971. *set.Spec.Replicas = 3
  972. set.Spec.Template.Spec.Containers[0].Image = "foo"
  973. return set
  974. },
  975. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  976. sort.Sort(ascendingOrdinal(pods))
  977. for i := range pods {
  978. if pods[i].Spec.Containers[0].Image != originalImage {
  979. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  980. }
  981. }
  982. return nil
  983. },
  984. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  985. sort.Sort(ascendingOrdinal(pods))
  986. for i := range pods {
  987. if pods[i].Spec.Containers[0].Image != "foo" {
  988. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  989. }
  990. }
  991. return nil
  992. },
  993. },
  994. }
  995. for i := range tests {
  996. testFn(&tests[i], t)
  997. }
  998. }
  999. func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
  1000. type testcase struct {
  1001. name string
  1002. partition int32
  1003. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1004. initial func() *apps.StatefulSet
  1005. update func(set *apps.StatefulSet) *apps.StatefulSet
  1006. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1007. }
  1008. testFn := func(test *testcase, t *testing.T) {
  1009. set := test.initial()
  1010. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1011. Type: apps.RollingUpdateStatefulSetStrategyType,
  1012. RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1013. return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition}
  1014. }(),
  1015. }
  1016. client := fake.NewSimpleClientset(set)
  1017. spc, _, ssc, stop := setupController(client)
  1018. defer close(stop)
  1019. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1020. t.Fatalf("%s: %s", test.name, err)
  1021. }
  1022. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1023. if err != nil {
  1024. t.Fatalf("%s: %s", test.name, err)
  1025. }
  1026. set = test.update(set)
  1027. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1028. t.Fatalf("%s: %s", test.name, err)
  1029. }
  1030. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1031. if err != nil {
  1032. t.Fatalf("%s: %s", test.name, err)
  1033. }
  1034. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1035. if err != nil {
  1036. t.Fatalf("%s: %s", test.name, err)
  1037. }
  1038. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1039. if err != nil {
  1040. t.Fatalf("%s: %s", test.name, err)
  1041. }
  1042. if err := test.validate(set, pods); err != nil {
  1043. t.Fatalf("%s: %s", test.name, err)
  1044. }
  1045. }
  1046. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1047. tests := []testcase{
  1048. {
  1049. name: "monotonic image update",
  1050. invariants: assertMonotonicInvariants,
  1051. partition: 2,
  1052. initial: func() *apps.StatefulSet {
  1053. return newStatefulSet(3)
  1054. },
  1055. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1056. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1057. return set
  1058. },
  1059. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1060. sort.Sort(ascendingOrdinal(pods))
  1061. for i := range pods {
  1062. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1063. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1064. }
  1065. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1066. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1067. }
  1068. }
  1069. return nil
  1070. },
  1071. },
  1072. {
  1073. name: "monotonic image update and scale up",
  1074. partition: 2,
  1075. invariants: assertMonotonicInvariants,
  1076. initial: func() *apps.StatefulSet {
  1077. return newStatefulSet(3)
  1078. },
  1079. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1080. *set.Spec.Replicas = 5
  1081. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1082. return set
  1083. },
  1084. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1085. sort.Sort(ascendingOrdinal(pods))
  1086. for i := range pods {
  1087. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1088. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1089. }
  1090. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1091. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1092. }
  1093. }
  1094. return nil
  1095. },
  1096. },
  1097. {
  1098. name: "burst image update",
  1099. partition: 2,
  1100. invariants: assertBurstInvariants,
  1101. initial: func() *apps.StatefulSet {
  1102. return burst(newStatefulSet(3))
  1103. },
  1104. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1105. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1106. return set
  1107. },
  1108. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1109. sort.Sort(ascendingOrdinal(pods))
  1110. for i := range pods {
  1111. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1112. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1113. }
  1114. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1115. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1116. }
  1117. }
  1118. return nil
  1119. },
  1120. },
  1121. {
  1122. name: "burst image update and scale up",
  1123. invariants: assertBurstInvariants,
  1124. partition: 2,
  1125. initial: func() *apps.StatefulSet {
  1126. return burst(newStatefulSet(3))
  1127. },
  1128. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1129. *set.Spec.Replicas = 5
  1130. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1131. return set
  1132. },
  1133. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1134. sort.Sort(ascendingOrdinal(pods))
  1135. for i := range pods {
  1136. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1137. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1138. }
  1139. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1140. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1141. }
  1142. }
  1143. return nil
  1144. },
  1145. },
  1146. }
  1147. for i := range tests {
  1148. testFn(&tests[i], t)
  1149. }
  1150. }
  1151. func TestStatefulSetControlLimitsHistory(t *testing.T) {
  1152. type testcase struct {
  1153. name string
  1154. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1155. initial func() *apps.StatefulSet
  1156. }
  1157. testFn := func(test *testcase, t *testing.T) {
  1158. set := test.initial()
  1159. client := fake.NewSimpleClientset(set)
  1160. spc, _, ssc, stop := setupController(client)
  1161. defer close(stop)
  1162. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1163. t.Fatalf("%s: %s", test.name, err)
  1164. }
  1165. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1166. if err != nil {
  1167. t.Fatalf("%s: %s", test.name, err)
  1168. }
  1169. for i := 0; i < 10; i++ {
  1170. set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1171. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1172. t.Fatalf("%s: %s", test.name, err)
  1173. }
  1174. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1175. if err != nil {
  1176. t.Fatalf("%s: %s", test.name, err)
  1177. }
  1178. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1179. if err != nil {
  1180. t.Fatalf("%s: %s", test.name, err)
  1181. }
  1182. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1183. if err != nil {
  1184. t.Fatalf("%s: %s", test.name, err)
  1185. }
  1186. err = ssc.UpdateStatefulSet(set, pods)
  1187. if err != nil {
  1188. t.Fatalf("%s: %s", test.name, err)
  1189. }
  1190. revisions, err := ssc.ListRevisions(set)
  1191. if err != nil {
  1192. t.Fatalf("%s: %s", test.name, err)
  1193. }
  1194. if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1195. t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit)
  1196. }
  1197. }
  1198. }
  1199. tests := []testcase{
  1200. {
  1201. name: "monotonic update",
  1202. invariants: assertMonotonicInvariants,
  1203. initial: func() *apps.StatefulSet {
  1204. return newStatefulSet(3)
  1205. },
  1206. },
  1207. {
  1208. name: "burst update",
  1209. invariants: assertBurstInvariants,
  1210. initial: func() *apps.StatefulSet {
  1211. return burst(newStatefulSet(3))
  1212. },
  1213. },
  1214. }
  1215. for i := range tests {
  1216. testFn(&tests[i], t)
  1217. }
  1218. }
  1219. func TestStatefulSetControlRollback(t *testing.T) {
  1220. type testcase struct {
  1221. name string
  1222. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1223. initial func() *apps.StatefulSet
  1224. update func(set *apps.StatefulSet) *apps.StatefulSet
  1225. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1226. validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error
  1227. }
  1228. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1229. testFn := func(test *testcase, t *testing.T) {
  1230. set := test.initial()
  1231. client := fake.NewSimpleClientset(set)
  1232. spc, _, ssc, stop := setupController(client)
  1233. defer close(stop)
  1234. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1235. t.Fatalf("%s: %s", test.name, err)
  1236. }
  1237. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1238. if err != nil {
  1239. t.Fatalf("%s: %s", test.name, err)
  1240. }
  1241. set = test.update(set)
  1242. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1243. t.Fatalf("%s: %s", test.name, err)
  1244. }
  1245. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1246. if err != nil {
  1247. t.Fatalf("%s: %s", test.name, err)
  1248. }
  1249. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1250. if err != nil {
  1251. t.Fatalf("%s: %s", test.name, err)
  1252. }
  1253. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1254. if err != nil {
  1255. t.Fatalf("%s: %s", test.name, err)
  1256. }
  1257. if err := test.validateUpdate(set, pods); err != nil {
  1258. t.Fatalf("%s: %s", test.name, err)
  1259. }
  1260. revisions, err := ssc.ListRevisions(set)
  1261. if err != nil {
  1262. t.Fatalf("%s: %s", test.name, err)
  1263. }
  1264. history.SortControllerRevisions(revisions)
  1265. set, err = ApplyRevision(set, revisions[0])
  1266. if err != nil {
  1267. t.Fatalf("%s: %s", test.name, err)
  1268. }
  1269. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1270. t.Fatalf("%s: %s", test.name, err)
  1271. }
  1272. if err != nil {
  1273. t.Fatalf("%s: %s", test.name, err)
  1274. }
  1275. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1276. if err != nil {
  1277. t.Fatalf("%s: %s", test.name, err)
  1278. }
  1279. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1280. if err != nil {
  1281. t.Fatalf("%s: %s", test.name, err)
  1282. }
  1283. if err := test.validateRollback(set, pods); err != nil {
  1284. t.Fatalf("%s: %s", test.name, err)
  1285. }
  1286. }
  1287. tests := []testcase{
  1288. {
  1289. name: "monotonic image update",
  1290. invariants: assertMonotonicInvariants,
  1291. initial: func() *apps.StatefulSet {
  1292. return newStatefulSet(3)
  1293. },
  1294. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1295. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1296. return set
  1297. },
  1298. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1299. sort.Sort(ascendingOrdinal(pods))
  1300. for i := range pods {
  1301. if pods[i].Spec.Containers[0].Image != "foo" {
  1302. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1303. }
  1304. }
  1305. return nil
  1306. },
  1307. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1308. sort.Sort(ascendingOrdinal(pods))
  1309. for i := range pods {
  1310. if pods[i].Spec.Containers[0].Image != originalImage {
  1311. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1312. }
  1313. }
  1314. return nil
  1315. },
  1316. },
  1317. {
  1318. name: "monotonic image update and scale up",
  1319. invariants: assertMonotonicInvariants,
  1320. initial: func() *apps.StatefulSet {
  1321. return newStatefulSet(3)
  1322. },
  1323. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1324. *set.Spec.Replicas = 5
  1325. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1326. return set
  1327. },
  1328. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1329. sort.Sort(ascendingOrdinal(pods))
  1330. for i := range pods {
  1331. if pods[i].Spec.Containers[0].Image != "foo" {
  1332. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1333. }
  1334. }
  1335. return nil
  1336. },
  1337. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1338. sort.Sort(ascendingOrdinal(pods))
  1339. for i := range pods {
  1340. if pods[i].Spec.Containers[0].Image != originalImage {
  1341. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1342. }
  1343. }
  1344. return nil
  1345. },
  1346. },
  1347. {
  1348. name: "monotonic image update and scale down",
  1349. invariants: assertMonotonicInvariants,
  1350. initial: func() *apps.StatefulSet {
  1351. return newStatefulSet(5)
  1352. },
  1353. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1354. *set.Spec.Replicas = 3
  1355. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1356. return set
  1357. },
  1358. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1359. sort.Sort(ascendingOrdinal(pods))
  1360. for i := range pods {
  1361. if pods[i].Spec.Containers[0].Image != "foo" {
  1362. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1363. }
  1364. }
  1365. return nil
  1366. },
  1367. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1368. sort.Sort(ascendingOrdinal(pods))
  1369. for i := range pods {
  1370. if pods[i].Spec.Containers[0].Image != originalImage {
  1371. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1372. }
  1373. }
  1374. return nil
  1375. },
  1376. },
  1377. {
  1378. name: "burst image update",
  1379. invariants: assertBurstInvariants,
  1380. initial: func() *apps.StatefulSet {
  1381. return burst(newStatefulSet(3))
  1382. },
  1383. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1384. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1385. return set
  1386. },
  1387. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1388. sort.Sort(ascendingOrdinal(pods))
  1389. for i := range pods {
  1390. if pods[i].Spec.Containers[0].Image != "foo" {
  1391. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1392. }
  1393. }
  1394. return nil
  1395. },
  1396. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1397. sort.Sort(ascendingOrdinal(pods))
  1398. for i := range pods {
  1399. if pods[i].Spec.Containers[0].Image != originalImage {
  1400. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1401. }
  1402. }
  1403. return nil
  1404. },
  1405. },
  1406. {
  1407. name: "burst image update and scale up",
  1408. invariants: assertBurstInvariants,
  1409. initial: func() *apps.StatefulSet {
  1410. return burst(newStatefulSet(3))
  1411. },
  1412. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1413. *set.Spec.Replicas = 5
  1414. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1415. return set
  1416. },
  1417. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1418. sort.Sort(ascendingOrdinal(pods))
  1419. for i := range pods {
  1420. if pods[i].Spec.Containers[0].Image != "foo" {
  1421. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1422. }
  1423. }
  1424. return nil
  1425. },
  1426. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1427. sort.Sort(ascendingOrdinal(pods))
  1428. for i := range pods {
  1429. if pods[i].Spec.Containers[0].Image != originalImage {
  1430. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1431. }
  1432. }
  1433. return nil
  1434. },
  1435. },
  1436. {
  1437. name: "burst image update and scale down",
  1438. invariants: assertBurstInvariants,
  1439. initial: func() *apps.StatefulSet {
  1440. return burst(newStatefulSet(5))
  1441. },
  1442. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1443. *set.Spec.Replicas = 3
  1444. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1445. return set
  1446. },
  1447. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1448. sort.Sort(ascendingOrdinal(pods))
  1449. for i := range pods {
  1450. if pods[i].Spec.Containers[0].Image != "foo" {
  1451. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1452. }
  1453. }
  1454. return nil
  1455. },
  1456. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1457. sort.Sort(ascendingOrdinal(pods))
  1458. for i := range pods {
  1459. if pods[i].Spec.Containers[0].Image != originalImage {
  1460. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1461. }
  1462. }
  1463. return nil
  1464. },
  1465. },
  1466. }
  1467. for i := range tests {
  1468. testFn(&tests[i], t)
  1469. }
  1470. }
  1471. type requestTracker struct {
  1472. requests int
  1473. err error
  1474. after int
  1475. }
  1476. func (rt *requestTracker) errorReady() bool {
  1477. return rt.err != nil && rt.requests >= rt.after
  1478. }
  1479. func (rt *requestTracker) inc() {
  1480. rt.requests++
  1481. }
  1482. func (rt *requestTracker) reset() {
  1483. rt.err = nil
  1484. rt.after = 0
  1485. }
  1486. type fakeStatefulPodControl struct {
  1487. podsLister corelisters.PodLister
  1488. claimsLister corelisters.PersistentVolumeClaimLister
  1489. setsLister appslisters.StatefulSetLister
  1490. podsIndexer cache.Indexer
  1491. claimsIndexer cache.Indexer
  1492. setsIndexer cache.Indexer
  1493. revisionsIndexer cache.Indexer
  1494. createPodTracker requestTracker
  1495. updatePodTracker requestTracker
  1496. deletePodTracker requestTracker
  1497. }
  1498. func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer, revisionInformer appsinformers.ControllerRevisionInformer) *fakeStatefulPodControl {
  1499. claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  1500. return &fakeStatefulPodControl{
  1501. podInformer.Lister(),
  1502. corelisters.NewPersistentVolumeClaimLister(claimsIndexer),
  1503. setInformer.Lister(),
  1504. podInformer.Informer().GetIndexer(),
  1505. claimsIndexer,
  1506. setInformer.Informer().GetIndexer(),
  1507. revisionInformer.Informer().GetIndexer(),
  1508. requestTracker{0, nil, 0},
  1509. requestTracker{0, nil, 0},
  1510. requestTracker{0, nil, 0}}
  1511. }
  1512. func (spc *fakeStatefulPodControl) SetCreateStatefulPodError(err error, after int) {
  1513. spc.createPodTracker.err = err
  1514. spc.createPodTracker.after = after
  1515. }
  1516. func (spc *fakeStatefulPodControl) SetUpdateStatefulPodError(err error, after int) {
  1517. spc.updatePodTracker.err = err
  1518. spc.updatePodTracker.after = after
  1519. }
  1520. func (spc *fakeStatefulPodControl) SetDeleteStatefulPodError(err error, after int) {
  1521. spc.deletePodTracker.err = err
  1522. spc.deletePodTracker.after = after
  1523. }
  1524. func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1525. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1526. if err != nil {
  1527. return nil, err
  1528. }
  1529. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1530. if err != nil {
  1531. return nil, err
  1532. }
  1533. if 0 > ordinal || ordinal >= len(pods) {
  1534. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1535. }
  1536. sort.Sort(ascendingOrdinal(pods))
  1537. pod := pods[ordinal].DeepCopy()
  1538. pod.Status.Phase = v1.PodPending
  1539. fakeResourceVersion(pod)
  1540. spc.podsIndexer.Update(pod)
  1541. return spc.podsLister.Pods(set.Namespace).List(selector)
  1542. }
  1543. func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1544. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1545. if err != nil {
  1546. return nil, err
  1547. }
  1548. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1549. if err != nil {
  1550. return nil, err
  1551. }
  1552. if 0 > ordinal || ordinal >= len(pods) {
  1553. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1554. }
  1555. sort.Sort(ascendingOrdinal(pods))
  1556. pod := pods[ordinal].DeepCopy()
  1557. pod.Status.Phase = v1.PodRunning
  1558. fakeResourceVersion(pod)
  1559. spc.podsIndexer.Update(pod)
  1560. return spc.podsLister.Pods(set.Namespace).List(selector)
  1561. }
  1562. func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1563. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1564. if err != nil {
  1565. return nil, err
  1566. }
  1567. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1568. if err != nil {
  1569. return nil, err
  1570. }
  1571. if 0 > ordinal || ordinal >= len(pods) {
  1572. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1573. }
  1574. sort.Sort(ascendingOrdinal(pods))
  1575. pod := pods[ordinal].DeepCopy()
  1576. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1577. podutil.UpdatePodCondition(&pod.Status, &condition)
  1578. fakeResourceVersion(pod)
  1579. spc.podsIndexer.Update(pod)
  1580. return spc.podsLister.Pods(set.Namespace).List(selector)
  1581. }
  1582. func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1583. pod := newStatefulSetPod(set, ordinal)
  1584. pod.Status.Phase = v1.PodRunning
  1585. deleted := metav1.NewTime(time.Now())
  1586. pod.DeletionTimestamp = &deleted
  1587. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1588. fakeResourceVersion(pod)
  1589. podutil.UpdatePodCondition(&pod.Status, &condition)
  1590. spc.podsIndexer.Update(pod)
  1591. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1592. if err != nil {
  1593. return nil, err
  1594. }
  1595. return spc.podsLister.Pods(set.Namespace).List(selector)
  1596. }
  1597. func (spc *fakeStatefulPodControl) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1598. pod := newStatefulSetPod(set, ordinal)
  1599. deleted := metav1.NewTime(time.Now())
  1600. pod.DeletionTimestamp = &deleted
  1601. fakeResourceVersion(pod)
  1602. spc.podsIndexer.Update(pod)
  1603. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1604. if err != nil {
  1605. return nil, err
  1606. }
  1607. return spc.podsLister.Pods(set.Namespace).List(selector)
  1608. }
  1609. func (spc *fakeStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1610. defer spc.createPodTracker.inc()
  1611. if spc.createPodTracker.errorReady() {
  1612. defer spc.createPodTracker.reset()
  1613. return spc.createPodTracker.err
  1614. }
  1615. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1616. spc.claimsIndexer.Update(&claim)
  1617. }
  1618. spc.podsIndexer.Update(pod)
  1619. return nil
  1620. }
  1621. func (spc *fakeStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1622. defer spc.updatePodTracker.inc()
  1623. if spc.updatePodTracker.errorReady() {
  1624. defer spc.updatePodTracker.reset()
  1625. return spc.updatePodTracker.err
  1626. }
  1627. if !identityMatches(set, pod) {
  1628. updateIdentity(set, pod)
  1629. }
  1630. if !storageMatches(set, pod) {
  1631. updateStorage(set, pod)
  1632. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1633. spc.claimsIndexer.Update(&claim)
  1634. }
  1635. }
  1636. spc.podsIndexer.Update(pod)
  1637. return nil
  1638. }
  1639. func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1640. defer spc.deletePodTracker.inc()
  1641. if spc.deletePodTracker.errorReady() {
  1642. defer spc.deletePodTracker.reset()
  1643. return spc.deletePodTracker.err
  1644. }
  1645. if key, err := controller.KeyFunc(pod); err != nil {
  1646. return err
  1647. } else if obj, found, err := spc.podsIndexer.GetByKey(key); err != nil {
  1648. return err
  1649. } else if found {
  1650. spc.podsIndexer.Delete(obj)
  1651. }
  1652. return nil
  1653. }
  1654. var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
  1655. type fakeStatefulSetStatusUpdater struct {
  1656. setsLister appslisters.StatefulSetLister
  1657. setsIndexer cache.Indexer
  1658. updateStatusTracker requestTracker
  1659. }
  1660. func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater {
  1661. return &fakeStatefulSetStatusUpdater{
  1662. setInformer.Lister(),
  1663. setInformer.Informer().GetIndexer(),
  1664. requestTracker{0, nil, 0},
  1665. }
  1666. }
  1667. func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
  1668. defer ssu.updateStatusTracker.inc()
  1669. if ssu.updateStatusTracker.errorReady() {
  1670. defer ssu.updateStatusTracker.reset()
  1671. return ssu.updateStatusTracker.err
  1672. }
  1673. set.Status = *status
  1674. ssu.setsIndexer.Update(set)
  1675. return nil
  1676. }
  1677. func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) {
  1678. ssu.updateStatusTracker.err = err
  1679. ssu.updateStatusTracker.after = after
  1680. }
  1681. var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{}
  1682. func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1683. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1684. if err != nil {
  1685. return err
  1686. }
  1687. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1688. if err != nil {
  1689. return err
  1690. }
  1691. sort.Sort(ascendingOrdinal(pods))
  1692. for ord := 0; ord < len(pods); ord++ {
  1693. if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
  1694. return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
  1695. }
  1696. if getOrdinal(pods[ord]) != ord {
  1697. return fmt.Errorf("pods %s deployed in the wrong order %d", pods[ord].Name, ord)
  1698. }
  1699. if !storageMatches(set, pods[ord]) {
  1700. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1701. }
  1702. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1703. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1704. if err != nil {
  1705. return err
  1706. }
  1707. if claim == nil {
  1708. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1709. }
  1710. }
  1711. if !identityMatches(set, pods[ord]) {
  1712. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1713. }
  1714. }
  1715. return nil
  1716. }
  1717. func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1718. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1719. if err != nil {
  1720. return err
  1721. }
  1722. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1723. if err != nil {
  1724. return err
  1725. }
  1726. sort.Sort(ascendingOrdinal(pods))
  1727. for ord := 0; ord < len(pods); ord++ {
  1728. if !storageMatches(set, pods[ord]) {
  1729. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1730. }
  1731. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1732. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1733. if err != nil {
  1734. return err
  1735. }
  1736. if claim == nil {
  1737. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1738. }
  1739. }
  1740. if !identityMatches(set, pods[ord]) {
  1741. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
  1742. pods[ord].Name,
  1743. set.Name)
  1744. }
  1745. }
  1746. return nil
  1747. }
  1748. func assertUpdateInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1749. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1750. if err != nil {
  1751. return err
  1752. }
  1753. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1754. if err != nil {
  1755. return err
  1756. }
  1757. sort.Sort(ascendingOrdinal(pods))
  1758. for ord := 0; ord < len(pods); ord++ {
  1759. if !storageMatches(set, pods[ord]) {
  1760. return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1761. }
  1762. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1763. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1764. if err != nil {
  1765. return err
  1766. }
  1767. if claim == nil {
  1768. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1769. }
  1770. }
  1771. if !identityMatches(set, pods[ord]) {
  1772. return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1773. }
  1774. }
  1775. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
  1776. return nil
  1777. }
  1778. if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType {
  1779. for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ {
  1780. if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got {
  1781. return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got)
  1782. }
  1783. }
  1784. for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 {
  1785. if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got {
  1786. return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got)
  1787. }
  1788. }
  1789. }
  1790. return nil
  1791. }
  1792. func fakeResourceVersion(object interface{}) {
  1793. obj, isObj := object.(metav1.Object)
  1794. if !isObj {
  1795. return
  1796. }
  1797. if version := obj.GetResourceVersion(); version == "" {
  1798. obj.SetResourceVersion("1")
  1799. } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
  1800. obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
  1801. }
  1802. }
  1803. func scaleUpStatefulSetControl(set *apps.StatefulSet,
  1804. ssc StatefulSetControlInterface,
  1805. spc *fakeStatefulPodControl,
  1806. invariants invariantFunc) error {
  1807. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1808. if err != nil {
  1809. return err
  1810. }
  1811. for set.Status.ReadyReplicas < *set.Spec.Replicas {
  1812. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1813. if err != nil {
  1814. return err
  1815. }
  1816. sort.Sort(ascendingOrdinal(pods))
  1817. // ensure all pods are valid (have a phase)
  1818. initialized := false
  1819. for ord, pod := range pods {
  1820. if pod.Status.Phase == "" {
  1821. if pods, err = spc.setPodPending(set, ord); err != nil {
  1822. return err
  1823. }
  1824. break
  1825. }
  1826. }
  1827. if initialized {
  1828. continue
  1829. }
  1830. // select one of the pods and move it forward in status
  1831. if len(pods) > 0 {
  1832. ord := int(rand.Int63n(int64(len(pods))))
  1833. pod := pods[ord]
  1834. switch pod.Status.Phase {
  1835. case v1.PodPending:
  1836. if pods, err = spc.setPodRunning(set, ord); err != nil {
  1837. return err
  1838. }
  1839. case v1.PodRunning:
  1840. if pods, err = spc.setPodReady(set, ord); err != nil {
  1841. return err
  1842. }
  1843. default:
  1844. continue
  1845. }
  1846. }
  1847. // run the controller once and check invariants
  1848. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1849. return err
  1850. }
  1851. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1852. if err != nil {
  1853. return err
  1854. }
  1855. if err := invariants(set, spc); err != nil {
  1856. return err
  1857. }
  1858. }
  1859. return invariants(set, spc)
  1860. }
  1861. func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
  1862. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1863. if err != nil {
  1864. return err
  1865. }
  1866. for set.Status.Replicas > *set.Spec.Replicas {
  1867. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1868. if err != nil {
  1869. return err
  1870. }
  1871. sort.Sort(ascendingOrdinal(pods))
  1872. if ordinal := len(pods) - 1; ordinal >= 0 {
  1873. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1874. return err
  1875. }
  1876. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1877. if err != nil {
  1878. return err
  1879. }
  1880. if pods, err = spc.addTerminatingPod(set, ordinal); err != nil {
  1881. return err
  1882. }
  1883. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1884. return err
  1885. }
  1886. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1887. if err != nil {
  1888. return err
  1889. }
  1890. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1891. if err != nil {
  1892. return err
  1893. }
  1894. sort.Sort(ascendingOrdinal(pods))
  1895. if len(pods) > 0 {
  1896. spc.podsIndexer.Delete(pods[len(pods)-1])
  1897. }
  1898. }
  1899. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1900. return err
  1901. }
  1902. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1903. if err != nil {
  1904. return err
  1905. }
  1906. if err := invariants(set, spc); err != nil {
  1907. return err
  1908. }
  1909. }
  1910. return invariants(set, spc)
  1911. }
  1912. func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool {
  1913. sort.Sort(ascendingOrdinal(pods))
  1914. if len(pods) != int(*set.Spec.Replicas) {
  1915. return false
  1916. }
  1917. if set.Status.ReadyReplicas != *set.Spec.Replicas {
  1918. return false
  1919. }
  1920. switch set.Spec.UpdateStrategy.Type {
  1921. case apps.OnDeleteStatefulSetStrategyType:
  1922. return true
  1923. case apps.RollingUpdateStatefulSetStrategyType:
  1924. if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 {
  1925. if set.Status.CurrentReplicas < *set.Spec.Replicas {
  1926. return false
  1927. }
  1928. for i := range pods {
  1929. if getPodRevision(pods[i]) != set.Status.CurrentRevision {
  1930. return false
  1931. }
  1932. }
  1933. } else {
  1934. partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  1935. if len(pods) < partition {
  1936. return false
  1937. }
  1938. for i := partition; i < len(pods); i++ {
  1939. if getPodRevision(pods[i]) != set.Status.UpdateRevision {
  1940. return false
  1941. }
  1942. }
  1943. }
  1944. }
  1945. return true
  1946. }
  1947. func updateStatefulSetControl(set *apps.StatefulSet,
  1948. ssc StatefulSetControlInterface,
  1949. spc *fakeStatefulPodControl,
  1950. invariants invariantFunc) error {
  1951. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1952. if err != nil {
  1953. return err
  1954. }
  1955. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1956. if err != nil {
  1957. return err
  1958. }
  1959. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1960. return err
  1961. }
  1962. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1963. if err != nil {
  1964. return err
  1965. }
  1966. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1967. if err != nil {
  1968. return err
  1969. }
  1970. for !updateComplete(set, pods) {
  1971. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1972. if err != nil {
  1973. return err
  1974. }
  1975. sort.Sort(ascendingOrdinal(pods))
  1976. initialized := false
  1977. for ord, pod := range pods {
  1978. if pod.Status.Phase == "" {
  1979. if pods, err = spc.setPodPending(set, ord); err != nil {
  1980. return err
  1981. }
  1982. break
  1983. }
  1984. }
  1985. if initialized {
  1986. continue
  1987. }
  1988. if len(pods) > 0 {
  1989. ord := int(rand.Int63n(int64(len(pods))))
  1990. pod := pods[ord]
  1991. switch pod.Status.Phase {
  1992. case v1.PodPending:
  1993. if pods, err = spc.setPodRunning(set, ord); err != nil {
  1994. return err
  1995. }
  1996. case v1.PodRunning:
  1997. if pods, err = spc.setPodReady(set, ord); err != nil {
  1998. return err
  1999. }
  2000. default:
  2001. continue
  2002. }
  2003. }
  2004. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  2005. return err
  2006. }
  2007. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2008. if err != nil {
  2009. return err
  2010. }
  2011. if err := invariants(set, spc); err != nil {
  2012. return err
  2013. }
  2014. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2015. if err != nil {
  2016. return err
  2017. }
  2018. }
  2019. return invariants(set, spc)
  2020. }
  2021. func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision {
  2022. rev, err := newRevision(set, revision, set.Status.CollisionCount)
  2023. if err != nil {
  2024. panic(err)
  2025. }
  2026. return rev
  2027. }