scheduler_test.go
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmark

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"sync/atomic"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog"
	testutils "k8s.io/kubernetes/test/utils"
)
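// Pass/fail bounds for the 3k-pod test below: an interval rate below
// threshold3K fails the test, while a rate below warning3K only logs a warning.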
const (
	warning3K   = 100
	threshold3K = 30
)
var (
	basePodTemplate = &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "sched-perf-pod-",
		},
		// TODO: this needs to be configurable.
		Spec: testutils.MakePodSpec(),
	}
	baseNodeTemplate = &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "sample-node-",
		},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourcePods:   *resource.NewQuantity(110, resource.DecimalSI),
				v1.ResourceCPU:    resource.MustParse("4"),
				v1.ResourceMemory: resource.MustParse("32Gi"),
			},
			Phase: v1.NodeRunning,
			Conditions: []v1.NodeCondition{
				{Type: v1.NodeReady, Status: v1.ConditionTrue},
			},
		},
	}
)
// TestSchedule100Node3KPods schedules 3k pods on 100 nodes.
func TestSchedule100Node3KPods(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping because we want to run short tests")
	}

	config := getBaseConfig(100, 3000)
	err := writePodAndNodeTopologyToConfig(config)
	if err != nil {
		t.Errorf("Misconfiguration happened for nodes/pods chosen to have predicates and priorities: %v", err)
	}
	min := schedulePods(config)
	if min < threshold3K {
		t.Errorf("Scheduling rate was too low for an interval: saw a rate of %v, which is below the allowed minimum of %v", min, threshold3K)
	} else if min < warning3K {
		fmt.Printf("Warning: pod scheduling throughput for 3k pods was slow for an interval; saw a very low (%v) scheduling rate\n", min)
	} else {
		fmt.Printf("Minimal observed throughput for 3k pod test: %v\n", min)
	}
}
// TestSchedule2000Node60KPods schedules 60k pods on 2000 nodes.
// This test won't fit in the normal 10-minute time window.
// func TestSchedule2000Node60KPods(t *testing.T) {
// 	if testing.Short() {
// 		t.Skip("Skipping because we want to run short tests")
// 	}
// 	config := defaultSchedulerBenchmarkConfig(2000, 60000)
// 	if min := schedulePods(config); min < threshold60K {
// 		t.Errorf("Too small pod scheduling throughput for 60k pods. Expected %v got %v", threshold60K, min)
// 	} else {
// 		fmt.Printf("Minimal observed throughput for 60k pod test: %v\n", min)
// 	}
// }
// testConfig contains some of the input parameters needed for running the test suite.
type testConfig struct {
	numPods             int
	numNodes            int
	mutatedNodeTemplate *v1.Node
	mutatedPodTemplate  *v1.Pod
	clientset           clientset.Interface
	podInformer         coreinformers.PodInformer
	destroyFunc         func()
}
// getBaseConfig returns a testConfig initialized with the given numbers of nodes and pods.
func getBaseConfig(nodes int, pods int) *testConfig {
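	// mustSetupScheduler (defined elsewhere in this package) brings up the
	// scheduler under test; destroyFunc tears it down again.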
	destroyFunc, podInformer, clientset := mustSetupScheduler()
	return &testConfig{
		clientset:   clientset,
		destroyFunc: destroyFunc,
		numNodes:    nodes,
		numPods:     pods,
		podInformer: podInformer,
	}
}
// schedulePods schedules a specific number of pods on a specific number of nodes.
// This is used to learn the scheduling throughput on various
// sizes of cluster and how it changes as more and more pods are scheduled.
// It won't stop until all pods are scheduled.
// It returns the minimum observed throughput over the whole run.
func schedulePods(config *testConfig) int32 {
	defer config.destroyFunc()
	prev := int32(0)
	// On startup there may be a latent period where NO scheduling occurs (qps = 0).
	// Since we are interested in genuinely low scheduling rates (e.g. qps = 2),
	// start the minimum at the highest possible value.
	minQPS := int32(math.MaxInt32)
	start := time.Now()

	// Bake in time for the first pod scheduling event.
	for {
		time.Sleep(50 * time.Millisecond)
		scheduled, err := getScheduledPods(config.podInformer)
		if err != nil {
			klog.Fatalf("%v", err)
		}
		// 30,000 pods -> wait till at least 300 are scheduled to start measuring.
		// TODO: find out why sometimes there may be scheduling blips in the beginning.
		if len(scheduled) > config.numPods/100 {
			break
		}
	}
	scheduled := int32(0)
	ctx, cancel := context.WithCancel(context.Background())
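	// Count a pod as scheduled when an update transitions its NodeName from
	// empty to non-empty; once every pod has been counted, cancel the context
	// to stop the sampling loop below.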
	config.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old, cur interface{}) {
			curPod := cur.(*v1.Pod)
			oldPod := old.(*v1.Pod)
			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
				if atomic.AddInt32(&scheduled, 1) >= int32(config.numPods) {
					cancel()
				}
			}
		},
	})
	// Track how often each per-second scheduling rate was observed; useful for debugging tests.
	qpsStats := map[int32]int{}
	ticker := time.NewTicker(1 * time.Second)
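	// Sample once per second: the delta in the scheduled count since the last
	// tick is that interval's QPS.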
	go func() {
		for {
			select {
			case <-ticker.C:
				scheduled := atomic.LoadInt32(&scheduled)
				qps := scheduled - prev
				qpsStats[qps]++
				if qps < minQPS {
					minQPS = qps
				}
				fmt.Printf("%ds\trate: %d\ttotal: %d (qps frequency: %v)\n", time.Since(start)/time.Second, qps, scheduled, qpsStats)
				prev = scheduled
			case <-ctx.Done():
				return
			}
		}
	}()
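	// Block until the event handler above has seen every pod get scheduled.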
	<-ctx.Done()
	ticker.Stop()

	// All pods are scheduled at this point; report the worst-case interval
	// seen during the run. Note that minQPS should not be dragged down by
	// cold-start, since scheduling ramp-up time was baked in above.
	consumed := int(time.Since(start) / time.Second)
	if consumed <= 0 {
		consumed = 1
	}
	fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average). min QPS was %v\n",
		config.numPods, consumed, config.numPods/consumed, minQPS)
	return minQPS
}
// mutateNodeTemplate applies the node-affinity labels to the node template in place.
func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) {
	labels := make(map[string]string)
	for i := 0; i < na.LabelCount; i++ {
		value := strconv.Itoa(i)
		key := na.nodeAffinityKey + value
		labels[key] = value
	}
	node.ObjectMeta.Labels = labels
}
// mutatePodTemplate applies the matching node-affinity requirements to the pod template in place.
func (na nodeAffinity) mutatePodTemplate(pod *v1.Pod) {
	var nodeSelectorRequirements []v1.NodeSelectorRequirement
	for i := 0; i < na.LabelCount; i++ {
		value := strconv.Itoa(i)
		key := na.nodeAffinityKey + value
		nodeSelector := v1.NodeSelectorRequirement{Key: key, Values: []string{value}, Operator: v1.NodeSelectorOpIn}
		nodeSelectorRequirements = append(nodeSelectorRequirements, nodeSelector)
	}
	pod.Spec.Affinity = &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: nodeSelectorRequirements,
					},
				},
			},
		},
	}
}
// generateNodes generates nodes to be used for scheduling.
func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) {
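	// Create NodeCount nodes from the mutated template first, then top the
	// cluster up to numNodes with the base template.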
	for i := 0; i < inputConfig.NodeCount; i++ {
		config.clientset.CoreV1().Nodes().Create(context.TODO(), config.mutatedNodeTemplate, metav1.CreateOptions{})
	}
	for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ {
		config.clientset.CoreV1().Nodes().Create(context.TODO(), baseNodeTemplate, metav1.CreateOptions{})
	}
}
// generatePods generates pods to be used for scheduling.
func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) {
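	// Create PodCount pods from the mutated template, and the remainder from
	// the base template.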
	testutils.CreatePod(config.clientset, "sample", inputConfig.PodCount, config.mutatedPodTemplate)
	testutils.CreatePod(config.clientset, "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
}
// generatePodAndNodeTopology is the wrapper function for modifying both pod and node objects.
func (inputConfig *schedulerPerfConfig) generatePodAndNodeTopology(config *testConfig) error {
	if config.numNodes < inputConfig.NodeCount || config.numPods < inputConfig.PodCount {
		return fmt.Errorf("NodeCount cannot be greater than numNodes, and PodCount cannot be greater than numPods")
	}
	nodeAffinity := inputConfig.NodeAffinity
	// Copy the templates before mutating them, so the package-level base
	// templates are left untouched.
	mutatedNodeTemplate := baseNodeTemplate.DeepCopy()
	mutatedPodTemplate := basePodTemplate.DeepCopy()
	if nodeAffinity != nil {
		nodeAffinity.mutateNodeTemplate(mutatedNodeTemplate)
		nodeAffinity.mutatePodTemplate(mutatedPodTemplate)
	} // TODO: other predicates/priorities will be processed in subsequent if statements or a switch :).
	config.mutatedPodTemplate = mutatedPodTemplate
	config.mutatedNodeTemplate = mutatedNodeTemplate
	inputConfig.generateNodes(config)
	inputConfig.generatePods(config)
	return nil
}
// writePodAndNodeTopologyToConfig reads a configuration and then applies it to a test configuration.
// TODO: as of now, this function is not doing anything except for reading input values into priority structs.
func writePodAndNodeTopologyToConfig(config *testConfig) error {
	// High-level structure that should be filled in for every predicate or priority.
	inputConfig := &schedulerPerfConfig{
		NodeCount: 100,
		PodCount:  3000,
		NodeAffinity: &nodeAffinity{
			nodeAffinityKey: "kubernetes.io/sched-perf-node-affinity-",
			LabelCount:      10,
		},
	}
	return inputConfig.generatePodAndNodeTopology(config)
}