persistent_volumes-local.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage

import (
	"context"
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2esset "k8s.io/kubernetes/test/e2e/framework/statefulset"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
)
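// localTestConfig holds the state shared by the local PersistentVolume tests:
// the test namespace, the schedulable nodes under test, the StorageClass name,
// and the helpers used to create and remove local volumes on those nodes.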
type localTestConfig struct {
	ns           string
	nodes        []v1.Node
	node0        *v1.Node
	client       clientset.Interface
	scName       string
	discoveryDir string
	hostExec     utils.HostExec
	ltrMgr       utils.LocalTestResourceManager
}
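// localVolumeType names the flavor of local volume exercised by a test
// (plain directory, symlink, bind mount, tmpfs, GCE local SSD, or block device).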
type localVolumeType string

const (
	// DirectoryLocalVolumeType is the default local volume type, aka a directory
	DirectoryLocalVolumeType localVolumeType = "dir"
	// DirectoryLinkLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a directory
	DirectoryLinkLocalVolumeType localVolumeType = "dir-link"
	// DirectoryBindMountedLocalVolumeType is like DirectoryLocalVolumeType
	// but bind mounted
	DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted"
	// DirectoryLinkBindMountedLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a self bind-mounted directory.
	// Note that bind mounting a symbolic link actually mounts the directory it
	// links to.
	DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted"
	// TmpfsLocalVolumeType creates a tmpfs and mounts it
	TmpfsLocalVolumeType localVolumeType = "tmpfs"
	// GCELocalSSDVolumeType tests based on local ssd at /mnt/disks/by-uuid/
	GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs"
	// BlockLocalVolumeType creates a local file, formats it, and maps it as a block device.
	BlockLocalVolumeType localVolumeType = "block"
	// BlockFsWithFormatLocalVolumeType creates a local file serving as the backing for a block device,
	// formats it, and mounts it for use as an FS-mode local volume.
	BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat"
	// BlockFsWithoutFormatLocalVolumeType creates a local file serving as the backing for a block device,
	// does not format it manually, and mounts it for use as an FS-mode local volume.
	BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat"
)
// map to local test resource type
var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{
	GCELocalSSDVolumeType:                   utils.LocalVolumeGCELocalSSD,
	TmpfsLocalVolumeType:                    utils.LocalVolumeTmpfs,
	DirectoryLocalVolumeType:                utils.LocalVolumeDirectory,
	DirectoryLinkLocalVolumeType:            utils.LocalVolumeDirectoryLink,
	DirectoryBindMountedLocalVolumeType:     utils.LocalVolumeDirectoryBindMounted,
	DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted,
	BlockLocalVolumeType:                    utils.LocalVolumeBlock, // block device in Block mode
	BlockFsWithFormatLocalVolumeType:        utils.LocalVolumeBlockFS,
	BlockFsWithoutFormatLocalVolumeType:     utils.LocalVolumeBlock, // block device in Filesystem mode (default in this test suite)
}
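// localTestVolume bundles a local test resource with the PVC and PV created for it.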
type localTestVolume struct {
	// Local test resource
	ltr *utils.LocalTestResource
	// PVC for this volume
	pvc *v1.PersistentVolumeClaim
	// PV for this volume
	pv *v1.PersistentVolume
	// Type of local volume
	localVolumeType localVolumeType
}
const (
	// TODO: This may not be available/writable on all images.
	hostBase = "/tmp"
	// Path to the first volume in the test containers
	// created via createLocalPod or makeLocalPod
	// leveraging pv_util.MakePod
	volumeDir = "/mnt/volume1"
	// testFile created in setupLocalVolume
	testFile = "test-file"
	// testFileContent written into testFile
	testFileContent = "test-file-content"
	testSCPrefix    = "local-volume-test-storageclass"
	// A sample request size
	testRequestSize = "10Mi"
	// Max number of nodes to use for testing
	maxNodes = 5
)
var (
	// storage class volume binding modes
	waitMode      = storagev1.VolumeBindingWaitForFirstConsumer
	immediateMode = storagev1.VolumeBindingImmediate
	// Common selinux labels
	selinuxLabel = &v1.SELinuxOptions{
		Level: "s0:c0,c1"}
)
var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
	f := framework.NewDefaultFramework("persistent-local-volumes-test")

	var (
		config *localTestConfig
		scName string
	)

	ginkgo.BeforeEach(func() {
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes)
		framework.ExpectNoError(err)

		scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
		// Choose the first node
		node0 := &nodes.Items[0]

		hostExec := utils.NewHostExec(f)
		ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
		config = &localTestConfig{
			ns:           f.Namespace.Name,
			client:       f.ClientSet,
			nodes:        nodes.Items,
			node0:        node0,
			scName:       scName,
			discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
			hostExec:     hostExec,
			ltrMgr:       ltrMgr,
		}
	})
	for tempTestVolType := range setupLocalVolumeMap {
		// New variable required for ginkgo test closures
		testVolType := tempTestVolType
		serialStr := ""
		if testVolType == GCELocalSSDVolumeType {
			serialStr = " [Serial]"
		}
		ctxString := fmt.Sprintf("[Volume type: %s]%v", testVolType, serialStr)
		testMode := immediateMode
		ginkgo.Context(ctxString, func() {
			var testVol *localTestVolume

			ginkgo.BeforeEach(func() {
				if testVolType == GCELocalSSDVolumeType {
					SkipUnlessLocalSSDExists(config, "scsi", "fs", config.node0)
				}
				setupStorageClass(config, &testMode)
				testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.node0, 1, testMode)
				testVol = testVols[0]
			})

			ginkgo.AfterEach(func() {
				cleanupLocalVolumes(config, []*localTestVolume{testVol})
				cleanupStorageClass(config)
			})

			ginkgo.Context("One pod requesting one prebound PVC", func() {
				var (
					pod1    *v1.Pod
					pod1Err error
				)

				ginkgo.BeforeEach(func() {
					ginkgo.By("Creating pod1")
					pod1, pod1Err = createLocalPod(config, testVol, nil)
					framework.ExpectNoError(pod1Err)
					verifyLocalPod(config, testVol, pod1, config.node0.Name)

					writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

					ginkgo.By("Writing in pod1")
					podRWCmdExec(f, pod1, writeCmd)
				})

				ginkgo.AfterEach(func() {
					ginkgo.By("Deleting pod1")
					e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
				})

				ginkgo.It("should be able to mount volume and read from pod1", func() {
					ginkgo.By("Reading in pod1")
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)
				})

				ginkgo.It("should be able to mount volume and write from pod1", func() {
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)

					ginkgo.By("Writing in pod1")
					writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVolType)
					podRWCmdExec(f, pod1, writeCmd)
				})
			})

			ginkgo.Context("Two pods mounting a local volume at the same time", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteTest(f, config, testVol)
				})
			})

			ginkgo.Context("Two pods mounting a local volume one after the other", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteSerialTest(f, config, testVol)
				})
			})

			ginkgo.Context("Set fsGroup for local volume", func() {
				ginkgo.BeforeEach(func() {
					if testVolType == BlockLocalVolumeType {
						e2eskipper.Skipf("We don't set fsGroup on block device, skipped.")
					}
				})

				ginkgo.It("should set fsGroup for one pod [Slow]", func() {
					ginkgo.By("Checking fsGroup is set")
					pod := createPodWithFsGroupTest(config, testVol, 1234, 1234)
					ginkgo.By("Deleting pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod.Name)
				})

				ginkgo.It("should set same fsGroup for two pods simultaneously [Slow]", func() {
					fsGroup := int64(1234)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Deleting first pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})

				ginkgo.It("should set different fsGroup for second pod if first pod is deleted", func() {
					e2eskipper.Skipf("Disabled temporarily, reopen after #73168 is fixed")
					fsGroup1, fsGroup2 := int64(1234), int64(4321)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1)
					ginkgo.By("Deleting first pod")
					err := e2epod.DeletePodWithWait(config.client, pod1)
					framework.ExpectNoError(err, "while deleting first pod")
					ginkgo.By("Create second pod and check fsGroup is the new one")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup2)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})
			})
		})
	}
  257. ginkgo.Context("Local volume that cannot be mounted [Slow]", func() {
  258. // TODO:
  259. // - check for these errors in unit tests instead
  260. ginkgo.It("should fail due to non-existent path", func() {
  261. testVol := &localTestVolume{
  262. ltr: &utils.LocalTestResource{
  263. Node: config.node0,
  264. Path: "/non-existent/location/nowhere",
  265. },
  266. localVolumeType: DirectoryLocalVolumeType,
  267. }
  268. ginkgo.By("Creating local PVC and PV")
  269. createLocalPVCsPVs(config, []*localTestVolume{testVol}, immediateMode)
  270. pod, err := createLocalPod(config, testVol, nil)
  271. framework.ExpectError(err)
  272. err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, framework.PodStartShortTimeout)
  273. framework.ExpectError(err)
  274. cleanupLocalPVCsPVs(config, []*localTestVolume{testVol})
  275. })
  276. ginkgo.It("should fail due to wrong node", func() {
  277. if len(config.nodes) < 2 {
  278. e2eskipper.Skipf("Runs only when number of nodes >= 2")
  279. }
  280. testVols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, config.node0, 1, immediateMode)
  281. testVol := testVols[0]
  282. pod := makeLocalPodWithNodeName(config, testVol, config.nodes[1].Name)
  283. pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
  284. framework.ExpectNoError(err)
  285. err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, framework.PodStartShortTimeout)
  286. framework.ExpectError(err)
  287. cleanupLocalVolumes(config, []*localTestVolume{testVol})
  288. })
  289. })
  290. ginkgo.Context("Pod with node different from PV's NodeAffinity", func() {
  291. var (
  292. testVol *localTestVolume
  293. volumeType localVolumeType
  294. )
  295. ginkgo.BeforeEach(func() {
  296. if len(config.nodes) < 2 {
  297. e2eskipper.Skipf("Runs only when number of nodes >= 2")
  298. }
  299. volumeType = DirectoryLocalVolumeType
  300. setupStorageClass(config, &immediateMode)
  301. testVols := setupLocalVolumesPVCsPVs(config, volumeType, config.node0, 1, immediateMode)
  302. testVol = testVols[0]
  303. })
  304. ginkgo.AfterEach(func() {
  305. cleanupLocalVolumes(config, []*localTestVolume{testVol})
  306. cleanupStorageClass(config)
  307. })
  308. ginkgo.It("should fail scheduling due to different NodeAffinity", func() {
  309. testPodWithNodeConflict(config, volumeType, config.nodes[1].Name, makeLocalPodWithNodeAffinity, immediateMode)
  310. })
  311. ginkgo.It("should fail scheduling due to different NodeSelector", func() {
  312. testPodWithNodeConflict(config, volumeType, config.nodes[1].Name, makeLocalPodWithNodeSelector, immediateMode)
  313. })
  314. })
  315. ginkgo.Context("StatefulSet with pod affinity [Slow]", func() {
  316. var testVols map[string][]*localTestVolume
  317. const (
  318. ssReplicas = 3
  319. volsPerNode = 6
  320. )
  321. ginkgo.BeforeEach(func() {
  322. setupStorageClass(config, &waitMode)
  323. testVols = map[string][]*localTestVolume{}
  324. for i, node := range config.nodes {
  325. // The PVCs created here won't be used
  326. ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
  327. vols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode)
  328. testVols[node.Name] = vols
  329. }
  330. })
  331. ginkgo.AfterEach(func() {
  332. for _, vols := range testVols {
  333. cleanupLocalVolumes(config, vols)
  334. }
  335. cleanupStorageClass(config)
  336. })
  337. ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func() {
  338. if len(config.nodes) < ssReplicas {
  339. e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
  340. }
  341. ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
  342. ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
  343. validateStatefulSet(config, ss, true)
  344. })
  345. ginkgo.It("should use volumes on one node when pod has affinity", func() {
  346. ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
  347. ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
  348. validateStatefulSet(config, ss, false)
  349. })
  350. ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
  351. if len(config.nodes) < ssReplicas {
  352. e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
  353. }
  354. ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
  355. ss := createStatefulSet(config, ssReplicas, 1, true, true)
  356. validateStatefulSet(config, ss, true)
  357. })
  358. ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
  359. ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
  360. ss := createStatefulSet(config, ssReplicas, 1, false, true)
  361. validateStatefulSet(config, ss, false)
  362. })
  363. })
  364. ginkgo.Context("Stress with local volumes [Serial]", func() {
  365. var (
  366. allLocalVolumes = make(map[string][]*localTestVolume)
  367. volType = TmpfsLocalVolumeType
  368. stopCh = make(chan struct{})
  369. wg sync.WaitGroup
  370. )
  371. const (
  372. volsPerNode = 10 // Make this non-divisable by volsPerPod to increase changes of partial binding failure
  373. volsPerPod = 3
  374. podsFactor = 4
  375. )
		ginkgo.BeforeEach(func() {
			setupStorageClass(config, &waitMode)

			for i, node := range config.nodes {
				ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
				allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &config.nodes[i], volsPerNode)
			}
			ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
			var err error
			for _, localVolumes := range allLocalVolumes {
				for _, localVolume := range localVolumes {
					pvConfig := makeLocalPVConfig(config, localVolume)
					localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
					framework.ExpectNoError(err)
				}
			}
			ginkgo.By("Start a goroutine to recycle unbound PVs")
			wg.Add(1)
			go func() {
				defer wg.Done()
				w, err := config.client.CoreV1().PersistentVolumes().Watch(context.TODO(), metav1.ListOptions{})
				framework.ExpectNoError(err)
				if w == nil {
					return
				}
				defer w.Stop()
				for {
					select {
					case event := <-w.ResultChan():
						if event.Type != watch.Modified {
							continue
						}
						pv, ok := event.Object.(*v1.PersistentVolume)
						if !ok {
							continue
						}
						if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
							continue
						}
						pv, err = config.client.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{})
						if apierrors.IsNotFound(err) {
							continue
						}
						// Delete and create a new PV for same local volume storage
						ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
						for _, localVolumes := range allLocalVolumes {
							for _, localVolume := range localVolumes {
								if localVolume.pv.Name != pv.Name {
									continue
								}
								err = config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, &metav1.DeleteOptions{})
								framework.ExpectNoError(err)
								pvConfig := makeLocalPVConfig(config, localVolume)
								localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
								framework.ExpectNoError(err)
							}
						}
					case <-stopCh:
						return
					}
				}
			}()
		})
		ginkgo.AfterEach(func() {
			ginkgo.By("Stop and wait for recycle goroutine to finish")
			close(stopCh)
			wg.Wait()

			ginkgo.By("Clean all PVs")
			for nodeName, localVolumes := range allLocalVolumes {
				ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
				cleanupLocalVolumes(config, localVolumes)
			}
			cleanupStorageClass(config)
		})

		ginkgo.It("should be able to process many pods and reuse local volumes", func() {
			var (
				podsLock sync.Mutex
				// Have one extra pod pending
				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
				totalPods         = numConcurrentPods * podsFactor
				numCreated        = 0
				numFinished       = 0
				pods              = map[string]*v1.Pod{}
			)

			// Create pods gradually instead of all at once because scheduler has
			// exponential backoff
			ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))

			stop := make(chan struct{})
			go wait.Until(func() {
				podsLock.Lock()
				defer podsLock.Unlock()

				if numCreated >= totalPods {
					// Created all the pods for the test
					return
				}

				if len(pods) > numConcurrentPods/2 {
					// Too many outstanding pods
					return
				}

				for i := 0; i < numConcurrentPods; i++ {
					pvcs := []*v1.PersistentVolumeClaim{}
					for j := 0; j < volsPerPod; j++ {
						pvc := e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
						pvc, err := e2epv.CreatePVC(config.client, config.ns, pvc)
						framework.ExpectNoError(err)
						pvcs = append(pvcs, pvc)
					}

					pod := e2epod.MakeSecPod(config.ns, pvcs, nil, false, "sleep 1", false, false, selinuxLabel, nil)
					pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
					framework.ExpectNoError(err)
					pods[pod.Name] = pod
					numCreated++
				}
			}, 2*time.Second, stop)

			defer func() {
				close(stop)
				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range pods {
					if err := deletePodAndPVCs(config, pod); err != nil {
						framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
					}
				}
			}()

			ginkgo.By("Waiting for all pods to complete successfully")
			const completeTimeout = 5 * time.Minute
			waitErr := wait.PollImmediate(time.Second, completeTimeout, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					return false, err
				}

				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range podsList.Items {
					switch pod.Status.Phase {
					case v1.PodSucceeded:
						// Delete pod and its PVCs
						if err := deletePodAndPVCs(config, &pod); err != nil {
							return false, err
						}
						delete(pods, pod.Name)
						numFinished++
						framework.Logf("%v/%v pods finished", numFinished, totalPods)
					case v1.PodUnknown:
						return false, fmt.Errorf("pod %v is in %v phase", pod.Name, pod.Status.Phase)
					case v1.PodFailed:
					}
				}

				return numFinished == totalPods, nil
			})
			framework.ExpectNoError(waitErr, "some pods failed to complete within %v", completeTimeout)
		})
	})
  528. ginkgo.Context("Pods sharing a single local PV [Serial]", func() {
  529. var (
  530. pv *v1.PersistentVolume
  531. )
  532. ginkgo.BeforeEach(func() {
  533. localVolume := &localTestVolume{
  534. ltr: &utils.LocalTestResource{
  535. Node: config.node0,
  536. Path: "/tmp",
  537. },
  538. localVolumeType: DirectoryLocalVolumeType,
  539. }
  540. pvConfig := makeLocalPVConfig(config, localVolume)
  541. var err error
  542. pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
  543. framework.ExpectNoError(err)
  544. })
  545. ginkgo.AfterEach(func() {
  546. if pv == nil {
  547. return
  548. }
  549. ginkgo.By(fmt.Sprintf("Clean PV %s", pv.Name))
  550. err := config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, &metav1.DeleteOptions{})
  551. framework.ExpectNoError(err)
  552. })
  553. ginkgo.It("all pods should be running", func() {
  554. var (
  555. pvc *v1.PersistentVolumeClaim
  556. pods = map[string]*v1.Pod{}
  557. count = 50
  558. err error
  559. )
  560. pvc = e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, DirectoryLocalVolumeType), config.ns)
  561. ginkgo.By(fmt.Sprintf("Create a PVC %s", pvc.Name))
  562. pvc, err = e2epv.CreatePVC(config.client, config.ns, pvc)
  563. framework.ExpectNoError(err)
  564. ginkgo.By(fmt.Sprintf("Create %d pods to use this PVC", count))
  565. for i := 0; i < count; i++ {
  566. pod := e2epod.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{pvc}, nil, false, "", false, false, selinuxLabel, nil)
  567. pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
  568. framework.ExpectNoError(err)
  569. pods[pod.Name] = pod
  570. }
  571. ginkgo.By("Wait for all pods are running")
			const runningTimeout = 5 * time.Minute
			waitErr := wait.PollImmediate(time.Second, runningTimeout, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					return false, err
				}
				runningPods := 0
				for _, pod := range podsList.Items {
					switch pod.Status.Phase {
					case v1.PodRunning:
						runningPods++
					}
				}
				return runningPods == count, nil
			})
			framework.ExpectNoError(waitErr, "Some pods are not running within %v", runningTimeout)
		})
	})
})
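// deletePodAndPVCs deletes the given pod and every PVC referenced by its volumes.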
func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
	framework.Logf("Deleting pod %v", pod.Name)
	if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), pod.Name, nil); err != nil {
		return err
	}

	// Delete PVCs
	for _, vol := range pod.Spec.Volumes {
		pvcSource := vol.VolumeSource.PersistentVolumeClaim
		if pvcSource != nil {
			if err := e2epv.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
				return err
			}
		}
	}
	return nil
}
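// makeLocalPodWith is the signature shared by the helpers that build a pod
// constrained to a particular node (by node affinity, node selector, or node name).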
type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
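// testPodWithNodeConflict sets up a local volume on config.node0, then uses
// makeLocalPodFunc to build a pod constrained to a different node and verifies
// that the pod stays unschedulable.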
func testPodWithNodeConflict(config *localTestConfig, testVolType localVolumeType, nodeName string, makeLocalPodFunc makeLocalPodWith, bindingMode storagev1.VolumeBindingMode) {
	ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVolType))

	testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.node0, 1, bindingMode)
	testVol := testVols[0]

	pod := makeLocalPodFunc(config, testVol, nodeName)
	pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	err = e2epod.WaitForPodNameUnschedulableInNamespace(config.client, pod.Name, pod.Namespace)
	framework.ExpectNoError(err)
}
// The tests below are run against multiple mount point types

// Test two pods at the same time, write from pod1, and read from pod2
func twoPodsReadWriteTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1 to write to the PV")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.node0.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(f, pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Creating pod2 to read from the PV")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.node0.Name)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVol.localVolumeType)

	ginkgo.By("Writing in pod2")
	podRWCmdExec(f, pod2, writeCmd)

	ginkgo.By("Reading in pod1")
	testReadFileContent(f, volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
	ginkgo.By("Deleting pod2")
	e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
}
// Test two pods one after the other, write from pod1, and read from pod2
func twoPodsReadWriteSerialTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.node0.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(f, pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)

	ginkgo.By("Creating pod2")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.node0.Name)

	ginkgo.By("Reading in pod2")
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	ginkgo.By("Deleting pod2")
	e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
}
// createPodWithFsGroupTest creates a pod with the given fsGroup and checks that the
// volume directory's group ID matches expectedFsGroup.
func createPodWithFsGroupTest(config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
	pod, err := createLocalPod(config, testVol, &fsGroup)
	framework.ExpectNoError(err)
	_, err = framework.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
	framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
	return pod
}
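// setupStorageClass creates the test StorageClass with the no-provisioner
// provisioner and the requested volume binding mode.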
func setupStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) {
	sc := &storagev1.StorageClass{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.scName,
		},
		Provisioner:       "kubernetes.io/no-provisioner",
		VolumeBindingMode: mode,
	}

	_, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
	framework.ExpectNoError(err)
}
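// cleanupStorageClass deletes the test StorageClass.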
func cleanupStorageClass(config *localTestConfig) {
	framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(context.TODO(), config.scName, nil))
}
// podNodeName returns the name of the node the given pod is scheduled on.
func podNodeName(config *localTestConfig, pod *v1.Pod) (string, error) {
	runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
	return runtimePod.Spec.NodeName, runtimePodErr
}
// setupLocalVolumes sets up directories to use for local PV
func setupLocalVolumes(config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume {
	vols := []*localTestVolume{}
	for i := 0; i < count; i++ {
		ltrType, ok := setupLocalVolumeMap[localVolumeType]
		framework.ExpectEqual(ok, true)
		ltr := config.ltrMgr.Create(node, ltrType, nil)
		vols = append(vols, &localTestVolume{
			ltr:             ltr,
			localVolumeType: localVolumeType,
		})
	}
	return vols
}
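// cleanupLocalPVCsPVs deletes the PVC and PV of each volume, failing the test on error.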
func cleanupLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume) {
	for _, volume := range volumes {
		ginkgo.By("Cleaning up PVC and PV")
		errs := e2epv.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc)
		if len(errs) > 0 {
			framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
		}
	}
}
// Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory
func cleanupLocalVolumes(config *localTestConfig, volumes []*localTestVolume) {
	cleanupLocalPVCsPVs(config, volumes)

	for _, volume := range volumes {
		config.ltrMgr.Remove(volume.ltr)
	}
}
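// verifyLocalVolume waits for the volume's PV and PVC to bind to each other.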
func verifyLocalVolume(config *localTestConfig, volume *localTestVolume) {
	framework.ExpectNoError(e2epv.WaitOnPVandPVC(config.client, config.ns, volume.pv, volume.pvc))
}
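// verifyLocalPod checks that the pod landed on the expected node.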
func verifyLocalPod(config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
	podNodeName, err := podNodeName(config, pod)
	framework.ExpectNoError(err)
	framework.Logf("pod %q created on Node %q", pod.Name, podNodeName)
	framework.ExpectEqual(podNodeName, expectedNodeName)
}
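// makeLocalPVCConfig returns a PVC config for the test StorageClass, using Block
// volume mode for block-type local volumes.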
func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) e2epv.PersistentVolumeClaimConfig {
	pvcConfig := e2epv.PersistentVolumeClaimConfig{
		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		StorageClassName: &config.scName,
	}
	if volumeType == BlockLocalVolumeType {
		pvcVolumeMode := v1.PersistentVolumeBlock
		pvcConfig.VolumeMode = &pvcVolumeMode
	}
	return pvcConfig
}
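// makeLocalPVConfig returns a PV config pointing at the volume's path, with node
// affinity pinned to the volume's node via the kubernetes.io/hostname label.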
func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) e2epv.PersistentVolumeConfig {
	// TODO: hostname may not be the best option
	nodeKey := "kubernetes.io/hostname"
	if volume.ltr.Node.Labels == nil {
		framework.Failf("Node does not have labels")
	}
	nodeValue, found := volume.ltr.Node.Labels[nodeKey]
	if !found {
		framework.Failf("Node does not have required label %q", nodeKey)
	}

	pvConfig := e2epv.PersistentVolumeConfig{
		PVSource: v1.PersistentVolumeSource{
			Local: &v1.LocalVolumeSource{
				Path: volume.ltr.Path,
			},
		},
		NamePrefix:       "local-pv",
		StorageClassName: config.scName,
		NodeAffinity: &v1.VolumeNodeAffinity{
			Required: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      nodeKey,
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeValue},
							},
						},
					},
				},
			},
		},
	}

	if volume.localVolumeType == BlockLocalVolumeType {
		pvVolumeMode := v1.PersistentVolumeBlock
		pvConfig.VolumeMode = &pvVolumeMode
	}
	return pvConfig
}
// createLocalPVCsPVs creates a PVC and PV for each test volume. For immediate binding
// it waits for each pair to bind; for delayed binding it verifies that the PVCs stay pending.
func createLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
	var err error

	for _, volume := range volumes {
		pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
		pvConfig := makeLocalPVConfig(config, volume)

		volume.pv, volume.pvc, err = e2epv.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, false)
		framework.ExpectNoError(err)
	}

	if mode == storagev1.VolumeBindingImmediate {
		for _, volume := range volumes {
			verifyLocalVolume(config, volume)
		}
	} else {
		// Verify PVCs are not bound by waiting for phase==bound with a timeout and asserting that we hit the timeout.
		// There isn't really a great way to verify this without making the test be slow...
		const bindTimeout = 10 * time.Second
		waitErr := wait.PollImmediate(time.Second, bindTimeout, func() (done bool, err error) {
			for _, volume := range volumes {
				pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(context.TODO(), volume.pvc.Name, metav1.GetOptions{})
				if err != nil {
					return false, fmt.Errorf("failed to get PVC %s/%s: %v", volume.pvc.Namespace, volume.pvc.Name, err)
				}
				if pvc.Status.Phase != v1.ClaimPending {
					return true, nil
				}
			}
			return false, nil
		})
		if waitErr == wait.ErrWaitTimeout {
			framework.Logf("PVCs were not bound within %v (that's good)", bindTimeout)
			waitErr = nil
		}
		framework.ExpectNoError(waitErr, "Error making sure PVCs are not bound")
	}
}
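// makeLocalPodWithNodeAffinity returns a pod that requires scheduling on the given
// node via required node affinity on the kubernetes.io/hostname label.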
func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = e2epod.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, nil, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      "kubernetes.io/hostname",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeName},
							},
						},
					},
				},
			},
		},
	}
	pod.Spec.Affinity = affinity
	return
}
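// makeLocalPodWithNodeSelector returns a pod that targets the given node via a
// kubernetes.io/hostname node selector.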
func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = e2epod.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, nil, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	ns := map[string]string{
		"kubernetes.io/hostname": nodeName,
	}
	pod.Spec.NodeSelector = ns
	return
}
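// makeLocalPodWithNodeName returns a pod pinned to the given node using e2epod.SetNodeAffinity.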
func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	pod = e2epod.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, nil, false, "", false, false, selinuxLabel, nil)
	if pod == nil {
		return
	}
	e2epod.SetNodeAffinity(pod, nodeName)
	return
}
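// createLocalPod creates a pod that mounts the volume's PVC and waits for it to start.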
func createLocalPod(config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
	ginkgo.By("Creating a pod")
	return e2epod.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, nil, false, "", false, false, selinuxLabel, fsGroup, framework.PodStartShortTimeout)
}
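// createWriteCmd builds the shell command that writes writeTestFileContent into the
// volume: directly into a file for filesystem volumes, or via dd for block volumes.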
func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// testDir is the block device.
		testFileDir := filepath.Join("/tmp", testDir)
		testFilePath := filepath.Join(testFileDir, testFile)
		// Create a file containing the testFileContent.
		writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
		// sudo is needed when using ssh exec to node.
		// sudo is not needed and does not exist in some containers (e.g. busybox), when using pod exec.
		sudoCmd := fmt.Sprintf("SUDO_CMD=$(which sudo); echo ${SUDO_CMD}")
		// Write the testFileContent into the block device.
		writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
		// Cleanup the file containing testFileContent.
		deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
		return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
	}
	testFilePath := filepath.Join(testDir, testFile)
	return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
}
func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// Create the command to read the beginning of the block device and print it in ascii.
		return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
	}
	// Create the command to read (aka cat) a file.
	testFilePath := filepath.Join(testFileDir, testFile)
	return fmt.Sprintf("cat %s", testFilePath)
}
// Read testFile and evaluate whether it contains the testFileContent
func testReadFileContent(f *framework.Framework, testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
	readCmd := createReadCmd(testFileDir, testFile, volumeType)
	readOut := podRWCmdExec(f, pod, readCmd)
	gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
}
// Execute a read or write command in a pod.
// Fail on error
func podRWCmdExec(f *framework.Framework, pod *v1.Pod, cmd string) string {
	out, err := utils.PodExec(f, pod, cmd)
	framework.Logf("podRWCmdExec out: %q err: %v", out, err)
	framework.ExpectNoError(err)
	return out
}
// Initialize test volume on node
// and create local PVC and PV
func setupLocalVolumesPVCsPVs(
	config *localTestConfig,
	localVolumeType localVolumeType,
	node *v1.Node,
	count int,
	mode storagev1.VolumeBindingMode) []*localTestVolume {

	ginkgo.By("Initializing test volumes")
	testVols := setupLocalVolumes(config, localVolumeType, node, count)

	ginkgo.By("Creating local PVCs and PVs")
	createLocalPVCsPVs(config, testVols, mode)

	return testVols
}
// newLocalClaimWithName creates a new persistent volume claim with the given name.
func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
	claim := v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: config.ns,
		},
		Spec: v1.PersistentVolumeClaimSpec{
			StorageClassName: &config.scName,
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
			},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceName(v1.ResourceStorage): resource.MustParse(testRequestSize),
				},
			},
		},
	}

	return &claim
}
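// createStatefulSet creates a StatefulSet with volumeCount volume claim templates and
// either pod affinity or anti-affinity on the hostname topology key, optionally using
// parallel pod management, and waits for its replicas to become running and ready.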
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
	mounts := []v1.VolumeMount{}
	claims := []v1.PersistentVolumeClaim{}
	for i := 0; i < volumeCount; i++ {
		name := fmt.Sprintf("vol%v", i+1)
		pvc := newLocalClaimWithName(config, name)
		mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
		claims = append(claims, *pvc)
	}

	podAffinityTerms := []v1.PodAffinityTerm{
		{
			LabelSelector: &metav1.LabelSelector{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "app",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{"local-volume-test"},
					},
				},
			},
			TopologyKey: "kubernetes.io/hostname",
		},
	}

	affinity := v1.Affinity{}
	if anti {
		affinity.PodAntiAffinity = &v1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	} else {
		affinity.PodAffinity = &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	}

	labels := map[string]string{"app": "local-volume-test"}
	spec := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "local-volume-statefulset",
			Namespace: config.ns,
		},
		Spec: appsv1.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "local-volume-test"},
			},
			Replicas: &ssReplicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:         "nginx",
							Image:        imageutils.GetE2EImage(imageutils.Nginx),
							VolumeMounts: mounts,
						},
					},
					Affinity: &affinity,
				},
			},
			VolumeClaimTemplates: claims,
			ServiceName:          "test-service",
		},
	}

	if parallel {
		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
	}

	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(context.TODO(), spec, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	e2esset.WaitForRunningAndReady(config.client, ssReplicas, ss)
	return ss
}
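// validateStatefulSet checks that the StatefulSet pods are spread across nodes
// (anti-affinity) or co-located on one node (affinity), and that all of their PVCs are bound.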
func validateStatefulSet(config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
	pods := e2esset.GetPodList(config.client, ss)

	nodes := sets.NewString()
	for _, pod := range pods.Items {
		nodes.Insert(pod.Spec.NodeName)
	}

	if anti {
		// Verify that each pod is on a different node
		framework.ExpectEqual(nodes.Len(), len(pods.Items))
	} else {
		// Verify that all pods are on same node.
		framework.ExpectEqual(nodes.Len(), 1)
	}

	// Validate all PVCs are bound
	for _, pod := range pods.Items {
		for _, volume := range pod.Spec.Volumes {
			pvcSource := volume.VolumeSource.PersistentVolumeClaim
			if pvcSource != nil {
				err := e2epv.WaitForPersistentVolumeClaimPhase(
					v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
				framework.ExpectNoError(err)
			}
		}
	}
}
// SkipUnlessLocalSSDExists takes in an ssdInterface (scsi/nvme) and a filesystemType (fs/block)
// and skips if a disk of that type does not exist on the node
func SkipUnlessLocalSSDExists(config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
	ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
	res, err := config.hostExec.Execute(ssdCmd, node)
	utils.LogResult(res)
	framework.ExpectNoError(err)
	num, err := strconv.Atoi(strings.TrimSpace(res.Stdout))
	framework.ExpectNoError(err)
	if num < 1 {
		e2eskipper.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
	}
}