operation_executor_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package operationexecutor
  14. import (
  15. "strconv"
  16. "testing"
  17. "time"
  18. "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/uuid"
  22. "k8s.io/kubernetes/pkg/util/mount"
  23. "k8s.io/kubernetes/pkg/volume"
  24. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  25. )
// Operation counts used by the tests in this file to size their inputs and,
// for the concurrency checks, as the number of operation starts to wait for.
// Every count is 2; tests that add an attachable and a non-attachable count
// together therefore issue four operations.
const (
	// Mount / unmount of filesystem volumes.
	numVolumesToMount                = 2
	numAttachableVolumesToUnmount    = 2
	numNonAttachableVolumesToUnmount = 2
	numDevicesToUnmount              = 2
	// Attach / detach and attachment verification.
	numVolumesToAttach                   = 2
	numVolumesToDetach                   = 2
	numVolumesToVerifyAttached           = 2
	numVolumesToVerifyControllerAttached = 2
	// Counts used by the *_VolumeMode_Block tests.
	numVolumesToMap                = 2
	numAttachableVolumesToUnmap    = 2
	numNonAttachableVolumesToUnmap = 2
	numDevicesToUnmap              = 2
)
  40. var _ OperationGenerator = &fakeOperationGenerator{}
  41. func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachableAndNonDevicemountablePlugins(t *testing.T) {
  42. // Arrange
  43. ch, quit, oe := setup()
  44. volumesToMount := make([]VolumeToMount, numVolumesToMount)
  45. secretName := "secret-volume"
  46. volumeName := v1.UniqueVolumeName(secretName)
  47. // Act
  48. for i := range volumesToMount {
  49. podName := "pod-" + strconv.Itoa((i + 1))
  50. pod := getTestPodWithSecret(podName, secretName)
  51. volumesToMount[i] = VolumeToMount{
  52. Pod: pod,
  53. VolumeName: volumeName,
  54. PluginIsAttachable: false, // this field determines whether the plugin is attachable
  55. PluginIsDeviceMountable: false, // this field determines whether the plugin is devicemountable
  56. ReportedInUse: true,
  57. }
  58. oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  59. }
  60. // Assert
  61. if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
  62. t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
  63. }
  64. }
  65. func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
  66. // Arrange
  67. ch, quit, oe := setup()
  68. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  69. pdName := "pd-volume"
  70. volumeName := v1.UniqueVolumeName(pdName)
  71. // Act
  72. for i := range volumesToMount {
  73. podName := "pod-" + strconv.Itoa((i + 1))
  74. pod := getTestPodWithGCEPD(podName, pdName)
  75. volumesToMount[i] = VolumeToMount{
  76. Pod: pod,
  77. VolumeName: volumeName,
  78. PluginIsAttachable: true, // this field determines whether the plugin is attachable
  79. ReportedInUse: true,
  80. }
  81. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  82. }
  83. // Assert
  84. if !isOperationRunSerially(ch, quit) {
  85. t.Fatalf("Mount operations should not start concurrently for attachable volumes")
  86. }
  87. }
  88. func TestOperationExecutor_MountVolume_ConcurrentMountForDeviceMountablePlugins(t *testing.T) {
  89. // Arrange
  90. ch, quit, oe := setup()
  91. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  92. pdName := "pd-volume"
  93. volumeName := v1.UniqueVolumeName(pdName)
  94. // Act
  95. for i := range volumesToMount {
  96. podName := "pod-" + strconv.Itoa((i + 1))
  97. pod := getTestPodWithGCEPD(podName, pdName)
  98. volumesToMount[i] = VolumeToMount{
  99. Pod: pod,
  100. VolumeName: volumeName,
  101. PluginIsDeviceMountable: true, // this field determines whether the plugin is devicemountable
  102. ReportedInUse: true,
  103. }
  104. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
  105. }
  106. // Assert
  107. if !isOperationRunSerially(ch, quit) {
  108. t.Fatalf("Mount operations should not start concurrently for devicemountable volumes")
  109. }
  110. }
  111. func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
  112. // Arrange
  113. ch, quit, oe := setup()
  114. volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
  115. pdName := "pd-volume"
  116. secretName := "secret-volume"
  117. // Act
  118. for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
  119. podName := "pod-" + strconv.Itoa(i+1)
  120. if i < numNonAttachableVolumesToUnmount {
  121. pod := getTestPodWithSecret(podName, secretName)
  122. volumesToUnmount[i] = MountedVolume{
  123. PodName: volumetypes.UniquePodName(podName),
  124. VolumeName: v1.UniqueVolumeName(secretName),
  125. PodUID: pod.UID,
  126. }
  127. } else {
  128. pod := getTestPodWithGCEPD(podName, pdName)
  129. volumesToUnmount[i] = MountedVolume{
  130. PodName: volumetypes.UniquePodName(podName),
  131. VolumeName: v1.UniqueVolumeName(pdName),
  132. PodUID: pod.UID,
  133. }
  134. }
  135. oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /*podsDir*/)
  136. }
  137. // Assert
  138. if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
  139. t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
  140. }
  141. }
  142. func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
  143. // Arrange
  144. ch, quit, oe := setup()
  145. attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
  146. pdName := "pd-volume"
  147. // Act
  148. for i := range attachedVolumes {
  149. attachedVolumes[i] = AttachedVolume{
  150. VolumeName: v1.UniqueVolumeName(pdName),
  151. NodeName: "node-name",
  152. }
  153. oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
  154. }
  155. // Assert
  156. if !isOperationRunSerially(ch, quit) {
  157. t.Fatalf("Unmount device operations should not start concurrently")
  158. }
  159. }
  160. func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
  161. // Arrange
  162. ch, quit, oe := setup()
  163. volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
  164. pdName := "pd-volume"
  165. // Act
  166. for i := range volumesToAttach {
  167. volumesToAttach[i] = VolumeToAttach{
  168. VolumeName: v1.UniqueVolumeName(pdName),
  169. NodeName: "node",
  170. }
  171. oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
  172. }
  173. // Assert
  174. if !isOperationRunSerially(ch, quit) {
  175. t.Fatalf("Attach volume operations should not start concurrently")
  176. }
  177. }
  178. func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
  179. // Arrange
  180. ch, quit, oe := setup()
  181. attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
  182. pdName := "pd-volume"
  183. // Act
  184. for i := range attachedVolumes {
  185. attachedVolumes[i] = AttachedVolume{
  186. VolumeName: v1.UniqueVolumeName(pdName),
  187. NodeName: "node",
  188. }
  189. oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
  190. }
  191. // Assert
  192. if !isOperationRunSerially(ch, quit) {
  193. t.Fatalf("DetachVolume operations should not run concurrently")
  194. }
  195. }
  196. func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
  197. // Arrange
  198. ch, quit, oe := setup()
  199. // Act
  200. for i := 0; i < numVolumesToVerifyAttached; i++ {
  201. oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
  202. }
  203. // Assert
  204. if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
  205. t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
  206. }
  207. }
  208. func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
  209. // Arrange
  210. ch, quit, oe := setup()
  211. volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
  212. pdName := "pd-volume"
  213. // Act
  214. for i := range volumesToMount {
  215. volumesToMount[i] = VolumeToMount{
  216. VolumeName: v1.UniqueVolumeName(pdName),
  217. }
  218. oe.VerifyControllerAttachedVolume(volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */)
  219. }
  220. // Assert
  221. if !isOperationRunSerially(ch, quit) {
  222. t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
  223. }
  224. }
  225. func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) {
  226. // Arrange
  227. ch, quit, oe := setup()
  228. volumesToMount := make([]VolumeToMount, numVolumesToMap)
  229. secretName := "secret-volume"
  230. volumeName := v1.UniqueVolumeName(secretName)
  231. volumeMode := v1.PersistentVolumeBlock
  232. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  233. // Act
  234. for i := range volumesToMount {
  235. podName := "pod-" + strconv.Itoa((i + 1))
  236. pod := getTestPodWithSecret(podName, secretName)
  237. volumesToMount[i] = VolumeToMount{
  238. Pod: pod,
  239. VolumeName: volumeName,
  240. PluginIsAttachable: false, // this field determines whether the plugin is attachable
  241. ReportedInUse: true,
  242. VolumeSpec: tmpSpec,
  243. }
  244. oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
  245. }
  246. // Assert
  247. if !isOperationRunConcurrently(ch, quit, numVolumesToMap) {
  248. t.Fatalf("Unable to start map operations in Concurrent for non-attachable volumes")
  249. }
  250. }
  251. func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) {
  252. // Arrange
  253. ch, quit, oe := setup()
  254. volumesToMount := make([]VolumeToMount, numVolumesToAttach)
  255. pdName := "pd-volume"
  256. volumeName := v1.UniqueVolumeName(pdName)
  257. volumeMode := v1.PersistentVolumeBlock
  258. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  259. // Act
  260. for i := range volumesToMount {
  261. podName := "pod-" + strconv.Itoa((i + 1))
  262. pod := getTestPodWithGCEPD(podName, pdName)
  263. volumesToMount[i] = VolumeToMount{
  264. Pod: pod,
  265. VolumeName: volumeName,
  266. PluginIsAttachable: true, // this field determines whether the plugin is attachable
  267. ReportedInUse: true,
  268. VolumeSpec: tmpSpec,
  269. }
  270. oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
  271. }
  272. // Assert
  273. if !isOperationRunSerially(ch, quit) {
  274. t.Fatalf("Map operations should not start concurrently for attachable volumes")
  275. }
  276. }
  277. func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) {
  278. // Arrange
  279. ch, quit, oe := setup()
  280. volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap)
  281. pdName := "pd-volume"
  282. secretName := "secret-volume"
  283. volumeMode := v1.PersistentVolumeBlock
  284. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  285. // Act
  286. for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ {
  287. podName := "pod-" + strconv.Itoa(i+1)
  288. if i < numNonAttachableVolumesToUnmap {
  289. pod := getTestPodWithSecret(podName, secretName)
  290. volumesToUnmount[i] = MountedVolume{
  291. PodName: volumetypes.UniquePodName(podName),
  292. VolumeName: v1.UniqueVolumeName(secretName),
  293. PodUID: pod.UID,
  294. VolumeSpec: tmpSpec,
  295. }
  296. } else {
  297. pod := getTestPodWithGCEPD(podName, pdName)
  298. volumesToUnmount[i] = MountedVolume{
  299. PodName: volumetypes.UniquePodName(podName),
  300. VolumeName: v1.UniqueVolumeName(pdName),
  301. PodUID: pod.UID,
  302. VolumeSpec: tmpSpec,
  303. }
  304. }
  305. oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /* podsDir */)
  306. }
  307. // Assert
  308. if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap) {
  309. t.Fatalf("Unable to start unmap operations concurrently for volume plugins")
  310. }
  311. }
  312. func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) {
  313. // Arrange
  314. ch, quit, oe := setup()
  315. attachedVolumes := make([]AttachedVolume, numDevicesToUnmap)
  316. pdName := "pd-volume"
  317. volumeMode := v1.PersistentVolumeBlock
  318. tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  319. // Act
  320. for i := range attachedVolumes {
  321. attachedVolumes[i] = AttachedVolume{
  322. VolumeName: v1.UniqueVolumeName(pdName),
  323. NodeName: "node-name",
  324. VolumeSpec: tmpSpec,
  325. }
  326. oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
  327. }
  328. // Assert
  329. if !isOperationRunSerially(ch, quit) {
  330. t.Fatalf("Unmap device operations should not start concurrently")
  331. }
  332. }
  333. type fakeOperationGenerator struct {
  334. ch chan interface{}
  335. quit chan interface{}
  336. }
  337. func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
  338. return &fakeOperationGenerator{
  339. ch: ch,
  340. quit: quit,
  341. }
  342. }
  343. func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
  344. opFunc := func() (error, error) {
  345. startOperationAndBlock(fopg.ch, fopg.quit)
  346. return nil, nil
  347. }
  348. return volumetypes.GeneratedOperations{
  349. OperationFunc: opFunc,
  350. }
  351. }
  352. func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
  353. opFunc := func() (error, error) {
  354. startOperationAndBlock(fopg.ch, fopg.quit)
  355. return nil, nil
  356. }
  357. return volumetypes.GeneratedOperations{
  358. OperationFunc: opFunc,
  359. }, nil
  360. }
  361. func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
  362. opFunc := func() (error, error) {
  363. startOperationAndBlock(fopg.ch, fopg.quit)
  364. return nil, nil
  365. }
  366. return volumetypes.GeneratedOperations{
  367. OperationFunc: opFunc,
  368. }
  369. }
  370. func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  371. opFunc := func() (error, error) {
  372. startOperationAndBlock(fopg.ch, fopg.quit)
  373. return nil, nil
  374. }
  375. return volumetypes.GeneratedOperations{
  376. OperationFunc: opFunc,
  377. }, nil
  378. }
  379. func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  380. opFunc := func() (error, error) {
  381. startOperationAndBlock(fopg.ch, fopg.quit)
  382. return nil, nil
  383. }
  384. return volumetypes.GeneratedOperations{
  385. OperationFunc: opFunc,
  386. }, nil
  387. }
  388. func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
  389. opFunc := func() (error, error) {
  390. startOperationAndBlock(fopg.ch, fopg.quit)
  391. return nil, nil
  392. }
  393. return volumetypes.GeneratedOperations{
  394. OperationFunc: opFunc,
  395. }, nil
  396. }
  397. func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  398. opFunc := func() (error, error) {
  399. startOperationAndBlock(fopg.ch, fopg.quit)
  400. return nil, nil
  401. }
  402. return volumetypes.GeneratedOperations{
  403. OperationFunc: opFunc,
  404. }, nil
  405. }
  406. func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
  407. opFunc := func() (error, error) {
  408. startOperationAndBlock(fopg.ch, fopg.quit)
  409. return nil, nil
  410. }
  411. return volumetypes.GeneratedOperations{
  412. OperationFunc: opFunc,
  413. }, nil
  414. }
  415. func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  416. opFunc := func() (error, error) {
  417. startOperationAndBlock(fopg.ch, fopg.quit)
  418. return nil, nil
  419. }
  420. return volumetypes.GeneratedOperations{
  421. OperationFunc: opFunc,
  422. }, nil
  423. }
  424. func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
  425. pluginNodeVolumes map[types.NodeName][]*volume.Spec,
  426. pluginNane string,
  427. volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
  428. actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  429. opFunc := func() (error, error) {
  430. startOperationAndBlock(fopg.ch, fopg.quit)
  431. return nil, nil
  432. }
  433. return volumetypes.GeneratedOperations{
  434. OperationFunc: opFunc,
  435. }, nil
  436. }
  437. func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  438. opFunc := func() (error, error) {
  439. startOperationAndBlock(fopg.ch, fopg.quit)
  440. return nil, nil
  441. }
  442. return volumetypes.GeneratedOperations{
  443. OperationFunc: opFunc,
  444. }, nil
  445. }
  446. func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  447. opFunc := func() (error, error) {
  448. startOperationAndBlock(fopg.ch, fopg.quit)
  449. return nil, nil
  450. }
  451. return volumetypes.GeneratedOperations{
  452. OperationFunc: opFunc,
  453. }, nil
  454. }
  455. func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
  456. opFunc := func() (error, error) {
  457. startOperationAndBlock(fopg.ch, fopg.quit)
  458. return nil, nil
  459. }
  460. return volumetypes.GeneratedOperations{
  461. OperationFunc: opFunc,
  462. }, nil
  463. }
// GetVolumePluginMgr always returns nil; the fake generator holds no plugin
// manager.
func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
	return nil
}
  467. func getTestPodWithSecret(podName, secretName string) *v1.Pod {
  468. return &v1.Pod{
  469. ObjectMeta: metav1.ObjectMeta{
  470. Name: podName,
  471. UID: types.UID(podName),
  472. },
  473. Spec: v1.PodSpec{
  474. Volumes: []v1.Volume{
  475. {
  476. Name: secretName,
  477. VolumeSource: v1.VolumeSource{
  478. Secret: &v1.SecretVolumeSource{
  479. SecretName: secretName,
  480. },
  481. },
  482. },
  483. },
  484. Containers: []v1.Container{
  485. {
  486. Name: "secret-volume-test",
  487. Image: "k8s.gcr.io/mounttest:0.8",
  488. Args: []string{
  489. "--file_content=/etc/secret-volume/data-1",
  490. "--file_mode=/etc/secret-volume/data-1"},
  491. VolumeMounts: []v1.VolumeMount{
  492. {
  493. Name: secretName,
  494. MountPath: "/data",
  495. },
  496. },
  497. },
  498. },
  499. RestartPolicy: v1.RestartPolicyNever,
  500. },
  501. }
  502. }
  503. func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
  504. return &v1.Pod{
  505. ObjectMeta: metav1.ObjectMeta{
  506. Name: podName,
  507. UID: types.UID(podName + string(uuid.NewUUID())),
  508. },
  509. Spec: v1.PodSpec{
  510. Volumes: []v1.Volume{
  511. {
  512. Name: pdName,
  513. VolumeSource: v1.VolumeSource{
  514. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  515. PDName: pdName,
  516. FSType: "ext4",
  517. ReadOnly: false,
  518. },
  519. },
  520. },
  521. },
  522. Containers: []v1.Container{
  523. {
  524. Name: "pd-volume-test",
  525. Image: "k8s.gcr.io/mounttest:0.8",
  526. Args: []string{
  527. "--file_content=/etc/pd-volume/data-1",
  528. },
  529. VolumeMounts: []v1.VolumeMount{
  530. {
  531. Name: pdName,
  532. MountPath: "/data",
  533. },
  534. },
  535. },
  536. },
  537. RestartPolicy: v1.RestartPolicyNever,
  538. },
  539. }
  540. }
  541. func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
  542. defer close(quit)
  543. numOperationsStarted := 0
  544. loop:
  545. for {
  546. select {
  547. case <-ch:
  548. numOperationsStarted++
  549. if numOperationsStarted > 1 {
  550. return false
  551. }
  552. case <-time.After(5 * time.Second):
  553. break loop
  554. }
  555. }
  556. return true
  557. }
  558. func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
  559. defer close(quit)
  560. numOperationsStarted := 0
  561. loop:
  562. for {
  563. select {
  564. case <-ch:
  565. numOperationsStarted++
  566. if numOperationsStarted == numOperationsToRun {
  567. return true
  568. }
  569. case <-time.After(5 * time.Second):
  570. break loop
  571. }
  572. }
  573. return false
  574. }
  575. func setup() (chan interface{}, chan interface{}, OperationExecutor) {
  576. ch, quit := make(chan interface{}), make(chan interface{})
  577. return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
  578. }
// startOperationAndBlock signals that an operation has started by sending on
// ch, then blocks receiving from quit. The tests in this file release it by
// closing quit (see isOperationRunSerially and isOperationRunConcurrently).
func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
	// Blocks until the test receives the start signal.
	ch <- nil
	// Blocks until quit is closed or a value is sent on it.
	<-quit
}