- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package scalability
- import (
- "context"
- "fmt"
- "math"
- "os"
- "sort"
- "strconv"
- "sync"
- "time"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- utiluuid "k8s.io/apimachinery/pkg/util/uuid"
- "k8s.io/apimachinery/pkg/watch"
- clientset "k8s.io/client-go/kubernetes"
- scaleclient "k8s.io/client-go/scale"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/apis/batch"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/apis/extensions"
- "k8s.io/kubernetes/test/e2e/framework"
- e2elog "k8s.io/kubernetes/test/e2e/framework/log"
- "k8s.io/kubernetes/test/e2e/framework/timer"
- testutils "k8s.io/kubernetes/test/utils"
- imageutils "k8s.io/kubernetes/test/utils/image"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- )
- const (
- // PodStartupLatencyThreshold is the per-percentile limit on acceptable e2e pod startup latency.
- PodStartupLatencyThreshold = 5 * time.Second
- // MinSaturationThreshold is the lower bound on the allowed cluster saturation time.
- MinSaturationThreshold = 2 * time.Minute
- // MinPodsPerSecondThroughput is the minimum required saturation throughput, used to derive the allowed saturation time.
- MinPodsPerSecondThroughput = 8
- // DensityPollInterval is how often pod startup status is polled.
- DensityPollInterval = 10 * time.Second
- // MinPodStartupMeasurements is the minimum number of pod startup latency measurements to collect.
- MinPodStartupMeasurements = 500
- )
- // Maximum container failures this test tolerates before failing.
- var MaxContainerFailures = 0
- // Maximum no. of missing measurements related to pod-startup that the test tolerates.
- var MaxMissingPodStartupMeasurements = 0
- // Number of nodes in the cluster (computed inside BeforeEach).
- var nodeCount = 0
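- // DensityTestConfig bundles everything a density run needs: the controller configs to create,
- // the clients to use, the expected pod count, and any secrets, configmaps and daemonsets that
- // should be created alongside the saturation pods.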
- type DensityTestConfig struct {
- Configs []testutils.RunObjectConfig
- ClientSets []clientset.Interface
- ScaleClients []scaleclient.ScalesGetter
- PollInterval time.Duration
- PodCount int
- // What kind of resource we want to create
- kind schema.GroupKind
- SecretConfigs []*testutils.SecretConfig
- ConfigMapConfigs []*testutils.ConfigMapConfig
- DaemonConfigs []*testutils.DaemonConfig
- }
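- // saturationTime summarizes how long it took to saturate the cluster and at what throughput.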
- type saturationTime struct {
- TimeToSaturate time.Duration `json:"timeToSaturate"`
- NumberOfNodes int `json:"numberOfNodes"`
- NumberOfPods int `json:"numberOfPods"`
- Throughput float32 `json:"throughput"`
- }
- func (dtc *DensityTestConfig) runSecretConfigs(testPhase *timer.Phase) {
- defer testPhase.End()
- for _, sc := range dtc.SecretConfigs {
- sc.Run()
- }
- }
- func (dtc *DensityTestConfig) runConfigMapConfigs(testPhase *timer.Phase) {
- defer testPhase.End()
- for _, cmc := range dtc.ConfigMapConfigs {
- cmc.Run()
- }
- }
- func (dtc *DensityTestConfig) runDaemonConfigs(testPhase *timer.Phase) {
- defer testPhase.End()
- for _, dc := range dtc.DaemonConfigs {
- dc.Run()
- }
- }
- func (dtc *DensityTestConfig) deleteSecrets(testPhase *timer.Phase) {
- defer testPhase.End()
- for i := range dtc.SecretConfigs {
- dtc.SecretConfigs[i].Stop()
- }
- }
- func (dtc *DensityTestConfig) deleteConfigMaps(testPhase *timer.Phase) {
- defer testPhase.End()
- for i := range dtc.ConfigMapConfigs {
- dtc.ConfigMapConfigs[i].Stop()
- }
- }
- func (dtc *DensityTestConfig) deleteDaemonSets(numberOfClients int, testPhase *timer.Phase) {
- defer testPhase.End()
- for i := range dtc.DaemonConfigs {
- framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
- dtc.ClientSets[i%numberOfClients],
- extensions.Kind("DaemonSet"),
- dtc.DaemonConfigs[i].Namespace,
- dtc.DaemonConfigs[i].Name,
- ))
- }
- }
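- // density30AddonResourceVerifier returns per-component resource constraints for the
- // 30-pods-per-node run, scaled by cluster size. Control-plane components without explicit
- // numbers for the given provider/size are left effectively unconstrained (MaxFloat32 / MaxUint64).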
- func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
- var apiserverMem uint64
- var controllerMem uint64
- var schedulerMem uint64
- apiserverCPU := math.MaxFloat32
- apiserverMem = math.MaxUint64
- controllerCPU := math.MaxFloat32
- controllerMem = math.MaxUint64
- schedulerCPU := math.MaxFloat32
- schedulerMem = math.MaxUint64
- e2elog.Logf("Setting resource constraints for provider: %s", framework.TestContext.Provider)
- if framework.ProviderIs("kubemark") {
- if numNodes <= 5 {
- apiserverCPU = 0.35
- apiserverMem = 150 * (1024 * 1024)
- controllerCPU = 0.15
- controllerMem = 100 * (1024 * 1024)
- schedulerCPU = 0.05
- schedulerMem = 50 * (1024 * 1024)
- } else if numNodes <= 100 {
- apiserverCPU = 1.5
- apiserverMem = 1500 * (1024 * 1024)
- controllerCPU = 0.5
- controllerMem = 500 * (1024 * 1024)
- schedulerCPU = 0.4
- schedulerMem = 180 * (1024 * 1024)
- } else if numNodes <= 500 {
- apiserverCPU = 3.5
- apiserverMem = 3400 * (1024 * 1024)
- controllerCPU = 1.3
- controllerMem = 1100 * (1024 * 1024)
- schedulerCPU = 1.5
- schedulerMem = 500 * (1024 * 1024)
- } else if numNodes <= 1000 {
- apiserverCPU = 5.5
- apiserverMem = 4000 * (1024 * 1024)
- controllerCPU = 3
- controllerMem = 2000 * (1024 * 1024)
- schedulerCPU = 1.5
- schedulerMem = 750 * (1024 * 1024)
- }
- } else {
- if numNodes <= 100 {
- apiserverCPU = 2.2
- apiserverMem = 1700 * (1024 * 1024)
- controllerCPU = 0.8
- controllerMem = 530 * (1024 * 1024)
- schedulerCPU = 0.4
- schedulerMem = 180 * (1024 * 1024)
- }
- }
- constraints := make(map[string]framework.ResourceConstraint)
- constraints["fluentd-elasticsearch"] = framework.ResourceConstraint{
- CPUConstraint: 0.2,
- MemoryConstraint: 250 * (1024 * 1024),
- }
- constraints["elasticsearch-logging"] = framework.ResourceConstraint{
- CPUConstraint: 2,
- // TODO: bring it down to 750MB again, when we lower Kubelet verbosity level. I.e. revert #19164
- MemoryConstraint: 5000 * (1024 * 1024),
- }
- constraints["heapster"] = framework.ResourceConstraint{
- CPUConstraint: 2,
- MemoryConstraint: 1800 * (1024 * 1024),
- }
- constraints["kibana-logging"] = framework.ResourceConstraint{
- CPUConstraint: 0.2,
- MemoryConstraint: 100 * (1024 * 1024),
- }
- constraints["kube-proxy"] = framework.ResourceConstraint{
- CPUConstraint: 0.15,
- MemoryConstraint: 100 * (1024 * 1024),
- }
- constraints["l7-lb-controller"] = framework.ResourceConstraint{
- CPUConstraint: 0.2 + 0.00015*float64(numNodes),
- MemoryConstraint: (75 + uint64(math.Ceil(0.8*float64(numNodes)))) * (1024 * 1024),
- }
- constraints["influxdb"] = framework.ResourceConstraint{
- CPUConstraint: 2,
- MemoryConstraint: 500 * (1024 * 1024),
- }
- constraints["kube-apiserver"] = framework.ResourceConstraint{
- CPUConstraint: apiserverCPU,
- MemoryConstraint: apiserverMem,
- }
- constraints["kube-controller-manager"] = framework.ResourceConstraint{
- CPUConstraint: controllerCPU,
- MemoryConstraint: controllerMem,
- }
- constraints["kube-scheduler"] = framework.ResourceConstraint{
- CPUConstraint: schedulerCPU,
- MemoryConstraint: schedulerMem,
- }
- constraints["coredns"] = framework.ResourceConstraint{
- CPUConstraint: framework.NoCPUConstraint,
- MemoryConstraint: 170 * (1024 * 1024),
- }
- constraints["kubedns"] = framework.ResourceConstraint{
- CPUConstraint: framework.NoCPUConstraint,
- MemoryConstraint: 170 * (1024 * 1024),
- }
- return constraints
- }
- func computeAverage(sample []float64) float64 {
- sum := 0.0
- for _, value := range sample {
- sum += value
- }
- return sum / float64(len(sample))
- }
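- // computeQuantile returns the value at the given quantile of an already sorted sample using
- // nearest-rank selection; it returns NaN when the rank falls before the first element.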
- func computeQuantile(sample []float64, quantile float64) float64 {
- Expect(sort.Float64sAreSorted(sample)).To(Equal(true))
- Expect(quantile >= 0.0 && quantile <= 1.0).To(Equal(true))
- index := int(quantile*float64(len(sample))) - 1
- if index < 0 {
- return math.NaN()
- }
- return sample[index]
- }
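- // logPodStartupStatus periodically logs the startup status of pods matching observedLabels
- // and appends the scheduling throughput observed in each period to scheduleThroughputs,
- // until stopCh is closed.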
- func logPodStartupStatus(
- c clientset.Interface,
- expectedPods int,
- observedLabels map[string]string,
- period time.Duration,
- scheduleThroughputs *[]float64,
- stopCh chan struct{}) {
- label := labels.SelectorFromSet(labels.Set(observedLabels))
- podStore, err := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
- framework.ExpectNoError(err)
- defer podStore.Stop()
- ticker := time.NewTicker(period)
- startupStatus := testutils.ComputeRCStartupStatus(podStore.List(), expectedPods)
- lastScheduledCount := startupStatus.Scheduled
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- case <-stopCh:
- return
- }
- // Log status of the pods.
- startupStatus := testutils.ComputeRCStartupStatus(podStore.List(), expectedPods)
- e2elog.Logf(startupStatus.String("Density"))
- // Compute scheduling throughput for the latest time period.
- throughput := float64(startupStatus.Scheduled-lastScheduledCount) / float64(period/time.Second)
- *scheduleThroughputs = append(*scheduleThroughputs, throughput)
- lastScheduledCount = startupStatus.Scheduled
- }
- }
- // runDensityTest creates the configured secrets, configmaps and daemonsets, starts all
- // saturation controllers in parallel, and returns the time it took for all pods to start.
- func runDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTimer, scheduleThroughputs *[]float64) time.Duration {
- defer GinkgoRecover()
- // Create all secrets, configmaps and daemons.
- dtc.runSecretConfigs(testPhaseDurations.StartPhase(250, "secrets creation"))
- dtc.runConfigMapConfigs(testPhaseDurations.StartPhase(260, "configmaps creation"))
- dtc.runDaemonConfigs(testPhaseDurations.StartPhase(270, "daemonsets creation"))
- replicationCtrlStartupPhase := testPhaseDurations.StartPhase(300, "saturation pods creation")
- defer replicationCtrlStartupPhase.End()
- // Start scheduler CPU profile-gatherer before we begin cluster saturation.
- profileGatheringDelay := time.Duration(1+nodeCount/100) * time.Minute
- schedulerProfilingStopCh := framework.StartCPUProfileGatherer("kube-scheduler", "density", profileGatheringDelay)
- // Start all replication controllers.
- startTime := time.Now()
- wg := sync.WaitGroup{}
- wg.Add(len(dtc.Configs))
- for i := range dtc.Configs {
- config := dtc.Configs[i]
- go func() {
- defer GinkgoRecover()
- // Call wg.Done() in a defer to avoid blocking the whole test
- // in case of an error from config.Run().
- defer wg.Done()
- framework.ExpectNoError(config.Run())
- }()
- }
- logStopCh := make(chan struct{})
- go logPodStartupStatus(dtc.ClientSets[0], dtc.PodCount, map[string]string{"type": "densityPod"}, dtc.PollInterval, scheduleThroughputs, logStopCh)
- wg.Wait()
- startupTime := time.Since(startTime)
- close(logStopCh)
- close(schedulerProfilingStopCh)
- e2elog.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
- e2elog.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))
- replicationCtrlStartupPhase.End()
- // Grabbing scheduler memory profile after cluster saturation finished.
- wg.Add(1)
- framework.GatherMemoryProfile("kube-scheduler", "density", &wg)
- wg.Wait()
- printPodAllocationPhase := testPhaseDurations.StartPhase(400, "printing pod allocation")
- defer printPodAllocationPhase.End()
- // Print some data about Pod to Node allocation
- By("Printing Pod to Node allocation data")
- podList, err := dtc.ClientSets[0].CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{})
- framework.ExpectNoError(err)
- pausePodAllocation := make(map[string]int)
- systemPodAllocation := make(map[string][]string)
- for _, pod := range podList.Items {
- if pod.Namespace == metav1.NamespaceSystem {
- systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
- } else {
- pausePodAllocation[pod.Spec.NodeName]++
- }
- }
- nodeNames := make([]string, 0)
- for k := range pausePodAllocation {
- nodeNames = append(nodeNames, k)
- }
- sort.Strings(nodeNames)
- for _, node := range nodeNames {
- e2elog.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
- }
- return startupTime
- }
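- // cleanupDensityTest deletes the saturation controllers (their pods are cleaned up by the
- // garbage collector) as well as the secrets, configmaps and daemonsets created for the run.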
- func cleanupDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPhaseTimer) {
- defer GinkgoRecover()
- podCleanupPhase := testPhaseDurations.StartPhase(900, "latency pods deletion")
- defer podCleanupPhase.End()
- By("Deleting created Collections")
- numberOfClients := len(dtc.ClientSets)
- // We explicitly delete all pods so that the API calls necessary for deletion are accounted for in the metrics.
- for i := range dtc.Configs {
- name := dtc.Configs[i].GetName()
- namespace := dtc.Configs[i].GetNamespace()
- kind := dtc.Configs[i].GetKind()
- By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind))
- err := framework.DeleteResourceAndWaitForGC(dtc.ClientSets[i%numberOfClients], kind, namespace, name)
- framework.ExpectNoError(err)
- }
- podCleanupPhase.End()
- dtc.deleteSecrets(testPhaseDurations.StartPhase(910, "secrets deletion"))
- dtc.deleteConfigMaps(testPhaseDurations.StartPhase(920, "configmaps deletion"))
- dtc.deleteDaemonSets(numberOfClients, testPhaseDurations.StartPhase(930, "daemonsets deletion"))
- }
- // This test suite can take a long time to run, and can affect or be affected by other tests.
- // So by default it is added to the ginkgo.skip list (see driver.go).
- // To run this suite you must explicitly ask for it by setting the
- // -t/--test flag or ginkgo.focus flag.
- // IMPORTANT: This test is designed to work on large (>= 100 Nodes) clusters. For smaller ones
- // the results will not be representative of control-plane performance, as we'll start hitting
- // limits on Docker's concurrent container startup.
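- // For example (illustrative invocation, adjust flags to your environment):
- //   go run hack/e2e.go -- --test --test_args="--ginkgo.focus=\[Feature:Performance\]"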
- var _ = SIGDescribe("Density", func() {
- var c clientset.Interface
- var additionalPodsPrefix string
- var ns string
- var uuid string
- var e2eStartupTime time.Duration
- var totalPods int
- var nodeCpuCapacity int64
- var nodeMemCapacity int64
- var nodes *v1.NodeList
- var scheduleThroughputs []float64
- testCaseBaseName := "density"
- missingMeasurements := 0
- var testPhaseDurations *timer.TestPhaseTimer
- var profileGathererStopCh chan struct{}
- var etcdMetricsCollector *framework.EtcdMetricsCollector
- // Gathers data prior to framework namespace teardown
- AfterEach(func() {
- // Stop apiserver CPU profile gatherer and gather memory allocations profile.
- close(profileGathererStopCh)
- wg := sync.WaitGroup{}
- wg.Add(1)
- framework.GatherMemoryProfile("kube-apiserver", "density", &wg)
- wg.Wait()
- saturationThreshold := time.Duration((totalPods / MinPodsPerSecondThroughput)) * time.Second
- if saturationThreshold < MinSaturationThreshold {
- saturationThreshold = MinSaturationThreshold
- }
- Expect(e2eStartupTime).NotTo(BeNumerically(">", saturationThreshold))
- saturationData := saturationTime{
- TimeToSaturate: e2eStartupTime,
- NumberOfNodes: nodeCount,
- NumberOfPods: totalPods,
- Throughput: float32(totalPods) / float32(e2eStartupTime/time.Second),
- }
- e2elog.Logf("Cluster saturation time: %s", framework.PrettyPrintJSON(saturationData))
- summaries := make([]framework.TestDataSummary, 0, 2)
- // Verify latency metrics.
- highLatencyRequests, metrics, err := framework.HighLatencyRequests(c, nodeCount)
- framework.ExpectNoError(err)
- if err == nil {
- summaries = append(summaries, metrics)
- }
- // Summarize scheduler metrics.
- latency, err := framework.VerifySchedulerLatency(c)
- framework.ExpectNoError(err)
- if err == nil {
- // Compute avg and quantiles of throughput (excluding last element, that's usually an outlier).
- sampleSize := len(scheduleThroughputs)
- if sampleSize > 1 {
- scheduleThroughputs = scheduleThroughputs[:sampleSize-1]
- sort.Float64s(scheduleThroughputs)
- latency.ThroughputAverage = computeAverage(scheduleThroughputs)
- latency.ThroughputPerc50 = computeQuantile(scheduleThroughputs, 0.5)
- latency.ThroughputPerc90 = computeQuantile(scheduleThroughputs, 0.9)
- latency.ThroughputPerc99 = computeQuantile(scheduleThroughputs, 0.99)
- }
- summaries = append(summaries, latency)
- }
- // Summarize etcd metrics.
- err = etcdMetricsCollector.StopAndSummarize()
- framework.ExpectNoError(err)
- if err == nil {
- summaries = append(summaries, etcdMetricsCollector.GetMetrics())
- }
- summaries = append(summaries, testPhaseDurations)
- framework.PrintSummaries(summaries, testCaseBaseName)
- // Fail if there were some high-latency requests.
- Expect(highLatencyRequests).NotTo(BeNumerically(">", 0), "There should be no high-latency requests")
- // Fail if more than the allowed threshold of measurements were missing in the latencyTest.
- Expect(missingMeasurements <= MaxMissingPodStartupMeasurements).To(Equal(true))
- })
- options := framework.Options{
- ClientQPS: 50.0,
- ClientBurst: 100,
- }
- // The framework is created explicitly here so that the namespace is deleted at the end
- // of the test (after measuring latency metrics, etc.).
- f := framework.NewFramework(testCaseBaseName, options, nil)
- f.NamespaceDeletionTimeout = time.Hour
- BeforeEach(func() {
- c = f.ClientSet
- ns = f.Namespace.Name
- testPhaseDurations = timer.NewTestPhaseTimer()
- // This is used to mimic what new service account token volumes will
- // eventually look like. We can remove this once the controller manager
- // publishes the root CA certificate to each namespace.
- c.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{
- ObjectMeta: metav1.ObjectMeta{
- Name: "kube-root-ca-crt",
- },
- Data: map[string]string{
- "ca.crt": "trust me, i'm a ca.crt",
- },
- })
- _, nodes = framework.GetMasterAndWorkerNodesOrDie(c)
- nodeCount = len(nodes.Items)
- Expect(nodeCount).NotTo(BeZero())
- // Compute node capacity, leaving some slack for addon pods.
- nodeCpuCapacity = nodes.Items[0].Status.Allocatable.Cpu().MilliValue() - 100
- nodeMemCapacity = nodes.Items[0].Status.Allocatable.Memory().Value() - 100*1024*1024
- // Terminating a namespace (deleting the remaining objects from it - which
- // generally means events) can affect the current run. Thus we wait for all
- // terminating namespaces to be fully deleted before starting this test.
- err := framework.CheckTestingNSDeletedExcept(c, ns)
- framework.ExpectNoError(err)
- uuid = string(utiluuid.NewUUID())
- framework.ExpectNoError(framework.ResetSchedulerMetrics(c))
- framework.ExpectNoError(framework.ResetMetrics(c))
- framework.ExpectNoError(os.Mkdir(fmt.Sprintf(framework.TestContext.OutputDir+"/%s", uuid), 0777))
- e2elog.Logf("Listing nodes for easy debugging:\n")
- for _, node := range nodes.Items {
- var internalIP, externalIP string
- for _, address := range node.Status.Addresses {
- if address.Type == v1.NodeInternalIP {
- internalIP = address.Address
- }
- if address.Type == v1.NodeExternalIP {
- externalIP = address.Address
- }
- }
- e2elog.Logf("Name: %v, clusterIP: %v, externalIP: %v", node.ObjectMeta.Name, internalIP, externalIP)
- }
- // Start apiserver CPU profile gatherer with frequency based on cluster size.
- profileGatheringDelay := time.Duration(5+nodeCount/100) * time.Minute
- profileGathererStopCh = framework.StartCPUProfileGatherer("kube-apiserver", "density", profileGatheringDelay)
- // Start etcd metrics collection.
- etcdMetricsCollector = framework.NewEtcdMetricsCollector()
- etcdMetricsCollector.StartCollecting(time.Minute)
- })
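- // Density describes a single density test case.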
- type Density struct {
- // Controls if e2e latency tests should be run (they are slow)
- runLatencyTest bool
- podsPerNode int
- // Controls how often the apiserver is polled for pods
- interval time.Duration
- // What kind of resource we should be creating. Default: ReplicationController
- kind schema.GroupKind
- secretsPerPod int
- configMapsPerPod int
- svcacctTokenProjectionsPerPod int
- daemonsPerNode int
- quotas bool
- }
- densityTests := []Density{
- // TODO: Expose runLatencyTest as ginkgo flag.
- {podsPerNode: 3, runLatencyTest: false, kind: api.Kind("ReplicationController")},
- {podsPerNode: 30, runLatencyTest: true, kind: api.Kind("ReplicationController")},
- {podsPerNode: 50, runLatencyTest: false, kind: api.Kind("ReplicationController")},
- {podsPerNode: 95, runLatencyTest: true, kind: api.Kind("ReplicationController")},
- {podsPerNode: 100, runLatencyTest: false, kind: api.Kind("ReplicationController")},
- // Tests for other resource types:
- {podsPerNode: 30, runLatencyTest: true, kind: extensions.Kind("Deployment")},
- {podsPerNode: 30, runLatencyTest: true, kind: batch.Kind("Job")},
- // Test scheduling when daemons are present
- {podsPerNode: 30, runLatencyTest: true, kind: api.Kind("ReplicationController"), daemonsPerNode: 2},
- // Test with secrets
- {podsPerNode: 30, runLatencyTest: true, kind: extensions.Kind("Deployment"), secretsPerPod: 2},
- // Test with configmaps
- {podsPerNode: 30, runLatencyTest: true, kind: extensions.Kind("Deployment"), configMapsPerPod: 2},
- // Test with service account projected volumes
- {podsPerNode: 30, runLatencyTest: true, kind: extensions.Kind("Deployment"), svcacctTokenProjectionsPerPod: 2},
- // Test with quotas
- {podsPerNode: 30, runLatencyTest: true, kind: api.Kind("ReplicationController"), quotas: true},
- }
- isCanonical := func(test *Density) bool {
- return test.kind == api.Kind("ReplicationController") && test.daemonsPerNode == 0 && test.secretsPerPod == 0 && test.configMapsPerPod == 0 && !test.quotas
- }
- for _, testArg := range densityTests {
- feature := "ManualPerformance"
- switch testArg.podsPerNode {
- case 30:
- if isCanonical(&testArg) {
- feature = "Performance"
- }
- case 95:
- feature = "HighDensityPerformance"
- }
- name := fmt.Sprintf("[Feature:%s] should allow starting %d pods per node using %v with %v secrets, %v configmaps, %v token projections, and %v daemons",
- feature,
- testArg.podsPerNode,
- testArg.kind,
- testArg.secretsPerPod,
- testArg.configMapsPerPod,
- testArg.svcacctTokenProjectionsPerPod,
- testArg.daemonsPerNode,
- )
- if testArg.quotas {
- name += " with quotas"
- }
- itArg := testArg
- It(name, func() {
- nodePrepPhase := testPhaseDurations.StartPhase(100, "node preparation")
- defer nodePrepPhase.End()
- nodePreparer := framework.NewE2ETestNodePreparer(
- f.ClientSet,
- []testutils.CountToStrategy{{Count: nodeCount, Strategy: &testutils.TrivialNodePrepareStrategy{}}},
- )
- framework.ExpectNoError(nodePreparer.PrepareNodes())
- defer nodePreparer.CleanupNodes()
- podsPerNode := itArg.podsPerNode
- if podsPerNode == 30 {
- f.AddonResourceConstraints = func() map[string]framework.ResourceConstraint { return density30AddonResourceVerifier(nodeCount) }()
- }
- totalPods = (podsPerNode - itArg.daemonsPerNode) * nodeCount
- fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
- framework.ExpectNoError(err)
- defer fileHndl.Close()
- nodePrepPhase.End()
- // nodeCountPerNamespace and CreateNamespaces are defined in load.go
- numberOfCollections := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace
- namespaces, err := CreateNamespaces(f, numberOfCollections, fmt.Sprintf("density-%v", testArg.podsPerNode), testPhaseDurations.StartPhase(200, "namespace creation"))
- framework.ExpectNoError(err)
- if itArg.quotas {
- framework.ExpectNoError(CreateQuotas(f, namespaces, totalPods+nodeCount, testPhaseDurations.StartPhase(210, "quota creation")))
- }
- configs := make([]testutils.RunObjectConfig, numberOfCollections)
- secretConfigs := make([]*testutils.SecretConfig, 0, numberOfCollections*itArg.secretsPerPod)
- configMapConfigs := make([]*testutils.ConfigMapConfig, 0, numberOfCollections*itArg.configMapsPerPod)
- // Since all RCs are created at the same time, timeout for each config
- // has to assume that it will be run at the very end.
- podThroughput := 20
- timeout := time.Duration(totalPods/podThroughput) * time.Second
- if timeout < UnreadyNodeToleration {
- timeout = UnreadyNodeToleration
- }
- timeout += 3 * time.Minute
- // createClients is defined in load.go
- clients, scalesClients, err := createClients(numberOfCollections)
- framework.ExpectNoError(err)
- for i := 0; i < numberOfCollections; i++ {
- nsName := namespaces[i].Name
- secretNames := []string{}
- for j := 0; j < itArg.secretsPerPod; j++ {
- secretName := fmt.Sprintf("density-secret-%v-%v", i, j)
- secretConfigs = append(secretConfigs, &testutils.SecretConfig{
- Content: map[string]string{"foo": "bar"},
- Client: clients[i],
- Name: secretName,
- Namespace: nsName,
- LogFunc: e2elog.Logf,
- })
- secretNames = append(secretNames, secretName)
- }
- configMapNames := []string{}
- for j := 0; j < itArg.configMapsPerPod; j++ {
- configMapName := fmt.Sprintf("density-configmap-%v-%v", i, j)
- configMapConfigs = append(configMapConfigs, &testutils.ConfigMapConfig{
- Content: map[string]string{"foo": "bar"},
- Client: clients[i],
- Name: configMapName,
- Namespace: nsName,
- LogFunc: e2elog.Logf,
- })
- configMapNames = append(configMapNames, configMapName)
- }
- name := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid)
- baseConfig := &testutils.RCConfig{
- Client: clients[i],
- ScalesGetter: scalesClients[i],
- Image: imageutils.GetPauseImageName(),
- Name: name,
- Namespace: nsName,
- Labels: map[string]string{"type": "densityPod"},
- PollInterval: DensityPollInterval,
- Timeout: timeout,
- PodStatusFile: fileHndl,
- Replicas: (totalPods + numberOfCollections - 1) / numberOfCollections,
- CpuRequest: nodeCpuCapacity / 100,
- MemRequest: nodeMemCapacity / 100,
- MaxContainerFailures: &MaxContainerFailures,
- Silent: true,
- LogFunc: e2elog.Logf,
- SecretNames: secretNames,
- ConfigMapNames: configMapNames,
- ServiceAccountTokenProjections: itArg.svcacctTokenProjectionsPerPod,
- Tolerations: []v1.Toleration{
- {
- Key: "node.kubernetes.io/not-ready",
- Operator: v1.TolerationOpExists,
- Effect: v1.TaintEffectNoExecute,
- TolerationSeconds: func(i int64) *int64 { return &i }(int64(UnreadyNodeToleration / time.Second)),
- }, {
- Key: "node.kubernetes.io/unreachable",
- Operator: v1.TolerationOpExists,
- Effect: v1.TaintEffectNoExecute,
- TolerationSeconds: func(i int64) *int64 { return &i }(int64(UnreadyNodeToleration / time.Second)),
- },
- },
- }
- switch itArg.kind {
- case api.Kind("ReplicationController"):
- configs[i] = baseConfig
- case extensions.Kind("ReplicaSet"):
- configs[i] = &testutils.ReplicaSetConfig{RCConfig: *baseConfig}
- case extensions.Kind("Deployment"):
- configs[i] = &testutils.DeploymentConfig{RCConfig: *baseConfig}
- case batch.Kind("Job"):
- configs[i] = &testutils.JobConfig{RCConfig: *baseConfig}
- default:
- framework.Failf("Unsupported kind: %v", itArg.kind)
- }
- }
- // A single client runs out of HTTP/2 connections in the delete phase, hence we need more than one.
- clients, scalesClients, err = createClients(2)
- framework.ExpectNoError(err)
- dConfig := DensityTestConfig{
- ClientSets: clients,
- ScaleClients: scalesClients,
- Configs: configs,
- PodCount: totalPods,
- PollInterval: DensityPollInterval,
- kind: itArg.kind,
- SecretConfigs: secretConfigs,
- ConfigMapConfigs: configMapConfigs,
- }
- for i := 0; i < itArg.daemonsPerNode; i++ {
- dConfig.DaemonConfigs = append(dConfig.DaemonConfigs,
- &testutils.DaemonConfig{
- Client: f.ClientSet,
- Name: fmt.Sprintf("density-daemon-%v", i),
- Namespace: f.Namespace.Name,
- LogFunc: e2elog.Logf,
- })
- }
- e2eStartupTime = runDensityTest(dConfig, testPhaseDurations, &scheduleThroughputs)
- defer cleanupDensityTest(dConfig, testPhaseDurations)
- if itArg.runLatencyTest {
- // Pick latencyPodsIterations so that:
- // latencyPodsIterations * nodeCount >= MinPodStartupMeasurements.
- latencyPodsIterations := (MinPodStartupMeasurements + nodeCount - 1) / nodeCount
- By(fmt.Sprintf("Scheduling additional %d Pods to measure startup latencies", latencyPodsIterations*nodeCount))
- createTimes := make(map[string]metav1.Time, 0)
- nodeNames := make(map[string]string, 0)
- scheduleTimes := make(map[string]metav1.Time, 0)
- runTimes := make(map[string]metav1.Time, 0)
- watchTimes := make(map[string]metav1.Time, 0)
- var mutex sync.Mutex
- checkPod := func(p *v1.Pod) {
- mutex.Lock()
- defer mutex.Unlock()
- defer GinkgoRecover()
- if p.Status.Phase == v1.PodRunning {
- if _, found := watchTimes[p.Name]; !found {
- watchTimes[p.Name] = metav1.Now()
- createTimes[p.Name] = p.CreationTimestamp
- nodeNames[p.Name] = p.Spec.NodeName
- var startTime metav1.Time
- for _, cs := range p.Status.ContainerStatuses {
- if cs.State.Running != nil {
- if startTime.Before(&cs.State.Running.StartedAt) {
- startTime = cs.State.Running.StartedAt
- }
- }
- }
- if startTime != metav1.NewTime(time.Time{}) {
- runTimes[p.Name] = startTime
- } else {
- framework.Failf("Pod %v is reported to be running, but none of its containers is", p.Name)
- }
- }
- }
- }
- additionalPodsPrefix = "density-latency-pod"
- stopCh := make(chan struct{})
- latencyPodStores := make([]cache.Store, len(namespaces))
- for i := 0; i < len(namespaces); i++ {
- nsName := namespaces[i].Name
- latencyPodsStore, controller := cache.NewInformer(
- &cache.ListWatch{
- ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
- options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}).String()
- obj, err := c.CoreV1().Pods(nsName).List(options)
- return runtime.Object(obj), err
- },
- WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
- options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix}).String()
- return c.CoreV1().Pods(nsName).Watch(options)
- },
- },
- &v1.Pod{},
- 0,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- p, ok := obj.(*v1.Pod)
- if !ok {
- e2elog.Logf("Failed to cast observed object to *v1.Pod.")
- }
- Expect(ok).To(Equal(true))
- go checkPod(p)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- p, ok := newObj.(*v1.Pod)
- if !ok {
- e2elog.Logf("Failed to cast observed object to *v1.Pod.")
- }
- Expect(ok).To(Equal(true))
- go checkPod(p)
- },
- },
- )
- latencyPodStores[i] = latencyPodsStore
- go controller.Run(stopCh)
- }
- for latencyPodsIteration := 0; latencyPodsIteration < latencyPodsIterations; latencyPodsIteration++ {
- podIndexOffset := latencyPodsIteration * nodeCount
- e2elog.Logf("Creating %d latency pods in range [%d, %d]", nodeCount, podIndexOffset+1, podIndexOffset+nodeCount)
- watchTimesLen := len(watchTimes)
- // Create some additional pods with throughput ~5 pods/sec.
- latencyPodStartupPhase := testPhaseDurations.StartPhase(800+latencyPodsIteration*10, "latency pods creation")
- defer latencyPodStartupPhase.End()
- var wg sync.WaitGroup
- wg.Add(nodeCount)
- // Explicitly set requests here. With non-zero requests, each scheduled pod
- // increases its node's utilization, so the scheduler's priority functions
- // spread the latency pods more evenly across nodes.
- cpuRequest := *resource.NewMilliQuantity(nodeCpuCapacity/5, resource.DecimalSI)
- memRequest := *resource.NewQuantity(nodeMemCapacity/5, resource.DecimalSI)
- if podsPerNode > 30 {
- // This is to make them schedulable on high-density tests
- // (e.g. 100 pods/node kubemark).
- cpuRequest = *resource.NewMilliQuantity(0, resource.DecimalSI)
- memRequest = *resource.NewQuantity(0, resource.DecimalSI)
- }
- rcNameToNsMap := map[string]string{}
- for i := 1; i <= nodeCount; i++ {
- name := additionalPodsPrefix + "-" + strconv.Itoa(podIndexOffset+i)
- nsName := namespaces[i%len(namespaces)].Name
- rcNameToNsMap[name] = nsName
- go createRunningPodFromRC(&wg, c, name, nsName, imageutils.GetPauseImageName(), additionalPodsPrefix, cpuRequest, memRequest)
- time.Sleep(200 * time.Millisecond)
- }
- wg.Wait()
- latencyPodStartupPhase.End()
- latencyMeasurementPhase := testPhaseDurations.StartPhase(801+latencyPodsIteration*10, "pod startup latencies measurement")
- defer latencyMeasurementPhase.End()
- By("Waiting for all Pods begin observed by the watch...")
- waitTimeout := 10 * time.Minute
- for start := time.Now(); len(watchTimes) < watchTimesLen+nodeCount; time.Sleep(10 * time.Second) {
- if time.Since(start) < waitTimeout {
- framework.Failf("Timeout reached waiting for all Pods being observed by the watch.")
- }
- }
- nodeToLatencyPods := make(map[string]int)
- for i := range latencyPodStores {
- for _, item := range latencyPodStores[i].List() {
- pod := item.(*v1.Pod)
- nodeToLatencyPods[pod.Spec.NodeName]++
- }
- for node, count := range nodeToLatencyPods {
- if count > 1 {
- e2elog.Logf("%d latency pods scheduled on %s", count, node)
- }
- }
- }
- latencyMeasurementPhase.End()
- By("Removing additional replication controllers")
- podDeletionPhase := testPhaseDurations.StartPhase(802+latencyPodsIteration*10, "latency pods deletion")
- defer podDeletionPhase.End()
- deleteRC := func(i int) {
- defer GinkgoRecover()
- name := additionalPodsPrefix + "-" + strconv.Itoa(podIndexOffset+i+1)
- framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, rcNameToNsMap[name], name))
- }
- workqueue.ParallelizeUntil(context.TODO(), 25, nodeCount, deleteRC)
- podDeletionPhase.End()
- }
- close(stopCh)
- for i := 0; i < len(namespaces); i++ {
- nsName := namespaces[i].Name
- selector := fields.Set{
- "involvedObject.kind": "Pod",
- "involvedObject.namespace": nsName,
- "source": v1.DefaultSchedulerName,
- }.AsSelector().String()
- options := metav1.ListOptions{FieldSelector: selector}
- schedEvents, err := c.CoreV1().Events(nsName).List(options)
- framework.ExpectNoError(err)
- for k := range createTimes {
- for _, event := range schedEvents.Items {
- if event.InvolvedObject.Name == k {
- scheduleTimes[k] = event.FirstTimestamp
- break
- }
- }
- }
- }
- scheduleLag := make([]framework.PodLatencyData, 0)
- startupLag := make([]framework.PodLatencyData, 0)
- watchLag := make([]framework.PodLatencyData, 0)
- schedToWatchLag := make([]framework.PodLatencyData, 0)
- e2eLag := make([]framework.PodLatencyData, 0)
- for name, create := range createTimes {
- sched, ok := scheduleTimes[name]
- if !ok {
- e2elog.Logf("Failed to find schedule time for %v", name)
- missingMeasurements++
- }
- run, ok := runTimes[name]
- if !ok {
- e2elog.Logf("Failed to find run time for %v", name)
- missingMeasurements++
- }
- watch, ok := watchTimes[name]
- if !ok {
- e2elog.Logf("Failed to find watch time for %v", name)
- missingMeasurements++
- }
- node, ok := nodeNames[name]
- if !ok {
- e2elog.Logf("Failed to find node for %v", name)
- missingMeasurements++
- }
- scheduleLag = append(scheduleLag, framework.PodLatencyData{Name: name, Node: node, Latency: sched.Time.Sub(create.Time)})
- startupLag = append(startupLag, framework.PodLatencyData{Name: name, Node: node, Latency: run.Time.Sub(sched.Time)})
- watchLag = append(watchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(run.Time)})
- schedToWatchLag = append(schedToWatchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(sched.Time)})
- e2eLag = append(e2eLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(create.Time)})
- }
- sort.Sort(framework.LatencySlice(scheduleLag))
- sort.Sort(framework.LatencySlice(startupLag))
- sort.Sort(framework.LatencySlice(watchLag))
- sort.Sort(framework.LatencySlice(schedToWatchLag))
- sort.Sort(framework.LatencySlice(e2eLag))
- framework.PrintLatencies(scheduleLag, "worst create-to-schedule latencies")
- framework.PrintLatencies(startupLag, "worst schedule-to-run latencies")
- framework.PrintLatencies(watchLag, "worst run-to-watch latencies")
- framework.PrintLatencies(schedToWatchLag, "worst schedule-to-watch latencies")
- framework.PrintLatencies(e2eLag, "worst e2e latencies")
- // Capture latency metrics related to pod-startup.
- podStartupLatency := &framework.PodStartupLatency{
- CreateToScheduleLatency: framework.ExtractLatencyMetrics(scheduleLag),
- ScheduleToRunLatency: framework.ExtractLatencyMetrics(startupLag),
- RunToWatchLatency: framework.ExtractLatencyMetrics(watchLag),
- ScheduleToWatchLatency: framework.ExtractLatencyMetrics(schedToWatchLag),
- E2ELatency: framework.ExtractLatencyMetrics(e2eLag),
- }
- f.TestSummaries = append(f.TestSummaries, podStartupLatency)
- // Test whether e2e pod startup time is acceptable.
- podStartupLatencyThreshold := framework.LatencyMetric{
- Perc50: PodStartupLatencyThreshold,
- Perc90: PodStartupLatencyThreshold,
- Perc99: PodStartupLatencyThreshold,
- }
- framework.ExpectNoError(framework.VerifyLatencyWithinThreshold(podStartupLatencyThreshold, podStartupLatency.E2ELatency, "pod startup"))
- framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
- }
- })
- }
- })
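- // createRunningPodFromRC creates a single-replica ReplicationController running the given
- // image and waits for its pod to be running; it is meant to be run in a goroutine and
- // signals completion via wg.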
- func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) {
- defer GinkgoRecover()
- defer wg.Done()
- labels := map[string]string{
- "type": podType,
- "name": name,
- }
- rc := &v1.ReplicationController{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Labels: labels,
- },
- Spec: v1.ReplicationControllerSpec{
- Replicas: func(i int) *int32 { x := int32(i); return &x }(1),
- Selector: labels,
- Template: &v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: labels,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: name,
- Image: image,
- Resources: v1.ResourceRequirements{
- Requests: v1.ResourceList{
- v1.ResourceCPU: cpuRequest,
- v1.ResourceMemory: memRequest,
- },
- },
- },
- },
- DNSPolicy: v1.DNSDefault,
- },
- },
- },
- }
- framework.ExpectNoError(testutils.CreateRCWithRetries(c, ns, rc))
- framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController")))
- e2elog.Logf("Found pod '%s' running", name)
- }