// network_partition.go
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apps
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "time"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/fields"
  21. "k8s.io/apimachinery/pkg/labels"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/apimachinery/pkg/watch"
  25. "k8s.io/client-go/tools/cache"
  26. v1 "k8s.io/api/core/v1"
  27. clientset "k8s.io/client-go/kubernetes"
  28. api "k8s.io/kubernetes/pkg/apis/core"
  29. nodepkg "k8s.io/kubernetes/pkg/controller/nodelifecycle"
  30. "k8s.io/kubernetes/test/e2e/common"
  31. "k8s.io/kubernetes/test/e2e/framework"
  32. e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
  33. e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
  34. e2enode "k8s.io/kubernetes/test/e2e/framework/node"
  35. e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
  36. e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
  37. e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
  38. e2esset "k8s.io/kubernetes/test/e2e/framework/statefulset"
  39. testutils "k8s.io/kubernetes/test/utils"
  40. "github.com/onsi/ginkgo"
  41. )
const (
	// podReadyTimeout bounds how long we wait for pods on a node to be Running and Ready.
	podReadyTimeout = 2 * time.Minute
	// podNotReadyTimeout bounds how long we wait for pods to be marked NotReady after a partition.
	podNotReadyTimeout = 1 * time.Minute
	// nodeReadinessTimeout bounds how long expectNodeReadiness waits for a matching node update.
	nodeReadinessTimeout = 3 * time.Minute
	// resizeNodeReadyTimeout bounds how long we wait for a node to become Ready again
	// once a temporary network failure ends.
	resizeNodeReadyTimeout = 2 * time.Minute
)
  48. func expectNodeReadiness(isReady bool, newNode chan *v1.Node) {
  49. timeout := false
  50. expected := false
  51. timer := time.After(nodeReadinessTimeout)
  52. for !expected && !timeout {
  53. select {
  54. case n := <-newNode:
  55. if e2enode.IsConditionSetAsExpected(n, v1.NodeReady, isReady) {
  56. expected = true
  57. } else {
  58. framework.Logf("Observed node ready status is NOT %v as expected", isReady)
  59. }
  60. case <-timer:
  61. timeout = true
  62. }
  63. }
  64. if !expected {
  65. framework.Failf("Failed to observe node ready status change to %v", isReady)
  66. }
  67. }
  68. func podOnNode(podName, nodeName string, image string) *v1.Pod {
  69. return &v1.Pod{
  70. ObjectMeta: metav1.ObjectMeta{
  71. Name: podName,
  72. Labels: map[string]string{
  73. "name": podName,
  74. },
  75. },
  76. Spec: v1.PodSpec{
  77. Containers: []v1.Container{
  78. {
  79. Name: podName,
  80. Image: image,
  81. Args: []string{"serve-hostname"},
  82. Ports: []v1.ContainerPort{{ContainerPort: 9376}},
  83. },
  84. },
  85. NodeName: nodeName,
  86. RestartPolicy: v1.RestartPolicyNever,
  87. },
  88. }
  89. }
  90. func newPodOnNode(c clientset.Interface, namespace, podName, nodeName string) error {
  91. pod, err := c.CoreV1().Pods(namespace).Create(context.TODO(), podOnNode(podName, nodeName, framework.ServeHostnameImage), metav1.CreateOptions{})
  92. if err == nil {
  93. framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, nodeName)
  94. } else {
  95. framework.Logf("Failed to create pod %s on node %s: %v", podName, nodeName, err)
  96. }
  97. return err
  98. }
  99. var _ = SIGDescribe("Network Partition [Disruptive] [Slow]", func() {
  100. f := framework.NewDefaultFramework("network-partition")
  101. var c clientset.Interface
  102. var ns string
  103. ginkgo.BeforeEach(func() {
  104. c = f.ClientSet
  105. ns = f.Namespace.Name
  106. _, err := e2epod.GetPodsInNamespace(c, ns, map[string]string{})
  107. framework.ExpectNoError(err)
  108. // TODO(foxish): Re-enable testing on gce after kubernetes#56787 is fixed.
  109. e2eskipper.SkipUnlessProviderIs("gke", "aws")
  110. if strings.Index(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") >= 0 {
  111. framework.Failf("Test dose not support cluster setup with more than one MIG: %s", framework.TestContext.CloudConfig.NodeInstanceGroup)
  112. }
  113. })
  114. framework.KubeDescribe("Pods", func() {
  115. ginkgo.Context("should return to running and ready state after network partition is healed", func() {
  116. ginkgo.BeforeEach(func() {
  117. e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
  118. e2eskipper.SkipUnlessSSHKeyPresent()
  119. })
  120. // What happens in this test:
  121. // Network traffic from a node to master is cut off to simulate network partition
  122. // Expect to observe:
  123. // 1. Node is marked NotReady after timeout by nodecontroller (40seconds)
  124. // 2. All pods on node are marked NotReady shortly after #1
  125. // 3. Node and pods return to Ready after connectivity recovers
  126. ginkgo.It("All pods on the unreachable node should be marked as NotReady upon the node turn NotReady "+
  127. "AND all pods should be mark back to Ready when the node get back to Ready before pod eviction timeout", func() {
  128. ginkgo.By("choose a node - we will block all network traffic on this node")
  129. var podOpts metav1.ListOptions
  130. nodeOpts := metav1.ListOptions{}
  131. nodes, err := c.CoreV1().Nodes().List(context.TODO(), nodeOpts)
  132. framework.ExpectNoError(err)
  133. e2enode.Filter(nodes, func(node v1.Node) bool {
  134. if !e2enode.IsConditionSetAsExpected(&node, v1.NodeReady, true) {
  135. return false
  136. }
  137. podOpts = metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name).String()}
  138. pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), podOpts)
  139. if err != nil || len(pods.Items) <= 0 {
  140. return false
  141. }
  142. return true
  143. })
  144. if len(nodes.Items) <= 0 {
  145. framework.Failf("No eligible node were found: %d", len(nodes.Items))
  146. }
  147. node := nodes.Items[0]
  148. podOpts = metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name).String()}
  149. if err = e2epod.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
  150. framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
  151. }
  152. ginkgo.By("Set up watch on node status")
  153. nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
  154. stopCh := make(chan struct{})
  155. newNode := make(chan *v1.Node)
  156. var controller cache.Controller
  157. _, controller = cache.NewInformer(
  158. &cache.ListWatch{
  159. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  160. options.FieldSelector = nodeSelector.String()
  161. obj, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), options)
  162. return runtime.Object(obj), err
  163. },
  164. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  165. options.FieldSelector = nodeSelector.String()
  166. return f.ClientSet.CoreV1().Nodes().Watch(context.TODO(), options)
  167. },
  168. },
  169. &v1.Node{},
  170. 0,
  171. cache.ResourceEventHandlerFuncs{
  172. UpdateFunc: func(oldObj, newObj interface{}) {
  173. n, ok := newObj.(*v1.Node)
  174. framework.ExpectEqual(ok, true)
  175. newNode <- n
  176. },
  177. },
  178. )
  179. defer func() {
  180. // Will not explicitly close newNode channel here due to
  181. // race condition where stopCh and newNode are closed but informer onUpdate still executes.
  182. close(stopCh)
  183. }()
  184. go controller.Run(stopCh)
  185. ginkgo.By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
  186. host, err := e2enode.GetExternalIP(&node)
  187. framework.ExpectNoError(err)
  188. masterAddresses := framework.GetAllMasterAddresses(c)
  189. defer func() {
  190. ginkgo.By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
  191. for _, masterAddress := range masterAddresses {
  192. framework.UnblockNetwork(host, masterAddress)
  193. }
  194. if ginkgo.CurrentGinkgoTestDescription().Failed {
  195. return
  196. }
  197. ginkgo.By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
  198. expectNodeReadiness(true, newNode)
  199. if err = e2epod.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
  200. framework.Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
  201. }
  202. }()
  203. for _, masterAddress := range masterAddresses {
  204. framework.BlockNetwork(host, masterAddress)
  205. }
  206. ginkgo.By("Expect to observe node and pod status change from Ready to NotReady after network partition")
  207. expectNodeReadiness(false, newNode)
  208. if err = e2epod.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, testutils.PodNotReady); err != nil {
  209. framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
  210. }
  211. })
  212. })
  213. })
  214. framework.KubeDescribe("[ReplicationController]", func() {
  215. ginkgo.It("should recreate pods scheduled on the unreachable node "+
  216. "AND allow scheduling of pods on a node after it rejoins the cluster", func() {
  217. e2eskipper.SkipUnlessSSHKeyPresent()
  218. // Create a replication controller for a service that serves its hostname.
  219. // The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
  220. name := "my-hostname-net"
  221. common.NewSVCByName(c, ns, name)
  222. numNodes, err := e2enode.TotalRegistered(f.ClientSet)
  223. framework.ExpectNoError(err)
  224. replicas := int32(numNodes)
  225. common.NewRCByName(c, ns, name, replicas, nil, nil)
  226. err = e2epod.VerifyPods(c, ns, name, true, replicas)
  227. framework.ExpectNoError(err, "Each pod should start running and responding")
  228. ginkgo.By("choose a node with at least one pod - we will block some network traffic on this node")
  229. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
  230. options := metav1.ListOptions{LabelSelector: label.String()}
  231. pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options) // list pods after all have been scheduled
  232. framework.ExpectNoError(err)
  233. nodeName := pods.Items[0].Spec.NodeName
  234. node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  235. framework.ExpectNoError(err)
  236. // This creates a temporary network partition, verifies that 'podNameToDisappear',
  237. // that belongs to replication controller 'rcName', really disappeared (because its
  238. // grace period is set to 0).
  239. // Finally, it checks that the replication controller recreates the
  240. // pods on another node and that now the number of replicas is equal 'replicas'.
  241. ginkgo.By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
  242. e2enetwork.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
  243. framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
  244. err := waitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
  245. framework.ExpectNoError(err)
  246. ginkgo.By("verifying whether the pod from the unreachable node is recreated")
  247. err = e2epod.VerifyPods(c, ns, name, true, replicas)
  248. framework.ExpectNoError(err)
  249. })
  250. framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
  251. if !e2enode.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
  252. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  253. }
  254. // sleep a bit, to allow Watch in NodeController to catch up.
  255. time.Sleep(5 * time.Second)
  256. ginkgo.By("verify whether new pods can be created on the re-attached node")
  257. // increasing the RC size is not a valid way to test this
  258. // since we have no guarantees the pod will be scheduled on our node.
  259. additionalPod := "additionalpod"
  260. err = newPodOnNode(c, ns, additionalPod, node.Name)
  261. framework.ExpectNoError(err)
  262. err = e2epod.VerifyPods(c, ns, additionalPod, true, 1)
  263. framework.ExpectNoError(err)
  264. // verify that it is really on the requested node
  265. {
  266. pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), additionalPod, metav1.GetOptions{})
  267. framework.ExpectNoError(err)
  268. if pod.Spec.NodeName != node.Name {
  269. framework.Logf("Pod %s found on invalid node: %s instead of %s", pod.Name, pod.Spec.NodeName, node.Name)
  270. }
  271. }
  272. })
  273. ginkgo.It("should eagerly create replacement pod during network partition when termination grace is non-zero", func() {
  274. e2eskipper.SkipUnlessSSHKeyPresent()
  275. // Create a replication controller for a service that serves its hostname.
  276. // The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
  277. name := "my-hostname-net"
  278. gracePeriod := int64(30)
  279. common.NewSVCByName(c, ns, name)
  280. numNodes, err := e2enode.TotalRegistered(f.ClientSet)
  281. framework.ExpectNoError(err)
  282. replicas := int32(numNodes)
  283. common.NewRCByName(c, ns, name, replicas, &gracePeriod, []string{"serve-hostname"})
  284. err = e2epod.VerifyPods(c, ns, name, true, replicas)
  285. framework.ExpectNoError(err, "Each pod should start running and responding")
  286. ginkgo.By("choose a node with at least one pod - we will block some network traffic on this node")
  287. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
  288. options := metav1.ListOptions{LabelSelector: label.String()}
  289. pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options) // list pods after all have been scheduled
  290. framework.ExpectNoError(err)
  291. nodeName := pods.Items[0].Spec.NodeName
  292. node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  293. framework.ExpectNoError(err)
  294. // This creates a temporary network partition, verifies that 'podNameToDisappear',
  295. // that belongs to replication controller 'rcName', did not disappear (because its
  296. // grace period is set to 30).
  297. // Finally, it checks that the replication controller recreates the
  298. // pods on another node and that now the number of replicas is equal 'replicas + 1'.
  299. ginkgo.By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
  300. e2enetwork.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
  301. framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
  302. err := waitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
  303. framework.ExpectEqual(err, wait.ErrWaitTimeout, "Pod was not deleted during network partition.")
  304. ginkgo.By(fmt.Sprintf("verifying that there are %v running pods during partition", replicas))
  305. _, err = e2epod.PodsCreated(c, ns, name, replicas)
  306. framework.ExpectNoError(err)
  307. })
  308. framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
  309. if !e2enode.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
  310. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  311. }
  312. })
  313. })
  314. framework.KubeDescribe("[StatefulSet]", func() {
  315. psName := "ss"
  316. labels := map[string]string{
  317. "foo": "bar",
  318. }
  319. headlessSvcName := "test"
  320. ginkgo.BeforeEach(func() {
  321. // TODO(foxish): Re-enable testing on gce after kubernetes#56787 is fixed.
  322. e2eskipper.SkipUnlessProviderIs("gke")
  323. ginkgo.By("creating service " + headlessSvcName + " in namespace " + f.Namespace.Name)
  324. headlessService := e2eservice.CreateServiceSpec(headlessSvcName, "", true, labels)
  325. _, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(context.TODO(), headlessService, metav1.CreateOptions{})
  326. framework.ExpectNoError(err)
  327. c = f.ClientSet
  328. ns = f.Namespace.Name
  329. })
  330. ginkgo.AfterEach(func() {
  331. if ginkgo.CurrentGinkgoTestDescription().Failed {
  332. framework.DumpDebugInfo(c, ns)
  333. }
  334. framework.Logf("Deleting all stateful set in ns %v", ns)
  335. e2esset.DeleteAllStatefulSets(c, ns)
  336. })
  337. ginkgo.It("should come back up if node goes down [Slow] [Disruptive]", func() {
  338. petMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
  339. podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
  340. ps := e2esset.NewStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
  341. _, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ps, metav1.CreateOptions{})
  342. framework.ExpectNoError(err)
  343. nn, err := e2enode.TotalRegistered(f.ClientSet)
  344. framework.ExpectNoError(err)
  345. nodes, err := e2enode.CheckReady(f.ClientSet, nn, framework.NodeReadyInitialTimeout)
  346. framework.ExpectNoError(err)
  347. common.RestartNodes(f.ClientSet, nodes)
  348. ginkgo.By("waiting for pods to be running again")
  349. e2esset.WaitForRunningAndReady(c, *ps.Spec.Replicas, ps)
  350. })
  351. ginkgo.It("should not reschedule stateful pods if there is a network partition [Slow] [Disruptive]", func() {
  352. e2eskipper.SkipUnlessSSHKeyPresent()
  353. ps := e2esset.NewStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels)
  354. _, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ps, metav1.CreateOptions{})
  355. framework.ExpectNoError(err)
  356. e2esset.WaitForRunningAndReady(c, *ps.Spec.Replicas, ps)
  357. pod := e2esset.GetPodList(c, ps).Items[0]
  358. node, err := c.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
  359. framework.ExpectNoError(err)
  360. // Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
  361. // that belongs to StatefulSet 'statefulSetName', **does not** disappear due to forced deletion from the apiserver.
  362. // The grace period on the stateful pods is set to a value > 0.
  363. e2enetwork.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
  364. framework.Logf("Checking that the NodeController does not force delete stateful pods %v", pod.Name)
  365. err := e2epod.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, 10*time.Minute)
  366. framework.ExpectEqual(err, wait.ErrWaitTimeout, "Pod was not deleted during network partition.")
  367. })
  368. framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
  369. if !e2enode.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
  370. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  371. }
  372. ginkgo.By("waiting for pods to be running again")
  373. e2esset.WaitForRunningAndReady(c, *ps.Spec.Replicas, ps)
  374. })
  375. })
  376. framework.KubeDescribe("[Job]", func() {
  377. ginkgo.It("should create new pods when node is partitioned", func() {
  378. e2eskipper.SkipUnlessSSHKeyPresent()
  379. parallelism := int32(2)
  380. completions := int32(4)
  381. backoffLimit := int32(6) // default value
  382. job := e2ejob.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
  383. parallelism, completions, nil, backoffLimit)
  384. job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
  385. framework.ExpectNoError(err)
  386. label := labels.SelectorFromSet(labels.Set(map[string]string{e2ejob.JobSelectorKey: job.Name}))
  387. ginkgo.By(fmt.Sprintf("verifying that there are now %v running pods", parallelism))
  388. _, err = e2epod.PodsCreatedByLabel(c, ns, job.Name, parallelism, label)
  389. framework.ExpectNoError(err)
  390. ginkgo.By("choose a node with at least one pod - we will block some network traffic on this node")
  391. options := metav1.ListOptions{LabelSelector: label.String()}
  392. pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options) // list pods after all have been scheduled
  393. framework.ExpectNoError(err)
  394. nodeName := pods.Items[0].Spec.NodeName
  395. node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  396. framework.ExpectNoError(err)
  397. // This creates a temporary network partition, verifies that the job has 'parallelism' number of
  398. // running pods after the node-controller detects node unreachable.
  399. ginkgo.By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
  400. e2enetwork.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
  401. framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
  402. err := e2epod.WaitForPodToDisappear(c, ns, pods.Items[0].Name, label, 20*time.Second, 10*time.Minute)
  403. framework.ExpectEqual(err, wait.ErrWaitTimeout, "Pod was not deleted during network partition.")
  404. ginkgo.By(fmt.Sprintf("verifying that there are now %v running pods", parallelism))
  405. _, err = e2epod.PodsCreatedByLabel(c, ns, job.Name, parallelism, label)
  406. framework.ExpectNoError(err)
  407. })
  408. framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
  409. if !e2enode.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
  410. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  411. }
  412. })
  413. })
  414. framework.KubeDescribe("Pods", func() {
  415. ginkgo.Context("should be evicted from unready Node", func() {
  416. ginkgo.BeforeEach(func() {
  417. e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
  418. })
  419. // What happens in this test:
  420. // Network traffic from a node to master is cut off to simulate network partition
  421. // Expect to observe:
  422. // 1. Node is marked NotReady after timeout by nodecontroller (40seconds)
  423. // 2. All pods on node are marked NotReady shortly after #1
  424. // 3. After enough time passess all Pods are evicted from the given Node
  425. ginkgo.It("[Feature:TaintEviction] All pods on the unreachable node should be marked as NotReady upon the node turn NotReady "+
  426. "AND all pods should be evicted after eviction timeout passes", func() {
  427. e2eskipper.SkipUnlessSSHKeyPresent()
  428. ginkgo.By("choose a node - we will block all network traffic on this node")
  429. var podOpts metav1.ListOptions
  430. nodes, err := e2enode.GetReadySchedulableNodes(c)
  431. framework.ExpectNoError(err)
  432. e2enode.Filter(nodes, func(node v1.Node) bool {
  433. if !e2enode.IsConditionSetAsExpected(&node, v1.NodeReady, true) {
  434. return false
  435. }
  436. podOpts = metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name).String()}
  437. pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), podOpts)
  438. if err != nil || len(pods.Items) <= 0 {
  439. return false
  440. }
  441. return true
  442. })
  443. if len(nodes.Items) <= 0 {
  444. framework.Failf("No eligible node were found: %d", len(nodes.Items))
  445. }
  446. node := nodes.Items[0]
  447. podOpts = metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name).String()}
  448. if err := e2epod.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReadyOrSucceeded); err != nil {
  449. framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
  450. }
  451. pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), podOpts)
  452. framework.ExpectNoError(err)
  453. podTolerationTimes := map[string]time.Duration{}
  454. // This test doesn't add tolerations by itself, but because they may be present in the cluster
  455. // it needs to account for that.
  456. for _, pod := range pods.Items {
  457. namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
  458. tolerations := pod.Spec.Tolerations
  459. framework.ExpectNoError(err)
  460. for _, toleration := range tolerations {
  461. if toleration.ToleratesTaint(nodepkg.UnreachableTaintTemplate) {
  462. if toleration.TolerationSeconds != nil {
  463. podTolerationTimes[namespacedName] = time.Duration(*toleration.TolerationSeconds) * time.Second
  464. break
  465. } else {
  466. podTolerationTimes[namespacedName] = -1
  467. }
  468. }
  469. }
  470. if _, ok := podTolerationTimes[namespacedName]; !ok {
  471. podTolerationTimes[namespacedName] = 0
  472. }
  473. }
  474. neverEvictedPods := []string{}
  475. maxTolerationTime := time.Duration(0)
  476. for podName, tolerationTime := range podTolerationTimes {
  477. if tolerationTime < 0 {
  478. neverEvictedPods = append(neverEvictedPods, podName)
  479. } else {
  480. if tolerationTime > maxTolerationTime {
  481. maxTolerationTime = tolerationTime
  482. }
  483. }
  484. }
  485. framework.Logf(
  486. "Only %v should be running after partition. Maximum TolerationSeconds among other Pods is %v",
  487. neverEvictedPods,
  488. maxTolerationTime,
  489. )
  490. ginkgo.By("Set up watch on node status")
  491. nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
  492. stopCh := make(chan struct{})
  493. newNode := make(chan *v1.Node)
  494. var controller cache.Controller
  495. _, controller = cache.NewInformer(
  496. &cache.ListWatch{
  497. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  498. options.FieldSelector = nodeSelector.String()
  499. obj, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), options)
  500. return runtime.Object(obj), err
  501. },
  502. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  503. options.FieldSelector = nodeSelector.String()
  504. return f.ClientSet.CoreV1().Nodes().Watch(context.TODO(), options)
  505. },
  506. },
  507. &v1.Node{},
  508. 0,
  509. cache.ResourceEventHandlerFuncs{
  510. UpdateFunc: func(oldObj, newObj interface{}) {
  511. n, ok := newObj.(*v1.Node)
  512. framework.ExpectEqual(ok, true)
  513. newNode <- n
  514. },
  515. },
  516. )
  517. defer func() {
  518. // Will not explicitly close newNode channel here due to
  519. // race condition where stopCh and newNode are closed but informer onUpdate still executes.
  520. close(stopCh)
  521. }()
  522. go controller.Run(stopCh)
  523. ginkgo.By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
  524. host, err := e2enode.GetExternalIP(&node)
  525. framework.ExpectNoError(err)
  526. masterAddresses := framework.GetAllMasterAddresses(c)
  527. defer func() {
  528. ginkgo.By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
  529. for _, masterAddress := range masterAddresses {
  530. framework.UnblockNetwork(host, masterAddress)
  531. }
  532. if ginkgo.CurrentGinkgoTestDescription().Failed {
  533. return
  534. }
  535. ginkgo.By("Expect to observe node status change from NotReady to Ready after network connectivity recovers")
  536. expectNodeReadiness(true, newNode)
  537. }()
  538. for _, masterAddress := range masterAddresses {
  539. framework.BlockNetwork(host, masterAddress)
  540. }
  541. ginkgo.By("Expect to observe node and pod status change from Ready to NotReady after network partition")
  542. expectNodeReadiness(false, newNode)
  543. framework.ExpectNoError(wait.Poll(1*time.Second, timeout, func() (bool, error) {
  544. return framework.NodeHasTaint(c, node.Name, nodepkg.UnreachableTaintTemplate)
  545. }))
  546. if err = e2epod.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, testutils.PodNotReady); err != nil {
  547. framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
  548. }
  549. sleepTime := maxTolerationTime + 20*time.Second
  550. ginkgo.By(fmt.Sprintf("Sleeping for %v and checking if all Pods were evicted", sleepTime))
  551. time.Sleep(sleepTime)
  552. pods, err = c.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), podOpts)
  553. framework.ExpectNoError(err)
  554. seenRunning := []string{}
  555. for _, pod := range pods.Items {
  556. namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
  557. shouldBeTerminating := true
  558. for _, neverEvictedPod := range neverEvictedPods {
  559. if neverEvictedPod == namespacedName {
  560. shouldBeTerminating = false
  561. }
  562. }
  563. if pod.DeletionTimestamp == nil {
  564. seenRunning = append(seenRunning, namespacedName)
  565. if shouldBeTerminating {
  566. framework.Failf("Pod %v should have been deleted but was seen running", namespacedName)
  567. }
  568. }
  569. }
  570. for _, neverEvictedPod := range neverEvictedPods {
  571. running := false
  572. for _, runningPod := range seenRunning {
  573. if runningPod == neverEvictedPod {
  574. running = true
  575. break
  576. }
  577. }
  578. if !running {
  579. framework.Failf("Pod %v was evicted even though it shouldn't", neverEvictedPod)
  580. }
  581. }
  582. })
  583. })
  584. })
  585. })
  586. // waitForRCPodToDisappear returns nil if the pod from the given replication controller (described by rcName) no longer exists.
  587. // In case of failure or too long waiting time, an error is returned.
  588. func waitForRCPodToDisappear(c clientset.Interface, ns, rcName, podName string) error {
  589. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
  590. // NodeController evicts pod after 5 minutes, so we need timeout greater than that to observe effects.
  591. // The grace period must be set to 0 on the pod for it to be deleted during the partition.
  592. // Otherwise, it goes to the 'Terminating' state till the kubelet confirms deletion.
  593. return e2epod.WaitForPodToDisappear(c, ns, podName, label, 20*time.Second, 10*time.Minute)
  594. }