nestedpendingoperations_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package nestedpendingoperations
  14. import (
  15. "fmt"
  16. "testing"
  17. "time"
  18. "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  21. "k8s.io/kubernetes/pkg/volume/util/types"
  22. )
  23. const (
  24. // testTimeout is a timeout of goroutines to finish. This _should_ be just a
  25. // "context switch" and it should take several ms, however, Clayton says "We
  26. // have had flakes due to tests that assumed that 15s is long enough to sleep")
  27. testTimeout time.Duration = 1 * time.Minute
  28. // initialOperationWaitTimeShort is the initial amount of time the test will
  29. // wait for an operation to complete (each successive failure results in
  30. // exponential backoff).
  31. initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
  32. // initialOperationWaitTimeLong is the initial amount of time the test will
  33. // wait for an operation to complete (each successive failure results in
  34. // exponential backoff).
  35. initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
  36. )
  37. func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
  38. // Arrange
  39. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  40. volumeName := v1.UniqueVolumeName("volume-name")
  41. operation := func() (error, error) { return nil, nil }
  42. // Act
  43. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
  44. // Assert
  45. if err != nil {
  46. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  47. }
  48. }
  49. func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
  50. // Arrange
  51. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  52. volume1Name := v1.UniqueVolumeName("volume1-name")
  53. volume2Name := v1.UniqueVolumeName("volume2-name")
  54. operation := func() (error, error) { return nil, nil }
  55. // Act
  56. err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
  57. err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
  58. // Assert
  59. if err1 != nil {
  60. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
  61. }
  62. if err2 != nil {
  63. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
  64. }
  65. }
  66. func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
  67. // Arrange
  68. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  69. volumeName := v1.UniqueVolumeName("volume-name")
  70. operation1PodName := types.UniquePodName("operation1-podname")
  71. operation2PodName := types.UniquePodName("operation2-podname")
  72. operation := func() (error, error) { return nil, nil }
  73. // Act
  74. err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation})
  75. err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation})
  76. // Assert
  77. if err1 != nil {
  78. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
  79. }
  80. if err2 != nil {
  81. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
  82. }
  83. }
  84. func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
  85. // Arrange
  86. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  87. volumeName := v1.UniqueVolumeName("volume-name")
  88. operation := func() (error, error) { return nil, nil }
  89. // Act
  90. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
  91. // Assert
  92. if err != nil {
  93. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  94. }
  95. }
  96. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
  97. // Arrange
  98. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  99. volumeName := v1.UniqueVolumeName("volume-name")
  100. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  101. operation1 := generateCallbackFunc(operation1DoneCh)
  102. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  103. if err1 != nil {
  104. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  105. }
  106. operation2 := generateNoopFunc()
  107. <-operation1DoneCh // Force operation1 to complete
  108. // Act
  109. err2 := retryWithExponentialBackOff(
  110. time.Duration(initialOperationWaitTimeShort),
  111. func() (bool, error) {
  112. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  113. if err != nil {
  114. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  115. return false, nil
  116. }
  117. return true, nil
  118. },
  119. )
  120. // Assert
  121. if err2 != nil {
  122. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  123. }
  124. }
  125. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
  126. // Arrange
  127. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  128. volumeName := v1.UniqueVolumeName("volume-name")
  129. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  130. operation1 := generateCallbackFunc(operation1DoneCh)
  131. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  132. if err1 != nil {
  133. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  134. }
  135. operation2 := generateNoopFunc()
  136. <-operation1DoneCh // Force operation1 to complete
  137. // Act
  138. err2 := retryWithExponentialBackOff(
  139. time.Duration(initialOperationWaitTimeShort),
  140. func() (bool, error) {
  141. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  142. if err != nil {
  143. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  144. return false, nil
  145. }
  146. return true, nil
  147. },
  148. )
  149. // Assert
  150. if err2 != nil {
  151. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  152. }
  153. }
  154. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
  155. // Arrange
  156. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  157. volumeName := v1.UniqueVolumeName("volume-name")
  158. operation1 := generatePanicFunc()
  159. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  160. if err1 != nil {
  161. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  162. }
  163. operation2 := generateNoopFunc()
  164. // Act
  165. err2 := retryWithExponentialBackOff(
  166. time.Duration(initialOperationWaitTimeShort),
  167. func() (bool, error) {
  168. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  169. if err != nil {
  170. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  171. return false, nil
  172. }
  173. return true, nil
  174. },
  175. )
  176. // Assert
  177. if err2 != nil {
  178. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  179. }
  180. }
  181. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
  182. // Arrange
  183. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  184. volumeName := v1.UniqueVolumeName("volume-name")
  185. operation1 := generatePanicFunc()
  186. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  187. if err1 != nil {
  188. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  189. }
  190. operation2 := generateNoopFunc()
  191. // Act
  192. err2 := retryWithExponentialBackOff(
  193. time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
  194. func() (bool, error) {
  195. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  196. if err != nil {
  197. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  198. return false, nil
  199. }
  200. return true, nil
  201. },
  202. )
  203. // Assert
  204. if err2 != nil {
  205. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  206. }
  207. }
  208. func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
  209. // Arrange
  210. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  211. volumeName := v1.UniqueVolumeName("volume-name")
  212. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  213. operation1 := generateWaitFunc(operation1DoneCh)
  214. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  215. if err1 != nil {
  216. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  217. }
  218. operation2 := generateNoopFunc()
  219. // Act
  220. err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  221. // Assert
  222. if err2 == nil {
  223. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  224. }
  225. if !IsAlreadyExists(err2) {
  226. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  227. }
  228. }
  229. func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
  230. // Arrange
  231. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  232. volumeName := v1.UniqueVolumeName("volume-name")
  233. op1Name := "mount_volume"
  234. operation1 := generateErrorFunc()
  235. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
  236. if err1 != nil {
  237. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  238. }
  239. // Shorter than exponential backoff period, so as to trigger exponential backoff error on second
  240. // operation.
  241. operation2 := generateErrorFunc()
  242. err2 := retryWithExponentialBackOff(
  243. initialOperationWaitTimeShort,
  244. func() (bool, error) {
  245. err := grm.Run(volumeName,
  246. "", /* operationSubName */
  247. types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
  248. if exponentialbackoff.IsExponentialBackoff(err) {
  249. return true, nil
  250. }
  251. return false, nil
  252. },
  253. )
  254. // Assert
  255. if err2 != nil {
  256. t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
  257. }
  258. operation3 := generateNoopFunc()
  259. op3Name := "unmount_volume"
  260. // Act
  261. err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
  262. if err3 != nil {
  263. t.Fatalf("NewGoRoutine failed. Expected <no error> Actual: <%v>", err3)
  264. }
  265. }
  266. func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
  267. // Arrange
  268. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  269. volumeName := v1.UniqueVolumeName("volume-name")
  270. operationPodName := types.UniquePodName("operation-podname")
  271. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  272. operation1 := generateWaitFunc(operation1DoneCh)
  273. err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
  274. if err1 != nil {
  275. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  276. }
  277. operation2 := generateNoopFunc()
  278. // Act
  279. err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
  280. // Assert
  281. if err2 == nil {
  282. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  283. }
  284. if !IsAlreadyExists(err2) {
  285. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  286. }
  287. }
  288. func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
  289. // Arrange
  290. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  291. volumeName := v1.UniqueVolumeName("volume-name")
  292. operationPodName := types.UniquePodName("operation-podname")
  293. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  294. operation1 := generateWaitFunc(operation1DoneCh)
  295. err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
  296. if err1 != nil {
  297. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  298. }
  299. operation2 := generateNoopFunc()
  300. // Act
  301. err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
  302. // Assert
  303. if err2 == nil {
  304. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  305. }
  306. if !IsAlreadyExists(err2) {
  307. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  308. }
  309. }
  310. func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
  311. // Arrange
  312. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  313. volumeName := v1.UniqueVolumeName("volume-name")
  314. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  315. operation1 := generateWaitFunc(operation1DoneCh)
  316. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  317. if err1 != nil {
  318. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  319. }
  320. operation2 := generateNoopFunc()
  321. // Act
  322. err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  323. // Assert
  324. if err2 == nil {
  325. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  326. }
  327. if !IsAlreadyExists(err2) {
  328. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  329. }
  330. }
  331. func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
  332. // Arrange
  333. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  334. volumeName := v1.UniqueVolumeName("volume-name")
  335. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  336. operation1 := generateWaitFunc(operation1DoneCh)
  337. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  338. if err1 != nil {
  339. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  340. }
  341. operation2 := generateNoopFunc()
  342. operation3 := generateNoopFunc()
  343. // Act
  344. err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  345. // Assert
  346. if err2 == nil {
  347. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  348. }
  349. if !IsAlreadyExists(err2) {
  350. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  351. }
  352. // Act
  353. operation1DoneCh <- true // Force operation1 to complete
  354. err3 := retryWithExponentialBackOff(
  355. time.Duration(initialOperationWaitTimeShort),
  356. func() (bool, error) {
  357. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
  358. if err != nil {
  359. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  360. return false, nil
  361. }
  362. return true, nil
  363. },
  364. )
  365. // Assert
  366. if err3 != nil {
  367. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
  368. }
  369. }
  370. func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
  371. // Arrange
  372. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  373. volumeName := v1.UniqueVolumeName("volume-name")
  374. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  375. operation1 := generateWaitFunc(operation1DoneCh)
  376. err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  377. if err1 != nil {
  378. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  379. }
  380. operation2 := generateNoopFunc()
  381. operation3 := generateNoopFunc()
  382. // Act
  383. err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
  384. // Assert
  385. if err2 == nil {
  386. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
  387. }
  388. if !IsAlreadyExists(err2) {
  389. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  390. }
  391. // Act
  392. operation1DoneCh <- true // Force operation1 to complete
  393. err3 := retryWithExponentialBackOff(
  394. time.Duration(initialOperationWaitTimeShort),
  395. func() (bool, error) {
  396. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
  397. if err != nil {
  398. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  399. return false, nil
  400. }
  401. return true, nil
  402. },
  403. )
  404. // Assert
  405. if err3 != nil {
  406. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
  407. }
  408. }
  409. func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
  410. // Test than Wait() on empty GoRoutineMap always succeeds without blocking
  411. // Arrange
  412. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  413. // Act
  414. waitDoneCh := make(chan interface{}, 1)
  415. go func() {
  416. grm.Wait()
  417. waitDoneCh <- true
  418. }()
  419. // Assert
  420. err := waitChannelWithTimeout(waitDoneCh, testTimeout)
  421. if err != nil {
  422. t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
  423. }
  424. }
  425. func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
  426. // Test than Wait() on empty GoRoutineMap always succeeds without blocking
  427. // Arrange
  428. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  429. // Act
  430. waitDoneCh := make(chan interface{}, 1)
  431. go func() {
  432. grm.Wait()
  433. waitDoneCh <- true
  434. }()
  435. // Assert
  436. err := waitChannelWithTimeout(waitDoneCh, testTimeout)
  437. if err != nil {
  438. t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
  439. }
  440. }
  441. func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
  442. // Test that Wait() really blocks until the last operation succeeds
  443. // Arrange
  444. grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
  445. volumeName := v1.UniqueVolumeName("volume-name")
  446. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  447. operation1 := generateWaitFunc(operation1DoneCh)
  448. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  449. if err != nil {
  450. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  451. }
  452. // Act
  453. waitDoneCh := make(chan interface{}, 1)
  454. go func() {
  455. grm.Wait()
  456. waitDoneCh <- true
  457. }()
  458. // Finish the operation
  459. operation1DoneCh <- true
  460. // Assert
  461. err = waitChannelWithTimeout(waitDoneCh, testTimeout)
  462. if err != nil {
  463. t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
  464. }
  465. }
  466. func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
  467. // Test that Wait() really blocks until the last operation succeeds
  468. // Arrange
  469. grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
  470. volumeName := v1.UniqueVolumeName("volume-name")
  471. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  472. operation1 := generateWaitFunc(operation1DoneCh)
  473. err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
  474. if err != nil {
  475. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  476. }
  477. // Act
  478. waitDoneCh := make(chan interface{}, 1)
  479. go func() {
  480. grm.Wait()
  481. waitDoneCh <- true
  482. }()
  483. // Finish the operation
  484. operation1DoneCh <- true
  485. // Assert
  486. err = waitChannelWithTimeout(waitDoneCh, testTimeout)
  487. if err != nil {
  488. t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
  489. }
  490. }
  491. func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
  492. return func() (error, error) {
  493. done <- true
  494. return nil, nil
  495. }
  496. }
  497. func generateWaitFunc(done <-chan interface{}) func() (error, error) {
  498. return func() (error, error) {
  499. <-done
  500. return nil, nil
  501. }
  502. }
  503. func generatePanicFunc() func() (error, error) {
  504. return func() (error, error) {
  505. panic("testing panic")
  506. }
  507. }
  508. func generateErrorFunc() func() (error, error) {
  509. return func() (error, error) {
  510. return fmt.Errorf("placholder1"), fmt.Errorf("placeholder2")
  511. }
  512. }
  513. func generateNoopFunc() func() (error, error) {
  514. return func() (error, error) { return nil, nil }
  515. }
  516. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  517. backoff := wait.Backoff{
  518. Duration: initialDuration,
  519. Factor: 3,
  520. Jitter: 0,
  521. Steps: 4,
  522. }
  523. return wait.ExponentialBackoff(backoff, fn)
  524. }
  525. func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
  526. timer := time.NewTimer(timeout)
  527. defer timer.Stop()
  528. select {
  529. case <-ch:
  530. // Success!
  531. return nil
  532. case <-timer.C:
  533. return fmt.Errorf("timeout after %v", timeout)
  534. }
  535. }