server.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app makes it easy to create a kubelet server for various contexts.
  14. package app
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "path"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "time"
  29. "github.com/coreos/go-systemd/daemon"
  30. "github.com/spf13/cobra"
  31. "github.com/spf13/pflag"
  32. "k8s.io/klog"
  33. "k8s.io/utils/mount"
  34. v1 "k8s.io/api/core/v1"
  35. "k8s.io/apimachinery/pkg/api/resource"
  36. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  37. "k8s.io/apimachinery/pkg/runtime"
  38. "k8s.io/apimachinery/pkg/types"
  39. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  40. "k8s.io/apimachinery/pkg/util/sets"
  41. "k8s.io/apimachinery/pkg/util/wait"
  42. genericapiserver "k8s.io/apiserver/pkg/server"
  43. "k8s.io/apiserver/pkg/server/healthz"
  44. utilfeature "k8s.io/apiserver/pkg/util/feature"
  45. clientset "k8s.io/client-go/kubernetes"
  46. certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
  47. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  48. restclient "k8s.io/client-go/rest"
  49. "k8s.io/client-go/tools/clientcmd"
  50. "k8s.io/client-go/tools/record"
  51. certutil "k8s.io/client-go/util/cert"
  52. "k8s.io/client-go/util/certificate"
  53. "k8s.io/client-go/util/connrotation"
  54. "k8s.io/client-go/util/keyutil"
  55. cloudprovider "k8s.io/cloud-provider"
  56. cliflag "k8s.io/component-base/cli/flag"
  57. "k8s.io/component-base/featuregate"
  58. "k8s.io/component-base/version"
  59. "k8s.io/component-base/version/verflag"
  60. kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
  61. "k8s.io/kubernetes/cmd/kubelet/app/options"
  62. "k8s.io/kubernetes/pkg/api/legacyscheme"
  63. api "k8s.io/kubernetes/pkg/apis/core"
  64. "k8s.io/kubernetes/pkg/capabilities"
  65. "k8s.io/kubernetes/pkg/credentialprovider"
  66. "k8s.io/kubernetes/pkg/features"
  67. "k8s.io/kubernetes/pkg/kubelet"
  68. kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  69. kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
  70. kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
  71. "k8s.io/kubernetes/pkg/kubelet/cadvisor"
  72. kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
  73. "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
  74. "k8s.io/kubernetes/pkg/kubelet/cm"
  75. "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
  76. "k8s.io/kubernetes/pkg/kubelet/config"
  77. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  78. "k8s.io/kubernetes/pkg/kubelet/dockershim"
  79. dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
  80. "k8s.io/kubernetes/pkg/kubelet/eviction"
  81. evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
  82. dynamickubeletconfig "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
  83. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
  84. "k8s.io/kubernetes/pkg/kubelet/server"
  85. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  86. "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
  87. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  88. "k8s.io/kubernetes/pkg/util/configz"
  89. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  90. utilflag "k8s.io/kubernetes/pkg/util/flag"
  91. "k8s.io/kubernetes/pkg/util/flock"
  92. nodeutil "k8s.io/kubernetes/pkg/util/node"
  93. "k8s.io/kubernetes/pkg/util/oom"
  94. "k8s.io/kubernetes/pkg/util/rlimit"
  95. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  96. "k8s.io/kubernetes/pkg/volume/util/subpath"
  97. "k8s.io/utils/exec"
  98. )
  99. const (
  100. // Kubelet component name
  101. componentKubelet = "kubelet"
  102. )
  103. // NewKubeletCommand creates a *cobra.Command object with default parameters
  104. func NewKubeletCommand() *cobra.Command {
  105. cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
  106. cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  107. kubeletFlags := options.NewKubeletFlags()
  108. kubeletConfig, err := options.NewKubeletConfiguration()
  109. // programmer error
  110. if err != nil {
  111. klog.Fatal(err)
  112. }
  113. cmd := &cobra.Command{
  114. Use: componentKubelet,
  115. Long: `The kubelet is the primary "node agent" that runs on each
  116. node. It can register the node with the apiserver using one of: the hostname; a flag to
  117. override the hostname; or specific logic for a cloud provider.
  118. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
  119. that describes a pod. The kubelet takes a set of PodSpecs that are provided through
  120. various mechanisms (primarily through the apiserver) and ensures that the containers
  121. described in those PodSpecs are running and healthy. The kubelet doesn't manage
  122. containers which were not created by Kubernetes.
  123. Other than from an PodSpec from the apiserver, there are three ways that a container
  124. manifest can be provided to the Kubelet.
  125. File: Path passed as a flag on the command line. Files under this path will be monitored
  126. periodically for updates. The monitoring period is 20s by default and is configurable
  127. via a flag.
  128. HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
  129. is checked every 20 seconds (also configurable with a flag).
  130. HTTP server: The kubelet can also listen for HTTP and respond to a simple API
  131. (underspec'd currently) to submit a new manifest.`,
  132. // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
  133. // so we do all our parsing manually in Run, below.
  134. // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
  135. // `args` arg to Run, without Cobra's interference.
  136. DisableFlagParsing: true,
  137. Run: func(cmd *cobra.Command, args []string) {
  138. // initial flag parse, since we disable cobra's flag parsing
  139. if err := cleanFlagSet.Parse(args); err != nil {
  140. cmd.Usage()
  141. klog.Fatal(err)
  142. }
  143. // check if there are non-flag arguments in the command line
  144. cmds := cleanFlagSet.Args()
  145. if len(cmds) > 0 {
  146. cmd.Usage()
  147. klog.Fatalf("unknown command: %s", cmds[0])
  148. }
  149. // short-circuit on help
  150. help, err := cleanFlagSet.GetBool("help")
  151. if err != nil {
  152. klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
  153. }
  154. if help {
  155. cmd.Help()
  156. return
  157. }
  158. // short-circuit on verflag
  159. verflag.PrintAndExitIfRequested()
  160. utilflag.PrintFlags(cleanFlagSet)
  161. // set feature gates from initial flags-based config
  162. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  163. klog.Fatal(err)
  164. }
  165. // validate the initial KubeletFlags
  166. if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
  167. klog.Fatal(err)
  168. }
  169. if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
  170. klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
  171. }
  172. // load kubelet config file, if provided
  173. if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
  174. kubeletConfig, err = loadConfigFile(configFile)
  175. if err != nil {
  176. klog.Fatal(err)
  177. }
  178. // We must enforce flag precedence by re-parsing the command line into the new object.
  179. // This is necessary to preserve backwards-compatibility across binary upgrades.
  180. // See issue #56171 for more details.
  181. if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
  182. klog.Fatal(err)
  183. }
  184. // update feature gates based on new config
  185. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  186. klog.Fatal(err)
  187. }
  188. }
  189. // We always validate the local configuration (command line + config file).
  190. // This is the default "last-known-good" config for dynamic config, and must always remain valid.
  191. if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
  192. klog.Fatal(err)
  193. }
  194. // use dynamic kubelet config, if enabled
  195. var kubeletConfigController *dynamickubeletconfig.Controller
  196. if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
  197. var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
  198. dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
  199. func(kc *kubeletconfiginternal.KubeletConfiguration) error {
  200. // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
  201. // so that we get a complete validation at the same point where we can decide to reject dynamic config.
  202. // This fixes the flag-precedence component of issue #63305.
  203. // See issue #56171 for general details on flag precedence.
  204. return kubeletConfigFlagPrecedence(kc, args)
  205. })
  206. if err != nil {
  207. klog.Fatal(err)
  208. }
  209. // If we should just use our existing, local config, the controller will return a nil config
  210. if dynamicKubeletConfig != nil {
  211. kubeletConfig = dynamicKubeletConfig
  212. // Note: flag precedence was already enforced in the controller, prior to validation,
  213. // by our above transform function. Now we simply update feature gates from the new config.
  214. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
  215. klog.Fatal(err)
  216. }
  217. }
  218. }
  219. // construct a KubeletServer from kubeletFlags and kubeletConfig
  220. kubeletServer := &options.KubeletServer{
  221. KubeletFlags: *kubeletFlags,
  222. KubeletConfiguration: *kubeletConfig,
  223. }
  224. // use kubeletServer to construct the default KubeletDeps
  225. kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
  226. if err != nil {
  227. klog.Fatal(err)
  228. }
  229. // add the kubelet config controller to kubeletDeps
  230. kubeletDeps.KubeletConfigController = kubeletConfigController
  231. // set up stopCh here in order to be reused by kubelet and docker shim
  232. stopCh := genericapiserver.SetupSignalHandler()
  233. // start the experimental docker shim, if enabled
  234. if kubeletServer.KubeletFlags.ExperimentalDockershim {
  235. if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
  236. klog.Fatal(err)
  237. }
  238. return
  239. }
  240. // run the kubelet
  241. klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
  242. if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
  243. klog.Fatal(err)
  244. }
  245. },
  246. }
  247. // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
  248. kubeletFlags.AddFlags(cleanFlagSet)
  249. options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
  250. options.AddGlobalFlags(cleanFlagSet)
  251. cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
  252. // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
  253. const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
  254. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  255. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
  256. return nil
  257. })
  258. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  259. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
  260. })
  261. return cmd
  262. }
  263. // newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered
  264. // on it.
  265. func newFlagSetWithGlobals() *pflag.FlagSet {
  266. fs := pflag.NewFlagSet("", pflag.ExitOnError)
  267. // set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags
  268. fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  269. // explicitly add flags from libs that register global flags
  270. options.AddGlobalFlags(fs)
  271. return fs
  272. }
  273. // newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where
  274. // all values have noop Set implementations
  275. func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet {
  276. ret := pflag.NewFlagSet("", pflag.ExitOnError)
  277. ret.SetNormalizeFunc(fs.GetNormalizeFunc())
  278. fs.VisitAll(func(f *pflag.Flag) {
  279. ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage)
  280. })
  281. return ret
  282. }
  283. // kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object.
  284. // We must enforce flag precedence by re-parsing the command line into the new object.
  285. // This is necessary to preserve backwards-compatibility across binary upgrades.
  286. // See issue #56171 for more details.
  287. func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error {
  288. // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses,
  289. // as some Set implementations accumulate values from multiple flag invocations.
  290. fs := newFakeFlagSet(newFlagSetWithGlobals())
  291. // register throwaway KubeletFlags
  292. options.NewKubeletFlags().AddFlags(fs)
  293. // register new KubeletConfiguration
  294. options.AddKubeletConfigFlags(fs, kc)
  295. // Remember original feature gates, so we can merge with flag gates later
  296. original := kc.FeatureGates
  297. // re-parse flags
  298. if err := fs.Parse(args); err != nil {
  299. return err
  300. }
  301. // Add back feature gates that were set in the original kc, but not in flags
  302. for k, v := range original {
  303. if _, ok := kc.FeatureGates[k]; !ok {
  304. kc.FeatureGates[k] = v
  305. }
  306. }
  307. return nil
  308. }
  309. func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
  310. const errFmt = "failed to load Kubelet config file %s, error %v"
  311. // compute absolute path based on current working dir
  312. kubeletConfigFile, err := filepath.Abs(name)
  313. if err != nil {
  314. return nil, fmt.Errorf(errFmt, name, err)
  315. }
  316. loader, err := configfiles.NewFsLoader(utilfs.DefaultFs{}, kubeletConfigFile)
  317. if err != nil {
  318. return nil, fmt.Errorf(errFmt, name, err)
  319. }
  320. kc, err := loader.Load()
  321. if err != nil {
  322. return nil, fmt.Errorf(errFmt, name, err)
  323. }
  324. return kc, err
  325. }
  326. // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
  327. // is not valid. It will not start any background processes, and does not include authentication/authorization
  328. func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
  329. // Initialize the TLS Options
  330. tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
  331. if err != nil {
  332. return nil, err
  333. }
  334. mounter := mount.New(s.ExperimentalMounterPath)
  335. subpather := subpath.New(mounter)
  336. hu := hostutil.NewHostUtil()
  337. var pluginRunner = exec.New()
  338. var dockerClientConfig *dockershim.ClientConfig
  339. if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
  340. dockerClientConfig = &dockershim.ClientConfig{
  341. DockerEndpoint: s.DockerEndpoint,
  342. RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
  343. ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
  344. }
  345. }
  346. plugins, err := ProbeVolumePlugins(featureGate)
  347. if err != nil {
  348. return nil, err
  349. }
  350. return &kubelet.Dependencies{
  351. Auth: nil, // default does not enforce auth[nz]
  352. CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
  353. Cloud: nil, // cloud provider might start background processes
  354. ContainerManager: nil,
  355. DockerClientConfig: dockerClientConfig,
  356. KubeClient: nil,
  357. HeartbeatClient: nil,
  358. EventClient: nil,
  359. HostUtil: hu,
  360. Mounter: mounter,
  361. Subpather: subpather,
  362. OOMAdjuster: oom.NewOOMAdjuster(),
  363. OSInterface: kubecontainer.RealOS{},
  364. VolumePlugins: plugins,
  365. DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
  366. TLSOptions: tlsOptions}, nil
  367. }
  368. // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
  369. // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
  370. // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
  371. // not be generated.
  372. func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
  373. // To help debugging, immediately log version
  374. klog.Infof("Version: %+v", version.Get())
  375. if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
  376. return fmt.Errorf("failed OS init: %v", err)
  377. }
  378. if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
  379. return fmt.Errorf("failed to run Kubelet: %v", err)
  380. }
  381. return nil
  382. }
  383. func checkPermissions() error {
  384. if uid := os.Getuid(); uid != 0 {
  385. return fmt.Errorf("kubelet needs to run as uid `0`. It is being run as %d", uid)
  386. }
  387. // TODO: Check if kubelet is running in the `initial` user namespace.
  388. // http://man7.org/linux/man-pages/man7/user_namespaces.7.html
  389. return nil
  390. }
  391. func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
  392. scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
  393. if err != nil {
  394. return err
  395. }
  396. versioned := kubeletconfigv1beta1.KubeletConfiguration{}
  397. if err := scheme.Convert(kc, &versioned, nil); err != nil {
  398. return err
  399. }
  400. cz.Set(versioned)
  401. return nil
  402. }
  403. func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
  404. cz, err := configz.New("kubeletconfig")
  405. if err != nil {
  406. klog.Errorf("unable to register configz: %s", err)
  407. return err
  408. }
  409. if err := setConfigz(cz, kc); err != nil {
  410. klog.Errorf("unable to register config: %s", err)
  411. return err
  412. }
  413. return nil
  414. }
  415. // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
  416. func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
  417. if kubeDeps.Recorder != nil {
  418. return
  419. }
  420. eventBroadcaster := record.NewBroadcaster()
  421. kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
  422. eventBroadcaster.StartLogging(klog.V(3).Infof)
  423. if kubeDeps.EventClient != nil {
  424. klog.V(4).Infof("Sending events to api server.")
  425. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
  426. } else {
  427. klog.Warning("No api server defined - no events will be sent to API server.")
  428. }
  429. }
  430. func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
  431. // Set global feature gates based on the value on the initial KubeletServer
  432. err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
  433. if err != nil {
  434. return err
  435. }
  436. // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
  437. if err := options.ValidateKubeletServer(s); err != nil {
  438. return err
  439. }
  440. // Obtain Kubelet Lock File
  441. if s.ExitOnLockContention && s.LockFilePath == "" {
  442. return errors.New("cannot exit on lock file contention: no lock file specified")
  443. }
  444. done := make(chan struct{})
  445. if s.LockFilePath != "" {
  446. klog.Infof("acquiring file lock on %q", s.LockFilePath)
  447. if err := flock.Acquire(s.LockFilePath); err != nil {
  448. return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
  449. }
  450. if s.ExitOnLockContention {
  451. klog.Infof("watching for inotify events for: %v", s.LockFilePath)
  452. if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
  453. return err
  454. }
  455. }
  456. }
  457. // Register current configuration with /configz endpoint
  458. err = initConfigz(&s.KubeletConfiguration)
  459. if err != nil {
  460. klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
  461. }
  462. // About to get clients and such, detect standaloneMode
  463. standaloneMode := true
  464. if len(s.KubeConfig) > 0 {
  465. standaloneMode = false
  466. }
  467. if kubeDeps == nil {
  468. kubeDeps, err = UnsecuredDependencies(s, featureGate)
  469. if err != nil {
  470. return err
  471. }
  472. }
  473. if kubeDeps.Cloud == nil {
  474. if !cloudprovider.IsExternal(s.CloudProvider) {
  475. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
  476. if err != nil {
  477. return err
  478. }
  479. if cloud == nil {
  480. klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  481. } else {
  482. klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  483. }
  484. kubeDeps.Cloud = cloud
  485. }
  486. }
  487. hostName, err := nodeutil.GetHostname(s.HostnameOverride)
  488. if err != nil {
  489. return err
  490. }
  491. nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
  492. if err != nil {
  493. return err
  494. }
  495. // if in standalone mode, indicate as much by setting all clients to nil
  496. switch {
  497. case standaloneMode:
  498. kubeDeps.KubeClient = nil
  499. kubeDeps.EventClient = nil
  500. kubeDeps.HeartbeatClient = nil
  501. klog.Warningf("standalone mode, no API client")
  502. case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
  503. clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
  504. if err != nil {
  505. return err
  506. }
  507. if closeAllConns == nil {
  508. return errors.New("closeAllConns must be a valid function other than nil")
  509. }
  510. kubeDeps.OnHeartbeatFailure = closeAllConns
  511. kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
  512. if err != nil {
  513. return fmt.Errorf("failed to initialize kubelet client: %v", err)
  514. }
  515. // make a separate client for events
  516. eventClientConfig := *clientConfig
  517. eventClientConfig.QPS = float32(s.EventRecordQPS)
  518. eventClientConfig.Burst = int(s.EventBurst)
  519. kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
  520. if err != nil {
  521. return fmt.Errorf("failed to initialize kubelet event client: %v", err)
  522. }
  523. // make a separate client for heartbeat with throttling disabled and a timeout attached
  524. heartbeatClientConfig := *clientConfig
  525. heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
  526. // The timeout is the minimum of the lease duration and status update frequency
  527. leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
  528. if heartbeatClientConfig.Timeout > leaseTimeout {
  529. heartbeatClientConfig.Timeout = leaseTimeout
  530. }
  531. heartbeatClientConfig.QPS = float32(-1)
  532. kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
  533. if err != nil {
  534. return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
  535. }
  536. }
  537. if kubeDeps.Auth == nil {
  538. auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
  539. if err != nil {
  540. return err
  541. }
  542. kubeDeps.Auth = auth
  543. }
  544. var cgroupRoots []string
  545. cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
  546. kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
  547. if err != nil {
  548. klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
  549. } else if kubeletCgroup != "" {
  550. cgroupRoots = append(cgroupRoots, kubeletCgroup)
  551. }
  552. runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
  553. if err != nil {
  554. klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
  555. } else if runtimeCgroup != "" {
  556. // RuntimeCgroups is optional, so ignore if it isn't specified
  557. cgroupRoots = append(cgroupRoots, runtimeCgroup)
  558. }
  559. if s.SystemCgroups != "" {
  560. // SystemCgroups is optional, so ignore if it isn't specified
  561. cgroupRoots = append(cgroupRoots, s.SystemCgroups)
  562. }
  563. if kubeDeps.CAdvisorInterface == nil {
  564. imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
  565. kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
  566. if err != nil {
  567. return err
  568. }
  569. }
  570. // Setup event recorder if required.
  571. makeEventRecorder(kubeDeps, nodeName)
  572. if kubeDeps.ContainerManager == nil {
  573. if s.CgroupsPerQOS && s.CgroupRoot == "" {
  574. klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
  575. s.CgroupRoot = "/"
  576. }
  577. var reservedSystemCPUs cpuset.CPUSet
  578. var errParse error
  579. if s.ReservedSystemCPUs != "" {
  580. reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
  581. if errParse != nil {
  582. // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
  583. klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
  584. return errParse
  585. }
  586. // is it safe do use CAdvisor here ??
  587. machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
  588. if err != nil {
  589. // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
  590. klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
  591. reservedSystemCPUs = cpuset.NewCPUSet()
  592. } else {
  593. reservedList := reservedSystemCPUs.ToSlice()
  594. first := reservedList[0]
  595. last := reservedList[len(reservedList)-1]
  596. if first < 0 || last >= machineInfo.NumCores {
  597. // the specified cpuset is outside of the range of what the machine has
  598. klog.Infof("Invalid cpuset specified by --reserved-cpus")
  599. return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
  600. }
  601. }
  602. } else {
  603. reservedSystemCPUs = cpuset.NewCPUSet()
  604. }
  605. if reservedSystemCPUs.Size() > 0 {
  606. // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
  607. klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
  608. if s.KubeReserved != nil {
  609. delete(s.KubeReserved, "cpu")
  610. }
  611. if s.SystemReserved == nil {
  612. s.SystemReserved = make(map[string]string)
  613. }
  614. s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
  615. klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
  616. }
  617. kubeReserved, err := parseResourceList(s.KubeReserved)
  618. if err != nil {
  619. return err
  620. }
  621. systemReserved, err := parseResourceList(s.SystemReserved)
  622. if err != nil {
  623. return err
  624. }
  625. var hardEvictionThresholds []evictionapi.Threshold
  626. // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
  627. if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
  628. hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
  629. if err != nil {
  630. return err
  631. }
  632. }
  633. experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
  634. if err != nil {
  635. return err
  636. }
  637. devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
  638. kubeDeps.ContainerManager, err = cm.NewContainerManager(
  639. kubeDeps.Mounter,
  640. kubeDeps.CAdvisorInterface,
  641. cm.NodeConfig{
  642. RuntimeCgroupsName: s.RuntimeCgroups,
  643. SystemCgroupsName: s.SystemCgroups,
  644. KubeletCgroupsName: s.KubeletCgroups,
  645. ContainerRuntime: s.ContainerRuntime,
  646. CgroupsPerQOS: s.CgroupsPerQOS,
  647. CgroupRoot: s.CgroupRoot,
  648. CgroupDriver: s.CgroupDriver,
  649. KubeletRootDir: s.RootDirectory,
  650. ProtectKernelDefaults: s.ProtectKernelDefaults,
  651. NodeAllocatableConfig: cm.NodeAllocatableConfig{
  652. KubeReservedCgroupName: s.KubeReservedCgroup,
  653. SystemReservedCgroupName: s.SystemReservedCgroup,
  654. EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
  655. KubeReserved: kubeReserved,
  656. SystemReserved: systemReserved,
  657. ReservedSystemCPUs: reservedSystemCPUs,
  658. HardEvictionThresholds: hardEvictionThresholds,
  659. },
  660. QOSReserved: *experimentalQOSReserved,
  661. ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
  662. ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
  663. ExperimentalPodPidsLimit: s.PodPidsLimit,
  664. EnforceCPULimits: s.CPUCFSQuota,
  665. CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
  666. ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
  667. },
  668. s.FailSwapOn,
  669. devicePluginEnabled,
  670. kubeDeps.Recorder)
  671. if err != nil {
  672. return err
  673. }
  674. }
  675. if err := checkPermissions(); err != nil {
  676. klog.Error(err)
  677. }
  678. utilruntime.ReallyCrash = s.ReallyCrashForTesting
  679. // TODO(vmarmol): Do this through container config.
  680. oomAdjuster := kubeDeps.OOMAdjuster
  681. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
  682. klog.Warning(err)
  683. }
  684. err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
  685. kubeDeps, &s.ContainerRuntimeOptions,
  686. s.ContainerRuntime,
  687. s.RuntimeCgroups,
  688. s.RemoteRuntimeEndpoint,
  689. s.RemoteImageEndpoint,
  690. s.NonMasqueradeCIDR)
  691. if err != nil {
  692. return err
  693. }
  694. if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
  695. return err
  696. }
  697. // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
  698. if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
  699. kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
  700. if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
  701. return err
  702. }
  703. }
  704. if s.HealthzPort > 0 {
  705. mux := http.NewServeMux()
  706. healthz.InstallHandler(mux)
  707. go wait.Until(func() {
  708. err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
  709. if err != nil {
  710. klog.Errorf("Starting healthz server failed: %v", err)
  711. }
  712. }, 5*time.Second, wait.NeverStop)
  713. }
  714. if s.RunOnce {
  715. return nil
  716. }
  717. // If systemd is used, notify it that we have started
  718. go daemon.SdNotify(false, "READY=1")
  719. select {
  720. case <-done:
  721. break
  722. case <-stopCh:
  723. break
  724. }
  725. return nil
  726. }
  727. // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
  728. // bootstrapping is enabled or client certificate rotation is enabled.
  729. func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
  730. if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
  731. // Rules for client rotation and the handling of kube config files:
  732. //
  733. // 1. If the client provides only a kubeconfig file, we must use that as the initial client
  734. // kubeadm needs the initial data in the kubeconfig to be placed into the cert store
  735. // 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
  736. // kubeconfig file at the target location that points to the cert store, but until
  737. // the file is present the client config will have no certs
  738. // 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
  739. // kubeconfig.
  740. // 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
  741. // replace the kubeconfig with a new file that points to the cert dir
  742. //
  743. // The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
  744. // the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
  745. // which provides a high powered kubeconfig on the master with cert/key data, we must
  746. // bootstrap the cert manager with the contents of the initial client config.
  747. klog.Infof("Client rotation is on, will bootstrap in background")
  748. certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
  749. if err != nil {
  750. return nil, nil, err
  751. }
  752. // use the correct content type for cert rotation, but don't set QPS
  753. setContentTypeForClient(certConfig, s.ContentType)
  754. kubeClientConfigOverrides(s, clientConfig)
  755. clientCertificateManager, err := buildClientCertificateManager(certConfig, clientConfig, s.CertDirectory, nodeName)
  756. if err != nil {
  757. return nil, nil, err
  758. }
  759. // the rotating transport will use the cert from the cert manager instead of these files
  760. transportConfig := restclient.AnonymousClientConfig(clientConfig)
  761. // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
  762. // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
  763. // or the bootstrapping credentials to potentially lay down new initial config.
  764. closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
  765. if err != nil {
  766. return nil, nil, err
  767. }
  768. klog.V(2).Info("Starting client certificate rotation.")
  769. clientCertificateManager.Start()
  770. return transportConfig, closeAllConns, nil
  771. }
  772. if len(s.BootstrapKubeconfig) > 0 {
  773. if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
  774. return nil, nil, err
  775. }
  776. }
  777. clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  778. &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig},
  779. &clientcmd.ConfigOverrides{},
  780. ).ClientConfig()
  781. if err != nil {
  782. return nil, nil, fmt.Errorf("invalid kubeconfig: %v", err)
  783. }
  784. kubeClientConfigOverrides(s, clientConfig)
  785. closeAllConns, err := updateDialer(clientConfig)
  786. if err != nil {
  787. return nil, nil, err
  788. }
  789. return clientConfig, closeAllConns, nil
  790. }
  791. // updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections.
  792. func updateDialer(clientConfig *restclient.Config) (func(), error) {
  793. if clientConfig.Transport != nil || clientConfig.Dial != nil {
  794. return nil, fmt.Errorf("there is already a transport or dialer configured")
  795. }
  796. d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
  797. clientConfig.Dial = d.DialContext
  798. return d.CloseAll, nil
  799. }
  800. // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
  801. // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
  802. // write out).
  803. func buildClientCertificateManager(certConfig, clientConfig *restclient.Config, certDir string, nodeName types.NodeName) (certificate.Manager, error) {
  804. newClientFn := func(current *tls.Certificate) (certificatesclient.CertificateSigningRequestInterface, error) {
  805. // If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
  806. // credentials. In the future it would be desirable to change the behavior of bootstrap
  807. // to always fall back to the external bootstrap credentials when such credentials are
  808. // provided by a fundamental trust system like cloud VM identity or an HSM module.
  809. config := certConfig
  810. if current != nil {
  811. config = clientConfig
  812. }
  813. client, err := clientset.NewForConfig(config)
  814. if err != nil {
  815. return nil, err
  816. }
  817. return client.CertificatesV1beta1().CertificateSigningRequests(), nil
  818. }
  819. return kubeletcertificate.NewKubeletClientCertificateManager(
  820. certDir,
  821. nodeName,
  822. // this preserves backwards compatibility with kubeadm which passes
  823. // a high powered certificate to the kubelet as --kubeconfig and expects
  824. // it to be rotated out immediately
  825. clientConfig.CertData,
  826. clientConfig.KeyData,
  827. clientConfig.CertFile,
  828. clientConfig.KeyFile,
  829. newClientFn,
  830. )
  831. }
  832. func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclient.Config) {
  833. setContentTypeForClient(clientConfig, s.ContentType)
  834. // Override kubeconfig qps/burst settings from flags
  835. clientConfig.QPS = float32(s.KubeAPIQPS)
  836. clientConfig.Burst = int(s.KubeAPIBurst)
  837. }
  838. // getNodeName returns the node name according to the cloud provider
  839. // if cloud provider is specified. Otherwise, returns the hostname of the node.
  840. func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
  841. if cloud == nil {
  842. return types.NodeName(hostname), nil
  843. }
  844. instances, ok := cloud.Instances()
  845. if !ok {
  846. return "", fmt.Errorf("failed to get instances from cloud provider")
  847. }
  848. nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
  849. if err != nil {
  850. return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
  851. }
  852. klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
  853. return nodeName, nil
  854. }
  855. // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
  856. // certificate and key file are generated. Returns a configured server.TLSOptions object.
  857. func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
  858. if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
  859. kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
  860. kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
  861. canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
  862. if err != nil {
  863. return nil, err
  864. }
  865. if !canReadCertAndKey {
  866. hostName, err := nodeutil.GetHostname(kf.HostnameOverride)
  867. if err != nil {
  868. return nil, err
  869. }
  870. cert, key, err := certutil.GenerateSelfSignedCertKey(hostName, nil, nil)
  871. if err != nil {
  872. return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
  873. }
  874. if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
  875. return nil, err
  876. }
  877. if err := keyutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
  878. return nil, err
  879. }
  880. klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
  881. }
  882. }
  883. tlsCipherSuites, err := cliflag.TLSCipherSuites(kc.TLSCipherSuites)
  884. if err != nil {
  885. return nil, err
  886. }
  887. minTLSVersion, err := cliflag.TLSVersion(kc.TLSMinVersion)
  888. if err != nil {
  889. return nil, err
  890. }
  891. tlsOptions := &server.TLSOptions{
  892. Config: &tls.Config{
  893. MinVersion: minTLSVersion,
  894. CipherSuites: tlsCipherSuites,
  895. },
  896. CertFile: kc.TLSCertFile,
  897. KeyFile: kc.TLSPrivateKeyFile,
  898. }
  899. if len(kc.Authentication.X509.ClientCAFile) > 0 {
  900. clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
  901. if err != nil {
  902. return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
  903. }
  904. // Specify allowed CAs for client certificates
  905. tlsOptions.Config.ClientCAs = clientCAs
  906. // Populate PeerCertificates in requests, but don't reject connections without verified certificates
  907. tlsOptions.Config.ClientAuth = tls.RequestClientCert
  908. }
  909. return tlsOptions, nil
  910. }
  911. // setContentTypeForClient sets the appropritae content type into the rest config
  912. // and handles defaulting AcceptContentTypes based on that input.
  913. func setContentTypeForClient(cfg *restclient.Config, contentType string) {
  914. if len(contentType) == 0 {
  915. return
  916. }
  917. cfg.ContentType = contentType
  918. switch contentType {
  919. case runtime.ContentTypeProtobuf:
  920. cfg.AcceptContentTypes = strings.Join([]string{runtime.ContentTypeProtobuf, runtime.ContentTypeJSON}, ",")
  921. default:
  922. // otherwise let the rest client perform defaulting
  923. }
  924. }
  925. // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
  926. // 1 Integration tests
  927. // 2 Kubelet binary
  928. // 3 Standalone 'kubernetes' binary
  929. // Eventually, #2 will be replaced with instances of #3
  930. func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  931. hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
  932. if err != nil {
  933. return err
  934. }
  935. // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
  936. nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
  937. if err != nil {
  938. return err
  939. }
  940. // Setup event recorder if required.
  941. makeEventRecorder(kubeDeps, nodeName)
  942. capabilities.Initialize(capabilities.Capabilities{
  943. AllowPrivileged: true,
  944. })
  945. credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
  946. klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
  947. if kubeDeps.OSInterface == nil {
  948. kubeDeps.OSInterface = kubecontainer.RealOS{}
  949. }
  950. k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
  951. kubeDeps,
  952. &kubeServer.ContainerRuntimeOptions,
  953. kubeServer.ContainerRuntime,
  954. kubeServer.HostnameOverride,
  955. kubeServer.NodeIP,
  956. kubeServer.ProviderID,
  957. kubeServer.CloudProvider,
  958. kubeServer.CertDirectory,
  959. kubeServer.RootDirectory,
  960. kubeServer.RegisterNode,
  961. kubeServer.RegisterWithTaints,
  962. kubeServer.AllowedUnsafeSysctls,
  963. kubeServer.ExperimentalMounterPath,
  964. kubeServer.ExperimentalKernelMemcgNotification,
  965. kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
  966. kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
  967. kubeServer.MinimumGCAge,
  968. kubeServer.MaxPerPodContainerCount,
  969. kubeServer.MaxContainerCount,
  970. kubeServer.MasterServiceNamespace,
  971. kubeServer.RegisterSchedulable,
  972. kubeServer.KeepTerminatedPodVolumes,
  973. kubeServer.NodeLabels,
  974. kubeServer.SeccompProfileRoot,
  975. kubeServer.BootstrapCheckpointPath,
  976. kubeServer.NodeStatusMaxImages)
  977. if err != nil {
  978. return fmt.Errorf("failed to create kubelet: %v", err)
  979. }
  980. // NewMainKubelet should have set up a pod source config if one didn't exist
  981. // when the builder was run. This is just a precaution.
  982. if kubeDeps.PodConfig == nil {
  983. return fmt.Errorf("failed to create kubelet, pod source config was nil")
  984. }
  985. podCfg := kubeDeps.PodConfig
  986. rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
  987. // process pods and exit.
  988. if runOnce {
  989. if _, err := k.RunOnce(podCfg.Updates()); err != nil {
  990. return fmt.Errorf("runonce failed: %v", err)
  991. }
  992. klog.Info("Started kubelet as runonce")
  993. } else {
  994. startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
  995. klog.Info("Started kubelet")
  996. }
  997. return nil
  998. }
  999. func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
  1000. // start the kubelet
  1001. go wait.Until(func() {
  1002. k.Run(podCfg.Updates())
  1003. }, 0, wait.NeverStop)
  1004. // start the kubelet server
  1005. if enableServer {
  1006. go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
  1007. }
  1008. if kubeCfg.ReadOnlyPort > 0 {
  1009. go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
  1010. }
  1011. if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
  1012. go k.ListenAndServePodResources()
  1013. }
  1014. }
  1015. func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  1016. kubeDeps *kubelet.Dependencies,
  1017. crOptions *config.ContainerRuntimeOptions,
  1018. containerRuntime string,
  1019. hostnameOverride string,
  1020. nodeIP string,
  1021. providerID string,
  1022. cloudProvider string,
  1023. certDirectory string,
  1024. rootDirectory string,
  1025. registerNode bool,
  1026. registerWithTaints []api.Taint,
  1027. allowedUnsafeSysctls []string,
  1028. experimentalMounterPath string,
  1029. experimentalKernelMemcgNotification bool,
  1030. experimentalCheckNodeCapabilitiesBeforeMount bool,
  1031. experimentalNodeAllocatableIgnoreEvictionThreshold bool,
  1032. minimumGCAge metav1.Duration,
  1033. maxPerPodContainerCount int32,
  1034. maxContainerCount int32,
  1035. masterServiceNamespace string,
  1036. registerSchedulable bool,
  1037. keepTerminatedPodVolumes bool,
  1038. nodeLabels map[string]string,
  1039. seccompProfileRoot string,
  1040. bootstrapCheckpointPath string,
  1041. nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
  1042. // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
  1043. // up into "per source" synchronizations
  1044. k, err = kubelet.NewMainKubelet(kubeCfg,
  1045. kubeDeps,
  1046. crOptions,
  1047. containerRuntime,
  1048. hostnameOverride,
  1049. nodeIP,
  1050. providerID,
  1051. cloudProvider,
  1052. certDirectory,
  1053. rootDirectory,
  1054. registerNode,
  1055. registerWithTaints,
  1056. allowedUnsafeSysctls,
  1057. experimentalMounterPath,
  1058. experimentalKernelMemcgNotification,
  1059. experimentalCheckNodeCapabilitiesBeforeMount,
  1060. experimentalNodeAllocatableIgnoreEvictionThreshold,
  1061. minimumGCAge,
  1062. maxPerPodContainerCount,
  1063. maxContainerCount,
  1064. masterServiceNamespace,
  1065. registerSchedulable,
  1066. keepTerminatedPodVolumes,
  1067. nodeLabels,
  1068. seccompProfileRoot,
  1069. bootstrapCheckpointPath,
  1070. nodeStatusMaxImages)
  1071. if err != nil {
  1072. return nil, err
  1073. }
  1074. k.BirthCry()
  1075. k.StartGarbageCollection()
  1076. return k, nil
  1077. }
  1078. // parseResourceList parses the given configuration map into an API
  1079. // ResourceList or returns an error.
  1080. func parseResourceList(m map[string]string) (v1.ResourceList, error) {
  1081. if len(m) == 0 {
  1082. return nil, nil
  1083. }
  1084. rl := make(v1.ResourceList)
  1085. for k, v := range m {
  1086. switch v1.ResourceName(k) {
  1087. // CPU, memory, local storage, and PID resources are supported.
  1088. case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage, pidlimit.PIDs:
  1089. if v1.ResourceName(k) != pidlimit.PIDs || utilfeature.DefaultFeatureGate.Enabled(features.SupportNodePidsLimit) {
  1090. q, err := resource.ParseQuantity(v)
  1091. if err != nil {
  1092. return nil, err
  1093. }
  1094. if q.Sign() == -1 {
  1095. return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
  1096. }
  1097. rl[v1.ResourceName(k)] = q
  1098. }
  1099. default:
  1100. return nil, fmt.Errorf("cannot reserve %q resource", k)
  1101. }
  1102. }
  1103. return rl, nil
  1104. }
  1105. // BootstrapKubeletConfigController constructs and bootstrap a configuration controller
  1106. func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
  1107. if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
  1108. return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
  1109. }
  1110. if len(dynamicConfigDir) == 0 {
  1111. return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
  1112. }
  1113. // compute absolute path and bootstrap controller
  1114. dir, err := filepath.Abs(dynamicConfigDir)
  1115. if err != nil {
  1116. return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
  1117. }
  1118. // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
  1119. c := dynamickubeletconfig.NewController(dir, transform)
  1120. kc, err := c.Bootstrap()
  1121. if err != nil {
  1122. return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
  1123. }
  1124. return kc, c, nil
  1125. }
  1126. // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
  1127. // TODO(random-liu): Move this to a separate binary.
  1128. func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
  1129. r := &f.ContainerRuntimeOptions
  1130. // Initialize docker client configuration.
  1131. dockerClientConfig := &dockershim.ClientConfig{
  1132. DockerEndpoint: r.DockerEndpoint,
  1133. RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration,
  1134. ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration,
  1135. }
  1136. // Initialize network plugin settings.
  1137. pluginSettings := dockershim.NetworkPluginSettings{
  1138. HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode),
  1139. NonMasqueradeCIDR: f.NonMasqueradeCIDR,
  1140. PluginName: r.NetworkPluginName,
  1141. PluginConfDir: r.CNIConfDir,
  1142. PluginBinDirString: r.CNIBinDir,
  1143. PluginCacheDir: r.CNICacheDir,
  1144. MTU: int(r.NetworkPluginMTU),
  1145. }
  1146. // Initialize streaming configuration. (Not using TLS now)
  1147. streamingConfig := &streaming.Config{
  1148. // Use a relative redirect (no scheme or host).
  1149. BaseURL: &url.URL{Path: "/cri/"},
  1150. StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration,
  1151. StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
  1152. SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
  1153. SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
  1154. }
  1155. // Standalone dockershim will always start the local streaming server.
  1156. ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
  1157. f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/)
  1158. if err != nil {
  1159. return err
  1160. }
  1161. klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
  1162. server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
  1163. if err := server.Start(); err != nil {
  1164. return err
  1165. }
  1166. <-stopCh
  1167. return nil
  1168. }