/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/test/e2e/framework"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	"k8s.io/kubernetes/test/e2e/storage/testsuites"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

const (
	pvDeletionTimeout       = 3 * time.Minute
	statefulSetReadyTimeout = 3 * time.Minute
	taintKeyPrefix          = "zoneTaint_"
	repdMinSize             = "200Gi"
	pvcName                 = "regional-pd-vol"
)

var _ = utils.SIGDescribe("Regional PD", func() {
	f := framework.NewDefaultFramework("regional-pd")

	// filled in BeforeEach
	var c clientset.Interface
	var ns string

	ginkgo.BeforeEach(func() {
		c = f.ClientSet
		ns = f.Namespace.Name
		e2eskipper.SkipUnlessProviderIs("gce", "gke")
		e2eskipper.SkipUnlessMultizone(c)
	})

	ginkgo.Describe("RegionalPD", func() {
		ginkgo.It("should provision storage [Slow]", func() {
			testVolumeProvisioning(c, ns)
		})

		ginkgo.It("should provision storage with delayed binding [Slow]", func() {
			testRegionalDelayedBinding(c, ns, 1 /* pvcCount */)
			testRegionalDelayedBinding(c, ns, 3 /* pvcCount */)
		})

		ginkgo.It("should provision storage in the allowedTopologies [Slow]", func() {
			testRegionalAllowedTopologies(c, ns)
		})

		ginkgo.It("should provision storage in the allowedTopologies with delayed binding [Slow]", func() {
			testRegionalAllowedTopologiesWithDelayedBinding(c, ns, 1 /* pvcCount */)
			testRegionalAllowedTopologiesWithDelayedBinding(c, ns, 3 /* pvcCount */)
		})

		ginkgo.It("should failover to a different zone when all nodes in one zone become unreachable [Slow] [Disruptive]", func() {
			testZonalFailover(c, ns)
		})
	})
})

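// testVolumeProvisioning dynamically provisions Regional PD volumes, both with
// explicitly specified zones and with automatic zone selection, and verifies
// that the resulting PVs carry the expected disk type and zone labels.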
func testVolumeProvisioning(c clientset.Interface, ns string) {
	cloudZones := getTwoRandomZones(c)

	// This test checks that dynamic provisioning can provision a volume
	// that can be used to persist data among pods.
	tests := []testsuites.StorageClassTest{
		{
			Name:           "HDD Regional PD on GCE/GKE",
			CloudProviders: []string{"gce", "gke"},
			Provisioner:    "kubernetes.io/gce-pd",
			Parameters: map[string]string{
				"type":             "pd-standard",
				"zones":            strings.Join(cloudZones, ","),
				"replication-type": "regional-pd",
			},
			ClaimSize:    repdMinSize,
			ExpectedSize: repdMinSize,
			PvCheck: func(claim *v1.PersistentVolumeClaim) {
				volume := testsuites.PVWriteReadSingleNodeCheck(c, claim, e2epod.NodeSelection{})
				gomega.Expect(volume).NotTo(gomega.BeNil())

				err := checkGCEPD(volume, "pd-standard")
				framework.ExpectNoError(err, "checkGCEPD")
				err = verifyZonesInPV(volume, sets.NewString(cloudZones...), true /* match */)
				framework.ExpectNoError(err, "verifyZonesInPV")
			},
		},
		{
			Name:           "HDD Regional PD with auto zone selection on GCE/GKE",
			CloudProviders: []string{"gce", "gke"},
			Provisioner:    "kubernetes.io/gce-pd",
			Parameters: map[string]string{
				"type":             "pd-standard",
				"replication-type": "regional-pd",
			},
			ClaimSize:    repdMinSize,
			ExpectedSize: repdMinSize,
			PvCheck: func(claim *v1.PersistentVolumeClaim) {
				volume := testsuites.PVWriteReadSingleNodeCheck(c, claim, e2epod.NodeSelection{})
				gomega.Expect(volume).NotTo(gomega.BeNil())

				err := checkGCEPD(volume, "pd-standard")
				framework.ExpectNoError(err, "checkGCEPD")
				zones, err := framework.GetClusterZones(c)
				framework.ExpectNoError(err, "GetClusterZones")
				err = verifyZonesInPV(volume, zones, false /* match */)
				framework.ExpectNoError(err, "verifyZonesInPV")
			},
		},
	}

	for _, test := range tests {
		test.Client = c
		test.Class = newStorageClass(test, ns, "" /* suffix */)
		test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
			ClaimSize:        test.ClaimSize,
			StorageClassName: &(test.Class.Name),
			VolumeMode:       &test.VolumeMode,
		}, ns)
		test.TestDynamicProvisioning()
	}
}

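// testZonalFailover runs a single-replica StatefulSet backed by a Regional PD,
// taints all nodes in the zone the pod landed in, deletes the pod, and then
// verifies that the replacement pod is scheduled in the other zone and
// reattaches the same PVC.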
func testZonalFailover(c clientset.Interface, ns string) {
	cloudZones := getTwoRandomZones(c)
	testSpec := testsuites.StorageClassTest{
		Name:           "Regional PD Failover on GCE/GKE",
		CloudProviders: []string{"gce", "gke"},
		Provisioner:    "kubernetes.io/gce-pd",
		Parameters: map[string]string{
			"type":             "pd-standard",
			"zones":            strings.Join(cloudZones, ","),
			"replication-type": "regional-pd",
		},
		ClaimSize:    repdMinSize,
		ExpectedSize: repdMinSize,
	}
	class := newStorageClass(testSpec, ns, "" /* suffix */)
	claimTemplate := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
		NamePrefix:       pvcName,
		ClaimSize:        testSpec.ClaimSize,
		StorageClassName: &(class.Name),
		VolumeMode:       &testSpec.VolumeMode,
	}, ns)
	statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)

	ginkgo.By("creating a StorageClass " + class.Name)
	_, err := c.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
	framework.ExpectNoError(err)
	defer func() {
		framework.Logf("deleting storage class %s", class.Name)
		framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(context.TODO(), class.Name, nil),
			"Error deleting StorageClass %s", class.Name)
	}()

	ginkgo.By("creating a StatefulSet")
	_, err = c.CoreV1().Services(ns).Create(context.TODO(), service, metav1.CreateOptions{})
	framework.ExpectNoError(err)
	_, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), statefulSet, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	defer func() {
		framework.Logf("deleting statefulset %q/%q", statefulSet.Namespace, statefulSet.Name)
		// typically this claim has already been deleted
		framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(context.TODO(), statefulSet.Name, nil),
			"Error deleting StatefulSet %s", statefulSet.Name)

		framework.Logf("deleting claims in namespace %s", ns)
		pvc := getPVC(c, ns, regionalPDLabels)
		framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, nil),
			"Error deleting claim %s.", pvc.Name)
		if pvc.Spec.VolumeName != "" {
			err = framework.WaitForPersistentVolumeDeleted(c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
			if err != nil {
				framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
			}
		}
	}()

	err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
	if err != nil {
		pod := getPod(c, ns, regionalPDLabels)
		framework.ExpectEqual(podutil.IsPodReadyConditionTrue(pod.Status), true, "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
		framework.ExpectNoError(err)
	}

	pvc := getPVC(c, ns, regionalPDLabels)

	ginkgo.By("getting zone information from pod")
	pod := getPod(c, ns, regionalPDLabels)
	nodeName := pod.Spec.NodeName
	node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	podZone := node.Labels[v1.LabelZoneFailureDomain]

	ginkgo.By("tainting nodes in the zone the pod is scheduled in")
	selector := labels.SelectorFromSet(labels.Set(map[string]string{v1.LabelZoneFailureDomain: podZone}))
	nodesInZone, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
	framework.ExpectNoError(err)
	removeTaintFunc := addTaint(c, ns, nodesInZone.Items, podZone)

	defer func() {
		framework.Logf("removing previously added node taints")
		removeTaintFunc()
	}()

	ginkgo.By("deleting StatefulSet pod")
	err = c.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, &metav1.DeleteOptions{})
	framework.ExpectNoError(err)

	// Verify the pod is scheduled in the other zone.
	ginkgo.By("verifying the pod is scheduled in a different zone.")
	var otherZone string
	if cloudZones[0] == podZone {
		otherZone = cloudZones[1]
	} else {
		otherZone = cloudZones[0]
	}
	waitErr := wait.PollImmediate(framework.Poll, statefulSetReadyTimeout, func() (bool, error) {
		framework.Logf("Checking whether new pod is scheduled in zone %q", otherZone)
		pod := getPod(c, ns, regionalPDLabels)
		node, err := c.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
		if err != nil {
			return false, nil
		}
		newPodZone := node.Labels[v1.LabelZoneFailureDomain]
		return newPodZone == otherZone, nil
	})
	framework.ExpectNoError(waitErr, "Error waiting for pod to be scheduled in a different zone (%q)", otherZone)

	err = waitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
	if err != nil {
		pod := getPod(c, ns, regionalPDLabels)
		framework.ExpectEqual(podutil.IsPodReadyConditionTrue(pod.Status), true, "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
		framework.ExpectNoError(err)
	}

	ginkgo.By("verifying the same PVC is used by the new pod")
	framework.ExpectEqual(getPVC(c, ns, regionalPDLabels).Name, pvc.Name, "The same PVC should be used after failover.")

	ginkgo.By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
	logs, err := e2epod.GetPodLogs(c, ns, pod.Name, "")
	framework.ExpectNoError(err,
		"Error getting logs from pod %s in namespace %s", pod.Name, ns)
	lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
	expectedLineCount := 2
	framework.ExpectEqual(lineCount, expectedLineCount, "Line count of the written file should be %d.", expectedLineCount)
}

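// addTaint applies a NoSchedule taint (keyed by the test namespace) to each of
// the given nodes, recording a strategic-merge reverse patch per node so that
// the returned closure can restore the original taints afterwards.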
func addTaint(c clientset.Interface, ns string, nodes []v1.Node, podZone string) (removeTaint func()) {
	reversePatches := make(map[string][]byte)
	for _, node := range nodes {
		oldData, err := json.Marshal(node)
		framework.ExpectNoError(err)

		node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
			Key:    taintKeyPrefix + ns,
			Value:  podZone,
			Effect: v1.TaintEffectNoSchedule,
		})

		newData, err := json.Marshal(node)
		framework.ExpectNoError(err)

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		framework.ExpectNoError(err)

		reversePatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
		framework.ExpectNoError(err)
		reversePatches[node.Name] = reversePatchBytes

		_, err = c.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
		framework.ExpectNoError(err)
	}

	return func() {
		for nodeName, reversePatch := range reversePatches {
			_, err := c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, reversePatch, metav1.PatchOptions{})
			framework.ExpectNoError(err)
		}
	}
}

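// testRegionalDelayedBinding provisions Regional PD volumes through a
// WaitForFirstConsumer storage class and checks that each provisioned PV
// includes the zone of the node the consuming pod was scheduled to.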
func testRegionalDelayedBinding(c clientset.Interface, ns string, pvcCount int) {
	test := testsuites.StorageClassTest{
		Client:      c,
		Name:        "Regional PD storage class with waitForFirstConsumer test on GCE",
		Provisioner: "kubernetes.io/gce-pd",
		Parameters: map[string]string{
			"type":             "pd-standard",
			"replication-type": "regional-pd",
		},
		ClaimSize:    repdMinSize,
		DelayBinding: true,
	}

	suffix := "delayed-regional"
	test.Class = newStorageClass(test, ns, suffix)
	var claims []*v1.PersistentVolumeClaim
	for i := 0; i < pvcCount; i++ {
		claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
			ClaimSize:        test.ClaimSize,
			StorageClassName: &(test.Class.Name),
			VolumeMode:       &test.VolumeMode,
		}, ns)
		claims = append(claims, claim)
	}
	pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(claims, nil /* node selector */, false /* expect unschedulable */)
	if node == nil {
		framework.Failf("unexpected nil node found")
	}
	zone, ok := node.Labels[v1.LabelZoneFailureDomain]
	if !ok {
		framework.Failf("label %s not found on Node", v1.LabelZoneFailureDomain)
	}
	for _, pv := range pvs {
		checkZoneFromLabelAndAffinity(pv, zone, false)
	}
}

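// testRegionalAllowedTopologies restricts a Regional PD storage class to two
// zones via allowedTopologies and verifies that the provisioned PV's zone
// label and node affinity match exactly those zones.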
func testRegionalAllowedTopologies(c clientset.Interface, ns string) {
	test := testsuites.StorageClassTest{
		Name:        "Regional PD storage class with allowedTopologies test on GCE",
		Provisioner: "kubernetes.io/gce-pd",
		Parameters: map[string]string{
			"type":             "pd-standard",
			"replication-type": "regional-pd",
		},
		ClaimSize:    repdMinSize,
		ExpectedSize: repdMinSize,
	}

	suffix := "topo-regional"
	test.Client = c
	test.Class = newStorageClass(test, ns, suffix)
	zones := getTwoRandomZones(c)
	addAllowedTopologiesToStorageClass(c, test.Class, zones)
	test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
		NamePrefix:       pvcName,
		ClaimSize:        test.ClaimSize,
		StorageClassName: &(test.Class.Name),
		VolumeMode:       &test.VolumeMode,
	}, ns)

	pv := test.TestDynamicProvisioning()
	checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
}

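// testRegionalAllowedTopologiesWithDelayedBinding combines allowedTopologies
// with WaitForFirstConsumer binding and verifies both that the consuming pod
// lands in one of the allowed zones and that each PV spans exactly those zones.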
func testRegionalAllowedTopologiesWithDelayedBinding(c clientset.Interface, ns string, pvcCount int) {
	test := testsuites.StorageClassTest{
		Client:      c,
		Name:        "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
		Provisioner: "kubernetes.io/gce-pd",
		Parameters: map[string]string{
			"type":             "pd-standard",
			"replication-type": "regional-pd",
		},
		ClaimSize:    repdMinSize,
		DelayBinding: true,
	}

	suffix := "topo-delayed-regional"
	test.Class = newStorageClass(test, ns, suffix)
	topoZones := getTwoRandomZones(c)
	addAllowedTopologiesToStorageClass(c, test.Class, topoZones)

	var claims []*v1.PersistentVolumeClaim
	for i := 0; i < pvcCount; i++ {
		claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
			ClaimSize:        test.ClaimSize,
			StorageClassName: &(test.Class.Name),
			VolumeMode:       &test.VolumeMode,
		}, ns)
		claims = append(claims, claim)
	}
	pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(claims, nil /* node selector */, false /* expect unschedulable */)
	if node == nil {
		framework.Failf("unexpected nil node found")
	}
	nodeZone, ok := node.Labels[v1.LabelZoneFailureDomain]
	if !ok {
		framework.Failf("label %s not found on Node", v1.LabelZoneFailureDomain)
	}
	zoneFound := false
	for _, zone := range topoZones {
		if zone == nodeZone {
			zoneFound = true
			break
		}
	}
	if !zoneFound {
		framework.Failf("zones specified in AllowedTopologies: %v do not contain the zone of the node where the PV got provisioned: %s", topoZones, nodeZone)
	}
	for _, pv := range pvs {
		checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
	}
}

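// getPVC returns the single PersistentVolumeClaim in the namespace matching
// the given labels, failing the test if there is not exactly one.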
func getPVC(c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
	selector := labels.Set(pvcLabels).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), options)
	framework.ExpectNoError(err)
	framework.ExpectEqual(len(pvcList.Items), 1, "There should be exactly 1 PVC matched.")

	return &pvcList.Items[0]
}

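// getPod returns the single Pod in the namespace matching the given labels,
// failing the test if there is not exactly one.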
func getPod(c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
	selector := labels.Set(podLabels).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	podList, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
	framework.ExpectNoError(err)
	framework.ExpectEqual(len(podList.Items), 1, "There should be exactly 1 pod matched.")

	return &podList.Items[0]
}

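// addAllowedTopologiesToStorageClass appends a topology selector term that
// restricts the storage class to the given failure-domain zones.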
func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storagev1.StorageClass, zones []string) {
	term := v1.TopologySelectorTerm{
		MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
			{
				Key:    v1.LabelZoneFailureDomain,
				Values: zones,
			},
		},
	}
	sc.AllowedTopologies = append(sc.AllowedTopologies, term)
}

// Generates the spec of a StatefulSet with 1 replica that mounts a Regional PD.
func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
	var replicas int32 = 1
	labels = map[string]string{"app": "regional-pd-workload"}

	svc = &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "regional-pd-service",
			Namespace: ns,
			Labels:    labels,
		},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{
				Port: 80,
				Name: "web",
			}},
			ClusterIP: v1.ClusterIPNone,
			Selector:  labels,
		},
	}

	sts = &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "regional-pd-sts",
			Namespace: ns,
		},
		Spec: appsv1.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			ServiceName:          svc.Name,
			Replicas:             &replicas,
			Template:             *newPodTemplate(labels),
			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
		},
	}
	return
}

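// newPodTemplate returns the pod template for the StatefulSet: a busybox
// container that appends its pod name to a file on the Regional PD and dumps
// the file's contents to stdout.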
func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
	return &v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: labels,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				// This container writes its pod name to a file in the Regional PD
				// and prints the entire file to stdout.
				{
					Name:    "busybox",
					Image:   imageutils.GetE2EImage(imageutils.BusyBox),
					Command: []string{"sh", "-c"},
					Args: []string{
						"echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
							"cat /mnt/data/regional-pd/pods.txt;" +
							"sleep 3600;",
					},
					Env: []v1.EnvVar{{
						Name: "POD_NAME",
						ValueFrom: &v1.EnvVarSource{
							FieldRef: &v1.ObjectFieldSelector{
								FieldPath: "metadata.name",
							},
						},
					}},
					Ports: []v1.ContainerPort{{
						ContainerPort: 80,
						Name:          "web",
					}},
					VolumeMounts: []v1.VolumeMount{{
						Name:      pvcName,
						MountPath: "/mnt/data/regional-pd",
					}},
				},
			},
		},
	}
}

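// getTwoRandomZones picks two distinct zones from the cluster's zone set,
// failing the test if the cluster spans fewer than two zones.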
func getTwoRandomZones(c clientset.Interface) []string {
	zones, err := framework.GetClusterZones(c)
	framework.ExpectNoError(err)
	gomega.Expect(zones.Len()).To(gomega.BeNumerically(">=", 2),
		"The test should only be run in multizone clusters.")

	zone1, _ := zones.PopAny()
	zone2, _ := zones.PopAny()
	return []string{zone1, zone2}
}

// If match is true, check that the zones in the PV exactly match the zones given.
// Otherwise, check that the given zones are a superset of the zones in the PV.
func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
	pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelZoneFailureDomain])
	if err != nil {
		return err
	}
	if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
		return nil
	}
	return fmt.Errorf("zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
}

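// checkZoneFromLabelAndAffinity is the single-zone convenience wrapper around
// checkZonesFromLabelAndAffinity.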
func checkZoneFromLabelAndAffinity(pv *v1.PersistentVolume, zone string, matchZone bool) {
	checkZonesFromLabelAndAffinity(pv, sets.NewString(zone), matchZone)
}

// checkZonesFromLabelAndAffinity checks that the LabelZoneFailureDomain label of the PV
// and the terms with key LabelZoneFailureDomain in the PV's node affinity contain
// the given zones.
// matchZones indicates whether the zones must match exactly rather than as a superset.
func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String, matchZones bool) {
	ginkgo.By("checking PV's zone label and node affinity terms match expected zone")
	if pv == nil {
		framework.Failf("nil pv passed")
	}
	pvLabel, ok := pv.Labels[v1.LabelZoneFailureDomain]
	if !ok {
		framework.Failf("label %s not found on PV", v1.LabelZoneFailureDomain)
	}

	zonesFromLabel, err := volumehelpers.LabelZonesToSet(pvLabel)
	if err != nil {
		framework.Failf("unable to parse zone labels %s: %v", pvLabel, err)
	}
	if matchZones && !zonesFromLabel.Equal(zones) {
		framework.Failf("value[s] of %s label for PV: %v does not match expected zone[s]: %v", v1.LabelZoneFailureDomain, zonesFromLabel, zones)
	}
	if !matchZones && !zonesFromLabel.IsSuperset(zones) {
		framework.Failf("value[s] of %s label for PV: %v does not contain expected zone[s]: %v", v1.LabelZoneFailureDomain, zonesFromLabel, zones)
	}
	if pv.Spec.NodeAffinity == nil {
		framework.Failf("node affinity not found in PV spec %v", pv.Spec)
	}
	if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
		framework.Failf("node selector terms not found in PV spec %v", pv.Spec)
	}

	for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
		keyFound := false
		for _, r := range term.MatchExpressions {
			if r.Key != v1.LabelZoneFailureDomain {
				continue
			}
			keyFound = true
			zonesFromNodeAffinity := sets.NewString(r.Values...)
			if matchZones && !zonesFromNodeAffinity.Equal(zones) {
				framework.Failf("zones from NodeAffinity of PV: %v does not equal expected zone[s]: %v", zonesFromNodeAffinity, zones)
			}
			if !matchZones && !zonesFromNodeAffinity.IsSuperset(zones) {
				framework.Failf("zones from NodeAffinity of PV: %v does not contain expected zone[s]: %v", zonesFromNodeAffinity, zones)
			}
			break
		}
		if !keyFound {
			framework.Failf("label %s not found in term %v", v1.LabelZoneFailureDomain, term)
		}
	}
}

// waitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first.
func waitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, poll, timeout time.Duration) error {
	framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
		sts, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), statefulSetName, metav1.GetOptions{})
		if err != nil {
			framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, poll, err)
			continue
		}
		if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
			framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
			return nil
		}
		framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
	}
	return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
}