operation_executor_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package operationexecutor
  14. import (
  15. "strconv"
  16. "testing"
  17. "time"
  18. v1 "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/uuid"
  22. csitrans "k8s.io/csi-translation-lib"
  23. "k8s.io/kubernetes/pkg/volume"
  24. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  25. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  26. )
  27. const (
  28. numVolumesToMount = 2
  29. numAttachableVolumesToUnmount = 2
  30. numNonAttachableVolumesToUnmount = 2
  31. numDevicesToUnmount = 2
  32. numVolumesToAttach = 2
  33. numVolumesToDetach = 2
  34. numVolumesToVerifyAttached = 2
  35. numVolumesToVerifyControllerAttached = 2
  36. numVolumesToMap = 2
  37. numAttachableVolumesToUnmap = 2
  38. numNonAttachableVolumesToUnmap = 2
  39. numDevicesToUnmap = 2
  40. )
  41. var _ OperationGenerator = &fakeOperationGenerator{}
  42. func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachableAndNonDevicemountablePlugins(t *testing.T) {
  43. // Arrange
  44. ch, quit, oe := setup()
  45. volumesToMount := make([]VolumeToMount, numVolumesToMount)
  46. secretName := "secret-volume"
  47. volumeName := v1.UniqueVolumeName(secretName)
  48. // Act
  49. for i := range volumesToMount {
  50. podName := "pod-" + strconv.Itoa((i + 1))
  51. pod := getTestPodWithSecret(podName, secretName)
  52. volumesToMount[i] = VolumeToMount{
  53. Pod: pod,
  54. VolumeName: volumeName,
  55. PluginIsAttachable: false, // this field determines whether the plugin is attachable
  56. PluginIsDeviceMountable: false, // this field determines whether the plugin is devicemountable
  57. ReportedInUse: true,
  58. }
  59. oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  60. }
  61. // Assert
  62. if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
  63. t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
  64. }
  65. }
  66. func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
  67. // Arrange
  68. ch, quit, oe := setup()
  69. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  70. pdName := "pd-volume"
  71. volumeName := v1.UniqueVolumeName(pdName)
  72. // Act
  73. for i := range volumesToMount {
  74. podName := "pod-" + strconv.Itoa((i + 1))
  75. pod := getTestPodWithGCEPD(podName, pdName)
  76. volumesToMount[i] = VolumeToMount{
  77. Pod: pod,
  78. VolumeName: volumeName,
  79. PluginIsAttachable: true, // this field determines whether the plugin is attachable
  80. ReportedInUse: true,
  81. }
  82. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  83. }
  84. // Assert
  85. if !isOperationRunSerially(ch, quit) {
  86. t.Fatalf("Mount operations should not start concurrently for attachable volumes")
  87. }
  88. }
  89. func TestOperationExecutor_MountVolume_ConcurrentMountForDeviceMountablePlugins(t *testing.T) {
  90. // Arrange
  91. ch, quit, oe := setup()
  92. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  93. pdName := "pd-volume"
  94. volumeName := v1.UniqueVolumeName(pdName)
  95. // Act
  96. for i := range volumesToMount {
  97. podName := "pod-" + strconv.Itoa((i + 1))
  98. pod := getTestPodWithGCEPD(podName, pdName)
  99. volumesToMount[i] = VolumeToMount{
  100. Pod: pod,
  101. VolumeName: volumeName,
  102. PluginIsDeviceMountable: true, // this field determines whether the plugin is devicemountable
  103. ReportedInUse: true,
  104. }
  105. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  106. }
  107. // Assert
  108. if !isOperationRunSerially(ch, quit) {
  109. t.Fatalf("Mount operations should not start concurrently for devicemountable volumes")
  110. }
  111. }
  112. func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
  113. // Arrange
  114. ch, quit, oe := setup()
  115. volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
  116. pdName := "pd-volume"
  117. secretName := "secret-volume"
  118. // Act
  119. for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
  120. podName := "pod-" + strconv.Itoa(i+1)
  121. if i < numNonAttachableVolumesToUnmount {
  122. pod := getTestPodWithSecret(podName, secretName)
  123. volumesToUnmount[i] = MountedVolume{
  124. PodName: volumetypes.UniquePodName(podName),
  125. VolumeName: v1.UniqueVolumeName(secretName),
  126. PodUID: pod.UID,
  127. }
  128. } else {
  129. pod := getTestPodWithGCEPD(podName, pdName)
  130. volumesToUnmount[i] = MountedVolume{
  131. PodName: volumetypes.UniquePodName(podName),
  132. VolumeName: v1.UniqueVolumeName(pdName),
  133. PodUID: pod.UID,
  134. }
  135. }
  136. oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /*podsDir*/)
  137. }
  138. // Assert
  139. if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
  140. t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
  141. }
  142. }
  143. func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
  144. // Arrange
  145. ch, quit, oe := setup()
  146. attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
  147. pdName := "pd-volume"
  148. // Act
  149. for i := range attachedVolumes {
  150. attachedVolumes[i] = AttachedVolume{
  151. VolumeName: v1.UniqueVolumeName(pdName),
  152. NodeName: "node-name",
  153. }
  154. oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
  155. }
  156. // Assert
  157. if !isOperationRunSerially(ch, quit) {
  158. t.Fatalf("Unmount device operations should not start concurrently")
  159. }
  160. }
  161. func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
  162. // Arrange
  163. ch, quit, oe := setup()
  164. volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
  165. pdName := "pd-volume"
  166. // Act
  167. for i := range volumesToAttach {
  168. volumesToAttach[i] = VolumeToAttach{
  169. VolumeName: v1.UniqueVolumeName(pdName),
  170. NodeName: "node",
  171. }
  172. oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
  173. }
  174. // Assert
  175. if !isOperationRunSerially(ch, quit) {
  176. t.Fatalf("Attach volume operations should not start concurrently")
  177. }
  178. }
  179. func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
  180. // Arrange
  181. ch, quit, oe := setup()
  182. attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
  183. pdName := "pd-volume"
  184. // Act
  185. for i := range attachedVolumes {
  186. attachedVolumes[i] = AttachedVolume{
  187. VolumeName: v1.UniqueVolumeName(pdName),
  188. NodeName: "node",
  189. }
  190. oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
  191. }
  192. // Assert
  193. if !isOperationRunSerially(ch, quit) {
  194. t.Fatalf("DetachVolume operations should not run concurrently")
  195. }
  196. }
  197. func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
  198. // Arrange
  199. ch, quit, oe := setup()
  200. // Act
  201. for i := 0; i < numVolumesToVerifyAttached; i++ {
  202. oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
  203. }
  204. // Assert
  205. if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
  206. t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
  207. }
  208. }
  209. func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
  210. // Arrange
  211. ch, quit, oe := setup()
  212. volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
  213. pdName := "pd-volume"
  214. // Act
  215. for i := range volumesToMount {
  216. volumesToMount[i] = VolumeToMount{
  217. VolumeName: v1.UniqueVolumeName(pdName),
  218. }
  219. oe.VerifyControllerAttachedVolume(volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */)
  220. }
  221. // Assert
  222. if !isOperationRunSerially(ch, quit) {
  223. t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
  224. }
  225. }
  226. func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) {
  227. // Arrange
  228. ch, quit, oe := setup()
  229. volumesToMount := make([]VolumeToMount, numVolumesToMap)
  230. secretName := "secret-volume"
  231. volumeName := v1.UniqueVolumeName(secretName)
  232. volumeMode := v1.PersistentVolumeBlock
  233. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  234. // Act
  235. for i := range volumesToMount {
  236. podName := "pod-" + strconv.Itoa((i + 1))
  237. pod := getTestPodWithSecret(podName, secretName)
  238. volumesToMount[i] = VolumeToMount{
  239. Pod: pod,
  240. VolumeName: volumeName,
  241. PluginIsAttachable: false, // this field determines whether the plugin is attachable
  242. ReportedInUse: true,
  243. VolumeSpec: tmpSpec,
  244. }
  245. oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
  246. }
  247. // Assert
  248. if !isOperationRunConcurrently(ch, quit, numVolumesToMap) {
  249. t.Fatalf("Unable to start map operations in Concurrent for non-attachable volumes")
  250. }
  251. }
  252. func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) {
  253. // Arrange
  254. ch, quit, oe := setup()
  255. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  256. pdName := "pd-volume"
  257. volumeName := v1.UniqueVolumeName(pdName)
  258. volumeMode := v1.PersistentVolumeBlock
  259. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  260. // Act
  261. for i := range volumesToMount {
  262. podName := "pod-" + strconv.Itoa((i + 1))
  263. pod := getTestPodWithGCEPD(podName, pdName)
  264. volumesToMount[i] = VolumeToMount{
  265. Pod: pod,
  266. VolumeName: volumeName,
  267. PluginIsAttachable: true, // this field determines whether the plugin is attachable
  268. ReportedInUse: true,
  269. VolumeSpec: tmpSpec,
  270. }
  271. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
  272. }
  273. // Assert
  274. if !isOperationRunSerially(ch, quit) {
  275. t.Fatalf("Map operations should not start concurrently for attachable volumes")
  276. }
  277. }
  278. func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) {
  279. // Arrange
  280. ch, quit, oe := setup()
  281. volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap)
  282. pdName := "pd-volume"
  283. secretName := "secret-volume"
  284. volumeMode := v1.PersistentVolumeBlock
  285. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  286. // Act
  287. for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ {
  288. podName := "pod-" + strconv.Itoa(i+1)
  289. if i < numNonAttachableVolumesToUnmap {
  290. pod := getTestPodWithSecret(podName, secretName)
  291. volumesToUnmount[i] = MountedVolume{
  292. PodName: volumetypes.UniquePodName(podName),
  293. VolumeName: v1.UniqueVolumeName(secretName),
  294. PodUID: pod.UID,
  295. VolumeSpec: tmpSpec,
  296. }
  297. } else {
  298. pod := getTestPodWithGCEPD(podName, pdName)
  299. volumesToUnmount[i] = MountedVolume{
  300. PodName: volumetypes.UniquePodName(podName),
  301. VolumeName: v1.UniqueVolumeName(pdName),
  302. PodUID: pod.UID,
  303. VolumeSpec: tmpSpec,
  304. }
  305. }
  306. oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /* podsDir */)
  307. }
  308. // Assert
  309. if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap) {
  310. t.Fatalf("Unable to start unmap operations concurrently for volume plugins")
  311. }
  312. }
  313. func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) {
  314. // Arrange
  315. ch, quit, oe := setup()
  316. attachedVolumes := make([]AttachedVolume, numDevicesToUnmap)
  317. pdName := "pd-volume"
  318. volumeMode := v1.PersistentVolumeBlock
  319. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  320. // Act
  321. for i := range attachedVolumes {
  322. attachedVolumes[i] = AttachedVolume{
  323. VolumeName: v1.UniqueVolumeName(pdName),
  324. NodeName: "node-name",
  325. VolumeSpec: tmpSpec,
  326. }
  327. oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
  328. }
  329. // Assert
  330. if !isOperationRunSerially(ch, quit) {
  331. t.Fatalf("Unmap device operations should not start concurrently")
  332. }
  333. }
  334. type fakeOperationGenerator struct {
  335. ch chan interface{}
  336. quit chan interface{}
  337. }
  338. func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
  339. return &fakeOperationGenerator{
  340. ch: ch,
  341. quit: quit,
  342. }
  343. }
  344. func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
  345. opFunc := func() (error, error) {
  346. startOperationAndBlock(fopg.ch, fopg.quit)
  347. return nil, nil
  348. }
  349. return volumetypes.GeneratedOperations{
  350. OperationFunc: opFunc,
  351. }
  352. }
  353. func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
  354. opFunc := func() (error, error) {
  355. startOperationAndBlock(fopg.ch, fopg.quit)
  356. return nil, nil
  357. }
  358. return volumetypes.GeneratedOperations{
  359. OperationFunc: opFunc,
  360. }, nil
  361. }
  362. func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
  363. opFunc := func() (error, error) {
  364. startOperationAndBlock(fopg.ch, fopg.quit)
  365. return nil, nil
  366. }
  367. return volumetypes.GeneratedOperations{
  368. OperationFunc: opFunc,
  369. }
  370. }
  371. func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  372. opFunc := func() (error, error) {
  373. startOperationAndBlock(fopg.ch, fopg.quit)
  374. return nil, nil
  375. }
  376. return volumetypes.GeneratedOperations{
  377. OperationFunc: opFunc,
  378. }, nil
  379. }
  380. func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  381. opFunc := func() (error, error) {
  382. startOperationAndBlock(fopg.ch, fopg.quit)
  383. return nil, nil
  384. }
  385. return volumetypes.GeneratedOperations{
  386. OperationFunc: opFunc,
  387. }, nil
  388. }
  389. func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
  390. opFunc := func() (error, error) {
  391. startOperationAndBlock(fopg.ch, fopg.quit)
  392. return nil, nil
  393. }
  394. return volumetypes.GeneratedOperations{
  395. OperationFunc: opFunc,
  396. }, nil
  397. }
  398. func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  399. opFunc := func() (error, error) {
  400. startOperationAndBlock(fopg.ch, fopg.quit)
  401. return nil, nil
  402. }
  403. return volumetypes.GeneratedOperations{
  404. OperationFunc: opFunc,
  405. }, nil
  406. }
  407. func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
  408. opFunc := func() (error, error) {
  409. startOperationAndBlock(fopg.ch, fopg.quit)
  410. return nil, nil
  411. }
  412. return volumetypes.GeneratedOperations{
  413. OperationFunc: opFunc,
  414. }, nil
  415. }
  416. func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  417. opFunc := func() (error, error) {
  418. startOperationAndBlock(fopg.ch, fopg.quit)
  419. return nil, nil
  420. }
  421. return volumetypes.GeneratedOperations{
  422. OperationFunc: opFunc,
  423. }, nil
  424. }
  425. func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
  426. pluginNodeVolumes map[types.NodeName][]*volume.Spec,
  427. pluginNane string,
  428. volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
  429. actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  430. opFunc := func() (error, error) {
  431. startOperationAndBlock(fopg.ch, fopg.quit)
  432. return nil, nil
  433. }
  434. return volumetypes.GeneratedOperations{
  435. OperationFunc: opFunc,
  436. }, nil
  437. }
  438. func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  439. opFunc := func() (error, error) {
  440. startOperationAndBlock(fopg.ch, fopg.quit)
  441. return nil, nil
  442. }
  443. return volumetypes.GeneratedOperations{
  444. OperationFunc: opFunc,
  445. }, nil
  446. }
  447. func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  448. opFunc := func() (error, error) {
  449. startOperationAndBlock(fopg.ch, fopg.quit)
  450. return nil, nil
  451. }
  452. return volumetypes.GeneratedOperations{
  453. OperationFunc: opFunc,
  454. }, nil
  455. }
  456. func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
  457. opFunc := func() (error, error) {
  458. startOperationAndBlock(fopg.ch, fopg.quit)
  459. return nil, nil
  460. }
  461. return volumetypes.GeneratedOperations{
  462. OperationFunc: opFunc,
  463. }, nil
  464. }
  465. func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
  466. return nil
  467. }
  468. func (fopg *fakeOperationGenerator) GetCSITranslator() InTreeToCSITranslator {
  469. return csitrans.New()
  470. }
  471. func getTestPodWithSecret(podName, secretName string) *v1.Pod {
  472. return &v1.Pod{
  473. ObjectMeta: metav1.ObjectMeta{
  474. Name: podName,
  475. UID: types.UID(podName),
  476. },
  477. Spec: v1.PodSpec{
  478. Volumes: []v1.Volume{
  479. {
  480. Name: secretName,
  481. VolumeSource: v1.VolumeSource{
  482. Secret: &v1.SecretVolumeSource{
  483. SecretName: secretName,
  484. },
  485. },
  486. },
  487. },
  488. Containers: []v1.Container{
  489. {
  490. Name: "secret-volume-test",
  491. Image: "k8s.gcr.io/mounttest:0.8",
  492. Args: []string{
  493. "--file_content=/etc/secret-volume/data-1",
  494. "--file_mode=/etc/secret-volume/data-1"},
  495. VolumeMounts: []v1.VolumeMount{
  496. {
  497. Name: secretName,
  498. MountPath: "/data",
  499. },
  500. },
  501. },
  502. },
  503. RestartPolicy: v1.RestartPolicyNever,
  504. },
  505. }
  506. }
  507. func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
  508. return &v1.Pod{
  509. ObjectMeta: metav1.ObjectMeta{
  510. Name: podName,
  511. UID: types.UID(podName + string(uuid.NewUUID())),
  512. },
  513. Spec: v1.PodSpec{
  514. Volumes: []v1.Volume{
  515. {
  516. Name: pdName,
  517. VolumeSource: v1.VolumeSource{
  518. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  519. PDName: pdName,
  520. FSType: "ext4",
  521. ReadOnly: false,
  522. },
  523. },
  524. },
  525. },
  526. Containers: []v1.Container{
  527. {
  528. Name: "pd-volume-test",
  529. Image: "k8s.gcr.io/mounttest:0.8",
  530. Args: []string{
  531. "--file_content=/etc/pd-volume/data-1",
  532. },
  533. VolumeMounts: []v1.VolumeMount{
  534. {
  535. Name: pdName,
  536. MountPath: "/data",
  537. },
  538. },
  539. },
  540. },
  541. RestartPolicy: v1.RestartPolicyNever,
  542. },
  543. }
  544. }
  545. func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
  546. defer close(quit)
  547. numOperationsStarted := 0
  548. loop:
  549. for {
  550. select {
  551. case <-ch:
  552. numOperationsStarted++
  553. if numOperationsStarted > 1 {
  554. return false
  555. }
  556. case <-time.After(5 * time.Second):
  557. break loop
  558. }
  559. }
  560. return true
  561. }
  562. func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
  563. defer close(quit)
  564. numOperationsStarted := 0
  565. loop:
  566. for {
  567. select {
  568. case <-ch:
  569. numOperationsStarted++
  570. if numOperationsStarted == numOperationsToRun {
  571. return true
  572. }
  573. case <-time.After(5 * time.Second):
  574. break loop
  575. }
  576. }
  577. return false
  578. }
  579. func setup() (chan interface{}, chan interface{}, OperationExecutor) {
  580. ch, quit := make(chan interface{}), make(chan interface{})
  581. return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
  582. }
  583. // This function starts by writing to ch and blocks on the quit channel
  584. // until it is closed by the currently running test
  585. func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
  586. ch <- nil
  587. <-quit
  588. }