volumelimits.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testsuites

import (
	"context"
	"fmt"
	"regexp"
	"strings"
	"time"

	"github.com/onsi/ginkgo"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	migrationplugins "k8s.io/csi-translation-lib/plugins" // volume plugin names are exported nicely there
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	"k8s.io/kubernetes/test/e2e/storage/testpatterns"
)
type volumeLimitsTestSuite struct {
	tsInfo TestSuiteInfo
}
const (
	// The test uses generic pod startup / PV deletion timeouts. Because it
	// creates many more volumes at once, these timeouts are multiplied by
	// this number. Using the real number of volumes (e.g. 128 on GCE) as the
	// multiplier would be far too much.
	testSlowMultiplier = 10

	// How long to wait until CSINode gets the attach limit from the installed CSI driver.
	csiNodeInfoTimeout = 1 * time.Minute
)
var _ TestSuite = &volumeLimitsTestSuite{}

// InitVolumeLimitsTestSuite returns volumeLimitsTestSuite that implements TestSuite interface
func InitVolumeLimitsTestSuite() TestSuite {
	return &volumeLimitsTestSuite{
		tsInfo: TestSuiteInfo{
			Name: "volumeLimits",
			TestPatterns: []testpatterns.TestPattern{
				testpatterns.FsVolModeDynamicPV,
			},
		},
	}
}
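
// GetTestSuiteInfo returns the TestSuiteInfo of this test suite.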
func (t *volumeLimitsTestSuite) GetTestSuiteInfo() TestSuiteInfo {
	return t.tsInfo
}
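
// SkipRedundantSuite is a no-op here: the suite defines only one test
// pattern, so there is nothing redundant to skip.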
func (t *volumeLimitsTestSuite) SkipRedundantSuite(driver TestDriver, pattern testpatterns.TestPattern) {
}
func (t *volumeLimitsTestSuite) DefineTests(driver TestDriver, pattern testpatterns.TestPattern) {
	type local struct {
		config      *PerTestConfig
		testCleanup func()

		cs clientset.Interface
		ns *v1.Namespace

		// VolumeResource contains pv, pvc, sc, etc. of the first pod created
		resource *VolumeResource

		// All created PVCs, incl. the one in resource
		pvcs []*v1.PersistentVolumeClaim

		// All created PVs, incl. the one in resource
		pvNames sets.String

		runningPod       *v1.Pod
		unschedulablePod *v1.Pod
	}
	var (
		l local
	)

	// No preconditions to test. Normally they would be in a BeforeEach here.
	f := framework.NewDefaultFramework("volumelimits")
	// This checks that CSIMaxVolumeLimitChecker works as expected.
	// A randomly chosen node should be able to handle as many CSI volumes as
	// it claims to handle in CSINode.Spec.Drivers[x].Allocatable.
	// The test uses a single pod with a lot of volumes to work around any
	// max pod limit on a node.
	// One extra pod with a CSI volume should then get Pending with a condition
	// that says it's unschedulable because of the volume limit.
	// BEWARE: the test may create a lot of volumes and it's really slow.
	ginkgo.It("should support volume limits [Serial]", func() {
		driverInfo := driver.GetDriverInfo()
		if !driverInfo.Capabilities[CapVolumeLimits] {
			ginkgo.Skip(fmt.Sprintf("driver %s does not support volume limits", driverInfo.Name))
		}
		var dDriver DynamicPVTestDriver
		// The comma-ok form avoids a panic when the driver does not implement
		// DynamicPVTestDriver; the nil check then fails the test instead.
		if dDriver, _ = driver.(DynamicPVTestDriver); dDriver == nil {
			framework.Failf("Test driver does not provide dynamically created volumes")
		}
		l.ns = f.Namespace
		l.cs = f.ClientSet
		l.config, l.testCleanup = driver.PrepareTest(f)
		defer l.testCleanup()

		ginkgo.By("Picking a node")
		// Some CSI drivers are deployed to a single node (e.g. csi-hostpath),
		// so we use that node instead of picking a random one.
		nodeName := l.config.ClientNodeSelection.Name
		if nodeName == "" {
			node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
			framework.ExpectNoError(err)
			nodeName = node.Name
		}
		framework.Logf("Selected node %s", nodeName)

		ginkgo.By("Checking node limits")
		limit, err := getNodeLimits(l.cs, l.config, nodeName, driverInfo)
		framework.ExpectNoError(err)
		framework.Logf("Node %s can handle %d volumes of driver %s", nodeName, limit, driverInfo.Name)

		// Create a storage class and generate a PVC. Do not instantiate the PVC yet, keep it for the last pod.
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
		claimSize, err := getSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)
		l.resource = CreateVolumeResource(driver, l.config, pattern, testVolumeSizeRange)
		defer func() {
			err := l.resource.CleanupResource()
			framework.ExpectNoError(err, "while cleaning up resource")
		}()
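		// Deferred calls run in LIFO order, so this cleanup of pods, PVCs and
		// their PVs happens before the VolumeResource above is released.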
		defer func() {
			cleanupTest(l.cs, l.ns.Name, l.runningPod.Name, l.unschedulablePod.Name, l.pvcs, l.pvNames)
		}()

		// Create <limit> PVCs for one gigantic pod.
		ginkgo.By(fmt.Sprintf("Creating %d PVC(s)", limit))
		for i := 0; i < limit; i++ {
			pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
				ClaimSize:        claimSize,
				StorageClassName: &l.resource.Sc.Name,
			}, l.ns.Name)
			pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(context.TODO(), pvc, metav1.CreateOptions{})
			framework.ExpectNoError(err)
			l.pvcs = append(l.pvcs, pvc)
		}
		ginkgo.By("Creating pod to use all PVC(s)")
		pod := e2epod.MakeSecPod(l.ns.Name, l.pvcs, nil, false, "", false, false, e2epv.SELinuxLabel, nil)
		// Use affinity to schedule everything on the right node
		selection := e2epod.NodeSelection{}
		e2epod.SetAffinity(&selection, nodeName)
		pod.Spec.Affinity = selection.Affinity
		l.runningPod, err = l.cs.CoreV1().Pods(l.ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Waiting for all PVCs to get Bound")
		l.pvNames, err = waitForAllPVCsBound(l.cs, testSlowMultiplier*e2epv.PVBindingTimeout, l.pvcs)
		framework.ExpectNoError(err)

		ginkgo.By("Waiting for the pod Running")
		err = e2epod.WaitTimeoutForPodRunningInNamespace(l.cs, l.runningPod.Name, l.ns.Name, testSlowMultiplier*framework.PodStartTimeout)
		framework.ExpectNoError(err)
		ginkgo.By("Creating an extra pod with one volume to exceed the limit")
		pod = e2epod.MakeSecPod(l.ns.Name, []*v1.PersistentVolumeClaim{l.resource.Pvc}, nil, false, "", false, false, e2epv.SELinuxLabel, nil)
		// Use affinity to schedule everything on the right node
		e2epod.SetAffinity(&selection, nodeName)
		pod.Spec.Affinity = selection.Affinity
		l.unschedulablePod, err = l.cs.CoreV1().Pods(l.ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
		framework.ExpectNoError(err, "Failed to create an extra pod with one volume to exceed the limit")

		ginkgo.By("Waiting for the pod to get unschedulable with the right message")
		err = e2epod.WaitForPodCondition(l.cs, l.ns.Name, l.unschedulablePod.Name, "Unschedulable", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
			if pod.Status.Phase == v1.PodPending {
				// The scheduler reports the volume limit failure with a message
				// like "node(s) exceed max volume count".
				reg, err := regexp.Compile(`max.+volume.+count`)
				if err != nil {
					return false, err
				}
				for _, cond := range pod.Status.Conditions {
					matched := reg.MatchString(cond.Message)
					if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == "Unschedulable" && matched {
						return true, nil
					}
				}
			}
			if pod.Status.Phase != v1.PodPending {
				return true, fmt.Errorf("expected pod to be in phase Pending, but got phase: %v", pod.Status.Phase)
			}
			return false, nil
		})
		framework.ExpectNoError(err)
	})
}
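
// cleanupTest deletes the given pods and PVCs and then waits until all PVs
// that were bound to those PVCs are gone. Deletion failures are collected and
// returned as a single error so that cleanup proceeds as far as possible.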
func cleanupTest(cs clientset.Interface, ns string, runningPodName, unschedulablePodName string, pvcs []*v1.PersistentVolumeClaim, pvNames sets.String) error {
	var cleanupErrors []string
	if runningPodName != "" {
		err := cs.CoreV1().Pods(ns).Delete(context.TODO(), runningPodName, nil)
		if err != nil {
			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete pod %s: %s", runningPodName, err))
		}
	}
	if unschedulablePodName != "" {
		err := cs.CoreV1().Pods(ns).Delete(context.TODO(), unschedulablePodName, nil)
		if err != nil {
			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete pod %s: %s", unschedulablePodName, err))
		}
	}
	for _, pvc := range pvcs {
		err := cs.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvc.Name, nil)
		if err != nil {
			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete PVC %s: %s", pvc.Name, err))
		}
	}
	// Wait for the PVs to be deleted. Because of PVC protection, this implicitly
	// also waits for the pods and PVCs to be deleted.
	// We watch the PVs to make sure that the test does not leave orphan PVs when
	// a CSI driver is destroyed just after the test ends.
	err := wait.Poll(5*time.Second, testSlowMultiplier*e2epv.PVDeletingTimeout, func() (bool, error) {
		existing := 0
		for _, pvName := range pvNames.UnsortedList() {
			_, err := cs.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
			if err == nil {
				existing++
			} else {
				if apierrors.IsNotFound(err) {
					pvNames.Delete(pvName)
				} else {
					framework.Logf("Failed to get PV %s: %s", pvName, err)
				}
			}
		}
		if existing > 0 {
			framework.Logf("Waiting for %d PVs to be deleted", existing)
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		cleanupErrors = append(cleanupErrors, fmt.Sprintf("timed out waiting for PVs to be deleted: %s", err))
	}
	if len(cleanupErrors) != 0 {
		return fmt.Errorf("test cleanup failed: %s", strings.Join(cleanupErrors, "; "))
	}
	return nil
}
// waitForAllPVCsBound waits until the given PVCs are all bound. It then returns the names of the bound PVs as a set.
func waitForAllPVCsBound(cs clientset.Interface, timeout time.Duration, pvcs []*v1.PersistentVolumeClaim) (sets.String, error) {
	pvNames := sets.NewString()
	err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
		unbound := 0
		for _, pvc := range pvcs {
			pvc, err := cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			if pvc.Status.Phase != v1.ClaimBound {
				unbound++
			} else {
				pvNames.Insert(pvc.Spec.VolumeName)
			}
		}
		if unbound > 0 {
			framework.Logf("%d/%d of PVCs are Bound", pvNames.Len(), len(pvcs))
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return nil, fmt.Errorf("error waiting for all PVCs to be bound: %v", err)
	}
	return pvNames, nil
}
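
// getNodeLimits returns how many volumes of the given driver the node claims
// to support, taken either from the in-tree allocatable resource on the Node
// or from the CSINode object, depending on the driver type.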
func getNodeLimits(cs clientset.Interface, config *PerTestConfig, nodeName string, driverInfo *DriverInfo) (int, error) {
	if len(driverInfo.InTreePluginName) == 0 {
		return getCSINodeLimits(cs, config, nodeName, driverInfo)
	}
	return getInTreeNodeLimits(cs, nodeName, driverInfo)
}
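
// getInTreeNodeLimits reads the volume limit of an in-tree volume plugin from
// the node's status.allocatable, using the plugin-specific resource key.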
func getInTreeNodeLimits(cs clientset.Interface, nodeName string, driverInfo *DriverInfo) (int, error) {
	node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return 0, err
	}

	var allocatableKey string
	switch driverInfo.InTreePluginName {
	case migrationplugins.AWSEBSInTreePluginName:
		allocatableKey = volumeutil.EBSVolumeLimitKey
	case migrationplugins.GCEPDInTreePluginName:
		allocatableKey = volumeutil.GCEVolumeLimitKey
	case migrationplugins.CinderInTreePluginName:
		allocatableKey = volumeutil.CinderVolumeLimitKey
	case migrationplugins.AzureDiskInTreePluginName:
		allocatableKey = volumeutil.AzureVolumeLimitKey
	default:
		return 0, fmt.Errorf("Unknown in-tree volume plugin name: %s", driverInfo.InTreePluginName)
	}

	limit, ok := node.Status.Allocatable[v1.ResourceName(allocatableKey)]
	if !ok {
		return 0, fmt.Errorf("Node %s does not contain status.allocatable[%s] for volume plugin %s", nodeName, allocatableKey, driverInfo.InTreePluginName)
	}
	return int(limit.Value()), nil
}
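
// getCSINodeLimits polls the CSINode object of the given node until the CSI
// driver reports Allocatable.Count, and returns that count.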
func getCSINodeLimits(cs clientset.Interface, config *PerTestConfig, nodeName string, driverInfo *DriverInfo) (int, error) {
	// Retry with a timeout; the driver might have just been installed and kubelet takes a while to publish everything.
	var limit int
	err := wait.PollImmediate(2*time.Second, csiNodeInfoTimeout, func() (bool, error) {
		csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			framework.Logf("%s", err)
			return false, nil
		}
		var csiDriver *storagev1.CSINodeDriver
		for _, c := range csiNode.Spec.Drivers {
			if c.Name == driverInfo.Name || c.Name == config.GetUniqueDriverName() {
				csiDriver = &c
				break
			}
		}
		if csiDriver == nil {
			framework.Logf("CSINodeInfo does not have driver %s yet", driverInfo.Name)
			return false, nil
		}
		if csiDriver.Allocatable == nil {
			return false, fmt.Errorf("CSINodeInfo does not have Allocatable for driver %s", driverInfo.Name)
		}
		if csiDriver.Allocatable.Count == nil {
			return false, fmt.Errorf("CSINodeInfo does not have Allocatable.Count for driver %s", driverInfo.Name)
		}
		limit = int(*csiDriver.Allocatable.Count)
		return true, nil
	})
	if err != nil {
		return 0, fmt.Errorf("could not get CSINode limit for driver %s: %v", driverInfo.Name, err)
	}
	return limit, nil
}