12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package app makes it easy to create a kubelet server for various contexts.
- package app
- import (
- "context"
- "crypto/tls"
- "errors"
- "fmt"
- "net"
- "net/http"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "github.com/coreos/go-systemd/daemon"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
- "k8s.io/klog"
- "k8s.io/utils/mount"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- genericapiserver "k8s.io/apiserver/pkg/server"
- "k8s.io/apiserver/pkg/server/healthz"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- clientset "k8s.io/client-go/kubernetes"
- certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/tools/record"
- certutil "k8s.io/client-go/util/cert"
- "k8s.io/client-go/util/certificate"
- "k8s.io/client-go/util/connrotation"
- "k8s.io/client-go/util/keyutil"
- cloudprovider "k8s.io/cloud-provider"
- cliflag "k8s.io/component-base/cli/flag"
- "k8s.io/component-base/featuregate"
- "k8s.io/component-base/version"
- "k8s.io/component-base/version/verflag"
- kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
- "k8s.io/kubernetes/cmd/kubelet/app/options"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/capabilities"
- "k8s.io/kubernetes/pkg/credentialprovider"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/kubelet"
- kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
- kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
- kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
- "k8s.io/kubernetes/pkg/kubelet/cadvisor"
- kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
- "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
- "k8s.io/kubernetes/pkg/kubelet/cm"
- "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
- "k8s.io/kubernetes/pkg/kubelet/config"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/dockershim"
- dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
- "k8s.io/kubernetes/pkg/kubelet/eviction"
- evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
- dynamickubeletconfig "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
- "k8s.io/kubernetes/pkg/kubelet/server"
- "k8s.io/kubernetes/pkg/kubelet/server/streaming"
- "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/util/configz"
- utilfs "k8s.io/kubernetes/pkg/util/filesystem"
- utilflag "k8s.io/kubernetes/pkg/util/flag"
- "k8s.io/kubernetes/pkg/util/flock"
- nodeutil "k8s.io/kubernetes/pkg/util/node"
- "k8s.io/kubernetes/pkg/util/oom"
- "k8s.io/kubernetes/pkg/util/rlimit"
- "k8s.io/kubernetes/pkg/volume/util/hostutil"
- "k8s.io/kubernetes/pkg/volume/util/subpath"
- "k8s.io/utils/exec"
- )
// componentKubelet is the kubelet's component name. Within this file it names
// the command and its flag set, and identifies the source of emitted events.
const componentKubelet = "kubelet"
- // NewKubeletCommand creates a *cobra.Command object with default parameters
- func NewKubeletCommand() *cobra.Command {
- cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
- cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
- kubeletFlags := options.NewKubeletFlags()
- kubeletConfig, err := options.NewKubeletConfiguration()
- // programmer error
- if err != nil {
- klog.Fatal(err)
- }
- cmd := &cobra.Command{
- Use: componentKubelet,
- Long: `The kubelet is the primary "node agent" that runs on each
- node. It can register the node with the apiserver using one of: the hostname; a flag to
- override the hostname; or specific logic for a cloud provider.
- The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
- that describes a pod. The kubelet takes a set of PodSpecs that are provided through
- various mechanisms (primarily through the apiserver) and ensures that the containers
- described in those PodSpecs are running and healthy. The kubelet doesn't manage
- containers which were not created by Kubernetes.
- Other than from an PodSpec from the apiserver, there are three ways that a container
- manifest can be provided to the Kubelet.
- File: Path passed as a flag on the command line. Files under this path will be monitored
- periodically for updates. The monitoring period is 20s by default and is configurable
- via a flag.
- HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
- is checked every 20 seconds (also configurable with a flag).
- HTTP server: The kubelet can also listen for HTTP and respond to a simple API
- (underspec'd currently) to submit a new manifest.`,
- // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
- // so we do all our parsing manually in Run, below.
- // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
- // `args` arg to Run, without Cobra's interference.
- DisableFlagParsing: true,
- Run: func(cmd *cobra.Command, args []string) {
- // initial flag parse, since we disable cobra's flag parsing
- if err := cleanFlagSet.Parse(args); err != nil {
- cmd.Usage()
- klog.Fatal(err)
- }
- // check if there are non-flag arguments in the command line
- cmds := cleanFlagSet.Args()
- if len(cmds) > 0 {
- cmd.Usage()
- klog.Fatalf("unknown command: %s", cmds[0])
- }
- // short-circuit on help
- help, err := cleanFlagSet.GetBool("help")
- if err != nil {
- klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
- }
- if help {
- cmd.Help()
- return
- }
- // short-circuit on verflag
- verflag.PrintAndExitIfRequested()
- utilflag.PrintFlags(cleanFlagSet)
- // set feature gates from initial flags-based config
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- // validate the initial KubeletFlags
- if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
- klog.Fatal(err)
- }
- if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
- klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
- }
- // load kubelet config file, if provided
- if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
- kubeletConfig, err = loadConfigFile(configFile)
- if err != nil {
- klog.Fatal(err)
- }
- // We must enforce flag precedence by re-parsing the command line into the new object.
- // This is necessary to preserve backwards-compatibility across binary upgrades.
- // See issue #56171 for more details.
- if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
- klog.Fatal(err)
- }
- // update feature gates based on new config
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- }
- // We always validate the local configuration (command line + config file).
- // This is the default "last-known-good" config for dynamic config, and must always remain valid.
- if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
- klog.Fatal(err)
- }
- // use dynamic kubelet config, if enabled
- var kubeletConfigController *dynamickubeletconfig.Controller
- if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
- var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
- dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
- func(kc *kubeletconfiginternal.KubeletConfiguration) error {
- // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
- // so that we get a complete validation at the same point where we can decide to reject dynamic config.
- // This fixes the flag-precedence component of issue #63305.
- // See issue #56171 for general details on flag precedence.
- return kubeletConfigFlagPrecedence(kc, args)
- })
- if err != nil {
- klog.Fatal(err)
- }
- // If we should just use our existing, local config, the controller will return a nil config
- if dynamicKubeletConfig != nil {
- kubeletConfig = dynamicKubeletConfig
- // Note: flag precedence was already enforced in the controller, prior to validation,
- // by our above transform function. Now we simply update feature gates from the new config.
- if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
- klog.Fatal(err)
- }
- }
- }
- // construct a KubeletServer from kubeletFlags and kubeletConfig
- kubeletServer := &options.KubeletServer{
- KubeletFlags: *kubeletFlags,
- KubeletConfiguration: *kubeletConfig,
- }
- // use kubeletServer to construct the default KubeletDeps
- kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
- if err != nil {
- klog.Fatal(err)
- }
- // add the kubelet config controller to kubeletDeps
- kubeletDeps.KubeletConfigController = kubeletConfigController
- // set up stopCh here in order to be reused by kubelet and docker shim
- stopCh := genericapiserver.SetupSignalHandler()
- // start the experimental docker shim, if enabled
- if kubeletServer.KubeletFlags.ExperimentalDockershim {
- if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
- klog.Fatal(err)
- }
- return
- }
- // run the kubelet
- klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
- if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
- klog.Fatal(err)
- }
- },
- }
- // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
- kubeletFlags.AddFlags(cleanFlagSet)
- options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
- options.AddGlobalFlags(cleanFlagSet)
- cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
- // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
- const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
- cmd.SetUsageFunc(func(cmd *cobra.Command) error {
- fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
- return nil
- })
- cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
- fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
- })
- return cmd
- }
- // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered
- // on it.
- func newFlagSetWithGlobals() *pflag.FlagSet {
- fs := pflag.NewFlagSet("", pflag.ExitOnError)
- // set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags
- fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
- // explicitly add flags from libs that register global flags
- options.AddGlobalFlags(fs)
- return fs
- }
- // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where
- // all values have noop Set implementations
- func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
- ret := pflag.NewFlagSet("", pflag.ExitOnError)
- ret.SetNormalizeFunc(fs.GetNormalizeFunc())
- fs.VisitAll(func(f *pflag.Flag) {
- ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
- })
- return ret
- }
- // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object.
- // We must enforce flag precedence by re-parsing the command line into the new object.
- // This is necessary to preserve backwards-compatibility across binary upgrades.
- // See issue #56171 for more details.
- func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
- // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses,
- // as some Set implementations accumulate values from multiple flag invocations.
- fs := newFakeFlagSet(newFlagSetWithGlobals())
- // register throwaway KubeletFlags
- options.NewKubeletFlags().AddFlags(fs)
- // register new KubeletConfiguration
- options.AddKubeletConfigFlags(fs, kc)
- // Remember original feature gates, so we can merge with flag gates later
- original := kc.FeatureGates
- // re-parse flags
- if err := fs.Parse(args); err != nil {
- return err
- }
- // Add back feature gates that were set in the original kc, but not in flags
- for k, v := range original {
- if _, ok := kc.FeatureGates[k]; !ok {
- kc.FeatureGates[k] = v
- }
- }
- return nil
- }
- func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
- const errFmt = "failed to load Kubelet config file %s, error %v"
- // compute absolute path based on current working dir
- kubeletConfigFile, err := filepath.Abs(name)
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- loader, err := configfiles.NewFsLoader(utilfs.DefaultFs{}, kubeletConfigFile)
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- kc, err := loader.Load()
- if err != nil {
- return nil, fmt.Errorf(errFmt, name, err)
- }
- return kc, err
- }
- // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
- // is not valid. It will not start any background processes, and does not include authentication/authorization
- func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
- // Initialize the TLS Options
- tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
- if err != nil {
- return nil, err
- }
- mounter := mount.New(s.ExperimentalMounterPath)
- subpather := subpath.New(mounter)
- hu := hostutil.NewHostUtil()
- var pluginRunner = exec.New()
- var dockerClientConfig *dockershim.ClientConfig
- if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
- dockerClientConfig = &dockershim.ClientConfig{
- DockerEndpoint: s.DockerEndpoint,
- RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
- ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
- }
- }
- plugins, err := ProbeVolumePlugins(featureGate)
- if err != nil {
- return nil, err
- }
- return &kubelet.Dependencies{
- Auth: nil, // default does not enforce auth[nz]
- CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
- Cloud: nil, // cloud provider might start background processes
- ContainerManager: nil,
- DockerClientConfig: dockerClientConfig,
- KubeClient: nil,
- HeartbeatClient: nil,
- EventClient: nil,
- HostUtil: hu,
- Mounter: mounter,
- Subpather: subpather,
- OOMAdjuster: oom.NewOOMAdjuster(),
- OSInterface: kubecontainer.RealOS{},
- VolumePlugins: plugins,
- DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
- TLSOptions: tlsOptions}, nil
- }
- // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
- // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
- // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
- // not be generated.
- func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
- if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
- return fmt.Errorf("failed OS init: %v", err)
- }
- if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
- return fmt.Errorf("failed to run Kubelet: %v", err)
- }
- return nil
- }
// checkPermissions reports an error unless the process runs as uid 0.
func checkPermissions() error {
	uid := os.Getuid()
	if uid == 0 {
		// TODO: Check if kubelet is running in the `initial` user namespace.
		// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
		return nil
	}
	return fmt.Errorf("kubelet needs to run as uid `0`. It is being run as %d", uid)
}
- func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
- scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
- if err != nil {
- return err
- }
- versioned := kubeletconfigv1beta1.KubeletConfiguration{}
- if err := scheme.Convert(kc, &versioned, nil); err != nil {
- return err
- }
- cz.Set(versioned)
- return nil
- }
- func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
- cz, err := configz.New("kubeletconfig")
- if err != nil {
- klog.Errorf("unable to register configz: %s", err)
- return err
- }
- if err := setConfigz(cz, kc); err != nil {
- klog.Errorf("unable to register config: %s", err)
- return err
- }
- return nil
- }
- // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
- func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
- if kubeDeps.Recorder != nil {
- return
- }
- eventBroadcaster := record.NewBroadcaster()
- kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
- eventBroadcaster.StartLogging(klog.V(3).Infof)
- if kubeDeps.EventClient != nil {
- klog.V(4).Infof("Sending events to api server.")
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
- } else {
- klog.Warning("No api server defined - no events will be sent to API server.")
- }
- }
- func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
- // Set global feature gates based on the value on the initial KubeletServer
- err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
- if err != nil {
- return err
- }
- // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
- if err := options.ValidateKubeletServer(s); err != nil {
- return err
- }
- // Obtain Kubelet Lock File
- if s.ExitOnLockContention && s.LockFilePath == "" {
- return errors.New("cannot exit on lock file contention: no lock file specified")
- }
- done := make(chan struct{})
- if s.LockFilePath != "" {
- klog.Infof("acquiring file lock on %q", s.LockFilePath)
- if err := flock.Acquire(s.LockFilePath); err != nil {
- return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
- }
- if s.ExitOnLockContention {
- klog.Infof("watching for inotify events for: %v", s.LockFilePath)
- if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
- return err
- }
- }
- }
- // Register current configuration with /configz endpoint
- err = initConfigz(&s.KubeletConfiguration)
- if err != nil {
- klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
- }
- // About to get clients and such, detect standaloneMode
- standaloneMode := true
- if len(s.KubeConfig) > 0 {
- standaloneMode = false
- }
- if kubeDeps == nil {
- kubeDeps, err = UnsecuredDependencies(s, featureGate)
- if err != nil {
- return err
- }
- }
- if kubeDeps.Cloud == nil {
- if !cloudprovider.IsExternal(s.CloudProvider) {
- cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
- if err != nil {
- return err
- }
- if cloud == nil {
- klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
- } else {
- klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
- }
- kubeDeps.Cloud = cloud
- }
- }
- hostName, err := nodeutil.GetHostname(s.HostnameOverride)
- if err != nil {
- return err
- }
- nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
- if err != nil {
- return err
- }
- // if in standalone mode, indicate as much by setting all clients to nil
- switch {
- case standaloneMode:
- kubeDeps.KubeClient = nil
- kubeDeps.EventClient = nil
- kubeDeps.HeartbeatClient = nil
- klog.Warningf("standalone mode, no API client")
- case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
- clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
- if err != nil {
- return err
- }
- if closeAllConns == nil {
- return errors.New("closeAllConns must be a valid function other than nil")
- }
- kubeDeps.OnHeartbeatFailure = closeAllConns
- kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet client: %v", err)
- }
- // make a separate client for events
- eventClientConfig := *clientConfig
- eventClientConfig.QPS = float32(s.EventRecordQPS)
- eventClientConfig.Burst = int(s.EventBurst)
- kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet event client: %v", err)
- }
- // make a separate client for heartbeat with throttling disabled and a timeout attached
- heartbeatClientConfig := *clientConfig
- heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
- // The timeout is the minimum of the lease duration and status update frequency
- leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
- if heartbeatClientConfig.Timeout > leaseTimeout {
- heartbeatClientConfig.Timeout = leaseTimeout
- }
- heartbeatClientConfig.QPS = float32(-1)
- kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
- if err != nil {
- return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
- }
- }
- if kubeDeps.Auth == nil {
- auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
- if err != nil {
- return err
- }
- kubeDeps.Auth = auth
- }
- var cgroupRoots []string
- cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
- kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
- if err != nil {
- klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
- } else if kubeletCgroup != "" {
- cgroupRoots = append(cgroupRoots, kubeletCgroup)
- }
- runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
- if err != nil {
- klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
- } else if runtimeCgroup != "" {
- // RuntimeCgroups is optional, so ignore if it isn't specified
- cgroupRoots = append(cgroupRoots, runtimeCgroup)
- }
- if s.SystemCgroups != "" {
- // SystemCgroups is optional, so ignore if it isn't specified
- cgroupRoots = append(cgroupRoots, s.SystemCgroups)
- }
- if kubeDeps.CAdvisorInterface == nil {
- imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
- kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
- if err != nil {
- return err
- }
- }
- // Setup event recorder if required.
- makeEventRecorder(kubeDeps, nodeName)
- if kubeDeps.ContainerManager == nil {
- if s.CgroupsPerQOS && s.CgroupRoot == "" {
- klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
- s.CgroupRoot = "/"
- }
- var reservedSystemCPUs cpuset.CPUSet
- var errParse error
- if s.ReservedSystemCPUs != "" {
- reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
- if errParse != nil {
- // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
- klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
- return errParse
- }
- // is it safe do use CAdvisor here ??
- machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
- if err != nil {
- // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
- klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
- reservedSystemCPUs = cpuset.NewCPUSet()
- } else {
- reservedList := reservedSystemCPUs.ToSlice()
- first := reservedList[0]
- last := reservedList[len(reservedList)-1]
- if first < 0 || last >= machineInfo.NumCores {
- // the specified cpuset is outside of the range of what the machine has
- klog.Infof("Invalid cpuset specified by --reserved-cpus")
- return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
- }
- }
- } else {
- reservedSystemCPUs = cpuset.NewCPUSet()
- }
- if reservedSystemCPUs.Size() > 0 {
- // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
- klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
- if s.KubeReserved != nil {
- delete(s.KubeReserved, "cpu")
- }
- if s.SystemReserved == nil {
- s.SystemReserved = make(map[string]string)
- }
- s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
- klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
- }
- kubeReserved, err := parseResourceList(s.KubeReserved)
- if err != nil {
- return err
- }
- systemReserved, err := parseResourceList(s.SystemReserved)
- if err != nil {
- return err
- }
- var hardEvictionThresholds []evictionapi.Threshold
- // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
- if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
- hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
- if err != nil {
- return err
- }
- }
- experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
- if err != nil {
- return err
- }
- devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
- kubeDeps.ContainerManager, err = cm.NewContainerManager(
- kubeDeps.Mounter,
- kubeDeps.CAdvisorInterface,
- cm.NodeConfig{
- RuntimeCgroupsName: s.RuntimeCgroups,
- SystemCgroupsName: s.SystemCgroups,
- KubeletCgroupsName: s.KubeletCgroups,
- ContainerRuntime: s.ContainerRuntime,
- CgroupsPerQOS: s.CgroupsPerQOS,
- CgroupRoot: s.CgroupRoot,
- CgroupDriver: s.CgroupDriver,
- KubeletRootDir: s.RootDirectory,
- ProtectKernelDefaults: s.ProtectKernelDefaults,
- NodeAllocatableConfig: cm.NodeAllocatableConfig{
- KubeReservedCgroupName: s.KubeReservedCgroup,
- SystemReservedCgroupName: s.SystemReservedCgroup,
- EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
- KubeReserved: kubeReserved,
- SystemReserved: systemReserved,
- ReservedSystemCPUs: reservedSystemCPUs,
- HardEvictionThresholds: hardEvictionThresholds,
- },
- QOSReserved: *experimentalQOSReserved,
- ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
- ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
- ExperimentalPodPidsLimit: s.PodPidsLimit,
- EnforceCPULimits: s.CPUCFSQuota,
- CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
- ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
- },
- s.FailSwapOn,
- devicePluginEnabled,
- kubeDeps.Recorder)
- if err != nil {
- return err
- }
- }
- if err := checkPermissions(); err != nil {
- klog.Error(err)
- }
- utilruntime.ReallyCrash = s.ReallyCrashForTesting
- // TODO(vmarmol): Do this through container config.
- oomAdjuster := kubeDeps.OOMAdjuster
- if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
- klog.Warning(err)
- }
- err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
- kubeDeps, &s.ContainerRuntimeOptions,
- s.ContainerRuntime,
- s.RuntimeCgroups,
- s.RemoteRuntimeEndpoint,
- s.RemoteImageEndpoint,
- s.NonMasqueradeCIDR)
- if err != nil {
- return err
- }
- if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
- return err
- }
- // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
- if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
- kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
- if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
- return err
- }
- }
- if s.HealthzPort > 0 {
- mux := http.NewServeMux()
- healthz.InstallHandler(mux)
- go wait.Until(func() {
- err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
- if err != nil {
- klog.Errorf("Starting healthz server failed: %v", err)
- }
- }, 5*time.Second, wait.NeverStop)
- }
- if s.RunOnce {
- return nil
- }
- // If systemd is used, notify it that we have started
- go daemon.SdNotify(false, "READY=1")
- select {
- case <-done:
- break
- case <-stopCh:
- break
- }
- return nil
- }
- // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
- // bootstrapping is enabled or client certificate rotation is enabled.
- func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
- if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
- // Rules for client rotation and the handling of kube config files:
- //
- // 1. If the client provides only a kubeconfig file, we must use that as the initial client
- // kubeadm needs the initial data in the kubeconfig to be placed into the cert store
- // 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
- // kubeconfig file at the target location that points to the cert store, but until
- // the file is present the client config will have no certs
- // 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
- // kubeconfig.
- // 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
- // replace the kubeconfig with a new file that points to the cert dir
- //
- // The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
- // the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
- // which provides a high powered kubeconfig on the master with cert/key data, we must
- // bootstrap the cert manager with the contents of the initial client config.
- klog.Infof("Client rotation is on, will bootstrap in background")
- certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
- if err != nil {
- return nil, nil, err
- }
- // use the correct content type for cert rotation, but don't set QPS
- setContentTypeForClient(certConfig, s.ContentType)
- kubeClientConfigOverrides(s, clientConfig)
- clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
- if err != nil {
- return nil, nil, err
- }
- // the rotating transport will use the cert from the cert manager instead of these files
- transportConfig := restclient.AnonymousClientConfig(clientConfig)
- // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
- // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
- // or the bootstrapping credentials to potentially lay down new initial config.
- closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
- if err != nil {
- return nil, nil, err
- }
- klog.V(2).Info("Starting client certificate rotation.")
- clientCertificateManager.Start()
- return transportConfig, closeAllConns, nil
- }
- if len(s.BootstrapKubeconfig) > 0 {
- if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
- return nil, nil, err
- }
- }
- clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
- &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
- &clientcmd.ConfigOverrides{},
- ).ClientConfig()
- if err != nil {
- return nil, nil, fmt.Errorf("invalid kubeconfig: %v", err)
- }
- kubeClientConfigOverrides(s, clientConfig)
- closeAllConns, err := updateDialer(clientConfig)
- if err != nil {
- return nil, nil, err
- }
- return clientConfig, closeAllConns, nil
- }
- // updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
- func updateDialer(clientConfig *restclient.Config) (func(), error) {
- if clientConfig.Transport != nil || clientConfig.Dial != nil {
- return nil, fmt.Errorf("there is already a transport or dialer configured")
- }
- d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
- clientConfig.Dial = d.DialContext
- return d.CloseAll, nil
- }
- // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
- // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
- // write out).
- func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
- newClientFn := func(current *tls.Certificate) (certificatesclient.CertificateSigningRequestInterface, error) {
- // If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
- // credentials. In the future it would be desirable to change the behavior of bootstrap
- // to always fall back to the external bootstrap credentials when such credentials are
- // provided by a fundamental trust system like cloud VM identity or an HSM module.
- config := certConfig
- if current != nil {
- config = clientConfig
- }
- client, err := clientset.NewForConfig(config)
- if err != nil {
- return nil, err
- }
- return client.CertificatesV1beta1().CertificateSigningRequests(), nil
- }
- return kubeletcertificate.NewKubeletClientCertificateManager(
- certDir,
- nodeName,
- // this preserves backwards compatibility with kubeadm which passes
- // a high powered certificate to the kubelet as --kubeconfig and expects
- // it to be rotated out immediately
- clientConfig.CertData,
- clientConfig.KeyData,
- clientConfig.CertFile,
- clientConfig.KeyFile,
- newClientFn,
- )
- }
- func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
- setContentTypeForClient(clientConfig, s.ContentType)
- // Override kubeconfig qps/burst settings from flags
- clientConfig.QPS = float32(s.KubeAPIQPS)
- clientConfig.Burst = int(s.KubeAPIBurst)
- }
- // getNodeName returns the node name according to the cloud provider
- // if cloud provider is specified. Otherwise, returns the hostname of the node.
- func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
- if cloud == nil {
- return types.NodeName(hostname), nil
- }
- instances, ok := cloud.Instances()
- if !ok {
- return "", fmt.Errorf("failed to get instances from cloud provider")
- }
- nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
- if err != nil {
- return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
- }
- klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
- return nodeName, nil
- }
- // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
- // certificate and key file are generated. Returns a configured server.TLSOptions object.
- func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
- if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
- kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
- kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
- canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
- if err != nil {
- return nil, err
- }
- if !canReadCertAndKey {
- hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
- if err != nil {
- return nil, err
- }
- cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
- if err != nil {
- return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
- }
- if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
- return nil, err
- }
- if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
- return nil, err
- }
- klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
- }
- }
- tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
- if err != nil {
- return nil, err
- }
- minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
- if err != nil {
- return nil, err
- }
- tlsOptions := &server.TLSOptions{
- Config: &tls.Config{
- MinVersion: minTLSVersion,
- CipherSuites: tlsCipherSuites,
- },
- CertFile: kc.TLSCertFile,
- KeyFile: kc.TLSPrivateKeyFile,
- }
- if len(kc.Authentication.X509.ClientCAFile) > 0 {
- clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
- if err != nil {
- return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
- }
- // Specify allowed CAs for client certificates
- tlsOptions.Config.ClientCAs = clientCAs
- // Populate PeerCertificates in requests, but don't reject connections without verified certificates
- tlsOptions.Config.ClientAuth = tls.RequestClientCert
- }
- return tlsOptions, nil
- }
- // setContentTypeForClient sets the appropritae content type into the rest config
- // and handles defaulting AcceptContentTypes based on that input.
- func setContentTypeForClient(cfg *restclient.Config, contentType string) {
- if len(contentType) == 0 {
- return
- }
- cfg.ContentType = contentType
- switch contentType {
- case runtime.ContentTypeProtobuf:
- cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
- default:
- // otherwise let the rest client perform defaulting
- }
- }
- // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
- // 1 Integration tests
- // 2 Kubelet binary
- // 3 Standalone 'kubernetes' binary
- // Eventually, #2 will be replaced with instances of #3
- func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
- hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
- if err != nil {
- return err
- }
- // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
- nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
- if err != nil {
- return err
- }
- // Setup event recorder if required.
- makeEventRecorder(kubeDeps, nodeName)
- capabilities.Initialize(capabilities.Capabilities{
- AllowPrivileged: true,
- })
- credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
- klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
- if kubeDeps.OSInterface == nil {
- kubeDeps.OSInterface = kubecontainer.RealOS{}
- }
- k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
- kubeDeps,
- &kubeServer.ContainerRuntimeOptions,
- kubeServer.ContainerRuntime,
- kubeServer.HostnameOverride,
- kubeServer.NodeIP,
- kubeServer.ProviderID,
- kubeServer.CloudProvider,
- kubeServer.CertDirectory,
- kubeServer.RootDirectory,
- kubeServer.RegisterNode,
- kubeServer.RegisterWithTaints,
- kubeServer.AllowedUnsafeSysctls,
- kubeServer.ExperimentalMounterPath,
- kubeServer.ExperimentalKernelMemcgNotification,
- kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
- kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
- kubeServer.MinimumGCAge,
- kubeServer.MaxPerPodContainerCount,
- kubeServer.MaxContainerCount,
- kubeServer.MasterServiceNamespace,
- kubeServer.RegisterSchedulable,
- kubeServer.KeepTerminatedPodVolumes,
- kubeServer.NodeLabels,
- kubeServer.SeccompProfileRoot,
- kubeServer.BootstrapCheckpointPath,
- kubeServer.NodeStatusMaxImages)
- if err != nil {
- return fmt.Errorf("failed to create kubelet: %v", err)
- }
- // NewMainKubelet should have set up a pod source config if one didn't exist
- // when the builder was run. This is just a precaution.
- if kubeDeps.PodConfig == nil {
- return fmt.Errorf("failed to create kubelet, pod source config was nil")
- }
- podCfg := kubeDeps.PodConfig
- rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
- // process pods and exit.
- if runOnce {
- if _, err := k.RunOnce(podCfg.Updates()); err != nil {
- return fmt.Errorf("runonce failed: %v", err)
- }
- klog.Info("Started kubelet as runonce")
- } else {
- startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
- klog.Info("Started kubelet")
- }
- return nil
- }
- func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
- // start the kubelet
- go wait.Until(func() {
- k.Run(podCfg.Updates())
- }, 0, wait.NeverStop)
- // start the kubelet server
- if enableServer {
- go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
- }
- if kubeCfg.ReadOnlyPort > 0 {
- go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
- }
- if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
- go k.ListenAndServePodResources()
- }
- }
- func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
- kubeDeps *kubelet.Dependencies,
- crOptions *config.ContainerRuntimeOptions,
- containerRuntime string,
- hostnameOverride string,
- nodeIP string,
- providerID string,
- cloudProvider string,
- certDirectory string,
- rootDirectory string,
- registerNode bool,
- registerWithTaints []api.Taint,
- allowedUnsafeSysctls []string,
- experimentalMounterPath string,
- experimentalKernelMemcgNotification bool,
- experimentalCheckNodeCapabilitiesBeforeMount bool,
- experimentalNodeAllocatableIgnoreEvictionThreshold bool,
- minimumGCAge metav1.Duration,
- maxPerPodContainerCount int32,
- maxContainerCount int32,
- masterServiceNamespace string,
- registerSchedulable bool,
- keepTerminatedPodVolumes bool,
- nodeLabels map[string]string,
- seccompProfileRoot string,
- bootstrapCheckpointPath string,
- nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
- // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
- // up into "per source" synchronizations
- k, err = kubelet.NewMainKubelet(kubeCfg,
- kubeDeps,
- crOptions,
- containerRuntime,
- hostnameOverride,
- nodeIP,
- providerID,
- cloudProvider,
- certDirectory,
- rootDirectory,
- registerNode,
- registerWithTaints,
- allowedUnsafeSysctls,
- experimentalMounterPath,
- experimentalKernelMemcgNotification,
- experimentalCheckNodeCapabilitiesBeforeMount,
- experimentalNodeAllocatableIgnoreEvictionThreshold,
- minimumGCAge,
- maxPerPodContainerCount,
- maxContainerCount,
- masterServiceNamespace,
- registerSchedulable,
- keepTerminatedPodVolumes,
- nodeLabels,
- seccompProfileRoot,
- bootstrapCheckpointPath,
- nodeStatusMaxImages)
- if err != nil {
- return nil, err
- }
- k.BirthCry()
- k.StartGarbageCollection()
- return k, nil
- }
- // parseResourceList parses the given configuration map into an API
- // ResourceList or returns an error.
- func parseResourceList(m map[string]string) (v1.ResourceList, error) {
- if len(m) == 0 {
- return nil, nil
- }
- rl := make(v1.ResourceList)
- for k, v := range m {
- switch v1.ResourceName(k) {
- // CPU, memory, local storage, and PID resources are supported.
- case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
- if v1.ResourceName(k) != pidlimit.PIDs || utilfeature.DefaultFeatureGate.Enabled(features.SupportNodePidsLimit) {
- q, err := resource.ParseQuantity(v)
- if err != nil {
- return nil, err
- }
- if q.Sign() == -1 {
- return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
- }
- rl[v1.ResourceName(k)] = q
- }
- default:
- return nil, fmt.Errorf("cannot reserve %q resource", k)
- }
- }
- return rl, nil
- }
- // BootstrapKubeletConfigController constructs and bootstrap a configuration controller
- func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
- if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
- return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
- }
- if len(dynamicConfigDir) == 0 {
- return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
- }
- // compute absolute path and bootstrap controller
- dir, err := filepath.Abs(dynamicConfigDir)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
- }
- // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
- c := dynamickubeletconfig.NewController(dir, transform)
- kc, err := c.Bootstrap()
- if err != nil {
- return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
- }
- return kc, c, nil
- }
- // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
- // TODO(random-liu): Move this to a separate binary.
- func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
- r := &f.ContainerRuntimeOptions
- // Initialize docker client configuration.
- dockerClientConfig := &dockershim.ClientConfig{
- DockerEndpoint: r.DockerEndpoint,
- RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration,
- ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration,
- }
- // Initialize network plugin settings.
- pluginSettings := dockershim.NetworkPluginSettings{
- HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode),
- NonMasqueradeCIDR: f.NonMasqueradeCIDR,
- PluginName: r.NetworkPluginName,
- PluginConfDir: r.CNIConfDir,
- PluginBinDirString: r.CNIBinDir,
- PluginCacheDir: r.CNICacheDir,
- MTU: int(r.NetworkPluginMTU),
- }
- // Initialize streaming configuration. (Not using TLS now)
- streamingConfig := &streaming.Config{
- // Use a relative redirect (no scheme or host).
- BaseURL: &url.URL{Path: "/cri/"},
- StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration,
- StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
- SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
- SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
- }
- // Standalone dockershim will always start the local streaming server.
- ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
- f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/)
- if err != nil {
- return err
- }
- klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
- server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
- if err := server.Start(); err != nil {
- return err
- }
- <-stopCh
- return nil
- }
|