goroutinemap.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Package goroutinemap implements a data structure for managing go routines
  15. by name. It prevents the creation of new go routines if an existing go routine
  16. with the same name exists.
  17. */
  18. package goroutinemap
import (
	"errors"
	"fmt"
	"sync"

	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
  26. // GoRoutineMap defines a type that can run named goroutines and track their
  27. // state. It prevents the creation of multiple goroutines with the same name
  28. // and may prevent recreation of a goroutine until after the a backoff time
  29. // has elapsed after the last goroutine with that name finished.
  30. type GoRoutineMap interface {
  31. // Run adds operation name to the list of running operations and spawns a
  32. // new go routine to execute the operation.
  33. // If an operation with the same operation name already exists, an
  34. // AlreadyExists or ExponentialBackoff error is returned.
  35. // Once the operation is complete, the go routine is terminated and the
  36. // operation name is removed from the list of executing operations allowing
  37. // a new operation to be started with the same operation name without error.
  38. Run(operationName string, operationFunc func() error) error
  39. // Wait blocks until operations map is empty. This is typically
  40. // necessary during tests - the test should wait until all operations finish
  41. // and evaluate results after that.
  42. Wait()
  43. // WaitForCompletion blocks until either all operations have successfully completed
  44. // or have failed but are not pending. The test should wait until operations are either
  45. // complete or have failed.
  46. WaitForCompletion()
  47. // IsOperationPending returns true if the operation is pending (currently
  48. // running), otherwise returns false.
  49. IsOperationPending(operationName string) bool
  50. }
  51. // NewGoRoutineMap returns a new instance of GoRoutineMap.
  52. func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
  53. g := &goRoutineMap{
  54. operations: make(map[string]operation),
  55. exponentialBackOffOnError: exponentialBackOffOnError,
  56. }
  57. g.cond = sync.NewCond(&g.lock)
  58. return g
  59. }
  60. type goRoutineMap struct {
  61. operations map[string]operation
  62. exponentialBackOffOnError bool
  63. cond *sync.Cond
  64. lock sync.RWMutex
  65. }
  66. // operation holds the state of a single goroutine.
  67. type operation struct {
  68. operationPending bool
  69. expBackoff exponentialbackoff.ExponentialBackoff
  70. }
  71. func (grm *goRoutineMap) Run(
  72. operationName string,
  73. operationFunc func() error) error {
  74. grm.lock.Lock()
  75. defer grm.lock.Unlock()
  76. existingOp, exists := grm.operations[operationName]
  77. if exists {
  78. // Operation with name exists
  79. if existingOp.operationPending {
  80. return NewAlreadyExistsError(operationName)
  81. }
  82. if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
  83. return err
  84. }
  85. }
  86. grm.operations[operationName] = operation{
  87. operationPending: true,
  88. expBackoff: existingOp.expBackoff,
  89. }
  90. go func() (err error) {
  91. // Handle unhandled panics (very unlikely)
  92. defer k8sRuntime.HandleCrash()
  93. // Handle completion of and error, if any, from operationFunc()
  94. defer grm.operationComplete(operationName, &err)
  95. // Handle panic, if any, from operationFunc()
  96. defer k8sRuntime.RecoverFromPanic(&err)
  97. return operationFunc()
  98. }()
  99. return nil
  100. }
  101. // operationComplete handles the completion of a goroutine run in the
  102. // goRoutineMap.
  103. func (grm *goRoutineMap) operationComplete(
  104. operationName string, err *error) {
  105. // Defer operations are executed in Last-In is First-Out order. In this case
  106. // the lock is acquired first when operationCompletes begins, and is
  107. // released when the method finishes, after the lock is released cond is
  108. // signaled to wake waiting goroutine.
  109. defer grm.cond.Signal()
  110. grm.lock.Lock()
  111. defer grm.lock.Unlock()
  112. if *err == nil || !grm.exponentialBackOffOnError {
  113. // Operation completed without error, or exponentialBackOffOnError disabled
  114. delete(grm.operations, operationName)
  115. if *err != nil {
  116. // Log error
  117. klog.Errorf("operation for %q failed with: %v",
  118. operationName,
  119. *err)
  120. }
  121. } else {
  122. // Operation completed with error and exponentialBackOffOnError Enabled
  123. existingOp := grm.operations[operationName]
  124. existingOp.expBackoff.Update(err)
  125. existingOp.operationPending = false
  126. grm.operations[operationName] = existingOp
  127. // Log error
  128. klog.Errorf("%v",
  129. existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
  130. }
  131. }
  132. func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
  133. grm.lock.RLock()
  134. defer grm.lock.RUnlock()
  135. existingOp, exists := grm.operations[operationName]
  136. if exists && existingOp.operationPending {
  137. return true
  138. }
  139. return false
  140. }
  141. func (grm *goRoutineMap) Wait() {
  142. grm.lock.Lock()
  143. defer grm.lock.Unlock()
  144. for len(grm.operations) > 0 {
  145. grm.cond.Wait()
  146. }
  147. }
  148. func (grm *goRoutineMap) WaitForCompletion() {
  149. grm.lock.Lock()
  150. defer grm.lock.Unlock()
  151. for {
  152. if len(grm.operations) == 0 || grm.nothingPending() {
  153. break
  154. } else {
  155. grm.cond.Wait()
  156. }
  157. }
  158. }
  159. // Check if any operation is pending. Already assumes caller has the
  160. // necessary locks
  161. func (grm *goRoutineMap) nothingPending() bool {
  162. nothingIsPending := true
  163. for _, operation := range grm.operations {
  164. if operation.operationPending {
  165. nothingIsPending = false
  166. break
  167. }
  168. }
  169. return nothingIsPending
  170. }
  171. // NewAlreadyExistsError returns a new instance of AlreadyExists error.
  172. func NewAlreadyExistsError(operationName string) error {
  173. return alreadyExistsError{operationName}
  174. }
  175. // IsAlreadyExists returns true if an error returned from GoRoutineMap indicates
  176. // a new operation can not be started because an operation with the same
  177. // operation name is already executing.
  178. func IsAlreadyExists(err error) bool {
  179. switch err.(type) {
  180. case alreadyExistsError:
  181. return true
  182. default:
  183. return false
  184. }
  185. }
  186. // alreadyExistsError is the error returned by GoRoutineMap when a new operation
  187. // can not be started because an operation with the same operation name is
  188. // already executing.
  189. type alreadyExistsError struct {
  190. operationName string
  191. }
  192. var _ error = alreadyExistsError{}
  193. func (err alreadyExistsError) Error() string {
  194. return fmt.Sprintf(
  195. "Failed to create operation with name %q. An operation with that name is already executing.",
  196. err.operationName)
  197. }