controller.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubeletconfig
  14. import (
  15. "fmt"
  16. "path/filepath"
  17. "time"
  18. apiequality "k8s.io/apimachinery/pkg/api/equality"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. clientset "k8s.io/client-go/kubernetes"
  21. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  22. "k8s.io/client-go/tools/cache"
  23. kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
  24. "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
  25. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
  26. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
  27. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
  28. utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
  29. utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
  30. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  31. )
  const (
  	// storeDir is the name of the checkpoint store directory; NewController
  	// joins it onto the dynamic config dir.
  	storeDir = "store"

  	// configTrialDuration is the trial period that Bootstrap hands to checkTrial
  	// when deciding whether the assigned config may become the last-known-good.
  	// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
  	// because it is not especially clear where this should live in the API.
  	configTrialDuration = 10 * time.Minute
  )
  38. // TransformFunc edits the KubeletConfiguration in-place, and returns an
  39. // error if any of the transformations failed.
  40. type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
  41. // Controller manages syncing dynamic Kubelet configurations
  42. // For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
  43. type Controller struct {
  44. // transform applies an arbitrary transformation to config after loading, and before validation.
  45. // This can be used, for example, to include config from flags before the controller's validation step.
  46. // If transform returns an error, loadConfig will fail, and an InternalError will be reported.
  47. // Be wary if using this function as an extension point, in most cases the controller should
  48. // probably just be natively extended to do what you need. Injecting flag precedence transformations
  49. // is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
  50. // controller's tree (pkg/) is not.
  51. transform TransformFunc
  52. // pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
  53. pendingConfigSource chan bool
  54. // configStatus manages the status we report on the Node object
  55. configStatus status.NodeConfigStatus
  56. // nodeInformer is the informer that watches the Node object
  57. nodeInformer cache.SharedInformer
  58. // remoteConfigSourceInformer is the informer that watches the assigned config source
  59. remoteConfigSourceInformer cache.SharedInformer
  60. // checkpointStore persists config source checkpoints to a storage layer
  61. checkpointStore store.Store
  62. }
  63. // NewController constructs a new Controller object and returns it. The dynamicConfigDir
  64. // path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
  65. // This can be used, for example, to include config from flags before the controller's validation step.
  66. // If transform returns an error, loadConfig will fail, and an InternalError will be reported.
  67. // Be wary if using this function as an extension point, in most cases the controller should
  68. // probably just be natively extended to do what you need. Injecting flag precedence transformations
  69. // is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
  70. // controller's tree (pkg/) is not.
  71. func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
  72. return &Controller{
  73. transform: transform,
  74. // channels must have capacity at least 1, since we signal with non-blocking writes
  75. pendingConfigSource: make(chan bool, 1),
  76. configStatus: status.NewNodeConfigStatus(),
  77. checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
  78. }
  79. }
  80. // Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
  81. // or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
  82. // If the pre-existing local configuration should be used, Bootstrap returns a nil config.
  83. func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
  84. utillog.Infof("starting controller")
  85. // ensure the filesystem is initialized
  86. if err := cc.initializeDynamicConfigDir(); err != nil {
  87. return nil, err
  88. }
  89. // determine assigned source and set status
  90. assignedSource, err := cc.checkpointStore.Assigned()
  91. if err != nil {
  92. return nil, err
  93. }
  94. if assignedSource != nil {
  95. cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
  96. }
  97. // determine last-known-good source and set status
  98. lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
  99. if err != nil {
  100. return nil, err
  101. }
  102. if lastKnownGoodSource != nil {
  103. cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
  104. }
  105. // if the assigned source is nil, return nil to indicate local config
  106. if assignedSource == nil {
  107. return nil, nil
  108. }
  109. // attempt to load assigned config
  110. assignedConfig, reason, err := cc.loadConfig(assignedSource)
  111. if err == nil {
  112. // update the active source to the non-nil assigned source
  113. cc.configStatus.SetActive(assignedSource.NodeConfigSource())
  114. // update the last-known-good config if necessary, and start a timer that
  115. // periodically checks whether the last-known good needs to be updated
  116. // we only do this when the assigned config loads and passes validation
  117. // wait.Forever will call the func once before starting the timer
  118. go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
  119. return assignedConfig, nil
  120. } // Assert: the assigned config failed to load or validate
  121. // TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
  122. // error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
  123. // or something else scary
  124. // log the reason and error details for the failure to load the assigned config
  125. utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
  126. // set status to indicate the failure with the assigned config
  127. cc.configStatus.SetError(reason)
  128. // if the last-known-good source is nil, return nil to indicate local config
  129. if lastKnownGoodSource == nil {
  130. return nil, nil
  131. }
  132. // attempt to load the last-known-good config
  133. lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
  134. if err != nil {
  135. // we failed to load the last-known-good, so something is really messed up and we just return the error
  136. return nil, err
  137. }
  138. // set status to indicate the active source is the non-nil last-known-good source
  139. cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
  140. return lastKnownGoodConfig, nil
  141. }
  142. // StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
  143. // The clients must be non-nil, and the nodeName must be non-empty.
  144. func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
  145. const errFmt = "cannot start Kubelet config sync: %s"
  146. if client == nil {
  147. return fmt.Errorf(errFmt, "nil client")
  148. }
  149. if eventClient == nil {
  150. return fmt.Errorf(errFmt, "nil event client")
  151. }
  152. if nodeName == "" {
  153. return fmt.Errorf(errFmt, "empty nodeName")
  154. }
  155. // Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
  156. // we use HandlePanic to manually call the panic handlers and then crash.
  157. // We have a better chance of recovering normal operation if we just restart the Kubelet in the event
  158. // of a Go runtime error.
  159. // NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
  160. // This was EVIL to debug (difficult to see missing `()`).
  161. // The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
  162. // status sync worker
  163. statusSyncLoopFunc := utilpanic.HandlePanic(func() {
  164. utillog.Infof("starting status sync loop")
  165. wait.JitterUntil(func() {
  166. cc.configStatus.Sync(client, nodeName)
  167. }, 10*time.Second, 0.2, true, wait.NeverStop)
  168. })
  169. // remote config source informer, if we have a remote source to watch
  170. assignedSource, err := cc.checkpointStore.Assigned()
  171. if err != nil {
  172. return fmt.Errorf(errFmt, err)
  173. } else if assignedSource == nil {
  174. utillog.Infof("local source is assigned, will not start remote config source informer")
  175. } else {
  176. cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
  177. AddFunc: cc.onAddRemoteConfigSourceEvent,
  178. UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
  179. DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
  180. },
  181. )
  182. }
  183. remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
  184. if cc.remoteConfigSourceInformer != nil {
  185. utillog.Infof("starting remote config source informer")
  186. cc.remoteConfigSourceInformer.Run(wait.NeverStop)
  187. }
  188. })
  189. // node informer
  190. cc.nodeInformer = newSharedNodeInformer(client, nodeName,
  191. cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
  192. nodeInformerFunc := utilpanic.HandlePanic(func() {
  193. utillog.Infof("starting Node informer")
  194. cc.nodeInformer.Run(wait.NeverStop)
  195. })
  196. // config sync worker
  197. configSyncLoopFunc := utilpanic.HandlePanic(func() {
  198. utillog.Infof("starting Kubelet config sync loop")
  199. wait.JitterUntil(func() {
  200. cc.syncConfigSource(client, eventClient, nodeName)
  201. }, 10*time.Second, 0.2, true, wait.NeverStop)
  202. })
  203. go statusSyncLoopFunc()
  204. go remoteConfigSourceInformerFunc()
  205. go nodeInformerFunc()
  206. go configSyncLoopFunc()
  207. return nil
  208. }
  209. // loadConfig loads Kubelet config from a checkpoint
  210. // It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
  211. func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
  212. // load KubeletConfiguration from checkpoint
  213. kc, err := cc.checkpointStore.Load(source)
  214. if err != nil {
  215. return nil, status.LoadError, err
  216. }
  217. // apply any required transformations to the KubeletConfiguration
  218. if cc.transform != nil {
  219. if err := cc.transform(kc); err != nil {
  220. return nil, status.InternalError, err
  221. }
  222. }
  223. // validate the result
  224. if err := validation.ValidateKubeletConfiguration(kc); err != nil {
  225. return nil, status.ValidateError, err
  226. }
  227. return kc, "", nil
  228. }
  229. // initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
  230. func (cc *Controller) initializeDynamicConfigDir() error {
  231. utillog.Infof("ensuring filesystem is set up correctly")
  232. // initializeDynamicConfigDir local checkpoint storage location
  233. return cc.checkpointStore.Initialize()
  234. }
  235. // checkTrial checks whether the trial duration has passed, and updates the last-known-good config if necessary
  236. func (cc *Controller) checkTrial(duration time.Duration) {
  237. // when the trial period is over, the assigned config becomes the last-known-good
  238. if trial, err := cc.inTrial(duration); err != nil {
  239. utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
  240. } else if !trial {
  241. if err := cc.graduateAssignedToLastKnownGood(); err != nil {
  242. utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
  243. }
  244. }
  245. }
  246. // inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
  247. func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
  248. now := time.Now()
  249. t, err := cc.checkpointStore.AssignedModified()
  250. if err != nil {
  251. return false, err
  252. }
  253. if now.Sub(t) <= trialDur {
  254. return true, nil
  255. }
  256. return false, nil
  257. }
  258. // graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
  259. // to the same value as the assigned config maintained by the checkpointStore
  260. func (cc *Controller) graduateAssignedToLastKnownGood() error {
  261. // get assigned
  262. assigned, err := cc.checkpointStore.Assigned()
  263. if err != nil {
  264. return err
  265. }
  266. // get last-known-good
  267. lastKnownGood, err := cc.checkpointStore.LastKnownGood()
  268. if err != nil {
  269. return err
  270. }
  271. // if the sources are equal, no need to change
  272. if assigned == lastKnownGood ||
  273. assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned.NodeConfigSource(), lastKnownGood.NodeConfigSource()) {
  274. return nil
  275. }
  276. // update last-known-good
  277. err = cc.checkpointStore.SetLastKnownGood(assigned)
  278. if err != nil {
  279. return err
  280. }
  281. // update the status to reflect the new last-known-good config
  282. cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
  283. utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
  284. return nil
  285. }