123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kubeletconfig
- import (
- "fmt"
- "path/filepath"
- "time"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/client-go/tools/cache"
- kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
- "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
- utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
- utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
- utilfs "k8s.io/kubernetes/pkg/util/filesystem"
- )
const (
	// storeDir is the name of the directory, joined onto the dynamic config dir
	// by NewController, in which the checkpoint store keeps its data.
	storeDir = "store"

	// configTrialDuration is the trial period that Bootstrap hands to checkTrial.
	// TODO(mtaufen): This may be exposed in a future API; for now an internal default is used,
	// because it is not especially clear where this should live in the API.
	configTrialDuration = 10 * time.Minute
)
- // TransformFunc edits the KubeletConfiguration in-place, and returns an
- // error if any of the transformations failed.
- type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
- // Controller manages syncing dynamic Kubelet configurations
- // For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
- type Controller struct {
- // transform applies an arbitrary transformation to config after loading, and before validation.
- // This can be used, for example, to include config from flags before the controller's validation step.
- // If transform returns an error, loadConfig will fail, and an InternalError will be reported.
- // Be wary if using this function as an extension point, in most cases the controller should
- // probably just be natively extended to do what you need. Injecting flag precedence transformations
- // is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
- // controller's tree (pkg/) is not.
- transform TransformFunc
- // pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
- pendingConfigSource chan bool
- // configStatus manages the status we report on the Node object
- configStatus status.NodeConfigStatus
- // nodeInformer is the informer that watches the Node object
- nodeInformer cache.SharedInformer
- // remoteConfigSourceInformer is the informer that watches the assigned config source
- remoteConfigSourceInformer cache.SharedInformer
- // checkpointStore persists config source checkpoints to a storage layer
- checkpointStore store.Store
- }
- // NewController constructs a new Controller object and returns it. The dynamicConfigDir
- // path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
- // This can be used, for example, to include config from flags before the controller's validation step.
- // If transform returns an error, loadConfig will fail, and an InternalError will be reported.
- // Be wary if using this function as an extension point, in most cases the controller should
- // probably just be natively extended to do what you need. Injecting flag precedence transformations
- // is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
- // controller's tree (pkg/) is not.
- func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
- return &Controller{
- transform: transform,
- // channels must have capacity at least 1, since we signal with non-blocking writes
- pendingConfigSource: make(chan bool, 1),
- configStatus: status.NewNodeConfigStatus(),
- checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
- }
- }
- // Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
- // or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
- // If the pre-existing local configuration should be used, Bootstrap returns a nil config.
- func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
- utillog.Infof("starting controller")
- // ensure the filesystem is initialized
- if err := cc.initializeDynamicConfigDir(); err != nil {
- return nil, err
- }
- // determine assigned source and set status
- assignedSource, err := cc.checkpointStore.Assigned()
- if err != nil {
- return nil, err
- }
- if assignedSource != nil {
- cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
- }
- // determine last-known-good source and set status
- lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
- if err != nil {
- return nil, err
- }
- if lastKnownGoodSource != nil {
- cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
- }
- // if the assigned source is nil, return nil to indicate local config
- if assignedSource == nil {
- return nil, nil
- }
- // attempt to load assigned config
- assignedConfig, reason, err := cc.loadConfig(assignedSource)
- if err == nil {
- // update the active source to the non-nil assigned source
- cc.configStatus.SetActive(assignedSource.NodeConfigSource())
- // update the last-known-good config if necessary, and start a timer that
- // periodically checks whether the last-known good needs to be updated
- // we only do this when the assigned config loads and passes validation
- // wait.Forever will call the func once before starting the timer
- go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
- return assignedConfig, nil
- } // Assert: the assigned config failed to load or validate
- // TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
- // error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
- // or something else scary
- // log the reason and error details for the failure to load the assigned config
- utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
- // set status to indicate the failure with the assigned config
- cc.configStatus.SetError(reason)
- // if the last-known-good source is nil, return nil to indicate local config
- if lastKnownGoodSource == nil {
- return nil, nil
- }
- // attempt to load the last-known-good config
- lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
- if err != nil {
- // we failed to load the last-known-good, so something is really messed up and we just return the error
- return nil, err
- }
- // set status to indicate the active source is the non-nil last-known-good source
- cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
- return lastKnownGoodConfig, nil
- }
- // StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
- // The clients must be non-nil, and the nodeName must be non-empty.
- func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
- const errFmt = "cannot start Kubelet config sync: %s"
- if client == nil {
- return fmt.Errorf(errFmt, "nil client")
- }
- if eventClient == nil {
- return fmt.Errorf(errFmt, "nil event client")
- }
- if nodeName == "" {
- return fmt.Errorf(errFmt, "empty nodeName")
- }
- // Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
- // we use HandlePanic to manually call the panic handlers and then crash.
- // We have a better chance of recovering normal operation if we just restart the Kubelet in the event
- // of a Go runtime error.
- // NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
- // This was EVIL to debug (difficult to see missing `()`).
- // The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
- // status sync worker
- statusSyncLoopFunc := utilpanic.HandlePanic(func() {
- utillog.Infof("starting status sync loop")
- wait.JitterUntil(func() {
- cc.configStatus.Sync(client, nodeName)
- }, 10*time.Second, 0.2, true, wait.NeverStop)
- })
- // remote config source informer, if we have a remote source to watch
- assignedSource, err := cc.checkpointStore.Assigned()
- if err != nil {
- return fmt.Errorf(errFmt, err)
- } else if assignedSource == nil {
- utillog.Infof("local source is assigned, will not start remote config source informer")
- } else {
- cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
- AddFunc: cc.onAddRemoteConfigSourceEvent,
- UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
- DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
- },
- )
- }
- remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
- if cc.remoteConfigSourceInformer != nil {
- utillog.Infof("starting remote config source informer")
- cc.remoteConfigSourceInformer.Run(wait.NeverStop)
- }
- })
- // node informer
- cc.nodeInformer = newSharedNodeInformer(client, nodeName,
- cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
- nodeInformerFunc := utilpanic.HandlePanic(func() {
- utillog.Infof("starting Node informer")
- cc.nodeInformer.Run(wait.NeverStop)
- })
- // config sync worker
- configSyncLoopFunc := utilpanic.HandlePanic(func() {
- utillog.Infof("starting Kubelet config sync loop")
- wait.JitterUntil(func() {
- cc.syncConfigSource(client, eventClient, nodeName)
- }, 10*time.Second, 0.2, true, wait.NeverStop)
- })
- go statusSyncLoopFunc()
- go remoteConfigSourceInformerFunc()
- go nodeInformerFunc()
- go configSyncLoopFunc()
- return nil
- }
- // loadConfig loads Kubelet config from a checkpoint
- // It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
- func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
- // load KubeletConfiguration from checkpoint
- kc, err := cc.checkpointStore.Load(source)
- if err != nil {
- return nil, status.LoadError, err
- }
- // apply any required transformations to the KubeletConfiguration
- if cc.transform != nil {
- if err := cc.transform(kc); err != nil {
- return nil, status.InternalError, err
- }
- }
- // validate the result
- if err := validation.ValidateKubeletConfiguration(kc); err != nil {
- return nil, status.ValidateError, err
- }
- return kc, "", nil
- }
- // initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
- func (cc *Controller) initializeDynamicConfigDir() error {
- utillog.Infof("ensuring filesystem is set up correctly")
- // initializeDynamicConfigDir local checkpoint storage location
- return cc.checkpointStore.Initialize()
- }
- // checkTrial checks whether the trial duration has passed, and updates the last-known-good config if necessary
- func (cc *Controller) checkTrial(duration time.Duration) {
- // when the trial period is over, the assigned config becomes the last-known-good
- if trial, err := cc.inTrial(duration); err != nil {
- utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
- } else if !trial {
- if err := cc.graduateAssignedToLastKnownGood(); err != nil {
- utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
- }
- }
- }
- // inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
- func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
- now := time.Now()
- t, err := cc.checkpointStore.AssignedModified()
- if err != nil {
- return false, err
- }
- if now.Sub(t) <= trialDur {
- return true, nil
- }
- return false, nil
- }
- // graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
- // to the same value as the assigned config maintained by the checkpointStore
- func (cc *Controller) graduateAssignedToLastKnownGood() error {
- // get assigned
- assigned, err := cc.checkpointStore.Assigned()
- if err != nil {
- return err
- }
- // get last-known-good
- lastKnownGood, err := cc.checkpointStore.LastKnownGood()
- if err != nil {
- return err
- }
- // if the sources are equal, no need to change
- if assigned == lastKnownGood ||
- assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned.NodeConfigSource(), lastKnownGood.NodeConfigSource()) {
- return nil
- }
- // update last-known-good
- err = cc.checkpointStore.SetLastKnownGood(assigned)
- if err != nil {
- return err
- }
- // update the status to reflect the new last-known-good config
- cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
- utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
- return nil
- }
|