reconciler_test.go 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconciler
  14. import (
  15. "fmt"
  16. "testing"
  17. "time"
  18. "github.com/stretchr/testify/assert"
  19. "k8s.io/klog"
  20. "k8s.io/utils/mount"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. k8stypes "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. "k8s.io/client-go/kubernetes/fake"
  29. core "k8s.io/client-go/testing"
  30. "k8s.io/client-go/tools/record"
  31. featuregatetesting "k8s.io/component-base/featuregate/testing"
  32. "k8s.io/kubernetes/pkg/features"
  33. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  34. "k8s.io/kubernetes/pkg/volume"
  35. volumetesting "k8s.io/kubernetes/pkg/volume/testing"
  36. "k8s.io/kubernetes/pkg/volume/util"
  37. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  38. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  39. )
  40. const (
  41. // reconcilerLoopSleepDuration is the amount of time the reconciler loop
  42. // waits between successive executions
  43. reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
  44. // waitForAttachTimeout is the maximum amount of time a
  45. // operationexecutor.Mount call will wait for a volume to be attached.
  46. waitForAttachTimeout time.Duration = 1 * time.Second
  47. nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
  48. kubeletPodsDir string = "fake-dir"
  49. testOperationBackOffDuration time.Duration = 100 * time.Millisecond
  50. reconcilerSyncWaitDuration time.Duration = 10 * time.Second
  51. )
  52. func hasAddedPods() bool { return true }
  53. // Calls Run()
  54. // Verifies there are no calls to attach, detach, mount, unmount, etc.
  55. func Test_Run_Positive_DoNothing(t *testing.T) {
  56. // Arrange
  57. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  58. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  59. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  60. kubeClient := createTestClient()
  61. fakeRecorder := &record.FakeRecorder{}
  62. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  63. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  64. kubeClient,
  65. volumePluginMgr,
  66. fakeRecorder,
  67. false, /* checkNodeCapabilitiesBeforeMount */
  68. fakeHandler,
  69. ))
  70. reconciler := NewReconciler(
  71. kubeClient,
  72. false, /* controllerAttachDetachEnabled */
  73. reconcilerLoopSleepDuration,
  74. waitForAttachTimeout,
  75. nodeName,
  76. dsw,
  77. asw,
  78. hasAddedPods,
  79. oex,
  80. mount.NewFakeMounter(nil),
  81. hostutil.NewFakeHostUtil(nil),
  82. volumePluginMgr,
  83. kubeletPodsDir)
  84. // Act
  85. runReconciler(reconciler)
  86. // Assert
  87. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  88. assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
  89. assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
  90. assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
  91. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  92. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  93. }
  94. // Populates desiredStateOfWorld cache with one volume/pod.
  95. // Calls Run()
  96. // Verifies there is are attach/mount/etc calls and no detach/unmount calls.
  97. func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
  98. // Arrange
  99. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  100. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  101. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  102. kubeClient := createTestClient()
  103. fakeRecorder := &record.FakeRecorder{}
  104. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  105. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  106. kubeClient,
  107. volumePluginMgr,
  108. fakeRecorder,
  109. false, /* checkNodeCapabilitiesBeforeMount */
  110. fakeHandler))
  111. reconciler := NewReconciler(
  112. kubeClient,
  113. false, /* controllerAttachDetachEnabled */
  114. reconcilerLoopSleepDuration,
  115. waitForAttachTimeout,
  116. nodeName,
  117. dsw,
  118. asw,
  119. hasAddedPods,
  120. oex,
  121. mount.NewFakeMounter(nil),
  122. hostutil.NewFakeHostUtil(nil),
  123. volumePluginMgr,
  124. kubeletPodsDir)
  125. pod := &v1.Pod{
  126. ObjectMeta: metav1.ObjectMeta{
  127. Name: "pod1",
  128. UID: "pod1uid",
  129. },
  130. Spec: v1.PodSpec{
  131. Volumes: []v1.Volume{
  132. {
  133. Name: "volume-name",
  134. VolumeSource: v1.VolumeSource{
  135. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  136. PDName: "fake-device1",
  137. },
  138. },
  139. },
  140. },
  141. },
  142. }
  143. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  144. podName := util.GetUniquePodName(pod)
  145. generatedVolumeName, err := dsw.AddPodToVolume(
  146. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  147. // Assert
  148. if err != nil {
  149. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  150. }
  151. // Act
  152. runReconciler(reconciler)
  153. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  154. // Assert
  155. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  156. 1 /* expectedAttachCallCount */, fakePlugin))
  157. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  158. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  159. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  160. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  161. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  162. 1 /* expectedSetUpCallCount */, fakePlugin))
  163. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  164. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  165. }
  166. // Populates desiredStateOfWorld cache with one volume/pod.
  167. // Enables controllerAttachDetachEnabled.
  168. // Calls Run()
  169. // Verifies there is one mount call and no unmount calls.
  170. // Verifies there are no attach/detach calls.
  171. func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
  172. // Arrange
  173. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  174. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  175. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  176. kubeClient := createTestClient()
  177. fakeRecorder := &record.FakeRecorder{}
  178. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  179. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  180. kubeClient,
  181. volumePluginMgr,
  182. fakeRecorder,
  183. false, /* checkNodeCapabilitiesBeforeMount */
  184. fakeHandler))
  185. reconciler := NewReconciler(
  186. kubeClient,
  187. true, /* controllerAttachDetachEnabled */
  188. reconcilerLoopSleepDuration,
  189. waitForAttachTimeout,
  190. nodeName,
  191. dsw,
  192. asw,
  193. hasAddedPods,
  194. oex,
  195. mount.NewFakeMounter(nil),
  196. hostutil.NewFakeHostUtil(nil),
  197. volumePluginMgr,
  198. kubeletPodsDir)
  199. pod := &v1.Pod{
  200. ObjectMeta: metav1.ObjectMeta{
  201. Name: "pod1",
  202. UID: "pod1uid",
  203. },
  204. Spec: v1.PodSpec{
  205. Volumes: []v1.Volume{
  206. {
  207. Name: "volume-name",
  208. VolumeSource: v1.VolumeSource{
  209. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  210. PDName: "fake-device1",
  211. },
  212. },
  213. },
  214. },
  215. },
  216. }
  217. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  218. podName := util.GetUniquePodName(pod)
  219. generatedVolumeName, err := dsw.AddPodToVolume(
  220. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  221. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  222. // Assert
  223. if err != nil {
  224. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  225. }
  226. // Act
  227. runReconciler(reconciler)
  228. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  229. // Assert
  230. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  231. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  232. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  233. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  234. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  235. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  236. 1 /* expectedSetUpCallCount */, fakePlugin))
  237. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  238. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  239. }
  240. // Populates desiredStateOfWorld cache with one volume/pod.
  241. // Calls Run()
  242. // Verifies there is one attach/mount/etc call and no detach calls.
  243. // Deletes volume/pod from desired state of world.
  244. // Verifies detach/unmount calls are issued.
  245. func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
  246. // Arrange
  247. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  248. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  249. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  250. kubeClient := createTestClient()
  251. fakeRecorder := &record.FakeRecorder{}
  252. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  253. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  254. kubeClient,
  255. volumePluginMgr,
  256. fakeRecorder,
  257. false, /* checkNodeCapabilitiesBeforeMount */
  258. fakeHandler))
  259. reconciler := NewReconciler(
  260. kubeClient,
  261. false, /* controllerAttachDetachEnabled */
  262. reconcilerLoopSleepDuration,
  263. waitForAttachTimeout,
  264. nodeName,
  265. dsw,
  266. asw,
  267. hasAddedPods,
  268. oex,
  269. mount.NewFakeMounter(nil),
  270. hostutil.NewFakeHostUtil(nil),
  271. volumePluginMgr,
  272. kubeletPodsDir)
  273. pod := &v1.Pod{
  274. ObjectMeta: metav1.ObjectMeta{
  275. Name: "pod1",
  276. UID: "pod1uid",
  277. },
  278. Spec: v1.PodSpec{
  279. Volumes: []v1.Volume{
  280. {
  281. Name: "volume-name",
  282. VolumeSource: v1.VolumeSource{
  283. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  284. PDName: "fake-device1",
  285. },
  286. },
  287. },
  288. },
  289. },
  290. }
  291. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  292. podName := util.GetUniquePodName(pod)
  293. generatedVolumeName, err := dsw.AddPodToVolume(
  294. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  295. // Assert
  296. if err != nil {
  297. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  298. }
  299. // Act
  300. runReconciler(reconciler)
  301. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  302. // Assert
  303. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  304. 1 /* expectedAttachCallCount */, fakePlugin))
  305. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  306. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  307. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  308. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  309. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  310. 1 /* expectedSetUpCallCount */, fakePlugin))
  311. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  312. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  313. // Act
  314. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  315. waitForDetach(t, generatedVolumeName, asw)
  316. // Assert
  317. assert.NoError(t, volumetesting.VerifyTearDownCallCount(
  318. 1 /* expectedTearDownCallCount */, fakePlugin))
  319. assert.NoError(t, volumetesting.VerifyDetachCallCount(
  320. 1 /* expectedDetachCallCount */, fakePlugin))
  321. }
  322. // Populates desiredStateOfWorld cache with one volume/pod.
  323. // Enables controllerAttachDetachEnabled.
  324. // Calls Run()
  325. // Verifies one mount call is made and no unmount calls.
  326. // Deletes volume/pod from desired state of world.
  327. // Verifies one unmount call is made.
  328. // Verifies there are no attach/detach calls made.
  329. func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
  330. // Arrange
  331. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  332. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  333. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  334. kubeClient := createTestClient()
  335. fakeRecorder := &record.FakeRecorder{}
  336. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  337. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  338. kubeClient,
  339. volumePluginMgr,
  340. fakeRecorder,
  341. false, /* checkNodeCapabilitiesBeforeMount */
  342. fakeHandler))
  343. reconciler := NewReconciler(
  344. kubeClient,
  345. true, /* controllerAttachDetachEnabled */
  346. reconcilerLoopSleepDuration,
  347. waitForAttachTimeout,
  348. nodeName,
  349. dsw,
  350. asw,
  351. hasAddedPods,
  352. oex,
  353. mount.NewFakeMounter(nil),
  354. hostutil.NewFakeHostUtil(nil),
  355. volumePluginMgr,
  356. kubeletPodsDir)
  357. pod := &v1.Pod{
  358. ObjectMeta: metav1.ObjectMeta{
  359. Name: "pod1",
  360. UID: "pod1uid",
  361. },
  362. Spec: v1.PodSpec{
  363. Volumes: []v1.Volume{
  364. {
  365. Name: "volume-name",
  366. VolumeSource: v1.VolumeSource{
  367. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  368. PDName: "fake-device1",
  369. },
  370. },
  371. },
  372. },
  373. },
  374. }
  375. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  376. podName := util.GetUniquePodName(pod)
  377. generatedVolumeName, err := dsw.AddPodToVolume(
  378. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  379. // Assert
  380. if err != nil {
  381. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  382. }
  383. // Act
  384. runReconciler(reconciler)
  385. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  386. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  387. // Assert
  388. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  389. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  390. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  391. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  392. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  393. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  394. 1 /* expectedSetUpCallCount */, fakePlugin))
  395. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  396. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  397. // Act
  398. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  399. waitForDetach(t, generatedVolumeName, asw)
  400. // Assert
  401. assert.NoError(t, volumetesting.VerifyTearDownCallCount(
  402. 1 /* expectedTearDownCallCount */, fakePlugin))
  403. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  404. }
  405. // Populates desiredStateOfWorld cache with one volume/pod.
  406. // Calls Run()
  407. // Verifies there are attach/get map paths/setupDevice calls and
  408. // no detach/teardownDevice calls.
  409. func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
  410. // Enable BlockVolume feature gate
  411. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  412. pod := &v1.Pod{
  413. ObjectMeta: metav1.ObjectMeta{
  414. Name: "pod1",
  415. UID: "pod1uid",
  416. Namespace: "ns",
  417. },
  418. Spec: v1.PodSpec{},
  419. }
  420. mode := v1.PersistentVolumeBlock
  421. gcepv := &v1.PersistentVolume{
  422. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  423. Spec: v1.PersistentVolumeSpec{
  424. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  425. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  426. AccessModes: []v1.PersistentVolumeAccessMode{
  427. v1.ReadWriteOnce,
  428. v1.ReadOnlyMany,
  429. },
  430. VolumeMode: &mode,
  431. ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
  432. },
  433. }
  434. gcepvc := &v1.PersistentVolumeClaim{
  435. ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
  436. Spec: v1.PersistentVolumeClaimSpec{
  437. VolumeName: "volume-name",
  438. VolumeMode: &mode,
  439. },
  440. Status: v1.PersistentVolumeClaimStatus{
  441. Phase: v1.ClaimBound,
  442. Capacity: gcepv.Spec.Capacity,
  443. },
  444. }
  445. // Arrange
  446. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  447. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  448. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  449. kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
  450. fakeRecorder := &record.FakeRecorder{}
  451. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  452. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  453. kubeClient,
  454. volumePluginMgr,
  455. fakeRecorder,
  456. false, /* checkNodeCapabilitiesBeforeMount */
  457. fakeHandler))
  458. reconciler := NewReconciler(
  459. kubeClient,
  460. false, /* controllerAttachDetachEnabled */
  461. reconcilerLoopSleepDuration,
  462. waitForAttachTimeout,
  463. nodeName,
  464. dsw,
  465. asw,
  466. hasAddedPods,
  467. oex,
  468. mount.NewFakeMounter(nil),
  469. hostutil.NewFakeHostUtil(nil),
  470. volumePluginMgr,
  471. kubeletPodsDir)
  472. volumeSpec := &volume.Spec{
  473. PersistentVolume: gcepv,
  474. }
  475. podName := util.GetUniquePodName(pod)
  476. generatedVolumeName, err := dsw.AddPodToVolume(
  477. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  478. // Assert
  479. if err != nil {
  480. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  481. }
  482. // Act
  483. runReconciler(reconciler)
  484. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  485. // Assert
  486. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  487. 1 /* expectedAttachCallCount */, fakePlugin))
  488. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  489. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  490. assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
  491. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  492. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  493. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  494. }
  495. // Populates desiredStateOfWorld cache with one volume/pod.
  496. // Enables controllerAttachDetachEnabled.
  497. // Calls Run()
  498. // Verifies there are two get map path calls, a setupDevice call
  499. // and no teardownDevice call.
  500. // Verifies there are no attach/detach calls.
  501. func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
  502. // Enable BlockVolume feature gate
  503. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  504. pod := &v1.Pod{
  505. ObjectMeta: metav1.ObjectMeta{
  506. Name: "pod1",
  507. UID: "pod1uid",
  508. Namespace: "ns",
  509. },
  510. Spec: v1.PodSpec{},
  511. }
  512. mode := v1.PersistentVolumeBlock
  513. gcepv := &v1.PersistentVolume{
  514. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  515. Spec: v1.PersistentVolumeSpec{
  516. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  517. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  518. AccessModes: []v1.PersistentVolumeAccessMode{
  519. v1.ReadWriteOnce,
  520. v1.ReadOnlyMany,
  521. },
  522. VolumeMode: &mode,
  523. ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
  524. },
  525. }
  526. gcepvc := &v1.PersistentVolumeClaim{
  527. ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
  528. Spec: v1.PersistentVolumeClaimSpec{
  529. VolumeName: "volume-name",
  530. VolumeMode: &mode,
  531. },
  532. Status: v1.PersistentVolumeClaimStatus{
  533. Phase: v1.ClaimBound,
  534. Capacity: gcepv.Spec.Capacity,
  535. },
  536. }
  537. volumeSpec := &volume.Spec{
  538. PersistentVolume: gcepv,
  539. }
  540. // Arrange
  541. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  542. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  543. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  544. kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
  545. Name: "fake-plugin/fake-device1",
  546. DevicePath: "/fake/path",
  547. })
  548. fakeRecorder := &record.FakeRecorder{}
  549. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  550. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  551. kubeClient,
  552. volumePluginMgr,
  553. fakeRecorder,
  554. false, /* checkNodeCapabilitiesBeforeMount */
  555. fakeHandler))
  556. reconciler := NewReconciler(
  557. kubeClient,
  558. true, /* controllerAttachDetachEnabled */
  559. reconcilerLoopSleepDuration,
  560. waitForAttachTimeout,
  561. nodeName,
  562. dsw,
  563. asw,
  564. hasAddedPods,
  565. oex,
  566. mount.NewFakeMounter(nil),
  567. hostutil.NewFakeHostUtil(nil),
  568. volumePluginMgr,
  569. kubeletPodsDir)
  570. podName := util.GetUniquePodName(pod)
  571. generatedVolumeName, err := dsw.AddPodToVolume(
  572. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  573. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  574. // Assert
  575. if err != nil {
  576. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  577. }
  578. // Act
  579. runReconciler(reconciler)
  580. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  581. // Assert
  582. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  583. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  584. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  585. assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
  586. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  587. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  588. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  589. }
  590. // Populates desiredStateOfWorld cache with one volume/pod.
  591. // Calls Run()
  592. // Verifies there is one attach call, two get map path calls,
  593. // setupDevice call and no detach calls.
  594. // Deletes volume/pod from desired state of world.
  595. // Verifies one detach/teardownDevice calls are issued.
  596. func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
  597. // Enable BlockVolume feature gate
  598. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  599. pod := &v1.Pod{
  600. ObjectMeta: metav1.ObjectMeta{
  601. Name: "pod1",
  602. UID: "pod1uid",
  603. Namespace: "ns",
  604. },
  605. Spec: v1.PodSpec{},
  606. }
  607. mode := v1.PersistentVolumeBlock
  608. gcepv := &v1.PersistentVolume{
  609. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  610. Spec: v1.PersistentVolumeSpec{
  611. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  612. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  613. AccessModes: []v1.PersistentVolumeAccessMode{
  614. v1.ReadWriteOnce,
  615. v1.ReadOnlyMany,
  616. },
  617. VolumeMode: &mode,
  618. ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
  619. },
  620. }
  621. gcepvc := &v1.PersistentVolumeClaim{
  622. ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
  623. Spec: v1.PersistentVolumeClaimSpec{
  624. VolumeName: "volume-name",
  625. VolumeMode: &mode,
  626. },
  627. Status: v1.PersistentVolumeClaimStatus{
  628. Phase: v1.ClaimBound,
  629. Capacity: gcepv.Spec.Capacity,
  630. },
  631. }
  632. volumeSpec := &volume.Spec{
  633. PersistentVolume: gcepv,
  634. }
  635. // Arrange
  636. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  637. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  638. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  639. kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
  640. fakeRecorder := &record.FakeRecorder{}
  641. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  642. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  643. kubeClient,
  644. volumePluginMgr,
  645. fakeRecorder,
  646. false, /* checkNodeCapabilitiesBeforeMount */
  647. fakeHandler))
  648. reconciler := NewReconciler(
  649. kubeClient,
  650. false, /* controllerAttachDetachEnabled */
  651. reconcilerLoopSleepDuration,
  652. waitForAttachTimeout,
  653. nodeName,
  654. dsw,
  655. asw,
  656. hasAddedPods,
  657. oex,
  658. mount.NewFakeMounter(nil),
  659. hostutil.NewFakeHostUtil(nil),
  660. volumePluginMgr,
  661. kubeletPodsDir)
  662. podName := util.GetUniquePodName(pod)
  663. generatedVolumeName, err := dsw.AddPodToVolume(
  664. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  665. // Assert
  666. if err != nil {
  667. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  668. }
  669. // Act
  670. runReconciler(reconciler)
  671. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  672. // Assert
  673. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  674. 1 /* expectedAttachCallCount */, fakePlugin))
  675. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  676. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  677. assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
  678. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  679. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  680. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  681. // Act
  682. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  683. waitForDetach(t, generatedVolumeName, asw)
  684. // Assert
  685. assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
  686. 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
  687. assert.NoError(t, volumetesting.VerifyDetachCallCount(
  688. 1 /* expectedDetachCallCount */, fakePlugin))
  689. }
  690. // Populates desiredStateOfWorld cache with one volume/pod.
  691. // Enables controllerAttachDetachEnabled.
  692. // Calls Run()
  693. // Verifies two map path calls are made and no teardownDevice/detach calls.
  694. // Deletes volume/pod from desired state of world.
  695. // Verifies one teardownDevice call is made.
  696. // Verifies there are no attach/detach calls made.
  697. func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
  698. // Enable BlockVolume feature gate
  699. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  700. pod := &v1.Pod{
  701. ObjectMeta: metav1.ObjectMeta{
  702. Name: "pod1",
  703. UID: "pod1uid",
  704. Namespace: "ns",
  705. },
  706. Spec: v1.PodSpec{},
  707. }
  708. mode := v1.PersistentVolumeBlock
  709. gcepv := &v1.PersistentVolume{
  710. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  711. Spec: v1.PersistentVolumeSpec{
  712. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  713. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  714. AccessModes: []v1.PersistentVolumeAccessMode{
  715. v1.ReadWriteOnce,
  716. v1.ReadOnlyMany,
  717. },
  718. VolumeMode: &mode,
  719. ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
  720. },
  721. }
  722. gcepvc := &v1.PersistentVolumeClaim{
  723. ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
  724. Spec: v1.PersistentVolumeClaimSpec{
  725. VolumeName: "volume-name",
  726. VolumeMode: &mode,
  727. },
  728. Status: v1.PersistentVolumeClaimStatus{
  729. Phase: v1.ClaimBound,
  730. Capacity: gcepv.Spec.Capacity,
  731. },
  732. }
  733. volumeSpec := &volume.Spec{
  734. PersistentVolume: gcepv,
  735. }
  736. // Arrange
  737. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  738. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  739. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  740. kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
  741. Name: "fake-plugin/fake-device1",
  742. DevicePath: "/fake/path",
  743. })
  744. fakeRecorder := &record.FakeRecorder{}
  745. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  746. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  747. kubeClient,
  748. volumePluginMgr,
  749. fakeRecorder,
  750. false, /* checkNodeCapabilitiesBeforeMount */
  751. fakeHandler))
  752. reconciler := NewReconciler(
  753. kubeClient,
  754. true, /* controllerAttachDetachEnabled */
  755. reconcilerLoopSleepDuration,
  756. waitForAttachTimeout,
  757. nodeName,
  758. dsw,
  759. asw,
  760. hasAddedPods,
  761. oex,
  762. mount.NewFakeMounter(nil),
  763. hostutil.NewFakeHostUtil(nil),
  764. volumePluginMgr,
  765. kubeletPodsDir)
  766. podName := util.GetUniquePodName(pod)
  767. generatedVolumeName, err := dsw.AddPodToVolume(
  768. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  769. // Assert
  770. if err != nil {
  771. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  772. }
  773. // Act
  774. runReconciler(reconciler)
  775. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  776. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  777. // Assert
  778. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  779. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  780. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  781. assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
  782. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  783. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  784. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  785. // Act
  786. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  787. waitForDetach(t, generatedVolumeName, asw)
  788. // Assert
  789. assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
  790. 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
  791. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  792. }
  793. func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  794. testCases := map[string]struct {
  795. volumePlugins []volume.VolumePlugin
  796. expectErr bool
  797. expectedErrMsg string
  798. }{
  799. "volumePlugin is nil": {
  800. volumePlugins: []volume.VolumePlugin{},
  801. expectErr: true,
  802. expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
  803. },
  804. "blockVolumePlugin is nil": {
  805. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  806. expectErr: true,
  807. expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  808. },
  809. }
  810. // Enable BlockVolume feature gate
  811. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  812. for name, tc := range testCases {
  813. t.Run(name, func(t *testing.T) {
  814. volumePluginMgr := &volume.VolumePluginMgr{}
  815. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  816. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  817. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  818. nil, /* kubeClient */
  819. volumePluginMgr,
  820. nil, /* fakeRecorder */
  821. false, /* checkNodeCapabilitiesBeforeMount */
  822. nil))
  823. pod := &v1.Pod{
  824. ObjectMeta: metav1.ObjectMeta{
  825. Name: "pod1",
  826. UID: "pod1uid",
  827. },
  828. Spec: v1.PodSpec{},
  829. }
  830. volumeMode := v1.PersistentVolumeBlock
  831. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  832. volumeToMount := operationexecutor.VolumeToMount{
  833. Pod: pod,
  834. VolumeSpec: tmpSpec}
  835. err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
  836. // Assert
  837. if assert.Error(t, err) {
  838. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  839. }
  840. })
  841. }
  842. }
  843. func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  844. testCases := map[string]struct {
  845. volumePlugins []volume.VolumePlugin
  846. expectErr bool
  847. expectedErrMsg string
  848. }{
  849. "volumePlugin is nil": {
  850. volumePlugins: []volume.VolumePlugin{},
  851. expectErr: true,
  852. expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
  853. },
  854. "blockVolumePlugin is nil": {
  855. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  856. expectErr: true,
  857. expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  858. },
  859. }
  860. // Enable BlockVolume feature gate
  861. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  862. for name, tc := range testCases {
  863. t.Run(name, func(t *testing.T) {
  864. volumePluginMgr := &volume.VolumePluginMgr{}
  865. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  866. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  867. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  868. nil, /* kubeClient */
  869. volumePluginMgr,
  870. nil, /* fakeRecorder */
  871. false, /* checkNodeCapabilitiesBeforeMount */
  872. nil))
  873. volumeMode := v1.PersistentVolumeBlock
  874. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  875. volumeToUnmount := operationexecutor.MountedVolume{
  876. PluginName: "fake-file-plugin",
  877. VolumeSpec: tmpSpec}
  878. err := oex.UnmountVolume(volumeToUnmount, asw, "" /* podsDir */)
  879. // Assert
  880. if assert.Error(t, err) {
  881. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  882. }
  883. })
  884. }
  885. }
  886. func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
  887. testCases := map[string]struct {
  888. volumePlugins []volume.VolumePlugin
  889. expectErr bool
  890. expectedErrMsg string
  891. }{
  892. "volumePlugin is nil": {
  893. volumePlugins: []volume.VolumePlugin{},
  894. expectErr: true,
  895. expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
  896. },
  897. "blockVolumePlugin is nil": {
  898. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  899. expectErr: true,
  900. expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  901. },
  902. }
  903. // Enable BlockVolume feature gate
  904. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  905. for name, tc := range testCases {
  906. t.Run(name, func(t *testing.T) {
  907. volumePluginMgr := &volume.VolumePluginMgr{}
  908. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  909. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  910. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  911. nil, /* kubeClient */
  912. volumePluginMgr,
  913. nil, /* fakeRecorder */
  914. false, /* checkNodeCapabilitiesBeforeMount */
  915. nil))
  916. var hostutil hostutil.HostUtils
  917. volumeMode := v1.PersistentVolumeBlock
  918. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  919. deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
  920. err := oex.UnmountDevice(deviceToDetach, asw, hostutil)
  921. // Assert
  922. if assert.Error(t, err) {
  923. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  924. }
  925. })
  926. }
  927. }
  928. // Populates desiredStateOfWorld cache with one volume/pod.
  929. // Enables controllerAttachDetachEnabled.
  930. // Calls Run()
  931. // Wait for volume mounted.
  932. // Mark volume as fsResizeRequired in ASW.
  933. // Verifies volume's fsResizeRequired flag is cleared later.
  934. func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
  935. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExpandInUsePersistentVolumes, true)()
  936. blockMode := v1.PersistentVolumeBlock
  937. fsMode := v1.PersistentVolumeFilesystem
  938. var tests = []struct {
  939. name string
  940. volumeMode *v1.PersistentVolumeMode
  941. }{
  942. {
  943. name: "expand-fs-volume",
  944. volumeMode: &fsMode,
  945. },
  946. {
  947. name: "expand-raw-block",
  948. volumeMode: &blockMode,
  949. },
  950. }
  951. for _, tc := range tests {
  952. t.Run(tc.name, func(t *testing.T) {
  953. pv := &v1.PersistentVolume{
  954. ObjectMeta: metav1.ObjectMeta{
  955. Name: "pv",
  956. UID: "pvuid",
  957. },
  958. Spec: v1.PersistentVolumeSpec{
  959. ClaimRef: &v1.ObjectReference{Name: "pvc"},
  960. VolumeMode: tc.volumeMode,
  961. },
  962. }
  963. pvc := &v1.PersistentVolumeClaim{
  964. ObjectMeta: metav1.ObjectMeta{
  965. Name: "pvc",
  966. UID: "pvcuid",
  967. },
  968. Spec: v1.PersistentVolumeClaimSpec{
  969. VolumeName: "pv",
  970. VolumeMode: tc.volumeMode,
  971. },
  972. }
  973. pod := &v1.Pod{
  974. ObjectMeta: metav1.ObjectMeta{
  975. Name: "pod1",
  976. UID: "pod1uid",
  977. },
  978. Spec: v1.PodSpec{
  979. Volumes: []v1.Volume{
  980. {
  981. Name: "volume-name",
  982. VolumeSource: v1.VolumeSource{
  983. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  984. ClaimName: pvc.Name,
  985. },
  986. },
  987. },
  988. },
  989. },
  990. }
  991. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  992. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  993. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  994. kubeClient := createtestClientWithPVPVC(pv, pvc)
  995. fakeRecorder := &record.FakeRecorder{}
  996. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  997. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  998. kubeClient,
  999. volumePluginMgr,
  1000. fakeRecorder,
  1001. false, /* checkNodeCapabilitiesBeforeMount */
  1002. fakeHandler))
  1003. reconciler := NewReconciler(
  1004. kubeClient,
  1005. true, /* controllerAttachDetachEnabled */
  1006. reconcilerLoopSleepDuration,
  1007. waitForAttachTimeout,
  1008. nodeName,
  1009. dsw,
  1010. asw,
  1011. hasAddedPods,
  1012. oex,
  1013. mount.NewFakeMounter(nil),
  1014. hostutil.NewFakeHostUtil(nil),
  1015. volumePluginMgr,
  1016. kubeletPodsDir)
  1017. volumeSpec := &volume.Spec{PersistentVolume: pv}
  1018. podName := util.GetUniquePodName(pod)
  1019. volumeName, err := dsw.AddPodToVolume(
  1020. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1021. // Assert
  1022. if err != nil {
  1023. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1024. }
  1025. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1026. // Start the reconciler to fill ASW.
  1027. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1028. go func() {
  1029. reconciler.Run(stopChan)
  1030. close(stoppedChan)
  1031. }()
  1032. waitForMount(t, fakePlugin, volumeName, asw)
  1033. // Stop the reconciler.
  1034. close(stopChan)
  1035. <-stoppedChan
  1036. // Mark volume as fsResizeRequired.
  1037. asw.MarkFSResizeRequired(volumeName, podName)
  1038. _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
  1039. if !cache.IsFSResizeRequiredError(podExistErr) {
  1040. t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
  1041. }
  1042. // Start the reconciler again, we hope reconciler will perform the
  1043. // resize operation and clear the fsResizeRequired flag for volume.
  1044. go reconciler.Run(wait.NeverStop)
  1045. waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
  1046. mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
  1047. return mounted && err == nil, nil
  1048. })
  1049. if waitErr != nil {
  1050. t.Fatal("Volume resize should succeeded")
  1051. }
  1052. })
  1053. }
  1054. }
  1055. func Test_UncertainDeviceGlobalMounts(t *testing.T) {
  1056. fsMode := v1.PersistentVolumeFilesystem
  1057. var tests = []struct {
  1058. name string
  1059. deviceState operationexecutor.DeviceMountState
  1060. unmountDeviceCallCount int
  1061. volumeName string
  1062. supportRemount bool
  1063. }{
  1064. {
  1065. name: "timed out operations should result in device marked as uncertain",
  1066. deviceState: operationexecutor.DeviceMountUncertain,
  1067. unmountDeviceCallCount: 1,
  1068. volumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
  1069. },
  1070. {
  1071. name: "failed operation should result in not-mounted device",
  1072. deviceState: operationexecutor.DeviceNotMounted,
  1073. unmountDeviceCallCount: 0,
  1074. volumeName: volumetesting.FailMountDeviceVolumeName,
  1075. },
  1076. {
  1077. name: "timeout followed by failed operation should result in non-mounted device",
  1078. deviceState: operationexecutor.DeviceNotMounted,
  1079. unmountDeviceCallCount: 0,
  1080. volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
  1081. },
  1082. {
  1083. name: "success followed by timeout operation should result in mounted device",
  1084. deviceState: operationexecutor.DeviceGloballyMounted,
  1085. unmountDeviceCallCount: 1,
  1086. volumeName: volumetesting.SuccessAndTimeoutDeviceName,
  1087. supportRemount: true,
  1088. },
  1089. {
  1090. name: "success followed by failed operation should result in mounted device",
  1091. deviceState: operationexecutor.DeviceGloballyMounted,
  1092. unmountDeviceCallCount: 1,
  1093. volumeName: volumetesting.SuccessAndFailOnMountDeviceName,
  1094. supportRemount: true,
  1095. },
  1096. }
  1097. for _, tc := range tests {
  1098. t.Run(tc.name, func(t *testing.T) {
  1099. pv := &v1.PersistentVolume{
  1100. ObjectMeta: metav1.ObjectMeta{
  1101. Name: tc.volumeName,
  1102. UID: "pvuid",
  1103. },
  1104. Spec: v1.PersistentVolumeSpec{
  1105. ClaimRef: &v1.ObjectReference{Name: "pvc"},
  1106. VolumeMode: &fsMode,
  1107. },
  1108. }
  1109. pvc := &v1.PersistentVolumeClaim{
  1110. ObjectMeta: metav1.ObjectMeta{
  1111. Name: "pvc",
  1112. UID: "pvcuid",
  1113. },
  1114. Spec: v1.PersistentVolumeClaimSpec{
  1115. VolumeName: tc.volumeName,
  1116. },
  1117. }
  1118. pod := &v1.Pod{
  1119. ObjectMeta: metav1.ObjectMeta{
  1120. Name: "pod1",
  1121. UID: "pod1uid",
  1122. },
  1123. Spec: v1.PodSpec{
  1124. Volumes: []v1.Volume{
  1125. {
  1126. Name: "volume-name",
  1127. VolumeSource: v1.VolumeSource{
  1128. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1129. ClaimName: pvc.Name,
  1130. },
  1131. },
  1132. },
  1133. },
  1134. },
  1135. }
  1136. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  1137. fakePlugin.SupportsRemount = tc.supportRemount
  1138. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  1139. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1140. kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
  1141. Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1142. DevicePath: "fake/path",
  1143. })
  1144. fakeRecorder := &record.FakeRecorder{}
  1145. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1146. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1147. kubeClient,
  1148. volumePluginMgr,
  1149. fakeRecorder,
  1150. false, /* checkNodeCapabilitiesBeforeMount */
  1151. fakeHandler))
  1152. reconciler := NewReconciler(
  1153. kubeClient,
  1154. true, /* controllerAttachDetachEnabled */
  1155. reconcilerLoopSleepDuration,
  1156. waitForAttachTimeout,
  1157. nodeName,
  1158. dsw,
  1159. asw,
  1160. hasAddedPods,
  1161. oex,
  1162. &mount.FakeMounter{},
  1163. hostutil.NewFakeHostUtil(nil),
  1164. volumePluginMgr,
  1165. kubeletPodsDir)
  1166. volumeSpec := &volume.Spec{PersistentVolume: pv}
  1167. podName := util.GetUniquePodName(pod)
  1168. volumeName, err := dsw.AddPodToVolume(
  1169. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1170. // Assert
  1171. if err != nil {
  1172. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1173. }
  1174. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1175. // Start the reconciler to fill ASW.
  1176. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1177. go func() {
  1178. reconciler.Run(stopChan)
  1179. close(stoppedChan)
  1180. }()
  1181. waitForVolumeToExistInASW(t, volumeName, asw)
  1182. if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
  1183. // Wait upto 10s for reconciler to catchup
  1184. time.Sleep(reconcilerSyncWaitDuration)
  1185. }
  1186. if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
  1187. tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
  1188. // wait for mount and then break it via remount
  1189. waitForMount(t, fakePlugin, volumeName, asw)
  1190. asw.MarkRemountRequired(podName)
  1191. time.Sleep(reconcilerSyncWaitDuration)
  1192. }
  1193. if tc.deviceState == operationexecutor.DeviceMountUncertain {
  1194. waitForUncertainGlobalMount(t, volumeName, asw)
  1195. }
  1196. if tc.deviceState == operationexecutor.DeviceGloballyMounted {
  1197. waitForMount(t, fakePlugin, volumeName, asw)
  1198. }
  1199. dsw.DeletePodFromVolume(podName, volumeName)
  1200. waitForDetach(t, volumeName, asw)
  1201. err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
  1202. if err != nil {
  1203. t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1204. }
  1205. })
  1206. }
  1207. }
  1208. func Test_UncertainVolumeMountState(t *testing.T) {
  1209. fsMode := v1.PersistentVolumeFilesystem
  1210. var tests = []struct {
  1211. name string
  1212. volumeState operationexecutor.VolumeMountState
  1213. unmountDeviceCallCount int
  1214. unmountVolumeCount int
  1215. volumeName string
  1216. supportRemount bool
  1217. }{
  1218. {
  1219. name: "timed out operations should result in volume marked as uncertain",
  1220. volumeState: operationexecutor.VolumeMountUncertain,
  1221. unmountDeviceCallCount: 1,
  1222. unmountVolumeCount: 1,
  1223. volumeName: volumetesting.TimeoutOnSetupVolumeName,
  1224. },
  1225. {
  1226. name: "failed operation should result in not-mounted volume",
  1227. volumeState: operationexecutor.VolumeNotMounted,
  1228. unmountDeviceCallCount: 0,
  1229. unmountVolumeCount: 0,
  1230. volumeName: volumetesting.FailOnSetupVolumeName,
  1231. },
  1232. {
  1233. name: "timeout followed by failed operation should result in non-mounted volume",
  1234. volumeState: operationexecutor.VolumeNotMounted,
  1235. unmountDeviceCallCount: 0,
  1236. unmountVolumeCount: 0,
  1237. volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
  1238. },
  1239. {
  1240. name: "success followed by timeout operation should result in mounted volume",
  1241. volumeState: operationexecutor.VolumeMounted,
  1242. unmountDeviceCallCount: 1,
  1243. unmountVolumeCount: 1,
  1244. volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName,
  1245. supportRemount: true,
  1246. },
  1247. {
  1248. name: "success followed by failed operation should result in mounted volume",
  1249. volumeState: operationexecutor.VolumeMounted,
  1250. unmountDeviceCallCount: 1,
  1251. unmountVolumeCount: 1,
  1252. volumeName: volumetesting.SuccessAndFailOnSetupVolumeName,
  1253. supportRemount: true,
  1254. },
  1255. }
  1256. for _, tc := range tests {
  1257. t.Run(tc.name, func(t *testing.T) {
  1258. pv := &v1.PersistentVolume{
  1259. ObjectMeta: metav1.ObjectMeta{
  1260. Name: tc.volumeName,
  1261. UID: "pvuid",
  1262. },
  1263. Spec: v1.PersistentVolumeSpec{
  1264. ClaimRef: &v1.ObjectReference{Name: "pvc"},
  1265. VolumeMode: &fsMode,
  1266. },
  1267. }
  1268. pvc := &v1.PersistentVolumeClaim{
  1269. ObjectMeta: metav1.ObjectMeta{
  1270. Name: "pvc",
  1271. UID: "pvcuid",
  1272. },
  1273. Spec: v1.PersistentVolumeClaimSpec{
  1274. VolumeName: tc.volumeName,
  1275. },
  1276. }
  1277. pod := &v1.Pod{
  1278. ObjectMeta: metav1.ObjectMeta{
  1279. Name: "pod1",
  1280. UID: "pod1uid",
  1281. },
  1282. Spec: v1.PodSpec{
  1283. Volumes: []v1.Volume{
  1284. {
  1285. Name: "volume-name",
  1286. VolumeSource: v1.VolumeSource{
  1287. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1288. ClaimName: pvc.Name,
  1289. },
  1290. },
  1291. },
  1292. },
  1293. },
  1294. }
  1295. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  1296. fakePlugin.SupportsRemount = tc.supportRemount
  1297. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  1298. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1299. kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
  1300. Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1301. DevicePath: "fake/path",
  1302. })
  1303. fakeRecorder := &record.FakeRecorder{}
  1304. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1305. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1306. kubeClient,
  1307. volumePluginMgr,
  1308. fakeRecorder,
  1309. false, /* checkNodeCapabilitiesBeforeMount */
  1310. fakeHandler))
  1311. reconciler := NewReconciler(
  1312. kubeClient,
  1313. true, /* controllerAttachDetachEnabled */
  1314. reconcilerLoopSleepDuration,
  1315. waitForAttachTimeout,
  1316. nodeName,
  1317. dsw,
  1318. asw,
  1319. hasAddedPods,
  1320. oex,
  1321. &mount.FakeMounter{},
  1322. hostutil.NewFakeHostUtil(nil),
  1323. volumePluginMgr,
  1324. kubeletPodsDir)
  1325. volumeSpec := &volume.Spec{PersistentVolume: pv}
  1326. podName := util.GetUniquePodName(pod)
  1327. volumeName, err := dsw.AddPodToVolume(
  1328. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1329. // Assert
  1330. if err != nil {
  1331. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1332. }
  1333. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1334. // Start the reconciler to fill ASW.
  1335. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1336. go func() {
  1337. reconciler.Run(stopChan)
  1338. close(stoppedChan)
  1339. }()
  1340. waitForVolumeToExistInASW(t, volumeName, asw)
  1341. if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
  1342. // Wait upto 10s for reconciler to catchup
  1343. time.Sleep(reconcilerSyncWaitDuration)
  1344. }
  1345. if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
  1346. tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
  1347. // wait for mount and then break it via remount
  1348. waitForMount(t, fakePlugin, volumeName, asw)
  1349. asw.MarkRemountRequired(podName)
  1350. time.Sleep(reconcilerSyncWaitDuration)
  1351. }
  1352. if tc.volumeState == operationexecutor.VolumeMountUncertain {
  1353. waitForUncertainPodMount(t, volumeName, asw)
  1354. }
  1355. if tc.volumeState == operationexecutor.VolumeMounted {
  1356. waitForMount(t, fakePlugin, volumeName, asw)
  1357. }
  1358. dsw.DeletePodFromVolume(podName, volumeName)
  1359. waitForDetach(t, volumeName, asw)
  1360. volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
  1361. volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin)
  1362. })
  1363. }
  1364. }
  1365. func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1366. // check if volume is globally mounted in uncertain state
  1367. err := retryWithExponentialBackOff(
  1368. testOperationBackOffDuration,
  1369. func() (bool, error) {
  1370. unmountedVolumes := asw.GetUnmountedVolumes()
  1371. for _, v := range unmountedVolumes {
  1372. if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
  1373. return true, nil
  1374. }
  1375. }
  1376. return false, nil
  1377. },
  1378. )
  1379. if err != nil {
  1380. t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
  1381. }
  1382. }
  1383. func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1384. // check if volume is locally pod mounted in uncertain state
  1385. err := retryWithExponentialBackOff(
  1386. testOperationBackOffDuration,
  1387. func() (bool, error) {
  1388. allMountedVolumes := asw.GetAllMountedVolumes()
  1389. for _, v := range allMountedVolumes {
  1390. if v.VolumeName == volumeName {
  1391. return true, nil
  1392. }
  1393. }
  1394. return false, nil
  1395. },
  1396. )
  1397. if err != nil {
  1398. t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName)
  1399. }
  1400. }
  1401. func waitForMount(
  1402. t *testing.T,
  1403. fakePlugin *volumetesting.FakeVolumePlugin,
  1404. volumeName v1.UniqueVolumeName,
  1405. asw cache.ActualStateOfWorld) {
  1406. err := retryWithExponentialBackOff(
  1407. testOperationBackOffDuration,
  1408. func() (bool, error) {
  1409. mountedVolumes := asw.GetMountedVolumes()
  1410. for _, mountedVolume := range mountedVolumes {
  1411. if mountedVolume.VolumeName == volumeName {
  1412. return true, nil
  1413. }
  1414. }
  1415. return false, nil
  1416. },
  1417. )
  1418. if err != nil {
  1419. t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
  1420. }
  1421. }
  1422. func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1423. err := retryWithExponentialBackOff(
  1424. testOperationBackOffDuration,
  1425. func() (bool, error) {
  1426. if asw.VolumeExists(volumeName) {
  1427. return true, nil
  1428. }
  1429. return false, nil
  1430. },
  1431. )
  1432. if err != nil {
  1433. t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName)
  1434. }
  1435. }
  1436. func waitForDetach(
  1437. t *testing.T,
  1438. volumeName v1.UniqueVolumeName,
  1439. asw cache.ActualStateOfWorld) {
  1440. err := retryWithExponentialBackOff(
  1441. testOperationBackOffDuration,
  1442. func() (bool, error) {
  1443. if asw.VolumeExists(volumeName) {
  1444. return false, nil
  1445. }
  1446. return true, nil
  1447. },
  1448. )
  1449. if err != nil {
  1450. t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
  1451. }
  1452. }
  1453. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  1454. backoff := wait.Backoff{
  1455. Duration: initialDuration,
  1456. Factor: 3,
  1457. Jitter: 0,
  1458. Steps: 6,
  1459. }
  1460. return wait.ExponentialBackoff(backoff, fn)
  1461. }
  1462. func createTestClient() *fake.Clientset {
  1463. fakeClient := &fake.Clientset{}
  1464. fakeClient.AddReactor("get", "nodes",
  1465. func(action core.Action) (bool, runtime.Object, error) {
  1466. return true, &v1.Node{
  1467. ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  1468. Status: v1.NodeStatus{
  1469. VolumesAttached: []v1.AttachedVolume{
  1470. {
  1471. Name: "fake-plugin/fake-device1",
  1472. DevicePath: "/fake/path",
  1473. },
  1474. }},
  1475. }, nil
  1476. })
  1477. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  1478. return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  1479. })
  1480. return fakeClient
  1481. }
  1482. func runReconciler(reconciler Reconciler) {
  1483. go reconciler.Run(wait.NeverStop)
  1484. }
  1485. func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
  1486. fakeClient := &fake.Clientset{}
  1487. if len(attachedVolumes) == 0 {
  1488. attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
  1489. Name: "fake-plugin/pv",
  1490. DevicePath: "fake/path",
  1491. })
  1492. }
  1493. fakeClient.AddReactor("get", "nodes",
  1494. func(action core.Action) (bool, runtime.Object, error) {
  1495. return true, &v1.Node{
  1496. ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  1497. Status: v1.NodeStatus{
  1498. VolumesAttached: attachedVolumes,
  1499. },
  1500. }, nil
  1501. })
  1502. fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  1503. return true, pvc, nil
  1504. })
  1505. fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
  1506. return true, pv, nil
  1507. })
  1508. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  1509. return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  1510. })
  1511. return fakeClient
  1512. }
  1513. func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
  1514. // Arrange
  1515. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  1516. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  1517. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1518. kubeClient := createTestClient()
  1519. fakeRecorder := &record.FakeRecorder{}
  1520. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1521. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1522. kubeClient,
  1523. volumePluginMgr,
  1524. fakeRecorder,
  1525. false, /* checkNodeCapabilitiesBeforeMount */
  1526. fakeHandler))
  1527. reconciler := NewReconciler(
  1528. kubeClient,
  1529. true, /* controllerAttachDetachEnabled */
  1530. reconcilerLoopSleepDuration,
  1531. waitForAttachTimeout,
  1532. nodeName,
  1533. dsw,
  1534. asw,
  1535. hasAddedPods,
  1536. oex,
  1537. mount.NewFakeMounter(nil),
  1538. hostutil.NewFakeHostUtil(nil),
  1539. volumePluginMgr,
  1540. kubeletPodsDir)
  1541. pod := &v1.Pod{
  1542. ObjectMeta: metav1.ObjectMeta{
  1543. Name: "pod1",
  1544. UID: "pod1uid",
  1545. },
  1546. Spec: v1.PodSpec{
  1547. Volumes: []v1.Volume{
  1548. {
  1549. Name: "volume-name",
  1550. VolumeSource: v1.VolumeSource{
  1551. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  1552. PDName: "fake-device1",
  1553. },
  1554. },
  1555. },
  1556. },
  1557. },
  1558. }
  1559. // Some steps are executes out of order in callbacks, follow the numbers.
  1560. // 1. Add a volume to DSW and wait until it's mounted
  1561. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  1562. podName := util.GetUniquePodName(pod)
  1563. generatedVolumeName, err := dsw.AddPodToVolume(
  1564. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1565. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  1566. if err != nil {
  1567. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1568. }
  1569. // Start the reconciler to fill ASW.
  1570. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1571. go func() {
  1572. reconciler.Run(stopChan)
  1573. close(stoppedChan)
  1574. }()
  1575. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  1576. // Stop the reconciler.
  1577. close(stopChan)
  1578. <-stoppedChan
  1579. finished := make(chan interface{})
  1580. fakePlugin.UnmountDeviceHook = func(mountPath string) error {
  1581. // Act:
  1582. // 3. While a volume is being unmounted, add it back to the desired state of world
  1583. klog.Infof("UnmountDevice called")
  1584. generatedVolumeName, err = dsw.AddPodToVolume(
  1585. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1586. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  1587. return nil
  1588. }
  1589. fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
  1590. // Assert
  1591. // 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
  1592. if devicePath == "" {
  1593. t.Errorf("Expected WaitForAttach called with devicePath from Node.Status")
  1594. close(finished)
  1595. return "", fmt.Errorf("Expected devicePath from Node.Status")
  1596. }
  1597. close(finished)
  1598. return devicePath, nil
  1599. }
  1600. // Start the reconciler again.
  1601. go reconciler.Run(wait.NeverStop)
  1602. // 2. Delete the volume from DSW (and wait for callbacks)
  1603. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  1604. <-finished
  1605. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  1606. }