nestedpendingoperations.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  /*
  Package nestedpendingoperations is a modified implementation of
  pkg/util/goroutinemap. It implements a data structure for managing goroutines
  by volume/pod name. It prevents the creation of a new goroutine if an existing
  goroutine for the volume already exists. It also allows multiple operations to
  execute in parallel for the same volume as long as they are operating on
  different pods.
  */
  21. package nestedpendingoperations
  22. import (
  23. "fmt"
  24. "sync"
  25. "k8s.io/api/core/v1"
  26. k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/klog"
  28. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  29. "k8s.io/kubernetes/pkg/volume/util/types"
  30. )
  31. const (
  32. // EmptyUniquePodName is a UniquePodName for empty string.
  33. EmptyUniquePodName types.UniquePodName = types.UniquePodName("")
  34. // EmptyUniqueVolumeName is a UniqueVolumeName for empty string
  35. EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
  36. )
  37. // NestedPendingOperations defines the supported set of operations.
  38. type NestedPendingOperations interface {
  39. // Run adds the concatenation of volumeName and podName to the list of
  40. // running operations and spawns a new go routine to execute operationFunc.
  41. // If an operation with the same volumeName, same or empty podName
  42. // and same operationName exits, an AlreadyExists or ExponentialBackoff
  43. // error is returned. If an operation with same volumeName and podName
  44. // has ExponentialBackoff error but operationName is different, exponential
  45. // backoff is reset and operation is allowed to proceed.
  46. // This enables multiple operations to execute in parallel for the same
  47. // volumeName as long as they have different podName.
  48. // Once the operation is complete, the go routine is terminated and the
  49. // concatenation of volumeName and podName is removed from the list of
  50. // executing operations allowing a new operation to be started with the
  51. // volumeName without error.
  52. Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error
  53. // Wait blocks until all operations are completed. This is typically
  54. // necessary during tests - the test should wait until all operations finish
  55. // and evaluate results after that.
  56. Wait()
  57. // IsOperationPending returns true if an operation for the given volumeName and podName is pending,
  58. // otherwise it returns false
  59. IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool
  60. }
  61. // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
  62. func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations {
  63. g := &nestedPendingOperations{
  64. operations: []operation{},
  65. exponentialBackOffOnError: exponentialBackOffOnError,
  66. }
  67. g.cond = sync.NewCond(&g.lock)
  68. return g
  69. }
  70. type nestedPendingOperations struct {
  71. operations []operation
  72. exponentialBackOffOnError bool
  73. cond *sync.Cond
  74. lock sync.RWMutex
  75. }
  76. type operation struct {
  77. volumeName v1.UniqueVolumeName
  78. podName types.UniquePodName
  79. operationName string
  80. operationPending bool
  81. expBackoff exponentialbackoff.ExponentialBackoff
  82. }
  83. func (grm *nestedPendingOperations) Run(
  84. volumeName v1.UniqueVolumeName,
  85. podName types.UniquePodName,
  86. generatedOperations types.GeneratedOperations) error {
  87. grm.lock.Lock()
  88. defer grm.lock.Unlock()
  89. opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
  90. if opExists {
  91. previousOp := grm.operations[previousOpIndex]
  92. // Operation already exists
  93. if previousOp.operationPending {
  94. // Operation is pending
  95. operationKey := getOperationKey(volumeName, podName)
  96. return NewAlreadyExistsError(operationKey)
  97. }
  98. operationKey := getOperationKey(volumeName, podName)
  99. backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
  100. if backOffErr != nil {
  101. if previousOp.operationName == generatedOperations.OperationName {
  102. return backOffErr
  103. }
  104. // previous operation and new operation are different. reset op. name and exp. backoff
  105. grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
  106. grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
  107. }
  108. // Update existing operation to mark as pending.
  109. grm.operations[previousOpIndex].operationPending = true
  110. grm.operations[previousOpIndex].volumeName = volumeName
  111. grm.operations[previousOpIndex].podName = podName
  112. } else {
  113. // Create a new operation
  114. grm.operations = append(grm.operations,
  115. operation{
  116. operationPending: true,
  117. volumeName: volumeName,
  118. podName: podName,
  119. operationName: generatedOperations.OperationName,
  120. expBackoff: exponentialbackoff.ExponentialBackoff{},
  121. })
  122. }
  123. go func() (eventErr, detailedErr error) {
  124. // Handle unhandled panics (very unlikely)
  125. defer k8sRuntime.HandleCrash()
  126. // Handle completion of and error, if any, from operationFunc()
  127. defer grm.operationComplete(volumeName, podName, &detailedErr)
  128. return generatedOperations.Run()
  129. }()
  130. return nil
  131. }
  132. func (grm *nestedPendingOperations) IsOperationPending(
  133. volumeName v1.UniqueVolumeName,
  134. podName types.UniquePodName) bool {
  135. grm.lock.RLock()
  136. defer grm.lock.RUnlock()
  137. exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
  138. if exist && grm.operations[previousOpIndex].operationPending {
  139. return true
  140. }
  141. return false
  142. }
  143. // This is an internal function and caller should acquire and release the lock
  144. func (grm *nestedPendingOperations) isOperationExists(
  145. volumeName v1.UniqueVolumeName,
  146. podName types.UniquePodName) (bool, int) {
  147. // If volumeName is empty, operation can be executed concurrently
  148. if volumeName == EmptyUniqueVolumeName {
  149. return false, -1
  150. }
  151. for previousOpIndex, previousOp := range grm.operations {
  152. if previousOp.volumeName != volumeName {
  153. // No match, keep searching
  154. continue
  155. }
  156. if previousOp.podName != EmptyUniquePodName &&
  157. podName != EmptyUniquePodName &&
  158. previousOp.podName != podName {
  159. // No match, keep searching
  160. continue
  161. }
  162. // Match
  163. return true, previousOpIndex
  164. }
  165. return false, -1
  166. }
  167. func (grm *nestedPendingOperations) getOperation(
  168. volumeName v1.UniqueVolumeName,
  169. podName types.UniquePodName) (uint, error) {
  170. // Assumes lock has been acquired by caller.
  171. for i, op := range grm.operations {
  172. if op.volumeName == volumeName &&
  173. op.podName == podName {
  174. return uint(i), nil
  175. }
  176. }
  177. logOperationKey := getOperationKey(volumeName, podName)
  178. return 0, fmt.Errorf("Operation %q not found", logOperationKey)
  179. }
  180. func (grm *nestedPendingOperations) deleteOperation(
  181. // Assumes lock has been acquired by caller.
  182. volumeName v1.UniqueVolumeName,
  183. podName types.UniquePodName) {
  184. opIndex := -1
  185. for i, op := range grm.operations {
  186. if op.volumeName == volumeName &&
  187. op.podName == podName {
  188. opIndex = i
  189. break
  190. }
  191. }
  192. // Delete index without preserving order
  193. grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
  194. grm.operations = grm.operations[:len(grm.operations)-1]
  195. }
  196. func (grm *nestedPendingOperations) operationComplete(
  197. volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) {
  198. // Defer operations are executed in Last-In is First-Out order. In this case
  199. // the lock is acquired first when operationCompletes begins, and is
  200. // released when the method finishes, after the lock is released cond is
  201. // signaled to wake waiting goroutine.
  202. defer grm.cond.Signal()
  203. grm.lock.Lock()
  204. defer grm.lock.Unlock()
  205. if *err == nil || !grm.exponentialBackOffOnError {
  206. // Operation completed without error, or exponentialBackOffOnError disabled
  207. grm.deleteOperation(volumeName, podName)
  208. if *err != nil {
  209. // Log error
  210. logOperationKey := getOperationKey(volumeName, podName)
  211. klog.Errorf("operation %s failed with: %v",
  212. logOperationKey,
  213. *err)
  214. }
  215. return
  216. }
  217. // Operation completed with error and exponentialBackOffOnError Enabled
  218. existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
  219. if getOpErr != nil {
  220. // Failed to find existing operation
  221. logOperationKey := getOperationKey(volumeName, podName)
  222. klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
  223. logOperationKey,
  224. *err)
  225. return
  226. }
  227. grm.operations[existingOpIndex].expBackoff.Update(err)
  228. grm.operations[existingOpIndex].operationPending = false
  229. // Log error
  230. operationKey :=
  231. getOperationKey(volumeName, podName)
  232. klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
  233. GenerateNoRetriesPermittedMsg(operationKey))
  234. }
  235. func (grm *nestedPendingOperations) Wait() {
  236. grm.lock.Lock()
  237. defer grm.lock.Unlock()
  238. for len(grm.operations) > 0 {
  239. grm.cond.Wait()
  240. }
  241. }
  242. func getOperationKey(
  243. volumeName v1.UniqueVolumeName, podName types.UniquePodName) string {
  244. podNameStr := ""
  245. if podName != EmptyUniquePodName {
  246. podNameStr = fmt.Sprintf(" (%q)", podName)
  247. }
  248. return fmt.Sprintf("%q%s",
  249. volumeName,
  250. podNameStr)
  251. }
  252. // NewAlreadyExistsError returns a new instance of AlreadyExists error.
  253. func NewAlreadyExistsError(operationKey string) error {
  254. return alreadyExistsError{operationKey}
  255. }
  256. // IsAlreadyExists returns true if an error returned from
  257. // NestedPendingOperations indicates a new operation can not be started because
  258. // an operation with the same operation name is already executing.
  259. func IsAlreadyExists(err error) bool {
  260. switch err.(type) {
  261. case alreadyExistsError:
  262. return true
  263. default:
  264. return false
  265. }
  266. }
  // alreadyExistsError is the error returned by NestedPendingOperations when a
  // new operation can not be started because a matching operation is already
  // executing.
  type alreadyExistsError struct {
  	// operationKey identifies the operation that could not be started.
  	operationKey string
  }

  // Compile-time check that alreadyExistsError implements error.
  var _ error = alreadyExistsError{}

  // Error implements the error interface.
  func (err alreadyExistsError) Error() string {
  	const format = "Failed to create operation with name %q. An operation with that name is already executing."
  	return fmt.Sprintf(format, err.operationKey)
  }