stateful_set_control_test.go 68 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "reflect"
  19. "runtime"
  20. "sort"
  21. "strconv"
  22. "strings"
  23. "testing"
  24. "time"
  25. apps "k8s.io/api/apps/v1"
  26. v1 "k8s.io/api/core/v1"
  27. apierrors "k8s.io/apimachinery/pkg/api/errors"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/labels"
  30. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  31. "k8s.io/client-go/informers"
  32. appsinformers "k8s.io/client-go/informers/apps/v1"
  33. coreinformers "k8s.io/client-go/informers/core/v1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/kubernetes/fake"
  36. appslisters "k8s.io/client-go/listers/apps/v1"
  37. corelisters "k8s.io/client-go/listers/core/v1"
  38. "k8s.io/client-go/tools/cache"
  39. "k8s.io/client-go/tools/record"
  40. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  41. "k8s.io/kubernetes/pkg/controller"
  42. "k8s.io/kubernetes/pkg/controller/history"
  43. )
  44. type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  45. func setupController(client clientset.Interface) (*fakeStatefulPodControl, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface, chan struct{}) {
  46. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  47. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions())
  48. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  49. recorder := record.NewFakeRecorder(10)
  50. ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
  51. stop := make(chan struct{})
  52. informerFactory.Start(stop)
  53. cache.WaitForCacheSync(
  54. stop,
  55. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  56. informerFactory.Core().V1().Pods().Informer().HasSynced,
  57. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  58. )
  59. return spc, ssu, ssc, stop
  60. }
  61. func burst(set *apps.StatefulSet) *apps.StatefulSet {
  62. set.Spec.PodManagementPolicy = apps.ParallelPodManagement
  63. return set
  64. }
  65. func TestStatefulSetControl(t *testing.T) {
  66. simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
  67. largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
  68. testCases := []struct {
  69. fn func(*testing.T, *apps.StatefulSet, invariantFunc)
  70. obj func() *apps.StatefulSet
  71. }{
  72. {CreatesPods, simpleSetFn},
  73. {ScalesUp, simpleSetFn},
  74. {ScalesDown, simpleSetFn},
  75. {ReplacesPods, largeSetFn},
  76. {RecreatesFailedPod, simpleSetFn},
  77. {CreatePodFailure, simpleSetFn},
  78. {UpdatePodFailure, simpleSetFn},
  79. {UpdateSetStatusFailure, simpleSetFn},
  80. {PodRecreateDeleteFailure, simpleSetFn},
  81. }
  82. for _, testCase := range testCases {
  83. fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
  84. if i := strings.LastIndex(fnName, "."); i != -1 {
  85. fnName = fnName[i+1:]
  86. }
  87. t.Run(
  88. fmt.Sprintf("%s/Monotonic", fnName),
  89. func(t *testing.T) {
  90. testCase.fn(t, testCase.obj(), assertMonotonicInvariants)
  91. },
  92. )
  93. t.Run(
  94. fmt.Sprintf("%s/Burst", fnName),
  95. func(t *testing.T) {
  96. set := burst(testCase.obj())
  97. testCase.fn(t, set, assertBurstInvariants)
  98. },
  99. )
  100. }
  101. }
  102. func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  103. client := fake.NewSimpleClientset(set)
  104. spc, _, ssc, stop := setupController(client)
  105. defer close(stop)
  106. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  107. t.Errorf("Failed to turn up StatefulSet : %s", err)
  108. }
  109. var err error
  110. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  111. if err != nil {
  112. t.Fatalf("Error getting updated StatefulSet: %v", err)
  113. }
  114. if set.Status.Replicas != 3 {
  115. t.Error("Failed to scale statefulset to 3 replicas")
  116. }
  117. if set.Status.ReadyReplicas != 3 {
  118. t.Error("Failed to set ReadyReplicas correctly")
  119. }
  120. if set.Status.UpdatedReplicas != 3 {
  121. t.Error("Failed to set UpdatedReplicas correctly")
  122. }
  123. }
  124. func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  125. client := fake.NewSimpleClientset(set)
  126. spc, _, ssc, stop := setupController(client)
  127. defer close(stop)
  128. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  129. t.Errorf("Failed to turn up StatefulSet : %s", err)
  130. }
  131. *set.Spec.Replicas = 4
  132. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  133. t.Errorf("Failed to scale StatefulSet : %s", err)
  134. }
  135. var err error
  136. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  137. if err != nil {
  138. t.Fatalf("Error getting updated StatefulSet: %v", err)
  139. }
  140. if set.Status.Replicas != 4 {
  141. t.Error("Failed to scale statefulset to 4 replicas")
  142. }
  143. if set.Status.ReadyReplicas != 4 {
  144. t.Error("Failed to set readyReplicas correctly")
  145. }
  146. if set.Status.UpdatedReplicas != 4 {
  147. t.Error("Failed to set updatedReplicas correctly")
  148. }
  149. }
  150. func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  151. client := fake.NewSimpleClientset(set)
  152. spc, _, ssc, stop := setupController(client)
  153. defer close(stop)
  154. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  155. t.Errorf("Failed to turn up StatefulSet : %s", err)
  156. }
  157. *set.Spec.Replicas = 0
  158. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  159. t.Errorf("Failed to scale StatefulSet : %s", err)
  160. }
  161. if set.Status.Replicas != 0 {
  162. t.Error("Failed to scale statefulset to 0 replicas")
  163. }
  164. if set.Status.ReadyReplicas != 0 {
  165. t.Error("Failed to set readyReplicas correctly")
  166. }
  167. if set.Status.UpdatedReplicas != 0 {
  168. t.Error("Failed to set updatedReplicas correctly")
  169. }
  170. }
  171. func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  172. client := fake.NewSimpleClientset(set)
  173. spc, _, ssc, stop := setupController(client)
  174. defer close(stop)
  175. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  176. t.Errorf("Failed to turn up StatefulSet : %s", err)
  177. }
  178. var err error
  179. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  180. if err != nil {
  181. t.Fatalf("Error getting updated StatefulSet: %v", err)
  182. }
  183. if set.Status.Replicas != 5 {
  184. t.Error("Failed to scale statefulset to 5 replicas")
  185. }
  186. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  187. if err != nil {
  188. t.Error(err)
  189. }
  190. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  191. if err != nil {
  192. t.Error(err)
  193. }
  194. sort.Sort(ascendingOrdinal(pods))
  195. spc.podsIndexer.Delete(pods[0])
  196. spc.podsIndexer.Delete(pods[2])
  197. spc.podsIndexer.Delete(pods[4])
  198. for i := 0; i < 5; i += 2 {
  199. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  200. if err != nil {
  201. t.Error(err)
  202. }
  203. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  204. t.Errorf("Failed to update StatefulSet : %s", err)
  205. }
  206. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  207. if err != nil {
  208. t.Fatalf("Error getting updated StatefulSet: %v", err)
  209. }
  210. if pods, err = spc.setPodRunning(set, i); err != nil {
  211. t.Error(err)
  212. }
  213. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  214. t.Errorf("Failed to update StatefulSet : %s", err)
  215. }
  216. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  217. if err != nil {
  218. t.Fatalf("Error getting updated StatefulSet: %v", err)
  219. }
  220. if pods, err = spc.setPodReady(set, i); err != nil {
  221. t.Error(err)
  222. }
  223. }
  224. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  225. if err != nil {
  226. t.Error(err)
  227. }
  228. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  229. t.Errorf("Failed to update StatefulSet : %s", err)
  230. }
  231. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  232. if err != nil {
  233. t.Fatalf("Error getting updated StatefulSet: %v", err)
  234. }
  235. if e, a := int32(5), set.Status.Replicas; e != a {
  236. t.Errorf("Expected to scale to %d, got %d", e, a)
  237. }
  238. }
  239. func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  240. client := fake.NewSimpleClientset()
  241. spc, _, ssc, stop := setupController(client)
  242. defer close(stop)
  243. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  244. if err != nil {
  245. t.Error(err)
  246. }
  247. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  248. if err != nil {
  249. t.Error(err)
  250. }
  251. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  252. t.Errorf("Error updating StatefulSet %s", err)
  253. }
  254. if err := invariants(set, spc); err != nil {
  255. t.Error(err)
  256. }
  257. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  258. if err != nil {
  259. t.Error(err)
  260. }
  261. pods[0].Status.Phase = v1.PodFailed
  262. spc.podsIndexer.Update(pods[0])
  263. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  264. t.Errorf("Error updating StatefulSet %s", err)
  265. }
  266. if err := invariants(set, spc); err != nil {
  267. t.Error(err)
  268. }
  269. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  270. if err != nil {
  271. t.Error(err)
  272. }
  273. if isCreated(pods[0]) {
  274. t.Error("StatefulSet did not recreate failed Pod")
  275. }
  276. }
  277. func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  278. client := fake.NewSimpleClientset(set)
  279. spc, _, ssc, stop := setupController(client)
  280. defer close(stop)
  281. spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  282. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
  283. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  284. }
  285. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  286. t.Errorf("Failed to turn up StatefulSet : %s", err)
  287. }
  288. var err error
  289. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  290. if err != nil {
  291. t.Fatalf("Error getting updated StatefulSet: %v", err)
  292. }
  293. if set.Status.Replicas != 3 {
  294. t.Error("Failed to scale StatefulSet to 3 replicas")
  295. }
  296. if set.Status.ReadyReplicas != 3 {
  297. t.Error("Failed to set readyReplicas correctly")
  298. }
  299. if set.Status.UpdatedReplicas != 3 {
  300. t.Error("Failed to updatedReplicas correctly")
  301. }
  302. }
  303. func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  304. client := fake.NewSimpleClientset(set)
  305. spc, _, ssc, stop := setupController(client)
  306. defer close(stop)
  307. spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  308. // have to have 1 successful loop first
  309. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  310. t.Fatalf("Unexpected error: %v", err)
  311. }
  312. var err error
  313. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  314. if err != nil {
  315. t.Fatalf("Error getting updated StatefulSet: %v", err)
  316. }
  317. if set.Status.Replicas != 3 {
  318. t.Error("Failed to scale StatefulSet to 3 replicas")
  319. }
  320. if set.Status.ReadyReplicas != 3 {
  321. t.Error("Failed to set readyReplicas correctly")
  322. }
  323. if set.Status.UpdatedReplicas != 3 {
  324. t.Error("Failed to set updatedReplicas correctly")
  325. }
  326. // now mutate a pod's identity
  327. pods, err := spc.podsLister.List(labels.Everything())
  328. if err != nil {
  329. t.Fatalf("Error listing pods: %v", err)
  330. }
  331. if len(pods) != 3 {
  332. t.Fatalf("Expected 3 pods, got %d", len(pods))
  333. }
  334. sort.Sort(ascendingOrdinal(pods))
  335. pods[0].Name = "goo-0"
  336. spc.podsIndexer.Update(pods[0])
  337. // now it should fail
  338. if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
  339. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  340. }
  341. }
  342. func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  343. client := fake.NewSimpleClientset(set)
  344. spc, ssu, ssc, stop := setupController(client)
  345. defer close(stop)
  346. ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  347. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
  348. t.Errorf("StatefulSetControl did not return InternalError found %s", err)
  349. }
  350. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  351. t.Errorf("Failed to turn up StatefulSet : %s", err)
  352. }
  353. var err error
  354. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  355. if err != nil {
  356. t.Fatalf("Error getting updated StatefulSet: %v", err)
  357. }
  358. if set.Status.Replicas != 3 {
  359. t.Error("Failed to scale StatefulSet to 3 replicas")
  360. }
  361. if set.Status.ReadyReplicas != 3 {
  362. t.Error("Failed to set readyReplicas to 3")
  363. }
  364. if set.Status.UpdatedReplicas != 3 {
  365. t.Error("Failed to set updatedReplicas to 3")
  366. }
  367. }
  368. func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
  369. client := fake.NewSimpleClientset(set)
  370. spc, _, ssc, stop := setupController(client)
  371. defer close(stop)
  372. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  373. if err != nil {
  374. t.Error(err)
  375. }
  376. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  377. if err != nil {
  378. t.Error(err)
  379. }
  380. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  381. t.Errorf("Error updating StatefulSet %s", err)
  382. }
  383. if err := invariants(set, spc); err != nil {
  384. t.Error(err)
  385. }
  386. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  387. if err != nil {
  388. t.Error(err)
  389. }
  390. pods[0].Status.Phase = v1.PodFailed
  391. spc.podsIndexer.Update(pods[0])
  392. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
  393. if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
  394. t.Errorf("StatefulSet failed to %s", err)
  395. }
  396. if err := invariants(set, spc); err != nil {
  397. t.Error(err)
  398. }
  399. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  400. t.Errorf("Error updating StatefulSet %s", err)
  401. }
  402. if err := invariants(set, spc); err != nil {
  403. t.Error(err)
  404. }
  405. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  406. if err != nil {
  407. t.Error(err)
  408. }
  409. if isCreated(pods[0]) {
  410. t.Error("StatefulSet did not recreate failed Pod")
  411. }
  412. }
  413. func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
  414. invariants := assertMonotonicInvariants
  415. set := newStatefulSet(3)
  416. client := fake.NewSimpleClientset(set)
  417. spc, _, ssc, stop := setupController(client)
  418. defer close(stop)
  419. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  420. t.Errorf("Failed to turn up StatefulSet : %s", err)
  421. }
  422. var err error
  423. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  424. if err != nil {
  425. t.Fatalf("Error getting updated StatefulSet: %v", err)
  426. }
  427. *set.Spec.Replicas = 0
  428. spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  429. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
  430. t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
  431. }
  432. if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
  433. t.Errorf("Failed to turn down StatefulSet %s", err)
  434. }
  435. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  436. if err != nil {
  437. t.Fatalf("Error getting updated StatefulSet: %v", err)
  438. }
  439. if set.Status.Replicas != 0 {
  440. t.Error("Failed to scale statefulset to 0 replicas")
  441. }
  442. if set.Status.ReadyReplicas != 0 {
  443. t.Error("Failed to set readyReplicas to 0")
  444. }
  445. if set.Status.UpdatedReplicas != 0 {
  446. t.Error("Failed to set updatedReplicas to 0")
  447. }
  448. }
  449. func TestStatefulSetControl_getSetRevisions(t *testing.T) {
  450. type testcase struct {
  451. name string
  452. existing []*apps.ControllerRevision
  453. set *apps.StatefulSet
  454. expectedCount int
  455. expectedCurrent *apps.ControllerRevision
  456. expectedUpdate *apps.ControllerRevision
  457. err bool
  458. }
  459. testFn := func(test *testcase, t *testing.T) {
  460. client := fake.NewSimpleClientset()
  461. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  462. spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions())
  463. ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
  464. recorder := record.NewFakeRecorder(10)
  465. ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
  466. stop := make(chan struct{})
  467. defer close(stop)
  468. informerFactory.Start(stop)
  469. cache.WaitForCacheSync(
  470. stop,
  471. informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
  472. informerFactory.Core().V1().Pods().Informer().HasSynced,
  473. informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
  474. )
  475. test.set.Status.CollisionCount = new(int32)
  476. for i := range test.existing {
  477. ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount)
  478. }
  479. revisions, err := ssc.ListRevisions(test.set)
  480. if err != nil {
  481. t.Fatal(err)
  482. }
  483. current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions)
  484. if err != nil {
  485. t.Fatalf("error getting statefulset revisions:%v", err)
  486. }
  487. revisions, err = ssc.ListRevisions(test.set)
  488. if err != nil {
  489. t.Fatal(err)
  490. }
  491. if len(revisions) != test.expectedCount {
  492. t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions))
  493. }
  494. if test.err && err == nil {
  495. t.Errorf("%s: expected error", test.name)
  496. }
  497. if !test.err && !history.EqualRevision(current, test.expectedCurrent) {
  498. t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current)
  499. }
  500. if !test.err && !history.EqualRevision(update, test.expectedUpdate) {
  501. t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update)
  502. }
  503. if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision {
  504. t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision)
  505. }
  506. if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision {
  507. t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision)
  508. }
  509. }
  510. updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision {
  511. clone := cr.DeepCopy()
  512. clone.Revision = revision
  513. return clone
  514. }
  515. set := newStatefulSet(3)
  516. set.Status.CollisionCount = new(int32)
  517. rev0 := newRevisionOrDie(set, 1)
  518. set1 := set.DeepCopy()
  519. set1.Spec.Template.Spec.Containers[0].Image = "foo"
  520. set1.Status.CurrentRevision = rev0.Name
  521. set1.Status.CollisionCount = new(int32)
  522. rev1 := newRevisionOrDie(set1, 2)
  523. set2 := set1.DeepCopy()
  524. set2.Spec.Template.Labels["new"] = "label"
  525. set2.Status.CurrentRevision = rev0.Name
  526. set2.Status.CollisionCount = new(int32)
  527. rev2 := newRevisionOrDie(set2, 3)
  528. tests := []testcase{
  529. {
  530. name: "creates initial revision",
  531. existing: nil,
  532. set: set,
  533. expectedCount: 1,
  534. expectedCurrent: rev0,
  535. expectedUpdate: rev0,
  536. err: false,
  537. },
  538. {
  539. name: "creates revision on update",
  540. existing: []*apps.ControllerRevision{rev0},
  541. set: set1,
  542. expectedCount: 2,
  543. expectedCurrent: rev0,
  544. expectedUpdate: rev1,
  545. err: false,
  546. },
  547. {
  548. name: "must not recreate a new revision of same set",
  549. existing: []*apps.ControllerRevision{rev0, rev1},
  550. set: set1,
  551. expectedCount: 2,
  552. expectedCurrent: rev0,
  553. expectedUpdate: rev1,
  554. err: false,
  555. },
  556. {
  557. name: "must rollback to a previous revision",
  558. existing: []*apps.ControllerRevision{rev0, rev1, rev2},
  559. set: set1,
  560. expectedCount: 3,
  561. expectedCurrent: rev0,
  562. expectedUpdate: updateRevision(rev1, 4),
  563. err: false,
  564. },
  565. }
  566. for i := range tests {
  567. testFn(&tests[i], t)
  568. }
  569. }
  570. func TestStatefulSetControlRollingUpdate(t *testing.T) {
  571. type testcase struct {
  572. name string
  573. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  574. initial func() *apps.StatefulSet
  575. update func(set *apps.StatefulSet) *apps.StatefulSet
  576. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  577. }
  578. testFn := func(test *testcase, t *testing.T) {
  579. set := test.initial()
  580. client := fake.NewSimpleClientset(set)
  581. spc, _, ssc, stop := setupController(client)
  582. defer close(stop)
  583. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  584. t.Fatalf("%s: %s", test.name, err)
  585. }
  586. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  587. if err != nil {
  588. t.Fatalf("%s: %s", test.name, err)
  589. }
  590. set = test.update(set)
  591. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  592. t.Fatalf("%s: %s", test.name, err)
  593. }
  594. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  595. if err != nil {
  596. t.Fatalf("%s: %s", test.name, err)
  597. }
  598. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  599. if err != nil {
  600. t.Fatalf("%s: %s", test.name, err)
  601. }
  602. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  603. if err != nil {
  604. t.Fatalf("%s: %s", test.name, err)
  605. }
  606. if err := test.validate(set, pods); err != nil {
  607. t.Fatalf("%s: %s", test.name, err)
  608. }
  609. }
  610. tests := []testcase{
  611. {
  612. name: "monotonic image update",
  613. invariants: assertMonotonicInvariants,
  614. initial: func() *apps.StatefulSet {
  615. return newStatefulSet(3)
  616. },
  617. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  618. set.Spec.Template.Spec.Containers[0].Image = "foo"
  619. return set
  620. },
  621. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  622. sort.Sort(ascendingOrdinal(pods))
  623. for i := range pods {
  624. if pods[i].Spec.Containers[0].Image != "foo" {
  625. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  626. }
  627. }
  628. return nil
  629. },
  630. },
  631. {
  632. name: "monotonic image update and scale up",
  633. invariants: assertMonotonicInvariants,
  634. initial: func() *apps.StatefulSet {
  635. return newStatefulSet(3)
  636. },
  637. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  638. *set.Spec.Replicas = 5
  639. set.Spec.Template.Spec.Containers[0].Image = "foo"
  640. return set
  641. },
  642. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  643. sort.Sort(ascendingOrdinal(pods))
  644. for i := range pods {
  645. if pods[i].Spec.Containers[0].Image != "foo" {
  646. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  647. }
  648. }
  649. return nil
  650. },
  651. },
  652. {
  653. name: "monotonic image update and scale down",
  654. invariants: assertMonotonicInvariants,
  655. initial: func() *apps.StatefulSet {
  656. return newStatefulSet(5)
  657. },
  658. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  659. *set.Spec.Replicas = 3
  660. set.Spec.Template.Spec.Containers[0].Image = "foo"
  661. return set
  662. },
  663. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  664. sort.Sort(ascendingOrdinal(pods))
  665. for i := range pods {
  666. if pods[i].Spec.Containers[0].Image != "foo" {
  667. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  668. }
  669. }
  670. return nil
  671. },
  672. },
  673. {
  674. name: "burst image update",
  675. invariants: assertBurstInvariants,
  676. initial: func() *apps.StatefulSet {
  677. return burst(newStatefulSet(3))
  678. },
  679. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  680. set.Spec.Template.Spec.Containers[0].Image = "foo"
  681. return set
  682. },
  683. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  684. sort.Sort(ascendingOrdinal(pods))
  685. for i := range pods {
  686. if pods[i].Spec.Containers[0].Image != "foo" {
  687. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  688. }
  689. }
  690. return nil
  691. },
  692. },
  693. {
  694. name: "burst image update and scale up",
  695. invariants: assertBurstInvariants,
  696. initial: func() *apps.StatefulSet {
  697. return burst(newStatefulSet(3))
  698. },
  699. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  700. *set.Spec.Replicas = 5
  701. set.Spec.Template.Spec.Containers[0].Image = "foo"
  702. return set
  703. },
  704. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  705. sort.Sort(ascendingOrdinal(pods))
  706. for i := range pods {
  707. if pods[i].Spec.Containers[0].Image != "foo" {
  708. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  709. }
  710. }
  711. return nil
  712. },
  713. },
  714. {
  715. name: "burst image update and scale down",
  716. invariants: assertBurstInvariants,
  717. initial: func() *apps.StatefulSet {
  718. return burst(newStatefulSet(5))
  719. },
  720. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  721. *set.Spec.Replicas = 3
  722. set.Spec.Template.Spec.Containers[0].Image = "foo"
  723. return set
  724. },
  725. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  726. sort.Sort(ascendingOrdinal(pods))
  727. for i := range pods {
  728. if pods[i].Spec.Containers[0].Image != "foo" {
  729. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  730. }
  731. }
  732. return nil
  733. },
  734. },
  735. }
  736. for i := range tests {
  737. testFn(&tests[i], t)
  738. }
  739. }
  740. func TestStatefulSetControlOnDeleteUpdate(t *testing.T) {
  741. type testcase struct {
  742. name string
  743. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  744. initial func() *apps.StatefulSet
  745. update func(set *apps.StatefulSet) *apps.StatefulSet
  746. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  747. validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error
  748. }
  749. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  750. testFn := func(test *testcase, t *testing.T) {
  751. set := test.initial()
  752. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType}
  753. client := fake.NewSimpleClientset(set)
  754. spc, _, ssc, stop := setupController(client)
  755. defer close(stop)
  756. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  757. t.Fatalf("%s: %s", test.name, err)
  758. }
  759. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  760. if err != nil {
  761. t.Fatalf("%s: %s", test.name, err)
  762. }
  763. set = test.update(set)
  764. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  765. t.Fatalf("%s: %s", test.name, err)
  766. }
  767. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  768. if err != nil {
  769. t.Fatalf("%s: %s", test.name, err)
  770. }
  771. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  772. if err != nil {
  773. t.Fatalf("%s: %s", test.name, err)
  774. }
  775. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  776. if err != nil {
  777. t.Fatalf("%s: %s", test.name, err)
  778. }
  779. if err := test.validateUpdate(set, pods); err != nil {
  780. for i := range pods {
  781. t.Log(pods[i].Name)
  782. }
  783. t.Fatalf("%s: %s", test.name, err)
  784. }
  785. replicas := *set.Spec.Replicas
  786. *set.Spec.Replicas = 0
  787. if err := scaleDownStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  788. t.Fatalf("%s: %s", test.name, err)
  789. }
  790. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  791. if err != nil {
  792. t.Fatalf("%s: %s", test.name, err)
  793. }
  794. *set.Spec.Replicas = replicas
  795. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  796. t.Fatalf("%s: %s", test.name, err)
  797. }
  798. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  799. if err != nil {
  800. t.Fatalf("%s: %s", test.name, err)
  801. }
  802. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  803. if err != nil {
  804. t.Fatalf("%s: %s", test.name, err)
  805. }
  806. if err := test.validateRestart(set, pods); err != nil {
  807. t.Fatalf("%s: %s", test.name, err)
  808. }
  809. }
  810. tests := []testcase{
  811. {
  812. name: "monotonic image update",
  813. invariants: assertMonotonicInvariants,
  814. initial: func() *apps.StatefulSet {
  815. return newStatefulSet(3)
  816. },
  817. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  818. set.Spec.Template.Spec.Containers[0].Image = "foo"
  819. return set
  820. },
  821. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  822. sort.Sort(ascendingOrdinal(pods))
  823. for i := range pods {
  824. if pods[i].Spec.Containers[0].Image != originalImage {
  825. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  826. }
  827. }
  828. return nil
  829. },
  830. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  831. sort.Sort(ascendingOrdinal(pods))
  832. for i := range pods {
  833. if pods[i].Spec.Containers[0].Image != "foo" {
  834. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  835. }
  836. }
  837. return nil
  838. },
  839. },
  840. {
  841. name: "monotonic image update and scale up",
  842. invariants: assertMonotonicInvariants,
  843. initial: func() *apps.StatefulSet {
  844. return newStatefulSet(3)
  845. },
  846. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  847. *set.Spec.Replicas = 5
  848. set.Spec.Template.Spec.Containers[0].Image = "foo"
  849. return set
  850. },
  851. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  852. sort.Sort(ascendingOrdinal(pods))
  853. for i := range pods {
  854. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  855. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  856. }
  857. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  858. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  859. }
  860. }
  861. return nil
  862. },
  863. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  864. sort.Sort(ascendingOrdinal(pods))
  865. for i := range pods {
  866. if pods[i].Spec.Containers[0].Image != "foo" {
  867. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  868. }
  869. }
  870. return nil
  871. },
  872. },
  873. {
  874. name: "monotonic image update and scale down",
  875. invariants: assertMonotonicInvariants,
  876. initial: func() *apps.StatefulSet {
  877. return newStatefulSet(5)
  878. },
  879. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  880. *set.Spec.Replicas = 3
  881. set.Spec.Template.Spec.Containers[0].Image = "foo"
  882. return set
  883. },
  884. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  885. sort.Sort(ascendingOrdinal(pods))
  886. for i := range pods {
  887. if pods[i].Spec.Containers[0].Image != originalImage {
  888. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  889. }
  890. }
  891. return nil
  892. },
  893. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  894. sort.Sort(ascendingOrdinal(pods))
  895. for i := range pods {
  896. if pods[i].Spec.Containers[0].Image != "foo" {
  897. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  898. }
  899. }
  900. return nil
  901. },
  902. },
  903. {
  904. name: "burst image update",
  905. invariants: assertBurstInvariants,
  906. initial: func() *apps.StatefulSet {
  907. return burst(newStatefulSet(3))
  908. },
  909. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  910. set.Spec.Template.Spec.Containers[0].Image = "foo"
  911. return set
  912. },
  913. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  914. sort.Sort(ascendingOrdinal(pods))
  915. for i := range pods {
  916. if pods[i].Spec.Containers[0].Image != originalImage {
  917. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  918. }
  919. }
  920. return nil
  921. },
  922. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  923. sort.Sort(ascendingOrdinal(pods))
  924. for i := range pods {
  925. if pods[i].Spec.Containers[0].Image != "foo" {
  926. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  927. }
  928. }
  929. return nil
  930. },
  931. },
  932. {
  933. name: "burst image update and scale up",
  934. invariants: assertBurstInvariants,
  935. initial: func() *apps.StatefulSet {
  936. return burst(newStatefulSet(3))
  937. },
  938. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  939. *set.Spec.Replicas = 5
  940. set.Spec.Template.Spec.Containers[0].Image = "foo"
  941. return set
  942. },
  943. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  944. sort.Sort(ascendingOrdinal(pods))
  945. for i := range pods {
  946. if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  947. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  948. }
  949. if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  950. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  951. }
  952. }
  953. return nil
  954. },
  955. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  956. sort.Sort(ascendingOrdinal(pods))
  957. for i := range pods {
  958. if pods[i].Spec.Containers[0].Image != "foo" {
  959. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  960. }
  961. }
  962. return nil
  963. },
  964. },
  965. {
  966. name: "burst image update and scale down",
  967. invariants: assertBurstInvariants,
  968. initial: func() *apps.StatefulSet {
  969. return burst(newStatefulSet(5))
  970. },
  971. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  972. *set.Spec.Replicas = 3
  973. set.Spec.Template.Spec.Containers[0].Image = "foo"
  974. return set
  975. },
  976. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  977. sort.Sort(ascendingOrdinal(pods))
  978. for i := range pods {
  979. if pods[i].Spec.Containers[0].Image != originalImage {
  980. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  981. }
  982. }
  983. return nil
  984. },
  985. validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  986. sort.Sort(ascendingOrdinal(pods))
  987. for i := range pods {
  988. if pods[i].Spec.Containers[0].Image != "foo" {
  989. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  990. }
  991. }
  992. return nil
  993. },
  994. },
  995. }
  996. for i := range tests {
  997. testFn(&tests[i], t)
  998. }
  999. }
  1000. func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
  1001. type testcase struct {
  1002. name string
  1003. partition int32
  1004. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1005. initial func() *apps.StatefulSet
  1006. update func(set *apps.StatefulSet) *apps.StatefulSet
  1007. validate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1008. }
  1009. testFn := func(test *testcase, t *testing.T) {
  1010. set := test.initial()
  1011. set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1012. Type: apps.RollingUpdateStatefulSetStrategyType,
  1013. RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1014. return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition}
  1015. }(),
  1016. }
  1017. client := fake.NewSimpleClientset(set)
  1018. spc, _, ssc, stop := setupController(client)
  1019. defer close(stop)
  1020. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1021. t.Fatalf("%s: %s", test.name, err)
  1022. }
  1023. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1024. if err != nil {
  1025. t.Fatalf("%s: %s", test.name, err)
  1026. }
  1027. set = test.update(set)
  1028. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1029. t.Fatalf("%s: %s", test.name, err)
  1030. }
  1031. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1032. if err != nil {
  1033. t.Fatalf("%s: %s", test.name, err)
  1034. }
  1035. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1036. if err != nil {
  1037. t.Fatalf("%s: %s", test.name, err)
  1038. }
  1039. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1040. if err != nil {
  1041. t.Fatalf("%s: %s", test.name, err)
  1042. }
  1043. if err := test.validate(set, pods); err != nil {
  1044. t.Fatalf("%s: %s", test.name, err)
  1045. }
  1046. }
  1047. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1048. tests := []testcase{
  1049. {
  1050. name: "monotonic image update",
  1051. invariants: assertMonotonicInvariants,
  1052. partition: 2,
  1053. initial: func() *apps.StatefulSet {
  1054. return newStatefulSet(3)
  1055. },
  1056. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1057. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1058. return set
  1059. },
  1060. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1061. sort.Sort(ascendingOrdinal(pods))
  1062. for i := range pods {
  1063. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1064. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1065. }
  1066. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1067. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1068. }
  1069. }
  1070. return nil
  1071. },
  1072. },
  1073. {
  1074. name: "monotonic image update and scale up",
  1075. partition: 2,
  1076. invariants: assertMonotonicInvariants,
  1077. initial: func() *apps.StatefulSet {
  1078. return newStatefulSet(3)
  1079. },
  1080. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1081. *set.Spec.Replicas = 5
  1082. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1083. return set
  1084. },
  1085. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1086. sort.Sort(ascendingOrdinal(pods))
  1087. for i := range pods {
  1088. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1089. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1090. }
  1091. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1092. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1093. }
  1094. }
  1095. return nil
  1096. },
  1097. },
  1098. {
  1099. name: "burst image update",
  1100. partition: 2,
  1101. invariants: assertBurstInvariants,
  1102. initial: func() *apps.StatefulSet {
  1103. return burst(newStatefulSet(3))
  1104. },
  1105. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1106. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1107. return set
  1108. },
  1109. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1110. sort.Sort(ascendingOrdinal(pods))
  1111. for i := range pods {
  1112. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1113. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1114. }
  1115. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1116. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1117. }
  1118. }
  1119. return nil
  1120. },
  1121. },
  1122. {
  1123. name: "burst image update and scale up",
  1124. invariants: assertBurstInvariants,
  1125. partition: 2,
  1126. initial: func() *apps.StatefulSet {
  1127. return burst(newStatefulSet(3))
  1128. },
  1129. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1130. *set.Spec.Replicas = 5
  1131. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1132. return set
  1133. },
  1134. validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1135. sort.Sort(ascendingOrdinal(pods))
  1136. for i := range pods {
  1137. if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1138. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1139. }
  1140. if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1141. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1142. }
  1143. }
  1144. return nil
  1145. },
  1146. },
  1147. }
  1148. for i := range tests {
  1149. testFn(&tests[i], t)
  1150. }
  1151. }
  1152. func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) {
  1153. invariants := assertMonotonicInvariants
  1154. set := newStatefulSet(3)
  1155. client := fake.NewSimpleClientset(set)
  1156. spc, ssu, ssc, stop := setupController(client)
  1157. defer close(stop)
  1158. if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
  1159. t.Errorf("Failed to turn up StatefulSet : %s", err)
  1160. }
  1161. var err error
  1162. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1163. if err != nil {
  1164. t.Fatalf("Error getting updated StatefulSet: %v", err)
  1165. }
  1166. for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ {
  1167. set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1168. ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  1169. updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants)
  1170. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1171. if err != nil {
  1172. t.Fatalf("Error getting updated StatefulSet: %v", err)
  1173. }
  1174. revisions, err := ssc.ListRevisions(set)
  1175. if err != nil {
  1176. t.Fatalf("Error listing revisions: %v", err)
  1177. }
  1178. // the extra 2 revisions are `currentRevision` and `updateRevision`
  1179. // They're considered as `live`, and truncateHistory only cleans up non-live revisions
  1180. if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1181. t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit)
  1182. }
  1183. }
  1184. }
  1185. func TestStatefulSetControlLimitsHistory(t *testing.T) {
  1186. type testcase struct {
  1187. name string
  1188. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1189. initial func() *apps.StatefulSet
  1190. }
  1191. testFn := func(test *testcase, t *testing.T) {
  1192. set := test.initial()
  1193. client := fake.NewSimpleClientset(set)
  1194. spc, _, ssc, stop := setupController(client)
  1195. defer close(stop)
  1196. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1197. t.Fatalf("%s: %s", test.name, err)
  1198. }
  1199. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1200. if err != nil {
  1201. t.Fatalf("%s: %s", test.name, err)
  1202. }
  1203. for i := 0; i < 10; i++ {
  1204. set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1205. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1206. t.Fatalf("%s: %s", test.name, err)
  1207. }
  1208. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1209. if err != nil {
  1210. t.Fatalf("%s: %s", test.name, err)
  1211. }
  1212. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1213. if err != nil {
  1214. t.Fatalf("%s: %s", test.name, err)
  1215. }
  1216. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1217. if err != nil {
  1218. t.Fatalf("%s: %s", test.name, err)
  1219. }
  1220. err = ssc.UpdateStatefulSet(set, pods)
  1221. if err != nil {
  1222. t.Fatalf("%s: %s", test.name, err)
  1223. }
  1224. revisions, err := ssc.ListRevisions(set)
  1225. if err != nil {
  1226. t.Fatalf("%s: %s", test.name, err)
  1227. }
  1228. if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1229. t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit)
  1230. }
  1231. }
  1232. }
  1233. tests := []testcase{
  1234. {
  1235. name: "monotonic update",
  1236. invariants: assertMonotonicInvariants,
  1237. initial: func() *apps.StatefulSet {
  1238. return newStatefulSet(3)
  1239. },
  1240. },
  1241. {
  1242. name: "burst update",
  1243. invariants: assertBurstInvariants,
  1244. initial: func() *apps.StatefulSet {
  1245. return burst(newStatefulSet(3))
  1246. },
  1247. },
  1248. }
  1249. for i := range tests {
  1250. testFn(&tests[i], t)
  1251. }
  1252. }
  1253. func TestStatefulSetControlRollback(t *testing.T) {
  1254. type testcase struct {
  1255. name string
  1256. invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
  1257. initial func() *apps.StatefulSet
  1258. update func(set *apps.StatefulSet) *apps.StatefulSet
  1259. validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
  1260. validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error
  1261. }
  1262. originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1263. testFn := func(test *testcase, t *testing.T) {
  1264. set := test.initial()
  1265. client := fake.NewSimpleClientset(set)
  1266. spc, _, ssc, stop := setupController(client)
  1267. defer close(stop)
  1268. if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil {
  1269. t.Fatalf("%s: %s", test.name, err)
  1270. }
  1271. set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1272. if err != nil {
  1273. t.Fatalf("%s: %s", test.name, err)
  1274. }
  1275. set = test.update(set)
  1276. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1277. t.Fatalf("%s: %s", test.name, err)
  1278. }
  1279. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1280. if err != nil {
  1281. t.Fatalf("%s: %s", test.name, err)
  1282. }
  1283. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1284. if err != nil {
  1285. t.Fatalf("%s: %s", test.name, err)
  1286. }
  1287. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1288. if err != nil {
  1289. t.Fatalf("%s: %s", test.name, err)
  1290. }
  1291. if err := test.validateUpdate(set, pods); err != nil {
  1292. t.Fatalf("%s: %s", test.name, err)
  1293. }
  1294. revisions, err := ssc.ListRevisions(set)
  1295. if err != nil {
  1296. t.Fatalf("%s: %s", test.name, err)
  1297. }
  1298. history.SortControllerRevisions(revisions)
  1299. set, err = ApplyRevision(set, revisions[0])
  1300. if err != nil {
  1301. t.Fatalf("%s: %s", test.name, err)
  1302. }
  1303. if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil {
  1304. t.Fatalf("%s: %s", test.name, err)
  1305. }
  1306. if err != nil {
  1307. t.Fatalf("%s: %s", test.name, err)
  1308. }
  1309. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1310. if err != nil {
  1311. t.Fatalf("%s: %s", test.name, err)
  1312. }
  1313. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1314. if err != nil {
  1315. t.Fatalf("%s: %s", test.name, err)
  1316. }
  1317. if err := test.validateRollback(set, pods); err != nil {
  1318. t.Fatalf("%s: %s", test.name, err)
  1319. }
  1320. }
  1321. tests := []testcase{
  1322. {
  1323. name: "monotonic image update",
  1324. invariants: assertMonotonicInvariants,
  1325. initial: func() *apps.StatefulSet {
  1326. return newStatefulSet(3)
  1327. },
  1328. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1329. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1330. return set
  1331. },
  1332. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1333. sort.Sort(ascendingOrdinal(pods))
  1334. for i := range pods {
  1335. if pods[i].Spec.Containers[0].Image != "foo" {
  1336. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1337. }
  1338. }
  1339. return nil
  1340. },
  1341. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1342. sort.Sort(ascendingOrdinal(pods))
  1343. for i := range pods {
  1344. if pods[i].Spec.Containers[0].Image != originalImage {
  1345. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1346. }
  1347. }
  1348. return nil
  1349. },
  1350. },
  1351. {
  1352. name: "monotonic image update and scale up",
  1353. invariants: assertMonotonicInvariants,
  1354. initial: func() *apps.StatefulSet {
  1355. return newStatefulSet(3)
  1356. },
  1357. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1358. *set.Spec.Replicas = 5
  1359. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1360. return set
  1361. },
  1362. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1363. sort.Sort(ascendingOrdinal(pods))
  1364. for i := range pods {
  1365. if pods[i].Spec.Containers[0].Image != "foo" {
  1366. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1367. }
  1368. }
  1369. return nil
  1370. },
  1371. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1372. sort.Sort(ascendingOrdinal(pods))
  1373. for i := range pods {
  1374. if pods[i].Spec.Containers[0].Image != originalImage {
  1375. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1376. }
  1377. }
  1378. return nil
  1379. },
  1380. },
  1381. {
  1382. name: "monotonic image update and scale down",
  1383. invariants: assertMonotonicInvariants,
  1384. initial: func() *apps.StatefulSet {
  1385. return newStatefulSet(5)
  1386. },
  1387. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1388. *set.Spec.Replicas = 3
  1389. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1390. return set
  1391. },
  1392. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1393. sort.Sort(ascendingOrdinal(pods))
  1394. for i := range pods {
  1395. if pods[i].Spec.Containers[0].Image != "foo" {
  1396. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1397. }
  1398. }
  1399. return nil
  1400. },
  1401. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1402. sort.Sort(ascendingOrdinal(pods))
  1403. for i := range pods {
  1404. if pods[i].Spec.Containers[0].Image != originalImage {
  1405. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1406. }
  1407. }
  1408. return nil
  1409. },
  1410. },
  1411. {
  1412. name: "burst image update",
  1413. invariants: assertBurstInvariants,
  1414. initial: func() *apps.StatefulSet {
  1415. return burst(newStatefulSet(3))
  1416. },
  1417. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1418. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1419. return set
  1420. },
  1421. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1422. sort.Sort(ascendingOrdinal(pods))
  1423. for i := range pods {
  1424. if pods[i].Spec.Containers[0].Image != "foo" {
  1425. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1426. }
  1427. }
  1428. return nil
  1429. },
  1430. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1431. sort.Sort(ascendingOrdinal(pods))
  1432. for i := range pods {
  1433. if pods[i].Spec.Containers[0].Image != originalImage {
  1434. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1435. }
  1436. }
  1437. return nil
  1438. },
  1439. },
  1440. {
  1441. name: "burst image update and scale up",
  1442. invariants: assertBurstInvariants,
  1443. initial: func() *apps.StatefulSet {
  1444. return burst(newStatefulSet(3))
  1445. },
  1446. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1447. *set.Spec.Replicas = 5
  1448. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1449. return set
  1450. },
  1451. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1452. sort.Sort(ascendingOrdinal(pods))
  1453. for i := range pods {
  1454. if pods[i].Spec.Containers[0].Image != "foo" {
  1455. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1456. }
  1457. }
  1458. return nil
  1459. },
  1460. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1461. sort.Sort(ascendingOrdinal(pods))
  1462. for i := range pods {
  1463. if pods[i].Spec.Containers[0].Image != originalImage {
  1464. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1465. }
  1466. }
  1467. return nil
  1468. },
  1469. },
  1470. {
  1471. name: "burst image update and scale down",
  1472. invariants: assertBurstInvariants,
  1473. initial: func() *apps.StatefulSet {
  1474. return burst(newStatefulSet(5))
  1475. },
  1476. update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1477. *set.Spec.Replicas = 3
  1478. set.Spec.Template.Spec.Containers[0].Image = "foo"
  1479. return set
  1480. },
  1481. validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1482. sort.Sort(ascendingOrdinal(pods))
  1483. for i := range pods {
  1484. if pods[i].Spec.Containers[0].Image != "foo" {
  1485. return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1486. }
  1487. }
  1488. return nil
  1489. },
  1490. validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1491. sort.Sort(ascendingOrdinal(pods))
  1492. for i := range pods {
  1493. if pods[i].Spec.Containers[0].Image != originalImage {
  1494. return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1495. }
  1496. }
  1497. return nil
  1498. },
  1499. },
  1500. }
  1501. for i := range tests {
  1502. testFn(&tests[i], t)
  1503. }
  1504. }
  1505. type requestTracker struct {
  1506. requests int
  1507. err error
  1508. after int
  1509. }
  1510. func (rt *requestTracker) errorReady() bool {
  1511. return rt.err != nil && rt.requests >= rt.after
  1512. }
  1513. func (rt *requestTracker) inc() {
  1514. rt.requests++
  1515. }
  1516. func (rt *requestTracker) reset() {
  1517. rt.err = nil
  1518. rt.after = 0
  1519. }
  1520. type fakeStatefulPodControl struct {
  1521. podsLister corelisters.PodLister
  1522. claimsLister corelisters.PersistentVolumeClaimLister
  1523. setsLister appslisters.StatefulSetLister
  1524. podsIndexer cache.Indexer
  1525. claimsIndexer cache.Indexer
  1526. setsIndexer cache.Indexer
  1527. revisionsIndexer cache.Indexer
  1528. createPodTracker requestTracker
  1529. updatePodTracker requestTracker
  1530. deletePodTracker requestTracker
  1531. }
  1532. func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer, revisionInformer appsinformers.ControllerRevisionInformer) *fakeStatefulPodControl {
  1533. claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  1534. return &fakeStatefulPodControl{
  1535. podInformer.Lister(),
  1536. corelisters.NewPersistentVolumeClaimLister(claimsIndexer),
  1537. setInformer.Lister(),
  1538. podInformer.Informer().GetIndexer(),
  1539. claimsIndexer,
  1540. setInformer.Informer().GetIndexer(),
  1541. revisionInformer.Informer().GetIndexer(),
  1542. requestTracker{0, nil, 0},
  1543. requestTracker{0, nil, 0},
  1544. requestTracker{0, nil, 0}}
  1545. }
  1546. func (spc *fakeStatefulPodControl) SetCreateStatefulPodError(err error, after int) {
  1547. spc.createPodTracker.err = err
  1548. spc.createPodTracker.after = after
  1549. }
  1550. func (spc *fakeStatefulPodControl) SetUpdateStatefulPodError(err error, after int) {
  1551. spc.updatePodTracker.err = err
  1552. spc.updatePodTracker.after = after
  1553. }
  1554. func (spc *fakeStatefulPodControl) SetDeleteStatefulPodError(err error, after int) {
  1555. spc.deletePodTracker.err = err
  1556. spc.deletePodTracker.after = after
  1557. }
  1558. func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1559. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1560. if err != nil {
  1561. return nil, err
  1562. }
  1563. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1564. if err != nil {
  1565. return nil, err
  1566. }
  1567. if 0 > ordinal || ordinal >= len(pods) {
  1568. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1569. }
  1570. sort.Sort(ascendingOrdinal(pods))
  1571. pod := pods[ordinal].DeepCopy()
  1572. pod.Status.Phase = v1.PodPending
  1573. fakeResourceVersion(pod)
  1574. spc.podsIndexer.Update(pod)
  1575. return spc.podsLister.Pods(set.Namespace).List(selector)
  1576. }
  1577. func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1578. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1579. if err != nil {
  1580. return nil, err
  1581. }
  1582. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1583. if err != nil {
  1584. return nil, err
  1585. }
  1586. if 0 > ordinal || ordinal >= len(pods) {
  1587. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1588. }
  1589. sort.Sort(ascendingOrdinal(pods))
  1590. pod := pods[ordinal].DeepCopy()
  1591. pod.Status.Phase = v1.PodRunning
  1592. fakeResourceVersion(pod)
  1593. spc.podsIndexer.Update(pod)
  1594. return spc.podsLister.Pods(set.Namespace).List(selector)
  1595. }
  1596. func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1597. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1598. if err != nil {
  1599. return nil, err
  1600. }
  1601. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1602. if err != nil {
  1603. return nil, err
  1604. }
  1605. if 0 > ordinal || ordinal >= len(pods) {
  1606. return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
  1607. }
  1608. sort.Sort(ascendingOrdinal(pods))
  1609. pod := pods[ordinal].DeepCopy()
  1610. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1611. podutil.UpdatePodCondition(&pod.Status, &condition)
  1612. fakeResourceVersion(pod)
  1613. spc.podsIndexer.Update(pod)
  1614. return spc.podsLister.Pods(set.Namespace).List(selector)
  1615. }
  1616. func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1617. pod := newStatefulSetPod(set, ordinal)
  1618. pod.Status.Phase = v1.PodRunning
  1619. deleted := metav1.NewTime(time.Now())
  1620. pod.DeletionTimestamp = &deleted
  1621. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  1622. fakeResourceVersion(pod)
  1623. podutil.UpdatePodCondition(&pod.Status, &condition)
  1624. spc.podsIndexer.Update(pod)
  1625. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1626. if err != nil {
  1627. return nil, err
  1628. }
  1629. return spc.podsLister.Pods(set.Namespace).List(selector)
  1630. }
  1631. func (spc *fakeStatefulPodControl) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  1632. pod := newStatefulSetPod(set, ordinal)
  1633. deleted := metav1.NewTime(time.Now())
  1634. pod.DeletionTimestamp = &deleted
  1635. fakeResourceVersion(pod)
  1636. spc.podsIndexer.Update(pod)
  1637. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1638. if err != nil {
  1639. return nil, err
  1640. }
  1641. return spc.podsLister.Pods(set.Namespace).List(selector)
  1642. }
  1643. func (spc *fakeStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1644. defer spc.createPodTracker.inc()
  1645. if spc.createPodTracker.errorReady() {
  1646. defer spc.createPodTracker.reset()
  1647. return spc.createPodTracker.err
  1648. }
  1649. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1650. spc.claimsIndexer.Update(&claim)
  1651. }
  1652. spc.podsIndexer.Update(pod)
  1653. return nil
  1654. }
  1655. func (spc *fakeStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1656. defer spc.updatePodTracker.inc()
  1657. if spc.updatePodTracker.errorReady() {
  1658. defer spc.updatePodTracker.reset()
  1659. return spc.updatePodTracker.err
  1660. }
  1661. if !identityMatches(set, pod) {
  1662. updateIdentity(set, pod)
  1663. }
  1664. if !storageMatches(set, pod) {
  1665. updateStorage(set, pod)
  1666. for _, claim := range getPersistentVolumeClaims(set, pod) {
  1667. spc.claimsIndexer.Update(&claim)
  1668. }
  1669. }
  1670. spc.podsIndexer.Update(pod)
  1671. return nil
  1672. }
  1673. func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
  1674. defer spc.deletePodTracker.inc()
  1675. if spc.deletePodTracker.errorReady() {
  1676. defer spc.deletePodTracker.reset()
  1677. return spc.deletePodTracker.err
  1678. }
  1679. if key, err := controller.KeyFunc(pod); err != nil {
  1680. return err
  1681. } else if obj, found, err := spc.podsIndexer.GetByKey(key); err != nil {
  1682. return err
  1683. } else if found {
  1684. spc.podsIndexer.Delete(obj)
  1685. }
  1686. return nil
  1687. }
  1688. var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
  1689. type fakeStatefulSetStatusUpdater struct {
  1690. setsLister appslisters.StatefulSetLister
  1691. setsIndexer cache.Indexer
  1692. updateStatusTracker requestTracker
  1693. }
  1694. func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater {
  1695. return &fakeStatefulSetStatusUpdater{
  1696. setInformer.Lister(),
  1697. setInformer.Informer().GetIndexer(),
  1698. requestTracker{0, nil, 0},
  1699. }
  1700. }
  1701. func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
  1702. defer ssu.updateStatusTracker.inc()
  1703. if ssu.updateStatusTracker.errorReady() {
  1704. defer ssu.updateStatusTracker.reset()
  1705. return ssu.updateStatusTracker.err
  1706. }
  1707. set.Status = *status
  1708. ssu.setsIndexer.Update(set)
  1709. return nil
  1710. }
  1711. func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) {
  1712. ssu.updateStatusTracker.err = err
  1713. ssu.updateStatusTracker.after = after
  1714. }
  1715. var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{}
  1716. func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1717. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1718. if err != nil {
  1719. return err
  1720. }
  1721. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1722. if err != nil {
  1723. return err
  1724. }
  1725. sort.Sort(ascendingOrdinal(pods))
  1726. for ord := 0; ord < len(pods); ord++ {
  1727. if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
  1728. return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
  1729. }
  1730. if getOrdinal(pods[ord]) != ord {
  1731. return fmt.Errorf("pods %s deployed in the wrong order %d", pods[ord].Name, ord)
  1732. }
  1733. if !storageMatches(set, pods[ord]) {
  1734. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1735. }
  1736. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1737. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1738. if err != nil {
  1739. return err
  1740. }
  1741. if claim == nil {
  1742. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1743. }
  1744. }
  1745. if !identityMatches(set, pods[ord]) {
  1746. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1747. }
  1748. }
  1749. return nil
  1750. }
  1751. func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1752. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1753. if err != nil {
  1754. return err
  1755. }
  1756. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1757. if err != nil {
  1758. return err
  1759. }
  1760. sort.Sort(ascendingOrdinal(pods))
  1761. for ord := 0; ord < len(pods); ord++ {
  1762. if !storageMatches(set, pods[ord]) {
  1763. return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1764. }
  1765. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1766. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1767. if err != nil {
  1768. return err
  1769. }
  1770. if claim == nil {
  1771. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1772. }
  1773. }
  1774. if !identityMatches(set, pods[ord]) {
  1775. return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
  1776. pods[ord].Name,
  1777. set.Name)
  1778. }
  1779. }
  1780. return nil
  1781. }
  1782. func assertUpdateInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
  1783. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1784. if err != nil {
  1785. return err
  1786. }
  1787. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1788. if err != nil {
  1789. return err
  1790. }
  1791. sort.Sort(ascendingOrdinal(pods))
  1792. for ord := 0; ord < len(pods); ord++ {
  1793. if !storageMatches(set, pods[ord]) {
  1794. return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1795. }
  1796. for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
  1797. claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  1798. if err != nil {
  1799. return err
  1800. }
  1801. if claim == nil {
  1802. return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
  1803. }
  1804. }
  1805. if !identityMatches(set, pods[ord]) {
  1806. return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
  1807. }
  1808. }
  1809. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
  1810. return nil
  1811. }
  1812. if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType {
  1813. for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ {
  1814. if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got {
  1815. return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got)
  1816. }
  1817. }
  1818. for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 {
  1819. if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got {
  1820. return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got)
  1821. }
  1822. }
  1823. }
  1824. return nil
  1825. }
  1826. func fakeResourceVersion(object interface{}) {
  1827. obj, isObj := object.(metav1.Object)
  1828. if !isObj {
  1829. return
  1830. }
  1831. if version := obj.GetResourceVersion(); version == "" {
  1832. obj.SetResourceVersion("1")
  1833. } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
  1834. obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
  1835. }
  1836. }
  1837. func scaleUpStatefulSetControl(set *apps.StatefulSet,
  1838. ssc StatefulSetControlInterface,
  1839. spc *fakeStatefulPodControl,
  1840. invariants invariantFunc) error {
  1841. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1842. if err != nil {
  1843. return err
  1844. }
  1845. for set.Status.ReadyReplicas < *set.Spec.Replicas {
  1846. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1847. if err != nil {
  1848. return err
  1849. }
  1850. sort.Sort(ascendingOrdinal(pods))
  1851. // ensure all pods are valid (have a phase)
  1852. initialized := false
  1853. for ord, pod := range pods {
  1854. if pod.Status.Phase == "" {
  1855. if pods, err = spc.setPodPending(set, ord); err != nil {
  1856. return err
  1857. }
  1858. break
  1859. }
  1860. }
  1861. if initialized {
  1862. continue
  1863. }
  1864. // select one of the pods and move it forward in status
  1865. if len(pods) > 0 {
  1866. ord := int(rand.Int63n(int64(len(pods))))
  1867. pod := pods[ord]
  1868. switch pod.Status.Phase {
  1869. case v1.PodPending:
  1870. if pods, err = spc.setPodRunning(set, ord); err != nil {
  1871. return err
  1872. }
  1873. case v1.PodRunning:
  1874. if pods, err = spc.setPodReady(set, ord); err != nil {
  1875. return err
  1876. }
  1877. default:
  1878. continue
  1879. }
  1880. }
  1881. // run the controller once and check invariants
  1882. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1883. return err
  1884. }
  1885. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1886. if err != nil {
  1887. return err
  1888. }
  1889. if err := invariants(set, spc); err != nil {
  1890. return err
  1891. }
  1892. }
  1893. return invariants(set, spc)
  1894. }
  1895. func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
  1896. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1897. if err != nil {
  1898. return err
  1899. }
  1900. for set.Status.Replicas > *set.Spec.Replicas {
  1901. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1902. if err != nil {
  1903. return err
  1904. }
  1905. sort.Sort(ascendingOrdinal(pods))
  1906. if ordinal := len(pods) - 1; ordinal >= 0 {
  1907. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1908. return err
  1909. }
  1910. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1911. if err != nil {
  1912. return err
  1913. }
  1914. if pods, err = spc.addTerminatingPod(set, ordinal); err != nil {
  1915. return err
  1916. }
  1917. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1918. return err
  1919. }
  1920. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1921. if err != nil {
  1922. return err
  1923. }
  1924. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1925. if err != nil {
  1926. return err
  1927. }
  1928. sort.Sort(ascendingOrdinal(pods))
  1929. if len(pods) > 0 {
  1930. spc.podsIndexer.Delete(pods[len(pods)-1])
  1931. }
  1932. }
  1933. if err := ssc.UpdateStatefulSet(set, pods); err != nil {
  1934. return err
  1935. }
  1936. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1937. if err != nil {
  1938. return err
  1939. }
  1940. if err := invariants(set, spc); err != nil {
  1941. return err
  1942. }
  1943. }
  1944. return invariants(set, spc)
  1945. }
  1946. func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool {
  1947. sort.Sort(ascendingOrdinal(pods))
  1948. if len(pods) != int(*set.Spec.Replicas) {
  1949. return false
  1950. }
  1951. if set.Status.ReadyReplicas != *set.Spec.Replicas {
  1952. return false
  1953. }
  1954. switch set.Spec.UpdateStrategy.Type {
  1955. case apps.OnDeleteStatefulSetStrategyType:
  1956. return true
  1957. case apps.RollingUpdateStatefulSetStrategyType:
  1958. if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 {
  1959. if set.Status.CurrentReplicas < *set.Spec.Replicas {
  1960. return false
  1961. }
  1962. for i := range pods {
  1963. if getPodRevision(pods[i]) != set.Status.CurrentRevision {
  1964. return false
  1965. }
  1966. }
  1967. } else {
  1968. partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  1969. if len(pods) < partition {
  1970. return false
  1971. }
  1972. for i := partition; i < len(pods); i++ {
  1973. if getPodRevision(pods[i]) != set.Status.UpdateRevision {
  1974. return false
  1975. }
  1976. }
  1977. }
  1978. }
  1979. return true
  1980. }
  1981. func updateStatefulSetControl(set *apps.StatefulSet,
  1982. ssc StatefulSetControlInterface,
  1983. spc *fakeStatefulPodControl,
  1984. invariants invariantFunc) error {
  1985. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1986. if err != nil {
  1987. return err
  1988. }
  1989. pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1990. if err != nil {
  1991. return err
  1992. }
  1993. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  1994. return err
  1995. }
  1996. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1997. if err != nil {
  1998. return err
  1999. }
  2000. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2001. if err != nil {
  2002. return err
  2003. }
  2004. for !updateComplete(set, pods) {
  2005. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2006. if err != nil {
  2007. return err
  2008. }
  2009. sort.Sort(ascendingOrdinal(pods))
  2010. initialized := false
  2011. for ord, pod := range pods {
  2012. if pod.Status.Phase == "" {
  2013. if pods, err = spc.setPodPending(set, ord); err != nil {
  2014. return err
  2015. }
  2016. break
  2017. }
  2018. }
  2019. if initialized {
  2020. continue
  2021. }
  2022. if len(pods) > 0 {
  2023. ord := int(rand.Int63n(int64(len(pods))))
  2024. pod := pods[ord]
  2025. switch pod.Status.Phase {
  2026. case v1.PodPending:
  2027. if pods, err = spc.setPodRunning(set, ord); err != nil {
  2028. return err
  2029. }
  2030. case v1.PodRunning:
  2031. if pods, err = spc.setPodReady(set, ord); err != nil {
  2032. return err
  2033. }
  2034. default:
  2035. continue
  2036. }
  2037. }
  2038. if err = ssc.UpdateStatefulSet(set, pods); err != nil {
  2039. return err
  2040. }
  2041. set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2042. if err != nil {
  2043. return err
  2044. }
  2045. if err := invariants(set, spc); err != nil {
  2046. return err
  2047. }
  2048. pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2049. if err != nil {
  2050. return err
  2051. }
  2052. }
  2053. return invariants(set, spc)
  2054. }
  2055. func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision {
  2056. rev, err := newRevision(set, revision, set.Status.CollisionCount)
  2057. if err != nil {
  2058. panic(err)
  2059. }
  2060. return rev
  2061. }
  2062. func isOrHasInternalError(err error) bool {
  2063. agg, ok := err.(utilerrors.Aggregate)
  2064. return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
  2065. }