kubelet.go 14 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package services
  14. import (
  15. "flag"
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "os/exec"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. "github.com/spf13/pflag"
  24. "k8s.io/klog"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. utilfeature "k8s.io/apiserver/pkg/util/feature"
  27. cliflag "k8s.io/component-base/cli/flag"
  28. kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
  29. "k8s.io/kubernetes/cmd/kubelet/app/options"
  30. "k8s.io/kubernetes/pkg/features"
  31. kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
  32. kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
  33. "k8s.io/kubernetes/test/e2e/framework"
  34. "k8s.io/kubernetes/test/e2e_node/builder"
  35. "k8s.io/kubernetes/test/e2e_node/remote"
  36. )
  37. // TODO(random-liu): Replace this with standard kubelet launcher.
  38. // args is the type used to accumulate args from the flags with the same name.
  39. type args []string
  40. // String function of flag.Value
  41. func (a *args) String() string {
  42. return fmt.Sprint(*a)
  43. }
  44. // Set function of flag.Value
  45. func (a *args) Set(value string) error {
  46. // Note that we assume all white space in flag string is separating fields
  47. na := strings.Fields(value)
  48. *a = append(*a, na...)
  49. return nil
  50. }
  51. // kubeletArgs is the override kubelet args specified by the test runner.
  52. var kubeletArgs args
  53. var genKubeletConfigFile bool
  54. func init() {
  55. flag.Var(&kubeletArgs, "kubelet-flags", "Kubelet flags passed to kubelet, this will override default kubelet flags in the test. Flags specified in multiple kubelet-flags will be concatenate.")
  56. flag.BoolVar(&genKubeletConfigFile, "generate-kubelet-config-file", true, "The test runner will generate a Kubelet config file containing test defaults instead of passing default flags to the Kubelet.")
  57. }
  58. // RunKubelet starts kubelet and waits for termination signal. Once receives the
  59. // termination signal, it will stop the kubelet gracefully.
  60. func RunKubelet() {
  61. var err error
  62. // Enable monitorParent to make sure kubelet will receive termination signal
  63. // when test process exits.
  64. e := NewE2EServices(true /* monitorParent */)
  65. defer e.Stop()
  66. e.kubelet, err = e.startKubelet()
  67. if err != nil {
  68. klog.Fatalf("Failed to start kubelet: %v", err)
  69. }
  70. // Wait until receiving a termination signal.
  71. waitForTerminationSignal()
  72. }
const (
	// Ports of different e2e services.
	// kubeletPort is kept as a string; it is not referenced in the visible
	// part of this file (presumably the kubelet's main serving port - confirm).
	kubeletPort = "10250"
	// kubeletReadOnlyPort is the port used to build kubeletHealthCheckURL below.
	kubeletReadOnlyPort = "10255"
	// KubeletRootDirectory specifies the directory where the kubelet runtime information is stored.
	// startKubelet creates it if needed and passes it to the kubelet as --root-dir.
	KubeletRootDirectory = "/var/lib/kubelet"
	// Health check url of kubelet: the /healthz endpoint on the loopback
	// address at kubeletReadOnlyPort.
	kubeletHealthCheckURL = "http://127.0.0.1:" + kubeletReadOnlyPort + "/healthz"
)
// startKubelet starts the Kubelet in a separate process or returns an error
// if the Kubelet fails to start.
//
// The function works in four phases:
//  1. prepare on-disk inputs: kubeconfig, static pod directory and the
//     kubelet root directory;
//  2. build a KubeletConfiguration with test defaults, recording in
//     kubeletConfigFlags the flag name of every value that is set;
//  3. assemble the command line, prefixed with systemd-run when that binary
//     is found on PATH;
//  4. hand the command to newServer and start it.
//
// It returns the server created by newServer together with the error from
// server.start(); every earlier failure returns a nil server.
func (e *E2EServices) startKubelet() (*server, error) {
	klog.Info("Starting kubelet")

	// set feature gates so we can check which features are enabled and pass the appropriate flags
	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(framework.TestContext.FeatureGates); err != nil {
		return nil, err
	}

	// Build kubeconfig
	kubeconfigPath, err := createKubeconfigCWD()
	if err != nil {
		return nil, err
	}

	// KubeletConfiguration file path
	// (only the path is computed here; the file is written further down and
	// only when genKubeletConfigFile is set)
	kubeletConfigPath, err := kubeletConfigCWDPath()
	if err != nil {
		return nil, err
	}

	// Create pod directory
	podPath, err := createPodDirectory()
	if err != nil {
		return nil, err
	}
	// Remember the directory in e.rmDirs (presumably so it is removed on Stop - confirm).
	e.rmDirs = append(e.rmDirs, podPath)
	err = createRootDirectory(KubeletRootDirectory)
	if err != nil {
		return nil, err
	}

	// PLEASE NOTE: If you set new KubeletConfiguration values or stop setting values here,
	// you must also update the flag names in kubeletConfigFlags!
	// kubeletConfigFlags is consumed by addKubeletConfigFlags when no config
	// file is generated, so every kc field set below is paired with its flag name.
	kubeletConfigFlags := []string{}

	// set up the default kubeletconfiguration
	kc, err := options.NewKubeletConfiguration()
	if err != nil {
		return nil, err
	}

	kc.CgroupRoot = "/"
	kubeletConfigFlags = append(kubeletConfigFlags, "cgroup-root")

	kc.VolumeStatsAggPeriod = metav1.Duration{Duration: 10 * time.Second} // Aggregate volumes frequently so tests don't need to wait as long
	kubeletConfigFlags = append(kubeletConfigFlags, "volume-stats-agg-period")

	kc.SerializeImagePulls = false
	kubeletConfigFlags = append(kubeletConfigFlags, "serialize-image-pulls")

	// Static pods are read from the temporary directory created above.
	kc.StaticPodPath = podPath
	kubeletConfigFlags = append(kubeletConfigFlags, "pod-manifest-path")

	kc.FileCheckFrequency = metav1.Duration{Duration: 10 * time.Second} // Check file frequently so tests won't wait too long
	kubeletConfigFlags = append(kubeletConfigFlags, "file-check-frequency")

	// Assign a fixed CIDR to the node because there is no node controller.
	// Note: this MUST be in sync with the IP in
	// - cluster/gce/config-test.sh and
	// - test/e2e_node/conformance/run_test.sh.
	kc.PodCIDR = "10.100.0.0/24"
	kubeletConfigFlags = append(kubeletConfigFlags, "pod-cidr")

	kc.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 30 * time.Second}
	kubeletConfigFlags = append(kubeletConfigFlags, "eviction-pressure-transition-period")

	kc.EvictionHard = map[string]string{
		"memory.available":  "250Mi",
		"nodefs.available":  "10%",
		"nodefs.inodesFree": "5%",
	}
	kubeletConfigFlags = append(kubeletConfigFlags, "eviction-hard")

	kc.EvictionMinimumReclaim = map[string]string{
		"nodefs.available":  "5%",
		"nodefs.inodesFree": "5%",
	}
	kubeletConfigFlags = append(kubeletConfigFlags, "eviction-minimum-reclaim")

	// killCommand and restartCommand are only assigned in the systemd branch
	// below; in the other branch they are passed to newServer as nil.
	var killCommand, restartCommand *exec.Cmd
	var isSystemd bool
	// Apply default kubelet flags.
	cmdArgs := []string{}
	if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
		// On systemd services, detection of a service / unit works reliably while
		// detection of a process started from an ssh session does not work.
		// Since kubelet will typically be run as a service it also makes more
		// sense to test it that way
		isSystemd = true
		// We can ignore errors, to have GetTimestampFromWorkspaceDir() fallback
		// to the current time.
		cwd, _ := os.Getwd()
		// Use the timestamp from the current directory to name the systemd unit.
		unitTimestamp := remote.GetTimestampFromWorkspaceDir(cwd)
		unitName := fmt.Sprintf("kubelet-%s.service", unitTimestamp)
		// The command becomes: systemd-run <unit options> <kubelet binary> <kubelet flags...>
		cmdArgs = append(cmdArgs,
			systemdRun,
			"--unit="+unitName,
			"--slice=runtime.slice",
			"--remain-after-exit",
			builder.GetKubeletServerBin())
		// Kill and restart go through systemctl, addressed by the unit name.
		killCommand = exec.Command("systemctl", "kill", unitName)
		restartCommand = exec.Command("systemctl", "restart", unitName)
		// The kubelet log entry carries the journalctl arguments that select this unit.
		e.logs["kubelet.log"] = LogFileData{
			Name:              "kubelet.log",
			JournalctlCommand: []string{"-u", unitName},
		}
		kc.KubeletCgroups = "/kubelet.slice"
		kubeletConfigFlags = append(kubeletConfigFlags, "kubelet-cgroups")
	} else {
		// systemd-run is not available: run the kubelet binary directly.
		cmdArgs = append(cmdArgs, builder.GetKubeletServerBin())
		// TODO(random-liu): Get rid of this docker specific thing.
		cmdArgs = append(cmdArgs, "--runtime-cgroups=/docker-daemon")

		kc.KubeletCgroups = "/kubelet"
		kubeletConfigFlags = append(kubeletConfigFlags, "kubelet-cgroups")

		kc.SystemCgroups = "/system"
		kubeletConfigFlags = append(kubeletConfigFlags, "system-cgroups")
	}
	// Flags that are always passed on the command line, regardless of whether
	// a config file is generated.
	cmdArgs = append(cmdArgs,
		"--kubeconfig", kubeconfigPath,
		"--root-dir", KubeletRootDirectory,
		"--v", LogVerbosityLevel, "--logtostderr",
	)

	// Apply test framework feature gates by default. This could also be overridden
	// by kubelet-flags.
	// The gates are set both as a flag and on kc; "feature-gates" is not added
	// to kubeletConfigFlags because the flag is already on the command line.
	if len(framework.TestContext.FeatureGates) > 0 {
		cmdArgs = append(cmdArgs, "--feature-gates", cliflag.NewMapStringBool(&framework.TestContext.FeatureGates).String())
		kc.FeatureGates = framework.TestContext.FeatureGates
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
		// Enable dynamic config if the feature gate is enabled
		dynamicConfigDir, err := getDynamicConfigDir()
		if err != nil {
			return nil, err
		}
		cmdArgs = append(cmdArgs, "--dynamic-config-dir", dynamicConfigDir)
	}

	// Enable kubenet by default.
	// All three CNI directories are resolved below the current working directory.
	cniBinDir, err := getCNIBinDirectory()
	if err != nil {
		return nil, err
	}

	cniConfDir, err := getCNIConfDirectory()
	if err != nil {
		return nil, err
	}

	cniCacheDir, err := getCNICacheDirectory()
	if err != nil {
		return nil, err
	}

	cmdArgs = append(cmdArgs,
		"--network-plugin=kubenet",
		"--cni-bin-dir", cniBinDir,
		"--cni-conf-dir", cniConfDir,
		"--cni-cache-dir", cniCacheDir)

	// Keep hostname override for convenience.
	if framework.TestContext.NodeName != "" { // If node name is specified, set hostname override.
		cmdArgs = append(cmdArgs, "--hostname-override", framework.TestContext.NodeName)
	}

	// The container runtime related flags are only passed when the test
	// context provides a non-empty value for them.
	if framework.TestContext.ContainerRuntime != "" {
		cmdArgs = append(cmdArgs, "--container-runtime", framework.TestContext.ContainerRuntime)
	}
	if framework.TestContext.ContainerRuntimeEndpoint != "" {
		cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
	}
	if framework.TestContext.ImageServiceEndpoint != "" {
		cmdArgs = append(cmdArgs, "--image-service-endpoint", framework.TestContext.ImageServiceEndpoint)
	}

	// Write config file or flags, depending on whether --generate-kubelet-config-file was provided
	// (the flag defaults to true, so the config file is the default path)
	if genKubeletConfigFile {
		if err := writeKubeletConfigFile(kc, kubeletConfigPath); err != nil {
			return nil, err
		}
		// add the flag to load config from a file
		cmdArgs = append(cmdArgs, "--config", kubeletConfigPath)
	} else {
		// generate command line flags from the default config, since --generate-kubelet-config-file was not provided
		addKubeletConfigFlags(&cmdArgs, kc, kubeletConfigFlags)
	}

	// Override the default kubelet flags.
	// The user supplied flags are appended last, after all defaults.
	cmdArgs = append(cmdArgs, kubeletArgs...)

	// Adjust the args if we are running kubelet with systemd.
	// This rewrites every element of cmdArgs in place, including the
	// systemd-run options themselves.
	if isSystemd {
		adjustArgsForSystemd(cmdArgs)
	}

	// cmdArgs[0] is either the systemd-run path or the kubelet binary.
	cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
	server := newServer(
		"kubelet",
		cmd,
		killCommand,
		restartCommand,
		[]string{kubeletHealthCheckURL},
		"kubelet.log",
		e.monitorParent,
		true /* restartOnExit */)
	return server, server.start()
}
  265. // addKubeletConfigFlags adds the flags we care about from the provided kubelet configuration object
  266. func addKubeletConfigFlags(cmdArgs *[]string, kc *kubeletconfig.KubeletConfiguration, flags []string) {
  267. fs := pflag.NewFlagSet("kubelet", pflag.ExitOnError)
  268. options.AddKubeletConfigFlags(fs, kc)
  269. for _, name := range flags {
  270. *cmdArgs = append(*cmdArgs, fmt.Sprintf("--%s=%s", name, fs.Lookup(name).Value.String()))
  271. }
  272. }
  273. // writeKubeletConfigFile writes the kubelet config file based on the args and returns the filename
  274. func writeKubeletConfigFile(internal *kubeletconfig.KubeletConfiguration, path string) error {
  275. data, err := kubeletconfigcodec.EncodeKubeletConfig(internal, kubeletconfigv1beta1.SchemeGroupVersion)
  276. if err != nil {
  277. return err
  278. }
  279. // create the directory, if it does not exist
  280. dir := filepath.Dir(path)
  281. if err := os.MkdirAll(dir, 0755); err != nil {
  282. return err
  283. }
  284. // write the file
  285. if err := ioutil.WriteFile(path, data, 0755); err != nil {
  286. return err
  287. }
  288. return nil
  289. }
  290. // createPodDirectory creates pod directory.
  291. func createPodDirectory() (string, error) {
  292. cwd, err := os.Getwd()
  293. if err != nil {
  294. return "", fmt.Errorf("failed to get current working directory: %v", err)
  295. }
  296. path, err := ioutil.TempDir(cwd, "static-pods")
  297. if err != nil {
  298. return "", fmt.Errorf("failed to create static pod directory: %v", err)
  299. }
  300. return path, nil
  301. }
  302. // createKubeconfig creates a kubeconfig file at the fully qualified `path`. The parent dirs must exist.
  303. func createKubeconfig(path string) error {
  304. kubeconfig := []byte(`apiVersion: v1
  305. kind: Config
  306. users:
  307. - name: kubelet
  308. clusters:
  309. - cluster:
  310. server: ` + getAPIServerClientURL() + `
  311. insecure-skip-tls-verify: true
  312. name: local
  313. contexts:
  314. - context:
  315. cluster: local
  316. user: kubelet
  317. name: local-context
  318. current-context: local-context`)
  319. if err := ioutil.WriteFile(path, kubeconfig, 0666); err != nil {
  320. return err
  321. }
  322. return nil
  323. }
  324. func createRootDirectory(path string) error {
  325. if _, err := os.Stat(path); err != nil {
  326. if os.IsNotExist(err) {
  327. return os.MkdirAll(path, os.FileMode(0755))
  328. }
  329. return err
  330. }
  331. return nil
  332. }
  333. func kubeconfigCWDPath() (string, error) {
  334. cwd, err := os.Getwd()
  335. if err != nil {
  336. return "", fmt.Errorf("failed to get current working directory: %v", err)
  337. }
  338. return filepath.Join(cwd, "kubeconfig"), nil
  339. }
  340. func kubeletConfigCWDPath() (string, error) {
  341. cwd, err := os.Getwd()
  342. if err != nil {
  343. return "", fmt.Errorf("failed to get current working directory: %v", err)
  344. }
  345. // DO NOT name this file "kubelet" - you will overwrite the kubelet binary and be very confused :)
  346. return filepath.Join(cwd, "kubelet-config"), nil
  347. }
  348. // like createKubeconfig, but creates kubeconfig at current-working-directory/kubeconfig
  349. // returns a fully-qualified path to the kubeconfig file
  350. func createKubeconfigCWD() (string, error) {
  351. kubeconfigPath, err := kubeconfigCWDPath()
  352. if err != nil {
  353. return "", err
  354. }
  355. if err = createKubeconfig(kubeconfigPath); err != nil {
  356. return "", err
  357. }
  358. return kubeconfigPath, nil
  359. }
  360. // getCNIBinDirectory returns CNI directory.
  361. func getCNIBinDirectory() (string, error) {
  362. cwd, err := os.Getwd()
  363. if err != nil {
  364. return "", err
  365. }
  366. return filepath.Join(cwd, "cni", "bin"), nil
  367. }
  368. // getCNIConfDirectory returns CNI Configuration directory.
  369. func getCNIConfDirectory() (string, error) {
  370. cwd, err := os.Getwd()
  371. if err != nil {
  372. return "", err
  373. }
  374. return filepath.Join(cwd, "cni", "net.d"), nil
  375. }
  376. // getCNICacheDirectory returns CNI Cache directory.
  377. func getCNICacheDirectory() (string, error) {
  378. cwd, err := os.Getwd()
  379. if err != nil {
  380. return "", err
  381. }
  382. return filepath.Join(cwd, "cni", "cache"), nil
  383. }
  384. // getDynamicConfigDir returns the directory for dynamic Kubelet configuration
  385. func getDynamicConfigDir() (string, error) {
  386. cwd, err := os.Getwd()
  387. if err != nil {
  388. return "", err
  389. }
  390. return filepath.Join(cwd, "dynamic-kubelet-config"), nil
  391. }
  392. // adjustArgsForSystemd escape special characters in kubelet arguments for systemd. Systemd
  393. // may try to do auto expansion without escaping.
  394. func adjustArgsForSystemd(args []string) {
  395. for i := range args {
  396. args[i] = strings.Replace(args[i], "%", "%%", -1)
  397. args[i] = strings.Replace(args[i], "$", "$$", -1)
  398. }
  399. }