stateful_pod_control_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package statefulset
  14. import (
  15. "errors"
  16. "strings"
  17. "testing"
  18. apierrors "k8s.io/apimachinery/pkg/api/errors"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. core "k8s.io/client-go/testing"
  21. "k8s.io/client-go/tools/cache"
  22. "k8s.io/client-go/tools/record"
  23. "k8s.io/api/core/v1"
  24. "k8s.io/client-go/kubernetes/fake"
  25. corelisters "k8s.io/client-go/listers/core/v1"
  26. _ "k8s.io/kubernetes/pkg/apis/apps/install"
  27. _ "k8s.io/kubernetes/pkg/apis/core/install"
  28. )
  29. func TestStatefulPodControlCreatesPods(t *testing.T) {
  30. recorder := record.NewFakeRecorder(10)
  31. set := newStatefulSet(3)
  32. pod := newStatefulSetPod(set, 0)
  33. fakeClient := &fake.Clientset{}
  34. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  35. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  36. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  37. fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  38. return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
  39. })
  40. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  41. create := action.(core.CreateAction)
  42. return true, create.GetObject(), nil
  43. })
  44. fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
  45. create := action.(core.CreateAction)
  46. return true, create.GetObject(), nil
  47. })
  48. if err := control.CreateStatefulPod(set, pod); err != nil {
  49. t.Errorf("StatefulPodControl failed to create Pod error: %s", err)
  50. }
  51. events := collectEvents(recorder.Events)
  52. if eventCount := len(events); eventCount != 2 {
  53. t.Errorf("Expected 2 events for successful create found %d", eventCount)
  54. }
  55. for i := range events {
  56. if !strings.Contains(events[i], v1.EventTypeNormal) {
  57. t.Errorf("Found unexpected non-normal event %s", events[i])
  58. }
  59. }
  60. }
  61. func TestStatefulPodControlCreatePodExists(t *testing.T) {
  62. recorder := record.NewFakeRecorder(10)
  63. set := newStatefulSet(3)
  64. pod := newStatefulSetPod(set, 0)
  65. fakeClient := &fake.Clientset{}
  66. pvcs := getPersistentVolumeClaims(set, pod)
  67. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  68. for k := range pvcs {
  69. pvc := pvcs[k]
  70. pvcIndexer.Add(&pvc)
  71. }
  72. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  73. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  74. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  75. create := action.(core.CreateAction)
  76. return true, create.GetObject(), nil
  77. })
  78. fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
  79. return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name)
  80. })
  81. if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) {
  82. t.Errorf("Failed to create Pod error: %s", err)
  83. }
  84. events := collectEvents(recorder.Events)
  85. if eventCount := len(events); eventCount != 0 {
  86. t.Errorf("Pod and PVC exist: got %d events, but want 0", eventCount)
  87. for i := range events {
  88. t.Log(events[i])
  89. }
  90. }
  91. }
  92. func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
  93. recorder := record.NewFakeRecorder(10)
  94. set := newStatefulSet(3)
  95. pod := newStatefulSetPod(set, 0)
  96. fakeClient := &fake.Clientset{}
  97. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  98. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  99. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  100. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  101. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  102. })
  103. fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
  104. create := action.(core.CreateAction)
  105. return true, create.GetObject(), nil
  106. })
  107. if err := control.CreateStatefulPod(set, pod); err == nil {
  108. t.Error("Failed to produce error on PVC creation failure")
  109. }
  110. events := collectEvents(recorder.Events)
  111. if eventCount := len(events); eventCount != 2 {
  112. t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
  113. }
  114. for i := range events {
  115. if !strings.Contains(events[i], v1.EventTypeWarning) {
  116. t.Errorf("Found unexpected non-warning event %s", events[i])
  117. }
  118. }
  119. }
  120. type fakeIndexer struct {
  121. cache.Indexer
  122. getError error
  123. }
  124. func (f *fakeIndexer) GetByKey(key string) (interface{}, bool, error) {
  125. return nil, false, f.getError
  126. }
  127. func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
  128. recorder := record.NewFakeRecorder(10)
  129. set := newStatefulSet(3)
  130. pod := newStatefulSetPod(set, 0)
  131. fakeClient := &fake.Clientset{}
  132. pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
  133. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  134. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  135. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  136. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  137. })
  138. fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
  139. create := action.(core.CreateAction)
  140. return true, create.GetObject(), nil
  141. })
  142. if err := control.CreateStatefulPod(set, pod); err == nil {
  143. t.Error("Failed to produce error on PVC creation failure")
  144. }
  145. events := collectEvents(recorder.Events)
  146. if eventCount := len(events); eventCount != 2 {
  147. t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
  148. }
  149. for i := range events {
  150. if !strings.Contains(events[i], v1.EventTypeWarning) {
  151. t.Errorf("Found unexpected non-warning event: %s", events[i])
  152. }
  153. }
  154. }
  155. func TestStatefulPodControlCreatePodFailed(t *testing.T) {
  156. recorder := record.NewFakeRecorder(10)
  157. set := newStatefulSet(3)
  158. pod := newStatefulSetPod(set, 0)
  159. fakeClient := &fake.Clientset{}
  160. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  161. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  162. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  163. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  164. create := action.(core.CreateAction)
  165. return true, create.GetObject(), nil
  166. })
  167. fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
  168. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  169. })
  170. if err := control.CreateStatefulPod(set, pod); err == nil {
  171. t.Error("Failed to produce error on Pod creation failure")
  172. }
  173. events := collectEvents(recorder.Events)
  174. if eventCount := len(events); eventCount != 2 {
  175. t.Errorf("Pod create failed: got %d events, but want 2", eventCount)
  176. } else if !strings.Contains(events[0], v1.EventTypeNormal) {
  177. t.Errorf("Found unexpected non-normal event %s", events[0])
  178. } else if !strings.Contains(events[1], v1.EventTypeWarning) {
  179. t.Errorf("Found unexpected non-warning event %s", events[1])
  180. }
  181. }
  182. func TestStatefulPodControlNoOpUpdate(t *testing.T) {
  183. recorder := record.NewFakeRecorder(10)
  184. set := newStatefulSet(3)
  185. pod := newStatefulSetPod(set, 0)
  186. fakeClient := &fake.Clientset{}
  187. control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
  188. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  189. t.Error("no-op update should not make any client invocation")
  190. return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem"))
  191. })
  192. if err := control.UpdateStatefulPod(set, pod); err != nil {
  193. t.Errorf("Error returned on no-op update error: %s", err)
  194. }
  195. events := collectEvents(recorder.Events)
  196. if eventCount := len(events); eventCount != 0 {
  197. t.Errorf("no-op update: got %d events, but want 0", eventCount)
  198. }
  199. }
  200. func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
  201. recorder := record.NewFakeRecorder(10)
  202. set := newStatefulSet(3)
  203. pod := newStatefulSetPod(set, 0)
  204. fakeClient := fake.NewSimpleClientset(set, pod)
  205. control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
  206. var updated *v1.Pod
  207. fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  208. update := action.(core.UpdateAction)
  209. updated = update.GetObject().(*v1.Pod)
  210. return true, update.GetObject(), nil
  211. })
  212. pod.Name = "goo-0"
  213. if err := control.UpdateStatefulPod(set, pod); err != nil {
  214. t.Errorf("Successful update returned an error: %s", err)
  215. }
  216. events := collectEvents(recorder.Events)
  217. if eventCount := len(events); eventCount != 1 {
  218. t.Errorf("Pod update successful:got %d events,but want 1", eventCount)
  219. } else if !strings.Contains(events[0], v1.EventTypeNormal) {
  220. t.Errorf("Found unexpected non-normal event %s", events[0])
  221. }
  222. if !identityMatches(set, updated) {
  223. t.Error("Name update failed identity does not match")
  224. }
  225. }
  226. func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
  227. recorder := record.NewFakeRecorder(10)
  228. set := newStatefulSet(3)
  229. pod := newStatefulSetPod(set, 0)
  230. fakeClient := &fake.Clientset{}
  231. indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  232. gooPod := newStatefulSetPod(set, 0)
  233. gooPod.Name = "goo-0"
  234. indexer.Add(gooPod)
  235. podLister := corelisters.NewPodLister(indexer)
  236. control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
  237. fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  238. pod.Name = "goo-0"
  239. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  240. })
  241. pod.Name = "goo-0"
  242. if err := control.UpdateStatefulPod(set, pod); err == nil {
  243. t.Error("Failed update does not generate an error")
  244. }
  245. events := collectEvents(recorder.Events)
  246. if eventCount := len(events); eventCount != 1 {
  247. t.Errorf("Pod update failed: got %d events, but want 1", eventCount)
  248. } else if !strings.Contains(events[0], v1.EventTypeWarning) {
  249. t.Errorf("Found unexpected non-warning event %s", events[0])
  250. }
  251. if identityMatches(set, pod) {
  252. t.Error("Failed update mutated Pod identity")
  253. }
  254. }
  255. func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
  256. recorder := record.NewFakeRecorder(10)
  257. set := newStatefulSet(3)
  258. pod := newStatefulSetPod(set, 0)
  259. fakeClient := &fake.Clientset{}
  260. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  261. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  262. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  263. pvcs := getPersistentVolumeClaims(set, pod)
  264. volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
  265. for i := range pod.Spec.Volumes {
  266. if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
  267. volumes = append(volumes, pod.Spec.Volumes[i])
  268. }
  269. }
  270. pod.Spec.Volumes = volumes
  271. fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  272. update := action.(core.UpdateAction)
  273. return true, update.GetObject(), nil
  274. })
  275. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  276. update := action.(core.UpdateAction)
  277. return true, update.GetObject(), nil
  278. })
  279. var updated *v1.Pod
  280. fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  281. update := action.(core.UpdateAction)
  282. updated = update.GetObject().(*v1.Pod)
  283. return true, update.GetObject(), nil
  284. })
  285. if err := control.UpdateStatefulPod(set, pod); err != nil {
  286. t.Errorf("Successful update returned an error: %s", err)
  287. }
  288. events := collectEvents(recorder.Events)
  289. if eventCount := len(events); eventCount != 2 {
  290. t.Errorf("Pod storage update successful: got %d events, but want 2", eventCount)
  291. }
  292. for i := range events {
  293. if !strings.Contains(events[i], v1.EventTypeNormal) {
  294. t.Errorf("Found unexpected non-normal event %s", events[i])
  295. }
  296. }
  297. if !storageMatches(set, updated) {
  298. t.Error("Name update failed identity does not match")
  299. }
  300. }
  301. func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
  302. recorder := record.NewFakeRecorder(10)
  303. set := newStatefulSet(3)
  304. pod := newStatefulSetPod(set, 0)
  305. fakeClient := &fake.Clientset{}
  306. pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  307. pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
  308. control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
  309. pvcs := getPersistentVolumeClaims(set, pod)
  310. volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
  311. for i := range pod.Spec.Volumes {
  312. if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
  313. volumes = append(volumes, pod.Spec.Volumes[i])
  314. }
  315. }
  316. pod.Spec.Volumes = volumes
  317. fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  318. update := action.(core.UpdateAction)
  319. return true, update.GetObject(), nil
  320. })
  321. fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  322. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  323. })
  324. if err := control.UpdateStatefulPod(set, pod); err == nil {
  325. t.Error("Failed Pod storage update did not return an error")
  326. }
  327. events := collectEvents(recorder.Events)
  328. if eventCount := len(events); eventCount != 2 {
  329. t.Errorf("Pod storage update failed: got %d events, but want 2", eventCount)
  330. }
  331. for i := range events {
  332. if !strings.Contains(events[i], v1.EventTypeWarning) {
  333. t.Errorf("Found unexpected non-normal event %s", events[i])
  334. }
  335. }
  336. }
  337. func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
  338. recorder := record.NewFakeRecorder(10)
  339. set := newStatefulSet(3)
  340. pod := newStatefulSetPod(set, 0)
  341. fakeClient := &fake.Clientset{}
  342. indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  343. gooPod := newStatefulSetPod(set, 0)
  344. gooPod.Name = "goo-0"
  345. indexer.Add(gooPod)
  346. podLister := corelisters.NewPodLister(indexer)
  347. control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
  348. conflict := false
  349. fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
  350. update := action.(core.UpdateAction)
  351. if !conflict {
  352. conflict = true
  353. return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
  354. } else {
  355. return true, update.GetObject(), nil
  356. }
  357. })
  358. pod.Name = "goo-0"
  359. if err := control.UpdateStatefulPod(set, pod); err != nil {
  360. t.Errorf("Successful update returned an error: %s", err)
  361. }
  362. events := collectEvents(recorder.Events)
  363. if eventCount := len(events); eventCount != 1 {
  364. t.Errorf("Pod update successful: got %d, but want 1", eventCount)
  365. } else if !strings.Contains(events[0], v1.EventTypeNormal) {
  366. t.Errorf("Found unexpected non-normal event %s", events[0])
  367. }
  368. if !identityMatches(set, pod) {
  369. t.Error("Name update failed identity does not match")
  370. }
  371. }
  372. func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
  373. recorder := record.NewFakeRecorder(10)
  374. set := newStatefulSet(3)
  375. pod := newStatefulSetPod(set, 0)
  376. fakeClient := &fake.Clientset{}
  377. control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
  378. fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
  379. return true, nil, nil
  380. })
  381. if err := control.DeleteStatefulPod(set, pod); err != nil {
  382. t.Errorf("Error returned on successful delete: %s", err)
  383. }
  384. events := collectEvents(recorder.Events)
  385. if eventCount := len(events); eventCount != 1 {
  386. t.Errorf("delete successful: got %d events, but want 1", eventCount)
  387. } else if !strings.Contains(events[0], v1.EventTypeNormal) {
  388. t.Errorf("Found unexpected non-normal event %s", events[0])
  389. }
  390. }
  391. func TestStatefulPodControlDeleteFailure(t *testing.T) {
  392. recorder := record.NewFakeRecorder(10)
  393. set := newStatefulSet(3)
  394. pod := newStatefulSetPod(set, 0)
  395. fakeClient := &fake.Clientset{}
  396. control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
  397. fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
  398. return true, nil, apierrors.NewInternalError(errors.New("API server down"))
  399. })
  400. if err := control.DeleteStatefulPod(set, pod); err == nil {
  401. t.Error("Failed to return error on failed delete")
  402. }
  403. events := collectEvents(recorder.Events)
  404. if eventCount := len(events); eventCount != 1 {
  405. t.Errorf("delete failed: got %d events, but want 1", eventCount)
  406. } else if !strings.Contains(events[0], v1.EventTypeWarning) {
  407. t.Errorf("Found unexpected non-warning event %s", events[0])
  408. }
  409. }
  410. func collectEvents(source <-chan string) []string {
  411. done := false
  412. events := make([]string, 0)
  413. for !done {
  414. select {
  415. case event := <-source:
  416. events = append(events, event)
  417. default:
  418. done = true
  419. }
  420. }
  421. return events
  422. }