reconciler.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package reconciler implements interfaces that attempt to reconcile the
  14. // desired state of the world with the actual state of the world by triggering
  15. // relevant actions (register/deregister plugins).
  16. package reconciler
  17. import (
  18. "sync"
  19. "time"
  20. "k8s.io/apimachinery/pkg/util/wait"
  21. "k8s.io/klog"
  22. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  23. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
  24. "k8s.io/kubernetes/pkg/util/goroutinemap"
  25. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  26. )
  27. // Reconciler runs a periodic loop to reconcile the desired state of the world
  28. // with the actual state of the world by triggering register and unregister
  29. // operations.
  30. type Reconciler interface {
  31. // Starts running the reconciliation loop which executes periodically, checks
  32. // if plugins that should be registered are register and plugins that should be
  33. // unregistered are unregistered. If not, it will trigger register/unregister
  34. // operations to rectify.
  35. Run(stopCh <-chan struct{})
  36. // AddHandler adds the given plugin handler for a specific plugin type
  37. AddHandler(pluginType string, pluginHandler cache.PluginHandler)
  38. }
  39. // NewReconciler returns a new instance of Reconciler.
  40. //
  41. // loopSleepDuration - the amount of time the reconciler loop sleeps between
  42. // successive executions
  43. // syncDuration - the amount of time the syncStates sleeps between
  44. // successive executions
  45. // operationExecutor - used to trigger register/unregister operations safely
  46. // (prevents more than one operation from being triggered on the same
  47. // socket path)
  48. // desiredStateOfWorld - cache containing the desired state of the world
  49. // actualStateOfWorld - cache containing the actual state of the world
  50. func NewReconciler(
  51. operationExecutor operationexecutor.OperationExecutor,
  52. loopSleepDuration time.Duration,
  53. desiredStateOfWorld cache.DesiredStateOfWorld,
  54. actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
  55. return &reconciler{
  56. operationExecutor: operationExecutor,
  57. loopSleepDuration: loopSleepDuration,
  58. desiredStateOfWorld: desiredStateOfWorld,
  59. actualStateOfWorld: actualStateOfWorld,
  60. handlers: make(map[string]cache.PluginHandler),
  61. }
  62. }
  63. type reconciler struct {
  64. operationExecutor operationexecutor.OperationExecutor
  65. loopSleepDuration time.Duration
  66. desiredStateOfWorld cache.DesiredStateOfWorld
  67. actualStateOfWorld cache.ActualStateOfWorld
  68. handlers map[string]cache.PluginHandler
  69. sync.RWMutex
  70. }
  71. var _ Reconciler = &reconciler{}
  72. func (rc *reconciler) Run(stopCh <-chan struct{}) {
  73. wait.Until(func() {
  74. rc.reconcile()
  75. },
  76. rc.loopSleepDuration,
  77. stopCh)
  78. }
  79. func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
  80. rc.Lock()
  81. defer rc.Unlock()
  82. rc.handlers[pluginType] = pluginHandler
  83. }
  84. func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
  85. rc.RLock()
  86. defer rc.RUnlock()
  87. return rc.handlers
  88. }
  89. func (rc *reconciler) reconcile() {
  90. // Unregisterations are triggered before registrations
  91. // Ensure plugins that should be unregistered are unregistered.
  92. for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
  93. unregisterPlugin := false
  94. if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
  95. unregisterPlugin = true
  96. } else {
  97. // We also need to unregister the plugins that exist in both actual state of world
  98. // and desired state of world cache, but the timestamps don't match.
  99. // Iterate through desired state of world plugins and see if there's any plugin
  100. // with the same socket path but different timestamp.
  101. for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
  102. if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
  103. klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("An updated version of plugin has been found, unregistering the plugin first before reregistering", ""))
  104. unregisterPlugin = true
  105. break
  106. }
  107. }
  108. }
  109. if unregisterPlugin {
  110. klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
  111. err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.actualStateOfWorld)
  112. if err != nil &&
  113. !goroutinemap.IsAlreadyExists(err) &&
  114. !exponentialbackoff.IsExponentialBackoff(err) {
  115. // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  116. // Log all other errors.
  117. klog.Errorf(registeredPlugin.GenerateErrorDetailed("operationExecutor.UnregisterPlugin failed", err).Error())
  118. }
  119. if err == nil {
  120. klog.V(1).Infof(registeredPlugin.GenerateMsgDetailed("operationExecutor.UnregisterPlugin started", ""))
  121. }
  122. }
  123. }
  124. // Ensure plugins that should be registered are registered
  125. for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
  126. if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
  127. klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
  128. err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
  129. if err != nil &&
  130. !goroutinemap.IsAlreadyExists(err) &&
  131. !exponentialbackoff.IsExponentialBackoff(err) {
  132. // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  133. klog.Errorf(pluginToRegister.GenerateErrorDetailed("operationExecutor.RegisterPlugin failed", err).Error())
  134. }
  135. if err == nil {
  136. klog.V(1).Infof(pluginToRegister.GenerateMsgDetailed("operationExecutor.RegisterPlugin started", ""))
  137. }
  138. }
  139. }
  140. }