123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "errors"
- "strings"
- "testing"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/runtime"
- core "k8s.io/client-go/testing"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/api/core/v1"
- "k8s.io/client-go/kubernetes/fake"
- corelisters "k8s.io/client-go/listers/core/v1"
- _ "k8s.io/kubernetes/pkg/apis/apps/install"
- _ "k8s.io/kubernetes/pkg/apis/core/install"
- )
- func TestStatefulPodControlCreatesPods(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
- })
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- if err := control.CreateStatefulPod(set, pod); err != nil {
- t.Errorf("StatefulPodControl failed to create Pod error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("Expected 2 events for successful create found %d", eventCount)
- }
- for i := range events {
- if !strings.Contains(events[i], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[i])
- }
- }
- }
- func TestStatefulPodControlCreatePodExists(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcs := getPersistentVolumeClaims(set, pod)
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- for k := range pvcs {
- pvc := pvcs[k]
- pvcIndexer.Add(&pvc)
- }
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
- return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name)
- })
- if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) {
- t.Errorf("Failed to create Pod error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 0 {
- t.Errorf("Pod and PVC exist: got %d events, but want 0", eventCount)
- for i := range events {
- t.Log(events[i])
- }
- }
- }
- func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- if err := control.CreateStatefulPod(set, pod); err == nil {
- t.Error("Failed to produce error on PVC creation failure")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
- }
- for i := range events {
- if !strings.Contains(events[i], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-warning event %s", events[i])
- }
- }
- }
- type fakeIndexer struct {
- cache.Indexer
- getError error
- }
- func (f *fakeIndexer) GetByKey(key string) (interface{}, bool, error) {
- return nil, false, f.getError
- }
- func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- if err := control.CreateStatefulPod(set, pod); err == nil {
- t.Error("Failed to produce error on PVC creation failure")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
- }
- for i := range events {
- if !strings.Contains(events[i], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-warning event: %s", events[i])
- }
- }
- }
- func TestStatefulPodControlCreatePodFailed(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- create := action.(core.CreateAction)
- return true, create.GetObject(), nil
- })
- fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- if err := control.CreateStatefulPod(set, pod); err == nil {
- t.Error("Failed to produce error on Pod creation failure")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("Pod create failed: got %d events, but want 2", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[0])
- } else if !strings.Contains(events[1], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-warning event %s", events[1])
- }
- }
- func TestStatefulPodControlNoOpUpdate(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
- fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
- t.Error("no-op update should not make any client invocation")
- return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem"))
- })
- if err := control.UpdateStatefulPod(set, pod); err != nil {
- t.Errorf("Error returned on no-op update error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 0 {
- t.Errorf("no-op update: got %d events, but want 0", eventCount)
- }
- }
- func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := fake.NewSimpleClientset(set, pod)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
- var updated *v1.Pod
- fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- updated = update.GetObject().(*v1.Pod)
- return true, update.GetObject(), nil
- })
- pod.Name = "goo-0"
- if err := control.UpdateStatefulPod(set, pod); err != nil {
- t.Errorf("Successful update returned an error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 1 {
- t.Errorf("Pod update successful:got %d events,but want 1", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[0])
- }
- if !identityMatches(set, updated) {
- t.Error("Name update failed identity does not match")
- }
- }
- func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- gooPod := newStatefulSetPod(set, 0)
- gooPod.Name = "goo-0"
- indexer.Add(gooPod)
- podLister := corelisters.NewPodLister(indexer)
- control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
- fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- pod.Name = "goo-0"
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- pod.Name = "goo-0"
- if err := control.UpdateStatefulPod(set, pod); err == nil {
- t.Error("Failed update does not generate an error")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 1 {
- t.Errorf("Pod update failed: got %d events, but want 1", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-warning event %s", events[0])
- }
- if identityMatches(set, pod) {
- t.Error("Failed update mutated Pod identity")
- }
- }
- func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- pvcs := getPersistentVolumeClaims(set, pod)
- volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
- for i := range pod.Spec.Volumes {
- if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
- volumes = append(volumes, pod.Spec.Volumes[i])
- }
- }
- pod.Spec.Volumes = volumes
- fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- return true, update.GetObject(), nil
- })
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- return true, update.GetObject(), nil
- })
- var updated *v1.Pod
- fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- updated = update.GetObject().(*v1.Pod)
- return true, update.GetObject(), nil
- })
- if err := control.UpdateStatefulPod(set, pod); err != nil {
- t.Errorf("Successful update returned an error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("Pod storage update successful: got %d events, but want 2", eventCount)
- }
- for i := range events {
- if !strings.Contains(events[i], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[i])
- }
- }
- if !storageMatches(set, updated) {
- t.Error("Name update failed identity does not match")
- }
- }
- func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
- control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
- pvcs := getPersistentVolumeClaims(set, pod)
- volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
- for i := range pod.Spec.Volumes {
- if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
- volumes = append(volumes, pod.Spec.Volumes[i])
- }
- }
- pod.Spec.Volumes = volumes
- fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- return true, update.GetObject(), nil
- })
- fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- if err := control.UpdateStatefulPod(set, pod); err == nil {
- t.Error("Failed Pod storage update did not return an error")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 2 {
- t.Errorf("Pod storage update failed: got %d events, but want 2", eventCount)
- }
- for i := range events {
- if !strings.Contains(events[i], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-normal event %s", events[i])
- }
- }
- }
- func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
- gooPod := newStatefulSetPod(set, 0)
- gooPod.Name = "goo-0"
- indexer.Add(gooPod)
- podLister := corelisters.NewPodLister(indexer)
- control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
- conflict := false
- fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
- update := action.(core.UpdateAction)
- if !conflict {
- conflict = true
- return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
- } else {
- return true, update.GetObject(), nil
- }
- })
- pod.Name = "goo-0"
- if err := control.UpdateStatefulPod(set, pod); err != nil {
- t.Errorf("Successful update returned an error: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 1 {
- t.Errorf("Pod update successful: got %d, but want 1", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[0])
- }
- if !identityMatches(set, pod) {
- t.Error("Name update failed identity does not match")
- }
- }
- func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
- fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, nil
- })
- if err := control.DeleteStatefulPod(set, pod); err != nil {
- t.Errorf("Error returned on successful delete: %s", err)
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 1 {
- t.Errorf("delete successful: got %d events, but want 1", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeNormal) {
- t.Errorf("Found unexpected non-normal event %s", events[0])
- }
- }
- func TestStatefulPodControlDeleteFailure(t *testing.T) {
- recorder := record.NewFakeRecorder(10)
- set := newStatefulSet(3)
- pod := newStatefulSetPod(set, 0)
- fakeClient := &fake.Clientset{}
- control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
- fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, apierrors.NewInternalError(errors.New("API server down"))
- })
- if err := control.DeleteStatefulPod(set, pod); err == nil {
- t.Error("Failed to return error on failed delete")
- }
- events := collectEvents(recorder.Events)
- if eventCount := len(events); eventCount != 1 {
- t.Errorf("delete failed: got %d events, but want 1", eventCount)
- } else if !strings.Contains(events[0], v1.EventTypeWarning) {
- t.Errorf("Found unexpected non-warning event %s", events[0])
- }
- }
- func collectEvents(source <-chan string) []string {
- done := false
- events := make([]string, 0)
- for !done {
- select {
- case event := <-source:
- events = append(events, event)
- default:
- done = true
- }
- }
- return events
- }
|