12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package app makes it easy to create a kubelet server for various contexts.
- package app
- import (
- "context"
- "crypto/tls"
- "errors"
- "fmt"
- "math/rand"
- "net"
- "net/http"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "github.com/coreos/go-systemd/daemon"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- genericapiserver "k8s.io/apiserver/pkg/server"
- "k8s.io/apiserver/pkg/server/healthz"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- clientset "k8s.io/client-go/kubernetes"
- certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/tools/record"
- certutil "k8s.io/client-go/util/cert"
- "k8s.io/client-go/util/certificate"
- "k8s.io/client-go/util/connrotation"
- "k8s.io/client-go/util/keyutil"
- cloudprovider "k8s.io/cloud-provider"
- cliflag "k8s.io/component-base/cli/flag"
- kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
- "k8s.io/kubernetes/cmd/kubelet/app/options"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/capabilities"
- "k8s.io/kubernetes/pkg/credentialprovider"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/kubelet"
- kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
- kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
- kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
- "k8s.io/kubernetes/pkg/kubelet/cadvisor"
- kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
- "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
- "k8s.io/kubernetes/pkg/kubelet/cm"
- "k8s.io/kubernetes/pkg/kubelet/config"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/dockershim"
- dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
- "k8s.io/kubernetes/pkg/kubelet/eviction"
- evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
- dynamickubeletconfig "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
- "k8s.io/kubernetes/pkg/kubelet/server"
- "k8s.io/kubernetes/pkg/kubelet/server/streaming"
- "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/util/configz"
- utilfs "k8s.io/kubernetes/pkg/util/filesystem"
- utilflag "k8s.io/kubernetes/pkg/util/flag"
- "k8s.io/kubernetes/pkg/util/flock"
- "k8s.io/kubernetes/pkg/util/mount"
- nodeutil "k8s.io/kubernetes/pkg/util/node"
- "k8s.io/kubernetes/pkg/util/oom"
- "k8s.io/kubernetes/pkg/util/rlimit"
- "k8s.io/kubernetes/pkg/version"
- "k8s.io/kubernetes/pkg/version/verflag"
- nsutil "k8s.io/kubernetes/pkg/volume/util/nsenter"
- "k8s.io/kubernetes/pkg/volume/util/subpath"
- "k8s.io/utils/exec"
- "k8s.io/utils/nsenter"
- )
// componentKubelet is the kubelet's component name. In this file it names the
// root command and its flag set, and it is the Component of the event source.
const componentKubelet = "kubelet"
- // NewKubeletCommand creates a *cobra.Command object with default parameters
- func NewKubeletCommand() *cobra.Command {
- cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
- cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
- kubeletFlags := options.NewKubeletFlags()
- kubeletConfig, err := options.NewKubeletConfiguration()
- // programmer error
- if err != nil {
- klog.Fatal(err)
- }
- cmd := &cobra.Command{
- Use: componentKubelet,
- Long: `The kubelet is the primary "node agent" that runs on each
- node. It can register the node with the apiserver using one of: the hostname; a flag to
- override the hostname; or specific logic for a cloud provider.
- The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
- that describes a pod. The kubelet takes a set of PodSpecs that are provided through
- various mechanisms (primarily through the apiserver) and ensures that the containers
- described in those PodSpecs are running and healthy. The kubelet doesn't manage
- containers which were not created by Kubernetes.
- Other than from an PodSpec from the apiserver, there are three ways that a container
- manifest can be provided to the Kubelet.
- File: Path passed as a flag on the command line. Files under this path will be monitored
- periodically for updates. The monitoring period is 20s by default and is configurable
- via a flag.
- HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
- is checked every 20 seconds (also configurable with a flag).
- HTTP server: The kubelet can also listen for HTTP and respond to a simple API
- (underspec'd currently) to submit a new manifest.`,
- // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
- // so we do all our parsing manually in Run, below.
- // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
- // `args` arg to Run, without Cobra's interference.
- DisableFlagParsing: true,
- Run: func(cmd *cobra.Command, args []string) {
- // initial flag parse, since we disable cobra's flag parsing
- if err := cleanFlagSet.Parse(args); err != nil {
- cmd.Usage()
- klog.Fatal(err)
- }
- // check if there are non-flag arguments in the command line
- cmds := cleanFlagSet.Args()
- if len(cmds) > 0 {
- cmd.Usage()
- klog.Fatalf("unknown command: %s", cmds[0])
- }
- // short-circuit on help
- help, err := cleanFlagSet.GetBool("help")
- if err != nil {
- klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
- }
- if help {
- cmd.Help()
- return
- }
- // short-circuit on verflag
- verflag.PrintAndExitIfRequested()
- utilflag.PrintFlags(cleanFlagSet)
- // set feature gates from initial flags-based config
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- // validate the initial KubeletFlags
- if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
- klog.Fatal(err)
- }
- if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
- klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
- }
- // load kubelet config file, if provided
- if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
- kubeletConfig, err = loadConfigFile(configFile)
- if err != nil {
- klog.Fatal(err)
- }
- // We must enforce flag precedence by re-parsing the command line into the new object.
- // This is necessary to preserve backwards-compatibility across binary upgrades.
- // See issue #56171 for more details.
- if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
- klog.Fatal(err)
- }
- // update feature gates based on new config
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- }
- // We always validate the local configuration (command line + config file).
- // This is the default "last-known-good" config for dynamic config, and must always remain valid.
- if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
- klog.Fatal(err)
- }
- // use dynamic kubelet config, if enabled
- var kubeletConfigController *dynamickubeletconfig.Controller
- if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
- var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
- dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
- func(kc *kubeletconfiginternal.KubeletConfiguration) error {
- // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
- // so that we get a complete validation at the same point where we can decide to reject dynamic config.
- // This fixes the flag-precedence component of issue #63305.
- // See issue #56171 for general details on flag precedence.
- return kubeletConfigFlagPrecedence(kc, args)
- })
- if err != nil {
- klog.Fatal(err)
- }
- // If we should just use our existing, local config, the controller will return a nil config
- if dynamicKubeletConfig != nil {
- kubeletConfig = dynamicKubeletConfig
- // Note: flag precedence was already enforced in the controller, prior to validation,
- // by our above transform function. Now we simply update feature gates from the new config.
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- }
- }
- // construct a KubeletServer from kubeletFlags and kubeletConfig
- kubeletServer := &options.KubeletServer{
- KubeletFlags: *kubeletFlags,
- KubeletConfiguration: *kubeletConfig,
- }
- // use kubeletServer to construct the default KubeletDeps
- kubeletDeps, err := UnsecuredDependencies(kubeletServer)
- if err != nil {
- klog.Fatal(err)
- }
- // add the kubelet config controller to kubeletDeps
- kubeletDeps.KubeletConfigController = kubeletConfigController
- // set up stopCh here in order to be reused by kubelet and docker shim
- stopCh := genericapiserver.SetupSignalHandler()
- // start the experimental docker shim, if enabled
- if kubeletServer.KubeletFlags.ExperimentalDockershim {
- if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
- klog.Fatal(err)
- }
- return
- }
- // run the kubelet
- klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
- if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
- klog.Fatal(err)
- }
- },
- }
- // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
- kubeletFlags.AddFlags(cleanFlagSet)
- options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
- options.AddGlobalFlags(cleanFlagSet)
- cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
- // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
- const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
- cmd.SetUsageFunc(func(cmd *cobra.Command) error {
- fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
- return nil
- })
- cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
- fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
- })
- return cmd
- }
- // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered
- // on it.
- func newFlagSetWithGlobals() *pflag.FlagSet {
- fs := pflag.NewFlagSet("", pflag.ExitOnError)
- // set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags
- fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
- // explicitly add flags from libs that register global flags
- options.AddGlobalFlags(fs)
- return fs
- }
- // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where
- // all values have noop Set implementations
- func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
- ret := pflag.NewFlagSet("", pflag.ExitOnError)
- ret.SetNormalizeFunc(fs.GetNormalizeFunc())
- fs.VisitAll(func(f *pflag.Flag) {
- ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
- })
- return ret
- }
- // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object.
- // We must enforce flag precedence by re-parsing the command line into the new object.
- // This is necessary to preserve backwards-compatibility across binary upgrades.
- // See issue #56171 for more details.
- func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
- // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses,
- // as some Set implementations accumulate values from multiple flag invocations.
- fs := newFakeFlagSet(newFlagSetWithGlobals())
- // register throwaway KubeletFlags
- options.NewKubeletFlags().AddFlags(fs)
- // register new KubeletConfiguration
- options.AddKubeletConfigFlags(fs, kc)
- // Remember original feature gates, so we can merge with flag gates later
- original := kc.FeatureGates
- // re-parse flags
- if err := fs.Parse(args); err != nil {
- return err
- }
- // Add back feature gates that were set in the original kc, but not in flags
- for k, v := range original {
- if _, ok := kc.FeatureGates[k]; !ok {
- kc.FeatureGates[k] = v
- }
- }
- return nil
- }
- func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
- const errFmt = "failed to load Kubelet config file %s, error %v"
- // compute absolute path based on current working dir
- kubeletConfigFile, err := filepath.Abs(name)
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- loader, err := configfiles.NewFsLoader(utilfs.DefaultFs{}, kubeletConfigFile)
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- kc, err := loader.Load()
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- return kc, err
- }
- // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
- // is not valid. It will not start any background processes, and does not include authentication/authorization
- func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
- // Initialize the TLS Options
- tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
- if err != nil {
- return nil, err
- }
- mounter := mount.New(s.ExperimentalMounterPath)
- subpather := subpath.New(mounter)
- var pluginRunner = exec.New()
- if s.Containerized {
- klog.V(2).Info("Running kubelet in containerized mode")
- ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
- if err != nil {
- return nil, err
- }
- mounter = nsutil.NewMounter(s.RootDirectory, ne)
- // NSenter only valid on Linux
- subpather = subpath.NewNSEnter(mounter, ne, s.RootDirectory)
- // an exec interface which can use nsenter for flex plugin calls
- pluginRunner, err = nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
- if err != nil {
- return nil, err
- }
- }
- var dockerClientConfig *dockershim.ClientConfig
- if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
- dockerClientConfig = &dockershim.ClientConfig{
- DockerEndpoint: s.DockerEndpoint,
- RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
- ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
- }
- }
- return &kubelet.Dependencies{
- Auth: nil, // default does not enforce auth[nz]
- CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
- Cloud: nil, // cloud provider might start background processes
- ContainerManager: nil,
- DockerClientConfig: dockerClientConfig,
- KubeClient: nil,
- HeartbeatClient: nil,
- EventClient: nil,
- Mounter: mounter,
- Subpather: subpather,
- OOMAdjuster: oom.NewOOMAdjuster(),
- OSInterface: kubecontainer.RealOS{},
- VolumePlugins: ProbeVolumePlugins(),
- DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
- TLSOptions: tlsOptions}, nil
- }
- // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
- // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
- // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
- // not be generated.
- func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
- if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
- return fmt.Errorf("failed OS init: %v", err)
- }
- if err := run(s, kubeDeps, stopCh); err != nil {
- return fmt.Errorf("failed to run Kubelet: %v", err)
- }
- return nil
- }
// checkPermissions reports an error unless the process runs with uid 0.
func checkPermissions() error {
	uid := os.Getuid()
	if uid == 0 {
		// TODO: Check if kubelet is running in the `initial` user namespace.
		// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
		return nil
	}
	return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
}
- func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
- scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
- if err != nil {
- return err
- }
- versioned := kubeletconfigv1beta1.KubeletConfiguration{}
- if err := scheme.Convert(kc, &versioned, nil); err != nil {
- return err
- }
- cz.Set(versioned)
- return nil
- }
- func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
- cz, err := configz.New("kubeletconfig")
- if err != nil {
- klog.Errorf("unable to register configz: %s", err)
- return err
- }
- if err := setConfigz(cz, kc); err != nil {
- klog.Errorf("unable to register config: %s", err)
- return err
- }
- return nil
- }
- // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
- func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
- if kubeDeps.Recorder != nil {
- return
- }
- eventBroadcaster := record.NewBroadcaster()
- kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
- eventBroadcaster.StartLogging(klog.V(3).Infof)
- if kubeDeps.EventClient != nil {
- klog.V(4).Infof("Sending events to api server.")
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
- } else {
- klog.Warning("No api server defined - no events will be sent to API server.")
- }
- }
- func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
- // Set global feature gates based on the value on the initial KubeletServer
- err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
- if err != nil {
- return err
- }
- // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
- if err := options.ValidateKubeletServer(s); err != nil {
- return err
- }
- // Obtain Kubelet Lock File
- if s.ExitOnLockContention && s.LockFilePath == "" {
- return errors.New("cannot exit on lock file contention: no lock file specified")
- }
- done := make(chan struct{})
- if s.LockFilePath != "" {
- klog.Infof("acquiring file lock on %q", s.LockFilePath)
- if err := flock.Acquire(s.LockFilePath); err != nil {
- return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
- }
- if s.ExitOnLockContention {
- klog.Infof("watching for inotify events for: %v", s.LockFilePath)
- if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
- return err
- }
- }
- }
- // Register current configuration with /configz endpoint
- err = initConfigz(&s.KubeletConfiguration)
- if err != nil {
- klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
- }
- // About to get clients and such, detect standaloneMode
- standaloneMode := true
- if len(s.KubeConfig) > 0 {
- standaloneMode = false
- }
- if kubeDeps == nil {
- kubeDeps, err = UnsecuredDependencies(s)
- if err != nil {
- return err
- }
- }
- if kubeDeps.Cloud == nil {
- if !cloudprovider.IsExternal(s.CloudProvider) {
- cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
- if err != nil {
- return err
- }
- if cloud == nil {
- klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
- } else {
- klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
- }
- kubeDeps.Cloud = cloud
- }
- }
- hostName, err := nodeutil.GetHostname(s.HostnameOverride)
- if err != nil {
- return err
- }
- nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
- if err != nil {
- return err
- }
- // if in standalone mode, indicate as much by setting all clients to nil
- switch {
- case standaloneMode:
- kubeDeps.KubeClient = nil
- kubeDeps.EventClient = nil
- kubeDeps.HeartbeatClient = nil
- klog.Warningf("standalone mode, no API client")
- case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
- clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
- if err != nil {
- return err
- }
- if closeAllConns == nil {
- return errors.New("closeAllConns must be a valid function other than nil")
- }
- kubeDeps.OnHeartbeatFailure = closeAllConns
- kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet client: %v", err)
- }
- // make a separate client for events
- eventClientConfig := *clientConfig
- eventClientConfig.QPS = float32(s.EventRecordQPS)
- eventClientConfig.Burst = int(s.EventBurst)
- kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet event client: %v", err)
- }
- // make a separate client for heartbeat with throttling disabled and a timeout attached
- heartbeatClientConfig := *clientConfig
- heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
- // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
- if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
- leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
- if heartbeatClientConfig.Timeout > leaseTimeout {
- heartbeatClientConfig.Timeout = leaseTimeout
- }
- }
- heartbeatClientConfig.QPS = float32(-1)
- kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
- }
- }
- // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
- if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
- kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
- if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
- return err
- }
- }
- if kubeDeps.Auth == nil {
- auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
- if err != nil {
- return err
- }
- kubeDeps.Auth = auth
- }
- var cgroupRoots []string
- cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
- kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
- if err != nil {
- klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
- } else if kubeletCgroup != "" {
- cgroupRoots = append(cgroupRoots, kubeletCgroup)
- }
- runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
- if err != nil {
- klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
- } else if runtimeCgroup != "" {
- // RuntimeCgroups is optional, so ignore if it isn't specified
- cgroupRoots = append(cgroupRoots, runtimeCgroup)
- }
- if s.SystemCgroups != "" {
- // SystemCgroups is optional, so ignore if it isn't specified
- cgroupRoots = append(cgroupRoots, s.SystemCgroups)
- }
- if kubeDeps.CAdvisorInterface == nil {
- imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
- kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
- if err != nil {
- return err
- }
- }
- // Setup event recorder if required.
- makeEventRecorder(kubeDeps, nodeName)
- if kubeDeps.ContainerManager == nil {
- if s.CgroupsPerQOS && s.CgroupRoot == "" {
- klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
- s.CgroupRoot = "/"
- }
- kubeReserved, err := parseResourceList(s.KubeReserved)
- if err != nil {
- return err
- }
- systemReserved, err := parseResourceList(s.SystemReserved)
- if err != nil {
- return err
- }
- var hardEvictionThresholds []evictionapi.Threshold
- // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
- if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
- hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
- if err != nil {
- return err
- }
- }
- experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
- if err != nil {
- return err
- }
- devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
- kubeDeps.ContainerManager, err = cm.NewContainerManager(
- kubeDeps.Mounter,
- kubeDeps.CAdvisorInterface,
- cm.NodeConfig{
- RuntimeCgroupsName: s.RuntimeCgroups,
- SystemCgroupsName: s.SystemCgroups,
- KubeletCgroupsName: s.KubeletCgroups,
- ContainerRuntime: s.ContainerRuntime,
- CgroupsPerQOS: s.CgroupsPerQOS,
- CgroupRoot: s.CgroupRoot,
- CgroupDriver: s.CgroupDriver,
- KubeletRootDir: s.RootDirectory,
- ProtectKernelDefaults: s.ProtectKernelDefaults,
- NodeAllocatableConfig: cm.NodeAllocatableConfig{
- KubeReservedCgroupName: s.KubeReservedCgroup,
- SystemReservedCgroupName: s.SystemReservedCgroup,
- EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
- KubeReserved: kubeReserved,
- SystemReserved: systemReserved,
- HardEvictionThresholds: hardEvictionThresholds,
- },
- QOSReserved: *experimentalQOSReserved,
- ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
- ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
- ExperimentalPodPidsLimit: s.PodPidsLimit,
- EnforceCPULimits: s.CPUCFSQuota,
- CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
- },
- s.FailSwapOn,
- devicePluginEnabled,
- kubeDeps.Recorder)
- if err != nil {
- return err
- }
- }
- if err := checkPermissions(); err != nil {
- klog.Error(err)
- }
- utilruntime.ReallyCrash = s.ReallyCrashForTesting
- rand.Seed(time.Now().UnixNano())
- // TODO(vmarmol): Do this through container config.
- oomAdjuster := kubeDeps.OOMAdjuster
- if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
- klog.Warning(err)
- }
- if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
- return err
- }
- if s.HealthzPort > 0 {
- mux := http.NewServeMux()
- healthz.InstallHandler(mux)
- go wait.Until(func() {
- err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
- if err != nil {
- klog.Errorf("Starting healthz server failed: %v", err)
- }
- }, 5*time.Second, wait.NeverStop)
- }
- if s.RunOnce {
- return nil
- }
- // If systemd is used, notify it that we have started
- go daemon.SdNotify(false, "READY=1")
- select {
- case <-done:
- break
- case <-stopCh:
- break
- }
- return nil
- }
- // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
- // bootstrapping is enabled or client certificate rotation is enabled.
- func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
- if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
- // Rules for client rotation and the handling of kube config files:
- //
- // 1. If the client provides only a kubeconfig file, we must use that as the initial client
- // kubeadm needs the initial data in the kubeconfig to be placed into the cert store
- // 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
- // kubeconfig file at the target location that points to the cert store, but until
- // the file is present the client config will have no certs
- // 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
- // kubeconfig.
- // 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
- // replace the kubeconfig with a new file that points to the cert dir
- //
- // The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
- // the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
- // which provides a high powered kubeconfig on the master with cert/key data, we must
- // bootstrap the cert manager with the contents of the initial client config.
- klog.Infof("Client rotation is on, will bootstrap in background")
- certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
- if err != nil {
- return nil, nil, err
- }
- // use the correct content type for cert rotation, but don't set QPS
- setContentTypeForClient(certConfig, s.ContentType)
- kubeClientConfigOverrides(s, clientConfig)
- clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
- if err != nil {
- return nil, nil, err
- }
- // the rotating transport will use the cert from the cert manager instead of these files
- transportConfig := restclient.AnonymousClientConfig(clientConfig)
- // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
- // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
- // or the bootstrapping credentials to potentially lay down new initial config.
- closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
- if err != nil {
- return nil, nil, err
- }
- klog.V(2).Info("Starting client certificate rotation.")
- clientCertificateManager.Start()
- return transportConfig, closeAllConns, nil
- }
- if len(s.BootstrapKubeconfig) > 0 {
- if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
- return nil, nil, err
- }
- }
- clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
- &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
- &clientcmd.ConfigOverrides{},
- ).ClientConfig()
- if err != nil {
- return nil, nil, fmt.Errorf("invalid kubeconfig: %v", err)
- }
- kubeClientConfigOverrides(s, clientConfig)
- closeAllConns, err := updateDialer(clientConfig)
- if err != nil {
- return nil, nil, err
- }
- return clientConfig, closeAllConns, nil
- }
- // updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
- func updateDialer(clientConfig *restclient.Config) (func(), error) {
- if clientConfig.Transport != nil || clientConfig.Dial != nil {
- return nil, fmt.Errorf("there is already a transport or dialer configured")
- }
- d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
- clientConfig.Dial = d.DialContext
- return d.CloseAll, nil
- }
- // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
- // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
- // write out).
- func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
- newClientFn := func(current *tls.Certificate) (certificatesclient.CertificateSigningRequestInterface, error) {
- // If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
- // credentials. In the future it would be desirable to change the behavior of bootstrap
- // to always fall back to the external bootstrap credentials when such credentials are
- // provided by a fundamental trust system like cloud VM identity or an HSM module.
- config := certConfig
- if current != nil {
- config = clientConfig
- }
- client, err := clientset.NewForConfig(config)
- if err != nil {
- return nil, err
- }
- return client.CertificatesV1beta1().CertificateSigningRequests(), nil
- }
- return kubeletcertificate.NewKubeletClientCertificateManager(
- certDir,
- nodeName,
- // this preserves backwards compatibility with kubeadm which passes
- // a high powered certificate to the kubelet as --kubeconfig and expects
- // it to be rotated out immediately
- clientConfig.CertData,
- clientConfig.KeyData,
- clientConfig.CertFile,
- clientConfig.KeyFile,
- newClientFn,
- )
- }
- func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
- setContentTypeForClient(clientConfig, s.ContentType)
- // Override kubeconfig qps/burst settings from flags
- clientConfig.QPS = float32(s.KubeAPIQPS)
- clientConfig.Burst = int(s.KubeAPIBurst)
- }
- // getNodeName returns the node name according to the cloud provider
- // if cloud provider is specified. Otherwise, returns the hostname of the node.
- func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
- if cloud == nil {
- return types.NodeName(hostname), nil
- }
- instances, ok := cloud.Instances()
- if !ok {
- return "", fmt.Errorf("failed to get instances from cloud provider")
- }
- nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
- if err != nil {
- return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
- }
- klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
- return nodeName, nil
- }
- // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
- // certificate and key file are generated. Returns a configured server.TLSOptions object.
- func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
- if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
- kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
- kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
- canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
- if err != nil {
- return nil, err
- }
- if !canReadCertAndKey {
- hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
- if err != nil {
- return nil, err
- }
- cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
- if err != nil {
- return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
- }
- if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
- return nil, err
- }
- if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
- return nil, err
- }
- klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
- }
- }
- tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
- if err != nil {
- return nil, err
- }
- minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
- if err != nil {
- return nil, err
- }
- tlsOptions := &server.TLSOptions{
- Config: &tls.Config{
- MinVersion: minTLSVersion,
- CipherSuites: tlsCipherSuites,
- },
- CertFile: kc.TLSCertFile,
- KeyFile: kc.TLSPrivateKeyFile,
- }
- if len(kc.Authentication.X509.ClientCAFile) > 0 {
- clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
- if err != nil {
- return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
- }
- // Specify allowed CAs for client certificates
- tlsOptions.Config.ClientCAs = clientCAs
- // Populate PeerCertificates in requests, but don't reject connections without verified certificates
- tlsOptions.Config.ClientAuth = tls.RequestClientCert
- }
- return tlsOptions, nil
- }
- // setContentTypeForClient sets the appropritae content type into the rest config
- // and handles defaulting AcceptContentTypes based on that input.
- func setContentTypeForClient(cfg *restclient.Config, contentType string) {
- if len(contentType) == 0 {
- return
- }
- cfg.ContentType = contentType
- switch contentType {
- case runtime.ContentTypeProtobuf:
- cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
- default:
- // otherwise let the rest client perform defaulting
- }
- }
- // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
- // 1 Integration tests
- // 2 Kubelet binary
- // 3 Standalone 'kubernetes' binary
- // Eventually, #2 will be replaced with instances of #3
- func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
- hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
- if err != nil {
- return err
- }
- // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
- nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
- if err != nil {
- return err
- }
- // Setup event recorder if required.
- makeEventRecorder(kubeDeps, nodeName)
- capabilities.Initialize(capabilities.Capabilities{
- AllowPrivileged: true,
- })
- credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
- klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
- if kubeDeps.OSInterface == nil {
- kubeDeps.OSInterface = kubecontainer.RealOS{}
- }
- k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
- kubeDeps,
- &kubeServer.ContainerRuntimeOptions,
- kubeServer.ContainerRuntime,
- kubeServer.RuntimeCgroups,
- kubeServer.HostnameOverride,
- kubeServer.NodeIP,
- kubeServer.ProviderID,
- kubeServer.CloudProvider,
- kubeServer.CertDirectory,
- kubeServer.RootDirectory,
- kubeServer.RegisterNode,
- kubeServer.RegisterWithTaints,
- kubeServer.AllowedUnsafeSysctls,
- kubeServer.RemoteRuntimeEndpoint,
- kubeServer.RemoteImageEndpoint,
- kubeServer.ExperimentalMounterPath,
- kubeServer.ExperimentalKernelMemcgNotification,
- kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
- kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
- kubeServer.MinimumGCAge,
- kubeServer.MaxPerPodContainerCount,
- kubeServer.MaxContainerCount,
- kubeServer.MasterServiceNamespace,
- kubeServer.RegisterSchedulable,
- kubeServer.NonMasqueradeCIDR,
- kubeServer.KeepTerminatedPodVolumes,
- kubeServer.NodeLabels,
- kubeServer.SeccompProfileRoot,
- kubeServer.BootstrapCheckpointPath,
- kubeServer.NodeStatusMaxImages)
- if err != nil {
- return fmt.Errorf("failed to create kubelet: %v", err)
- }
- // NewMainKubelet should have set up a pod source config if one didn't exist
- // when the builder was run. This is just a precaution.
- if kubeDeps.PodConfig == nil {
- return fmt.Errorf("failed to create kubelet, pod source config was nil")
- }
- podCfg := kubeDeps.PodConfig
- rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
- // process pods and exit.
- if runOnce {
- if _, err := k.RunOnce(podCfg.Updates()); err != nil {
- return fmt.Errorf("runonce failed: %v", err)
- }
- klog.Info("Started kubelet as runonce")
- } else {
- startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
- klog.Info("Started kubelet")
- }
- return nil
- }
- func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
- // start the kubelet
- go wait.Until(func() {
- k.Run(podCfg.Updates())
- }, 0, wait.NeverStop)
- // start the kubelet server
- if enableServer {
- go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
- }
- if kubeCfg.ReadOnlyPort > 0 {
- go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
- }
- if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
- go k.ListenAndServePodResources()
- }
- }
- func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
- kubeDeps *kubelet.Dependencies,
- crOptions *config.ContainerRuntimeOptions,
- containerRuntime string,
- runtimeCgroups string,
- hostnameOverride string,
- nodeIP string,
- providerID string,
- cloudProvider string,
- certDirectory string,
- rootDirectory string,
- registerNode bool,
- registerWithTaints []api.Taint,
- allowedUnsafeSysctls []string,
- remoteRuntimeEndpoint string,
- remoteImageEndpoint string,
- experimentalMounterPath string,
- experimentalKernelMemcgNotification bool,
- experimentalCheckNodeCapabilitiesBeforeMount bool,
- experimentalNodeAllocatableIgnoreEvictionThreshold bool,
- minimumGCAge metav1.Duration,
- maxPerPodContainerCount int32,
- maxContainerCount int32,
- masterServiceNamespace string,
- registerSchedulable bool,
- nonMasqueradeCIDR string,
- keepTerminatedPodVolumes bool,
- nodeLabels map[string]string,
- seccompProfileRoot string,
- bootstrapCheckpointPath string,
- nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
- // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
- // up into "per source" synchronizations
- k, err = kubelet.NewMainKubelet(kubeCfg,
- kubeDeps,
- crOptions,
- containerRuntime,
- runtimeCgroups,
- hostnameOverride,
- nodeIP,
- providerID,
- cloudProvider,
- certDirectory,
- rootDirectory,
- registerNode,
- registerWithTaints,
- allowedUnsafeSysctls,
- remoteRuntimeEndpoint,
- remoteImageEndpoint,
- experimentalMounterPath,
- experimentalKernelMemcgNotification,
- experimentalCheckNodeCapabilitiesBeforeMount,
- experimentalNodeAllocatableIgnoreEvictionThreshold,
- minimumGCAge,
- maxPerPodContainerCount,
- maxContainerCount,
- masterServiceNamespace,
- registerSchedulable,
- nonMasqueradeCIDR,
- keepTerminatedPodVolumes,
- nodeLabels,
- seccompProfileRoot,
- bootstrapCheckpointPath,
- nodeStatusMaxImages)
- if err != nil {
- return nil, err
- }
- k.BirthCry()
- k.StartGarbageCollection()
- return k, nil
- }
- // parseResourceList parses the given configuration map into an API
- // ResourceList or returns an error.
- func parseResourceList(m map[string]string) (v1.ResourceList, error) {
- if len(m) == 0 {
- return nil, nil
- }
- rl := make(v1.ResourceList)
- for k, v := range m {
- switch v1.ResourceName(k) {
- // CPU, memory, local storage, and PID resources are supported.
- case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
- if v1.ResourceName(k) != pidlimit.PIDs || utilfeature.DefaultFeatureGate.Enabled(features.SupportNodePidsLimit) {
- q, err := resource.ParseQuantity(v)
- if err != nil {
- return nil, err
- }
- if q.Sign() == -1 {
- return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
- }
- rl[v1.ResourceName(k)] = q
- }
- default:
- return nil, fmt.Errorf("cannot reserve %q resource", k)
- }
- }
- return rl, nil
- }
- // BootstrapKubeletConfigController constructs and bootstrap a configuration controller
- func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
- if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
- return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
- }
- if len(dynamicConfigDir) == 0 {
- return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
- }
- // compute absolute path and bootstrap controller
- dir, err := filepath.Abs(dynamicConfigDir)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
- }
- // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
- c := dynamickubeletconfig.NewController(dir, transform)
- kc, err := c.Bootstrap()
- if err != nil {
- return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
- }
- return kc, c, nil
- }
- // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
- // TODO(random-liu): Move this to a separate binary.
- func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
- r := &f.ContainerRuntimeOptions
- // Initialize docker client configuration.
- dockerClientConfig := &dockershim.ClientConfig{
- DockerEndpoint: r.DockerEndpoint,
- RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration,
- ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration,
- }
- // Initialize network plugin settings.
- pluginSettings := dockershim.NetworkPluginSettings{
- HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode),
- NonMasqueradeCIDR: f.NonMasqueradeCIDR,
- PluginName: r.NetworkPluginName,
- PluginConfDir: r.CNIConfDir,
- PluginBinDirString: r.CNIBinDir,
- MTU: int(r.NetworkPluginMTU),
- }
- // Initialize streaming configuration. (Not using TLS now)
- streamingConfig := &streaming.Config{
- // Use a relative redirect (no scheme or host).
- BaseURL: &url.URL{Path: "/cri/"},
- StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration,
- StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
- SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
- SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
- }
- // Standalone dockershim will always start the local streaming server.
- ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
- f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/)
- if err != nil {
- return err
- }
- klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
- server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
- if err := server.Start(); err != nil {
- return err
- }
- <-stopCh
- return nil
- }
|