goroutinemap_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package goroutinemap
  14. import (
  15. "fmt"
  16. "testing"
  17. "time"
  18. "k8s.io/apimachinery/pkg/util/wait"
  19. )
  20. const (
  21. // testTimeout is a timeout of goroutines to finish. This _should_ be just a
  22. // "context switch" and it should take several ms, however, Clayton says "We
  23. // have had flakes due to tests that assumed that 15s is long enough to sleep")
  24. testTimeout time.Duration = 1 * time.Minute
  25. // initialOperationWaitTimeShort is the initial amount of time the test will
  26. // wait for an operation to complete (each successive failure results in
  27. // exponential backoff).
  28. initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
  29. // initialOperationWaitTimeLong is the initial amount of time the test will
  30. // wait for an operation to complete (each successive failure results in
  31. // exponential backoff).
  32. initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
  33. )
  34. func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
  35. // Arrange
  36. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  37. operationName := "operation-name"
  38. operation := func() error { return nil }
  39. // Act
  40. err := grm.Run(operationName, operation)
  41. // Assert
  42. if err != nil {
  43. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  44. }
  45. }
  46. func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
  47. // Arrange
  48. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  49. operation1Name := "operation1-name"
  50. operation2Name := "operation2-name"
  51. operation := func() error { return nil }
  52. // Act
  53. err1 := grm.Run(operation1Name, operation)
  54. err2 := grm.Run(operation2Name, operation)
  55. // Assert
  56. if err1 != nil {
  57. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1Name, err1)
  58. }
  59. if err2 != nil {
  60. t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2Name, err2)
  61. }
  62. }
  63. func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
  64. // Arrange
  65. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  66. operationName := "operation-name"
  67. operation := func() error { return nil }
  68. // Act
  69. err := grm.Run(operationName, operation)
  70. // Assert
  71. if err != nil {
  72. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  73. }
  74. }
  75. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
  76. // Arrange
  77. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  78. operationName := "operation-name"
  79. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  80. operation1 := generateCallbackFunc(operation1DoneCh)
  81. err1 := grm.Run(operationName, operation1)
  82. if err1 != nil {
  83. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  84. }
  85. operation2 := generateNoopFunc()
  86. <-operation1DoneCh // Force operation1 to complete
  87. // Act
  88. err2 := retryWithExponentialBackOff(
  89. time.Duration(initialOperationWaitTimeShort),
  90. func() (bool, error) {
  91. err := grm.Run(operationName, operation2)
  92. if err != nil {
  93. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  94. return false, nil
  95. }
  96. return true, nil
  97. },
  98. )
  99. // Assert
  100. if err2 != nil {
  101. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  102. }
  103. }
  104. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
  105. // Arrange
  106. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  107. operationName := "operation-name"
  108. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  109. operation1 := generateCallbackFunc(operation1DoneCh)
  110. err1 := grm.Run(operationName, operation1)
  111. if err1 != nil {
  112. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  113. }
  114. operation2 := generateNoopFunc()
  115. <-operation1DoneCh // Force operation1 to complete
  116. // Act
  117. err2 := retryWithExponentialBackOff(
  118. time.Duration(initialOperationWaitTimeShort),
  119. func() (bool, error) {
  120. err := grm.Run(operationName, operation2)
  121. if err != nil {
  122. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  123. return false, nil
  124. }
  125. return true, nil
  126. },
  127. )
  128. // Assert
  129. if err2 != nil {
  130. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  131. }
  132. }
  133. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
  134. // Arrange
  135. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  136. operationName := "operation-name"
  137. operation1 := generatePanicFunc()
  138. err1 := grm.Run(operationName, operation1)
  139. if err1 != nil {
  140. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  141. }
  142. operation2 := generateNoopFunc()
  143. // Act
  144. err2 := retryWithExponentialBackOff(
  145. time.Duration(initialOperationWaitTimeShort),
  146. func() (bool, error) {
  147. err := grm.Run(operationName, operation2)
  148. if err != nil {
  149. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  150. return false, nil
  151. }
  152. return true, nil
  153. },
  154. )
  155. // Assert
  156. if err2 != nil {
  157. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  158. }
  159. }
  160. func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
  161. // Arrange
  162. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  163. operationName := "operation-name"
  164. operation1 := generatePanicFunc()
  165. err1 := grm.Run(operationName, operation1)
  166. if err1 != nil {
  167. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  168. }
  169. operation2 := generateNoopFunc()
  170. // Act
  171. err2 := retryWithExponentialBackOff(
  172. time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
  173. func() (bool, error) {
  174. err := grm.Run(operationName, operation2)
  175. if err != nil {
  176. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  177. return false, nil
  178. }
  179. return true, nil
  180. },
  181. )
  182. // Assert
  183. if err2 != nil {
  184. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
  185. }
  186. }
  187. func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
  188. // Arrange
  189. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  190. operationName := "operation-name"
  191. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  192. operation1 := generateWaitFunc(operation1DoneCh)
  193. err1 := grm.Run(operationName, operation1)
  194. if err1 != nil {
  195. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  196. }
  197. operation2 := generateNoopFunc()
  198. // Act
  199. err2 := grm.Run(operationName, operation2)
  200. // Assert
  201. if err2 == nil {
  202. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
  203. }
  204. if !IsAlreadyExists(err2) {
  205. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  206. }
  207. }
  208. func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
  209. // Arrange
  210. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  211. operationName := "operation-name"
  212. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  213. operation1 := generateWaitFunc(operation1DoneCh)
  214. err1 := grm.Run(operationName, operation1)
  215. if err1 != nil {
  216. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  217. }
  218. operation2 := generateNoopFunc()
  219. // Act
  220. err2 := grm.Run(operationName, operation2)
  221. // Assert
  222. if err2 == nil {
  223. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
  224. }
  225. if !IsAlreadyExists(err2) {
  226. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  227. }
  228. }
  229. func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
  230. // Arrange
  231. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  232. operationName := "operation-name"
  233. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  234. operation1 := generateWaitFunc(operation1DoneCh)
  235. err1 := grm.Run(operationName, operation1)
  236. if err1 != nil {
  237. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  238. }
  239. operation2 := generateNoopFunc()
  240. operation3 := generateNoopFunc()
  241. // Act
  242. err2 := grm.Run(operationName, operation2)
  243. // Assert
  244. if err2 == nil {
  245. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
  246. }
  247. if !IsAlreadyExists(err2) {
  248. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  249. }
  250. // Act
  251. operation1DoneCh <- true // Force operation1 to complete
  252. err3 := retryWithExponentialBackOff(
  253. time.Duration(initialOperationWaitTimeShort),
  254. func() (bool, error) {
  255. err := grm.Run(operationName, operation3)
  256. if err != nil {
  257. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  258. return false, nil
  259. }
  260. return true, nil
  261. },
  262. )
  263. // Assert
  264. if err3 != nil {
  265. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
  266. }
  267. }
  268. func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
  269. // Arrange
  270. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  271. operationName := "operation-name"
  272. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  273. operation1 := generateWaitFunc(operation1DoneCh)
  274. err1 := grm.Run(operationName, operation1)
  275. if err1 != nil {
  276. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
  277. }
  278. operation2 := generateNoopFunc()
  279. operation3 := generateNoopFunc()
  280. // Act
  281. err2 := grm.Run(operationName, operation2)
  282. // Assert
  283. if err2 == nil {
  284. t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
  285. }
  286. if !IsAlreadyExists(err2) {
  287. t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
  288. }
  289. // Act
  290. operation1DoneCh <- true // Force operation1 to complete
  291. err3 := retryWithExponentialBackOff(
  292. time.Duration(initialOperationWaitTimeShort),
  293. func() (bool, error) {
  294. err := grm.Run(operationName, operation3)
  295. if err != nil {
  296. t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
  297. return false, nil
  298. }
  299. return true, nil
  300. },
  301. )
  302. // Assert
  303. if err3 != nil {
  304. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
  305. }
  306. }
  307. func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
  308. // Test than Wait() on empty GoRoutineMap always succeeds without blocking
  309. // Arrange
  310. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  311. // Act
  312. waitDoneCh := make(chan interface{}, 1)
  313. go func() {
  314. grm.Wait()
  315. waitDoneCh <- true
  316. }()
  317. // Assert
  318. err := waitChannelWithTimeout(waitDoneCh, testTimeout)
  319. if err != nil {
  320. t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
  321. }
  322. }
  323. func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
  324. // Test than Wait() on empty GoRoutineMap always succeeds without blocking
  325. // Arrange
  326. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  327. // Act
  328. waitDoneCh := make(chan interface{}, 1)
  329. go func() {
  330. grm.Wait()
  331. waitDoneCh <- true
  332. }()
  333. // Assert
  334. err := waitChannelWithTimeout(waitDoneCh, testTimeout)
  335. if err != nil {
  336. t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
  337. }
  338. }
  339. func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
  340. // Test that Wait() really blocks until the last operation succeeds
  341. // Arrange
  342. grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
  343. operationName := "operation-name"
  344. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  345. operation1 := generateWaitFunc(operation1DoneCh)
  346. err := grm.Run(operationName, operation1)
  347. if err != nil {
  348. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  349. }
  350. // Act
  351. waitDoneCh := make(chan interface{}, 1)
  352. go func() {
  353. grm.Wait()
  354. waitDoneCh <- true
  355. }()
  356. // Finish the operation
  357. operation1DoneCh <- true
  358. // Assert
  359. err = waitChannelWithTimeout(waitDoneCh, testTimeout)
  360. if err != nil {
  361. t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
  362. }
  363. }
  364. func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
  365. // Test that Wait() really blocks until the last operation succeeds
  366. // Arrange
  367. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  368. operationName := "operation-name"
  369. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  370. operation1 := generateWaitFunc(operation1DoneCh)
  371. err := grm.Run(operationName, operation1)
  372. if err != nil {
  373. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  374. }
  375. // Act
  376. waitDoneCh := make(chan interface{}, 1)
  377. go func() {
  378. grm.Wait()
  379. waitDoneCh <- true
  380. }()
  381. // Finish the operation
  382. operation1DoneCh <- true
  383. // Assert
  384. err = waitChannelWithTimeout(waitDoneCh, testTimeout)
  385. if err != nil {
  386. t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
  387. }
  388. }
  389. func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
  390. grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
  391. operationName := "operation-err"
  392. operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
  393. operation1 := generateErrorFunc(operation1DoneCh)
  394. err := grm.Run(operationName, operation1)
  395. if err != nil {
  396. t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
  397. }
  398. // Act
  399. waitDoneCh := make(chan interface{}, 1)
  400. go func() {
  401. grm.WaitForCompletion()
  402. waitDoneCh <- true
  403. }()
  404. // Finish the operation
  405. operation1DoneCh <- true
  406. // Assert that WaitForCompletion returns even if scheduled op had error
  407. err = waitChannelWithTimeout(waitDoneCh, testTimeout)
  408. if err != nil {
  409. t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
  410. }
  411. }
  412. func generateCallbackFunc(done chan<- interface{}) func() error {
  413. return func() error {
  414. done <- true
  415. return nil
  416. }
  417. }
  418. func generateErrorFunc(done <-chan interface{}) func() error {
  419. return func() error {
  420. <-done
  421. return fmt.Errorf("Generic error")
  422. }
  423. }
  424. func generateWaitFunc(done <-chan interface{}) func() error {
  425. return func() error {
  426. <-done
  427. return nil
  428. }
  429. }
  430. func generatePanicFunc() func() error {
  431. return func() error {
  432. panic("testing panic")
  433. }
  434. }
  435. func generateNoopFunc() func() error {
  436. return func() error { return nil }
  437. }
  438. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  439. backoff := wait.Backoff{
  440. Duration: initialDuration,
  441. Factor: 3,
  442. Jitter: 0,
  443. Steps: 4,
  444. }
  445. return wait.ExponentialBackoff(backoff, fn)
  446. }
  447. func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
  448. timer := time.NewTimer(timeout)
  449. defer timer.Stop()
  450. select {
  451. case <-ch:
  452. // Success!
  453. return nil
  454. case <-timer.C:
  455. return fmt.Errorf("timeout after %v", timeout)
  456. }
  457. }