/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
	"k8s.io/kubernetes/test/e2e/framework/volume"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo"
)

const (
	// Interval to framework.Poll /runningpods on a node
	pollInterval = 1 * time.Second
	// Interval to framework.Poll /stats/container on a node
	containerStatsPollingInterval = 5 * time.Second
	// Maximum number of nodes that we constrain the test to run against
	maxNodesToCheck = 10
)
// getPodMatches returns a set of pod names on the given node that matches the
// podNamePrefix and namespace.
func getPodMatches(c clientset.Interface, nodeName string, podNamePrefix string, namespace string) sets.String {
	matches := sets.NewString()
	framework.Logf("Checking pods on node %v via /runningpods endpoint", nodeName)
	runningPods, err := e2ekubelet.GetKubeletPods(c, nodeName)
	if err != nil {
		framework.Logf("Error checking running pods on %v: %v", nodeName, err)
		return matches
	}
	for _, pod := range runningPods.Items {
		if pod.Namespace == namespace && strings.HasPrefix(pod.Name, podNamePrefix) {
			matches.Insert(pod.Name)
		}
	}
	return matches
}

// waitTillNPodsRunningOnNodes polls the /runningpods endpoint on kubelet until
// it finds targetNumPods pods that match the given criteria (namespace and
// podNamePrefix). Note that we usually use label selector to filter pods that
// belong to the same RC. However, we use podNamePrefix with namespace here
// because pods returned from /runningpods do not contain the original label
// information; they are reconstructed by examining the container runtime. In
// the scope of this test, we do not expect pod naming conflicts so
// podNamePrefix should be sufficient to identify the pods.
func waitTillNPodsRunningOnNodes(c clientset.Interface, nodeNames sets.String, podNamePrefix string, namespace string, targetNumPods int, timeout time.Duration) error {
	return wait.Poll(pollInterval, timeout, func() (bool, error) {
		matchCh := make(chan sets.String, len(nodeNames))
		for _, item := range nodeNames.List() {
			// Launch a goroutine per node to check the pods running on the nodes.
			nodeName := item
			go func() {
				matchCh <- getPodMatches(c, nodeName, podNamePrefix, namespace)
			}()
		}
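		// Collect one result set per node from the channel and union them into a
		// single set of matching pod names.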
		seen := sets.NewString()
		for i := 0; i < len(nodeNames.List()); i++ {
			seen = seen.Union(<-matchCh)
		}
		if seen.Len() == targetNumPods {
			return true, nil
		}
		framework.Logf("Waiting for %d pods to be running on the node; %d are currently running;", targetNumPods, seen.Len())
		return false, nil
	})
}
// Restart the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 1` command in the
// pod's (only) container. This command changes the number of nfs server threads from
// (presumably) zero back to 1, and therefore allows nfs to open connections again.
func restartNfsServer(serverPod *v1.Pod) {
	const startcmd = "/usr/sbin/rpc.nfsd 1"
	ns := fmt.Sprintf("--namespace=%v", serverPod.Namespace)
	framework.RunKubectlOrDie(ns, "exec", ns, serverPod.Name, "--", "/bin/sh", "-c", startcmd)
}

// Stop the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 0` command in the
// pod's (only) container. This command changes the number of nfs server threads to 0,
// thus closing all open nfs connections.
func stopNfsServer(serverPod *v1.Pod) {
	const stopcmd = "/usr/sbin/rpc.nfsd 0"
	ns := fmt.Sprintf("--namespace=%v", serverPod.Namespace)
	framework.RunKubectlOrDie(ns, "exec", ns, serverPod.Name, "--", "/bin/sh", "-c", stopcmd)
}
// Creates a pod that mounts an nfs volume that is served by the nfs-server pod. The container
// will execute the passed in shell cmd. Waits for the pod to start.
// Note: the nfs plugin is defined inline, no PV or PVC.
func createPodUsingNfs(f *framework.Framework, c clientset.Interface, ns, nfsIP, cmd string) *v1.Pod {
	ginkgo.By("create pod using nfs volume")

	isPrivileged := true
	cmdLine := []string{"-c", cmd}
	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pod-nfs-vol-",
			Namespace:    ns,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:    "pod-nfs-vol",
					Image:   imageutils.GetE2EImage(imageutils.BusyBox),
					Command: []string{"/bin/sh"},
					Args:    cmdLine,
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "nfs-vol",
							MountPath: "/mnt",
						},
					},
					SecurityContext: &v1.SecurityContext{
						Privileged: &isPrivileged,
					},
				},
			},
			RestartPolicy: v1.RestartPolicyNever, // don't restart pod
			Volumes: []v1.Volume{
				{
					Name: "nfs-vol",
					VolumeSource: v1.VolumeSource{
						NFS: &v1.NFSVolumeSource{
							Server:   nfsIP,
							Path:     "/exports",
							ReadOnly: false,
						},
					},
				},
			},
		},
	}
	rtnPod, err := c.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	err = f.WaitForPodReady(rtnPod.Name) // running & ready
	framework.ExpectNoError(err)

	rtnPod, err = c.CoreV1().Pods(ns).Get(context.TODO(), rtnPod.Name, metav1.GetOptions{}) // return fresh pod
	framework.ExpectNoError(err)
	return rtnPod
}
// getHostExternalAddress gets the node for a pod and returns the first External
// address. Returns an error if the node the pod is on doesn't have an External
// address.
func getHostExternalAddress(client clientset.Interface, p *v1.Pod) (externalAddress string, err error) {
	node, err := client.CoreV1().Nodes().Get(context.TODO(), p.Spec.NodeName, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	for _, address := range node.Status.Addresses {
		if address.Type == v1.NodeExternalIP {
			if address.Address != "" {
				externalAddress = address.Address
				break
			}
		}
	}
	if externalAddress == "" {
		err = fmt.Errorf("No external address for pod %v on node %v",
			p.Name, p.Spec.NodeName)
	}
	return
}
// Checks for a lingering nfs mount and/or uid directory on the pod's host. The host IP is used
// so that this test runs in GCE, where it appears that SSH cannot resolve the hostname.
// If expectClean is true then we expect the node to be cleaned up and thus commands like
// `ls <uid-dir>` should fail (since that dir was removed). If expectClean is false then we expect
// the node has not been cleaned up, and thus cmds like `ls <uid-dir>` should succeed. We wait, up
// to a timeout, for the host to reach the expected state; if it does not, an error is reported.
func checkPodCleanup(c clientset.Interface, pod *v1.Pod, expectClean bool) {
	timeout := 5 * time.Minute
	poll := 20 * time.Second
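	// Host paths created by the kubelet for this pod: the per-pod state directory
	// under /var/lib/kubelet/pods/<uid>, and the NFS volume mount directory
	// beneath it.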
	podDir := filepath.Join("/var/lib/kubelet/pods", string(pod.UID))
	mountDir := filepath.Join(podDir, "volumes", "kubernetes.io~nfs")
	// use ip rather than hostname in GCE
	nodeIP, err := getHostExternalAddress(c, pod)
	framework.ExpectNoError(err)

	condMsg := "deleted"
	if !expectClean {
		condMsg = "present"
	}

	// table of host tests to perform (order may matter so not using a map)
	type testT struct {
		feature string // feature to test
		cmd     string // remote command to execute on node
	}
	tests := []testT{
		{
			feature: "pod UID directory",
			cmd:     fmt.Sprintf("sudo ls %v", podDir),
		},
		{
			feature: "pod nfs mount",
			cmd:     fmt.Sprintf("sudo mount | grep %v", mountDir),
		},
	}

	for _, test := range tests {
		framework.Logf("Wait up to %v for host's (%v) %q to be %v", timeout, nodeIP, test.feature, condMsg)
		err = wait.Poll(poll, timeout, func() (bool, error) {
			result, err := e2essh.NodeExec(nodeIP, test.cmd, framework.TestContext.Provider)
			framework.ExpectNoError(err)
			e2essh.LogResult(result)
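			// ok is true when the remote command exited 0 with non-empty stdout and
			// empty stderr, i.e. the directory or mount is still present on the host.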
			ok := (result.Code == 0 && len(result.Stdout) > 0 && len(result.Stderr) == 0)
			if expectClean && ok { // keep trying
				return false, nil
			}
			if !expectClean && !ok { // stop wait loop
				return true, fmt.Errorf("%v is gone but expected to exist", test.feature)
			}
			return true, nil // done, host is as expected
		})
		framework.ExpectNoError(err, fmt.Sprintf("Host (%v) cleanup error: %v. Expected %q to be %v", nodeIP, err, test.feature, condMsg))
	}

	if expectClean {
		framework.Logf("Pod's host has been cleaned up")
	} else {
		framework.Logf("Pod's host has not been cleaned up (per expectation)")
	}
}
var _ = SIGDescribe("kubelet", func() {
	var (
		c  clientset.Interface
		ns string
	)
	f := framework.NewDefaultFramework("kubelet")

	ginkgo.BeforeEach(func() {
		c = f.ClientSet
		ns = f.Namespace.Name
	})

	SIGDescribe("Clean up pods on node", func() {
		var (
			numNodes        int
			nodeNames       sets.String
			nodeLabels      map[string]string
			resourceMonitor *e2ekubelet.ResourceMonitor
		)
		type DeleteTest struct {
			podsPerNode int
			timeout     time.Duration
		}
		deleteTests := []DeleteTest{
			{podsPerNode: 10, timeout: 1 * time.Minute},
		}

		ginkgo.BeforeEach(func() {
			// Use node labels to restrict the pods to be assigned only to the
			// nodes we observe initially.
			nodeLabels = make(map[string]string)
			nodeLabels["kubelet_cleanup"] = "true"
			nodes, err := e2enode.GetBoundedReadySchedulableNodes(c, maxNodesToCheck)
			framework.ExpectNoError(err)
			numNodes = len(nodes.Items)
			nodeNames = sets.NewString()
			for i := 0; i < len(nodes.Items); i++ {
				nodeNames.Insert(nodes.Items[i].Name)
			}
			for nodeName := range nodeNames {
				for k, v := range nodeLabels {
					framework.AddOrUpdateLabelOnNode(c, nodeName, k, v)
				}
			}
			// While we only use a bounded number of nodes in the test, we need to know
			// the actual number of nodes in the cluster to avoid running resourceMonitor
			// against large clusters.
			actualNodes, err := e2enode.GetReadySchedulableNodes(c)
			framework.ExpectNoError(err)

			// Start resourceMonitor only in small clusters.
			if len(actualNodes.Items) <= maxNodesToCheck {
				resourceMonitor = e2ekubelet.NewResourceMonitor(f.ClientSet, e2ekubelet.TargetContainers(), containerStatsPollingInterval)
				resourceMonitor.Start()
			}
		})

		ginkgo.AfterEach(func() {
			if resourceMonitor != nil {
				resourceMonitor.Stop()
			}
			// If we added labels to nodes in this test, remove them now.
			for nodeName := range nodeNames {
				for k := range nodeLabels {
					framework.RemoveLabelOffNode(c, nodeName, k)
				}
			}
		})
		for _, itArg := range deleteTests {
			name := fmt.Sprintf(
				"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
			ginkgo.It(name, func() {
				totalPods := itArg.podsPerNode * numNodes
				ginkgo.By(fmt.Sprintf("Creating a RC of %d pods and wait until all pods of this RC are running", totalPods))
				rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(uuid.NewUUID()))

				err := e2erc.RunRC(testutils.RCConfig{
					Client:       f.ClientSet,
					Name:         rcName,
					Namespace:    f.Namespace.Name,
					Image:        imageutils.GetPauseImageName(),
					Replicas:     totalPods,
					NodeSelector: nodeLabels,
				})
				framework.ExpectNoError(err)
				// Perform a sanity check so that we know all desired pods are
				// running on the nodes according to kubelet. The timeout is set to
				// only 30 seconds here because e2erc.RunRC already waited for all pods to
				// transition to the running status.
				err = waitTillNPodsRunningOnNodes(f.ClientSet, nodeNames, rcName, ns, totalPods, time.Second*30)
				framework.ExpectNoError(err)
				if resourceMonitor != nil {
					resourceMonitor.LogLatest()
				}

				ginkgo.By("Deleting the RC")
				e2erc.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, rcName)
				// Check that the pods really are gone by querying /runningpods on the
				// node. The /runningpods handler checks the container runtime (or its
				// cache) and returns a list of running pods. Some possible causes of
				// failures are:
				//   - kubelet deadlock
				//   - a bug in graceful termination (if it is enabled)
				//   - docker slow to delete pods (or resource problems causing slowness)
				start := time.Now()
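				// Time how long it takes for the pods to disappear from the kubelets'
				// /runningpods responses after the RC has been deleted.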
				err = waitTillNPodsRunningOnNodes(f.ClientSet, nodeNames, rcName, ns, 0, itArg.timeout)
				framework.ExpectNoError(err)
				framework.Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
					time.Since(start))
				if resourceMonitor != nil {
					resourceMonitor.LogCPUSummary()
				}
			})
		}
	})
	// Test host cleanup when disrupting the volume environment.
	SIGDescribe("host cleanup with volume mounts [sig-storage][HostCleanup][Flaky]", func() {

		type hostCleanupTest struct {
			itDescr string
			podCmd  string
		}

		// Disrupt the nfs-server pod after a client pod accesses the nfs volume.
		// Note: the nfs-server is stopped, NOT deleted, in order to preserve its ip addr.
		// If the nfs-server pod is deleted, the client pod's mount cannot be unmounted.
		// If the nfs-server pod is deleted and re-created, the client pod's mount still
		// cannot be unmounted because the re-created server has a different ip addr.
		ginkgo.Context("Host cleanup after disrupting NFS volume [NFS]", func() {
			// issue #31272
			var (
				nfsServerPod *v1.Pod
				nfsIP        string
				pod          *v1.Pod // client pod
			)

			// fill in test slice for this context
			testTbl := []hostCleanupTest{
				{
					itDescr: "after stopping the nfs-server and deleting the (sleeping) client pod, the NFS mount and the pod's UID directory should be removed.",
					podCmd:  "sleep 6000", // keep pod running
				},
				{
					itDescr: "after stopping the nfs-server and deleting the (active) client pod, the NFS mount and the pod's UID directory should be removed.",
					podCmd:  "while true; do echo FeFieFoFum >>/mnt/SUCCESS; sleep 1; cat /mnt/SUCCESS; done",
				},
			}

			ginkgo.BeforeEach(func() {
				e2eskipper.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
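				// Launch the NFS server pod (exporting /exports) and remember the pod
				// object and its IP so the client pod and cleanup code can use them.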
				_, nfsServerPod, nfsIP = volume.NewNFSServer(c, ns, []string{"-G", "777", "/exports"})
			})

			ginkgo.AfterEach(func() {
				err := e2epod.DeletePodWithWait(c, pod)
				framework.ExpectNoError(err, "AfterEach: Failed to delete client pod ", pod.Name)
				err = e2epod.DeletePodWithWait(c, nfsServerPod)
				framework.ExpectNoError(err, "AfterEach: Failed to delete server pod ", nfsServerPod.Name)
			})

			// execute It blocks from above table of tests
			for _, t := range testTbl {
				ginkgo.It(t.itDescr, func() {
					pod = createPodUsingNfs(f, c, ns, nfsIP, t.podCmd)

					ginkgo.By("Stop the NFS server")
					stopNfsServer(nfsServerPod)

					ginkgo.By("Delete the pod mounted to the NFS volume -- expect failure")
					err := e2epod.DeletePodWithWait(c, pod)
					framework.ExpectError(err)
					// pod object is now stale, but is intentionally not nil

					ginkgo.By("Check if pod's host has been cleaned up -- expect not")
					checkPodCleanup(c, pod, false)

					ginkgo.By("Restart the nfs server")
					restartNfsServer(nfsServerPod)

					ginkgo.By("Verify that the deleted client pod is now cleaned up")
					checkPodCleanup(c, pod, true)
				})
			}
		})
	})
})