reconciler_test.go 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconciler
  14. import (
  15. "fmt"
  16. "testing"
  17. "time"
  18. "github.com/stretchr/testify/assert"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/resource"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. k8stypes "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. "k8s.io/client-go/kubernetes/fake"
  27. core "k8s.io/client-go/testing"
  28. "k8s.io/client-go/tools/record"
  29. featuregatetesting "k8s.io/component-base/featuregate/testing"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/features"
  32. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  33. "k8s.io/kubernetes/pkg/util/mount"
  34. "k8s.io/kubernetes/pkg/volume"
  35. volumetesting "k8s.io/kubernetes/pkg/volume/testing"
  36. "k8s.io/kubernetes/pkg/volume/util"
  37. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  38. )
  39. const (
  40. // reconcilerLoopSleepDuration is the amount of time the reconciler loop
  41. // waits between successive executions
  42. reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
  43. // waitForAttachTimeout is the maximum amount of time a
  44. // operationexecutor.Mount call will wait for a volume to be attached.
  45. waitForAttachTimeout time.Duration = 1 * time.Second
  46. nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
  47. kubeletPodsDir string = "fake-dir"
  48. )
  49. func hasAddedPods() bool { return true }
  50. // Calls Run()
  51. // Verifies there are no calls to attach, detach, mount, unmount, etc.
  52. func Test_Run_Positive_DoNothing(t *testing.T) {
  53. // Arrange
  54. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  55. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  56. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  57. kubeClient := createTestClient()
  58. fakeRecorder := &record.FakeRecorder{}
  59. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  60. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  61. kubeClient,
  62. volumePluginMgr,
  63. fakeRecorder,
  64. false, /* checkNodeCapabilitiesBeforeMount */
  65. fakeHandler,
  66. ))
  67. reconciler := NewReconciler(
  68. kubeClient,
  69. false, /* controllerAttachDetachEnabled */
  70. reconcilerLoopSleepDuration,
  71. waitForAttachTimeout,
  72. nodeName,
  73. dsw,
  74. asw,
  75. hasAddedPods,
  76. oex,
  77. &mount.FakeMounter{},
  78. volumePluginMgr,
  79. kubeletPodsDir)
  80. // Act
  81. runReconciler(reconciler)
  82. // Assert
  83. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  84. assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
  85. assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
  86. assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
  87. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  88. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  89. }
  90. // Populates desiredStateOfWorld cache with one volume/pod.
  91. // Calls Run()
  92. // Verifies there is are attach/mount/etc calls and no detach/unmount calls.
  93. func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
  94. // Arrange
  95. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  96. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  97. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  98. kubeClient := createTestClient()
  99. fakeRecorder := &record.FakeRecorder{}
  100. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  101. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  102. kubeClient,
  103. volumePluginMgr,
  104. fakeRecorder,
  105. false, /* checkNodeCapabilitiesBeforeMount */
  106. fakeHandler))
  107. reconciler := NewReconciler(
  108. kubeClient,
  109. false, /* controllerAttachDetachEnabled */
  110. reconcilerLoopSleepDuration,
  111. waitForAttachTimeout,
  112. nodeName,
  113. dsw,
  114. asw,
  115. hasAddedPods,
  116. oex,
  117. &mount.FakeMounter{},
  118. volumePluginMgr,
  119. kubeletPodsDir)
  120. pod := &v1.Pod{
  121. ObjectMeta: metav1.ObjectMeta{
  122. Name: "pod1",
  123. UID: "pod1uid",
  124. },
  125. Spec: v1.PodSpec{
  126. Volumes: []v1.Volume{
  127. {
  128. Name: "volume-name",
  129. VolumeSource: v1.VolumeSource{
  130. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  131. PDName: "fake-device1",
  132. },
  133. },
  134. },
  135. },
  136. },
  137. }
  138. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  139. podName := util.GetUniquePodName(pod)
  140. generatedVolumeName, err := dsw.AddPodToVolume(
  141. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  142. // Assert
  143. if err != nil {
  144. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  145. }
  146. // Act
  147. runReconciler(reconciler)
  148. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  149. // Assert
  150. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  151. 1 /* expectedAttachCallCount */, fakePlugin))
  152. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  153. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  154. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  155. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  156. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  157. 1 /* expectedSetUpCallCount */, fakePlugin))
  158. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  159. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  160. }
  161. // Populates desiredStateOfWorld cache with one volume/pod.
  162. // Enables controllerAttachDetachEnabled.
  163. // Calls Run()
  164. // Verifies there is one mount call and no unmount calls.
  165. // Verifies there are no attach/detach calls.
  166. func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
  167. // Arrange
  168. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  169. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  170. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  171. kubeClient := createTestClient()
  172. fakeRecorder := &record.FakeRecorder{}
  173. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  174. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  175. kubeClient,
  176. volumePluginMgr,
  177. fakeRecorder,
  178. false, /* checkNodeCapabilitiesBeforeMount */
  179. fakeHandler))
  180. reconciler := NewReconciler(
  181. kubeClient,
  182. true, /* controllerAttachDetachEnabled */
  183. reconcilerLoopSleepDuration,
  184. waitForAttachTimeout,
  185. nodeName,
  186. dsw,
  187. asw,
  188. hasAddedPods,
  189. oex,
  190. &mount.FakeMounter{},
  191. volumePluginMgr,
  192. kubeletPodsDir)
  193. pod := &v1.Pod{
  194. ObjectMeta: metav1.ObjectMeta{
  195. Name: "pod1",
  196. UID: "pod1uid",
  197. },
  198. Spec: v1.PodSpec{
  199. Volumes: []v1.Volume{
  200. {
  201. Name: "volume-name",
  202. VolumeSource: v1.VolumeSource{
  203. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  204. PDName: "fake-device1",
  205. },
  206. },
  207. },
  208. },
  209. },
  210. }
  211. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  212. podName := util.GetUniquePodName(pod)
  213. generatedVolumeName, err := dsw.AddPodToVolume(
  214. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  215. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  216. // Assert
  217. if err != nil {
  218. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  219. }
  220. // Act
  221. runReconciler(reconciler)
  222. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  223. // Assert
  224. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  225. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  226. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  227. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  228. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  229. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  230. 1 /* expectedSetUpCallCount */, fakePlugin))
  231. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  232. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  233. }
  234. // Populates desiredStateOfWorld cache with one volume/pod.
  235. // Calls Run()
  236. // Verifies there is one attach/mount/etc call and no detach calls.
  237. // Deletes volume/pod from desired state of world.
  238. // Verifies detach/unmount calls are issued.
  239. func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
  240. // Arrange
  241. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  242. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  243. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  244. kubeClient := createTestClient()
  245. fakeRecorder := &record.FakeRecorder{}
  246. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  247. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  248. kubeClient,
  249. volumePluginMgr,
  250. fakeRecorder,
  251. false, /* checkNodeCapabilitiesBeforeMount */
  252. fakeHandler))
  253. reconciler := NewReconciler(
  254. kubeClient,
  255. false, /* controllerAttachDetachEnabled */
  256. reconcilerLoopSleepDuration,
  257. waitForAttachTimeout,
  258. nodeName,
  259. dsw,
  260. asw,
  261. hasAddedPods,
  262. oex,
  263. &mount.FakeMounter{},
  264. volumePluginMgr,
  265. kubeletPodsDir)
  266. pod := &v1.Pod{
  267. ObjectMeta: metav1.ObjectMeta{
  268. Name: "pod1",
  269. UID: "pod1uid",
  270. },
  271. Spec: v1.PodSpec{
  272. Volumes: []v1.Volume{
  273. {
  274. Name: "volume-name",
  275. VolumeSource: v1.VolumeSource{
  276. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  277. PDName: "fake-device1",
  278. },
  279. },
  280. },
  281. },
  282. },
  283. }
  284. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  285. podName := util.GetUniquePodName(pod)
  286. generatedVolumeName, err := dsw.AddPodToVolume(
  287. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  288. // Assert
  289. if err != nil {
  290. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  291. }
  292. // Act
  293. runReconciler(reconciler)
  294. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  295. // Assert
  296. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  297. 1 /* expectedAttachCallCount */, fakePlugin))
  298. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  299. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  300. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  301. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  302. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  303. 1 /* expectedSetUpCallCount */, fakePlugin))
  304. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  305. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  306. // Act
  307. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  308. waitForDetach(t, fakePlugin, generatedVolumeName, asw)
  309. // Assert
  310. assert.NoError(t, volumetesting.VerifyTearDownCallCount(
  311. 1 /* expectedTearDownCallCount */, fakePlugin))
  312. assert.NoError(t, volumetesting.VerifyDetachCallCount(
  313. 1 /* expectedDetachCallCount */, fakePlugin))
  314. }
  315. // Populates desiredStateOfWorld cache with one volume/pod.
  316. // Enables controllerAttachDetachEnabled.
  317. // Calls Run()
  318. // Verifies one mount call is made and no unmount calls.
  319. // Deletes volume/pod from desired state of world.
  320. // Verifies one unmount call is made.
  321. // Verifies there are no attach/detach calls made.
  322. func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
  323. // Arrange
  324. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  325. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  326. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  327. kubeClient := createTestClient()
  328. fakeRecorder := &record.FakeRecorder{}
  329. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  330. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  331. kubeClient,
  332. volumePluginMgr,
  333. fakeRecorder,
  334. false, /* checkNodeCapabilitiesBeforeMount */
  335. fakeHandler))
  336. reconciler := NewReconciler(
  337. kubeClient,
  338. true, /* controllerAttachDetachEnabled */
  339. reconcilerLoopSleepDuration,
  340. waitForAttachTimeout,
  341. nodeName,
  342. dsw,
  343. asw,
  344. hasAddedPods,
  345. oex,
  346. &mount.FakeMounter{},
  347. volumePluginMgr,
  348. kubeletPodsDir)
  349. pod := &v1.Pod{
  350. ObjectMeta: metav1.ObjectMeta{
  351. Name: "pod1",
  352. UID: "pod1uid",
  353. },
  354. Spec: v1.PodSpec{
  355. Volumes: []v1.Volume{
  356. {
  357. Name: "volume-name",
  358. VolumeSource: v1.VolumeSource{
  359. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  360. PDName: "fake-device1",
  361. },
  362. },
  363. },
  364. },
  365. },
  366. }
  367. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  368. podName := util.GetUniquePodName(pod)
  369. generatedVolumeName, err := dsw.AddPodToVolume(
  370. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  371. // Assert
  372. if err != nil {
  373. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  374. }
  375. // Act
  376. runReconciler(reconciler)
  377. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  378. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  379. // Assert
  380. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  381. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  382. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  383. assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
  384. 1 /* expectedMountDeviceCallCount */, fakePlugin))
  385. assert.NoError(t, volumetesting.VerifySetUpCallCount(
  386. 1 /* expectedSetUpCallCount */, fakePlugin))
  387. assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
  388. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  389. // Act
  390. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  391. waitForDetach(t, fakePlugin, generatedVolumeName, asw)
  392. // Assert
  393. assert.NoError(t, volumetesting.VerifyTearDownCallCount(
  394. 1 /* expectedTearDownCallCount */, fakePlugin))
  395. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  396. }
  397. // Populates desiredStateOfWorld cache with one volume/pod.
  398. // Calls Run()
  399. // Verifies there are attach/get map paths/setupDevice calls and
  400. // no detach/teardownDevice calls.
  401. func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
  402. // Enable BlockVolume feature gate
  403. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  404. // Arrange
  405. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  406. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  407. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  408. kubeClient := createTestClient()
  409. fakeRecorder := &record.FakeRecorder{}
  410. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  411. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  412. kubeClient,
  413. volumePluginMgr,
  414. fakeRecorder,
  415. false, /* checkNodeCapabilitiesBeforeMount */
  416. fakeHandler))
  417. reconciler := NewReconciler(
  418. kubeClient,
  419. false, /* controllerAttachDetachEnabled */
  420. reconcilerLoopSleepDuration,
  421. waitForAttachTimeout,
  422. nodeName,
  423. dsw,
  424. asw,
  425. hasAddedPods,
  426. oex,
  427. &mount.FakeMounter{},
  428. volumePluginMgr,
  429. kubeletPodsDir)
  430. pod := &v1.Pod{
  431. ObjectMeta: metav1.ObjectMeta{
  432. Name: "pod1",
  433. UID: "pod1uid",
  434. },
  435. Spec: v1.PodSpec{},
  436. }
  437. mode := v1.PersistentVolumeBlock
  438. gcepv := &v1.PersistentVolume{
  439. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  440. Spec: v1.PersistentVolumeSpec{
  441. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  442. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  443. AccessModes: []v1.PersistentVolumeAccessMode{
  444. v1.ReadWriteOnce,
  445. v1.ReadOnlyMany,
  446. },
  447. VolumeMode: &mode,
  448. },
  449. }
  450. volumeSpec := &volume.Spec{
  451. PersistentVolume: gcepv,
  452. }
  453. podName := util.GetUniquePodName(pod)
  454. generatedVolumeName, err := dsw.AddPodToVolume(
  455. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  456. // Assert
  457. if err != nil {
  458. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  459. }
  460. // Act
  461. runReconciler(reconciler)
  462. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  463. // Assert
  464. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  465. 1 /* expectedAttachCallCount */, fakePlugin))
  466. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  467. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  468. assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
  469. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  470. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  471. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  472. }
  473. // Populates desiredStateOfWorld cache with one volume/pod.
  474. // Enables controllerAttachDetachEnabled.
  475. // Calls Run()
  476. // Verifies there are two get map path calls, a setupDevice call
  477. // and no teardownDevice call.
  478. // Verifies there are no attach/detach calls.
  479. func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
  480. // Enable BlockVolume feature gate
  481. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  482. // Arrange
  483. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  484. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  485. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  486. kubeClient := createTestClient()
  487. fakeRecorder := &record.FakeRecorder{}
  488. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  489. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  490. kubeClient,
  491. volumePluginMgr,
  492. fakeRecorder,
  493. false, /* checkNodeCapabilitiesBeforeMount */
  494. fakeHandler))
  495. reconciler := NewReconciler(
  496. kubeClient,
  497. true, /* controllerAttachDetachEnabled */
  498. reconcilerLoopSleepDuration,
  499. waitForAttachTimeout,
  500. nodeName,
  501. dsw,
  502. asw,
  503. hasAddedPods,
  504. oex,
  505. &mount.FakeMounter{},
  506. volumePluginMgr,
  507. kubeletPodsDir)
  508. pod := &v1.Pod{
  509. ObjectMeta: metav1.ObjectMeta{
  510. Name: "pod1",
  511. UID: "pod1uid",
  512. },
  513. Spec: v1.PodSpec{},
  514. }
  515. mode := v1.PersistentVolumeBlock
  516. gcepv := &v1.PersistentVolume{
  517. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  518. Spec: v1.PersistentVolumeSpec{
  519. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  520. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  521. AccessModes: []v1.PersistentVolumeAccessMode{
  522. v1.ReadWriteOnce,
  523. v1.ReadOnlyMany,
  524. },
  525. VolumeMode: &mode,
  526. },
  527. }
  528. volumeSpec := &volume.Spec{
  529. PersistentVolume: gcepv,
  530. }
  531. podName := util.GetUniquePodName(pod)
  532. generatedVolumeName, err := dsw.AddPodToVolume(
  533. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  534. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  535. // Assert
  536. if err != nil {
  537. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  538. }
  539. // Act
  540. runReconciler(reconciler)
  541. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  542. // Assert
  543. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  544. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  545. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  546. assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
  547. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  548. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  549. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  550. }
  551. // Populates desiredStateOfWorld cache with one volume/pod.
  552. // Calls Run()
  553. // Verifies there is one attach call, two get map path calls,
  554. // setupDevice call and no detach calls.
  555. // Deletes volume/pod from desired state of world.
  556. // Verifies one detach/teardownDevice calls are issued.
  557. func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
  558. // Enable BlockVolume feature gate
  559. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  560. // Arrange
  561. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  562. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  563. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  564. kubeClient := createTestClient()
  565. fakeRecorder := &record.FakeRecorder{}
  566. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  567. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  568. kubeClient,
  569. volumePluginMgr,
  570. fakeRecorder,
  571. false, /* checkNodeCapabilitiesBeforeMount */
  572. fakeHandler))
  573. reconciler := NewReconciler(
  574. kubeClient,
  575. false, /* controllerAttachDetachEnabled */
  576. reconcilerLoopSleepDuration,
  577. waitForAttachTimeout,
  578. nodeName,
  579. dsw,
  580. asw,
  581. hasAddedPods,
  582. oex,
  583. &mount.FakeMounter{},
  584. volumePluginMgr,
  585. kubeletPodsDir)
  586. pod := &v1.Pod{
  587. ObjectMeta: metav1.ObjectMeta{
  588. Name: "pod1",
  589. UID: "pod1uid",
  590. },
  591. Spec: v1.PodSpec{},
  592. }
  593. mode := v1.PersistentVolumeBlock
  594. gcepv := &v1.PersistentVolume{
  595. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  596. Spec: v1.PersistentVolumeSpec{
  597. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  598. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  599. AccessModes: []v1.PersistentVolumeAccessMode{
  600. v1.ReadWriteOnce,
  601. v1.ReadOnlyMany,
  602. },
  603. VolumeMode: &mode,
  604. },
  605. }
  606. volumeSpec := &volume.Spec{
  607. PersistentVolume: gcepv,
  608. }
  609. podName := util.GetUniquePodName(pod)
  610. generatedVolumeName, err := dsw.AddPodToVolume(
  611. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  612. // Assert
  613. if err != nil {
  614. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  615. }
  616. // Act
  617. runReconciler(reconciler)
  618. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  619. // Assert
  620. assert.NoError(t, volumetesting.VerifyAttachCallCount(
  621. 1 /* expectedAttachCallCount */, fakePlugin))
  622. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  623. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  624. assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
  625. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  626. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  627. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  628. // Act
  629. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  630. waitForDetach(t, fakePlugin, generatedVolumeName, asw)
  631. // Assert
  632. assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
  633. 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
  634. assert.NoError(t, volumetesting.VerifyDetachCallCount(
  635. 1 /* expectedDetachCallCount */, fakePlugin))
  636. }
  637. // Populates desiredStateOfWorld cache with one volume/pod.
  638. // Enables controllerAttachDetachEnabled.
  639. // Calls Run()
  640. // Verifies two map path calls are made and no teardownDevice/detach calls.
  641. // Deletes volume/pod from desired state of world.
  642. // Verifies one teardownDevice call is made.
  643. // Verifies there are no attach/detach calls made.
  644. func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
  645. // Enable BlockVolume feature gate
  646. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  647. // Arrange
  648. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  649. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  650. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  651. kubeClient := createTestClient()
  652. fakeRecorder := &record.FakeRecorder{}
  653. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  654. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  655. kubeClient,
  656. volumePluginMgr,
  657. fakeRecorder,
  658. false, /* checkNodeCapabilitiesBeforeMount */
  659. fakeHandler))
  660. reconciler := NewReconciler(
  661. kubeClient,
  662. true, /* controllerAttachDetachEnabled */
  663. reconcilerLoopSleepDuration,
  664. waitForAttachTimeout,
  665. nodeName,
  666. dsw,
  667. asw,
  668. hasAddedPods,
  669. oex,
  670. &mount.FakeMounter{},
  671. volumePluginMgr,
  672. kubeletPodsDir)
  673. pod := &v1.Pod{
  674. ObjectMeta: metav1.ObjectMeta{
  675. Name: "pod1",
  676. UID: "pod1uid",
  677. },
  678. Spec: v1.PodSpec{},
  679. }
  680. mode := v1.PersistentVolumeBlock
  681. gcepv := &v1.PersistentVolume{
  682. ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
  683. Spec: v1.PersistentVolumeSpec{
  684. Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
  685. PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
  686. AccessModes: []v1.PersistentVolumeAccessMode{
  687. v1.ReadWriteOnce,
  688. v1.ReadOnlyMany,
  689. },
  690. VolumeMode: &mode,
  691. },
  692. }
  693. volumeSpec := &volume.Spec{
  694. PersistentVolume: gcepv,
  695. }
  696. podName := util.GetUniquePodName(pod)
  697. generatedVolumeName, err := dsw.AddPodToVolume(
  698. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  699. // Assert
  700. if err != nil {
  701. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  702. }
  703. // Act
  704. runReconciler(reconciler)
  705. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  706. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  707. // Assert
  708. assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  709. assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  710. 1 /* expectedWaitForAttachCallCount */, fakePlugin))
  711. assert.NoError(t, volumetesting.VerifyGetMapDeviceCallCount(
  712. 1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  713. assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  714. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  715. // Act
  716. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  717. waitForDetach(t, fakePlugin, generatedVolumeName, asw)
  718. // Assert
  719. assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
  720. 1 /* expectedTearDownDeviceCallCount */, fakePlugin))
  721. assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  722. }
  723. func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  724. testCases := map[string]struct {
  725. volumePlugins []volume.VolumePlugin
  726. expectErr bool
  727. expectedErrMsg string
  728. }{
  729. "volumePlugin is nil": {
  730. volumePlugins: []volume.VolumePlugin{},
  731. expectErr: true,
  732. expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
  733. },
  734. "blockVolumePlugin is nil": {
  735. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  736. expectErr: true,
  737. expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  738. },
  739. }
  740. // Enable BlockVolume feature gate
  741. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  742. for name, tc := range testCases {
  743. t.Run(name, func(t *testing.T) {
  744. volumePluginMgr := &volume.VolumePluginMgr{}
  745. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  746. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  747. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  748. nil, /* kubeClient */
  749. volumePluginMgr,
  750. nil, /* fakeRecorder */
  751. false, /* checkNodeCapabilitiesBeforeMount */
  752. nil))
  753. pod := &v1.Pod{
  754. ObjectMeta: metav1.ObjectMeta{
  755. Name: "pod1",
  756. UID: "pod1uid",
  757. },
  758. Spec: v1.PodSpec{},
  759. }
  760. volumeMode := v1.PersistentVolumeBlock
  761. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  762. volumeToMount := operationexecutor.VolumeToMount{
  763. Pod: pod,
  764. VolumeSpec: tmpSpec}
  765. err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
  766. // Assert
  767. if assert.Error(t, err) {
  768. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  769. }
  770. })
  771. }
  772. }
  773. func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  774. testCases := map[string]struct {
  775. volumePlugins []volume.VolumePlugin
  776. expectErr bool
  777. expectedErrMsg string
  778. }{
  779. "volumePlugin is nil": {
  780. volumePlugins: []volume.VolumePlugin{},
  781. expectErr: true,
  782. expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
  783. },
  784. "blockVolumePlugin is nil": {
  785. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  786. expectErr: true,
  787. expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  788. },
  789. }
  790. // Enable BlockVolume feature gate
  791. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  792. for name, tc := range testCases {
  793. t.Run(name, func(t *testing.T) {
  794. volumePluginMgr := &volume.VolumePluginMgr{}
  795. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  796. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  797. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  798. nil, /* kubeClient */
  799. volumePluginMgr,
  800. nil, /* fakeRecorder */
  801. false, /* checkNodeCapabilitiesBeforeMount */
  802. nil))
  803. volumeMode := v1.PersistentVolumeBlock
  804. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  805. volumeToUnmount := operationexecutor.MountedVolume{
  806. PluginName: "fake-file-plugin",
  807. VolumeSpec: tmpSpec}
  808. err := oex.UnmountVolume(volumeToUnmount, asw, "" /* podsDir */)
  809. // Assert
  810. if assert.Error(t, err) {
  811. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  812. }
  813. })
  814. }
  815. }
  816. func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
  817. testCases := map[string]struct {
  818. volumePlugins []volume.VolumePlugin
  819. expectErr bool
  820. expectedErrMsg string
  821. }{
  822. "volumePlugin is nil": {
  823. volumePlugins: []volume.VolumePlugin{},
  824. expectErr: true,
  825. expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
  826. },
  827. "blockVolumePlugin is nil": {
  828. volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
  829. expectErr: true,
  830. expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  831. },
  832. }
  833. // Enable BlockVolume feature gate
  834. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, true)()
  835. for name, tc := range testCases {
  836. t.Run(name, func(t *testing.T) {
  837. volumePluginMgr := &volume.VolumePluginMgr{}
  838. volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  839. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  840. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  841. nil, /* kubeClient */
  842. volumePluginMgr,
  843. nil, /* fakeRecorder */
  844. false, /* checkNodeCapabilitiesBeforeMount */
  845. nil))
  846. var mounter mount.Interface
  847. volumeMode := v1.PersistentVolumeBlock
  848. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  849. deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
  850. err := oex.UnmountDevice(deviceToDetach, asw, mounter)
  851. // Assert
  852. if assert.Error(t, err) {
  853. assert.Contains(t, err.Error(), tc.expectedErrMsg)
  854. }
  855. })
  856. }
  857. }
  858. // Populates desiredStateOfWorld cache with one volume/pod.
  859. // Enables controllerAttachDetachEnabled.
  860. // Calls Run()
  861. // Wait for volume mounted.
  862. // Mark volume as fsResizeRequired in ASW.
  863. // Verifies volume's fsResizeRequired flag is cleared later.
  864. func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
  865. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExpandInUsePersistentVolumes, true)()
  866. fs := v1.PersistentVolumeFilesystem
  867. pv := &v1.PersistentVolume{
  868. ObjectMeta: metav1.ObjectMeta{
  869. Name: "pv",
  870. UID: "pvuid",
  871. },
  872. Spec: v1.PersistentVolumeSpec{
  873. ClaimRef: &v1.ObjectReference{Name: "pvc"},
  874. VolumeMode: &fs,
  875. },
  876. }
  877. pvc := &v1.PersistentVolumeClaim{
  878. ObjectMeta: metav1.ObjectMeta{
  879. Name: "pvc",
  880. UID: "pvcuid",
  881. },
  882. Spec: v1.PersistentVolumeClaimSpec{
  883. VolumeName: "pv",
  884. VolumeMode: &fs,
  885. },
  886. }
  887. pod := &v1.Pod{
  888. ObjectMeta: metav1.ObjectMeta{
  889. Name: "pod1",
  890. UID: "pod1uid",
  891. },
  892. Spec: v1.PodSpec{
  893. Volumes: []v1.Volume{
  894. {
  895. Name: "volume-name",
  896. VolumeSource: v1.VolumeSource{
  897. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  898. ClaimName: pvc.Name,
  899. },
  900. },
  901. },
  902. },
  903. },
  904. }
  905. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  906. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  907. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  908. kubeClient := createtestClientWithPVPVC(pv, pvc)
  909. fakeRecorder := &record.FakeRecorder{}
  910. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  911. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  912. kubeClient,
  913. volumePluginMgr,
  914. fakeRecorder,
  915. false, /* checkNodeCapabilitiesBeforeMount */
  916. fakeHandler))
  917. reconciler := NewReconciler(
  918. kubeClient,
  919. true, /* controllerAttachDetachEnabled */
  920. reconcilerLoopSleepDuration,
  921. waitForAttachTimeout,
  922. nodeName,
  923. dsw,
  924. asw,
  925. hasAddedPods,
  926. oex,
  927. &mount.FakeMounter{},
  928. volumePluginMgr,
  929. kubeletPodsDir)
  930. volumeSpec := &volume.Spec{PersistentVolume: pv}
  931. podName := util.GetUniquePodName(pod)
  932. volumeName, err := dsw.AddPodToVolume(
  933. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  934. // Assert
  935. if err != nil {
  936. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  937. }
  938. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  939. // Start the reconciler to fill ASW.
  940. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  941. go func() {
  942. reconciler.Run(stopChan)
  943. close(stoppedChan)
  944. }()
  945. waitForMount(t, fakePlugin, volumeName, asw)
  946. // Stop the reconciler.
  947. close(stopChan)
  948. <-stoppedChan
  949. // Mark volume as fsResizeRequired.
  950. asw.MarkFSResizeRequired(volumeName, podName)
  951. _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
  952. if !cache.IsFSResizeRequiredError(podExistErr) {
  953. t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
  954. }
  955. // Start the reconciler again, we hope reconciler will perform the
  956. // resize operation and clear the fsResizeRequired flag for volume.
  957. go reconciler.Run(wait.NeverStop)
  958. waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) {
  959. mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
  960. return mounted && err == nil, nil
  961. })
  962. if waitErr != nil {
  963. t.Fatal("Volume resize should succeeded")
  964. }
  965. }
  966. func waitForMount(
  967. t *testing.T,
  968. fakePlugin *volumetesting.FakeVolumePlugin,
  969. volumeName v1.UniqueVolumeName,
  970. asw cache.ActualStateOfWorld) {
  971. err := retryWithExponentialBackOff(
  972. time.Duration(500*time.Millisecond),
  973. func() (bool, error) {
  974. mountedVolumes := asw.GetMountedVolumes()
  975. for _, mountedVolume := range mountedVolumes {
  976. if mountedVolume.VolumeName == volumeName {
  977. return true, nil
  978. }
  979. }
  980. return false, nil
  981. },
  982. )
  983. if err != nil {
  984. t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
  985. }
  986. }
  987. func waitForDetach(
  988. t *testing.T,
  989. fakePlugin *volumetesting.FakeVolumePlugin,
  990. volumeName v1.UniqueVolumeName,
  991. asw cache.ActualStateOfWorld) {
  992. err := retryWithExponentialBackOff(
  993. time.Duration(500*time.Millisecond),
  994. func() (bool, error) {
  995. if asw.VolumeExists(volumeName) {
  996. return false, nil
  997. }
  998. return true, nil
  999. },
  1000. )
  1001. if err != nil {
  1002. t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
  1003. }
  1004. }
  1005. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  1006. backoff := wait.Backoff{
  1007. Duration: initialDuration,
  1008. Factor: 3,
  1009. Jitter: 0,
  1010. Steps: 6,
  1011. }
  1012. return wait.ExponentialBackoff(backoff, fn)
  1013. }
  1014. func createTestClient() *fake.Clientset {
  1015. fakeClient := &fake.Clientset{}
  1016. fakeClient.AddReactor("get", "nodes",
  1017. func(action core.Action) (bool, runtime.Object, error) {
  1018. return true, &v1.Node{
  1019. ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  1020. Status: v1.NodeStatus{
  1021. VolumesAttached: []v1.AttachedVolume{
  1022. {
  1023. Name: "fake-plugin/fake-device1",
  1024. DevicePath: "/fake/path",
  1025. },
  1026. }},
  1027. }, nil
  1028. })
  1029. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  1030. return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  1031. })
  1032. return fakeClient
  1033. }
  1034. func runReconciler(reconciler Reconciler) {
  1035. go reconciler.Run(wait.NeverStop)
  1036. }
  1037. func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) *fake.Clientset {
  1038. fakeClient := &fake.Clientset{}
  1039. fakeClient.AddReactor("get", "nodes",
  1040. func(action core.Action) (bool, runtime.Object, error) {
  1041. return true, &v1.Node{
  1042. ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  1043. Status: v1.NodeStatus{
  1044. VolumesAttached: []v1.AttachedVolume{
  1045. {
  1046. Name: "fake-plugin/pv",
  1047. DevicePath: "fake/path",
  1048. },
  1049. }},
  1050. }, nil
  1051. })
  1052. fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  1053. return true, pvc, nil
  1054. })
  1055. fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
  1056. return true, pv, nil
  1057. })
  1058. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  1059. return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  1060. })
  1061. return fakeClient
  1062. }
  1063. func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
  1064. // Arrange
  1065. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  1066. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  1067. asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1068. kubeClient := createTestClient()
  1069. fakeRecorder := &record.FakeRecorder{}
  1070. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1071. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1072. kubeClient,
  1073. volumePluginMgr,
  1074. fakeRecorder,
  1075. false, /* checkNodeCapabilitiesBeforeMount */
  1076. fakeHandler))
  1077. reconciler := NewReconciler(
  1078. kubeClient,
  1079. true, /* controllerAttachDetachEnabled */
  1080. reconcilerLoopSleepDuration,
  1081. waitForAttachTimeout,
  1082. nodeName,
  1083. dsw,
  1084. asw,
  1085. hasAddedPods,
  1086. oex,
  1087. &mount.FakeMounter{},
  1088. volumePluginMgr,
  1089. kubeletPodsDir)
  1090. pod := &v1.Pod{
  1091. ObjectMeta: metav1.ObjectMeta{
  1092. Name: "pod1",
  1093. UID: "pod1uid",
  1094. },
  1095. Spec: v1.PodSpec{
  1096. Volumes: []v1.Volume{
  1097. {
  1098. Name: "volume-name",
  1099. VolumeSource: v1.VolumeSource{
  1100. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  1101. PDName: "fake-device1",
  1102. },
  1103. },
  1104. },
  1105. },
  1106. },
  1107. }
  1108. // Some steps are executes out of order in callbacks, follow the numbers.
  1109. // 1. Add a volume to DSW and wait until it's mounted
  1110. volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  1111. podName := util.GetUniquePodName(pod)
  1112. generatedVolumeName, err := dsw.AddPodToVolume(
  1113. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1114. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  1115. if err != nil {
  1116. t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1117. }
  1118. // Start the reconciler to fill ASW.
  1119. stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1120. go func() {
  1121. reconciler.Run(stopChan)
  1122. close(stoppedChan)
  1123. }()
  1124. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  1125. // Stop the reconciler.
  1126. close(stopChan)
  1127. <-stoppedChan
  1128. finished := make(chan interface{})
  1129. fakePlugin.UnmountDeviceHook = func(mountPath string) error {
  1130. // Act:
  1131. // 3. While a volume is being unmounted, add it back to the desired state of world
  1132. klog.Infof("UnmountDevice called")
  1133. generatedVolumeName, err = dsw.AddPodToVolume(
  1134. podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
  1135. dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  1136. return nil
  1137. }
  1138. fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
  1139. // Assert
  1140. // 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
  1141. if devicePath == "" {
  1142. t.Errorf("Expected WaitForAttach called with devicePath from Node.Status")
  1143. close(finished)
  1144. return "", fmt.Errorf("Expected devicePath from Node.Status")
  1145. }
  1146. close(finished)
  1147. return devicePath, nil
  1148. }
  1149. // Start the reconciler again.
  1150. go reconciler.Run(wait.NeverStop)
  1151. // 2. Delete the volume from DSW (and wait for callbacks)
  1152. dsw.DeletePodFromVolume(podName, generatedVolumeName)
  1153. <-finished
  1154. waitForMount(t, fakePlugin, generatedVolumeName, asw)
  1155. }