server.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. // Package app makes it easy to create a kubelet server for various contexts.
  14. package app
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "errors"
  19. "fmt"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "strconv"
  28. "strings"
  29. "time"
  30. "github.com/coreos/go-systemd/daemon"
  31. "github.com/spf13/cobra"
  32. "github.com/spf13/pflag"
  33. "k8s.io/klog"
  34. v1 "k8s.io/api/core/v1"
  35. "k8s.io/apimachinery/pkg/api/resource"
  36. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  37. "k8s.io/apimachinery/pkg/runtime"
  38. "k8s.io/apimachinery/pkg/types"
  39. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  40. "k8s.io/apimachinery/pkg/util/sets"
  41. "k8s.io/apimachinery/pkg/util/wait"
  42. genericapiserver "k8s.io/apiserver/pkg/server"
  43. "k8s.io/apiserver/pkg/server/healthz"
  44. utilfeature "k8s.io/apiserver/pkg/util/feature"
  45. clientset "k8s.io/client-go/kubernetes"
  46. certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
  47. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  48. restclient "k8s.io/client-go/rest"
  49. "k8s.io/client-go/tools/clientcmd"
  50. "k8s.io/client-go/tools/record"
  51. certutil "k8s.io/client-go/util/cert"
  52. "k8s.io/client-go/util/certificate"
  53. "k8s.io/client-go/util/connrotation"
  54. "k8s.io/client-go/util/keyutil"
  55. cloudprovider "k8s.io/cloud-provider"
  56. cliflag "k8s.io/component-base/cli/flag"
  57. kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
  58. "k8s.io/kubernetes/cmd/kubelet/app/options"
  59. "k8s.io/kubernetes/pkg/api/legacyscheme"
  60. api "k8s.io/kubernetes/pkg/apis/core"
  61. "k8s.io/kubernetes/pkg/capabilities"
  62. "k8s.io/kubernetes/pkg/credentialprovider"
  63. "k8s.io/kubernetes/pkg/features"
  64. "k8s.io/kubernetes/pkg/kubelet"
  65. kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  66. kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
  67. kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
  68. "k8s.io/kubernetes/pkg/kubelet/cadvisor"
  69. kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
  70. "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
  71. "k8s.io/kubernetes/pkg/kubelet/cm"
  72. "k8s.io/kubernetes/pkg/kubelet/config"
  73. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  74. "k8s.io/kubernetes/pkg/kubelet/dockershim"
  75. dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
  76. "k8s.io/kubernetes/pkg/kubelet/eviction"
  77. evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
  78. dynamickubeletconfig "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
  79. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
  80. "k8s.io/kubernetes/pkg/kubelet/server"
  81. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  82. "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
  83. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  84. "k8s.io/kubernetes/pkg/util/configz"
  85. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  86. utilflag "k8s.io/kubernetes/pkg/util/flag"
  87. "k8s.io/kubernetes/pkg/util/flock"
  88. "k8s.io/kubernetes/pkg/util/mount"
  89. nodeutil "k8s.io/kubernetes/pkg/util/node"
  90. "k8s.io/kubernetes/pkg/util/oom"
  91. "k8s.io/kubernetes/pkg/util/rlimit"
  92. "k8s.io/kubernetes/pkg/version"
  93. "k8s.io/kubernetes/pkg/version/verflag"
  94. nsutil "k8s.io/kubernetes/pkg/volume/util/nsenter"
  95. "k8s.io/kubernetes/pkg/volume/util/subpath"
  96. "k8s.io/utils/exec"
  97. "k8s.io/utils/nsenter"
  98. )
  99. const (
  100. // Kubelet component name
  101. componentKubelet = "kubelet"
  102. )
  103. // NewKubeletCommand creates a *cobra.Command object with default parameters
  104. func NewKubeletCommand() *cobra.Command {
  105. cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
  106. cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  107. kubeletFlags := options.NewKubeletFlags()
  108. kubeletConfig, err := options.NewKubeletConfiguration()
  109. // programmer error
  110. if err != nil {
  111. klog.Fatal(err)
  112. }
  113. cmd := &cobra.Command{
  114. Use: componentKubelet,
  115. Long: `The kubelet is the primary "node agent" that runs on each
  116. node. It can register the node with the apiserver using one of: the hostname; a flag to
  117. override the hostname; or specific logic for a cloud provider.
  118. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
  119. that describes a pod. The kubelet takes a set of PodSpecs that are provided through
  120. various mechanisms (primarily through the apiserver) and ensures that the containers
  121. described in those PodSpecs are running and healthy. The kubelet doesn't manage
  122. containers which were not created by Kubernetes.
  123. Other than from an PodSpec from the apiserver, there are three ways that a container
  124. manifest can be provided to the Kubelet.
  125. File: Path passed as a flag on the command line. Files under this path will be monitored
  126. periodically for updates. The monitoring period is 20s by default and is configurable
  127. via a flag.
  128. HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
  129. is checked every 20 seconds (also configurable with a flag).
  130. HTTP server: The kubelet can also listen for HTTP and respond to a simple API
  131. (underspec'd currently) to submit a new manifest.`,
  132. // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
  133. // so we do all our parsing manually in Run, below.
  134. // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
  135. // `args` arg to Run, without Cobra's interference.
  136. DisableFlagParsing: true,
  137. Run: func(cmd *cobra.Command, args []string) {
  138. // initial flag parse, since we disable cobra's flag parsing
  139. if err := cleanFlagSet.Parse(args); err != nil {
  140. cmd.Usage()
  141. klog.Fatal(err)
  142. }
  143. // check if there are non-flag arguments in the command line
  144. cmds := cleanFlagSet.Args()
  145. if len(cmds) > 0 {
  146. cmd.Usage()
  147. klog.Fatalf("unknown command: %s", cmds[0])
  148. }
  149. // short-circuit on help
  150. help, err := cleanFlagSet.GetBool("help")
  151. if err != nil {
  152. klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
  153. }
  154. if help {
  155. cmd.Help()
  156. return
  157. }
  158. // short-circuit on verflag
  159. verflag.PrintAndExitIfRequested()
  160. utilflag.PrintFlags(cleanFlagSet)
  161. // set feature gates from initial flags-based config
  162. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  163. klog.Fatal(err)
  164. }
  165. // validate the initial KubeletFlags
  166. if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
  167. klog.Fatal(err)
  168. }
  169. if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
  170. klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
  171. }
  172. // load kubelet config file, if provided
  173. if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
  174. kubeletConfig, err = loadConfigFile(configFile)
  175. if err != nil {
  176. klog.Fatal(err)
  177. }
  178. // We must enforce flag precedence by re-parsing the command line into the new object.
  179. // This is necessary to preserve backwards-compatibility across binary upgrades.
  180. // See issue #56171 for more details.
  181. if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
  182. klog.Fatal(err)
  183. }
  184. // update feature gates based on new config
  185. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  186. klog.Fatal(err)
  187. }
  188. }
  189. // We always validate the local configuration (command line + config file).
  190. // This is the default "last-known-good" config for dynamic config, and must always remain valid.
  191. if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
  192. klog.Fatal(err)
  193. }
  194. // use dynamic kubelet config, if enabled
  195. var kubeletConfigController *dynamickubeletconfig.Controller
  196. if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
  197. var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
  198. dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
  199. func(kc *kubeletconfiginternal.KubeletConfiguration) error {
  200. // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
  201. // so that we get a complete validation at the same point where we can decide to reject dynamic config.
  202. // This fixes the flag-precedence component of issue #63305.
  203. // See issue #56171 for general details on flag precedence.
  204. return kubeletConfigFlagPrecedence(kc, args)
  205. })
  206. if err != nil {
  207. klog.Fatal(err)
  208. }
  209. // If we should just use our existing, local config, the controller will return a nil config
  210. if dynamicKubeletConfig != nil {
  211. kubeletConfig = dynamicKubeletConfig
  212. // Note: flag precedence was already enforced in the controller, prior to validation,
  213. // by our above transform function. Now we simply update feature gates from the new config.
  214. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  215. klog.Fatal(err)
  216. }
  217. }
  218. }
  219. // construct a KubeletServer from kubeletFlags and kubeletConfig
  220. kubeletServer := &options.KubeletServer{
  221. KubeletFlags: *kubeletFlags,
  222. KubeletConfiguration: *kubeletConfig,
  223. }
  224. // use kubeletServer to construct the default KubeletDeps
  225. kubeletDeps, err := UnsecuredDependencies(kubeletServer)
  226. if err != nil {
  227. klog.Fatal(err)
  228. }
  229. // add the kubelet config controller to kubeletDeps
  230. kubeletDeps.KubeletConfigController = kubeletConfigController
  231. // set up stopCh here in order to be reused by kubelet and docker shim
  232. stopCh := genericapiserver.SetupSignalHandler()
  233. // start the experimental docker shim, if enabled
  234. if kubeletServer.KubeletFlags.ExperimentalDockershim {
  235. if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
  236. klog.Fatal(err)
  237. }
  238. return
  239. }
  240. // run the kubelet
  241. klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
  242. if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
  243. klog.Fatal(err)
  244. }
  245. },
  246. }
  247. // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
  248. kubeletFlags.AddFlags(cleanFlagSet)
  249. options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
  250. options.AddGlobalFlags(cleanFlagSet)
  251. cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
  252. // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
  253. const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
  254. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  255. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
  256. return nil
  257. })
  258. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  259. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
  260. })
  261. return cmd
  262. }
  263. // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered
  264. // on it.
  265. func newFlagSetWithGlobals() *pflag.FlagSet {
  266. fs := pflag.NewFlagSet("", pflag.ExitOnError)
  267. // set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags
  268. fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  269. // explicitly add flags from libs that register global flags
  270. options.AddGlobalFlags(fs)
  271. return fs
  272. }
  273. // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where
  274. // all values have noop Set implementations
  275. func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
  276. ret := pflag.NewFlagSet("", pflag.ExitOnError)
  277. ret.SetNormalizeFunc(fs.GetNormalizeFunc())
  278. fs.VisitAll(func(f *pflag.Flag) {
  279. ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
  280. })
  281. return ret
  282. }
  283. // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object.
  284. // We must enforce flag precedence by re-parsing the command line into the new object.
  285. // This is necessary to preserve backwards-compatibility across binary upgrades.
  286. // See issue #56171 for more details.
  287. func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
  288. // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses,
  289. // as some Set implementations accumulate values from multiple flag invocations.
  290. fs := newFakeFlagSet(newFlagSetWithGlobals())
  291. // register throwaway KubeletFlags
  292. options.NewKubeletFlags().AddFlags(fs)
  293. // register new KubeletConfiguration
  294. options.AddKubeletConfigFlags(fs, kc)
  295. // Remember original feature gates, so we can merge with flag gates later
  296. original := kc.FeatureGates
  297. // re-parse flags
  298. if err := fs.Parse(args); err != nil {
  299. return err
  300. }
  301. // Add back feature gates that were set in the original kc, but not in flags
  302. for k, v := range original {
  303. if _, ok := kc.FeatureGates[k]; !ok {
  304. kc.FeatureGates[k] = v
  305. }
  306. }
  307. return nil
  308. }
  309. func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
  310. const errFmt = "failed to load Kubelet config file %s, error %v"
  311. // compute absolute path based on current working dir
  312. kubeletConfigFile, err := filepath.Abs(name)
  313. if err != nil {
  314. return nil, fmt.Errorf(errFmt, name, err)
  315. }
  316. loader, err := configfiles.NewFsLoader(utilfs.DefaultFs{}, kubeletConfigFile)
  317. if err != nil {
  318. return nil, fmt.Errorf(errFmt, name, err)
  319. }
  320. kc, err := loader.Load()
  321. if err != nil {
  322. return nil, fmt.Errorf(errFmt, name, err)
  323. }
  324. return kc, err
  325. }
  326. // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
  327. // is not valid. It will not start any background processes, and does not include authentication/authorization
  328. func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
  329. // Initialize the TLS Options
  330. tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
  331. if err != nil {
  332. return nil, err
  333. }
  334. mounter := mount.New(s.ExperimentalMounterPath)
  335. subpather := subpath.New(mounter)
  336. var pluginRunner = exec.New()
  337. if s.Containerized {
  338. klog.V(2).Info("Running kubelet in containerized mode")
  339. ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
  340. if err != nil {
  341. return nil, err
  342. }
  343. mounter = nsutil.NewMounter(s.RootDirectory, ne)
  344. // NSenter only valid on Linux
  345. subpather = subpath.NewNSEnter(mounter, ne, s.RootDirectory)
  346. // an exec interface which can use nsenter for flex plugin calls
  347. pluginRunner, err = nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
  348. if err != nil {
  349. return nil, err
  350. }
  351. }
  352. var dockerClientConfig *dockershim.ClientConfig
  353. if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
  354. dockerClientConfig = &dockershim.ClientConfig{
  355. DockerEndpoint: s.DockerEndpoint,
  356. RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
  357. ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
  358. }
  359. }
  360. return &kubelet.Dependencies{
  361. Auth: nil, // default does not enforce auth[nz]
  362. CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
  363. Cloud: nil, // cloud provider might start background processes
  364. ContainerManager: nil,
  365. DockerClientConfig: dockerClientConfig,
  366. KubeClient: nil,
  367. HeartbeatClient: nil,
  368. EventClient: nil,
  369. Mounter: mounter,
  370. Subpather: subpather,
  371. OOMAdjuster: oom.NewOOMAdjuster(),
  372. OSInterface: kubecontainer.RealOS{},
  373. VolumePlugins: ProbeVolumePlugins(),
  374. DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
  375. TLSOptions: tlsOptions}, nil
  376. }
  377. // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
  378. // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
  379. // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
  380. // not be generated.
  381. func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
  382. // To help debugging, immediately log version
  383. klog.Infof("Version: %+v", version.Get())
  384. if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
  385. return fmt.Errorf("failed OS init: %v", err)
  386. }
  387. if err := run(s, kubeDeps, stopCh); err != nil {
  388. return fmt.Errorf("failed to run Kubelet: %v", err)
  389. }
  390. return nil
  391. }
  392. func checkPermissions() error {
  393. if uid := os.Getuid(); uid != 0 {
  394. return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
  395. }
  396. // TODO: Check if kubelet is running in the `initial` user namespace.
  397. // http://man7.org/linux/man-pages/man7/user_namespaces.7.html
  398. return nil
  399. }
  400. func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
  401. scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
  402. if err != nil {
  403. return err
  404. }
  405. versioned := kubeletconfigv1beta1.KubeletConfiguration{}
  406. if err := scheme.Convert(kc, &versioned, nil); err != nil {
  407. return err
  408. }
  409. cz.Set(versioned)
  410. return nil
  411. }
  412. func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
  413. cz, err := configz.New("kubeletconfig")
  414. if err != nil {
  415. klog.Errorf("unable to register configz: %s", err)
  416. return err
  417. }
  418. if err := setConfigz(cz, kc); err != nil {
  419. klog.Errorf("unable to register config: %s", err)
  420. return err
  421. }
  422. return nil
  423. }
  424. // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
  425. func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
  426. if kubeDeps.Recorder != nil {
  427. return
  428. }
  429. eventBroadcaster := record.NewBroadcaster()
  430. kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
  431. eventBroadcaster.StartLogging(klog.V(3).Infof)
  432. if kubeDeps.EventClient != nil {
  433. klog.V(4).Infof("Sending events to api server.")
  434. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
  435. } else {
  436. klog.Warning("No api server defined - no events will be sent to API server.")
  437. }
  438. }
  439. func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
  440. // Set global feature gates based on the value on the initial KubeletServer
  441. err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
  442. if err != nil {
  443. return err
  444. }
  445. // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
  446. if err := options.ValidateKubeletServer(s); err != nil {
  447. return err
  448. }
  449. // Obtain Kubelet Lock File
  450. if s.ExitOnLockContention && s.LockFilePath == "" {
  451. return errors.New("cannot exit on lock file contention: no lock file specified")
  452. }
  453. done := make(chan struct{})
  454. if s.LockFilePath != "" {
  455. klog.Infof("acquiring file lock on %q", s.LockFilePath)
  456. if err := flock.Acquire(s.LockFilePath); err != nil {
  457. return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
  458. }
  459. if s.ExitOnLockContention {
  460. klog.Infof("watching for inotify events for: %v", s.LockFilePath)
  461. if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
  462. return err
  463. }
  464. }
  465. }
  466. // Register current configuration with /configz endpoint
  467. err = initConfigz(&s.KubeletConfiguration)
  468. if err != nil {
  469. klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
  470. }
  471. // About to get clients and such, detect standaloneMode
  472. standaloneMode := true
  473. if len(s.KubeConfig) > 0 {
  474. standaloneMode = false
  475. }
  476. if kubeDeps == nil {
  477. kubeDeps, err = UnsecuredDependencies(s)
  478. if err != nil {
  479. return err
  480. }
  481. }
  482. if kubeDeps.Cloud == nil {
  483. if !cloudprovider.IsExternal(s.CloudProvider) {
  484. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
  485. if err != nil {
  486. return err
  487. }
  488. if cloud == nil {
  489. klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  490. } else {
  491. klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  492. }
  493. kubeDeps.Cloud = cloud
  494. }
  495. }
  496. hostName, err := nodeutil.GetHostname(s.HostnameOverride)
  497. if err != nil {
  498. return err
  499. }
  500. nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
  501. if err != nil {
  502. return err
  503. }
  504. // if in standalone mode, indicate as much by setting all clients to nil
  505. switch {
  506. case standaloneMode:
  507. kubeDeps.KubeClient = nil
  508. kubeDeps.EventClient = nil
  509. kubeDeps.HeartbeatClient = nil
  510. klog.Warningf("standalone mode, no API client")
  511. case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
  512. clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
  513. if err != nil {
  514. return err
  515. }
  516. if closeAllConns == nil {
  517. return errors.New("closeAllConns must be a valid function other than nil")
  518. }
  519. kubeDeps.OnHeartbeatFailure = closeAllConns
  520. kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
  521. if err != nil {
  522. return fmt.Errorf("failed to initialize kubelet client: %v", err)
  523. }
  524. // make a separate client for events
  525. eventClientConfig := *clientConfig
  526. eventClientConfig.QPS = float32(s.EventRecordQPS)
  527. eventClientConfig.Burst = int(s.EventBurst)
  528. kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
  529. if err != nil {
  530. return fmt.Errorf("failed to initialize kubelet event client: %v", err)
  531. }
  532. // make a separate client for heartbeat with throttling disabled and a timeout attached
  533. heartbeatClientConfig := *clientConfig
  534. heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
  535. // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
  536. if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
  537. leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
  538. if heartbeatClientConfig.Timeout > leaseTimeout {
  539. heartbeatClientConfig.Timeout = leaseTimeout
  540. }
  541. }
  542. heartbeatClientConfig.QPS = float32(-1)
  543. kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
  544. if err != nil {
  545. return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
  546. }
  547. }
  548. // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
  549. if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
  550. kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
  551. if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
  552. return err
  553. }
  554. }
  555. if kubeDeps.Auth == nil {
  556. auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
  557. if err != nil {
  558. return err
  559. }
  560. kubeDeps.Auth = auth
  561. }
  562. var cgroupRoots []string
  563. cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
  564. kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
  565. if err != nil {
  566. klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
  567. } else if kubeletCgroup != "" {
  568. cgroupRoots = append(cgroupRoots, kubeletCgroup)
  569. }
  570. runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
  571. if err != nil {
  572. klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
  573. } else if runtimeCgroup != "" {
  574. // RuntimeCgroups is optional, so ignore if it isn't specified
  575. cgroupRoots = append(cgroupRoots, runtimeCgroup)
  576. }
  577. if s.SystemCgroups != "" {
  578. // SystemCgroups is optional, so ignore if it isn't specified
  579. cgroupRoots = append(cgroupRoots, s.SystemCgroups)
  580. }
  581. if kubeDeps.CAdvisorInterface == nil {
  582. imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
  583. kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
  584. if err != nil {
  585. return err
  586. }
  587. }
  588. // Setup event recorder if required.
  589. makeEventRecorder(kubeDeps, nodeName)
  590. if kubeDeps.ContainerManager == nil {
  591. if s.CgroupsPerQOS && s.CgroupRoot == "" {
  592. klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
  593. s.CgroupRoot = "/"
  594. }
  595. kubeReserved, err := parseResourceList(s.KubeReserved)
  596. if err != nil {
  597. return err
  598. }
  599. systemReserved, err := parseResourceList(s.SystemReserved)
  600. if err != nil {
  601. return err
  602. }
  603. var hardEvictionThresholds []evictionapi.Threshold
  604. // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
  605. if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
  606. hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
  607. if err != nil {
  608. return err
  609. }
  610. }
  611. experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
  612. if err != nil {
  613. return err
  614. }
  615. devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
  616. kubeDeps.ContainerManager, err = cm.NewContainerManager(
  617. kubeDeps.Mounter,
  618. kubeDeps.CAdvisorInterface,
  619. cm.NodeConfig{
  620. RuntimeCgroupsName: s.RuntimeCgroups,
  621. SystemCgroupsName: s.SystemCgroups,
  622. KubeletCgroupsName: s.KubeletCgroups,
  623. ContainerRuntime: s.ContainerRuntime,
  624. CgroupsPerQOS: s.CgroupsPerQOS,
  625. CgroupRoot: s.CgroupRoot,
  626. CgroupDriver: s.CgroupDriver,
  627. KubeletRootDir: s.RootDirectory,
  628. ProtectKernelDefaults: s.ProtectKernelDefaults,
  629. NodeAllocatableConfig: cm.NodeAllocatableConfig{
  630. KubeReservedCgroupName: s.KubeReservedCgroup,
  631. SystemReservedCgroupName: s.SystemReservedCgroup,
  632. EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
  633. KubeReserved: kubeReserved,
  634. SystemReserved: systemReserved,
  635. HardEvictionThresholds: hardEvictionThresholds,
  636. },
  637. QOSReserved: *experimentalQOSReserved,
  638. ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
  639. ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
  640. ExperimentalPodPidsLimit: s.PodPidsLimit,
  641. EnforceCPULimits: s.CPUCFSQuota,
  642. CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
  643. },
  644. s.FailSwapOn,
  645. devicePluginEnabled,
  646. kubeDeps.Recorder)
  647. if err != nil {
  648. return err
  649. }
  650. }
  651. if err := checkPermissions(); err != nil {
  652. klog.Error(err)
  653. }
  654. utilruntime.ReallyCrash = s.ReallyCrashForTesting
  655. rand.Seed(time.Now().UnixNano())
  656. // TODO(vmarmol): Do this through container config.
  657. oomAdjuster := kubeDeps.OOMAdjuster
  658. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
  659. klog.Warning(err)
  660. }
  661. if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
  662. return err
  663. }
  664. if s.HealthzPort > 0 {
  665. mux := http.NewServeMux()
  666. healthz.InstallHandler(mux)
  667. go wait.Until(func() {
  668. err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
  669. if err != nil {
  670. klog.Errorf("Starting healthz server failed: %v", err)
  671. }
  672. }, 5*time.Second, wait.NeverStop)
  673. }
  674. if s.RunOnce {
  675. return nil
  676. }
  677. // If systemd is used, notify it that we have started
  678. go daemon.SdNotify(false, "READY=1")
  679. select {
  680. case <-done:
  681. break
  682. case <-stopCh:
  683. break
  684. }
  685. return nil
  686. }
  687. // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
  688. // bootstrapping is enabled or client certificate rotation is enabled.
  689. func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
  690. if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
  691. // Rules for client rotation and the handling of kube config files:
  692. //
  693. // 1. If the client provides only a kubeconfig file, we must use that as the initial client
  694. // kubeadm needs the initial data in the kubeconfig to be placed into the cert store
  695. // 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
  696. // kubeconfig file at the target location that points to the cert store, but until
  697. // the file is present the client config will have no certs
  698. // 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
  699. // kubeconfig.
  700. // 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
  701. // replace the kubeconfig with a new file that points to the cert dir
  702. //
  703. // The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
  704. // the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
  705. // which provides a high powered kubeconfig on the master with cert/key data, we must
  706. // bootstrap the cert manager with the contents of the initial client config.
  707. klog.Infof("Client rotation is on, will bootstrap in background")
  708. certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
  709. if err != nil {
  710. return nil, nil, err
  711. }
  712. // use the correct content type for cert rotation, but don't set QPS
  713. setContentTypeForClient(certConfig, s.ContentType)
  714. kubeClientConfigOverrides(s, clientConfig)
  715. clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
  716. if err != nil {
  717. return nil, nil, err
  718. }
  719. // the rotating transport will use the cert from the cert manager instead of these files
  720. transportConfig := restclient.AnonymousClientConfig(clientConfig)
  721. // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
  722. // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
  723. // or the bootstrapping credentials to potentially lay down new initial config.
  724. closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
  725. if err != nil {
  726. return nil, nil, err
  727. }
  728. klog.V(2).Info("Starting client certificate rotation.")
  729. clientCertificateManager.Start()
  730. return transportConfig, closeAllConns, nil
  731. }
  732. if len(s.BootstrapKubeconfig) > 0 {
  733. if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
  734. return nil, nil, err
  735. }
  736. }
  737. clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  738. &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
  739. &clientcmd.ConfigOverrides{},
  740. ).ClientConfig()
  741. if err != nil {
  742. return nil, nil, fmt.Errorf("invalid kubeconfig: %v", err)
  743. }
  744. kubeClientConfigOverrides(s, clientConfig)
  745. closeAllConns, err := updateDialer(clientConfig)
  746. if err != nil {
  747. return nil, nil, err
  748. }
  749. return clientConfig, closeAllConns, nil
  750. }
  751. // updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
  752. func updateDialer(clientConfig *restclient.Config) (func(), error) {
  753. if clientConfig.Transport != nil || clientConfig.Dial != nil {
  754. return nil, fmt.Errorf("there is already a transport or dialer configured")
  755. }
  756. d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
  757. clientConfig.Dial = d.DialContext
  758. return d.CloseAll, nil
  759. }
  760. // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
  761. // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
  762. // write out).
  763. func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
  764. newClientFn := func(current *tls.Certificate) (certificatesclient.CertificateSigningRequestInterface, error) {
  765. // If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
  766. // credentials. In the future it would be desirable to change the behavior of bootstrap
  767. // to always fall back to the external bootstrap credentials when such credentials are
  768. // provided by a fundamental trust system like cloud VM identity or an HSM module.
  769. config := certConfig
  770. if current != nil {
  771. config = clientConfig
  772. }
  773. client, err := clientset.NewForConfig(config)
  774. if err != nil {
  775. return nil, err
  776. }
  777. return client.CertificatesV1beta1().CertificateSigningRequests(), nil
  778. }
  779. return kubeletcertificate.NewKubeletClientCertificateManager(
  780. certDir,
  781. nodeName,
  782. // this preserves backwards compatibility with kubeadm which passes
  783. // a high powered certificate to the kubelet as --kubeconfig and expects
  784. // it to be rotated out immediately
  785. clientConfig.CertData,
  786. clientConfig.KeyData,
  787. clientConfig.CertFile,
  788. clientConfig.KeyFile,
  789. newClientFn,
  790. )
  791. }
  792. func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
  793. setContentTypeForClient(clientConfig, s.ContentType)
  794. // Override kubeconfig qps/burst settings from flags
  795. clientConfig.QPS = float32(s.KubeAPIQPS)
  796. clientConfig.Burst = int(s.KubeAPIBurst)
  797. }
  798. // getNodeName returns the node name according to the cloud provider
  799. // if cloud provider is specified. Otherwise, returns the hostname of the node.
  800. func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
  801. if cloud == nil {
  802. return types.NodeName(hostname), nil
  803. }
  804. instances, ok := cloud.Instances()
  805. if !ok {
  806. return "", fmt.Errorf("failed to get instances from cloud provider")
  807. }
  808. nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
  809. if err != nil {
  810. return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
  811. }
  812. klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
  813. return nodeName, nil
  814. }
  815. // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
  816. // certificate and key file are generated. Returns a configured server.TLSOptions object.
  817. func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
  818. if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
  819. kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
  820. kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
  821. canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
  822. if err != nil {
  823. return nil, err
  824. }
  825. if !canReadCertAndKey {
  826. hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
  827. if err != nil {
  828. return nil, err
  829. }
  830. cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
  831. if err != nil {
  832. return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
  833. }
  834. if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
  835. return nil, err
  836. }
  837. if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
  838. return nil, err
  839. }
  840. klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
  841. }
  842. }
  843. tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
  844. if err != nil {
  845. return nil, err
  846. }
  847. minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
  848. if err != nil {
  849. return nil, err
  850. }
  851. tlsOptions := &server.TLSOptions{
  852. Config: &tls.Config{
  853. MinVersion: minTLSVersion,
  854. CipherSuites: tlsCipherSuites,
  855. },
  856. CertFile: kc.TLSCertFile,
  857. KeyFile: kc.TLSPrivateKeyFile,
  858. }
  859. if len(kc.Authentication.X509.ClientCAFile) > 0 {
  860. clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
  861. if err != nil {
  862. return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
  863. }
  864. // Specify allowed CAs for client certificates
  865. tlsOptions.Config.ClientCAs = clientCAs
  866. // Populate PeerCertificates in requests, but don't reject connections without verified certificates
  867. tlsOptions.Config.ClientAuth = tls.RequestClientCert
  868. }
  869. return tlsOptions, nil
  870. }
  871. // setContentTypeForClient sets the appropritae content type into the rest config
  872. // and handles defaulting AcceptContentTypes based on that input.
  873. func setContentTypeForClient(cfg *restclient.Config, contentType string) {
  874. if len(contentType) == 0 {
  875. return
  876. }
  877. cfg.ContentType = contentType
  878. switch contentType {
  879. case runtime.ContentTypeProtobuf:
  880. cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
  881. default:
  882. // otherwise let the rest client perform defaulting
  883. }
  884. }
  885. // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
  886. // 1 Integration tests
  887. // 2 Kubelet binary
  888. // 3 Standalone 'kubernetes' binary
  889. // Eventually, #2 will be replaced with instances of #3
  890. func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  891. hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
  892. if err != nil {
  893. return err
  894. }
  895. // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
  896. nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
  897. if err != nil {
  898. return err
  899. }
  900. // Setup event recorder if required.
  901. makeEventRecorder(kubeDeps, nodeName)
  902. capabilities.Initialize(capabilities.Capabilities{
  903. AllowPrivileged: true,
  904. })
  905. credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
  906. klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
  907. if kubeDeps.OSInterface == nil {
  908. kubeDeps.OSInterface = kubecontainer.RealOS{}
  909. }
  910. k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
  911. kubeDeps,
  912. &kubeServer.ContainerRuntimeOptions,
  913. kubeServer.ContainerRuntime,
  914. kubeServer.RuntimeCgroups,
  915. kubeServer.HostnameOverride,
  916. kubeServer.NodeIP,
  917. kubeServer.ProviderID,
  918. kubeServer.CloudProvider,
  919. kubeServer.CertDirectory,
  920. kubeServer.RootDirectory,
  921. kubeServer.RegisterNode,
  922. kubeServer.RegisterWithTaints,
  923. kubeServer.AllowedUnsafeSysctls,
  924. kubeServer.RemoteRuntimeEndpoint,
  925. kubeServer.RemoteImageEndpoint,
  926. kubeServer.ExperimentalMounterPath,
  927. kubeServer.ExperimentalKernelMemcgNotification,
  928. kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
  929. kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
  930. kubeServer.MinimumGCAge,
  931. kubeServer.MaxPerPodContainerCount,
  932. kubeServer.MaxContainerCount,
  933. kubeServer.MasterServiceNamespace,
  934. kubeServer.RegisterSchedulable,
  935. kubeServer.NonMasqueradeCIDR,
  936. kubeServer.KeepTerminatedPodVolumes,
  937. kubeServer.NodeLabels,
  938. kubeServer.SeccompProfileRoot,
  939. kubeServer.BootstrapCheckpointPath,
  940. kubeServer.NodeStatusMaxImages)
  941. if err != nil {
  942. return fmt.Errorf("failed to create kubelet: %v", err)
  943. }
  944. // NewMainKubelet should have set up a pod source config if one didn't exist
  945. // when the builder was run. This is just a precaution.
  946. if kubeDeps.PodConfig == nil {
  947. return fmt.Errorf("failed to create kubelet, pod source config was nil")
  948. }
  949. podCfg := kubeDeps.PodConfig
  950. rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
  951. // process pods and exit.
  952. if runOnce {
  953. if _, err := k.RunOnce(podCfg.Updates()); err != nil {
  954. return fmt.Errorf("runonce failed: %v", err)
  955. }
  956. klog.Info("Started kubelet as runonce")
  957. } else {
  958. startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
  959. klog.Info("Started kubelet")
  960. }
  961. return nil
  962. }
  963. func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
  964. // start the kubelet
  965. go wait.Until(func() {
  966. k.Run(podCfg.Updates())
  967. }, 0, wait.NeverStop)
  968. // start the kubelet server
  969. if enableServer {
  970. go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
  971. }
  972. if kubeCfg.ReadOnlyPort > 0 {
  973. go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
  974. }
  975. if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
  976. go k.ListenAndServePodResources()
  977. }
  978. }
  979. func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  980. kubeDeps *kubelet.Dependencies,
  981. crOptions *config.ContainerRuntimeOptions,
  982. containerRuntime string,
  983. runtimeCgroups string,
  984. hostnameOverride string,
  985. nodeIP string,
  986. providerID string,
  987. cloudProvider string,
  988. certDirectory string,
  989. rootDirectory string,
  990. registerNode bool,
  991. registerWithTaints []api.Taint,
  992. allowedUnsafeSysctls []string,
  993. remoteRuntimeEndpoint string,
  994. remoteImageEndpoint string,
  995. experimentalMounterPath string,
  996. experimentalKernelMemcgNotification bool,
  997. experimentalCheckNodeCapabilitiesBeforeMount bool,
  998. experimentalNodeAllocatableIgnoreEvictionThreshold bool,
  999. minimumGCAge metav1.Duration,
  1000. maxPerPodContainerCount int32,
  1001. maxContainerCount int32,
  1002. masterServiceNamespace string,
  1003. registerSchedulable bool,
  1004. nonMasqueradeCIDR string,
  1005. keepTerminatedPodVolumes bool,
  1006. nodeLabels map[string]string,
  1007. seccompProfileRoot string,
  1008. bootstrapCheckpointPath string,
  1009. nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
  1010. // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
  1011. // up into "per source" synchronizations
  1012. k, err = kubelet.NewMainKubelet(kubeCfg,
  1013. kubeDeps,
  1014. crOptions,
  1015. containerRuntime,
  1016. runtimeCgroups,
  1017. hostnameOverride,
  1018. nodeIP,
  1019. providerID,
  1020. cloudProvider,
  1021. certDirectory,
  1022. rootDirectory,
  1023. registerNode,
  1024. registerWithTaints,
  1025. allowedUnsafeSysctls,
  1026. remoteRuntimeEndpoint,
  1027. remoteImageEndpoint,
  1028. experimentalMounterPath,
  1029. experimentalKernelMemcgNotification,
  1030. experimentalCheckNodeCapabilitiesBeforeMount,
  1031. experimentalNodeAllocatableIgnoreEvictionThreshold,
  1032. minimumGCAge,
  1033. maxPerPodContainerCount,
  1034. maxContainerCount,
  1035. masterServiceNamespace,
  1036. registerSchedulable,
  1037. nonMasqueradeCIDR,
  1038. keepTerminatedPodVolumes,
  1039. nodeLabels,
  1040. seccompProfileRoot,
  1041. bootstrapCheckpointPath,
  1042. nodeStatusMaxImages)
  1043. if err != nil {
  1044. return nil, err
  1045. }
  1046. k.BirthCry()
  1047. k.StartGarbageCollection()
  1048. return k, nil
  1049. }
  1050. // parseResourceList parses the given configuration map into an API
  1051. // ResourceList or returns an error.
  1052. func parseResourceList(m map[string]string) (v1.ResourceList, error) {
  1053. if len(m) == 0 {
  1054. return nil, nil
  1055. }
  1056. rl := make(v1.ResourceList)
  1057. for k, v := range m {
  1058. switch v1.ResourceName(k) {
  1059. // CPU, memory, local storage, and PID resources are supported.
  1060. case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
  1061. if v1.ResourceName(k) != pidlimit.PIDs || utilfeature.DefaultFeatureGate.Enabled(features.SupportNodePidsLimit) {
  1062. q, err := resource.ParseQuantity(v)
  1063. if err != nil {
  1064. return nil, err
  1065. }
  1066. if q.Sign() == -1 {
  1067. return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
  1068. }
  1069. rl[v1.ResourceName(k)] = q
  1070. }
  1071. default:
  1072. return nil, fmt.Errorf("cannot reserve %q resource", k)
  1073. }
  1074. }
  1075. return rl, nil
  1076. }
  1077. // BootstrapKubeletConfigController constructs and bootstrap a configuration controller
  1078. func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
  1079. if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
  1080. return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
  1081. }
  1082. if len(dynamicConfigDir) == 0 {
  1083. return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
  1084. }
  1085. // compute absolute path and bootstrap controller
  1086. dir, err := filepath.Abs(dynamicConfigDir)
  1087. if err != nil {
  1088. return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
  1089. }
  1090. // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
  1091. c := dynamickubeletconfig.NewController(dir, transform)
  1092. kc, err := c.Bootstrap()
  1093. if err != nil {
  1094. return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
  1095. }
  1096. return kc, c, nil
  1097. }
  1098. // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
  1099. // TODO(random-liu): Move this to a separate binary.
  1100. func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
  1101. r := &f.ContainerRuntimeOptions
  1102. // Initialize docker client configuration.
  1103. dockerClientConfig := &dockershim.ClientConfig{
  1104. DockerEndpoint: r.DockerEndpoint,
  1105. RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration,
  1106. ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration,
  1107. }
  1108. // Initialize network plugin settings.
  1109. pluginSettings := dockershim.NetworkPluginSettings{
  1110. HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode),
  1111. NonMasqueradeCIDR: f.NonMasqueradeCIDR,
  1112. PluginName: r.NetworkPluginName,
  1113. PluginConfDir: r.CNIConfDir,
  1114. PluginBinDirString: r.CNIBinDir,
  1115. MTU: int(r.NetworkPluginMTU),
  1116. }
  1117. // Initialize streaming configuration. (Not using TLS now)
  1118. streamingConfig := &streaming.Config{
  1119. // Use a relative redirect (no scheme or host).
  1120. BaseURL: &url.URL{Path: "/cri/"},
  1121. StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration,
  1122. StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
  1123. SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
  1124. SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
  1125. }
  1126. // Standalone dockershim will always start the local streaming server.
  1127. ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
  1128. f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/)
  1129. if err != nil {
  1130. return err
  1131. }
  1132. klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
  1133. server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
  1134. if err := server.Start(); err != nil {
  1135. return err
  1136. }
  1137. <-stopCh
  1138. return nil
  1139. }