123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package attachdetach
- import (
- "fmt"
- "testing"
- "time"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/informers"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
- controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
- "k8s.io/kubernetes/pkg/volume"
- )
- func Test_NewAttachDetachController_Positive(t *testing.T) {
- // Arrange
- fakeKubeClient := controllervolumetesting.CreateTestClient()
- informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
- // Act
- _, err := NewAttachDetachController(
- fakeKubeClient,
- informerFactory.Core().V1().Pods(),
- informerFactory.Core().V1().Nodes(),
- informerFactory.Core().V1().PersistentVolumeClaims(),
- informerFactory.Core().V1().PersistentVolumes(),
- informerFactory.Storage().V1beta1().CSINodes(),
- informerFactory.Storage().V1beta1().CSIDrivers(),
- nil, /* cloud */
- nil, /* plugins */
- nil, /* prober */
- false,
- 5*time.Second,
- DefaultTimerConfig)
- // Assert
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
- }
- }
- func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
- // Arrange
- fakeKubeClient := controllervolumetesting.CreateTestClient()
- informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
- podInformer := informerFactory.Core().V1().Pods()
- nodeInformer := informerFactory.Core().V1().Nodes()
- pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
- pvInformer := informerFactory.Core().V1().PersistentVolumes()
- adc := &attachDetachController{
- kubeClient: fakeKubeClient,
- pvcLister: pvcInformer.Lister(),
- pvcsSynced: pvcInformer.Informer().HasSynced,
- pvLister: pvInformer.Lister(),
- pvsSynced: pvInformer.Informer().HasSynced,
- podLister: podInformer.Lister(),
- podsSynced: podInformer.Informer().HasSynced,
- nodeLister: nodeInformer.Lister(),
- nodesSynced: nodeInformer.Informer().HasSynced,
- cloud: nil,
- }
- // Act
- plugins := controllervolumetesting.CreateTestPlugin()
- var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
- if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
- t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
- }
- adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
- adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
- err := adc.populateActualStateOfWorld()
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
- }
- err = adc.populateDesiredStateOfWorld()
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
- }
- // Test the ActualStateOfWorld contains all the node volumes
- nodes, err := adc.nodeLister.List(labels.Everything())
- if err != nil {
- t.Fatalf("Failed to list nodes in indexer. Expected: <no error> Actual: %v", err)
- }
- for _, node := range nodes {
- nodeName := types.NodeName(node.Name)
- for _, attachedVolume := range node.Status.VolumesAttached {
- found := adc.actualStateOfWorld.IsVolumeAttachedToNode(attachedVolume.Name, nodeName)
- if !found {
- t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
- }
- }
- }
- pods, err := adc.podLister.List(labels.Everything())
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
- }
- for _, pod := range pods {
- uniqueName := fmt.Sprintf("%s/%s", controllervolumetesting.TestPluginName, pod.Spec.Volumes[0].Name)
- nodeName := types.NodeName(pod.Spec.NodeName)
- found := adc.desiredStateOfWorld.VolumeExists(v1.UniqueVolumeName(uniqueName), nodeName)
- if !found {
- t.Fatalf("Run failed with error. Volume %s, node %s not found in DesiredStateOfWorld",
- pod.Spec.Volumes[0].Name,
- pod.Spec.NodeName)
- }
- }
- }
- func Test_AttachDetachControllerRecovery(t *testing.T) {
- attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{})
- newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
- attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{})
- newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
- attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{newPod1})
- newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
- newPod2 := controllervolumetesting.NewPodWithVolume("newpod-2", "volumeName3", "mynode-1")
- attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{newPod2})
- }
- func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 []*v1.Pod) {
- fakeKubeClient := controllervolumetesting.CreateTestClient()
- informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
- //informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
- plugins := controllervolumetesting.CreateTestPlugin()
- var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
- nodeInformer := informerFactory.Core().V1().Nodes().Informer()
- podInformer := informerFactory.Core().V1().Pods().Informer()
- var podsNum, extraPodsNum, nodesNum, i int
- stopCh := make(chan struct{})
- pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{})
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
- }
- for _, pod := range pods.Items {
- podToAdd := pod
- podInformer.GetIndexer().Add(&podToAdd)
- podsNum++
- }
- nodes, err := fakeKubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
- }
- for _, node := range nodes.Items {
- nodeToAdd := node
- nodeInformer.GetIndexer().Add(&nodeToAdd)
- nodesNum++
- }
- informerFactory.Start(stopCh)
- if !controller.WaitForCacheSync("attach detach", stopCh,
- informerFactory.Core().V1().Pods().Informer().HasSynced,
- informerFactory.Core().V1().Nodes().Informer().HasSynced) {
- t.Fatalf("Error waiting for the informer caches to sync")
- }
- // Make sure the nodes and pods are in the inforer cache
- i = 0
- nodeList, err := informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
- for len(nodeList) < nodesNum {
- if err != nil {
- t.Fatalf("Error getting list of nodes %v", err)
- }
- if i > 100 {
- t.Fatalf("Time out while waiting for the node informer sync: found %d nodes, expected %d nodes", len(nodeList), nodesNum)
- }
- time.Sleep(100 * time.Millisecond)
- nodeList, err = informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
- i++
- }
- i = 0
- podList, err := informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
- for len(podList) < podsNum {
- if err != nil {
- t.Fatalf("Error getting list of nodes %v", err)
- }
- if i > 100 {
- t.Fatalf("Time out while waiting for the pod informer sync: found %d pods, expected %d pods", len(podList), podsNum)
- }
- time.Sleep(100 * time.Millisecond)
- podList, err = informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
- i++
- }
- // Create the controller
- adcObj, err := NewAttachDetachController(
- fakeKubeClient,
- informerFactory.Core().V1().Pods(),
- informerFactory.Core().V1().Nodes(),
- informerFactory.Core().V1().PersistentVolumeClaims(),
- informerFactory.Core().V1().PersistentVolumes(),
- informerFactory.Storage().V1beta1().CSINodes(),
- informerFactory.Storage().V1beta1().CSIDrivers(),
- nil, /* cloud */
- plugins,
- prober,
- false,
- 1*time.Second,
- DefaultTimerConfig)
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
- }
- adc := adcObj.(*attachDetachController)
- // Populate ASW
- err = adc.populateActualStateOfWorld()
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
- }
- for _, newPod := range extraPods1 {
- // Add a new pod between ASW and DSW ppoulators
- _, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(newPod)
- if err != nil {
- t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
- }
- extraPodsNum++
- podInformer.GetIndexer().Add(newPod)
- }
- // Populate DSW
- err = adc.populateDesiredStateOfWorld()
- if err != nil {
- t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
- }
- for _, newPod := range extraPods2 {
- // Add a new pod between DSW ppoulator and reconciler run
- _, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(newPod)
- if err != nil {
- t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
- }
- extraPodsNum++
- podInformer.GetIndexer().Add(newPod)
- }
- go adc.reconciler.Run(stopCh)
- go adc.desiredStateOfWorldPopulator.Run(stopCh)
- defer close(stopCh)
- time.Sleep(time.Second * 1) // Wait so the reconciler calls sync at least once
- testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
- for i = 0; i <= 10; i++ {
- var attachedVolumesNum int = 0
- var detachedVolumesNum int = 0
- time.Sleep(time.Second * 1) // Wait for a second
- for _, volumeList := range testPlugin.GetAttachedVolumes() {
- attachedVolumesNum += len(volumeList)
- }
- for _, volumeList := range testPlugin.GetDetachedVolumes() {
- detachedVolumesNum += len(volumeList)
- }
- // All the "extra pods" should result in volume to be attached, the pods all share one volume
- // which should be attached (+1), the volumes found only in the nodes status should be detached
- if attachedVolumesNum == 1+extraPodsNum && detachedVolumesNum == nodesNum {
- break
- }
- if i == 10 { // 10 seconds time out
- t.Fatalf("Waiting for the volumes to attach/detach timed out: attached %d (expected %d); detached %d (%d)",
- attachedVolumesNum, 1+extraPodsNum, detachedVolumesNum, nodesNum)
- }
- }
- if testPlugin.GetErrorEncountered() {
- t.Fatalf("Fatal error encountered in the testing volume plugin")
- }
- }
|