/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"context"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	"k8s.io/kubernetes/test/e2e/framework/replicaset"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	// ensure libs have a chance to initialize
	_ "github.com/stretchr/testify/assert"
)
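
// priorityPair couples a PriorityClass name with its integer value; the tests
// below create these classes in BeforeEach and delete them again in AfterEach.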
type priorityPair struct {
	name  string
	value int32
}

var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var ns string
	f := framework.NewDefaultFramework("sched-preemption")

	lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
	lowPriorityClassName := f.BaseName + "-low-priority"
	mediumPriorityClassName := f.BaseName + "-medium-priority"
	highPriorityClassName := f.BaseName + "-high-priority"
	priorityPairs := []priorityPair{
		{name: lowPriorityClassName, value: lowPriority},
		{name: mediumPriorityClassName, value: mediumPriority},
		{name: highPriorityClassName, value: highPriority},
	}

	ginkgo.AfterEach(func() {
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
	})

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error
		for _, pair := range priorityPairs {
			_, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}
		e2enode.WaitForTotalHealthy(cs, time.Minute)
		masterNodes, nodeList, err = e2enode.GetMasterAndWorkerNodes(cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		// TODO: write a wrapper for ExpectNoErrorWithOffset()
		framework.ExpectNoErrorWithOffset(0, err)
		err = framework.CheckTestingNSDeletedExcept(cs, ns)
		framework.ExpectNoError(err)
	})

	// This test verifies that when a higher priority pod is created and no node with
	// enough resources is found, scheduler preempts a lower priority pod to schedule
	// the high priority pod.
	ginkgo.It("validates basic preemption works", func() {
		var podRes v1.ResourceList
		// Create one pod per node that uses a lot of the node's resources.
		ginkgo.By("Create pods that use 60% of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		for i, node := range nodeList.Items {
			currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
			framework.Logf("Current cpu and memory usage %v, %v", currentCPUUsage, currentMemUsage)
			cpuAllocatable, found := node.Status.Allocatable["cpu"]
			framework.ExpectEqual(found, true)
			milliCPU := cpuAllocatable.MilliValue()
			milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
			memAllocatable, found := node.Status.Allocatable["memory"]
			framework.ExpectEqual(found, true)
			memory := memAllocatable.Value()
			memory = int64(float64(memory-currentMemUsage) * float64(0.6))
			// If a node is already heavily utilized, let's not create a pod there.
			if milliCPU <= 0 || memory <= 0 {
				framework.Logf("Node is heavily utilized, let's not create a pod here")
				continue
			}
			podRes = v1.ResourceList{}
			podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
			podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
			// make the first pod low priority and the rest medium priority.
			priorityName := mediumPriorityClassName
			if len(pods) == 0 {
				priorityName = lowPriorityClassName
			}
			pods = append(pods, createPausePod(f, pausePodConfig{
				Name:              fmt.Sprintf("pod%d-%v", i, priorityName),
				PriorityClassName: priorityName,
				Resources: &v1.ResourceRequirements{
					Requests: podRes,
				},
				NodeName: node.Name,
			}))
			framework.Logf("Created pod: %v", pods[len(pods)-1].Name)
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}
		// Set the pod request to the first pod's resources (should be low priority pod)
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a high priority pod that has the same requirements as the lower priority pod")
		// Create a high priority pod and make sure it is scheduled on the same node as the low priority pod.
		runPausePod(f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
			},
			NodeName: pods[0].Spec.NodeName,
		})
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}
		framework.ExpectEqual(podPreempted, true)
	})

	// This test verifies that when a critical pod is created and no node with
	// enough resources is found, scheduler preempts a lower priority pod to schedule
	// this critical pod.
	ginkgo.It("validates lower priority pod preemption by critical pod", func() {
		var podRes v1.ResourceList
		// Create one pod per node that uses a lot of the node's resources.
		ginkgo.By("Create pods that use 60% of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		for i, node := range nodeList.Items {
			currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
			framework.Logf("Current cpu usage and memory usage are %v, %v", currentCPUUsage, currentMemUsage)
			cpuAllocatable, found := node.Status.Allocatable["cpu"]
			framework.ExpectEqual(found, true)
			milliCPU := cpuAllocatable.MilliValue()
			milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
			memAllocatable, found := node.Status.Allocatable["memory"]
			framework.ExpectEqual(found, true)
			memory := memAllocatable.Value()
			memory = int64(float64(memory-currentMemUsage) * float64(0.6))
			podRes = v1.ResourceList{}
			// If a node is already heavily utilized, let's not create a pod there.
			if milliCPU <= 0 || memory <= 0 {
				framework.Logf("Node is heavily utilized, let's not create a pod there")
				continue
			}
			podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
			podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
			// make the first pod low priority and the rest medium priority.
			priorityName := mediumPriorityClassName
			if len(pods) == 0 {
				priorityName = lowPriorityClassName
			}
			pods = append(pods, createPausePod(f, pausePodConfig{
				Name:              fmt.Sprintf("pod%d-%v", i, priorityName),
				PriorityClassName: priorityName,
				Resources: &v1.ResourceRequirements{
					Requests: podRes,
				},
				NodeName: node.Name,
			}))
			framework.Logf("Created pod: %v", pods[len(pods)-1].Name)
		}
		if len(pods) < 2 {
			e2eskipper.Skipf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}
		// We want this pod to be preempted
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a critical pod that uses the same resources as the lower priority pod")
		// Create a critical pod and make sure it is scheduled.
		defer func() {
			// Clean-up the critical pod
			// Always run cleanup to make sure the pod is properly cleaned up.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			if err != nil && !apierrors.IsNotFound(err) {
				framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
			}
		}()
		runPausePod(f, pausePodConfig{
			Name:              "critical-pod",
			Namespace:         metav1.NamespaceSystem,
			PriorityClassName: scheduling.SystemClusterCritical,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
			},
			NodeName: pods[0].Spec.NodeName,
		})
		defer func() {
			// Clean-up the critical pod
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			framework.ExpectNoError(err)
		}()
		// Make sure that the lowest priority pod is deleted.
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}
		framework.ExpectEqual(podPreempted, true)
	})
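
	// These tests exercise preemption when the preemptor also carries pod topology
	// spread constraints: the scheduler has to pick victims so that the surviving
	// pods still satisfy the constraints, not just free up enough resources.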
	ginkgo.Context("PodTopologySpread Preemption", func() {
		var nodeNames []string
		var nodes []*v1.Node
		topologyKey := "kubernetes.io/e2e-pts-preemption"
		var fakeRes v1.ResourceName = "example.com/fakePTSRes"

		ginkgo.BeforeEach(func() {
			ginkgo.By("Trying to get 2 available nodes which can run pod")
			nodeNames = Get2NodesThatCanRunPod(f)
			ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
			for _, nodeName := range nodeNames {
				framework.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)
				node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
				framework.ExpectNoError(err)
				// update Node API object with a fake resource
				nodeCopy := node.DeepCopy()
				// force it to update
				nodeCopy.ResourceVersion = "0"
				ginkgo.By(fmt.Sprintf("Apply 10 fake resources to node %v.", node.Name))
				nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
				node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
				framework.ExpectNoError(err)
				nodes = append(nodes, node)
			}
		})

		ginkgo.AfterEach(func() {
			for _, nodeName := range nodeNames {
				framework.RemoveLabelOffNode(cs, nodeName, topologyKey)
			}
			for _, node := range nodes {
				nodeCopy := node.DeepCopy()
				// force it to update
				nodeCopy.ResourceVersion = "0"
				delete(nodeCopy.Status.Capacity, fakeRes)
				_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
				framework.ExpectNoError(err)
			}
		})

		ginkgo.It("validates proper pods are preempted", func() {
			podLabel := "e2e-pts-preemption"
			nodeAffinity := &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      topologyKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   nodeNames,
									},
								},
							},
						},
					},
				},
			}
			highPodCfg := pausePodConfig{
				Name:              "high",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: highPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("9")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("9")},
				},
			}
			lowPodCfg := pausePodConfig{
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: lowPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
			}
			ginkgo.By("Create 1 High Pod and 3 Low Pods to occupy 9/10 of fake resources on both nodes.")
			// Prepare 1 High Pod and 3 Low Pods
			runPausePod(f, highPodCfg)
			for i := 1; i <= 3; i++ {
				lowPodCfg.Name = fmt.Sprintf("low-%v", i)
				runPausePod(f, lowPodCfg)
			}
			ginkgo.By("Create 1 Medium Pod with TopologySpreadConstraints")
			mediumPodCfg := pausePodConfig{
				Name:              "medium",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: mediumPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
					{
						MaxSkew:           1,
						TopologyKey:       topologyKey,
						WhenUnsatisfiable: v1.DoNotSchedule,
						LabelSelector: &metav1.LabelSelector{
							MatchExpressions: []metav1.LabelSelectorRequirement{
								{
									Key:      podLabel,
									Operator: metav1.LabelSelectorOpExists,
								},
							},
						},
					},
				},
			}
			// To fulfil resource.requests, the medium Pod only needs to preempt one low pod.
			// However, in that case, the Pods spread becomes [<high>, <medium, low, low>], which doesn't
			// satisfy the pod topology spread constraints. Hence it needs to preempt another low pod
			// to make the Pods spread like [<high>, <medium, low>].
			runPausePod(f, mediumPodCfg)
			framework.ExpectNoError(e2epod.WaitForPodNotPending(cs, ns, mediumPodCfg.Name))
			ginkgo.By("Verify there are 3 Pods left in this namespace")
			wantPods := sets.NewString("high", "medium", "low")
			podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
			pods := podList.Items
			framework.ExpectNoError(err)
			framework.ExpectEqual(len(pods), 3)
			for _, pod := range pods {
				// Remove the ordinal index for low pod.
				podName := strings.Split(pod.Name, "-")[0]
				if wantPods.Has(podName) {
					ginkgo.By(fmt.Sprintf("Pod %q is as expected to be running.", pod.Name))
					wantPods.Delete(podName)
				} else {
					framework.Failf("Pod %q conflicted with expected PodSet %v", podName, wantPods)
				}
			}
		})
	})
})

// fakecpu is a fake resource added to the status of the Node object; if we
// updated real resources such as CPU or memory instead, the kubelet would
// correct those values right back.
var fakecpu v1.ResourceName = "example.com/fakecpu"

var _ = SIGDescribe("PreemptionExecutionPath", func() {
	var cs clientset.Interface
	var node *v1.Node
	var ns, nodeHostNameLabel string
	f := framework.NewDefaultFramework("sched-preemption-path")

	priorityPairs := make([]priorityPair, 0)

	ginkgo.AfterEach(func() {
		// print out additional info if tests failed
		if ginkgo.CurrentGinkgoTestDescription().Failed {
			// list existing priorities
			priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
			if err != nil {
				framework.Logf("Unable to list priorities: %v", err)
			} else {
				framework.Logf("List existing priorities:")
				for _, p := range priorityList.Items {
					framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
				}
			}
		}
		if node != nil {
			nodeCopy := node.DeepCopy()
			// force it to update
			nodeCopy.ResourceVersion = "0"
			delete(nodeCopy.Status.Capacity, fakecpu)
			_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
			framework.ExpectNoError(err)
		}
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
	})

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		// find an available node
		ginkgo.By("Finding an available node")
		nodeName := GetNodeThatCanRunPod(f)
		framework.Logf("found a healthy node: %s", nodeName)
		// get the node API object
		var err error
		node, err = cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			framework.Failf("error getting node %q: %v", nodeName, err)
		}
		var ok bool
		nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
		if !ok {
			framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
		}
		// update Node API object with a fake resource
		nodeCopy := node.DeepCopy()
		// force it to update
		nodeCopy.ResourceVersion = "0"
		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
		node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
		framework.ExpectNoError(err)
		// create four PriorityClasses: p1, p2, p3, p4
		for i := 1; i <= 4; i++ {
			priorityName := fmt.Sprintf("p%d", i)
			priorityVal := int32(i)
			priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
			_, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
			if err != nil {
				framework.Logf("Failed to create priority '%v/%v': %v", priorityName, priorityVal, err)
				framework.Logf("Reason: %v. Msg: %v", apierrors.ReasonForError(err), err)
			}
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}
	})
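
	// The test below fills the node with three ReplicaSets of pause pods that
	// request 200, 300 and 450 of the 1000 units of fakecpu (950 in total) at
	// priorities p1, p2 and p3, then creates a p4 preemptor requesting 500.
	// Preempting the two lowest-priority pods (200+300) is exactly enough, so
	// rs-pod1 and rs-pod2 should each be preempted once (and recreated by their
	// ReplicaSets) while rs-pod3 must survive; the informer below counts pod
	// creations per ReplicaSet to verify nothing is over- or under-preempted.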
	ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
		podNamesSeen := []int32{0, 0, 0}
		stopCh := make(chan struct{})

		// create a pod controller to list/watch pod events from the test framework namespace
		_, podController := cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
					return runtime.Object(obj), err
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
				},
			},
			&v1.Pod{},
			0,
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					if pod, ok := obj.(*v1.Pod); ok {
						if strings.HasPrefix(pod.Name, "rs-pod1") {
							atomic.AddInt32(&podNamesSeen[0], 1)
						} else if strings.HasPrefix(pod.Name, "rs-pod2") {
							atomic.AddInt32(&podNamesSeen[1], 1)
						} else if strings.HasPrefix(pod.Name, "rs-pod3") {
							atomic.AddInt32(&podNamesSeen[2], 1)
						}
					}
				},
			},
		)
		go podController.Run(stopCh)
		defer close(stopCh)
		// prepare three ReplicaSets
		rsConfs := []pauseRSConfig{
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod1",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod1"},
					PriorityClassName: "p1",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
					},
				},
			},
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod2",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod2"},
					PriorityClassName: "p2",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
					},
				},
			},
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod3",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod3"},
					PriorityClassName: "p3",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
					},
				},
			},
		}
		// create ReplicaSet{1,2,3} so as to occupy 950/1000 of the fake resource
		for i := range rsConfs {
			runPauseRS(f, rsConfs[i])
		}
		framework.Logf("pods created so far: %v", podNamesSeen)
		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
		// create a Preemptor Pod
		preemptorPodConf := pausePodConfig{
			Name:              "pod4",
			Namespace:         ns,
			Labels:            map[string]string{"name": "pod4"},
			PriorityClassName: "p4",
			NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
			Resources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
				Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
			},
		}
		preemptorPod := createPod(f, preemptorPodConf)
		waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)
		framework.Logf("pods created so far: %v", podNamesSeen)
		// Count the number of pods observed for each of ReplicaSet{1,2,3}:
		// - more than the expected count means its pods were over-preempted
		// - fewer than the expected count means its pods were under-preempted
		// The "*2" entries mean the pods of ReplicaSet{1,2} are expected to be preempted
		// exactly once, i.e. seen twice: the original pod plus its recreated replacement.
		expectedRSPods := []int32{1 * 2, 1 * 2, 1}
		err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
			for i := 0; i < len(podNamesSeen); i++ {
				got := atomic.LoadInt32(&podNamesSeen[i])
				if got < expectedRSPods[i] {
					framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
					return false, nil
				} else if got > expectedRSPods[i] {
					return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
				}
			}
			return true, nil
		})
		if err != nil {
			framework.Logf("pods created so far: %v", podNamesSeen)
			framework.Failf("failed pod observation expectations: %v", err)
		}
		// If we get here, do a final check to ensure the state stays stable for a short
		// period of time; otherwise, pods may still be over-preempted.
		time.Sleep(5 * time.Second)
		for i := 0; i < len(podNamesSeen); i++ {
			got := atomic.LoadInt32(&podNamesSeen[i])
			if got < expectedRSPods[i] {
				framework.Failf("pods of ReplicaSet%d have been under-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
			} else if got > expectedRSPods[i] {
				framework.Failf("pods of ReplicaSet%d have been over-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
			}
		}
	})
})
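
// pauseRSConfig describes a ReplicaSet of pause pods: the desired replica count
// plus the per-pod configuration used for its pod template.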
type pauseRSConfig struct {
	Replicas  int32
	PodConfig pausePodConfig
}
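
// initPauseRS builds (but does not create) a ReplicaSet object whose pod
// template is the pause pod described by conf.PodConfig.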
func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	pausePod := initPausePod(f, conf.PodConfig)
	pauseRS := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs-" + pausePod.Name,
			Namespace: pausePod.Namespace,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &conf.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: pausePod.Labels,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
				Spec:       pausePod.Spec,
			},
		},
	}
	return pauseRS
}
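
// createPauseRS creates the ReplicaSet built from conf in the configured
// namespace (defaulting to the framework namespace) and fails the test on error.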
func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	namespace := conf.PodConfig.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(context.TODO(), initPauseRS(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return rs
}
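
// runPauseRS creates the ReplicaSet and waits until its desired number of
// replicas are available.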
func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	rs := createPauseRS(f, conf)
	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
	return rs
}
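
// createPod creates a single pause pod from conf (without waiting for it to be
// scheduled or running) and fails the test on error.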
func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
	namespace := conf.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return pod
}

// waitForPreemptingWithTimeout verifies that 'pod' is preempting within 'timeout';
// specifically, it checks whether the 'spec.NodeName' field of the preemptor 'pod' has been set.
func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Spec.NodeName) > 0 {
			return true, nil
		}
		return false, err
	})
	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}
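
// getCurrentPodUsageOnTheNode returns the total CPU (in millicores) and memory (in bytes)
// requested by the non-BestEffort pods currently assigned to nodeName, seeded with the
// requests passed in via the 'resource' argument.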
func getCurrentPodUsageOnTheNode(nodeName string, pods []v1.Pod, resource *v1.ResourceRequirements) (int64, int64) {
	totalRequestedCPUResource := resource.Requests.Cpu().MilliValue()
	totalRequestedMemResource := resource.Requests.Memory().Value()
	for _, pod := range pods {
		if pod.Spec.NodeName != nodeName || v1qos.GetPodQOS(&pod) == v1.PodQOSBestEffort {
			continue
		}
		result := getNonZeroRequests(&pod)
		totalRequestedCPUResource += result.MilliCPU
		totalRequestedMemResource += result.Memory
	}
	return totalRequestedCPUResource, totalRequestedMemResource
}