kubelet.go 16 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package services
  14. import (
  15. "flag"
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "os/exec"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. "github.com/spf13/pflag"
  24. "k8s.io/klog"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. utilfeature "k8s.io/apiserver/pkg/util/feature"
  27. cliflag "k8s.io/component-base/cli/flag"
  28. kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
  29. "k8s.io/kubernetes/cmd/kubelet/app/options"
  30. "k8s.io/kubernetes/pkg/features"
  31. kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
  32. kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
  33. "k8s.io/kubernetes/test/e2e/framework"
  34. "k8s.io/kubernetes/test/e2e_node/builder"
  35. "k8s.io/kubernetes/test/e2e_node/remote"
  36. )
  37. // TODO(random-liu): Replace this with standard kubelet launcher.
  38. // args is the type used to accumulate args from the flags with the same name.
  39. type args []string
  40. // String function of flag.Value
  41. func (a *args) String() string {
  42. return fmt.Sprint(*a)
  43. }
  44. // Set function of flag.Value
  45. func (a *args) Set(value string) error {
  46. // Note that we assume all white space in flag string is separating fields
  47. na := strings.Fields(value)
  48. *a = append(*a, na...)
  49. return nil
  50. }
  51. // kubeletArgs is the override kubelet args specified by the test runner.
  52. var kubeletArgs args
  53. var kubeletContainerized bool
  54. var hyperkubeImage string
  55. var genKubeletConfigFile bool
  56. func init() {
  57. flag.Var(&kubeletArgs, "kubelet-flags", "Kubelet flags passed to kubelet, this will override default kubelet flags in the test. Flags specified in multiple kubelet-flags will be concatenate.")
  58. flag.BoolVar(&kubeletContainerized, "kubelet-containerized", false, "Run kubelet in a docker container")
  59. flag.StringVar(&hyperkubeImage, "hyperkube-image", "", "Docker image with containerized kubelet")
  60. flag.BoolVar(&genKubeletConfigFile, "generate-kubelet-config-file", true, "The test runner will generate a Kubelet config file containing test defaults instead of passing default flags to the Kubelet.")
  61. }
  62. // RunKubelet starts kubelet and waits for termination signal. Once receives the
  63. // termination signal, it will stop the kubelet gracefully.
  64. func RunKubelet() {
  65. var err error
  66. // Enable monitorParent to make sure kubelet will receive termination signal
  67. // when test process exits.
  68. e := NewE2EServices(true /* monitorParent */)
  69. defer e.Stop()
  70. e.kubelet, err = e.startKubelet()
  71. if err != nil {
  72. klog.Fatalf("Failed to start kubelet: %v", err)
  73. }
  74. // Wait until receiving a termination signal.
  75. waitForTerminationSignal()
  76. }
  77. const (
  78. // Ports of different e2e services.
  79. kubeletPort = "10250"
  80. kubeletReadOnlyPort = "10255"
  81. // KubeletRootDirectory specifies the directory where the kubelet runtime information is stored.
  82. KubeletRootDirectory = "/var/lib/kubelet"
  83. // Health check url of kubelet
  84. kubeletHealthCheckURL = "http://127.0.0.1:" + kubeletReadOnlyPort + "/healthz"
  85. )
  86. // startKubelet starts the Kubelet in a separate process or returns an error
  87. // if the Kubelet fails to start.
  88. func (e *E2EServices) startKubelet() (*server, error) {
  89. if kubeletContainerized && hyperkubeImage == "" {
  90. return nil, fmt.Errorf("the --hyperkube-image option must be set")
  91. }
  92. klog.Info("Starting kubelet")
  93. // set feature gates so we can check which features are enabled and pass the appropriate flags
  94. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(framework.TestContext.FeatureGates); err != nil {
  95. return nil, err
  96. }
  97. // Build kubeconfig
  98. kubeconfigPath, err := createKubeconfigCWD()
  99. if err != nil {
  100. return nil, err
  101. }
  102. // KubeletConfiguration file path
  103. kubeletConfigPath, err := kubeletConfigCWDPath()
  104. if err != nil {
  105. return nil, err
  106. }
  107. // Create pod directory
  108. podPath, err := createPodDirectory()
  109. if err != nil {
  110. return nil, err
  111. }
  112. e.rmDirs = append(e.rmDirs, podPath)
  113. err = createRootDirectory(KubeletRootDirectory)
  114. if err != nil {
  115. return nil, err
  116. }
  117. // PLEASE NOTE: If you set new KubeletConfiguration values or stop setting values here,
  118. // you must also update the flag names in kubeletConfigFlags!
  119. kubeletConfigFlags := []string{}
  120. // set up the default kubeletconfiguration
  121. kc, err := options.NewKubeletConfiguration()
  122. if err != nil {
  123. return nil, err
  124. }
  125. kc.CgroupRoot = "/"
  126. kubeletConfigFlags = append(kubeletConfigFlags, "cgroup-root")
  127. kc.VolumeStatsAggPeriod = metav1.Duration{Duration: 10 * time.Second} // Aggregate volumes frequently so tests don't need to wait as long
  128. kubeletConfigFlags = append(kubeletConfigFlags, "volume-stats-agg-period")
  129. kc.SerializeImagePulls = false
  130. kubeletConfigFlags = append(kubeletConfigFlags, "serialize-image-pulls")
  131. kc.StaticPodPath = podPath
  132. kubeletConfigFlags = append(kubeletConfigFlags, "pod-manifest-path")
  133. kc.FileCheckFrequency = metav1.Duration{Duration: 10 * time.Second} // Check file frequently so tests won't wait too long
  134. kubeletConfigFlags = append(kubeletConfigFlags, "file-check-frequency")
  135. // Assign a fixed CIDR to the node because there is no node controller.
  136. // Note: this MUST be in sync with the IP in
  137. // - cluster/gce/config-test.sh and
  138. // - test/e2e_node/conformance/run_test.sh.
  139. kc.PodCIDR = "10.100.0.0/24"
  140. kubeletConfigFlags = append(kubeletConfigFlags, "pod-cidr")
  141. kc.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 30 * time.Second}
  142. kubeletConfigFlags = append(kubeletConfigFlags, "eviction-pressure-transition-period")
  143. kc.EvictionHard = map[string]string{
  144. "memory.available": "250Mi",
  145. "nodefs.available": "10%",
  146. "nodefs.inodesFree": "5%",
  147. }
  148. kubeletConfigFlags = append(kubeletConfigFlags, "eviction-hard")
  149. kc.EvictionMinimumReclaim = map[string]string{
  150. "nodefs.available": "5%",
  151. "nodefs.inodesFree": "5%",
  152. }
  153. kubeletConfigFlags = append(kubeletConfigFlags, "eviction-minimum-reclaim")
  154. var killCommand, restartCommand *exec.Cmd
  155. var isSystemd bool
  156. // Apply default kubelet flags.
  157. cmdArgs := []string{}
  158. if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
  159. // On systemd services, detection of a service / unit works reliably while
  160. // detection of a process started from an ssh session does not work.
  161. // Since kubelet will typically be run as a service it also makes more
  162. // sense to test it that way
  163. isSystemd = true
  164. // We can ignore errors, to have GetTimestampFromWorkspaceDir() fallback
  165. // to the current time.
  166. cwd, _ := os.Getwd()
  167. // Use the timestamp from the current directory to name the systemd unit.
  168. unitTimestamp := remote.GetTimestampFromWorkspaceDir(cwd)
  169. unitName := fmt.Sprintf("kubelet-%s.service", unitTimestamp)
  170. if kubeletContainerized {
  171. cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, "--slice=runtime.slice", "--remain-after-exit",
  172. "/usr/bin/docker", "run", "--name=kubelet",
  173. "--rm", "--privileged", "--net=host", "--pid=host",
  174. "-e HOST=/rootfs", "-e HOST_ETC=/host-etc",
  175. "-v", "/etc/localtime:/etc/localtime:ro",
  176. "-v", "/etc/machine-id:/etc/machine-id:ro",
  177. "-v", filepath.Dir(kubeconfigPath)+":/etc/kubernetes",
  178. "-v", "/:/rootfs:rw,rslave",
  179. "-v", "/run:/run",
  180. "-v", "/sys/fs/cgroup:/sys/fs/cgroup:rw",
  181. "-v", "/sys:/sys:rw",
  182. "-v", "/usr/bin/docker:/usr/bin/docker:ro",
  183. "-v", "/var/lib/cni:/var/lib/cni",
  184. "-v", "/var/lib/docker:/var/lib/docker",
  185. "-v", "/var/lib/kubelet:/var/lib/kubelet:rw,rslave",
  186. "-v", "/var/log:/var/log",
  187. "-v", podPath+":"+podPath+":rw",
  188. )
  189. // if we will generate a kubelet config file, we need to mount that path into the container too
  190. if genKubeletConfigFile {
  191. cmdArgs = append(cmdArgs, "-v", filepath.Dir(kubeletConfigPath)+":"+filepath.Dir(kubeletConfigPath)+":rw")
  192. }
  193. cmdArgs = append(cmdArgs, hyperkubeImage, "/hyperkube", "kubelet", "--containerized")
  194. kubeconfigPath = "/etc/kubernetes/kubeconfig"
  195. } else {
  196. cmdArgs = append(cmdArgs,
  197. systemdRun,
  198. "--unit="+unitName,
  199. "--slice=runtime.slice",
  200. "--remain-after-exit",
  201. builder.GetKubeletServerBin())
  202. }
  203. killCommand = exec.Command("systemctl", "kill", unitName)
  204. restartCommand = exec.Command("systemctl", "restart", unitName)
  205. e.logs["kubelet.log"] = LogFileData{
  206. Name: "kubelet.log",
  207. JournalctlCommand: []string{"-u", unitName},
  208. }
  209. kc.KubeletCgroups = "/kubelet.slice"
  210. kubeletConfigFlags = append(kubeletConfigFlags, "kubelet-cgroups")
  211. } else {
  212. cmdArgs = append(cmdArgs, builder.GetKubeletServerBin())
  213. // TODO(random-liu): Get rid of this docker specific thing.
  214. cmdArgs = append(cmdArgs, "--runtime-cgroups=/docker-daemon")
  215. kc.KubeletCgroups = "/kubelet"
  216. kubeletConfigFlags = append(kubeletConfigFlags, "kubelet-cgroups")
  217. kc.SystemCgroups = "/system"
  218. kubeletConfigFlags = append(kubeletConfigFlags, "system-cgroups")
  219. }
  220. cmdArgs = append(cmdArgs,
  221. "--kubeconfig", kubeconfigPath,
  222. "--root-dir", KubeletRootDirectory,
  223. "--v", LogVerbosityLevel, "--logtostderr",
  224. )
  225. // Apply test framework feature gates by default. This could also be overridden
  226. // by kubelet-flags.
  227. if len(framework.TestContext.FeatureGates) > 0 {
  228. cmdArgs = append(cmdArgs, "--feature-gates", cliflag.NewMapStringBool(&framework.TestContext.FeatureGates).String())
  229. kc.FeatureGates = framework.TestContext.FeatureGates
  230. }
  231. if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
  232. // Enable dynamic config if the feature gate is enabled
  233. dynamicConfigDir, err := getDynamicConfigDir()
  234. if err != nil {
  235. return nil, err
  236. }
  237. cmdArgs = append(cmdArgs, "--dynamic-config-dir", dynamicConfigDir)
  238. }
  239. // Enable kubenet by default.
  240. cniBinDir, err := getCNIBinDirectory()
  241. if err != nil {
  242. return nil, err
  243. }
  244. cniConfDir, err := getCNIConfDirectory()
  245. if err != nil {
  246. return nil, err
  247. }
  248. cmdArgs = append(cmdArgs,
  249. "--network-plugin=kubenet",
  250. "--cni-bin-dir", cniBinDir,
  251. "--cni-conf-dir", cniConfDir)
  252. // Keep hostname override for convenience.
  253. if framework.TestContext.NodeName != "" { // If node name is specified, set hostname override.
  254. cmdArgs = append(cmdArgs, "--hostname-override", framework.TestContext.NodeName)
  255. }
  256. if framework.TestContext.ContainerRuntime != "" {
  257. cmdArgs = append(cmdArgs, "--container-runtime", framework.TestContext.ContainerRuntime)
  258. }
  259. if framework.TestContext.ContainerRuntimeEndpoint != "" {
  260. cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
  261. }
  262. if framework.TestContext.ImageServiceEndpoint != "" {
  263. cmdArgs = append(cmdArgs, "--image-service-endpoint", framework.TestContext.ImageServiceEndpoint)
  264. }
  265. // Write config file or flags, depending on whether --generate-kubelet-config-file was provided
  266. if genKubeletConfigFile {
  267. if err := writeKubeletConfigFile(kc, kubeletConfigPath); err != nil {
  268. return nil, err
  269. }
  270. // add the flag to load config from a file
  271. cmdArgs = append(cmdArgs, "--config", kubeletConfigPath)
  272. } else {
  273. // generate command line flags from the default config, since --generate-kubelet-config-file was not provided
  274. addKubeletConfigFlags(&cmdArgs, kc, kubeletConfigFlags)
  275. }
  276. // Override the default kubelet flags.
  277. cmdArgs = append(cmdArgs, kubeletArgs...)
  278. // Adjust the args if we are running kubelet with systemd.
  279. if isSystemd {
  280. adjustArgsForSystemd(cmdArgs)
  281. }
  282. cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
  283. server := newServer(
  284. "kubelet",
  285. cmd,
  286. killCommand,
  287. restartCommand,
  288. []string{kubeletHealthCheckURL},
  289. "kubelet.log",
  290. e.monitorParent,
  291. true /* restartOnExit */)
  292. return server, server.start()
  293. }
  294. // addKubeletConfigFlags adds the flags we care about from the provided kubelet configuration object
  295. func addKubeletConfigFlags(cmdArgs *[]string, kc *kubeletconfig.KubeletConfiguration, flags []string) {
  296. fs := pflag.NewFlagSet("kubelet", pflag.ExitOnError)
  297. options.AddKubeletConfigFlags(fs, kc)
  298. for _, name := range flags {
  299. *cmdArgs = append(*cmdArgs, fmt.Sprintf("--%s=%s", name, fs.Lookup(name).Value.String()))
  300. }
  301. }
  302. // writeKubeletConfigFile writes the kubelet config file based on the args and returns the filename
  303. func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
  304. data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
  305. if err != nil {
  306. return err
  307. }
  308. // create the directory, if it does not exist
  309. dir := filepath.Dir(path)
  310. if err := os.MkdirAll(dir, 0755); err != nil {
  311. return err
  312. }
  313. // write the file
  314. if err := ioutil.WriteFile(path, data, 0755); err != nil {
  315. return err
  316. }
  317. return nil
  318. }
  319. // createPodDirectory creates pod directory.
  320. func createPodDirectory() (string, error) {
  321. cwd, err := os.Getwd()
  322. if err != nil {
  323. return "", fmt.Errorf("failed to get current working directory: %v", err)
  324. }
  325. path, err := ioutil.TempDir(cwd, "static-pods")
  326. if err != nil {
  327. return "", fmt.Errorf("failed to create static pod directory: %v", err)
  328. }
  329. return path, nil
  330. }
  331. // createKubeconfig creates a kubeconfig file at the fully qualified `path`. The parent dirs must exist.
  332. func createKubeconfig(path string) error {
  333. kubeconfig := []byte(`apiVersion: v1
  334. kind: Config
  335. users:
  336. - name: kubelet
  337. clusters:
  338. - cluster:
  339. server: ` + getAPIServerClientURL() + `
  340. insecure-skip-tls-verify: true
  341. name: local
  342. contexts:
  343. - context:
  344. cluster: local
  345. user: kubelet
  346. name: local-context
  347. current-context: local-context`)
  348. if err := ioutil.WriteFile(path, kubeconfig, 0666); err != nil {
  349. return err
  350. }
  351. return nil
  352. }
  353. func createRootDirectory(path string) error {
  354. if _, err := os.Stat(path); err != nil {
  355. if os.IsNotExist(err) {
  356. return os.MkdirAll(path, os.FileMode(0755))
  357. }
  358. return err
  359. }
  360. return nil
  361. }
  362. func kubeconfigCWDPath() (string, error) {
  363. cwd, err := os.Getwd()
  364. if err != nil {
  365. return "", fmt.Errorf("failed to get current working directory: %v", err)
  366. }
  367. return filepath.Join(cwd, "kubeconfig"), nil
  368. }
  369. func kubeletConfigCWDPath() (string, error) {
  370. cwd, err := os.Getwd()
  371. if err != nil {
  372. return "", fmt.Errorf("failed to get current working directory: %v", err)
  373. }
  374. // DO NOT name this file "kubelet" - you will overwrite the kubelet binary and be very confused :)
  375. return filepath.Join(cwd, "kubelet-config"), nil
  376. }
  377. // like createKubeconfig, but creates kubeconfig at current-working-directory/kubeconfig
  378. // returns a fully-qualified path to the kubeconfig file
  379. func createKubeconfigCWD() (string, error) {
  380. kubeconfigPath, err := kubeconfigCWDPath()
  381. if err != nil {
  382. return "", err
  383. }
  384. if err = createKubeconfig(kubeconfigPath); err != nil {
  385. return "", err
  386. }
  387. return kubeconfigPath, nil
  388. }
  389. // getCNIBinDirectory returns CNI directory.
  390. func getCNIBinDirectory() (string, error) {
  391. cwd, err := os.Getwd()
  392. if err != nil {
  393. return "", err
  394. }
  395. return filepath.Join(cwd, "cni", "bin"), nil
  396. }
  397. // getCNIConfDirectory returns CNI Configuration directory.
  398. func getCNIConfDirectory() (string, error) {
  399. cwd, err := os.Getwd()
  400. if err != nil {
  401. return "", err
  402. }
  403. return filepath.Join(cwd, "cni", "net.d"), nil
  404. }
  405. // getDynamicConfigDir returns the directory for dynamic Kubelet configuration
  406. func getDynamicConfigDir() (string, error) {
  407. cwd, err := os.Getwd()
  408. if err != nil {
  409. return "", err
  410. }
  411. return filepath.Join(cwd, "dynamic-kubelet-config"), nil
  412. }
  413. // adjustArgsForSystemd escape special characters in kubelet arguments for systemd. Systemd
  414. // may try to do auto expansion without escaping.
  415. func adjustArgsForSystemd(args []string) {
  416. for i := range args {
  417. args[i] = strings.Replace(args[i], "%", "%%", -1)
  418. args[i] = strings.Replace(args[i], "$", "$$", -1)
  419. }
  420. }