/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package autoscaling

import (
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog"
	"k8s.io/kubernetes/test/e2e/framework"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)
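
// Timeouts and limits used by the scalability tests. Shared constants such as
// scaleUpTimeout, scaleDownTimeout, makeSchedulableTimeout and
// makeSchedulableDelay are defined elsewhere in this package.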
const (
	memoryReservationTimeout = 5 * time.Minute
	largeResizeTimeout       = 8 * time.Minute
	largeScaleUpTimeout      = 10 * time.Minute
	maxNodes                 = 1000
)
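
// clusterPredicates describes the expected state of the cluster after a
// scale-up: currently only the total number of nodes.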
type clusterPredicates struct {
	nodes int
}
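
// scaleUpTestConfig bundles the initial cluster state, the RC that creates
// the pending pods, and the expected post-scale-up state.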
type scaleUpTestConfig struct {
	initialNodes   int
	initialPods    int
	extraPods      *testutils.RCConfig
	expectedResult *clusterPredicates
}

var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() {
	f := framework.NewDefaultFramework("autoscaling")
	var c clientset.Interface
	var nodeCount int
	var coresPerNode int
	var memCapacityMb int
	var originalSizes map[string]int
	var sum int

	ginkgo.BeforeEach(func() {
		framework.SkipUnlessProviderIs("gce", "gke", "kubemark")

		// Check if Cluster Autoscaler is enabled by trying to get its ConfigMap.
		_, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
		if err != nil {
			framework.Skipf("test expects Cluster Autoscaler to be enabled")
		}

		c = f.ClientSet
		if originalSizes == nil {
			originalSizes = make(map[string]int)
			sum = 0
			for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
				size, err := framework.GroupSize(mig)
				framework.ExpectNoError(err)
				ginkgo.By(fmt.Sprintf("Initial size of %s: %d", mig, size))
				originalSizes[mig] = size
				sum += size
			}
		}

		framework.ExpectNoError(framework.WaitForReadyNodes(c, sum, scaleUpTimeout))

		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		nodeCount = len(nodes.Items)
		gomega.Expect(nodeCount).NotTo(gomega.BeZero())
		cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
		mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
		coresPerNode = int((&cpu).MilliValue() / 1000)
		memCapacityMb = int((&mem).Value() / 1024 / 1024)

		gomega.Expect(nodeCount).Should(gomega.Equal(sum))

		if framework.ProviderIs("gke") {
			val, err := isAutoscalerEnabled(3)
			framework.ExpectNoError(err)
			if !val {
				err = enableAutoscaler("default-pool", 3, 5)
				framework.ExpectNoError(err)
			}
		}
	})

	ginkgo.AfterEach(func() {
		ginkgo.By("Restoring initial size of the cluster")
		setMigSizes(originalSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount, scaleDownTimeout))
		nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		framework.ExpectNoError(err)
		s := time.Now()
		// Retry the whole pass (after a delay) whenever a node still refuses
		// pods with CriticalAddonsOnlyError, until the timeout expires.
	makeSchedulableLoop:
		for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
			for _, n := range nodes.Items {
				err = makeNodeSchedulable(c, &n, true)
				switch err.(type) {
				case CriticalAddonsOnlyError:
					continue makeSchedulableLoop
				default:
					framework.ExpectNoError(err)
				}
			}
			break
		}
		klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
	})

	ginkgo.It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10

		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		additionalReservation := additionalNodes * perNodeReservation

		// saturate cluster
		reservationCleanup := ReserveMemory(f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		// configure pending pods & expected scale up
		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)

		// run test
		testCleanup := simpleScaleUpTest(f, config)
		defer testCleanup()
	})

	ginkgo.It("should scale up twice [Feature:ClusterAutoscalerScalability2]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10
		additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
		additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
		if additionalNodes1+additionalNodes2 > maxNodes {
			additionalNodes2 = maxNodes - additionalNodes1
		}

		replicas1 := additionalNodes1 * replicasPerNode
		replicas2 := additionalNodes2 * replicasPerNode

		klog.Infof("cores per node: %v", coresPerNode)

		// saturate cluster
		initialReplicas := nodeCount
		reservationCleanup := ReserveMemory(f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		klog.Infof("Reserved successfully")

		// configure pending pods & expected scale up #1
		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)

		// run test #1
		tolerateUnreadyNodes := additionalNodes1 / 20
		tolerateUnreadyPods := (initialReplicas + replicas1) / 20
		testCleanup1 := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup1()

		klog.Infof("Scaled up once")

		// configure pending pods & expected scale up #2
		rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
		expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
		config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)

		// run test #2
		tolerateUnreadyNodes = maxNodes / 20
		tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
		testCleanup2 := simpleScaleUpTestWithTolerance(f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup2()

		klog.Infof("Scaled up twice")
	})

	ginkgo.It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.7)
		replicas := int(math.Ceil(maxNodes * 0.7))
		totalNodes := maxNodes

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))

		// run replicas
		rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(totalNodes)
		config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
		tolerateUnreadyNodes := totalNodes / 10
		tolerateUnreadyPods := replicas / 10
		testCleanup := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup()

		// check if empty nodes are scaled down
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
			func(size int) bool {
				return size <= replicas+3 // leaving space for non-evictable kube-system pods
			}, scaleDownTimeout))
	})

	ginkgo.It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() {
		perPodReservation := int(float64(memCapacityMb) * 0.01)
		// underutilizedNodes are 10% full
		underutilizedPerNodeReplicas := 10
		// fullNodes are 70% full
		fullPerNodeReplicas := 70
		totalNodes := maxNodes
		underutilizedRatio := 0.3
		maxDelta := 30

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)

		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))

		// annotate all nodes with no-scale-down
		ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"

		nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{
			FieldSelector: fields.Set{
				"spec.unschedulable": "false",
			}.AsSelector().String(),
		})

		framework.ExpectNoError(err)
		framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "true"))

		// distribute pods using replication controllers taking up space that should
		// be empty after pods are distributed
		underutilizedNodesNum := int(float64(maxNodes) * underutilizedRatio)
		fullNodesNum := totalNodes - underutilizedNodesNum

		podDistribution := []podBatch{
			{numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
			{numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}

		cleanup := distributeLoad(f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
			int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)
		defer cleanup()

		// enable scale down again
		framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "false"))

		// Wait for scale down to start. Node deletion takes a long time, so we
		// only wait for at most maxDelta (30) nodes to be deleted.
		nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
		if nodesToScaleDownCount > maxDelta {
			nodesToScaleDownCount = maxDelta
		}
		expectedSize := totalNodes - nodesToScaleDownCount
		timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool {
			return size <= expectedSize
		}, timeout))
	})

	ginkgo.It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() {
		fullReservation := int(float64(memCapacityMb) * 0.9)
		hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
		totalNodes := maxNodes
		reservedPort := 4321

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
		divider := int(float64(totalNodes) * 0.7)
		fullNodesCount := divider
		underutilizedNodesCount := totalNodes - fullNodesCount

		ginkgo.By("Reserving full nodes")
		// run RC1 w/o host port
		cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
		defer cleanup()

		ginkgo.By("Reserving host ports on remaining nodes")
		// run RC2 w/ host port
		cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
		defer cleanup2()

		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		// wait and check that scale down doesn't occur
		ginkgo.By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
		time.Sleep(scaleDownTimeout)

		ginkgo.By("Checking if the number of nodes is as expected")
		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
		gomega.Expect(len(nodes.Items)).Should(gomega.Equal(totalNodes))
	})

	ginkgo.Specify("CA ignores unschedulable pods while scheduling schedulable pods [Feature:ClusterAutoscalerScalability6]", func() {
		// Start a number of pods saturating existing nodes.
		perNodeReservation := int(float64(memCapacityMb) * 0.80)
		replicasPerNode := 10
		initialPodReplicas := nodeCount * replicasPerNode
		initialPodsTotalMemory := nodeCount * perNodeReservation
		reservationCleanup := ReserveMemory(f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true /* wait for pods to run */, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		// Configure a number of unschedulable pods.
		unschedulableMemReservation := memCapacityMb * 2
		unschedulablePodReplicas := 1000
		totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
		timeToWait := 5 * time.Minute
		podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
		framework.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
		defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, podsConfig.Name)

		// Ensure that no new nodes have been added so far.
		gomega.Expect(framework.NumberOfReadyNodes(f.ClientSet)).To(gomega.Equal(nodeCount))

		// Start a number of schedulable pods to ensure CA reacts.
		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		totalMemory := additionalNodes * perNodeReservation
		rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)

		// Test that scale up happens, allowing 1000 unschedulable pods not to be scheduled.
		testCleanup := simpleScaleUpTestWithTolerance(f, config, 0, unschedulablePodReplicas)
		defer testCleanup()
	})
})
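
// anyKey returns an arbitrary key from the given map, or "" if the map is
// empty. The tests use it to pick a single MIG to resize.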
func anyKey(input map[string]int) string {
	for k := range input {
		return k
	}
	return ""
}
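
// simpleScaleUpTestWithTolerance runs the RC described by config and waits
// for the cluster to reach the expected node count, tolerating up to
// tolerateMissingNodeCount nodes and tolerateMissingPodCount pods not coming
// up. It returns a cleanup function that deletes the RC.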
func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
	// run rc based on config
	ginkgo.By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
	start := time.Now()
	framework.ExpectNoError(framework.RunRC(*config.extraPods))
	// check results
	if tolerateMissingNodeCount > 0 {
		// Tolerate some number of nodes not to be created.
		minExpectedNodeCount := config.expectedResult.nodes - tolerateMissingNodeCount
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
			func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
	} else {
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
	}
	klog.Infof("Cluster size increased")
	if tolerateMissingPodCount > 0 {
		framework.ExpectNoError(waitForCaPodsReadyInNamespace(f, f.ClientSet, tolerateMissingPodCount))
	} else {
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	}
	timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, config.extraPods.Name)
	}
}
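
// simpleScaleUpTest is simpleScaleUpTestWithTolerance with zero tolerance for
// missing nodes and pods.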
func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error {
	return simpleScaleUpTestWithTolerance(f, config, 0, 0)
}
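
// reserveMemoryRCConfig builds an RCConfig for a replication controller of
// pause pods whose memory requests together reserve the given number of
// megabytes (split evenly across replicas).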
func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
	return &testutils.RCConfig{
		Client:     f.ClientSet,
		Name:       id,
		Namespace:  f.Namespace.Name,
		Timeout:    timeout,
		Image:      imageutils.GetPauseImageName(),
		Replicas:   replicas,
		MemRequest: int64(1024 * 1024 * megabytes / replicas),
	}
}
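
// createScaleUpTestConfig wraps the given parameters in a scaleUpTestConfig.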
func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
	return &scaleUpTestConfig{
		initialNodes:   nodes,
		initialPods:    pods,
		extraPods:      extraPods,
		expectedResult: expectedResult,
	}
}
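
// createClusterPredicates returns clusterPredicates expecting the given node count.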
func createClusterPredicates(nodes int) *clusterPredicates {
	return &clusterPredicates{
		nodes: nodes,
	}
}
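
// addAnnotation sets the key=value annotation on every node in nodes by
// issuing a strategic merge patch per node.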
func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error {
	for _, node := range nodes {
		oldData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		node.Annotations[key] = value

		newData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return err
		}

		_, err = f.ClientSet.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
		if err != nil {
			return err
		}
	}
	return nil
}
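
// createHostPortPodsWithMemory runs an RC whose pods reserve both memory and
// the given host port, so at most one replica can be scheduled per node. It
// returns a cleanup function that deletes the RC.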
func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
	ginkgo.By("Running RC which reserves host port and memory")
	request := int64(1024 * 1024 * megabytes / replicas)
	config := &testutils.RCConfig{
		Client:     f.ClientSet,
		Name:       id,
		Namespace:  f.Namespace.Name,
		Timeout:    timeout,
		Image:      imageutils.GetPauseImageName(),
		Replicas:   replicas,
		HostPorts:  map[string]int{"port1": port},
		MemRequest: request,
	}
	err := framework.RunRC(*config)
	framework.ExpectNoError(err)
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}
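
// podBatch describes one slice of the desired load: podsPerNode pods on each
// of numNodes nodes.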
type podBatch struct {
	numNodes    int
	podsPerNode int
}

// distributeLoad distributes the pods in the way described by podDistribution,
// assuming all pods will have the same memory reservation and all nodes the
// same memory capacity. This allows us to generate the load on the cluster in
// the exact way that we want.
//
// To achieve this we do the following:
// 1. Create replication controllers that eat up all the space that should be
// empty after setup, making sure they end up on different nodes by specifying
// a conflicting host port.
// 2. Create the target RC that will generate the load on the cluster.
// 3. Remove the RCs created in 1.
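//
// For example (illustrative numbers only), with nodeMemCapacity = 1000 MB and
// podMemRequestMegabytes = 10, the batch {numNodes: 700, podsPerNode: 70}
// first creates a host-port RC of 700 filler pods, each requesting
// 1000 - 70*10 = 300 MB. The host port pins one filler pod per node, leaving
// room for exactly 70 target pods on each of those 700 nodes.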
func distributeLoad(f *framework.Framework, namespace string, id string, podDistribution []podBatch,
	podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) func() error {
	port := 8013
	// Create load-distribution RCs with one pod per node, reserving all remaining
	// memory to force the distribution of pods for the target RCs.
	// The load-distribution RCs will be deleted on function return.
	totalPods := 0
	for i, podBatch := range podDistribution {
		totalPods += podBatch.numNodes * podBatch.podsPerNode
		remainingMem := nodeMemCapacity - podBatch.podsPerNode*podMemRequestMegabytes
		replicas := podBatch.numNodes
		cleanup := createHostPortPodsWithMemory(f, fmt.Sprintf("load-distribution%d", i), replicas, port, remainingMem*replicas, timeout)
		defer cleanup()
	}
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	// Create the target RC.
	rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
	framework.ExpectNoError(framework.RunRC(*rcConfig))
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}
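
// timeTrack logs how long the phase named by name took, measured from start.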
func timeTrack(start time.Time, name string) {
	elapsed := time.Since(start)
	klog.Infof("%s took %s", name, elapsed)
}