regional_pd.go 19 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package storage
  14. import (
  15. "github.com/onsi/ginkgo"
  16. "github.com/onsi/gomega"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "encoding/json"
  21. appsv1 "k8s.io/api/apps/v1"
  22. v1 "k8s.io/api/core/v1"
  23. storage "k8s.io/api/storage/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/labels"
  26. "k8s.io/apimachinery/pkg/types"
  27. "k8s.io/apimachinery/pkg/util/sets"
  28. "k8s.io/apimachinery/pkg/util/strategicpatch"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. clientset "k8s.io/client-go/kubernetes"
  31. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  32. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  33. "k8s.io/kubernetes/test/e2e/framework"
  34. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  35. "k8s.io/kubernetes/test/e2e/storage/testsuites"
  36. "k8s.io/kubernetes/test/e2e/storage/utils"
  37. imageutils "k8s.io/kubernetes/test/utils/image"
  38. )
  const (
	// pvcName names the StatefulSet's volume claim template; the pod
	// template mounts the volume under this same name.
	pvcName = "regional-pd-vol"
	// repdMinSize is the claim size requested for the regional PDs here.
	repdMinSize = "200Gi"
	// taintKeyPrefix is combined with the test namespace to form the key of
	// the NoSchedule taint that addTaint places on a zone's nodes.
	taintKeyPrefix = "zoneTaint_"

	// pvDeletionTimeout bounds the wait for a PV to disappear during cleanup.
	pvDeletionTimeout = 3 * time.Minute
	// statefulSetReadyTimeout bounds the wait for StatefulSet readiness and
	// for the failover pod to land in the other zone.
	statefulSetReadyTimeout = 3 * time.Minute
)
  46. var _ = utils.SIGDescribe("Regional PD", func() {
  47. f := framework.NewDefaultFramework("regional-pd")
  48. // filled in BeforeEach
  49. var c clientset.Interface
  50. var ns string
  51. ginkgo.BeforeEach(func() {
  52. c = f.ClientSet
  53. ns = f.Namespace.Name
  54. framework.SkipUnlessProviderIs("gce", "gke")
  55. framework.SkipUnlessMultizone(c)
  56. })
  57. ginkgo.Describe("RegionalPD", func() {
  58. ginkgo.It("should provision storage [Slow]", func() {
  59. testVolumeProvisioning(c, ns)
  60. })
  61. ginkgo.It("should provision storage with delayed binding [Slow]", func() {
  62. testRegionalDelayedBinding(c, ns, 1 /* pvcCount */)
  63. testRegionalDelayedBinding(c, ns, 3 /* pvcCount */)
  64. })
  65. ginkgo.It("should provision storage in the allowedTopologies [Slow]", func() {
  66. testRegionalAllowedTopologies(c, ns)
  67. })
  68. ginkgo.It("should provision storage in the allowedTopologies with delayed binding [Slow]", func() {
  69. testRegionalAllowedTopologiesWithDelayedBinding(c, ns, 1 /* pvcCount */)
  70. testRegionalAllowedTopologiesWithDelayedBinding(c, ns, 3 /* pvcCount */)
  71. })
  72. ginkgo.It("should failover to a different zone when all nodes in one zone become unreachable [Slow] [Disruptive]", func() {
  73. testZonalFailover(c, ns)
  74. })
  75. })
  76. })
  77. func testVolumeProvisioning(c clientset.Interface, ns string) {
  78. cloudZones := getTwoRandomZones(c)
  79. // This test checks that dynamic provisioning can provision a volume
  80. // that can be used to persist data among pods.
  81. tests := []testsuites.StorageClassTest{
  82. {
  83. Name: "HDD Regional PD on GCE/GKE",
  84. CloudProviders: []string{"gce", "gke"},
  85. Provisioner: "kubernetes.io/gce-pd",
  86. Parameters: map[string]string{
  87. "type": "pd-standard",
  88. "zones": strings.Join(cloudZones, ","),
  89. "replication-type": "regional-pd",
  90. },
  91. ClaimSize: repdMinSize,
  92. ExpectedSize: repdMinSize,
  93. PvCheck: func(claim *v1.PersistentVolumeClaim) {
  94. volume := testsuites.PVWriteReadSingleNodeCheck(c, claim, framework.NodeSelection{})
  95. gomega.Expect(volume).NotTo(gomega.BeNil())
  96. err := checkGCEPD(volume, "pd-standard")
  97. framework.ExpectNoError(err, "checkGCEPD")
  98. err = verifyZonesInPV(volume, sets.NewString(cloudZones...), true /* match */)
  99. framework.ExpectNoError(err, "verifyZonesInPV")
  100. },
  101. },
  102. {
  103. Name: "HDD Regional PD with auto zone selection on GCE/GKE",
  104. CloudProviders: []string{"gce", "gke"},
  105. Provisioner: "kubernetes.io/gce-pd",
  106. Parameters: map[string]string{
  107. "type": "pd-standard",
  108. "replication-type": "regional-pd",
  109. },
  110. ClaimSize: repdMinSize,
  111. ExpectedSize: repdMinSize,
  112. PvCheck: func(claim *v1.PersistentVolumeClaim) {
  113. volume := testsuites.PVWriteReadSingleNodeCheck(c, claim, framework.NodeSelection{})
  114. gomega.Expect(volume).NotTo(gomega.BeNil())
  115. err := checkGCEPD(volume, "pd-standard")
  116. framework.ExpectNoError(err, "checkGCEPD")
  117. zones, err := framework.GetClusterZones(c)
  118. framework.ExpectNoError(err, "GetClusterZones")
  119. err = verifyZonesInPV(volume, zones, false /* match */)
  120. framework.ExpectNoError(err, "verifyZonesInPV")
  121. },
  122. },
  123. }
  124. for _, test := range tests {
  125. test.Client = c
  126. test.Class = newStorageClass(test, ns, "" /* suffix */)
  127. test.Claim = newClaim(test, ns, "" /* suffix */)
  128. test.Claim.Spec.StorageClassName = &test.Class.Name
  129. test.TestDynamicProvisioning()
  130. }
  131. }
  132. func testZonalFailover(c clientset.Interface, ns string) {
  133. cloudZones := getTwoRandomZones(c)
  134. testSpec := testsuites.StorageClassTest{
  135. Name: "Regional PD Failover on GCE/GKE",
  136. CloudProviders: []string{"gce", "gke"},
  137. Provisioner: "kubernetes.io/gce-pd",
  138. Parameters: map[string]string{
  139. "type": "pd-standard",
  140. "zones": strings.Join(cloudZones, ","),
  141. "replication-type": "regional-pd",
  142. },
  143. ClaimSize: repdMinSize,
  144. ExpectedSize: repdMinSize,
  145. }
  146. class := newStorageClass(testSpec, ns, "" /* suffix */)
  147. claimTemplate := newClaim(testSpec, ns, "" /* suffix */)
  148. claimTemplate.Name = pvcName
  149. claimTemplate.Spec.StorageClassName = &class.Name
  150. statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
  151. ginkgo.By("creating a StorageClass " + class.Name)
  152. _, err := c.StorageV1().StorageClasses().Create(class)
  153. framework.ExpectNoError(err)
  154. defer func() {
  155. e2elog.Logf("deleting storage class %s", class.Name)
  156. framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(class.Name, nil),
  157. "Error deleting StorageClass %s", class.Name)
  158. }()
  159. ginkgo.By("creating a StatefulSet")
  160. _, err = c.CoreV1().Services(ns).Create(service)
  161. framework.ExpectNoError(err)
  162. _, err = c.AppsV1().StatefulSets(ns).Create(statefulSet)
  163. framework.ExpectNoError(err)
  164. defer func() {
  165. e2elog.Logf("deleting statefulset%q/%q", statefulSet.Namespace, statefulSet.Name)
  166. // typically this claim has already been deleted
  167. framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(statefulSet.Name, nil /* options */),
  168. "Error deleting StatefulSet %s", statefulSet.Name)
  169. e2elog.Logf("deleting claims in namespace %s", ns)
  170. pvc := getPVC(c, ns, regionalPDLabels)
  171. framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil),
  172. "Error deleting claim %s.", pvc.Name)
  173. if pvc.Spec.VolumeName != "" {
  174. err = framework.WaitForPersistentVolumeDeleted(c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
  175. if err != nil {
  176. e2elog.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
  177. }
  178. }
  179. }()
  180. err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
  181. if err != nil {
  182. pod := getPod(c, ns, regionalPDLabels)
  183. gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(),
  184. "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
  185. framework.ExpectNoError(err)
  186. }
  187. pvc := getPVC(c, ns, regionalPDLabels)
  188. ginkgo.By("getting zone information from pod")
  189. pod := getPod(c, ns, regionalPDLabels)
  190. nodeName := pod.Spec.NodeName
  191. node, err := c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  192. framework.ExpectNoError(err)
  193. podZone := node.Labels[v1.LabelZoneFailureDomain]
  194. ginkgo.By("tainting nodes in the zone the pod is scheduled in")
  195. selector := labels.SelectorFromSet(labels.Set(map[string]string{v1.LabelZoneFailureDomain: podZone}))
  196. nodesInZone, err := c.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector: selector.String()})
  197. framework.ExpectNoError(err)
  198. removeTaintFunc := addTaint(c, ns, nodesInZone.Items, podZone)
  199. defer func() {
  200. e2elog.Logf("removing previously added node taints")
  201. removeTaintFunc()
  202. }()
  203. ginkgo.By("deleting StatefulSet pod")
  204. err = c.CoreV1().Pods(ns).Delete(pod.Name, &metav1.DeleteOptions{})
  205. // Verify the pod is scheduled in the other zone.
  206. ginkgo.By("verifying the pod is scheduled in a different zone.")
  207. var otherZone string
  208. if cloudZones[0] == podZone {
  209. otherZone = cloudZones[1]
  210. } else {
  211. otherZone = cloudZones[0]
  212. }
  213. err = wait.PollImmediate(framework.Poll, statefulSetReadyTimeout, func() (bool, error) {
  214. e2elog.Logf("checking whether new pod is scheduled in zone %q", otherZone)
  215. pod = getPod(c, ns, regionalPDLabels)
  216. nodeName = pod.Spec.NodeName
  217. node, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  218. if err != nil {
  219. return false, nil
  220. }
  221. newPodZone := node.Labels[v1.LabelZoneFailureDomain]
  222. return newPodZone == otherZone, nil
  223. })
  224. framework.ExpectNoError(err, "Error waiting for pod to be scheduled in a different zone (%q): %v", otherZone, err)
  225. err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
  226. if err != nil {
  227. pod := getPod(c, ns, regionalPDLabels)
  228. gomega.Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(gomega.BeTrue(),
  229. "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
  230. framework.ExpectNoError(err)
  231. }
  232. ginkgo.By("verifying the same PVC is used by the new pod")
  233. gomega.Expect(getPVC(c, ns, regionalPDLabels).Name).To(gomega.Equal(pvc.Name),
  234. "The same PVC should be used after failover.")
  235. ginkgo.By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
  236. logs, err := framework.GetPodLogs(c, ns, pod.Name, "")
  237. framework.ExpectNoError(err,
  238. "Error getting logs from pod %s in namespace %s", pod.Name, ns)
  239. lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
  240. expectedLineCount := 2
  241. gomega.Expect(lineCount).To(gomega.Equal(expectedLineCount),
  242. "Line count of the written file should be %d.", expectedLineCount)
  243. }
  244. func addTaint(c clientset.Interface, ns string, nodes []v1.Node, podZone string) (removeTaint func()) {
  245. reversePatches := make(map[string][]byte)
  246. for _, node := range nodes {
  247. oldData, err := json.Marshal(node)
  248. framework.ExpectNoError(err)
  249. node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
  250. Key: taintKeyPrefix + ns,
  251. Value: podZone,
  252. Effect: v1.TaintEffectNoSchedule,
  253. })
  254. newData, err := json.Marshal(node)
  255. framework.ExpectNoError(err)
  256. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  257. framework.ExpectNoError(err)
  258. reversePatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
  259. framework.ExpectNoError(err)
  260. reversePatches[node.Name] = reversePatchBytes
  261. _, err = c.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
  262. framework.ExpectNoError(err)
  263. }
  264. return func() {
  265. for nodeName, reversePatch := range reversePatches {
  266. _, err := c.CoreV1().Nodes().Patch(nodeName, types.StrategicMergePatchType, reversePatch)
  267. framework.ExpectNoError(err)
  268. }
  269. }
  270. }
  271. func testRegionalDelayedBinding(c clientset.Interface, ns string, pvcCount int) {
  272. test := testsuites.StorageClassTest{
  273. Client: c,
  274. Name: "Regional PD storage class with waitForFirstConsumer test on GCE",
  275. Provisioner: "kubernetes.io/gce-pd",
  276. Parameters: map[string]string{
  277. "type": "pd-standard",
  278. "replication-type": "regional-pd",
  279. },
  280. ClaimSize: repdMinSize,
  281. DelayBinding: true,
  282. }
  283. suffix := "delayed-regional"
  284. test.Class = newStorageClass(test, ns, suffix)
  285. var claims []*v1.PersistentVolumeClaim
  286. for i := 0; i < pvcCount; i++ {
  287. claim := newClaim(test, ns, suffix)
  288. claim.Spec.StorageClassName = &test.Class.Name
  289. claims = append(claims, claim)
  290. }
  291. pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(claims, nil /* node selector */, false /* expect unschedulable */)
  292. if node == nil {
  293. framework.Failf("unexpected nil node found")
  294. }
  295. zone, ok := node.Labels[v1.LabelZoneFailureDomain]
  296. if !ok {
  297. framework.Failf("label %s not found on Node", v1.LabelZoneFailureDomain)
  298. }
  299. for _, pv := range pvs {
  300. checkZoneFromLabelAndAffinity(pv, zone, false)
  301. }
  302. }
  303. func testRegionalAllowedTopologies(c clientset.Interface, ns string) {
  304. test := testsuites.StorageClassTest{
  305. Name: "Regional PD storage class with allowedTopologies test on GCE",
  306. Provisioner: "kubernetes.io/gce-pd",
  307. Parameters: map[string]string{
  308. "type": "pd-standard",
  309. "replication-type": "regional-pd",
  310. },
  311. ClaimSize: repdMinSize,
  312. ExpectedSize: repdMinSize,
  313. }
  314. suffix := "topo-regional"
  315. test.Client = c
  316. test.Class = newStorageClass(test, ns, suffix)
  317. zones := getTwoRandomZones(c)
  318. addAllowedTopologiesToStorageClass(c, test.Class, zones)
  319. test.Claim = newClaim(test, ns, suffix)
  320. test.Claim.Spec.StorageClassName = &test.Class.Name
  321. pv := test.TestDynamicProvisioning()
  322. checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
  323. }
  324. func testRegionalAllowedTopologiesWithDelayedBinding(c clientset.Interface, ns string, pvcCount int) {
  325. test := testsuites.StorageClassTest{
  326. Client: c,
  327. Name: "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
  328. Provisioner: "kubernetes.io/gce-pd",
  329. Parameters: map[string]string{
  330. "type": "pd-standard",
  331. "replication-type": "regional-pd",
  332. },
  333. ClaimSize: repdMinSize,
  334. DelayBinding: true,
  335. }
  336. suffix := "topo-delayed-regional"
  337. test.Class = newStorageClass(test, ns, suffix)
  338. topoZones := getTwoRandomZones(c)
  339. addAllowedTopologiesToStorageClass(c, test.Class, topoZones)
  340. var claims []*v1.PersistentVolumeClaim
  341. for i := 0; i < pvcCount; i++ {
  342. claim := newClaim(test, ns, suffix)
  343. claim.Spec.StorageClassName = &test.Class.Name
  344. claims = append(claims, claim)
  345. }
  346. pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(claims, nil /* node selector */, false /* expect unschedulable */)
  347. if node == nil {
  348. framework.Failf("unexpected nil node found")
  349. }
  350. nodeZone, ok := node.Labels[v1.LabelZoneFailureDomain]
  351. if !ok {
  352. framework.Failf("label %s not found on Node", v1.LabelZoneFailureDomain)
  353. }
  354. zoneFound := false
  355. for _, zone := range topoZones {
  356. if zone == nodeZone {
  357. zoneFound = true
  358. break
  359. }
  360. }
  361. if !zoneFound {
  362. framework.Failf("zones specified in AllowedTopologies: %v does not contain zone of node where PV got provisioned: %s", topoZones, nodeZone)
  363. }
  364. for _, pv := range pvs {
  365. checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
  366. }
  367. }
  368. func getPVC(c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
  369. selector := labels.Set(pvcLabels).AsSelector()
  370. options := metav1.ListOptions{LabelSelector: selector.String()}
  371. pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(options)
  372. framework.ExpectNoError(err)
  373. gomega.Expect(len(pvcList.Items)).To(gomega.Equal(1), "There should be exactly 1 PVC matched.")
  374. return &pvcList.Items[0]
  375. }
  376. func getPod(c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
  377. selector := labels.Set(podLabels).AsSelector()
  378. options := metav1.ListOptions{LabelSelector: selector.String()}
  379. podList, err := c.CoreV1().Pods(ns).List(options)
  380. framework.ExpectNoError(err)
  381. gomega.Expect(len(podList.Items)).To(gomega.Equal(1), "There should be exactly 1 pod matched.")
  382. return &podList.Items[0]
  383. }
  384. func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storage.StorageClass, zones []string) {
  385. term := v1.TopologySelectorTerm{
  386. MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
  387. {
  388. Key: v1.LabelZoneFailureDomain,
  389. Values: zones,
  390. },
  391. },
  392. }
  393. sc.AllowedTopologies = append(sc.AllowedTopologies, term)
  394. }
  395. // Generates the spec of a StatefulSet with 1 replica that mounts a Regional PD.
  396. func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
  397. var replicas int32 = 1
  398. labels = map[string]string{"app": "regional-pd-workload"}
  399. svc = &v1.Service{
  400. ObjectMeta: metav1.ObjectMeta{
  401. Name: "regional-pd-service",
  402. Namespace: ns,
  403. Labels: labels,
  404. },
  405. Spec: v1.ServiceSpec{
  406. Ports: []v1.ServicePort{{
  407. Port: 80,
  408. Name: "web",
  409. }},
  410. ClusterIP: v1.ClusterIPNone,
  411. Selector: labels,
  412. },
  413. }
  414. sts = &appsv1.StatefulSet{
  415. ObjectMeta: metav1.ObjectMeta{
  416. Name: "regional-pd-sts",
  417. Namespace: ns,
  418. },
  419. Spec: appsv1.StatefulSetSpec{
  420. Selector: &metav1.LabelSelector{
  421. MatchLabels: labels,
  422. },
  423. ServiceName: svc.Name,
  424. Replicas: &replicas,
  425. Template: *newPodTemplate(labels),
  426. VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
  427. },
  428. }
  429. return
  430. }
  431. func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
  432. return &v1.PodTemplateSpec{
  433. ObjectMeta: metav1.ObjectMeta{
  434. Labels: labels,
  435. },
  436. Spec: v1.PodSpec{
  437. Containers: []v1.Container{
  438. // This container writes its pod name to a file in the Regional PD
  439. // and prints the entire file to stdout.
  440. {
  441. Name: "busybox",
  442. Image: imageutils.GetE2EImage(imageutils.BusyBox),
  443. Command: []string{"sh", "-c"},
  444. Args: []string{
  445. "echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
  446. "cat /mnt/data/regional-pd/pods.txt;" +
  447. "sleep 3600;",
  448. },
  449. Env: []v1.EnvVar{{
  450. Name: "POD_NAME",
  451. ValueFrom: &v1.EnvVarSource{
  452. FieldRef: &v1.ObjectFieldSelector{
  453. FieldPath: "metadata.name",
  454. },
  455. },
  456. }},
  457. Ports: []v1.ContainerPort{{
  458. ContainerPort: 80,
  459. Name: "web",
  460. }},
  461. VolumeMounts: []v1.VolumeMount{{
  462. Name: pvcName,
  463. MountPath: "/mnt/data/regional-pd",
  464. }},
  465. },
  466. },
  467. },
  468. }
  469. }
  470. func getTwoRandomZones(c clientset.Interface) []string {
  471. zones, err := framework.GetClusterZones(c)
  472. framework.ExpectNoError(err)
  473. gomega.Expect(zones.Len()).To(gomega.BeNumerically(">=", 2),
  474. "The test should only be run in multizone clusters.")
  475. zone1, _ := zones.PopAny()
  476. zone2, _ := zones.PopAny()
  477. return []string{zone1, zone2}
  478. }
  479. // If match is true, check if zones in PV exactly match zones given.
  480. // Otherwise, check whether zones in PV is superset of zones given.
  481. func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
  482. pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelZoneFailureDomain])
  483. if err != nil {
  484. return err
  485. }
  486. if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
  487. return nil
  488. }
  489. return fmt.Errorf("Zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
  490. }