preemption.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
	"context"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	"k8s.io/kubernetes/test/e2e/framework/replicaset"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	// ensure libs have a chance to initialize
	_ "github.com/stretchr/testify/assert"
)
type priorityPair struct {
	name  string
	value int32
}

var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var ns string
	f := framework.NewDefaultFramework("sched-preemption")

	lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
	lowPriorityClassName := f.BaseName + "-low-priority"
	mediumPriorityClassName := f.BaseName + "-medium-priority"
	highPriorityClassName := f.BaseName + "-high-priority"
	priorityPairs := []priorityPair{
		{name: lowPriorityClassName, value: lowPriority},
		{name: mediumPriorityClassName, value: mediumPriority},
		{name: highPriorityClassName, value: highPriority},
	}

	ginkgo.AfterEach(func() {
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
	})

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error
		for _, pair := range priorityPairs {
			_, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}

		e2enode.WaitForTotalHealthy(cs, time.Minute)
		masterNodes, nodeList, err = e2enode.GetMasterAndWorkerNodes(cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		// TODO: write a wrapper for ExpectNoErrorWithOffset()
		framework.ExpectNoErrorWithOffset(0, err)
		err = framework.CheckTestingNSDeletedExcept(cs, ns)
		framework.ExpectNoError(err)
	})
	// This test verifies that when a higher priority pod is created and no node with
	// enough resources is found, the scheduler preempts a lower priority pod to schedule
	// the high priority pod.
	ginkgo.It("validates basic preemption works", func() {
		var podRes v1.ResourceList

		// Create one pod per node that uses a lot of the node's resources.
		ginkgo.By("Create pods that use 60% of node resources.")
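		// Each filler pod requests 60% of the node's remaining allocatable CPU and memory.
		// Since the preemptor created later requests the same amount on the same node, it
		// cannot be scheduled there unless a lower priority filler pod is evicted.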
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		for i, node := range nodeList.Items {
			currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
			framework.Logf("Current cpu and memory usage %v, %v", currentCPUUsage, currentMemUsage)
			cpuAllocatable, found := node.Status.Allocatable["cpu"]
			framework.ExpectEqual(found, true)
			milliCPU := cpuAllocatable.MilliValue()
			milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
			memAllocatable, found := node.Status.Allocatable["memory"]
			framework.ExpectEqual(found, true)
			memory := memAllocatable.Value()
			memory = int64(float64(memory-currentMemUsage) * float64(0.6))
			// If a node is already heavily utilized, let's not create a pod there.
			if milliCPU <= 0 || memory <= 0 {
				framework.Logf("Node is heavily utilized, let's not create a pod here")
				continue
			}
			podRes = v1.ResourceList{}
			podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
			podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)

			// make the first pod low priority and the rest medium priority.
			priorityName := mediumPriorityClassName
			if len(pods) == 0 {
				priorityName = lowPriorityClassName
			}
			pods = append(pods, createPausePod(f, pausePodConfig{
				Name:              fmt.Sprintf("pod%d-%v", i, priorityName),
				PriorityClassName: priorityName,
				Resources: &v1.ResourceRequirements{
					Requests: podRes,
				},
				NodeName: node.Name,
			}))
			framework.Logf("Created pod: %v", pods[len(pods)-1].Name)
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// Set the pod request to the first pod's resources (should be low priority pod)
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a high priority pod that has the same requirements as the lower priority pod")
		// Create a high priority pod and make sure it is scheduled on the same node as the low priority pod.
		runPausePod(f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
			},
			NodeName: pods[0].Spec.NodeName,
		})
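		// The low priority pod counts as preempted if it is already gone (NotFound) or has a
		// deletion timestamp set, while every medium priority pod must remain untouched.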
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}
		framework.ExpectEqual(podPreempted, true)
	})
	// This test verifies that when a critical pod is created and no node with
	// enough resources is found, the scheduler preempts a lower priority pod to schedule
	// this critical pod.
	ginkgo.It("validates lower priority pod preemption by critical pod", func() {
		var podRes v1.ResourceList

		// Create one pod per node that uses a lot of the node's resources.
		ginkgo.By("Create pods that use 60% of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		for i, node := range nodeList.Items {
			currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
			framework.Logf("Current cpu and memory usage %v, %v", currentCPUUsage, currentMemUsage)
			cpuAllocatable, found := node.Status.Allocatable["cpu"]
			framework.ExpectEqual(found, true)
			milliCPU := cpuAllocatable.MilliValue()
			milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
			memAllocatable, found := node.Status.Allocatable["memory"]
			framework.ExpectEqual(found, true)
			memory := memAllocatable.Value()
			memory = int64(float64(memory-currentMemUsage) * float64(0.6))
			podRes = v1.ResourceList{}
			// If a node is already heavily utilized, let's not create a pod there.
			if milliCPU <= 0 || memory <= 0 {
				framework.Logf("Node is heavily utilized, let's not create a pod there")
				continue
			}
			podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
			podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)

			// make the first pod low priority and the rest medium priority.
			priorityName := mediumPriorityClassName
			if len(pods) == 0 {
				priorityName = lowPriorityClassName
			}
			pods = append(pods, createPausePod(f, pausePodConfig{
				Name:              fmt.Sprintf("pod%d-%v", i, priorityName),
				PriorityClassName: priorityName,
				Resources: &v1.ResourceRequirements{
					Requests: podRes,
				},
				NodeName: node.Name,
			}))
			framework.Logf("Created pod: %v", pods[len(pods)-1].Name)
		}
		if len(pods) < 2 {
			e2eskipper.Skipf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
		}

		// We want this pod to be preempted
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a critical pod that uses the same resources as the lower priority pod")
		// Create a critical pod and make sure it is scheduled.
		defer func() {
			// Clean-up the critical pod
			// Always run cleanup to make sure the pod is properly cleaned up.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
			if err != nil && !apierrors.IsNotFound(err) {
				framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
			}
		}()
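		// scheduling.SystemClusterCritical resolves to the built-in system-cluster-critical
		// PriorityClass (value 2000000000), so the critical pod outranks every priority class
		// created by this test.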
		runPausePod(f, pausePodConfig{
			Name:              "critical-pod",
			Namespace:         metav1.NamespaceSystem,
			PriorityClassName: scheduling.SystemClusterCritical,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
			},
			NodeName: pods[0].Spec.NodeName,
		})
		// Make sure that the lowest priority pod is deleted.
		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}
		framework.ExpectEqual(podPreempted, true)
	})
})
// Use a fake extended resource in the Node status so the test controls the node's capacity;
// if we updated real values such as CPU or memory, the kubelet would correct them right back.
var fakecpu v1.ResourceName = "example.com/fakecpu"

var _ = SIGDescribe("PreemptionExecutionPath", func() {
	var cs clientset.Interface
	var node *v1.Node
	var ns, nodeHostNameLabel string
	f := framework.NewDefaultFramework("sched-preemption-path")

	priorityPairs := make([]priorityPair, 0)

	ginkgo.AfterEach(func() {
		// print out additional info if tests failed
		if ginkgo.CurrentGinkgoTestDescription().Failed {
			// list existing priorities
			priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
			if err != nil {
				framework.Logf("Unable to list priorities: %v", err)
			} else {
				framework.Logf("List existing priorities:")
				for _, p := range priorityList.Items {
					framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
				}
			}
		}

		if node != nil {
			nodeCopy := node.DeepCopy()
			// force it to update
			nodeCopy.ResourceVersion = "0"
			delete(nodeCopy.Status.Capacity, fakecpu)
			_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
			framework.ExpectNoError(err)
		}
		for _, pair := range priorityPairs {
			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
		}
	})

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name

		// find an available node
		ginkgo.By("Finding an available node")
		nodeName := GetNodeThatCanRunPod(f)
		framework.Logf("found a healthy node: %s", nodeName)

		// get the node API object
		var err error
		node, err = cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			framework.Failf("error getting node %q: %v", nodeName, err)
		}
		var ok bool
		nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
		if !ok {
			framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
		}

		// update Node API object with a fake resource
		nodeCopy := node.DeepCopy()
		// force it to update
		nodeCopy.ResourceVersion = "0"
		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
		node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
		framework.ExpectNoError(err)
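		// Advertising exactly 1000 fakecpu gives the test precise control over how much of this
		// resource is schedulable, independent of the node's real CPU and memory capacity.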
		// create four PriorityClasses: p1, p2, p3, p4
		for i := 1; i <= 4; i++ {
			priorityName := fmt.Sprintf("p%d", i)
			priorityVal := int32(i)
			priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
			_, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
			if err != nil {
				framework.Logf("Failed to create priority '%v/%v': %v", priorityName, priorityVal, err)
				framework.Logf("Reason: %v. Msg: %v", apierrors.ReasonForError(err), err)
			}
			framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
		}
	})

	ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
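		// podNamesSeen counts pod-creation events per ReplicaSet (rs-pod1/2/3). It tracks
		// creations rather than live pods, so a pod that is preempted and then replaced by its
		// ReplicaSet is counted twice.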
		podNamesSeen := []int32{0, 0, 0}
		stopCh := make(chan struct{})

		// create a pod controller to list/watch pod events from the test framework namespace
		_, podController := cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
					return runtime.Object(obj), err
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
				},
			},
			&v1.Pod{},
			0,
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					if pod, ok := obj.(*v1.Pod); ok {
						if strings.HasPrefix(pod.Name, "rs-pod1") {
							atomic.AddInt32(&podNamesSeen[0], 1)
						} else if strings.HasPrefix(pod.Name, "rs-pod2") {
							atomic.AddInt32(&podNamesSeen[1], 1)
						} else if strings.HasPrefix(pod.Name, "rs-pod3") {
							atomic.AddInt32(&podNamesSeen[2], 1)
						}
					}
				},
			},
		)
		go podController.Run(stopCh)
		defer close(stopCh)
		// prepare three ReplicaSets
		rsConfs := []pauseRSConfig{
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod1",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod1"},
					PriorityClassName: "p1",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
					},
				},
			},
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod2",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod2"},
					PriorityClassName: "p2",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
					},
				},
			},
			{
				Replicas: int32(1),
				PodConfig: pausePodConfig{
					Name:              "pod3",
					Namespace:         ns,
					Labels:            map[string]string{"name": "pod3"},
					PriorityClassName: "p3",
					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
					Resources: &v1.ResourceRequirements{
						Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
						Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
					},
				},
			},
		}
		// create ReplicaSet{1,2,3} so as to occupy 950/1000 fake resource
		for i := range rsConfs {
			runPauseRS(f, rsConfs[i])
		}

		framework.Logf("pods created so far: %v", podNamesSeen)
		framework.Logf("length of pods created so far: %v", len(podNamesSeen))

		// create a Preemptor Pod
		preemptorPodConf := pausePodConfig{
			Name:              "pod4",
			Namespace:         ns,
			Labels:            map[string]string{"name": "pod4"},
			PriorityClassName: "p4",
			NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
			Resources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
				Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
			},
		}
		preemptorPod := createPod(f, preemptorPodConf)
		waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)

		framework.Logf("pods created so far: %v", podNamesSeen)
		// Count the number of pods created for ReplicaSet{1,2,3}:
		// - more than the expected count means its pods have been over-preempted
		// - less than the expected count means its pods are under-preempted
		// The "*2" means the pods of ReplicaSet{1,2} are expected to be preempted exactly once.
		expectedRSPods := []int32{1 * 2, 1 * 2, 1}
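		// The preemptor requests 500 fakecpu, but only 1000-950 = 50 are free, so victims must be
		// evicted. The expectation above encodes the p1 (200) and p2 (300) pods being chosen over
		// the higher priority p3 (450) pod: rs1 and rs2 each create one replacement pod, while rs3
		// keeps its single original pod.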
		err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
			for i := 0; i < len(podNamesSeen); i++ {
				got := atomic.LoadInt32(&podNamesSeen[i])
				if got < expectedRSPods[i] {
					framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
					return false, nil
				} else if got > expectedRSPods[i] {
					return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
				}
			}
			return true, nil
		})
		if err != nil {
			framework.Logf("pods created so far: %v", podNamesSeen)
			framework.Failf("failed pod observation expectations: %v", err)
		}
		// If we get this far, hold on for a short period and re-check that the state is stable;
		// otherwise, pods may still be getting over-preempted.
		time.Sleep(5 * time.Second)
		for i := 0; i < len(podNamesSeen); i++ {
			got := atomic.LoadInt32(&podNamesSeen[i])
			if got < expectedRSPods[i] {
				framework.Failf("pods of ReplicaSet%d have been under-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
			} else if got > expectedRSPods[i] {
				framework.Failf("pods of ReplicaSet%d have been over-preempted: expected %v pod creations, but got %d", i+1, expectedRSPods[i], got)
			}
		}
	})
})
type pauseRSConfig struct {
	Replicas  int32
	PodConfig pausePodConfig
}
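// initPauseRS builds a ReplicaSet object whose pod template reuses the labels and spec of the
// pause pod generated from conf.PodConfig.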
func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	pausePod := initPausePod(f, conf.PodConfig)
	pauseRS := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs-" + pausePod.Name,
			Namespace: pausePod.Namespace,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &conf.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: pausePod.Labels,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
				Spec:       pausePod.Spec,
			},
		},
	}
	return pauseRS
}
func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	namespace := conf.PodConfig.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(context.TODO(), initPauseRS(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return rs
}
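// runPauseRS creates the ReplicaSet and waits until its desired number of replicas are available.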
func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	rs := createPauseRS(f, conf)
	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
	return rs
}

func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
	namespace := conf.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return pod
}
// waitForPreemptingWithTimeout verifies that 'pod' preempts within 'timeout'; specifically, it
// checks that the preemptor pod's 'spec.nodeName' field gets set.
func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Spec.NodeName) > 0 {
			return true, nil
		}
		return false, err
	})
	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}
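// getCurrentPodUsageOnTheNode sums the CPU (in millicores) and memory requests of all
// non-BestEffort pods already assigned to nodeName, on top of the baseline requests in 'resource'.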
func getCurrentPodUsageOnTheNode(nodeName string, pods []v1.Pod, resource *v1.ResourceRequirements) (int64, int64) {
	totalRequestedCPUResource := resource.Requests.Cpu().MilliValue()
	totalRequestedMemResource := resource.Requests.Memory().Value()
	for _, pod := range pods {
		if pod.Spec.NodeName != nodeName || v1qos.GetPodQOS(&pod) == v1.PodQOSBestEffort {
			continue
		}
		result := getNonZeroRequests(&pod)
		totalRequestedCPUResource += result.MilliCPU
		totalRequestedMemResource += result.Memory
	}
	return totalRequestedCPUResource, totalRequestedMemResource
}