operation_executor.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package operationexecutor implements interfaces that enable execution of
  14. // register and unregister operations with a
  15. // goroutinemap so that more than one operation is never triggered
  16. // on the same plugin.
  17. package operationexecutor
  18. import (
  19. "time"
  20. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  21. "k8s.io/kubernetes/pkg/util/goroutinemap"
  22. )
  23. // OperationExecutor defines a set of operations for registering and unregistering
  24. // a plugin that are executed with a NewGoRoutineMap which
  25. // prevents more than one operation from being triggered on the same socket path.
  26. //
  27. // These operations should be idempotent (for example, RegisterPlugin should
  28. // still succeed if the plugin is already registered, etc.). However,
  29. // they depend on the plugin handlers (for each plugin type) to implement this
  30. // behavior.
  31. //
  32. // Once an operation completes successfully, the actualStateOfWorld is updated
  33. // to indicate the plugin is registered/unregistered.
  34. //
  35. // Once the operation is started, since it is executed asynchronously,
  36. // errors are simply logged and the goroutine is terminated without updating
  37. // actualStateOfWorld.
  38. type OperationExecutor interface {
  39. // RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
  40. // It then updates the actual state of the world to reflect that.
  41. RegisterPlugin(socketPath string, foundInDeprecatedDir bool, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
  42. // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
  43. // It then updates the actual state of the world to reflect that.
  44. UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
  45. }
  46. // NewOperationExecutor returns a new instance of OperationExecutor.
  47. func NewOperationExecutor(
  48. operationGenerator OperationGenerator) OperationExecutor {
  49. return &operationExecutor{
  50. pendingOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
  51. operationGenerator: operationGenerator,
  52. }
  53. }
  54. // ActualStateOfWorldUpdater defines a set of operations updating the actual
  55. // state of the world cache after successful registeration/deregistration.
  56. type ActualStateOfWorldUpdater interface {
  57. // AddPlugin add the given plugin in the cache if no existing plugin
  58. // in the cache has the same socket path.
  59. // An error will be returned if socketPath is empty.
  60. AddPlugin(pluginInfo cache.PluginInfo) error
  61. // RemovePlugin deletes the plugin with the given socket path from the actual
  62. // state of world.
  63. // If a plugin does not exist with the given socket path, this is a no-op.
  64. RemovePlugin(socketPath string)
  65. }
  66. type operationExecutor struct {
  67. // pendingOperations keeps track of pending attach and detach operations so
  68. // multiple operations are not started on the same volume
  69. pendingOperations goroutinemap.GoRoutineMap
  70. // operationGenerator is an interface that provides implementations for
  71. // generating volume function
  72. operationGenerator OperationGenerator
  73. }
  74. var _ OperationExecutor = &operationExecutor{}
  75. func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
  76. return oe.pendingOperations.IsOperationPending(socketPath)
  77. }
  78. func (oe *operationExecutor) RegisterPlugin(
  79. socketPath string,
  80. foundInDeprecatedDir bool,
  81. timestamp time.Time,
  82. pluginHandlers map[string]cache.PluginHandler,
  83. actualStateOfWorld ActualStateOfWorldUpdater) error {
  84. generatedOperation :=
  85. oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, foundInDeprecatedDir, timestamp, pluginHandlers, actualStateOfWorld)
  86. return oe.pendingOperations.Run(
  87. socketPath, generatedOperation)
  88. }
  89. func (oe *operationExecutor) UnregisterPlugin(
  90. socketPath string,
  91. pluginHandlers map[string]cache.PluginHandler,
  92. actualStateOfWorld ActualStateOfWorldUpdater) error {
  93. generatedOperation :=
  94. oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, actualStateOfWorld)
  95. return oe.pendingOperations.Run(
  96. socketPath, generatedOperation)
  97. }