- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package scheduling
- import (
- "context"
- "fmt"
- "strings"
- "sync/atomic"
- "time"
- appsv1 "k8s.io/api/apps/v1"
- v1 "k8s.io/api/core/v1"
- schedulingv1 "k8s.io/api/scheduling/v1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
- "k8s.io/kubernetes/pkg/apis/scheduling"
- "k8s.io/kubernetes/test/e2e/framework"
- e2enode "k8s.io/kubernetes/test/e2e/framework/node"
- e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
- "k8s.io/kubernetes/test/e2e/framework/replicaset"
- e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
- "github.com/onsi/ginkgo"
- "github.com/onsi/gomega"
- // ensure libs have a chance to initialize
- _ "github.com/stretchr/testify/assert"
- )
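- // priorityPair couples a PriorityClass name with its integer priority value; the tests below
- // create these classes before each spec and delete them afterwards.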
- type priorityPair struct {
- name string
- value int32
- }
- var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
- var cs clientset.Interface
- var nodeList *v1.NodeList
- var ns string
- f := framework.NewDefaultFramework("sched-preemption")
- lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
- lowPriorityClassName := f.BaseName + "-low-priority"
- mediumPriorityClassName := f.BaseName + "-medium-priority"
- highPriorityClassName := f.BaseName + "-high-priority"
- priorityPairs := []priorityPair{
- {name: lowPriorityClassName, value: lowPriority},
- {name: mediumPriorityClassName, value: mediumPriority},
- {name: highPriorityClassName, value: highPriority},
- }
- ginkgo.AfterEach(func() {
- for _, pair := range priorityPairs {
- cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
- }
- })
- ginkgo.BeforeEach(func() {
- cs = f.ClientSet
- ns = f.Namespace.Name
- nodeList = &v1.NodeList{}
- var err error
- for _, pair := range priorityPairs {
- _, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
- framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
- }
- e2enode.WaitForTotalHealthy(cs, time.Minute)
- masterNodes, nodeList, err = e2enode.GetMasterAndWorkerNodes(cs)
- if err != nil {
- framework.Logf("Unexpected error occurred: %v", err)
- }
- // TODO: write a wrapper for ExpectNoErrorWithOffset()
- framework.ExpectNoErrorWithOffset(0, err)
- err = framework.CheckTestingNSDeletedExcept(cs, ns)
- framework.ExpectNoError(err)
- })
- // This test verifies that when a higher priority pod is created and no node with
- // enough resources is found, scheduler preempts a lower priority pod to schedule
- // the high priority pod.
- ginkgo.It("validates basic preemption works", func() {
- var podRes v1.ResourceList
- // Create one pod per node that uses a lot of the node's resources.
- ginkgo.By("Create pods that use 60% of node resources.")
- pods := make([]*v1.Pod, 0, len(nodeList.Items))
- allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
- framework.ExpectNoError(err)
- for i, node := range nodeList.Items {
- currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
- framework.Logf("Current cpu and memory usage %v, %v", currentCPUUsage, currentMemUsage)
- cpuAllocatable, found := node.Status.Allocatable["cpu"]
- framework.ExpectEqual(found, true)
- milliCPU := cpuAllocatable.MilliValue()
- milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
- memAllocatable, found := node.Status.Allocatable["memory"]
- framework.ExpectEqual(found, true)
- memory := memAllocatable.Value()
- memory = int64(float64(memory-currentMemUsage) * float64(0.6))
- // If a node is already heavily utilized let's not create a pod there.
- if milliCPU <= 0 || memory <= 0 {
- framework.Logf("Node is heavily utilized, let's not create a pod here")
- continue
- }
- podRes = v1.ResourceList{}
- podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
- podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
- // make the first pod low priority and the rest medium priority.
- priorityName := mediumPriorityClassName
- if len(pods) == 0 {
- priorityName = lowPriorityClassName
- }
- pods = append(pods, createPausePod(f, pausePodConfig{
- Name: fmt.Sprintf("pod%d-%v", i, priorityName),
- PriorityClassName: priorityName,
- Resources: &v1.ResourceRequirements{
- Requests: podRes,
- },
- NodeName: node.Name,
- }))
- framework.Logf("Created pod: %v", pods[i].Name)
- }
- if len(pods) < 2 {
- framework.Failf("We need at least two pods to be created but" +
- "all nodes are already heavily utilized, so preemption tests cannot be run")
- }
- ginkgo.By("Wait for pods to be scheduled.")
- for _, pod := range pods {
- framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
- }
- // Set the pod request to the first pod's resources (should be low priority pod)
- podRes = pods[0].Spec.Containers[0].Resources.Requests
- ginkgo.By("Run a high priority pod that has same requirements as that of lower priority pod")
- // Create a high priority pod and make sure it is scheduled on the same node as the low priority pod.
- runPausePod(f, pausePodConfig{
- Name: "preemptor-pod",
- PriorityClassName: highPriorityClassName,
- Resources: &v1.ResourceRequirements{
- Requests: podRes,
- },
- NodeName: pods[0].Spec.NodeName,
- })
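- // The preemptor can only fit on its node by evicting the low priority pod there; verify that
- // pod has been deleted (or is terminating) while the medium priority pods stay untouched.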
- preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
- podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
- (err == nil && preemptedPod.DeletionTimestamp != nil)
- for i := 1; i < len(pods); i++ {
- livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
- framework.ExpectNoError(err)
- gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
- }
- framework.ExpectEqual(podPreempted, true)
- })
- // This test verifies that when a critical pod is created and no node with
- // enough resources is found, scheduler preempts a lower priority pod to schedule
- // this critical pod.
- ginkgo.It("validates lower priority pod preemption by critical pod", func() {
- var podRes v1.ResourceList
- // Create one pod per node that uses a lot of the node's resources.
- ginkgo.By("Create pods that use 60% of node resources.")
- pods := make([]*v1.Pod, 0, len(nodeList.Items))
- allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
- framework.ExpectNoError(err)
- for i, node := range nodeList.Items {
- currentCPUUsage, currentMemUsage := getCurrentPodUsageOnTheNode(node.Name, allPods.Items, podRequestedResource)
- framework.Logf("Current cpu usage and memory usage is %v, %v", currentCPUUsage, currentMemUsage)
- cpuAllocatable, found := node.Status.Allocatable["cpu"]
- framework.ExpectEqual(found, true)
- milliCPU := cpuAllocatable.MilliValue()
- milliCPU = int64(float64(milliCPU-currentCPUUsage) * float64(0.6))
- memAllocatable, found := node.Status.Allocatable["memory"]
- framework.ExpectEqual(found, true)
- memory := memAllocatable.Value()
- memory = int64(float64(memory-currentMemUsage) * float64(0.6))
- podRes = v1.ResourceList{}
- // If a node is already heavily utilized let's not create a pod there.
- if milliCPU <= 0 || memory <= 0 {
- framework.Logf("Node is heavily utilized, let's not create a pod there")
- continue
- }
- podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
- podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
- // make the first pod low priority and the rest medium priority.
- priorityName := mediumPriorityClassName
- if len(pods) == 0 {
- priorityName = lowPriorityClassName
- }
- pods = append(pods, createPausePod(f, pausePodConfig{
- Name: fmt.Sprintf("pod%d-%v", i, priorityName),
- PriorityClassName: priorityName,
- Resources: &v1.ResourceRequirements{
- Requests: podRes,
- },
- NodeName: node.Name,
- }))
- framework.Logf("Created pod: %v", pods[len(pods)-1].Name)
- }
- if len(pods) < 2 {
- e2eskipper.Skipf("We need at least two pods to be created but " +
- "all nodes are already heavily utilized, so preemption tests cannot be run")
- }
- ginkgo.By("Wait for pods to be scheduled.")
- for _, pod := range pods {
- framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(cs, pod))
- }
- // We want this pod to be preempted
- podRes = pods[0].Spec.Containers[0].Resources.Requests
- ginkgo.By("Run a critical pod that use same resources as that of a lower priority pod")
- // Create a critical pod and make sure it is scheduled.
- defer func() {
- // Clean-up the critical pod
- // Always run cleanup to make sure the pod is properly cleaned up.
- err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(context.TODO(), "critical-pod", *metav1.NewDeleteOptions(0))
- if err != nil && !apierrors.IsNotFound(err) {
- framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
- }
- }()
- runPausePod(f, pausePodConfig{
- Name: "critical-pod",
- Namespace: metav1.NamespaceSystem,
- PriorityClassName: scheduling.SystemClusterCritical,
- Resources: &v1.ResourceRequirements{
- Requests: podRes,
- },
- NodeName: pods[0].Spec.NodeName,
- })
- // Make sure that the lowest priority pod is deleted.
- preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(context.TODO(), pods[0].Name, metav1.GetOptions{})
- podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
- (err == nil && preemptedPod.DeletionTimestamp != nil)
- for i := 1; i < len(pods); i++ {
- livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(context.TODO(), pods[i].Name, metav1.GetOptions{})
- framework.ExpectNoError(err)
- gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
- }
- framework.ExpectEqual(podPreempted, true)
- })
- ginkgo.Context("PodTopologySpread Preemption", func() {
- var nodeNames []string
- var nodes []*v1.Node
- topologyKey := "kubernetes.io/e2e-pts-preemption"
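- // fakeRes is an extended resource patched into the node status below; unlike real CPU/memory,
- // it is not reconciled by the kubelet, so its capacity stays exactly as the test sets it.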
- var fakeRes v1.ResourceName = "example.com/fakePTSRes"
- ginkgo.BeforeEach(func() {
- ginkgo.By("Trying to get 2 available nodes which can run pod")
- nodeNames = Get2NodesThatCanRunPod(f)
- ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
- for _, nodeName := range nodeNames {
- framework.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)
- node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
- framework.ExpectNoError(err)
- // update Node API object with a fake resource
- nodeCopy := node.DeepCopy()
- // force it to update
- nodeCopy.ResourceVersion = "0"
- ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name))
- nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
- node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
- framework.ExpectNoError(err)
- nodes = append(nodes, node)
- }
- })
- ginkgo.AfterEach(func() {
- for _, nodeName := range nodeNames {
- framework.RemoveLabelOffNode(cs, nodeName, topologyKey)
- }
- for _, node := range nodes {
- nodeCopy := node.DeepCopy()
- // force it to update
- nodeCopy.ResourceVersion = "0"
- delete(nodeCopy.Status.Capacity, fakeRes)
- _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
- framework.ExpectNoError(err)
- }
- })
- ginkgo.It("validates proper pods are preempted", func() {
- podLabel := "e2e-pts-preemption"
- nodeAffinity := &v1.Affinity{
- NodeAffinity: &v1.NodeAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
- NodeSelectorTerms: []v1.NodeSelectorTerm{
- {
- MatchExpressions: []v1.NodeSelectorRequirement{
- {
- Key: topologyKey,
- Operator: v1.NodeSelectorOpIn,
- Values: nodeNames,
- },
- },
- },
- },
- },
- },
- }
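- // The "high" pod requests 9 of the 10 fake resources, nearly filling one node; each "low" pod requests 3.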
- highPodCfg := pausePodConfig{
- Name: "high",
- Namespace: ns,
- Labels: map[string]string{podLabel: ""},
- PriorityClassName: highPriorityClassName,
- Affinity: nodeAffinity,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakeRes: resource.MustParse("9")},
- Limits: v1.ResourceList{fakeRes: resource.MustParse("9")},
- },
- }
- lowPodCfg := pausePodConfig{
- Namespace: ns,
- Labels: map[string]string{podLabel: ""},
- PriorityClassName: lowPriorityClassName,
- Affinity: nodeAffinity,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
- Limits: v1.ResourceList{fakeRes: resource.MustParse("3")},
- },
- }
- ginkgo.By("Create 1 High Pod and 3 Low Pods to occupy 9/10 of fake resources on both nodes.")
- // Prepare 1 High Pod and 3 Low Pods
- runPausePod(f, highPodCfg)
- for i := 1; i <= 3; i++ {
- lowPodCfg.Name = fmt.Sprintf("low-%v", i)
- runPausePod(f, lowPodCfg)
- }
- ginkgo.By("Create 1 Medium Pod with TopologySpreadConstraints")
- mediumPodCfg := pausePodConfig{
- Name: "medium",
- Namespace: ns,
- Labels: map[string]string{podLabel: ""},
- PriorityClassName: mediumPriorityClassName,
- Affinity: nodeAffinity,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
- Limits: v1.ResourceList{fakeRes: resource.MustParse("3")},
- },
- TopologySpreadConstraints: []v1.TopologySpreadConstraint{
- {
- MaxSkew: 1,
- TopologyKey: topologyKey,
- WhenUnsatisfiable: v1.DoNotSchedule,
- LabelSelector: &metav1.LabelSelector{
- MatchExpressions: []metav1.LabelSelectorRequirement{
- {
- Key: podLabel,
- Operator: metav1.LabelSelectorOpExists,
- },
- },
- },
- },
- },
- }
- // To fulfil resource.requests, the medium Pod only needs to preempt one low pod.
- // However, in that case, the Pods spread becomes [<high>, <medium, low, low>], which doesn't
- // satisfy the pod topology spread constraints. Hence it needs to preempt another low pod
- // to make the Pods spread like [<high>, <medium, low>].
- runPausePod(f, mediumPodCfg)
- framework.ExpectNoError(e2epod.WaitForPodNotPending(cs, ns, mediumPodCfg.Name))
- ginkgo.By("Verify there are 3 Pods left in this namespace")
- wantPods := sets.NewString("high", "medium", "low")
- podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
- pods := podList.Items
- framework.ExpectNoError(err)
- framework.ExpectEqual(len(pods), 3)
- for _, pod := range pods {
- // Strip the ordinal suffix from the low pods ("low-1" -> "low").
- podName := strings.Split(pod.Name, "-")[0]
- if wantPods.Has(podName) {
- ginkgo.By(fmt.Sprintf("Pod %q is running as expected.", pod.Name))
- wantPods.Delete(podName)
- } else {
- framework.Failf("Pod %q is not one of the expected pods %v", podName, wantPods)
- }
- }
- })
- })
- })
- // fakecpu is a fake extended resource set in the Node status so the tests fully control its capacity;
- // if we modified real CPU/Memory values instead, the kubelet would correct them back on its next status update.
- var fakecpu v1.ResourceName = "example.com/fakecpu"
- var _ = SIGDescribe("PreemptionExecutionPath", func() {
- var cs clientset.Interface
- var node *v1.Node
- var ns, nodeHostNameLabel string
- f := framework.NewDefaultFramework("sched-preemption-path")
- priorityPairs := make([]priorityPair, 0)
- ginkgo.AfterEach(func() {
- // print out additional info if tests failed
- if ginkgo.CurrentGinkgoTestDescription().Failed {
- // list existing priorities
- priorityList, err := cs.SchedulingV1().PriorityClasses().List(context.TODO(), metav1.ListOptions{})
- if err != nil {
- framework.Logf("Unable to list priorities: %v", err)
- } else {
- framework.Logf("List existing priorities:")
- for _, p := range priorityList.Items {
- framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
- }
- }
- }
- if node != nil {
- nodeCopy := node.DeepCopy()
- // force it to update
- nodeCopy.ResourceVersion = "0"
- delete(nodeCopy.Status.Capacity, fakecpu)
- _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
- framework.ExpectNoError(err)
- }
- for _, pair := range priorityPairs {
- cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
- }
- })
- ginkgo.BeforeEach(func() {
- cs = f.ClientSet
- ns = f.Namespace.Name
- // find an available node
- ginkgo.By("Finding an available node")
- nodeName := GetNodeThatCanRunPod(f)
- framework.Logf("found a healthy node: %s", nodeName)
- // get the node API object
- var err error
- node, err = cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
- if err != nil {
- framework.Failf("error getting node %q: %v", nodeName, err)
- }
- var ok bool
- nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
- if !ok {
- framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
- }
- // update Node API object with a fake resource
- nodeCopy := node.DeepCopy()
- // force it to update
- nodeCopy.ResourceVersion = "0"
- nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
- node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
- framework.ExpectNoError(err)
- // create four PriorityClasses: p1, p2, p3, p4
- for i := 1; i <= 4; i++ {
- priorityName := fmt.Sprintf("p%d", i)
- priorityVal := int32(i)
- priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
- _, err := cs.SchedulingV1().PriorityClasses().Create(context.TODO(), &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
- if err != nil {
- framework.Logf("Failed to create priority '%v/%v': %v", priorityName, priorityVal, err)
- framework.Logf("Reason: %v. Msg: %v", apierrors.ReasonForError(err), err)
- }
- framework.ExpectEqual(err == nil || apierrors.IsAlreadyExists(err), true)
- }
- })
- ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
- podNamesSeen := []int32{0, 0, 0}
- stopCh := make(chan struct{})
- // create a pod controller to list/watch pod events from the test framework namespace
- _, podController := cache.NewInformer(
- &cache.ListWatch{
- ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
- obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
- return runtime.Object(obj), err
- },
- WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
- return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
- },
- },
- &v1.Pod{},
- 0,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- if pod, ok := obj.(*v1.Pod); ok {
- if strings.HasPrefix(pod.Name, "rs-pod1") {
- atomic.AddInt32(&podNamesSeen[0], 1)
- } else if strings.HasPrefix(pod.Name, "rs-pod2") {
- atomic.AddInt32(&podNamesSeen[1], 1)
- } else if strings.HasPrefix(pod.Name, "rs-pod3") {
- atomic.AddInt32(&podNamesSeen[2], 1)
- }
- }
- },
- },
- )
- go podController.Run(stopCh)
- defer close(stopCh)
- // prepare three ReplicaSets
- rsConfs := []pauseRSConfig{
- {
- Replicas: int32(1),
- PodConfig: pausePodConfig{
- Name: "pod1",
- Namespace: ns,
- Labels: map[string]string{"name": "pod1"},
- PriorityClassName: "p1",
- NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
- Limits: v1.ResourceList{fakecpu: resource.MustParse("200")},
- },
- },
- },
- {
- Replicas: int32(1),
- PodConfig: pausePodConfig{
- Name: "pod2",
- Namespace: ns,
- Labels: map[string]string{"name": "pod2"},
- PriorityClassName: "p2",
- NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
- Limits: v1.ResourceList{fakecpu: resource.MustParse("300")},
- },
- },
- },
- {
- Replicas: int32(1),
- PodConfig: pausePodConfig{
- Name: "pod3",
- Namespace: ns,
- Labels: map[string]string{"name": "pod3"},
- PriorityClassName: "p3",
- NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
- Limits: v1.ResourceList{fakecpu: resource.MustParse("450")},
- },
- },
- },
- }
- // create ReplicaSet{1,2,3} to occupy 950 of the 1000 units of fake cpu
- for i := range rsConfs {
- runPauseRS(f, rsConfs[i])
- }
- framework.Logf("pods created so far: %v", podNamesSeen)
- framework.Logf("length of pods created so far: %v", len(podNamesSeen))
- // create a Preemptor Pod
- preemptorPodConf := pausePodConfig{
- Name: "pod4",
- Namespace: ns,
- Labels: map[string]string{"name": "pod4"},
- PriorityClassName: "p4",
- NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
- Limits: v1.ResourceList{fakecpu: resource.MustParse("500")},
- },
- }
- preemptorPod := createPod(f, preemptorPodConf)
- waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)
- framework.Logf("pods created so far: %v", podNamesSeen)
- // Verify the number of pod creations observed for ReplicaSet{1,2,3}:
- // - more than the expected count means its pods were over-preempted
- // - fewer than the expected count means its pods are under-preempted
- // The "*2" entries mean each pod of ReplicaSet{1,2} is expected to be preempted exactly once,
- // so the informer sees two creations (the original pod plus its replacement).
- expectedRSPods := []int32{1 * 2, 1 * 2, 1}
- err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
- for i := 0; i < len(podNamesSeen); i++ {
- got := atomic.LoadInt32(&podNamesSeen[i])
- if got < expectedRSPods[i] {
- framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
- return false, nil
- } else if got > expectedRSPods[i] {
- return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
- }
- }
- return true, nil
- })
- if err != nil {
- framework.Logf("pods created so far: %v", podNamesSeen)
- framework.Failf("failed pod observation expectations: %v", err)
- }
- // If we get here, do a final check that the counts stay stable for a short period;
- // otherwise, pods may still be getting over-preempted.
- time.Sleep(5 * time.Second)
- for i := 0; i < len(podNamesSeen); i++ {
- got := atomic.LoadInt32(&podNamesSeen[i])
- if got < expectedRSPods[i] {
- framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
- } else if got > expectedRSPods[i] {
- framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
- }
- }
- })
- })
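- // pauseRSConfig describes a ReplicaSet of pause pods used by the preemption tests.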
- type pauseRSConfig struct {
- Replicas int32
- PodConfig pausePodConfig
- }
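- // initPauseRS builds (without creating) a ReplicaSet object whose pod template is derived from the given pausePodConfig.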
- func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
- pausePod := initPausePod(f, conf.PodConfig)
- pauseRS := &appsv1.ReplicaSet{
- ObjectMeta: metav1.ObjectMeta{
- Name: "rs-" + pausePod.Name,
- Namespace: pausePod.Namespace,
- },
- Spec: appsv1.ReplicaSetSpec{
- Replicas: &conf.Replicas,
- Selector: &metav1.LabelSelector{
- MatchLabels: pausePod.Labels,
- },
- Template: v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
- Spec: pausePod.Spec,
- },
- },
- }
- return pauseRS
- }
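- // createPauseRS submits the ReplicaSet built by initPauseRS, defaulting its namespace to the framework's namespace.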
- func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
- namespace := conf.PodConfig.Namespace
- if len(namespace) == 0 {
- namespace = f.Namespace.Name
- }
- rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(context.TODO(), initPauseRS(f, conf), metav1.CreateOptions{})
- framework.ExpectNoError(err)
- return rs
- }
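- // runPauseRS creates the ReplicaSet and waits until its target number of replicas is available.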
- func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
- rs := createPauseRS(f, conf)
- framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
- return rs
- }
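- // createPod creates a single pause pod from the given config, defaulting its namespace to the framework's namespace.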
- func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
- namespace := conf.Namespace
- if len(namespace) == 0 {
- namespace = f.Namespace.Name
- }
- pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
- framework.ExpectNoError(err)
- return pod
- }
- // waitForPreemptingWithTimeout verifies that the preemptor 'pod' finishes preempting within 'timeout';
- // specifically, it waits until the pod's 'spec.nodeName' field has been set.
- func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
- err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
- pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if len(pod.Spec.NodeName) > 0 {
- return true, nil
- }
- return false, nil
- })
- framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
- }
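- // getCurrentPodUsageOnTheNode sums the CPU (in millicores) and memory requests of all non-BestEffort pods
- // bound to nodeName, on top of the baseline requests passed in via 'resource'.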
- func getCurrentPodUsageOnTheNode(nodeName string, pods []v1.Pod, resource *v1.ResourceRequirements) (int64, int64) {
- totalRequestedCPUResource := resource.Requests.Cpu().MilliValue()
- totalRequestedMemResource := resource.Requests.Memory().Value()
- for _, pod := range pods {
- if pod.Spec.NodeName != nodeName || v1qos.GetPodQOS(&pod) == v1.PodQOSBestEffort {
- continue
- }
- result := getNonZeroRequests(&pod)
- totalRequestedCPUResource += result.MilliCPU
- totalRequestedMemResource += result.Memory
- }
- return totalRequestedCPUResource, totalRequestedMemResource
- }