reconciler_test.go 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconciler
  14. import (
  15. "testing"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. k8stypes "k8s.io/apimachinery/pkg/types"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. "k8s.io/client-go/informers"
  21. "k8s.io/client-go/tools/record"
  22. "k8s.io/kubernetes/pkg/controller"
  23. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  24. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
  25. controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
  26. volumetesting "k8s.io/kubernetes/pkg/volume/testing"
  27. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  28. "k8s.io/kubernetes/pkg/volume/util/types"
  29. utilstrings "k8s.io/utils/strings"
  30. )
  31. const (
  32. reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
  33. syncLoopPeriod time.Duration = 100 * time.Minute
  34. maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
  35. resyncPeriod time.Duration = 5 * time.Minute
  36. )
  37. // Calls Run()
  38. // Verifies there are no calls to attach or detach.
  39. func Test_Run_Positive_DoNothing(t *testing.T) {
  40. // Arrange
  41. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  42. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  43. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  44. fakeKubeClient := controllervolumetesting.CreateTestClient()
  45. fakeRecorder := &record.FakeRecorder{}
  46. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  47. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  48. fakeKubeClient,
  49. volumePluginMgr,
  50. fakeRecorder,
  51. false, /* checkNodeCapabilitiesBeforeMount */
  52. fakeHandler))
  53. informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
  54. nsu := statusupdater.NewNodeStatusUpdater(
  55. fakeKubeClient, informerFactory.Core().V1().Nodes().Lister(), asw)
  56. reconciler := NewReconciler(
  57. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  58. // Act
  59. ch := make(chan struct{})
  60. go reconciler.Run(ch)
  61. defer close(ch)
  62. // Assert
  63. waitForNewAttacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
  64. verifyNewAttacherCallCount(t, true /* expectZeroNewAttacherCallCount */, fakePlugin)
  65. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  66. waitForAttachCallCount(t, 0 /* expectedAttachCallCount */, fakePlugin)
  67. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  68. }
  69. // Populates desiredStateOfWorld cache with one node/volume/pod tuple.
  70. // Calls Run()
  71. // Verifies there is one attach call and no detach calls.
  72. func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
  73. // Arrange
  74. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  75. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  76. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  77. fakeKubeClient := controllervolumetesting.CreateTestClient()
  78. fakeRecorder := &record.FakeRecorder{}
  79. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  80. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  81. fakeKubeClient,
  82. volumePluginMgr,
  83. fakeRecorder,
  84. false, /* checkNodeCapabilitiesBeforeMount */
  85. fakeHandler))
  86. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  87. reconciler := NewReconciler(
  88. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  89. podName := "pod-uid"
  90. volumeName := v1.UniqueVolumeName("volume-name")
  91. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  92. nodeName := k8stypes.NodeName("node-name")
  93. dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
  94. volumeExists := dsw.VolumeExists(volumeName, nodeName)
  95. if volumeExists {
  96. t.Fatalf(
  97. "Volume %q/node %q should not exist, but it does.",
  98. volumeName,
  99. nodeName)
  100. }
  101. _, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
  102. if podErr != nil {
  103. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
  104. }
  105. // Act
  106. ch := make(chan struct{})
  107. go reconciler.Run(ch)
  108. defer close(ch)
  109. // Assert
  110. waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  111. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  112. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  113. }
  114. // Populates desiredStateOfWorld cache with one node/volume/pod tuple.
  115. // Calls Run()
  116. // Verifies there is one attach call and no detach calls.
  117. // Marks the node/volume as unmounted.
  118. // Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
  119. // Verifies there is one detach call and no (new) attach calls.
  120. func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
  121. // Arrange
  122. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  123. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  124. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  125. fakeKubeClient := controllervolumetesting.CreateTestClient()
  126. fakeRecorder := &record.FakeRecorder{}
  127. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  128. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  129. fakeKubeClient,
  130. volumePluginMgr,
  131. fakeRecorder,
  132. false, /* checkNodeCapabilitiesBeforeMount */
  133. fakeHandler))
  134. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  135. reconciler := NewReconciler(
  136. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  137. podName := "pod-uid"
  138. volumeName := v1.UniqueVolumeName("volume-name")
  139. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  140. nodeName := k8stypes.NodeName("node-name")
  141. dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
  142. volumeExists := dsw.VolumeExists(volumeName, nodeName)
  143. if volumeExists {
  144. t.Fatalf(
  145. "Volume %q/node %q should not exist, but it does.",
  146. volumeName,
  147. nodeName)
  148. }
  149. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
  150. if podAddErr != nil {
  151. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  152. }
  153. // Act
  154. ch := make(chan struct{})
  155. go reconciler.Run(ch)
  156. defer close(ch)
  157. // Assert
  158. waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  159. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  160. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  161. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  162. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  163. // Act
  164. dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
  165. volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
  166. if volumeExists {
  167. t.Fatalf(
  168. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  169. podName,
  170. generatedVolumeName,
  171. nodeName)
  172. }
  173. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
  174. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
  175. // Assert
  176. waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  177. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  178. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  179. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  180. waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
  181. }
  182. // Populates desiredStateOfWorld cache with one node/volume/pod tuple.
  183. // Calls Run()
  184. // Verifies there is one attach call and no detach calls.
  185. // Deletes the node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
  186. // Verifies there is one detach call and no (new) attach calls.
  187. func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *testing.T) {
  188. // Arrange
  189. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  190. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  191. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  192. fakeKubeClient := controllervolumetesting.CreateTestClient()
  193. fakeRecorder := &record.FakeRecorder{}
  194. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  195. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  196. fakeKubeClient,
  197. volumePluginMgr,
  198. fakeRecorder,
  199. false, /* checkNodeCapabilitiesBeforeMount */
  200. fakeHandler))
  201. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  202. reconciler := NewReconciler(
  203. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  204. podName := "pod-uid"
  205. volumeName := v1.UniqueVolumeName("volume-name")
  206. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  207. nodeName := k8stypes.NodeName("node-name")
  208. dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
  209. volumeExists := dsw.VolumeExists(volumeName, nodeName)
  210. if volumeExists {
  211. t.Fatalf(
  212. "Volume %q/node %q should not exist, but it does.",
  213. volumeName,
  214. nodeName)
  215. }
  216. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
  217. if podAddErr != nil {
  218. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  219. }
  220. // Act
  221. ch := make(chan struct{})
  222. go reconciler.Run(ch)
  223. defer close(ch)
  224. // Assert
  225. waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  226. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  227. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  228. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  229. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  230. // Act
  231. dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
  232. volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
  233. if volumeExists {
  234. t.Fatalf(
  235. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  236. podName,
  237. generatedVolumeName,
  238. nodeName)
  239. }
  240. // Assert -- Timer will triger detach
  241. waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  242. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  243. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  244. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  245. waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
  246. }
  247. // Populates desiredStateOfWorld cache with one node/volume/pod tuple.
  248. // Has node update fail
  249. // Calls Run()
  250. // Verifies there is one attach call and no detach calls.
  251. // Marks the node/volume as unmounted.
  252. // Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
  253. // Verifies there are NO detach call and no (new) attach calls.
  254. func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail(t *testing.T) {
  255. // Arrange
  256. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  257. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  258. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  259. fakeKubeClient := controllervolumetesting.CreateTestClient()
  260. fakeRecorder := &record.FakeRecorder{}
  261. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  262. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  263. fakeKubeClient,
  264. volumePluginMgr,
  265. fakeRecorder,
  266. false, /* checkNodeCapabilitiesBeforeMount */
  267. fakeHandler))
  268. nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
  269. reconciler := NewReconciler(
  270. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  271. podName := "pod-uid"
  272. volumeName := v1.UniqueVolumeName("volume-name")
  273. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  274. nodeName := k8stypes.NodeName("node-name")
  275. dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
  276. volumeExists := dsw.VolumeExists(volumeName, nodeName)
  277. if volumeExists {
  278. t.Fatalf(
  279. "Volume %q/node %q should not exist, but it does.",
  280. volumeName,
  281. nodeName)
  282. }
  283. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
  284. if podAddErr != nil {
  285. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  286. }
  287. // Act
  288. ch := make(chan struct{})
  289. go reconciler.Run(ch)
  290. defer close(ch)
  291. // Assert
  292. waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  293. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  294. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  295. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  296. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  297. // Act
  298. dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
  299. volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
  300. if volumeExists {
  301. t.Fatalf(
  302. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  303. podName,
  304. generatedVolumeName,
  305. nodeName)
  306. }
  307. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
  308. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
  309. // Assert
  310. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  311. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  312. waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  313. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  314. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  315. }
  316. // Creates a volume with accessMode ReadWriteMany
  317. // Populates desiredStateOfWorld cache with two ode/volume/pod tuples pointing to the created volume
  318. // Calls Run()
  319. // Verifies there are two attach calls and no detach calls.
  320. // Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
  321. // Verifies there is one detach call and no (new) attach calls.
  322. // Deletes the second node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
  323. // Verifies there are two detach calls and no (new) attach calls.
  324. func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.T) {
  325. // Arrange
  326. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  327. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  328. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  329. fakeKubeClient := controllervolumetesting.CreateTestClient()
  330. fakeRecorder := &record.FakeRecorder{}
  331. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  332. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  333. fakeKubeClient,
  334. volumePluginMgr,
  335. fakeRecorder,
  336. false, /* checkNodeCapabilitiesBeforeMount */
  337. fakeHandler))
  338. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  339. reconciler := NewReconciler(
  340. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  341. podName1 := "pod-uid1"
  342. podName2 := "pod-uid2"
  343. volumeName := v1.UniqueVolumeName("volume-name")
  344. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  345. volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}
  346. nodeName1 := k8stypes.NodeName("node-name1")
  347. nodeName2 := k8stypes.NodeName(volumetesting.MultiAttachNode)
  348. dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
  349. dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
  350. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
  351. if podAddErr != nil {
  352. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  353. }
  354. _, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
  355. if podAddErr != nil {
  356. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  357. }
  358. // Act
  359. ch := make(chan struct{})
  360. go reconciler.Run(ch)
  361. defer close(ch)
  362. // Assert
  363. waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
  364. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  365. waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
  366. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  367. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  368. waitForAttachedToNodesCount(t, 2 /* expectedNodeCount */, generatedVolumeName, asw)
  369. // Act
  370. dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
  371. volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName1)
  372. if volumeExists {
  373. t.Fatalf(
  374. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  375. podName1,
  376. generatedVolumeName,
  377. nodeName1)
  378. }
  379. // Assert -- Timer will triger detach
  380. waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  381. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  382. waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
  383. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  384. waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
  385. // Act
  386. dsw.DeletePod(types.UniquePodName(podName2), generatedVolumeName, nodeName2)
  387. volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName2)
  388. if volumeExists {
  389. t.Fatalf(
  390. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  391. podName2,
  392. generatedVolumeName,
  393. nodeName2)
  394. }
  395. // Assert -- Timer will triger detach
  396. waitForNewDetacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
  397. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  398. waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
  399. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  400. waitForTotalDetachCallCount(t, 2 /* expectedDetachCallCount */, fakePlugin)
  401. }
  402. // Creates a volume with accessMode ReadWriteOnce
  403. // Populates desiredStateOfWorld cache with two ode/volume/pod tuples pointing to the created volume
  404. // Calls Run()
  405. // Verifies there is one attach call and no detach calls.
  406. // Deletes the node/volume/pod tuple from desiredStateOfWorld which succeeded in attaching
  407. // Verifies there are two attach call and one detach call.
  408. func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.T) {
  409. // Arrange
  410. volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
  411. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  412. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  413. fakeKubeClient := controllervolumetesting.CreateTestClient()
  414. fakeRecorder := &record.FakeRecorder{}
  415. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  416. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  417. fakeKubeClient,
  418. volumePluginMgr,
  419. fakeRecorder,
  420. false, /* checkNodeCapabilitiesBeforeMount */
  421. fakeHandler))
  422. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  423. reconciler := NewReconciler(
  424. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  425. podName1 := "pod-uid1"
  426. podName2 := "pod-uid2"
  427. volumeName := v1.UniqueVolumeName("volume-name")
  428. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  429. volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
  430. nodeName1 := k8stypes.NodeName("node-name1")
  431. nodeName2 := k8stypes.NodeName("node-name2")
  432. dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
  433. dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
  434. // Add both pods at the same time to provoke a potential race condition in the reconciler
  435. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
  436. if podAddErr != nil {
  437. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  438. }
  439. _, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
  440. if podAddErr != nil {
  441. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  442. }
  443. // Act
  444. ch := make(chan struct{})
  445. go reconciler.Run(ch)
  446. defer close(ch)
  447. // Assert
  448. waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  449. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  450. waitForTotalAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
  451. verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
  452. waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
  453. waitForAttachedToNodesCount(t, 1 /* expectedNodeCount */, generatedVolumeName, asw)
  454. nodesForVolume := asw.GetNodesForAttachedVolume(generatedVolumeName)
  455. // check if multiattach is marked
  456. // at least one volume+node should be marked with multiattach error
  457. nodeAttachedTo := nodesForVolume[0]
  458. waitForMultiAttachErrorOnNode(t, nodeAttachedTo, dsw)
  459. // Act
  460. podToDelete := ""
  461. if nodesForVolume[0] == nodeName1 {
  462. podToDelete = podName1
  463. } else if nodesForVolume[0] == nodeName2 {
  464. podToDelete = podName2
  465. } else {
  466. t.Fatal("Volume attached to unexpected node")
  467. }
  468. dsw.DeletePod(types.UniquePodName(podToDelete), generatedVolumeName, nodesForVolume[0])
  469. volumeExists := dsw.VolumeExists(generatedVolumeName, nodesForVolume[0])
  470. if volumeExists {
  471. t.Fatalf(
  472. "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
  473. podToDelete,
  474. generatedVolumeName,
  475. nodesForVolume[0])
  476. }
  477. // Assert
  478. waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
  479. verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
  480. waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
  481. waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
  482. verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
  483. waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
  484. }
  485. // Creates a volume with accessMode ReadWriteOnce
  486. // First create a pod which will try to attach the volume to the a node named "uncertain-node". The attach call for this node will
  487. // fail for timeout, but the volume will be actually attached to the node after the call.
  488. // Secondly, delete the this pod.
  489. // Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully.
  490. func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) {
  491. // Arrange
  492. volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
  493. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  494. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  495. fakeKubeClient := controllervolumetesting.CreateTestClient()
  496. fakeRecorder := &record.FakeRecorder{}
  497. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  498. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  499. fakeKubeClient,
  500. volumePluginMgr,
  501. fakeRecorder,
  502. false, /* checkNodeCapabilitiesBeforeMount */
  503. fakeHandler))
  504. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  505. reconciler := NewReconciler(
  506. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  507. podName1 := "pod-uid1"
  508. podName2 := "pod-uid2"
  509. volumeName := v1.UniqueVolumeName("volume-name")
  510. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  511. volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
  512. nodeName1 := k8stypes.NodeName(volumetesting.UncertainAttachNode)
  513. nodeName2 := k8stypes.NodeName("node-name2")
  514. dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
  515. dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
  516. // Act
  517. ch := make(chan struct{})
  518. go reconciler.Run(ch)
  519. defer close(ch)
  520. // Add the pod in which the volume is attached to the uncertain node
  521. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
  522. if podAddErr != nil {
  523. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  524. }
  525. time.Sleep(1 * time.Second)
  526. // Volume is added to asw. Because attach operation fails, volume should not reported as attached to the node.
  527. waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
  528. verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
  529. verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
  530. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
  531. // Without this, the delete operation will be delayed due to mounted status
  532. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName1, false /* mounted */)
  533. dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
  534. waitForVolumeRemovedFromNode(t, generatedVolumeName, nodeName1, asw)
  535. // Add a second pod which tries to attach the volume to a different node.
  536. generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
  537. if podAddErr != nil {
  538. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  539. }
  540. waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw)
  541. verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, true, asw)
  542. }
  543. // Creates a volume with accessMode ReadWriteOnce
  544. // First create a pod which will try to attach the volume to the a node named "timeout-node". The attach call for this node will
  545. // fail for timeout, but the volume will be actually attached to the node after the call.
  546. // Secondly, delete the this pod.
  547. // Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully.
  548. func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T) {
  549. // Arrange
  550. volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
  551. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  552. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  553. fakeKubeClient := controllervolumetesting.CreateTestClient()
  554. fakeRecorder := &record.FakeRecorder{}
  555. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  556. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  557. fakeKubeClient,
  558. volumePluginMgr,
  559. fakeRecorder,
  560. false, /* checkNodeCapabilitiesBeforeMount */
  561. fakeHandler))
  562. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  563. reconciler := NewReconciler(
  564. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  565. podName1 := "pod-uid1"
  566. podName2 := "pod-uid2"
  567. volumeName := v1.UniqueVolumeName("volume-name")
  568. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  569. volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
  570. nodeName1 := k8stypes.NodeName(volumetesting.TimeoutAttachNode)
  571. nodeName2 := k8stypes.NodeName("node-name2")
  572. dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
  573. dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
  574. // Act
  575. ch := make(chan struct{})
  576. go reconciler.Run(ch)
  577. defer close(ch)
  578. // Add the pod in which the volume is attached to the timeout node
  579. generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
  580. if podAddErr != nil {
  581. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  582. }
  583. // Volume is added to asw. Because attach operation fails, volume should not reported as attached to the node.
  584. waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
  585. verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, false, asw)
  586. verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, false, asw)
  587. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
  588. // Without this, the delete operation will be delayed due to mounted status
  589. asw.SetVolumeMountedByNode(generatedVolumeName, nodeName1, false /* mounted */)
  590. dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
  591. waitForVolumeRemovedFromNode(t, generatedVolumeName, nodeName1, asw)
  592. // Add a second pod which tries to attach the volume to a different node.
  593. generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
  594. if podAddErr != nil {
  595. t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
  596. }
  597. waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw)
  598. verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, true, asw)
  599. }
  600. func Test_ReportMultiAttachError(t *testing.T) {
  601. type nodeWithPods struct {
  602. name k8stypes.NodeName
  603. podNames []string
  604. }
  605. tests := []struct {
  606. name string
  607. nodes []nodeWithPods
  608. expectedEvents []string
  609. }{
  610. {
  611. "no pods use the volume",
  612. []nodeWithPods{
  613. {"node1", []string{"ns1/pod1"}},
  614. },
  615. []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"},
  616. },
  617. {
  618. "pods in the same namespace use the volume",
  619. []nodeWithPods{
  620. {"node1", []string{"ns1/pod1"}},
  621. {"node2", []string{"ns1/pod2"}},
  622. },
  623. []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"},
  624. },
  625. {
  626. "pods in anotother namespace use the volume",
  627. []nodeWithPods{
  628. {"node1", []string{"ns1/pod1"}},
  629. {"node2", []string{"ns2/pod2"}},
  630. },
  631. []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"},
  632. },
  633. {
  634. "pods both in the same and anotother namespace use the volume",
  635. []nodeWithPods{
  636. {"node1", []string{"ns1/pod1"}},
  637. {"node2", []string{"ns2/pod2"}},
  638. {"node3", []string{"ns1/pod3"}},
  639. },
  640. []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"},
  641. },
  642. }
  643. for _, test := range tests {
  644. // Arrange
  645. t.Logf("Test %q starting", test.name)
  646. volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
  647. dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
  648. asw := cache.NewActualStateOfWorld(volumePluginMgr)
  649. fakeKubeClient := controllervolumetesting.CreateTestClient()
  650. fakeRecorder := record.NewFakeRecorder(100)
  651. fakeHandler := volumetesting.NewBlockVolumePathHandler()
  652. ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  653. fakeKubeClient,
  654. volumePluginMgr,
  655. fakeRecorder,
  656. false, /* checkNodeCapabilitiesBeforeMount */
  657. fakeHandler))
  658. nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
  659. rc := NewReconciler(
  660. reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
  661. nodes := []k8stypes.NodeName{}
  662. for _, n := range test.nodes {
  663. dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/)
  664. nodes = append(nodes, n.name)
  665. for _, podName := range n.podNames {
  666. volumeName := v1.UniqueVolumeName("volume-name")
  667. volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
  668. volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
  669. uid := string(n.name) + "-" + podName // unique UID
  670. namespace, name := utilstrings.SplitQualifiedName(podName)
  671. pod := controllervolumetesting.NewPod(uid, name)
  672. pod.Namespace = namespace
  673. _, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name)
  674. if err != nil {
  675. t.Fatalf("Error adding pod %s to DSW: %s", podName, err)
  676. }
  677. }
  678. }
  679. // Act
  680. volumes := dsw.GetVolumesToAttach()
  681. for _, vol := range volumes {
  682. if vol.NodeName == "node1" {
  683. rc.(*reconciler).reportMultiAttachError(vol, nodes)
  684. }
  685. }
  686. // Assert
  687. close(fakeRecorder.Events)
  688. index := 0
  689. for event := range fakeRecorder.Events {
  690. if len(test.expectedEvents) < index {
  691. t.Errorf("Test %q: unexpected event received: %s", test.name, event)
  692. } else {
  693. expectedEvent := test.expectedEvents[index]
  694. if expectedEvent != event {
  695. t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event)
  696. }
  697. }
  698. index++
  699. }
  700. for i := index; i < len(test.expectedEvents); i++ {
  701. t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i])
  702. }
  703. }
  704. }
  705. func waitForMultiAttachErrorOnNode(
  706. t *testing.T,
  707. attachedNode k8stypes.NodeName,
  708. dsow cache.DesiredStateOfWorld) {
  709. multAttachCheckFunc := func() (bool, error) {
  710. for _, volumeToAttach := range dsow.GetVolumesToAttach() {
  711. if volumeToAttach.NodeName != attachedNode {
  712. if volumeToAttach.MultiAttachErrorReported {
  713. return true, nil
  714. }
  715. }
  716. }
  717. t.Logf("Warning: MultiAttach error not yet set on Node. Will retry.")
  718. return false, nil
  719. }
  720. err := retryWithExponentialBackOff(100*time.Millisecond, multAttachCheckFunc)
  721. if err != nil {
  722. t.Fatalf("Timed out waiting for MultiAttach Error to be set on non-attached node")
  723. }
  724. }
  725. func waitForNewAttacherCallCount(
  726. t *testing.T,
  727. expectedCallCount int,
  728. fakePlugin *volumetesting.FakeVolumePlugin) {
  729. err := retryWithExponentialBackOff(
  730. time.Duration(5*time.Millisecond),
  731. func() (bool, error) {
  732. actualCallCount := fakePlugin.GetNewAttacherCallCount()
  733. if actualCallCount >= expectedCallCount {
  734. return true, nil
  735. }
  736. t.Logf(
  737. "Warning: Wrong NewAttacherCallCount. Expected: <%v> Actual: <%v>. Will retry.",
  738. expectedCallCount,
  739. actualCallCount)
  740. return false, nil
  741. },
  742. )
  743. if err != nil {
  744. t.Fatalf(
  745. "Timed out waiting for NewAttacherCallCount. Expected: <%v> Actual: <%v>",
  746. expectedCallCount,
  747. fakePlugin.GetNewAttacherCallCount())
  748. }
  749. }
  750. func waitForNewDetacherCallCount(
  751. t *testing.T,
  752. expectedCallCount int,
  753. fakePlugin *volumetesting.FakeVolumePlugin) {
  754. err := retryWithExponentialBackOff(
  755. time.Duration(5*time.Millisecond),
  756. func() (bool, error) {
  757. actualCallCount := fakePlugin.GetNewDetacherCallCount()
  758. if actualCallCount >= expectedCallCount {
  759. return true, nil
  760. }
  761. t.Logf(
  762. "Warning: Wrong NewDetacherCallCount. Expected: <%v> Actual: <%v>. Will retry.",
  763. expectedCallCount,
  764. actualCallCount)
  765. return false, nil
  766. },
  767. )
  768. if err != nil {
  769. t.Fatalf(
  770. "Timed out waiting for NewDetacherCallCount. Expected: <%v> Actual: <%v>",
  771. expectedCallCount,
  772. fakePlugin.GetNewDetacherCallCount())
  773. }
  774. }
  775. func waitForAttachCallCount(
  776. t *testing.T,
  777. expectedAttachCallCount int,
  778. fakePlugin *volumetesting.FakeVolumePlugin) {
  779. if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
  780. return
  781. }
  782. err := retryWithExponentialBackOff(
  783. time.Duration(5*time.Millisecond),
  784. func() (bool, error) {
  785. for i, attacher := range fakePlugin.GetAttachers() {
  786. actualCallCount := attacher.GetAttachCallCount()
  787. if actualCallCount == expectedAttachCallCount {
  788. return true, nil
  789. }
  790. t.Logf(
  791. "Warning: Wrong attacher[%v].GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will try next attacher.",
  792. i,
  793. expectedAttachCallCount,
  794. actualCallCount)
  795. }
  796. t.Logf(
  797. "Warning: No attachers have expected AttachCallCount. Expected: <%v>. Will retry.",
  798. expectedAttachCallCount)
  799. return false, nil
  800. },
  801. )
  802. if err != nil {
  803. t.Fatalf(
  804. "No attachers have expected AttachCallCount. Expected: <%v>",
  805. expectedAttachCallCount)
  806. }
  807. }
  808. func waitForTotalAttachCallCount(
  809. t *testing.T,
  810. expectedAttachCallCount int,
  811. fakePlugin *volumetesting.FakeVolumePlugin) {
  812. if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
  813. return
  814. }
  815. err := retryWithExponentialBackOff(
  816. time.Duration(5*time.Millisecond),
  817. func() (bool, error) {
  818. totalCount := 0
  819. for _, attacher := range fakePlugin.GetAttachers() {
  820. totalCount += attacher.GetAttachCallCount()
  821. }
  822. if totalCount == expectedAttachCallCount {
  823. return true, nil
  824. }
  825. t.Logf(
  826. "Warning: Wrong total GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
  827. expectedAttachCallCount,
  828. totalCount)
  829. return false, nil
  830. },
  831. )
  832. if err != nil {
  833. t.Fatalf(
  834. "Total AttachCallCount does not match expected value. Expected: <%v>",
  835. expectedAttachCallCount)
  836. }
  837. }
  838. func waitForDetachCallCount(
  839. t *testing.T,
  840. expectedDetachCallCount int,
  841. fakePlugin *volumetesting.FakeVolumePlugin) {
  842. if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
  843. return
  844. }
  845. err := retryWithExponentialBackOff(
  846. time.Duration(5*time.Millisecond),
  847. func() (bool, error) {
  848. for i, detacher := range fakePlugin.GetDetachers() {
  849. actualCallCount := detacher.GetDetachCallCount()
  850. if actualCallCount == expectedDetachCallCount {
  851. return true, nil
  852. }
  853. t.Logf(
  854. "Wrong detacher[%v].GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will try next detacher.",
  855. i,
  856. expectedDetachCallCount,
  857. actualCallCount)
  858. }
  859. t.Logf(
  860. "Warning: No detachers have expected DetachCallCount. Expected: <%v>. Will retry.",
  861. expectedDetachCallCount)
  862. return false, nil
  863. },
  864. )
  865. if err != nil {
  866. t.Fatalf(
  867. "No detachers have expected DetachCallCount. Expected: <%v>",
  868. expectedDetachCallCount)
  869. }
  870. }
  871. func waitForTotalDetachCallCount(
  872. t *testing.T,
  873. expectedDetachCallCount int,
  874. fakePlugin *volumetesting.FakeVolumePlugin) {
  875. if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
  876. return
  877. }
  878. err := retryWithExponentialBackOff(
  879. time.Duration(5*time.Millisecond),
  880. func() (bool, error) {
  881. totalCount := 0
  882. for _, detacher := range fakePlugin.GetDetachers() {
  883. totalCount += detacher.GetDetachCallCount()
  884. }
  885. if totalCount == expectedDetachCallCount {
  886. return true, nil
  887. }
  888. t.Logf(
  889. "Warning: Wrong total GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
  890. expectedDetachCallCount,
  891. totalCount)
  892. return false, nil
  893. },
  894. )
  895. if err != nil {
  896. t.Fatalf(
  897. "Total DetachCallCount does not match expected value. Expected: <%v>",
  898. expectedDetachCallCount)
  899. }
  900. }
  901. func waitForAttachedToNodesCount(
  902. t *testing.T,
  903. expectedNodeCount int,
  904. volumeName v1.UniqueVolumeName,
  905. asw cache.ActualStateOfWorld) {
  906. err := retryWithExponentialBackOff(
  907. time.Duration(5*time.Millisecond),
  908. func() (bool, error) {
  909. count := len(asw.GetNodesForAttachedVolume(volumeName))
  910. if count == expectedNodeCount {
  911. return true, nil
  912. }
  913. t.Logf(
  914. "Warning: Wrong number of nodes having <%v> attached. Expected: <%v> Actual: <%v>. Will retry.",
  915. volumeName,
  916. expectedNodeCount,
  917. count)
  918. return false, nil
  919. },
  920. )
  921. if err != nil {
  922. count := len(asw.GetNodesForAttachedVolume(volumeName))
  923. t.Fatalf(
  924. "Wrong number of nodes having <%v> attached. Expected: <%v> Actual: <%v>",
  925. volumeName,
  926. expectedNodeCount,
  927. count)
  928. }
  929. }
  930. func verifyNewAttacherCallCount(
  931. t *testing.T,
  932. expectZeroNewAttacherCallCount bool,
  933. fakePlugin *volumetesting.FakeVolumePlugin) {
  934. if expectZeroNewAttacherCallCount &&
  935. fakePlugin.GetNewAttacherCallCount() != 0 {
  936. t.Fatalf(
  937. "Wrong NewAttacherCallCount. Expected: <0> Actual: <%v>",
  938. fakePlugin.GetNewAttacherCallCount())
  939. }
  940. }
  941. func waitForVolumeAttachedToNode(
  942. t *testing.T,
  943. volumeName v1.UniqueVolumeName,
  944. nodeName k8stypes.NodeName,
  945. asw cache.ActualStateOfWorld) {
  946. err := retryWithExponentialBackOff(
  947. time.Duration(500*time.Millisecond),
  948. func() (bool, error) {
  949. if asw.IsVolumeAttachedToNode(volumeName, nodeName) {
  950. return true, nil
  951. }
  952. t.Logf(
  953. "Warning: Volume <%v> is not attached to node <%v> yet. Will retry.",
  954. volumeName,
  955. nodeName)
  956. return false, nil
  957. },
  958. )
  959. if err != nil && !asw.IsVolumeAttachedToNode(volumeName, nodeName) {
  960. t.Fatalf(
  961. "Volume <%v> is not attached to node <%v>.",
  962. volumeName,
  963. nodeName)
  964. }
  965. }
  966. func waitForVolumeAddedToNode(
  967. t *testing.T,
  968. volumeName v1.UniqueVolumeName,
  969. nodeName k8stypes.NodeName,
  970. asw cache.ActualStateOfWorld) {
  971. err := retryWithExponentialBackOff(
  972. time.Duration(500*time.Millisecond),
  973. func() (bool, error) {
  974. volumes := asw.GetAttachedVolumes()
  975. for _, volume := range volumes {
  976. if volume.VolumeName == volumeName && volume.NodeName == nodeName {
  977. return true, nil
  978. }
  979. }
  980. t.Logf(
  981. "Warning: Volume <%v> is not added to node <%v> yet. Will retry.",
  982. volumeName,
  983. nodeName)
  984. return false, nil
  985. },
  986. )
  987. if err != nil {
  988. t.Fatalf(
  989. "Volume <%v> is not added to node <%v>. %v",
  990. volumeName,
  991. nodeName, err)
  992. }
  993. }
  994. func waitForVolumeRemovedFromNode(
  995. t *testing.T,
  996. volumeName v1.UniqueVolumeName,
  997. nodeName k8stypes.NodeName,
  998. asw cache.ActualStateOfWorld) {
  999. err := retryWithExponentialBackOff(
  1000. time.Duration(500*time.Millisecond),
  1001. func() (bool, error) {
  1002. volumes := asw.GetAttachedVolumes()
  1003. exist := false
  1004. for _, volume := range volumes {
  1005. if volume.VolumeName == volumeName && volume.NodeName == nodeName {
  1006. exist = true
  1007. }
  1008. }
  1009. if exist {
  1010. t.Logf(
  1011. "Warning: Volume <%v> is not removed from the node <%v> yet. Will retry.",
  1012. volumeName,
  1013. nodeName)
  1014. return false, nil
  1015. }
  1016. return true, nil
  1017. },
  1018. )
  1019. if err != nil {
  1020. t.Fatalf(
  1021. "Volume <%v> is not removed from node <%v>. %v",
  1022. volumeName,
  1023. nodeName, err)
  1024. }
  1025. }
  1026. func verifyVolumeAttachedToNode(
  1027. t *testing.T,
  1028. volumeName v1.UniqueVolumeName,
  1029. nodeName k8stypes.NodeName,
  1030. isAttached bool,
  1031. asw cache.ActualStateOfWorld,
  1032. ) {
  1033. result := asw.IsVolumeAttachedToNode(volumeName, nodeName)
  1034. if result == isAttached {
  1035. return
  1036. }
  1037. t.Fatalf("Check volume <%v> is attached to node <%v>, got %v, expected %v",
  1038. volumeName,
  1039. nodeName,
  1040. result,
  1041. isAttached)
  1042. }
  1043. func verifyVolumeReportedAsAttachedToNode(
  1044. t *testing.T,
  1045. volumeName v1.UniqueVolumeName,
  1046. nodeName k8stypes.NodeName,
  1047. isAttached bool,
  1048. asw cache.ActualStateOfWorld,
  1049. ) {
  1050. result := false
  1051. volumes := asw.GetVolumesToReportAttached()
  1052. for _, volume := range volumes[nodeName] {
  1053. if volume.Name == volumeName {
  1054. result = true
  1055. }
  1056. }
  1057. if result == isAttached {
  1058. return
  1059. }
  1060. t.Fatalf("Check volume <%v> is reported as attached to node <%v>, got %v, expected %v",
  1061. volumeName,
  1062. nodeName,
  1063. result,
  1064. isAttached)
  1065. }
  1066. func verifyNewDetacherCallCount(
  1067. t *testing.T,
  1068. expectZeroNewDetacherCallCount bool,
  1069. fakePlugin *volumetesting.FakeVolumePlugin) {
  1070. if expectZeroNewDetacherCallCount &&
  1071. fakePlugin.GetNewDetacherCallCount() != 0 {
  1072. t.Fatalf("Wrong NewDetacherCallCount. Expected: <0> Actual: <%v>",
  1073. fakePlugin.GetNewDetacherCallCount())
  1074. }
  1075. }
  1076. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  1077. backoff := wait.Backoff{
  1078. Duration: initialDuration,
  1079. Factor: 3,
  1080. Jitter: 0,
  1081. Steps: 6,
  1082. }
  1083. return wait.ExponentialBackoff(backoff, fn)
  1084. }