cluster_autoscaler_scalability.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package autoscaling
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "math"
  19. "strings"
  20. "time"
  21. "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/fields"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/strategicpatch"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/klog"
  28. "k8s.io/kubernetes/test/e2e/framework"
  29. e2enode "k8s.io/kubernetes/test/e2e/framework/node"
  30. e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
  31. e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
  32. testutils "k8s.io/kubernetes/test/utils"
  33. imageutils "k8s.io/kubernetes/test/utils/image"
  34. "github.com/onsi/ginkgo"
  35. )
  // Size limit and timeouts shared by the scalability specs in this file.
  const (
  	// maxNodes is the node count the specs scale up to, or resize the
  	// cluster to, before exercising the autoscaler.
  	maxNodes = 1000

  	// memoryReservationTimeout is the timeout handed to ReserveMemory when
  	// the initial nodes are being saturated.
  	memoryReservationTimeout = 5 * time.Minute
  	// largeResizeTimeout bounds the wait for nodes to become ready after a
  	// direct resize to maxNodes.
  	largeResizeTimeout = 8 * time.Minute
  	// largeScaleUpTimeout is the timeout given to the replication
  	// controllers that trigger (or follow) a large scale-up.
  	largeScaleUpTimeout = 10 * time.Minute
  )
  // clusterPredicates describes the cluster state a scale-up test expects to
  // reach.
  type clusterPredicates struct {
  	// nodes is the expected number of nodes once scaling has finished.
  	nodes int
  }
  45. type scaleUpTestConfig struct {
  46. initialNodes int
  47. initialPods int
  48. extraPods *testutils.RCConfig
  49. expectedResult *clusterPredicates
  50. }
  51. var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() {
  52. f := framework.NewDefaultFramework("autoscaling")
  53. var c clientset.Interface
  54. var nodeCount int
  55. var coresPerNode int
  56. var memCapacityMb int
  57. var originalSizes map[string]int
  58. var sum int
  59. ginkgo.BeforeEach(func() {
  60. e2eskipper.SkipUnlessProviderIs("gce", "gke", "kubemark")
  61. // Check if Cloud Autoscaler is enabled by trying to get its ConfigMap.
  62. _, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get(context.TODO(), "cluster-autoscaler-status", metav1.GetOptions{})
  63. if err != nil {
  64. e2eskipper.Skipf("test expects Cluster Autoscaler to be enabled")
  65. }
  66. c = f.ClientSet
  67. if originalSizes == nil {
  68. originalSizes = make(map[string]int)
  69. sum = 0
  70. for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
  71. size, err := framework.GroupSize(mig)
  72. framework.ExpectNoError(err)
  73. ginkgo.By(fmt.Sprintf("Initial size of %s: %d", mig, size))
  74. originalSizes[mig] = size
  75. sum += size
  76. }
  77. }
  78. framework.ExpectNoError(e2enode.WaitForReadyNodes(c, sum, scaleUpTimeout))
  79. nodes, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
  80. framework.ExpectNoError(err)
  81. nodeCount = len(nodes.Items)
  82. cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
  83. mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
  84. coresPerNode = int((&cpu).MilliValue() / 1000)
  85. memCapacityMb = int((&mem).Value() / 1024 / 1024)
  86. framework.ExpectEqual(nodeCount, sum)
  87. if framework.ProviderIs("gke") {
  88. val, err := isAutoscalerEnabled(3)
  89. framework.ExpectNoError(err)
  90. if !val {
  91. err = enableAutoscaler("default-pool", 3, 5)
  92. framework.ExpectNoError(err)
  93. }
  94. }
  95. })
  96. ginkgo.AfterEach(func() {
  97. ginkgo.By(fmt.Sprintf("Restoring initial size of the cluster"))
  98. setMigSizes(originalSizes)
  99. framework.ExpectNoError(e2enode.WaitForReadyNodes(c, nodeCount, scaleDownTimeout))
  100. nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
  101. framework.ExpectNoError(err)
  102. s := time.Now()
  103. makeSchedulableLoop:
  104. for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
  105. for _, n := range nodes.Items {
  106. err = makeNodeSchedulable(c, &n, true)
  107. switch err.(type) {
  108. case CriticalAddonsOnlyError:
  109. continue makeSchedulableLoop
  110. default:
  111. framework.ExpectNoError(err)
  112. }
  113. }
  114. break
  115. }
  116. klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
  117. })
  118. ginkgo.It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() {
  119. perNodeReservation := int(float64(memCapacityMb) * 0.95)
  120. replicasPerNode := 10
  121. additionalNodes := maxNodes - nodeCount
  122. replicas := additionalNodes * replicasPerNode
  123. additionalReservation := additionalNodes * perNodeReservation
  124. // saturate cluster
  125. reservationCleanup := ReserveMemory(f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, memoryReservationTimeout)
  126. defer reservationCleanup()
  127. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
  128. // configure pending pods & expected scale up
  129. rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
  130. expectedResult := createClusterPredicates(nodeCount + additionalNodes)
  131. config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
  132. // run test
  133. testCleanup := simpleScaleUpTest(f, config)
  134. defer testCleanup()
  135. })
  136. ginkgo.It("should scale up twice [Feature:ClusterAutoscalerScalability2]", func() {
  137. perNodeReservation := int(float64(memCapacityMb) * 0.95)
  138. replicasPerNode := 10
  139. additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
  140. additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
  141. if additionalNodes1+additionalNodes2 > maxNodes {
  142. additionalNodes2 = maxNodes - additionalNodes1
  143. }
  144. replicas1 := additionalNodes1 * replicasPerNode
  145. replicas2 := additionalNodes2 * replicasPerNode
  146. klog.Infof("cores per node: %v", coresPerNode)
  147. // saturate cluster
  148. initialReplicas := nodeCount
  149. reservationCleanup := ReserveMemory(f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
  150. defer reservationCleanup()
  151. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
  152. klog.Infof("Reserved successfully")
  153. // configure pending pods & expected scale up #1
  154. rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
  155. expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
  156. config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
  157. // run test #1
  158. tolerateUnreadyNodes := additionalNodes1 / 20
  159. tolerateUnreadyPods := (initialReplicas + replicas1) / 20
  160. testCleanup1 := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
  161. defer testCleanup1()
  162. klog.Infof("Scaled up once")
  163. // configure pending pods & expected scale up #2
  164. rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
  165. expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
  166. config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)
  167. // run test #2
  168. tolerateUnreadyNodes = maxNodes / 20
  169. tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
  170. testCleanup2 := simpleScaleUpTestWithTolerance(f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
  171. defer testCleanup2()
  172. klog.Infof("Scaled up twice")
  173. })
  174. ginkgo.It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() {
  175. perNodeReservation := int(float64(memCapacityMb) * 0.7)
  176. replicas := int(math.Ceil(maxNodes * 0.7))
  177. totalNodes := maxNodes
  178. // resize cluster to totalNodes
  179. newSizes := map[string]int{
  180. anyKey(originalSizes): totalNodes,
  181. }
  182. setMigSizes(newSizes)
  183. framework.ExpectNoError(e2enode.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
  184. // run replicas
  185. rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
  186. expectedResult := createClusterPredicates(totalNodes)
  187. config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
  188. tolerateUnreadyNodes := totalNodes / 10
  189. tolerateUnreadyPods := replicas / 10
  190. testCleanup := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
  191. defer testCleanup()
  192. // check if empty nodes are scaled down
  193. framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
  194. func(size int) bool {
  195. return size <= replicas+3 // leaving space for non-evictable kube-system pods
  196. }, scaleDownTimeout))
  197. })
  198. ginkgo.It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() {
  199. perPodReservation := int(float64(memCapacityMb) * 0.01)
  200. // underutilizedNodes are 10% full
  201. underutilizedPerNodeReplicas := 10
  202. // fullNodes are 70% full
  203. fullPerNodeReplicas := 70
  204. totalNodes := maxNodes
  205. underutilizedRatio := 0.3
  206. maxDelta := 30
  207. // resize cluster to totalNodes
  208. newSizes := map[string]int{
  209. anyKey(originalSizes): totalNodes,
  210. }
  211. setMigSizes(newSizes)
  212. framework.ExpectNoError(e2enode.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
  213. // annotate all nodes with no-scale-down
  214. ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"
  215. nodes, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
  216. FieldSelector: fields.Set{
  217. "spec.unschedulable": "false",
  218. }.AsSelector().String(),
  219. })
  220. framework.ExpectNoError(err)
  221. framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "true"))
  222. // distribute pods using replication controllers taking up space that should
  223. // be empty after pods are distributed
  224. underutilizedNodesNum := int(float64(maxNodes) * underutilizedRatio)
  225. fullNodesNum := totalNodes - underutilizedNodesNum
  226. podDistribution := []podBatch{
  227. {numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
  228. {numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}
  229. cleanup := distributeLoad(f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
  230. int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)
  231. defer cleanup()
  232. // enable scale down again
  233. framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "false"))
  234. // wait for scale down to start. Node deletion takes a long time, so we just
  235. // wait for maximum of 30 nodes deleted
  236. nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
  237. if nodesToScaleDownCount > maxDelta {
  238. nodesToScaleDownCount = maxDelta
  239. }
  240. expectedSize := totalNodes - nodesToScaleDownCount
  241. timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
  242. framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool {
  243. return size <= expectedSize
  244. }, timeout))
  245. })
  246. ginkgo.It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() {
  247. fullReservation := int(float64(memCapacityMb) * 0.9)
  248. hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
  249. totalNodes := maxNodes
  250. reservedPort := 4321
  251. // resize cluster to totalNodes
  252. newSizes := map[string]int{
  253. anyKey(originalSizes): totalNodes,
  254. }
  255. setMigSizes(newSizes)
  256. framework.ExpectNoError(e2enode.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
  257. divider := int(float64(totalNodes) * 0.7)
  258. fullNodesCount := divider
  259. underutilizedNodesCount := totalNodes - fullNodesCount
  260. ginkgo.By("Reserving full nodes")
  261. // run RC1 w/o host port
  262. cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
  263. defer cleanup()
  264. ginkgo.By("Reserving host ports on remaining nodes")
  265. // run RC2 w/ host port
  266. cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
  267. defer cleanup2()
  268. waitForAllCaPodsReadyInNamespace(f, c)
  269. // wait and check scale down doesn't occur
  270. ginkgo.By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
  271. time.Sleep(scaleDownTimeout)
  272. ginkgo.By("Checking if the number of nodes is as expected")
  273. nodes, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
  274. framework.ExpectNoError(err)
  275. klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
  276. framework.ExpectEqual(len(nodes.Items), totalNodes)
  277. })
  278. ginkgo.Specify("CA ignores unschedulable pods while scheduling schedulable pods [Feature:ClusterAutoscalerScalability6]", func() {
  279. // Start a number of pods saturating existing nodes.
  280. perNodeReservation := int(float64(memCapacityMb) * 0.80)
  281. replicasPerNode := 10
  282. initialPodReplicas := nodeCount * replicasPerNode
  283. initialPodsTotalMemory := nodeCount * perNodeReservation
  284. reservationCleanup := ReserveMemory(f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true /* wait for pods to run */, memoryReservationTimeout)
  285. defer reservationCleanup()
  286. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
  287. // Configure a number of unschedulable pods.
  288. unschedulableMemReservation := memCapacityMb * 2
  289. unschedulablePodReplicas := 1000
  290. totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
  291. timeToWait := 5 * time.Minute
  292. podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
  293. e2erc.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
  294. defer e2erc.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, podsConfig.Name)
  295. // Ensure that no new nodes have been added so far.
  296. readyNodeCount, _ := e2enode.TotalReady(f.ClientSet)
  297. framework.ExpectEqual(readyNodeCount, nodeCount)
  298. // Start a number of schedulable pods to ensure CA reacts.
  299. additionalNodes := maxNodes - nodeCount
  300. replicas := additionalNodes * replicasPerNode
  301. totalMemory := additionalNodes * perNodeReservation
  302. rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
  303. expectedResult := createClusterPredicates(nodeCount + additionalNodes)
  304. config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)
  305. // Test that scale up happens, allowing 1000 unschedulable pods not to be scheduled.
  306. testCleanup := simpleScaleUpTestWithTolerance(f, config, 0, unschedulablePodReplicas)
  307. defer testCleanup()
  308. })
  309. })
  // anyKey returns one key of input, or "" when input is nil or empty.
  //
  // The lexicographically smallest key is chosen. Go randomizes map iteration
  // order, so returning the first key encountered would make the selected
  // instance group differ from run to run; picking the smallest key keeps the
  // choice stable for a given map.
  func anyKey(input map[string]int) string {
  	chosen, found := "", false
  	for k := range input {
  		if !found || k < chosen {
  			chosen, found = k, true
  		}
  	}
  	return chosen
  }
  316. func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
  317. // resize cluster to start size
  318. // run rc based on config
  319. ginkgo.By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
  320. start := time.Now()
  321. framework.ExpectNoError(e2erc.RunRC(*config.extraPods))
  322. // check results
  323. if tolerateMissingNodeCount > 0 {
  324. // Tolerate some number of nodes not to be created.
  325. minExpectedNodeCount := config.expectedResult.nodes - tolerateMissingNodeCount
  326. framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
  327. func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
  328. } else {
  329. framework.ExpectNoError(e2enode.WaitForReadyNodes(f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
  330. }
  331. klog.Infof("cluster is increased")
  332. if tolerateMissingPodCount > 0 {
  333. framework.ExpectNoError(waitForCaPodsReadyInNamespace(f, f.ClientSet, tolerateMissingPodCount))
  334. } else {
  335. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
  336. }
  337. timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
  338. return func() error {
  339. return e2erc.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, config.extraPods.Name)
  340. }
  341. }
  342. func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error {
  343. return simpleScaleUpTestWithTolerance(f, config, 0, 0)
  344. }
  345. func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
  346. return &testutils.RCConfig{
  347. Client: f.ClientSet,
  348. Name: id,
  349. Namespace: f.Namespace.Name,
  350. Timeout: timeout,
  351. Image: imageutils.GetPauseImageName(),
  352. Replicas: replicas,
  353. MemRequest: int64(1024 * 1024 * megabytes / replicas),
  354. }
  355. }
  356. func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
  357. return &scaleUpTestConfig{
  358. initialNodes: nodes,
  359. initialPods: pods,
  360. extraPods: extraPods,
  361. expectedResult: expectedResult,
  362. }
  363. }
  364. func createClusterPredicates(nodes int) *clusterPredicates {
  365. return &clusterPredicates{
  366. nodes: nodes,
  367. }
  368. }
  369. func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error {
  370. for _, node := range nodes {
  371. oldData, err := json.Marshal(node)
  372. if err != nil {
  373. return err
  374. }
  375. if node.Annotations == nil {
  376. node.Annotations = make(map[string]string)
  377. }
  378. node.Annotations[key] = value
  379. newData, err := json.Marshal(node)
  380. if err != nil {
  381. return err
  382. }
  383. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  384. if err != nil {
  385. return err
  386. }
  387. _, err = f.ClientSet.CoreV1().Nodes().Patch(context.TODO(), string(node.Name), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
  388. if err != nil {
  389. return err
  390. }
  391. }
  392. return nil
  393. }
  394. func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
  395. ginkgo.By(fmt.Sprintf("Running RC which reserves host port and memory"))
  396. request := int64(1024 * 1024 * megabytes / replicas)
  397. config := &testutils.RCConfig{
  398. Client: f.ClientSet,
  399. Name: id,
  400. Namespace: f.Namespace.Name,
  401. Timeout: timeout,
  402. Image: imageutils.GetPauseImageName(),
  403. Replicas: replicas,
  404. HostPorts: map[string]int{"port1": port},
  405. MemRequest: request,
  406. }
  407. err := e2erc.RunRC(*config)
  408. framework.ExpectNoError(err)
  409. return func() error {
  410. return e2erc.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
  411. }
  412. }
  // podBatch describes one group of nodes that should all end up running the
  // same number of pods; distributeLoad consumes a slice of these.
  type podBatch struct {
  	// numNodes is how many nodes belong to the group.
  	numNodes int
  	// podsPerNode is how many pods each node of the group should run.
  	podsPerNode int
  }
  417. // distributeLoad distributes the pods in the way described by podDostribution,
  418. // assuming all pods will have the same memory reservation and all nodes the same
  419. // memory capacity. This allows us generate the load on the cluster in the exact
  420. // way that we want.
  421. //
  422. // To achieve this we do the following:
  423. // 1. Create replication controllers that eat up all the space that should be
  424. // empty after setup, making sure they end up on different nodes by specifying
  425. // conflicting host port
  426. // 2. Create targer RC that will generate the load on the cluster
  427. // 3. Remove the rcs created in 1.
  428. func distributeLoad(f *framework.Framework, namespace string, id string, podDistribution []podBatch,
  429. podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) func() error {
  430. port := 8013
  431. // Create load-distribution RCs with one pod per node, reserving all remaining
  432. // memory to force the distribution of pods for the target RCs.
  433. // The load-distribution RCs will be deleted on function return.
  434. totalPods := 0
  435. for i, podBatch := range podDistribution {
  436. totalPods += podBatch.numNodes * podBatch.podsPerNode
  437. remainingMem := nodeMemCapacity - podBatch.podsPerNode*podMemRequestMegabytes
  438. replicas := podBatch.numNodes
  439. cleanup := createHostPortPodsWithMemory(f, fmt.Sprintf("load-distribution%d", i), replicas, port, remainingMem*replicas, timeout)
  440. defer cleanup()
  441. }
  442. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
  443. // Create the target RC
  444. rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
  445. framework.ExpectNoError(e2erc.RunRC(*rcConfig))
  446. framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
  447. return func() error {
  448. return e2erc.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
  449. }
  450. }
  451. func timeTrack(start time.Time, name string) {
  452. elapsed := time.Since(start)
  453. klog.Infof("%s took %s", name, elapsed)
  454. }