persistent_volumes-local.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

type localTestConfig struct {
	ns           string
	nodes        []v1.Node
	node0        *v1.Node
	client       clientset.Interface
	scName       string
	ssTester     *framework.StatefulSetTester
	discoveryDir string
	hostExec     utils.HostExec
	ltrMgr       utils.LocalTestResourceManager
}

type localVolumeType string

const (
	// DirectoryLocalVolumeType is the default local volume type, aka a directory
	DirectoryLocalVolumeType localVolumeType = "dir"
	// DirectoryLinkLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a directory
	DirectoryLinkLocalVolumeType localVolumeType = "dir-link"
	// DirectoryBindMountedLocalVolumeType is like DirectoryLocalVolumeType
	// but bind mounted
	DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted"
	// DirectoryLinkBindMountedLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a self bind-mounted directory.
	// Note that bind mounting at a symbolic link actually mounts at the
	// directory it links to.
	DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted"
	// TmpfsLocalVolumeType creates a tmpfs and mounts it
	TmpfsLocalVolumeType localVolumeType = "tmpfs"
	// GCELocalSSDVolumeType tests based on local ssd at /mnt/disks/by-uuid/
	GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs"
	// BlockLocalVolumeType creates a local file, formats it, and maps it as a block device.
	BlockLocalVolumeType localVolumeType = "block"
	// BlockFsWithFormatLocalVolumeType creates a local file serving as the backing for a block device,
	// formats it, and mounts it to use as an FS-mode local volume.
	BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat"
	// BlockFsWithoutFormatLocalVolumeType creates a local file serving as the backing for a block device,
	// does not format it manually, and mounts it to use as an FS-mode local volume.
	BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat"
)

// map to local test resource type
var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{
	GCELocalSSDVolumeType:                   utils.LocalVolumeGCELocalSSD,
	TmpfsLocalVolumeType:                    utils.LocalVolumeTmpfs,
	DirectoryLocalVolumeType:                utils.LocalVolumeDirectory,
	DirectoryLinkLocalVolumeType:            utils.LocalVolumeDirectoryLink,
	DirectoryBindMountedLocalVolumeType:     utils.LocalVolumeDirectoryBindMounted,
	DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted,
	BlockLocalVolumeType:                    utils.LocalVolumeBlock, // block device in Block mode
	BlockFsWithFormatLocalVolumeType:        utils.LocalVolumeBlockFS,
	BlockFsWithoutFormatLocalVolumeType:     utils.LocalVolumeBlock, // block device in Filesystem mode (default in this test suite)
}

type localTestVolume struct {
	// Local test resource
	ltr *utils.LocalTestResource
	// PVC for this volume
	pvc *v1.PersistentVolumeClaim
	// PV for this volume
	pv *v1.PersistentVolume
	// Type of local volume
	localVolumeType localVolumeType
}

const (
	// TODO: This may not be available/writable on all images.
	hostBase = "/tmp"
	// Path to the first volume in the test containers
	// created via createLocalPod or makeLocalPod
	// leveraging pv_util.MakePod
	volumeDir = "/mnt/volume1"
	// testFile created in setupLocalVolume
	testFile = "test-file"
	// testFileContent written into testFile
	testFileContent = "test-file-content"
	testSCPrefix    = "local-volume-test-storageclass"

	// A sample request size
	testRequestSize = "10Mi"

	// Max number of nodes to use for testing
	maxNodes = 5
)

var (
	// storage class volume binding modes
	waitMode      = storagev1.VolumeBindingWaitForFirstConsumer
	immediateMode = storagev1.VolumeBindingImmediate

	// Common selinux labels
	selinuxLabel = &v1.SELinuxOptions{
		Level: "s0:c0,c1"}
)

var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
	f := framework.NewDefaultFramework("persistent-local-volumes-test")

	var (
		config *localTestConfig
		scName string
	)

	ginkgo.BeforeEach(func() {
		// Get all the schedulable nodes
		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		gomega.Expect(len(nodes.Items)).NotTo(gomega.BeZero(), "No available nodes for scheduling")

		// Cap max number of nodes
		maxLen := len(nodes.Items)
		if maxLen > maxNodes {
			maxLen = maxNodes
		}

		scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
		// Choose the first node
		node0 := &nodes.Items[0]

		ssTester := framework.NewStatefulSetTester(f.ClientSet)
		hostExec := utils.NewHostExec(f)
		ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
		config = &localTestConfig{
			ns:           f.Namespace.Name,
			client:       f.ClientSet,
			nodes:        nodes.Items[:maxLen],
			node0:        node0,
			scName:       scName,
			ssTester:     ssTester,
			discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
			hostExec:     hostExec,
			ltrMgr:       ltrMgr,
		}
	})

	for tempTestVolType := range setupLocalVolumeMap {
		// New variable required for ginkgo test closures
		testVolType := tempTestVolType
		serialStr := ""
		if testVolType == GCELocalSSDVolumeType {
			serialStr = " [Serial]"
		}
		ctxString := fmt.Sprintf("[Volume type: %s]%v", testVolType, serialStr)
		testMode := immediateMode

		ginkgo.Context(ctxString, func() {
			var testVol *localTestVolume

			ginkgo.BeforeEach(func() {
				if testVolType == GCELocalSSDVolumeType {
					SkipUnlessLocalSSDExists(config, "scsi", "fs", config.node0)
				}
				setupStorageClass(config, &testMode)
				testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.node0, 1, testMode)
				testVol = testVols[0]
			})

			ginkgo.AfterEach(func() {
				cleanupLocalVolumes(config, []*localTestVolume{testVol})
				cleanupStorageClass(config)
			})

			ginkgo.Context("One pod requesting one prebound PVC", func() {
				var (
					pod1    *v1.Pod
					pod1Err error
				)

				ginkgo.BeforeEach(func() {
					ginkgo.By("Creating pod1")
					pod1, pod1Err = createLocalPod(config, testVol, nil)
					framework.ExpectNoError(pod1Err)
					verifyLocalPod(config, testVol, pod1, config.node0.Name)

					writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

					ginkgo.By("Writing in pod1")
					podRWCmdExec(pod1, writeCmd)
				})

				ginkgo.AfterEach(func() {
					ginkgo.By("Deleting pod1")
					framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
				})

				ginkgo.It("should be able to mount volume and read from pod1", func() {
					ginkgo.By("Reading in pod1")
					// testFileContent was written in BeforeEach
					testReadFileContent(volumeDir, testFile, testFileContent, pod1, testVolType)
				})

				ginkgo.It("should be able to mount volume and write from pod1", func() {
					// testFileContent was written in BeforeEach
					testReadFileContent(volumeDir, testFile, testFileContent, pod1, testVolType)

					ginkgo.By("Writing in pod1")
					writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVolType)
					podRWCmdExec(pod1, writeCmd)
				})
			})

			ginkgo.Context("Two pods mounting a local volume at the same time", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteTest(config, testVol)
				})
			})

			ginkgo.Context("Two pods mounting a local volume one after the other", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteSerialTest(config, testVol)
				})
			})

			ginkgo.Context("Set fsGroup for local volume", func() {
				ginkgo.BeforeEach(func() {
					if testVolType == BlockLocalVolumeType {
						framework.Skipf("We don't set fsGroup on block device, skipped.")
					}
				})

				ginkgo.It("should set fsGroup for one pod [Slow]", func() {
					ginkgo.By("Checking fsGroup is set")
					pod := createPodWithFsGroupTest(config, testVol, 1234, 1234)
					ginkgo.By("Deleting pod")
					framework.DeletePodOrFail(config.client, config.ns, pod.Name)
				})

				ginkgo.It("should set same fsGroup for two pods simultaneously [Slow]", func() {
					fsGroup := int64(1234)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Deleting first pod")
					framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
					ginkgo.By("Deleting second pod")
					framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})

				ginkgo.It("should set different fsGroup for second pod if first pod is deleted", func() {
					framework.Skipf("Disabled temporarily, reopen after #73168 is fixed")
					fsGroup1, fsGroup2 := int64(1234), int64(4321)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1)
					ginkgo.By("Deleting first pod")
					err := framework.DeletePodWithWait(f, config.client, pod1)
					framework.ExpectNoError(err, "while deleting first pod")
					ginkgo.By("Create second pod and check fsGroup is the new one")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup2)
					ginkgo.By("Deleting second pod")
					framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})
			})
		})
	}

	ginkgo.Context("Local volume that cannot be mounted [Slow]", func() {
		// TODO:
		// - check for these errors in unit tests instead
		ginkgo.It("should fail due to non-existent path", func() {
			testVol := &localTestVolume{
				ltr: &utils.LocalTestResource{
					Node: config.node0,
					Path: "/non-existent/location/nowhere",
				},
				localVolumeType: DirectoryLocalVolumeType,
			}
			ginkgo.By("Creating local PVC and PV")
			createLocalPVCsPVs(config, []*localTestVolume{testVol}, immediateMode)
			pod, err := createLocalPod(config, testVol, nil)
			framework.ExpectError(err)
			err = framework.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, framework.PodStartShortTimeout)
			framework.ExpectError(err)
			cleanupLocalPVCsPVs(config, []*localTestVolume{testVol})
		})

		ginkgo.It("should fail due to wrong node", func() {
			if len(config.nodes) < 2 {
				framework.Skipf("Runs only when number of nodes >= 2")
			}

			testVols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, config.node0, 1, immediateMode)
			testVol := testVols[0]

			pod := makeLocalPodWithNodeName(config, testVol, config.nodes[1].Name)
			pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
			framework.ExpectNoError(err)

			err = framework.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, framework.PodStartShortTimeout)
			framework.ExpectError(err)

			cleanupLocalVolumes(config, []*localTestVolume{testVol})
		})
	})

	ginkgo.Context("Pod with node different from PV's NodeAffinity", func() {
		var (
			testVol    *localTestVolume
			volumeType localVolumeType
		)

		ginkgo.BeforeEach(func() {
			if len(config.nodes) < 2 {
				framework.Skipf("Runs only when number of nodes >= 2")
			}

			volumeType = DirectoryLocalVolumeType
			setupStorageClass(config, &immediateMode)
			testVols := setupLocalVolumesPVCsPVs(config, volumeType, config.node0, 1, immediateMode)
			testVol = testVols[0]
		})

		ginkgo.AfterEach(func() {
			cleanupLocalVolumes(config, []*localTestVolume{testVol})
			cleanupStorageClass(config)
		})

		ginkgo.It("should fail scheduling due to different NodeAffinity", func() {
			testPodWithNodeConflict(config, volumeType, config.nodes[1].Name, makeLocalPodWithNodeAffinity, immediateMode)
		})

		ginkgo.It("should fail scheduling due to different NodeSelector", func() {
			testPodWithNodeConflict(config, volumeType, config.nodes[1].Name, makeLocalPodWithNodeSelector, immediateMode)
		})
	})

	ginkgo.Context("StatefulSet with pod affinity [Slow]", func() {
		var testVols map[string][]*localTestVolume
		const (
			ssReplicas  = 3
			volsPerNode = 6
		)

		ginkgo.BeforeEach(func() {
			setupStorageClass(config, &waitMode)

			testVols = map[string][]*localTestVolume{}
			for i, node := range config.nodes {
				// The PVCs created here won't be used
				ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
				vols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode)
				testVols[node.Name] = vols
			}
		})

		ginkgo.AfterEach(func() {
			for _, vols := range testVols {
				cleanupLocalVolumes(config, vols)
			}
			cleanupStorageClass(config)
		})

		ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func() {
			if len(config.nodes) < ssReplicas {
				framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
			}
			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
			validateStatefulSet(config, ss, true)
		})

		ginkgo.It("should use volumes on one node when pod has affinity", func() {
			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
			validateStatefulSet(config, ss, false)
		})

		ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
			if len(config.nodes) < ssReplicas {
				framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
			}
			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, 1, true, true)
			validateStatefulSet(config, ss, true)
		})

		ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, 1, false, true)
			validateStatefulSet(config, ss, false)
		})
	})

	ginkgo.Context("Stress with local volumes [Serial]", func() {
		var (
			allLocalVolumes = make(map[string][]*localTestVolume)
			volType         = TmpfsLocalVolumeType
			stopCh          = make(chan struct{})
			wg              sync.WaitGroup
		)

		const (
			volsPerNode = 10 // Make this non-divisible by volsPerPod to increase the chance of partial binding failures
			volsPerPod  = 3
			podsFactor  = 4
		)

		ginkgo.BeforeEach(func() {
			setupStorageClass(config, &waitMode)

			for i, node := range config.nodes {
				ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
				allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &config.nodes[i], volsPerNode)
			}
			ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
			var err error
			for _, localVolumes := range allLocalVolumes {
				for _, localVolume := range localVolumes {
					pvConfig := makeLocalPVConfig(config, localVolume)
					localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
					framework.ExpectNoError(err)
				}
			}
			ginkgo.By("Start a goroutine to recycle unbound PVs")
			wg.Add(1)
			go func() {
				defer wg.Done()
				w, err := config.client.CoreV1().PersistentVolumes().Watch(metav1.ListOptions{})
				framework.ExpectNoError(err)
				if w == nil {
					return
				}
				defer w.Stop()
				for {
					select {
					case event := <-w.ResultChan():
						if event.Type != watch.Modified {
							continue
						}
						pv, ok := event.Object.(*v1.PersistentVolume)
						if !ok {
							continue
						}
						if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
							continue
						}
						pv, err = config.client.CoreV1().PersistentVolumes().Get(pv.Name, metav1.GetOptions{})
						if apierrors.IsNotFound(err) {
							continue
						}
						// Delete and create a new PV for same local volume storage
						ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
						for _, localVolumes := range allLocalVolumes {
							for _, localVolume := range localVolumes {
								if localVolume.pv.Name != pv.Name {
									continue
								}
								err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
								framework.ExpectNoError(err)
								pvConfig := makeLocalPVConfig(config, localVolume)
								localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
								framework.ExpectNoError(err)
							}
						}
					case <-stopCh:
						return
					}
				}
			}()
		})

		ginkgo.AfterEach(func() {
			ginkgo.By("Stop and wait for recycle goroutine to finish")
			close(stopCh)
			wg.Wait()
			ginkgo.By("Clean all PVs")
			for nodeName, localVolumes := range allLocalVolumes {
				ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
				cleanupLocalVolumes(config, localVolumes)
			}
			cleanupStorageClass(config)
		})

		ginkgo.It("should be able to process many pods and reuse local volumes", func() {
			var (
				podsLock sync.Mutex
				// Have one extra pod pending
				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
				totalPods         = numConcurrentPods * podsFactor
				numCreated        = 0
				numFinished       = 0
				pods              = map[string]*v1.Pod{}
			)

			// Create pods gradually instead of all at once because scheduler has
			// exponential backoff
			ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
			stop := make(chan struct{})
			go wait.Until(func() {
				podsLock.Lock()
				defer podsLock.Unlock()

				if numCreated >= totalPods {
					// Created all the pods for the test
					return
				}

				if len(pods) > numConcurrentPods/2 {
					// Too many outstanding pods
					return
				}

				for i := 0; i < numConcurrentPods; i++ {
					pvcs := []*v1.PersistentVolumeClaim{}
					for j := 0; j < volsPerPod; j++ {
						pvc := framework.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
						pvc, err := framework.CreatePVC(config.client, config.ns, pvc)
						framework.ExpectNoError(err)
						pvcs = append(pvcs, pvc)
					}

					pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel, nil)
					pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
					framework.ExpectNoError(err)
					pods[pod.Name] = pod
					numCreated++
				}
			}, 2*time.Second, stop)

			defer func() {
				close(stop)
				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range pods {
					if err := deletePodAndPVCs(config, pod); err != nil {
						e2elog.Logf("Deleting pod %v failed: %v", pod.Name, err)
					}
				}
			}()

			ginkgo.By("Waiting for all pods to complete successfully")
			err := wait.PollImmediate(time.Second, 5*time.Minute, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
				if err != nil {
					return false, err
				}

				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range podsList.Items {
					switch pod.Status.Phase {
					case v1.PodSucceeded:
						// Delete pod and its PVCs
						if err := deletePodAndPVCs(config, &pod); err != nil {
							return false, err
						}
						delete(pods, pod.Name)
						numFinished++
						e2elog.Logf("%v/%v pods finished", numFinished, totalPods)
					case v1.PodFailed:
					case v1.PodUnknown:
						return false, fmt.Errorf("pod %v is in %v phase", pod.Name, pod.Status.Phase)
					}
				}

				return numFinished == totalPods, nil
			})
			framework.ExpectNoError(err)
		})
	})

	ginkgo.Context("Pods sharing a single local PV [Serial]", func() {
		var (
			pv *v1.PersistentVolume
		)

		ginkgo.BeforeEach(func() {
			localVolume := &localTestVolume{
				ltr: &utils.LocalTestResource{
					Node: config.node0,
					Path: "/tmp",
				},
				localVolumeType: DirectoryLocalVolumeType,
			}
			pvConfig := makeLocalPVConfig(config, localVolume)
			var err error
			pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
			framework.ExpectNoError(err)
		})

		ginkgo.AfterEach(func() {
			if pv == nil {
				return
			}
			ginkgo.By(fmt.Sprintf("Clean PV %s", pv.Name))
			err := config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
			framework.ExpectNoError(err)
		})

		ginkgo.It("all pods should be running", func() {
			var (
				pvc   *v1.PersistentVolumeClaim
				pods  = map[string]*v1.Pod{}
				count = 50
				err   error
			)
			pvc = framework.MakePersistentVolumeClaim(makeLocalPVCConfig(config, DirectoryLocalVolumeType), config.ns)
			ginkgo.By(fmt.Sprintf("Create a PVC %s", pvc.Name))
			pvc, err = framework.CreatePVC(config.client, config.ns, pvc)
			framework.ExpectNoError(err)
			ginkgo.By(fmt.Sprintf("Create %d pods to use this PVC", count))
			for i := 0; i < count; i++ {
				pod := framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{pvc}, false, "", false, false, selinuxLabel, nil)
				pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
				framework.ExpectNoError(err)
				pods[pod.Name] = pod
			}
			ginkgo.By("Waiting for all pods to be running")
			err = wait.PollImmediate(time.Second, 5*time.Minute, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
				if err != nil {
					return false, err
				}
				runningPods := 0
				for _, pod := range podsList.Items {
					switch pod.Status.Phase {
					case v1.PodRunning:
						runningPods++
					}
				}
				return runningPods == count, nil
			})
			framework.ExpectNoError(err)
		})
	})
})
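
// deletePodAndPVCs deletes the given pod and then deletes every PVC referenced
// by the pod's volumes.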
func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
	e2elog.Logf("Deleting pod %v", pod.Name)
	if err := config.client.CoreV1().Pods(config.ns).Delete(pod.Name, nil); err != nil {
		return err
	}

	// Delete PVCs
	for _, vol := range pod.Spec.Volumes {
		pvcSource := vol.VolumeSource.PersistentVolumeClaim
		if pvcSource != nil {
			if err := framework.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
				return err
			}
		}
	}
	return nil
}

type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
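
// testPodWithNodeConflict creates a pod whose placement conflicts with the
// volume's node and verifies that the pod stays unschedulable.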
func testPodWithNodeConflict(config *localTestConfig, testVolType localVolumeType, nodeName string, makeLocalPodFunc makeLocalPodWith, bindingMode storagev1.VolumeBindingMode) {
	ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVolType))
	testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.node0, 1, bindingMode)
	testVol := testVols[0]

	pod := makeLocalPodFunc(config, testVol, nodeName)
	pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
	framework.ExpectNoError(err)

	err = framework.WaitForPodNameUnschedulableInNamespace(config.client, pod.Name, pod.Namespace)
	framework.ExpectNoError(err)
}

// The tests below are run against multiple mount point types

// Test two pods at the same time, write from pod1, and read from pod2
func twoPodsReadWriteTest(config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1 to write to the PV")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.node0.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Creating pod2 to read from the PV")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.node0.Name)

	// testFileContent was written after creating pod1
	testReadFileContent(volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVol.localVolumeType)

	ginkgo.By("Writing in pod2")
	podRWCmdExec(pod2, writeCmd)

	ginkgo.By("Reading in pod1")
	testReadFileContent(volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
	ginkgo.By("Deleting pod2")
	framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
}

// Test two pods one after the other, write from pod1, and read from pod2
func twoPodsReadWriteSerialTest(config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.node0.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	framework.DeletePodOrFail(config.client, config.ns, pod1.Name)

	ginkgo.By("Creating pod2")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.node0.Name)

	ginkgo.By("Reading in pod2")
	testReadFileContent(volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	ginkgo.By("Deleting pod2")
	framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
}

// Test creating a pod with fsGroup, and check that the volume's group matches the expected fsGroup.
func createPodWithFsGroupTest(config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
	pod, err := createLocalPod(config, testVol, &fsGroup)
	framework.ExpectNoError(err)
	_, err = framework.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
	framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
	return pod
}
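
// setupStorageClass creates the test StorageClass, backed by the
// no-provisioner provisioner, with the given volume binding mode.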
func setupStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) {
	sc := &storagev1.StorageClass{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.scName,
		},
		Provisioner:       "kubernetes.io/no-provisioner",
		VolumeBindingMode: mode,
	}

	_, err := config.client.StorageV1().StorageClasses().Create(sc)
	framework.ExpectNoError(err)
}
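
// cleanupStorageClass deletes the test StorageClass and fails the test on error.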
func cleanupStorageClass(config *localTestConfig) {
	framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(config.scName, nil))
}

// podNodeName returns the name of the node the given pod is scheduled on.
func podNodeName(config *localTestConfig, pod *v1.Pod) (string, error) {
	runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
	return runtimePod.Spec.NodeName, runtimePodErr
}

// setupLocalVolumes sets up directories to use for local PV
func setupLocalVolumes(config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume {
	vols := []*localTestVolume{}
	for i := 0; i < count; i++ {
		ltrType, ok := setupLocalVolumeMap[localVolumeType]
		gomega.Expect(ok).To(gomega.BeTrue())
		ltr := config.ltrMgr.Create(node, ltrType, nil)
		vols = append(vols, &localTestVolume{
			ltr:             ltr,
			localVolumeType: localVolumeType,
		})
	}
	return vols
}
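
// cleanupLocalPVCsPVs deletes the PVs and PVCs of the given volumes, failing the test on error.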
func cleanupLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume) {
	for _, volume := range volumes {
		ginkgo.By("Cleaning up PVC and PV")
		errs := framework.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc)
		if len(errs) > 0 {
			framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
		}
	}
}

// Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory
func cleanupLocalVolumes(config *localTestConfig, volumes []*localTestVolume) {
	cleanupLocalPVCsPVs(config, volumes)

	for _, volume := range volumes {
		config.ltrMgr.Remove(volume.ltr)
	}
}
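
// verifyLocalVolume waits until the volume's PV and PVC are bound to each other.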
func verifyLocalVolume(config *localTestConfig, volume *localTestVolume) {
	framework.ExpectNoError(framework.WaitOnPVandPVC(config.client, config.ns, volume.pv, volume.pvc))
}
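
// verifyLocalPod checks that the pod is running on the expected node.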
func verifyLocalPod(config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
	podNodeName, err := podNodeName(config, pod)
	framework.ExpectNoError(err)
	e2elog.Logf("pod %q created on Node %q", pod.Name, podNodeName)
	gomega.Expect(podNodeName).To(gomega.Equal(expectedNodeName))
}
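
// makeLocalPVCConfig returns a PVC config for the test StorageClass, using
// Block volume mode for block-type volumes.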
func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) framework.PersistentVolumeClaimConfig {
	pvcConfig := framework.PersistentVolumeClaimConfig{
		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		StorageClassName: &config.scName,
	}
	if volumeType == BlockLocalVolumeType {
		pvcVolumeMode := v1.PersistentVolumeBlock
		pvcConfig.VolumeMode = &pvcVolumeMode
	}
	return pvcConfig
}
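
// makeLocalPVConfig returns a PV config that points at the volume's local path and
// pins the PV to the volume's node via node affinity on the hostname label.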
func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) framework.PersistentVolumeConfig {
	// TODO: hostname may not be the best option
	nodeKey := "kubernetes.io/hostname"
	if volume.ltr.Node.Labels == nil {
		framework.Failf("Node does not have labels")
	}
	nodeValue, found := volume.ltr.Node.Labels[nodeKey]
	if !found {
		framework.Failf("Node does not have required label %q", nodeKey)
	}

	pvConfig := framework.PersistentVolumeConfig{
		PVSource: v1.PersistentVolumeSource{
			Local: &v1.LocalVolumeSource{
				Path: volume.ltr.Path,
			},
		},
		NamePrefix:       "local-pv",
		StorageClassName: config.scName,
		NodeAffinity: &v1.VolumeNodeAffinity{
			Required: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      nodeKey,
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeValue},
							},
						},
					},
				},
			},
		},
	}

	if volume.localVolumeType == BlockLocalVolumeType {
		pvVolumeMode := v1.PersistentVolumeBlock
		pvConfig.VolumeMode = &pvVolumeMode
	}
	return pvConfig
}

// Creates a PVC and PV with prebinding
func createLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
	var err error

	for _, volume := range volumes {
		pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
		pvConfig := makeLocalPVConfig(config, volume)

		volume.pv, volume.pvc, err = framework.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, false)
		framework.ExpectNoError(err)
	}

	if mode == storagev1.VolumeBindingImmediate {
		for _, volume := range volumes {
			verifyLocalVolume(config, volume)
		}
	} else {
		// Verify PVCs are not bound
		// There isn't really a great way to verify this without making the test be slow...
		err = wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) {
			for _, volume := range volumes {
				pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(volume.pvc.Name, metav1.GetOptions{})
				framework.ExpectNoError(err)
				gomega.Expect(pvc.Status.Phase).To(gomega.Equal(v1.ClaimPending))
			}
			return false, nil
		})
		framework.ExpectError(err)
	}
}
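
// makeLocalPodWithNodeAffinity returns a pod whose node affinity requires the given node.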
func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      "kubernetes.io/hostname",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeName},
							},
						},
					},
				},
			},
		},
	}
	pod.Spec.Affinity = affinity
	return
}
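
// makeLocalPodWithNodeSelector returns a pod whose node selector targets the given node.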
func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	ns := map[string]string{
		"kubernetes.io/hostname": nodeName,
	}
	pod.Spec.NodeSelector = ns
	return
}
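
// makeLocalPodWithNodeName returns a pod bound directly to the given node via spec.nodeName.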
func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	pod.Spec.NodeName = nodeName
	return
}
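
// createLocalPod creates a pod that mounts the volume's PVC, optionally with the given fsGroup.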
func createLocalPod(config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
	ginkgo.By("Creating a pod")
	return framework.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, fsGroup, framework.PodStartShortTimeout)
}
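
// createWriteCmd returns a shell command that writes writeTestFileContent into the volume:
// for block volumes it dd's the content onto the device, otherwise it writes a file under testDir.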
func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// testDir is the block device.
		testFileDir := filepath.Join("/tmp", testDir)
		testFilePath := filepath.Join(testFileDir, testFile)
		// Create a file containing the testFileContent.
		writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
		// sudo is needed when using ssh exec to node.
		// sudo is not needed and does not exist in some containers (e.g. busybox), when using pod exec.
		sudoCmd := fmt.Sprintf("SUDO_CMD=$(which sudo); echo ${SUDO_CMD}")
		// Write the testFileContent into the block device.
		writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
		// Cleanup the file containing testFileContent.
		deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
		return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
	}
	testFilePath := filepath.Join(testDir, testFile)
	return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
}
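
// createReadCmd returns a shell command that reads the test data back: a hexdump of the
// device for block volumes, a cat of the test file otherwise.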
func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// Create the command to read the beginning of the block device and print it in ascii.
		return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
	}
	// Create the command to read (aka cat) a file.
	testFilePath := filepath.Join(testFileDir, testFile)
	return fmt.Sprintf("cat %s", testFilePath)
}

// Read testFile and evaluate whether it contains the testFileContent
func testReadFileContent(testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
	readCmd := createReadCmd(testFileDir, testFile, volumeType)
	readOut := podRWCmdExec(pod, readCmd)
	gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
}

// Execute a read or write command in a pod.
// Fail on error
func podRWCmdExec(pod *v1.Pod, cmd string) string {
	out, err := utils.PodExec(pod, cmd)
	e2elog.Logf("podRWCmdExec out: %q err: %v", out, err)
	framework.ExpectNoError(err)
	return out
}

// Initialize test volume on node
// and create local PVC and PV
func setupLocalVolumesPVCsPVs(
	config *localTestConfig,
	localVolumeType localVolumeType,
	node *v1.Node,
	count int,
	mode storagev1.VolumeBindingMode) []*localTestVolume {

	ginkgo.By("Initializing test volumes")
	testVols := setupLocalVolumes(config, localVolumeType, node, count)

	ginkgo.By("Creating local PVCs and PVs")
	createLocalPVCsPVs(config, testVols, mode)

	return testVols
}

// newLocalClaimWithName creates a new persistent volume claim with the given name.
func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
	claim := v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: config.ns,
		},
		Spec: v1.PersistentVolumeClaimSpec{
			StorageClassName: &config.scName,
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
			},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceName(v1.ResourceStorage): resource.MustParse(testRequestSize),
				},
			},
		},
	}

	return &claim
}
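
// createStatefulSet creates a StatefulSet whose pods each claim volumeCount local PVCs and
// carry pod (anti-)affinity on the hostname topology key, then waits for it to be running and ready.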
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
	mounts := []v1.VolumeMount{}
	claims := []v1.PersistentVolumeClaim{}
	for i := 0; i < volumeCount; i++ {
		name := fmt.Sprintf("vol%v", i+1)
		pvc := newLocalClaimWithName(config, name)
		mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
		claims = append(claims, *pvc)
	}

	podAffinityTerms := []v1.PodAffinityTerm{
		{
			LabelSelector: &metav1.LabelSelector{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "app",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{"local-volume-test"},
					},
				},
			},
			TopologyKey: "kubernetes.io/hostname",
		},
	}

	affinity := v1.Affinity{}
	if anti {
		affinity.PodAntiAffinity = &v1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	} else {
		affinity.PodAffinity = &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	}

	labels := map[string]string{"app": "local-volume-test"}
	spec := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "local-volume-statefulset",
			Namespace: config.ns,
		},
		Spec: appsv1.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "local-volume-test"},
			},
			Replicas: &ssReplicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:         "nginx",
							Image:        imageutils.GetE2EImage(imageutils.Nginx),
							VolumeMounts: mounts,
						},
					},
					Affinity: &affinity,
				},
			},
			VolumeClaimTemplates: claims,
			ServiceName:          "test-service",
		},
	}

	if parallel {
		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
	}

	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(spec)
	framework.ExpectNoError(err)

	config.ssTester.WaitForRunningAndReady(ssReplicas, ss)
	return ss
}
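
// validateStatefulSet verifies pod placement (spread across nodes for anti-affinity,
// a single node for affinity) and that every PVC used by the pods is bound.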
func validateStatefulSet(config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
	pods := config.ssTester.GetPodList(ss)

	nodes := sets.NewString()
	for _, pod := range pods.Items {
		nodes.Insert(pod.Spec.NodeName)
	}

	if anti {
		// Verify that each pod is on a different node
		gomega.Expect(nodes.Len()).To(gomega.Equal(len(pods.Items)))
	} else {
		// Verify that all pods are on same node.
		gomega.Expect(nodes.Len()).To(gomega.Equal(1))
	}

	// Validate all PVCs are bound
	for _, pod := range pods.Items {
		for _, volume := range pod.Spec.Volumes {
			pvcSource := volume.VolumeSource.PersistentVolumeClaim
			if pvcSource != nil {
				err := framework.WaitForPersistentVolumeClaimPhase(
					v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
				framework.ExpectNoError(err)
			}
		}
	}
}

// SkipUnlessLocalSSDExists takes in an ssdInterface (scsi/nvme) and a filesystemType (fs/block)
// and skips if a disk of that type does not exist on the node
func SkipUnlessLocalSSDExists(config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
	ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
	res, err := config.hostExec.IssueCommandWithResult(ssdCmd, node)
	framework.ExpectNoError(err)
	num, err := strconv.Atoi(strings.TrimSpace(res))
	framework.ExpectNoError(err)
	if num < 1 {
		framework.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
	}
}