equivalence_cache_predicates.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduling
  14. import (
  15. "fmt"
  16. "time"
  17. v1 "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/util/sets"
  20. "k8s.io/apimachinery/pkg/util/uuid"
  21. clientset "k8s.io/client-go/kubernetes"
  22. api "k8s.io/kubernetes/pkg/apis/core"
  23. "k8s.io/kubernetes/test/e2e/framework"
  24. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  25. testutils "k8s.io/kubernetes/test/utils"
  26. imageutils "k8s.io/kubernetes/test/utils/image"
  27. "github.com/onsi/ginkgo"
  28. "github.com/onsi/gomega"
  29. // ensure libs have a chance to initialize
  30. _ "github.com/stretchr/testify/assert"
  31. )
  32. const (
  33. defaultTimeout = 3 * time.Minute
  34. )
  35. var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
  36. var cs clientset.Interface
  37. var nodeList *v1.NodeList
  38. var masterNodes sets.String
  39. var systemPodsNo int
  40. var ns string
  41. f := framework.NewDefaultFramework("equivalence-cache")
  42. ginkgo.BeforeEach(func() {
  43. cs = f.ClientSet
  44. ns = f.Namespace.Name
  45. framework.WaitForAllNodesHealthy(cs, time.Minute)
  46. masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs)
  47. framework.ExpectNoError(framework.CheckTestingNSDeletedExcept(cs, ns))
  48. // Every test case in this suite assumes that cluster add-on pods stay stable and
  49. // cannot be run in parallel with any other test that touches Nodes or Pods.
  50. // It is so because we need to have precise control on what's running in the cluster.
  51. systemPods, err := framework.GetPodsInNamespace(cs, ns, map[string]string{})
  52. framework.ExpectNoError(err)
  53. systemPodsNo = 0
  54. for _, pod := range systemPods {
  55. if !masterNodes.Has(pod.Spec.NodeName) && pod.DeletionTimestamp == nil {
  56. systemPodsNo++
  57. }
  58. }
  59. err = framework.WaitForPodsRunningReady(cs, api.NamespaceSystem, int32(systemPodsNo), int32(systemPodsNo), framework.PodReadyBeforeTimeout, map[string]string{})
  60. framework.ExpectNoError(err)
  61. for _, node := range nodeList.Items {
  62. e2elog.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name)
  63. framework.PrintAllKubeletPods(cs, node.Name)
  64. }
  65. })
  66. // This test verifies that GeneralPredicates works as expected:
  67. // When a replica pod (with HostPorts) is scheduled to a node, it will invalidate GeneralPredicates cache on this node,
  68. // so that subsequent replica pods with same host port claim will be rejected.
  69. // We enforce all replica pods bind to the same node so there will always be conflicts.
  70. ginkgo.It("validates GeneralPredicates is properly invalidated when a pod is scheduled [Slow]", func() {
  71. ginkgo.By("Launching a RC with two replica pods with HostPorts")
  72. nodeName := getNodeThatCanRunPodWithoutToleration(f)
  73. rcName := "host-port"
  74. // bind all replicas to same node
  75. nodeSelector := map[string]string{"kubernetes.io/hostname": nodeName}
  76. ginkgo.By("One pod should be scheduled, the other should be rejected")
  77. // CreateNodeSelectorPods creates RC with host port 4321
  78. WaitForSchedulerAfterAction(f, func() error {
  79. err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false)
  80. return err
  81. }, ns, rcName, false)
  82. defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, rcName)
  83. // the first replica pod is scheduled, and the second pod will be rejected.
  84. verifyResult(cs, 1, 1, ns)
  85. })
  86. // This test verifies that MatchInterPodAffinity works as expected.
  87. // In equivalence cache, it does not handle inter pod affinity (anti-affinity) specially (unless node label changed),
  88. // because current predicates algorithm will ensure newly scheduled pod does not break existing affinity in cluster.
  89. ginkgo.It("validates pod affinity works properly when new replica pod is scheduled", func() {
  90. // create a pod running with label {security: S1}, and choose this node
  91. nodeName, _ := runAndKeepPodWithLabelAndGetNodeName(f)
  92. ginkgo.By("Trying to apply a random label on the found node.")
  93. // we need to use real failure domains, since scheduler only know them
  94. k := "failure-domain.beta.kubernetes.io/zone"
  95. v := "equivalence-e2e-test"
  96. oldValue := framework.AddOrUpdateLabelOnNodeAndReturnOldValue(cs, nodeName, k, v)
  97. framework.ExpectNodeHasLabel(cs, nodeName, k, v)
  98. // restore the node label
  99. defer framework.AddOrUpdateLabelOnNode(cs, nodeName, k, oldValue)
  100. ginkgo.By("Trying to schedule RC with Pod Affinity should success.")
  101. framework.WaitForStableCluster(cs, masterNodes)
  102. affinityRCName := "with-pod-affinity-" + string(uuid.NewUUID())
  103. replica := 2
  104. labelsMap := map[string]string{
  105. "name": affinityRCName,
  106. }
  107. affinity := &v1.Affinity{
  108. PodAffinity: &v1.PodAffinity{
  109. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  110. {
  111. LabelSelector: &metav1.LabelSelector{
  112. MatchExpressions: []metav1.LabelSelectorRequirement{
  113. {
  114. Key: "security",
  115. Operator: metav1.LabelSelectorOpIn,
  116. Values: []string{"S1"},
  117. },
  118. },
  119. },
  120. TopologyKey: k,
  121. Namespaces: []string{ns},
  122. },
  123. },
  124. },
  125. }
  126. rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, imageutils.GetPauseImageName())
  127. defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, affinityRCName)
  128. // RC should be running successfully
  129. // TODO: WaitForSchedulerAfterAction() can on be used to wait for failure event,
  130. // not for successful RC, since no specific pod name can be provided.
  131. _, err := cs.CoreV1().ReplicationControllers(ns).Create(rc)
  132. framework.ExpectNoError(err)
  133. framework.ExpectNoError(framework.WaitForControlledPodsRunning(cs, ns, affinityRCName, api.Kind("ReplicationController")))
  134. ginkgo.By("Remove node failure domain label")
  135. framework.RemoveLabelOffNode(cs, nodeName, k)
  136. ginkgo.By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
  137. // use scale to create another equivalent pod and wait for failure event
  138. WaitForSchedulerAfterAction(f, func() error {
  139. err := framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false)
  140. return err
  141. }, ns, affinityRCName, false)
  142. // and this new pod should be rejected since node label has been updated
  143. verifyReplicasResult(cs, replica, 1, ns, affinityRCName)
  144. })
  145. // This test verifies that MatchInterPodAffinity (anti-affinity) is respected as expected.
  146. ginkgo.It("validates pod anti-affinity works properly when new replica pod is scheduled", func() {
  147. ginkgo.By("Launching two pods on two distinct nodes to get two node names")
  148. CreateHostPortPods(f, "host-port", 2, true)
  149. defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, "host-port")
  150. podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{})
  151. framework.ExpectNoError(err)
  152. gomega.Expect(len(podList.Items)).To(gomega.Equal(2))
  153. nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName}
  154. gomega.Expect(nodeNames[0]).ToNot(gomega.Equal(nodeNames[1]))
  155. ginkgo.By("Applying a random label to both nodes.")
  156. k := "e2e.inter-pod-affinity.kubernetes.io/zone"
  157. v := "equivalence-e2etest"
  158. for _, nodeName := range nodeNames {
  159. framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
  160. framework.ExpectNodeHasLabel(cs, nodeName, k, v)
  161. defer framework.RemoveLabelOffNode(cs, nodeName, k)
  162. }
  163. ginkgo.By("Trying to launch a pod with the service label on the selected nodes.")
  164. // run a pod with label {"service": "S1"} and expect it to be running
  165. runPausePod(f, pausePodConfig{
  166. Name: "with-label-" + string(uuid.NewUUID()),
  167. Labels: map[string]string{"service": "S1"},
  168. NodeSelector: map[string]string{k: v}, // only launch on our two nodes
  169. })
  170. ginkgo.By("Trying to launch RC with podAntiAffinity on these two nodes should be rejected.")
  171. labelRCName := "with-podantiaffinity-" + string(uuid.NewUUID())
  172. replica := 2
  173. labelsMap := map[string]string{
  174. "name": labelRCName,
  175. }
  176. affinity := &v1.Affinity{
  177. PodAntiAffinity: &v1.PodAntiAffinity{
  178. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  179. {
  180. LabelSelector: &metav1.LabelSelector{
  181. MatchExpressions: []metav1.LabelSelectorRequirement{
  182. {
  183. Key: "service",
  184. Operator: metav1.LabelSelectorOpIn,
  185. Values: []string{"S1"},
  186. },
  187. },
  188. },
  189. TopologyKey: k,
  190. Namespaces: []string{ns},
  191. },
  192. },
  193. },
  194. }
  195. rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity,
  196. imageutils.GetPauseImageName(), map[string]string{k: v})
  197. defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, labelRCName)
  198. WaitForSchedulerAfterAction(f, func() error {
  199. _, err := cs.CoreV1().ReplicationControllers(ns).Create(rc)
  200. return err
  201. }, ns, labelRCName, false)
  202. // these two replicas should all be rejected since podAntiAffinity says it they anti-affinity with pod {"service": "S1"}
  203. verifyReplicasResult(cs, 0, replica, ns, labelRCName)
  204. })
  205. })
  206. // getRCWithInterPodAffinity returns RC with given affinity rules.
  207. func getRCWithInterPodAffinity(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string) *v1.ReplicationController {
  208. return getRCWithInterPodAffinityNodeSelector(name, labelsMap, replica, affinity, image, map[string]string{})
  209. }
  210. // getRCWithInterPodAffinity returns RC with given affinity rules and node selector.
  211. func getRCWithInterPodAffinityNodeSelector(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string, nodeSelector map[string]string) *v1.ReplicationController {
  212. replicaInt32 := int32(replica)
  213. return &v1.ReplicationController{
  214. ObjectMeta: metav1.ObjectMeta{
  215. Name: name,
  216. },
  217. Spec: v1.ReplicationControllerSpec{
  218. Replicas: &replicaInt32,
  219. Selector: labelsMap,
  220. Template: &v1.PodTemplateSpec{
  221. ObjectMeta: metav1.ObjectMeta{
  222. Labels: labelsMap,
  223. },
  224. Spec: v1.PodSpec{
  225. Affinity: affinity,
  226. Containers: []v1.Container{
  227. {
  228. Name: name,
  229. Image: image,
  230. },
  231. },
  232. DNSPolicy: v1.DNSDefault,
  233. NodeSelector: nodeSelector,
  234. },
  235. },
  236. },
  237. }
  238. }
  239. // CreateNodeSelectorPods creates RC with host port 4321 and defines node selector
  240. func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) error {
  241. ginkgo.By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
  242. config := &testutils.RCConfig{
  243. Client: f.ClientSet,
  244. Name: id,
  245. Namespace: f.Namespace.Name,
  246. Timeout: defaultTimeout,
  247. Image: imageutils.GetPauseImageName(),
  248. Replicas: replicas,
  249. HostPorts: map[string]int{"port1": 4321},
  250. NodeSelector: nodeSelector,
  251. }
  252. err := framework.RunRC(*config)
  253. if expectRunning {
  254. return err
  255. }
  256. return nil
  257. }