12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package reconciler
- import (
- "fmt"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- k8stypes "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/kubernetes/fake"
- core "k8s.io/client-go/testing"
- "k8s.io/client-go/tools/record"
- featuregatetesting "k8s.io/component-base/featuregate/testing"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/volume"
- volumetesting "k8s.io/kubernetes/pkg/volume/testing"
- "k8s.io/kubernetes/pkg/volume/util"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- )
- const (
- // reconcilerLoopSleepDuration is the amount of time the reconciler loop
- // waits between successive executions
- reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
- // waitForAttachTimeout is the maximum amount of time a
- // operationexecutor.Mount call will wait for a volume to be attached.
- waitForAttachTimeout time.Duration = 1 * time.Second
- nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
- kubeletPodsDir string = "fake-dir"
- )
// hasAddedPods is the stub callback handed to NewReconciler by the tests in
// this file; it unconditionally reports true.
func hasAddedPods() bool {
	return true
}
- // Calls Run()
- // Verifies there are no calls to attach, detach, mount, unmount, etc.
- func Test_Run_Positive_DoNothing(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler,
- ))
- reconciler := NewReconciler(
- kubeClient,
- false, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- // Act
- runReconciler(reconciler)
- // Assert
- assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Calls Run()
- // Verifies there is are attach/mount/etc calls and no detach/unmount calls.
- func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- false, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
- PDName: "fake-device1",
- },
- },
- },
- },
- },
- }
- volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyAttachCallCount(
- 1 /* expectedAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
- 1 /* expectedMountDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifySetUpCallCount(
- 1 /* expectedSetUpCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Enables controllerAttachDetachEnabled.
- // Calls Run()
- // Verifies there is one mount call and no unmount calls.
- // Verifies there are no attach/detach calls.
- func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
- PDName: "fake-device1",
- },
- },
- },
- },
- },
- }
- volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
- 1 /* expectedMountDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifySetUpCallCount(
- 1 /* expectedSetUpCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Calls Run()
- // Verifies there is one attach/mount/etc call and no detach calls.
- // Deletes volume/pod from desired state of world.
- // Verifies detach/unmount calls are issued.
- func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- false, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
- PDName: "fake-device1",
- },
- },
- },
- },
- },
- }
- volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyAttachCallCount(
- 1 /* expectedAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
- 1 /* expectedMountDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifySetUpCallCount(
- 1 /* expectedSetUpCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- // Act
- dsw.DeletePodFromVolume(podName, generatedVolumeName)
- waitForDetach(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyTearDownCallCount(
- 1 /* expectedTearDownCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyDetachCallCount(
- 1 /* expectedDetachCallCount */, fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Enables controllerAttachDetachEnabled.
- // Calls Run()
- // Verifies one mount call is made and no unmount calls.
- // Deletes volume/pod from desired state of world.
- // Verifies one unmount call is made.
- // Verifies there are no attach/detach calls made.
- func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
- PDName: "fake-device1",
- },
- },
- },
- },
- },
- }
- volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
- 1 /* expectedMountDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifySetUpCallCount(
- 1 /* expectedSetUpCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- // Act
- dsw.DeletePodFromVolume(podName, generatedVolumeName)
- waitForDetach(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyTearDownCallCount(
- 1 /* expectedTearDownCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Calls Run()
- // Verifies there are attach/get map paths/setupDevice calls and
- // no detach/teardownDevice calls.
- func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- false, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{},
- }
- mode := v1.PersistentVolumeBlock
- gcepv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
- Spec: v1.PersistentVolumeSpec{
- Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
- PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- v1.ReadOnlyMany,
- },
- VolumeMode: &mode,
- },
- }
- volumeSpec := &volume.Spec{
- PersistentVolume: gcepv,
- }
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyAttachCallCount(
- 1 /* expectedAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
- 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Enables controllerAttachDetachEnabled.
- // Calls Run()
- // Verifies there are two get map path calls, a setupDevice call
- // and no teardownDevice call.
- // Verifies there are no attach/detach calls.
- func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{},
- }
- mode := v1.PersistentVolumeBlock
- gcepv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
- Spec: v1.PersistentVolumeSpec{
- Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
- PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- v1.ReadOnlyMany,
- },
- VolumeMode: &mode,
- },
- }
- volumeSpec := &volume.Spec{
- PersistentVolume: gcepv,
- }
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
- 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Calls Run()
- // Verifies there is one attach call, two get map path calls,
- // setupDevice call and no detach calls.
- // Deletes volume/pod from desired state of world.
- // Verifies one detach/teardownDevice calls are issued.
- func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- false, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{},
- }
- mode := v1.PersistentVolumeBlock
- gcepv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
- Spec: v1.PersistentVolumeSpec{
- Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
- PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- v1.ReadOnlyMany,
- },
- VolumeMode: &mode,
- },
- }
- volumeSpec := &volume.Spec{
- PersistentVolume: gcepv,
- }
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyAttachCallCount(
- 1 /* expectedAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
- 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- // Act
- dsw.DeletePodFromVolume(podName, generatedVolumeName)
- waitForDetach(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
- 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyDetachCallCount(
- 1 /* expectedDetachCallCount */, fakePlugin))
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Enables controllerAttachDetachEnabled.
- // Calls Run()
- // Verifies two map path calls are made and no teardownDevice/detach calls.
- // Deletes volume/pod from desired state of world.
- // Verifies one teardownDevice call is made.
- // Verifies there are no attach/detach calls made.
- func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{},
- }
- mode := v1.PersistentVolumeBlock
- gcepv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
- Spec: v1.PersistentVolumeSpec{
- Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
- PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- v1.ReadOnlyMany,
- },
- VolumeMode: &mode,
- },
- }
- volumeSpec := &volume.Spec{
- PersistentVolume: gcepv,
- }
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Act
- runReconciler(reconciler)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
- assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
- 1 /* expectedWaitForAttachCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
- 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- // Act
- dsw.DeletePodFromVolume(podName, generatedVolumeName)
- waitForDetach(t, fakePlugin, generatedVolumeName, asw)
- // Assert
- assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
- 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
- assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
- }
- func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
- testCases := map[string]struct {
- volumePlugins []volume.VolumePlugin
- expectErr bool
- expectedErrMsg string
- }{
- "volumePlugin is nil": {
- volumePlugins: []volume.VolumePlugin{},
- expectErr: true,
- expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
- },
- "blockVolumePlugin is nil": {
- volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
- expectErr: true,
- expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
- },
- }
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- for name, tc := range testCases {
- t.Run(name, func(t *testing.T) {
- volumePluginMgr := &volume.VolumePluginMgr{}
- volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- nil, /* kubeClient */
- volumePluginMgr,
- nil, /* fakeRecorder */
- false, /* checkNodeCapabilitiesBeforeMount */
- nil))
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{},
- }
- volumeMode := v1.PersistentVolumeBlock
- tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
- volumeToMount := operationexecutor.VolumeToMount{
- Pod: pod,
- VolumeSpec: tmpSpec}
- err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
- // Assert
- if assert.Error(t, err) {
- assert.Contains(t, err.Error(), tc.expectedErrMsg)
- }
- })
- }
- }
- func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
- testCases := map[string]struct {
- volumePlugins []volume.VolumePlugin
- expectErr bool
- expectedErrMsg string
- }{
- "volumePlugin is nil": {
- volumePlugins: []volume.VolumePlugin{},
- expectErr: true,
- expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
- },
- "blockVolumePlugin is nil": {
- volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
- expectErr: true,
- expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
- },
- }
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- for name, tc := range testCases {
- t.Run(name, func(t *testing.T) {
- volumePluginMgr := &volume.VolumePluginMgr{}
- volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- nil, /* kubeClient */
- volumePluginMgr,
- nil, /* fakeRecorder */
- false, /* checkNodeCapabilitiesBeforeMount */
- nil))
- volumeMode := v1.PersistentVolumeBlock
- tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
- volumeToUnmount := operationexecutor.MountedVolume{
- PluginName: "fake-file-plugin",
- VolumeSpec: tmpSpec}
- err := oex.UnmountVolume(volumeToUnmount, asw, "" /* podsDir */)
- // Assert
- if assert.Error(t, err) {
- assert.Contains(t, err.Error(), tc.expectedErrMsg)
- }
- })
- }
- }
- func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
- testCases := map[string]struct {
- volumePlugins []volume.VolumePlugin
- expectErr bool
- expectedErrMsg string
- }{
- "volumePlugin is nil": {
- volumePlugins: []volume.VolumePlugin{},
- expectErr: true,
- expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
- },
- "blockVolumePlugin is nil": {
- volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
- expectErr: true,
- expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
- },
- }
- // Enable BlockVolume feature gate
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
- for name, tc := range testCases {
- t.Run(name, func(t *testing.T) {
- volumePluginMgr := &volume.VolumePluginMgr{}
- volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- nil, /* kubeClient */
- volumePluginMgr,
- nil, /* fakeRecorder */
- false, /* checkNodeCapabilitiesBeforeMount */
- nil))
- var mounter mount.Interface
- volumeMode := v1.PersistentVolumeBlock
- tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
- deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
- err := oex.UnmountDevice(deviceToDetach, asw, mounter)
- // Assert
- if assert.Error(t, err) {
- assert.Contains(t, err.Error(), tc.expectedErrMsg)
- }
- })
- }
- }
- // Populates desiredStateOfWorld cache with one volume/pod.
- // Enables controllerAttachDetachEnabled.
- // Calls Run()
- // Wait for volume mounted.
- // Mark volume as fsResizeRequired in ASW.
- // Verifies volume's fsResizeRequired flag is cleared later.
- func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExpandInUsePersistentVolumes, true)()
- fs := v1.PersistentVolumeFilesystem
- pv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pv",
- UID: "pvuid",
- },
- Spec: v1.PersistentVolumeSpec{
- ClaimRef: &v1.ObjectReference{Name: "pvc"},
- VolumeMode: &fs,
- },
- }
- pvc := &v1.PersistentVolumeClaim{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pvc",
- UID: "pvcuid",
- },
- Spec: v1.PersistentVolumeClaimSpec{
- VolumeName: "pv",
- VolumeMode: &fs,
- },
- }
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
- ClaimName: pvc.Name,
- },
- },
- },
- },
- },
- }
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createtestClientWithPVPVC(pv, pvc)
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- volumeSpec := &volume.Spec{PersistentVolume: pv}
- podName := util.GetUniquePodName(pod)
- volumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- // Assert
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
- // Start the reconciler to fill ASW.
- stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
- go func() {
- reconciler.Run(stopChan)
- close(stoppedChan)
- }()
- waitForMount(t, fakePlugin, volumeName, asw)
- // Stop the reconciler.
- close(stopChan)
- <-stoppedChan
- // Mark volume as fsResizeRequired.
- asw.MarkFSResizeRequired(volumeName, podName)
- _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
- if !cache.IsFSResizeRequiredError(podExistErr) {
- t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
- }
- // Start the reconciler again, we hope reconciler will perform the
- // resize operation and clear the fsResizeRequired flag for volume.
- go reconciler.Run(wait.NeverStop)
- waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) {
- mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
- return mounted && err == nil, nil
- })
- if waitErr != nil {
- t.Fatal("Volume resize should succeeded")
- }
- }
- func waitForMount(
- t *testing.T,
- fakePlugin *volumetesting.FakeVolumePlugin,
- volumeName v1.UniqueVolumeName,
- asw cache.ActualStateOfWorld) {
- err := retryWithExponentialBackOff(
- time.Duration(500*time.Millisecond),
- func() (bool, error) {
- mountedVolumes := asw.GetMountedVolumes()
- for _, mountedVolume := range mountedVolumes {
- if mountedVolume.VolumeName == volumeName {
- return true, nil
- }
- }
- return false, nil
- },
- )
- if err != nil {
- t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
- }
- }
- func waitForDetach(
- t *testing.T,
- fakePlugin *volumetesting.FakeVolumePlugin,
- volumeName v1.UniqueVolumeName,
- asw cache.ActualStateOfWorld) {
- err := retryWithExponentialBackOff(
- time.Duration(500*time.Millisecond),
- func() (bool, error) {
- if asw.VolumeExists(volumeName) {
- return false, nil
- }
- return true, nil
- },
- )
- if err != nil {
- t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
- }
- }
- func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
- backoff := wait.Backoff{
- Duration: initialDuration,
- Factor: 3,
- Jitter: 0,
- Steps: 6,
- }
- return wait.ExponentialBackoff(backoff, fn)
- }
- func createTestClient() *fake.Clientset {
- fakeClient := &fake.Clientset{}
- fakeClient.AddReactor("get", "nodes",
- func(action core.Action) (bool, runtime.Object, error) {
- return true, &v1.Node{
- ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
- Status: v1.NodeStatus{
- VolumesAttached: []v1.AttachedVolume{
- {
- Name: "fake-plugin/fake-device1",
- DevicePath: "/fake/path",
- },
- }},
- }, nil
- })
- fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, fmt.Errorf("no reaction implemented for %s", action)
- })
- return fakeClient
- }
- func runReconciler(reconciler Reconciler) {
- go reconciler.Run(wait.NeverStop)
- }
- func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) *fake.Clientset {
- fakeClient := &fake.Clientset{}
- fakeClient.AddReactor("get", "nodes",
- func(action core.Action) (bool, runtime.Object, error) {
- return true, &v1.Node{
- ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
- Status: v1.NodeStatus{
- VolumesAttached: []v1.AttachedVolume{
- {
- Name: "fake-plugin/pv",
- DevicePath: "fake/path",
- },
- }},
- }, nil
- })
- fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
- return true, pvc, nil
- })
- fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
- return true, pv, nil
- })
- fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
- return true, nil, fmt.Errorf("no reaction implemented for %s", action)
- })
- return fakeClient
- }
- func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
- // Arrange
- volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
- dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
- asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
- kubeClient := createTestClient()
- fakeRecorder := &record.FakeRecorder{}
- fakeHandler := volumetesting.NewBlockVolumePathHandler()
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- fakeRecorder,
- false, /* checkNodeCapabilitiesBeforeMount */
- fakeHandler))
- reconciler := NewReconciler(
- kubeClient,
- true, /* controllerAttachDetachEnabled */
- reconcilerLoopSleepDuration,
- waitForAttachTimeout,
- nodeName,
- dsw,
- asw,
- hasAddedPods,
- oex,
- &mount.FakeMounter{},
- volumePluginMgr,
- kubeletPodsDir)
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "pod1",
- UID: "pod1uid",
- },
- Spec: v1.PodSpec{
- Volumes: []v1.Volume{
- {
- Name: "volume-name",
- VolumeSource: v1.VolumeSource{
- GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
- PDName: "fake-device1",
- },
- },
- },
- },
- },
- }
- // Some steps are executes out of order in callbacks, follow the numbers.
- // 1. Add a volume to DSW and wait until it's mounted
- volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
- podName := util.GetUniquePodName(pod)
- generatedVolumeName, err := dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- if err != nil {
- t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
- }
- // Start the reconciler to fill ASW.
- stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
- go func() {
- reconciler.Run(stopChan)
- close(stoppedChan)
- }()
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- // Stop the reconciler.
- close(stopChan)
- <-stoppedChan
- finished := make(chan interface{})
- fakePlugin.UnmountDeviceHook = func(mountPath string) error {
- // Act:
- // 3. While a volume is being unmounted, add it back to the desired state of world
- klog.Infof("UnmountDevice called")
- generatedVolumeName, err = dsw.AddPodToVolume(
- podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
- dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
- return nil
- }
- fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
- // Assert
- // 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
- if devicePath == "" {
- t.Errorf("Expected WaitForAttach called with devicePath from Node.Status")
- close(finished)
- return "", fmt.Errorf("Expected devicePath from Node.Status")
- }
- close(finished)
- return devicePath, nil
- }
- // Start the reconciler again.
- go reconciler.Run(wait.NeverStop)
- // 2. Delete the volume from DSW (and wait for callbacks)
- dsw.DeletePodFromVolume(podName, generatedVolumeName)
- <-finished
- waitForMount(t, fakePlugin, generatedVolumeName, asw)
- }
|