123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package volumescheduling
- // This file tests the VolumeScheduling feature.
- import (
- "context"
- "fmt"
- "os"
- "strconv"
- "strings"
- "testing"
- "time"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- storagev1 "k8s.io/api/storage/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/rand"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
- "k8s.io/kubernetes/pkg/volume"
- volumetest "k8s.io/kubernetes/pkg/volume/testing"
- imageutils "k8s.io/kubernetes/test/utils/image"
- )
- type testConfig struct {
- client clientset.Interface
- ns string
- stop <-chan struct{}
- teardown func()
- }
- var (
- // Delete API objects immediately
- deletePeriod = int64(0)
- deleteOption = &metav1.DeleteOptions{GracePeriodSeconds: &deletePeriod}
- modeWait = storagev1.VolumeBindingWaitForFirstConsumer
- modeImmediate = storagev1.VolumeBindingImmediate
- classWait = "wait"
- classImmediate = "immediate"
- classDynamic = "dynamic"
- classTopoMismatch = "topomismatch"
- sharedClasses = map[string]*storagev1.StorageClass{
- classImmediate: makeStorageClass(classImmediate, &modeImmediate),
- classWait: makeStorageClass(classWait, &modeWait),
- }
- )
- const (
- node1 = "node-1"
- node2 = "node-2"
- podLimit = 50
- volsPerPod = 3
- nodeAffinityLabelKey = "kubernetes.io/hostname"
- provisionerPluginName = "kubernetes.io/mock-provisioner"
- )
- type testPV struct {
- name string
- scName string
- preboundPVC string
- node string
- }
- type testPVC struct {
- name string
- scName string
- preboundPV string
- }
- func TestVolumeBinding(t *testing.T) {
- config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
- defer config.teardown()
- cases := map[string]struct {
- pod *v1.Pod
- pvs []*testPV
- pvcs []*testPVC
- // Create these, but they should not be bound in the end
- unboundPvcs []*testPVC
- unboundPvs []*testPV
- shouldFail bool
- }{
- "immediate can bind": {
- pod: makePod("pod-i-canbind", config.ns, []string{"pvc-i-canbind"}),
- pvs: []*testPV{{"pv-i-canbind", classImmediate, "", node1}},
- pvcs: []*testPVC{{"pvc-i-canbind", classImmediate, ""}},
- },
- "immediate cannot bind": {
- pod: makePod("pod-i-cannotbind", config.ns, []string{"pvc-i-cannotbind"}),
- unboundPvcs: []*testPVC{{"pvc-i-cannotbind", classImmediate, ""}},
- shouldFail: true,
- },
- "immediate pvc prebound": {
- pod: makePod("pod-i-pvc-prebound", config.ns, []string{"pvc-i-prebound"}),
- pvs: []*testPV{{"pv-i-pvc-prebound", classImmediate, "", node1}},
- pvcs: []*testPVC{{"pvc-i-prebound", classImmediate, "pv-i-pvc-prebound"}},
- },
- "immediate pv prebound": {
- pod: makePod("pod-i-pv-prebound", config.ns, []string{"pvc-i-pv-prebound"}),
- pvs: []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
- pvcs: []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
- },
- "wait can bind": {
- pod: makePod("pod-w-canbind", config.ns, []string{"pvc-w-canbind"}),
- pvs: []*testPV{{"pv-w-canbind", classWait, "", node1}},
- pvcs: []*testPVC{{"pvc-w-canbind", classWait, ""}},
- },
- "wait cannot bind": {
- pod: makePod("pod-w-cannotbind", config.ns, []string{"pvc-w-cannotbind"}),
- unboundPvcs: []*testPVC{{"pvc-w-cannotbind", classWait, ""}},
- shouldFail: true,
- },
- "wait pvc prebound": {
- pod: makePod("pod-w-pvc-prebound", config.ns, []string{"pvc-w-prebound"}),
- pvs: []*testPV{{"pv-w-pvc-prebound", classWait, "", node1}},
- pvcs: []*testPVC{{"pvc-w-prebound", classWait, "pv-w-pvc-prebound"}},
- },
- "wait pv prebound": {
- pod: makePod("pod-w-pv-prebound", config.ns, []string{"pvc-w-pv-prebound"}),
- pvs: []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
- pvcs: []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
- },
- "wait can bind two": {
- pod: makePod("pod-w-canbind-2", config.ns, []string{"pvc-w-canbind-2", "pvc-w-canbind-3"}),
- pvs: []*testPV{
- {"pv-w-canbind-2", classWait, "", node2},
- {"pv-w-canbind-3", classWait, "", node2},
- },
- pvcs: []*testPVC{
- {"pvc-w-canbind-2", classWait, ""},
- {"pvc-w-canbind-3", classWait, ""},
- },
- unboundPvs: []*testPV{
- {"pv-w-canbind-5", classWait, "", node1},
- },
- },
- "wait cannot bind two": {
- pod: makePod("pod-w-cannotbind-2", config.ns, []string{"pvc-w-cannotbind-1", "pvc-w-cannotbind-2"}),
- unboundPvcs: []*testPVC{
- {"pvc-w-cannotbind-1", classWait, ""},
- {"pvc-w-cannotbind-2", classWait, ""},
- },
- unboundPvs: []*testPV{
- {"pv-w-cannotbind-1", classWait, "", node2},
- {"pv-w-cannotbind-2", classWait, "", node1},
- },
- shouldFail: true,
- },
- "mix immediate and wait": {
- pod: makePod("pod-mix-bound", config.ns, []string{"pvc-w-canbind-4", "pvc-i-canbind-2"}),
- pvs: []*testPV{
- {"pv-w-canbind-4", classWait, "", node1},
- {"pv-i-canbind-2", classImmediate, "", node1},
- },
- pvcs: []*testPVC{
- {"pvc-w-canbind-4", classWait, ""},
- {"pvc-i-canbind-2", classImmediate, ""},
- },
- },
- }
- for name, test := range cases {
- klog.Infof("Running test %v", name)
- // Create two StorageClasses
- suffix := rand.String(4)
- classes := map[string]*storagev1.StorageClass{}
- classes[classImmediate] = makeStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate)
- classes[classWait] = makeStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait)
- for _, sc := range classes {
- if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- }
- // Create PVs
- for _, pvConfig := range test.pvs {
- pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
- if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- for _, pvConfig := range test.unboundPvs {
- pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
- if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- // Wait for PVs to become available to avoid race condition in PV controller
- // https://github.com/kubernetes/kubernetes/issues/85320
- for _, pvConfig := range test.pvs {
- if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
- t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
- }
- }
- for _, pvConfig := range test.unboundPvs {
- if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
- t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
- }
- }
- // Create PVCs
- for _, pvcConfig := range test.pvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- for _, pvcConfig := range test.unboundPvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- // Create Pod
- if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
- }
- if test.shouldFail {
- if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
- t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
- }
- } else {
- if err := waitForPodToSchedule(config.client, test.pod); err != nil {
- t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
- }
- }
- // Validate PVC/PV binding
- for _, pvc := range test.pvcs {
- validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
- }
- for _, pvc := range test.unboundPvcs {
- validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
- }
- for _, pv := range test.pvs {
- validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
- }
- for _, pv := range test.unboundPvs {
- validatePVPhase(t, config.client, pv.name, v1.VolumeAvailable)
- }
- // Force delete objects, but they still may not be immediately removed
- deleteTestObjects(config.client, config.ns, deleteOption)
- }
- }
- // TestVolumeBindingRescheduling tests scheduler will retry scheduling when needed.
- func TestVolumeBindingRescheduling(t *testing.T) {
- config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
- defer config.teardown()
- storageClassName := "local-storage"
- cases := map[string]struct {
- pod *v1.Pod
- pvcs []*testPVC
- pvs []*testPV
- trigger func(config *testConfig)
- shouldFail bool
- }{
- "reschedule on WaitForFirstConsumer dynamic storage class add": {
- pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
- pvcs: []*testPVC{
- {"pvc-reschedule-onclassadd-dynamic", "", ""},
- },
- trigger: func(config *testConfig) {
- sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait, nil)
- if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- },
- shouldFail: false,
- },
- "reschedule on WaitForFirstConsumer static storage class add": {
- pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
- pvcs: []*testPVC{
- {"pvc-reschedule-onclassadd-static", "", ""},
- },
- trigger: func(config *testConfig) {
- sc := makeStorageClass(storageClassName, &modeWait)
- if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- // Create pv for this class to mock static provisioner behavior.
- pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
- if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- },
- shouldFail: false,
- },
- "reschedule on delay binding PVC add": {
- pod: makePod("pod-reschedule-onpvcadd", config.ns, []string{"pvc-reschedule-onpvcadd"}),
- pvs: []*testPV{
- {
- name: "pv-reschedule-onpvcadd",
- scName: classWait,
- node: node1,
- },
- },
- trigger: func(config *testConfig) {
- pvc := makePVC("pvc-reschedule-onpvcadd", config.ns, &classWait, "")
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- },
- shouldFail: false,
- },
- }
- for name, test := range cases {
- klog.Infof("Running test %v", name)
- if test.pod == nil {
- t.Fatal("pod is required for this test")
- }
- // Create unbound pvc
- for _, pvcConfig := range test.pvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- // Create PVs
- for _, pvConfig := range test.pvs {
- pv := makePV(pvConfig.name, sharedClasses[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
- if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- // Create pod
- if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
- }
- // Wait for pod is unschedulable.
- klog.Infof("Waiting for pod is unschedulable")
- if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
- t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
- }
- // Trigger
- test.trigger(config)
- // Wait for pod is scheduled or unscheduable.
- if !test.shouldFail {
- klog.Infof("Waiting for pod is scheduled")
- if err := waitForPodToSchedule(config.client, test.pod); err != nil {
- t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
- }
- } else {
- klog.Infof("Waiting for pod is unschedulable")
- if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
- t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
- }
- }
- // Force delete objects, but they still may not be immediately removed
- deleteTestObjects(config.client, config.ns, deleteOption)
- }
- }
- // TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound or prebound PVCs.
- // PVs are precreated.
- func TestVolumeBindingStress(t *testing.T) {
- testVolumeBindingStress(t, 0, false, 0)
- }
- // Like TestVolumeBindingStress but with scheduler resync. In real cluster,
- // scheduler will schedule failed pod frequently due to various events, e.g.
- // service/node update events.
- // This is useful to detect possible race conditions.
- func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
- testVolumeBindingStress(t, time.Second, false, 0)
- }
- // Like TestVolumeBindingStress but with fast dynamic provisioning
- func TestVolumeBindingDynamicStressFast(t *testing.T) {
- testVolumeBindingStress(t, 0, true, 0)
- }
- // Like TestVolumeBindingStress but with slow dynamic provisioning
- func TestVolumeBindingDynamicStressSlow(t *testing.T) {
- testVolumeBindingStress(t, 0, true, 10)
- }
- func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
- config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds)
- defer config.teardown()
- // Set max volume limit to the number of PVCs the test will create
- // TODO: remove when max volume limit allows setting through storageclass
- if err := os.Setenv(nodevolumelimits.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod)); err != nil {
- t.Fatalf("failed to set max pd limit: %v", err)
- }
- defer os.Unsetenv(nodevolumelimits.KubeMaxPDVols)
- scName := &classWait
- if dynamic {
- scName = &classDynamic
- sc := makeDynamicProvisionerStorageClass(*scName, &modeWait, nil)
- if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- }
- klog.Infof("Start creating PVs and PVCs")
- // Create enough PVs and PVCs for all the pods
- podVolumesCount := podLimit * volsPerPod
- pvs := make([]*v1.PersistentVolume, podVolumesCount)
- pvcs := make([]*v1.PersistentVolumeClaim, podVolumesCount)
- workqueue.ParallelizeUntil(context.TODO(), 16, podVolumesCount, func(i int) {
- var (
- pv *v1.PersistentVolume
- pvc *v1.PersistentVolumeClaim
- pvName = fmt.Sprintf("pv-stress-%v", i)
- pvcName = fmt.Sprintf("pvc-stress-%v", i)
- )
- // Don't create pvs for dynamic provisioning test
- if !dynamic {
- if rand.Int()%2 == 0 {
- // static unbound pvs
- pv = makePV(pvName, *scName, "", "", node1)
- } else {
- // static prebound pvs
- pv = makePV(pvName, classImmediate, pvcName, config.ns, node1)
- }
- if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- pvs[i] = pv
- }
- if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName {
- pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name)
- } else {
- pvc = makePVC(pvcName, config.ns, scName, "")
- }
- if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- pvcs[i] = pvc
- })
- klog.Infof("Start creating Pods")
- pods := make([]*v1.Pod, podLimit)
- workqueue.ParallelizeUntil(context.TODO(), 16, podLimit, func(i int) {
- // Generate string of all the PVCs for the pod
- podPvcs := []string{}
- for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ {
- podPvcs = append(podPvcs, pvcs[j].Name)
- }
- pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
- if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
- }
- pods[i] = pod
- })
- klog.Infof("Start validating pod scheduled")
- // Validate Pods scheduled
- workqueue.ParallelizeUntil(context.TODO(), 16, len(pods), func(i int) {
- pod := pods[i]
- // Use increased timeout for stress test because there is a higher chance of
- // PV sync error
- if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
- t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
- }
- })
- klog.Infof("Start validating PVCs scheduled")
- // Validate PVC/PV binding
- workqueue.ParallelizeUntil(context.TODO(), 16, len(pvcs), func(i int) {
- validatePVCPhase(t, config.client, pvcs[i].Name, config.ns, v1.ClaimBound, dynamic)
- })
- // Don't validate pv for dynamic provisioning test
- if !dynamic {
- klog.Infof("Start validating PVs scheduled")
- workqueue.ParallelizeUntil(context.TODO(), 16, len(pvs), func(i int) {
- validatePVPhase(t, config.client, pvs[i].Name, v1.VolumeBound)
- })
- }
- }
- func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
- config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0)
- defer config.teardown()
- pods := []*v1.Pod{}
- pvcs := []*v1.PersistentVolumeClaim{}
- // Create PVs for the first node
- for i := 0; i < numPVsFirstNode; i++ {
- pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
- if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- // Create 1 PV per Node for the remaining nodes
- for i := 2; i <= numNodes; i++ {
- pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
- if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- // Create pods
- for i := 0; i < numPods; i++ {
- // Create one pvc per pod
- pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
- if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- pvcs = append(pvcs, pvc)
- // Create pod with pod affinity
- pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
- pod.Spec.Affinity = &v1.Affinity{}
- affinityTerms := []v1.PodAffinityTerm{
- {
- LabelSelector: &metav1.LabelSelector{
- MatchExpressions: []metav1.LabelSelectorRequirement{
- {
- Key: "app",
- Operator: metav1.LabelSelectorOpIn,
- Values: []string{"volume-binding-test"},
- },
- },
- },
- TopologyKey: nodeAffinityLabelKey,
- },
- }
- if anti {
- pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
- }
- } else {
- pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
- }
- }
- if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
- }
- pods = append(pods, pod)
- }
- // Validate Pods scheduled
- scheduledNodes := sets.NewString()
- for _, pod := range pods {
- if err := waitForPodToSchedule(config.client, pod); err != nil {
- t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
- } else {
- // Keep track of all the nodes that the Pods were scheduled on
- pod, err = config.client.CoreV1().Pods(config.ns).Get(context.TODO(), pod.Name, metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
- }
- if pod.Spec.NodeName == "" {
- t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
- }
- scheduledNodes.Insert(pod.Spec.NodeName)
- }
- }
- // Validate the affinity policy
- if anti {
- // The pods should have been spread across different nodes
- if scheduledNodes.Len() != numPods {
- t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
- }
- } else {
- // The pods should have been scheduled on 1 node
- if scheduledNodes.Len() != 1 {
- t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
- }
- }
- // Validate PVC binding
- for _, pvc := range pvcs {
- validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound, false)
- }
- }
- func TestVolumeBindingWithAntiAffinity(t *testing.T) {
- numNodes := 10
- // Create as many pods as number of nodes
- numPods := numNodes
- // Create many more PVs on node1 to increase chance of selecting node1
- numPVsFirstNode := 10 * numNodes
- testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
- }
- func TestVolumeBindingWithAffinity(t *testing.T) {
- numPods := 10
- // Create many more nodes to increase chance of selecting a PV on a different node than node1
- numNodes := 10 * numPods
- // Create numPods PVs on the first node
- numPVsFirstNode := numPods
- testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
- }
- func TestPVAffinityConflict(t *testing.T) {
- config := setupCluster(t, "volume-scheduling-", 3, 0, 0)
- defer config.teardown()
- pv := makePV("local-pv", classImmediate, "", "", node1)
- pvc := makePVC("local-pvc", config.ns, &classImmediate, "")
- // Create PV
- if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- // Create PVC
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- // Wait for PVC bound
- if err := waitForPVCBound(config.client, pvc); err != nil {
- t.Fatalf("PVC %q failed to bind: %v", pvc.Name, err)
- }
- nodeMarkers := []interface{}{
- markNodeAffinity,
- markNodeSelector,
- }
- for i := 0; i < len(nodeMarkers); i++ {
- podName := "local-pod-" + strconv.Itoa(i+1)
- pod := makePod(podName, config.ns, []string{"local-pvc"})
- nodeMarkers[i].(func(*v1.Pod, string))(pod, "node-2")
- // Create Pod
- if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
- }
- // Give time to shceduler to attempt to schedule pod
- if err := waitForPodUnschedulable(config.client, pod); err != nil {
- t.Errorf("Failed as Pod %s was not unschedulable: %v", pod.Name, err)
- }
- // Check pod conditions
- p, err := config.client.CoreV1().Pods(config.ns).Get(context.TODO(), podName, metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to access Pod %s status: %v", podName, err)
- }
- if strings.Compare(string(p.Status.Phase), "Pending") != 0 {
- t.Fatalf("Failed as Pod %s was in: %s state and not in expected: Pending state", podName, p.Status.Phase)
- }
- if strings.Compare(p.Status.Conditions[0].Reason, "Unschedulable") != 0 {
- t.Fatalf("Failed as Pod %s reason was: %s but expected: Unschedulable", podName, p.Status.Conditions[0].Reason)
- }
- if !strings.Contains(p.Status.Conditions[0].Message, "node(s) didn't match node selector") || !strings.Contains(p.Status.Conditions[0].Message, "node(s) had volume node affinity conflict") {
- t.Fatalf("Failed as Pod's %s failure message does not contain expected message: node(s) didn't match node selector, node(s) had volume node affinity conflict. Got message %q", podName, p.Status.Conditions[0].Message)
- }
- // Deleting test pod
- if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), podName, &metav1.DeleteOptions{}); err != nil {
- t.Fatalf("Failed to delete Pod %s: %v", podName, err)
- }
- }
- }
- func TestVolumeProvision(t *testing.T) {
- config := setupCluster(t, "volume-scheduling", 1, 0, 0)
- defer config.teardown()
- cases := map[string]struct {
- pod *v1.Pod
- pvs []*testPV
- boundPvcs []*testPVC
- provisionedPvcs []*testPVC
- // Create these, but they should not be bound in the end
- unboundPvcs []*testPVC
- shouldFail bool
- }{
- "wait provisioned": {
- pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
- provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
- },
- "topolgy unsatisfied": {
- pod: makePod("pod-pvc-topomismatch", config.ns, []string{"pvc-topomismatch"}),
- unboundPvcs: []*testPVC{{"pvc-topomismatch", classTopoMismatch, ""}},
- shouldFail: true,
- },
- "wait one bound, one provisioned": {
- pod: makePod("pod-pvc-canbind-or-provision", config.ns, []string{"pvc-w-canbind", "pvc-canprovision"}),
- pvs: []*testPV{{"pv-w-canbind", classWait, "", node1}},
- boundPvcs: []*testPVC{{"pvc-w-canbind", classWait, ""}},
- provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
- },
- "one immediate pv prebound, one wait provisioned": {
- pod: makePod("pod-i-pv-prebound-w-provisioned", config.ns, []string{"pvc-i-pv-prebound", "pvc-canprovision"}),
- pvs: []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
- boundPvcs: []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
- provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
- },
- "wait one pv prebound, one provisioned": {
- pod: makePod("pod-w-pv-prebound-w-provisioned", config.ns, []string{"pvc-w-pv-prebound", "pvc-canprovision"}),
- pvs: []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
- boundPvcs: []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
- provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
- },
- "immediate provisioned by controller": {
- pod: makePod("pod-i-unbound", config.ns, []string{"pvc-controller-provisioned"}),
- // A pvc of immediate binding mode is expected to be provisioned by controller,
- // we treat it as "bound" here because it is supposed to be in same state
- // with bound claims, i.e. in bound status and has no selectedNode annotation.
- boundPvcs: []*testPVC{{"pvc-controller-provisioned", classImmediate, ""}},
- },
- }
- for name, test := range cases {
- klog.Infof("Running test %v", name)
- // Create StorageClasses
- suffix := rand.String(4)
- classes := map[string]*storagev1.StorageClass{}
- classes[classImmediate] = makeDynamicProvisionerStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate, nil)
- classes[classWait] = makeDynamicProvisionerStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait, nil)
- topo := []v1.TopologySelectorTerm{
- {
- MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
- {
- Key: nodeAffinityLabelKey,
- Values: []string{node2},
- },
- },
- },
- }
- classes[classTopoMismatch] = makeDynamicProvisionerStorageClass(fmt.Sprintf("topomismatch-%v", suffix), &modeWait, topo)
- for _, sc := range classes {
- if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- }
- // Create PVs
- for _, pvConfig := range test.pvs {
- pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
- if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
- }
- }
- // Create PVCs
- for _, pvcConfig := range test.boundPvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- for _, pvcConfig := range test.unboundPvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- for _, pvcConfig := range test.provisionedPvcs {
- pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
- if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- }
- // Create Pod
- if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
- }
- if test.shouldFail {
- if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
- t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
- }
- } else {
- if err := waitForPodToSchedule(config.client, test.pod); err != nil {
- t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
- }
- }
- // Validate PVC/PV binding
- for _, pvc := range test.boundPvcs {
- validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
- }
- for _, pvc := range test.unboundPvcs {
- validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
- }
- for _, pvc := range test.provisionedPvcs {
- validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, true)
- }
- for _, pv := range test.pvs {
- validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
- }
- // Force delete objects, but they still may not be immediately removed
- deleteTestObjects(config.client, config.ns, deleteOption)
- }
- }
- // TestRescheduleProvisioning validate that PV controller will remove
- // selectedNode annotation from a claim to reschedule volume provision
- // on provision failure.
- func TestRescheduleProvisioning(t *testing.T) {
- // Set feature gates
- controllerCh := make(chan struct{})
- testCtx := initTestMaster(t, "reschedule-volume-provision", nil)
- clientset := testCtx.clientSet
- ns := testCtx.ns.Name
- defer func() {
- close(controllerCh)
- deleteTestObjects(clientset, ns, nil)
- testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), nil, metav1.ListOptions{})
- testCtx.closeFn()
- }()
- ctrl, informerFactory, err := initPVController(t, testCtx, 0)
- if err != nil {
- t.Fatalf("Failed to create PV controller: %v", err)
- }
- // Prepare node and storage class.
- testNode := makeNode(0)
- if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
- }
- scName := "fail-provision"
- sc := makeDynamicProvisionerStorageClass(scName, &modeWait, nil)
- // Expect the storage class fail to provision.
- sc.Parameters[volumetest.ExpectProvisionFailureKey] = ""
- if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- // Create a pvc with selected node annotation.
- pvcName := "pvc-fail-to-provision"
- pvc := makePVC(pvcName, ns, &scName, "")
- pvc.Annotations = map[string]string{"volume.kubernetes.io/selected-node": node1}
- pvc, err = clientset.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), pvc, metav1.CreateOptions{})
- if err != nil {
- t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
- }
- // Validate selectedNode annotation exists on created claim.
- selectedNodeAnn, exist := pvc.Annotations["volume.kubernetes.io/selected-node"]
- if !exist || selectedNodeAnn != node1 {
- t.Fatalf("Created pvc is not annotated as expected")
- }
- // Start controller.
- go ctrl.Run(controllerCh)
- informerFactory.Start(controllerCh)
- informerFactory.WaitForCacheSync(controllerCh)
- // Validate that the annotation is removed by controller for provision reschedule.
- if err := waitForProvisionAnn(clientset, pvc, false); err != nil {
- t.Errorf("Expect to reschedule provision for PVC %v/%v, but still found selected-node annotation on it", ns, pvcName)
- }
- }
- func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
- textCtx := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), resyncPeriod)
- clientset := textCtx.clientSet
- ns := textCtx.ns.Name
- ctrl, informerFactory, err := initPVController(t, textCtx, provisionDelaySeconds)
- if err != nil {
- t.Fatalf("Failed to create PV controller: %v", err)
- }
- go ctrl.Run(textCtx.ctx.Done())
- // Start informer factory after all controllers are configured and running.
- informerFactory.Start(textCtx.ctx.Done())
- informerFactory.WaitForCacheSync(textCtx.ctx.Done())
- // Create shared objects
- // Create nodes
- for i := 0; i < numberOfNodes; i++ {
- testNode := makeNode(i)
- if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
- }
- }
- // Create SCs
- for _, sc := range sharedClasses {
- if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
- t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
- }
- }
- return &testConfig{
- client: clientset,
- ns: ns,
- stop: textCtx.ctx.Done(),
- teardown: func() {
- klog.Infof("test cluster %q start to tear down", ns)
- deleteTestObjects(clientset, ns, nil)
- cleanupTest(t, textCtx)
- },
- }
- }
- func initPVController(t *testing.T, testCtx *testContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
- clientset := testCtx.clientSet
- // Informers factory for controllers
- informerFactory := informers.NewSharedInformerFactory(clientset, 0)
- // Start PV controller for volume binding.
- host := volumetest.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
- plugin := &volumetest.FakeVolumePlugin{
- PluginName: provisionerPluginName,
- Host: host,
- Config: volume.VolumeConfig{},
- LastProvisionerOptions: volume.VolumeOptions{},
- ProvisionDelaySeconds: provisionDelaySeconds,
- NewAttacherCallCount: 0,
- NewDetacherCallCount: 0,
- Mounters: nil,
- Unmounters: nil,
- Attachers: nil,
- Detachers: nil,
- }
- plugins := []volume.VolumePlugin{plugin}
- params := persistentvolume.ControllerParameters{
- KubeClient: clientset,
- // Use a frequent resync period to retry API update conflicts due to
- // https://github.com/kubernetes/kubernetes/issues/85320
- SyncPeriod: 5 * time.Second,
- VolumePlugins: plugins,
- Cloud: nil,
- ClusterName: "volume-test-cluster",
- VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
- ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
- ClassInformer: informerFactory.Storage().V1().StorageClasses(),
- PodInformer: informerFactory.Core().V1().Pods(),
- NodeInformer: informerFactory.Core().V1().Nodes(),
- EnableDynamicProvisioning: true,
- }
- ctrl, err := persistentvolume.NewController(params)
- if err != nil {
- return nil, nil, err
- }
- return ctrl, informerFactory, nil
- }
- func deleteTestObjects(client clientset.Interface, ns string, option *metav1.DeleteOptions) {
- client.CoreV1().Pods(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
- client.CoreV1().PersistentVolumeClaims(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
- client.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
- client.StorageV1().StorageClasses().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
- }
- func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
- return &storagev1.StorageClass{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- },
- Provisioner: "kubernetes.io/no-provisioner",
- VolumeBindingMode: mode,
- }
- }
- func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode, allowedTopologies []v1.TopologySelectorTerm) *storagev1.StorageClass {
- return &storagev1.StorageClass{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- },
- Provisioner: provisionerPluginName,
- VolumeBindingMode: mode,
- AllowedTopologies: allowedTopologies,
- Parameters: map[string]string{},
- }
- }
- func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
- pv := &v1.PersistentVolume{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Annotations: map[string]string{},
- },
- Spec: v1.PersistentVolumeSpec{
- Capacity: v1.ResourceList{
- v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
- },
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- },
- StorageClassName: scName,
- PersistentVolumeSource: v1.PersistentVolumeSource{
- Local: &v1.LocalVolumeSource{
- Path: "/test-path",
- },
- },
- NodeAffinity: &v1.VolumeNodeAffinity{
- Required: &v1.NodeSelector{
- NodeSelectorTerms: []v1.NodeSelectorTerm{
- {
- MatchExpressions: []v1.NodeSelectorRequirement{
- {
- Key: nodeAffinityLabelKey,
- Operator: v1.NodeSelectorOpIn,
- Values: []string{node},
- },
- },
- },
- },
- },
- },
- },
- }
- if pvcName != "" {
- pv.Spec.ClaimRef = &v1.ObjectReference{Name: pvcName, Namespace: ns}
- }
- return pv
- }
- func makePVC(name, ns string, scName *string, volumeName string) *v1.PersistentVolumeClaim {
- return &v1.PersistentVolumeClaim{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: ns,
- },
- Spec: v1.PersistentVolumeClaimSpec{
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- },
- Resources: v1.ResourceRequirements{
- Requests: v1.ResourceList{
- v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
- },
- },
- StorageClassName: scName,
- VolumeName: volumeName,
- },
- }
- }
- func makePod(name, ns string, pvcs []string) *v1.Pod {
- volumes := []v1.Volume{}
- for i, pvc := range pvcs {
- volumes = append(volumes, v1.Volume{
- Name: fmt.Sprintf("vol%v", i),
- VolumeSource: v1.VolumeSource{
- PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
- ClaimName: pvc,
- },
- },
- })
- }
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: ns,
- Labels: map[string]string{
- "app": "volume-binding-test",
- },
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "write-pod",
- Image: imageutils.GetE2EImage(imageutils.BusyBox),
- Command: []string{"/bin/sh"},
- Args: []string{"-c", "while true; do sleep 1; done"},
- },
- },
- Volumes: volumes,
- },
- }
- }
- func makeNode(index int) *v1.Node {
- return &v1.Node{
- ObjectMeta: metav1.ObjectMeta{
- Name: fmt.Sprintf("node-%d", index+1),
- Labels: map[string]string{nodeAffinityLabelKey: fmt.Sprintf("node-%d", index+1)},
- },
- Spec: v1.NodeSpec{Unschedulable: false},
- Status: v1.NodeStatus{
- Capacity: v1.ResourceList{
- v1.ResourcePods: *resource.NewQuantity(podLimit, resource.DecimalSI),
- },
- Conditions: []v1.NodeCondition{
- {
- Type: v1.NodeReady,
- Status: v1.ConditionTrue,
- Reason: fmt.Sprintf("schedulable condition"),
- LastHeartbeatTime: metav1.Time{Time: time.Now()},
- },
- },
- },
- }
- }
- func validatePVCPhase(t *testing.T, client clientset.Interface, pvcName string, ns string, phase v1.PersistentVolumeClaimPhase, isProvisioned bool) {
- claim, err := client.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), pvcName, metav1.GetOptions{})
- if err != nil {
- t.Errorf("Failed to get PVC %v/%v: %v", ns, pvcName, err)
- }
- if claim.Status.Phase != phase {
- t.Errorf("PVC %v/%v phase not %v, got %v", ns, pvcName, phase, claim.Status.Phase)
- }
- // Check whether the bound claim is provisioned/bound as expect.
- if phase == v1.ClaimBound {
- if err := validateProvisionAnn(claim, isProvisioned); err != nil {
- t.Errorf("Provisoning annotaion on PVC %v/%v not bahaviors as expected: %v", ns, pvcName, err)
- }
- }
- }
- func validateProvisionAnn(claim *v1.PersistentVolumeClaim, volIsProvisioned bool) error {
- selectedNode, provisionAnnoExist := claim.Annotations["volume.kubernetes.io/selected-node"]
- if volIsProvisioned {
- if !provisionAnnoExist {
- return fmt.Errorf("PVC %v/%v expected to be provisioned, but no selected-node annotation found", claim.Namespace, claim.Name)
- }
- if selectedNode != node1 {
- return fmt.Errorf("PVC %v/%v expected to be annotated as %v, but got %v", claim.Namespace, claim.Name, node1, selectedNode)
- }
- }
- if !volIsProvisioned && provisionAnnoExist {
- return fmt.Errorf("PVC %v/%v not expected to be provisioned, but found selected-node annotation", claim.Namespace, claim.Name)
- }
- return nil
- }
- func waitForProvisionAnn(client clientset.Interface, pvc *v1.PersistentVolumeClaim, annShouldExist bool) error {
- return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
- claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if err := validateProvisionAnn(claim, annShouldExist); err == nil {
- return true, nil
- }
- return false, nil
- })
- }
- func validatePVPhase(t *testing.T, client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) {
- pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
- if err != nil {
- t.Errorf("Failed to get PV %v: %v", pvName, err)
- }
- if pv.Status.Phase != phase {
- t.Errorf("PV %v phase not %v, got %v", pvName, phase, pv.Status.Phase)
- }
- }
- func waitForPVPhase(client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) error {
- return wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) {
- pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if pv.Status.Phase == phase {
- return true, nil
- }
- return false, nil
- })
- }
- func waitForPVCBound(client clientset.Interface, pvc *v1.PersistentVolumeClaim) error {
- return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
- claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if claim.Status.Phase == v1.ClaimBound {
- return true, nil
- }
- return false, nil
- })
- }
- func markNodeAffinity(pod *v1.Pod, node string) {
- affinity := &v1.Affinity{
- NodeAffinity: &v1.NodeAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
- NodeSelectorTerms: []v1.NodeSelectorTerm{
- {
- MatchExpressions: []v1.NodeSelectorRequirement{
- {
- Key: nodeAffinityLabelKey,
- Operator: v1.NodeSelectorOpIn,
- Values: []string{node},
- },
- },
- },
- },
- },
- },
- }
- pod.Spec.Affinity = affinity
- }
- func markNodeSelector(pod *v1.Pod, node string) {
- ns := map[string]string{
- nodeAffinityLabelKey: node,
- }
- pod.Spec.NodeSelector = ns
- }
|