scheduler_test.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler

// This file tests the scheduler.

import (
	"fmt"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/factory"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/test/integration/framework"
)
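
// nodeMutationFunc mutates the given node through the client and is expected
// to block until the change is reflected in the scheduler's node lister.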
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)

type nodeStateManager struct {
	makeSchedulable   nodeMutationFunc
	makeUnSchedulable nodeMutationFunc
}
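
// The predicates and priorities below are intentional no-ops: they exist only
// so that a policy can reference registered extension points by name.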
func PredicateOne(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	return true, nil, nil
}

func PredicateTwo(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	return true, nil, nil
}

func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
	return []schedulerapi.HostPriority{}, nil
}

func PriorityTwo(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
	return []schedulerapi.HostPriority{}, nil
}
// TestSchedulerCreationFromConfigMap verifies that a scheduler can be created
// from configurations provided by a ConfigMap object and then verifies that
// the configuration is applied correctly.
func TestSchedulerCreationFromConfigMap(t *testing.T) {
	_, s, closeFn := framework.RunAMaster(nil)
	defer closeFn()

	ns := framework.CreateTestingNamespace("configmap", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)

	clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	defer clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
	informerFactory := informers.NewSharedInformerFactory(clientSet, 0)

	// Pre-register some predicate and priority functions
	factory.RegisterFitPredicate("PredicateOne", PredicateOne)
	factory.RegisterFitPredicate("PredicateTwo", PredicateTwo)
	factory.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
	factory.RegisterPriorityFunction("PriorityTwo", PriorityTwo, 1)
	for i, test := range []struct {
		policy               string
		expectedPredicates   sets.String
		expectedPrioritizers sets.String
	}{
		{
			policy: `{
				"kind" : "Policy",
				"apiVersion" : "v1",
				"predicates" : [
					{"name" : "PredicateOne"},
					{"name" : "PredicateTwo"}
				],
				"priorities" : [
					{"name" : "PriorityOne", "weight" : 1},
					{"name" : "PriorityTwo", "weight" : 5}
				]
			}`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
				"PredicateOne",
				"PredicateTwo",
			),
			expectedPrioritizers: sets.NewString(
				"PriorityOne",
				"PriorityTwo",
			),
		},
		{
			policy: `{
				"kind" : "Policy",
				"apiVersion" : "v1"
			}`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
				"CheckNodeDiskPressure",
				"CheckNodeMemoryPressure",
				"CheckNodePIDPressure",
				"CheckVolumeBinding",
				"GeneralPredicates",
				"MatchInterPodAffinity",
				"MaxAzureDiskVolumeCount",
				"MaxCSIVolumeCountPred",
				"MaxEBSVolumeCount",
				"MaxGCEPDVolumeCount",
				"NoDiskConflict",
				"NoVolumeZoneConflict",
				"PodToleratesNodeTaints",
			),
			expectedPrioritizers: sets.NewString(
				"BalancedResourceAllocation",
				"ImageLocalityPriority",
				"InterPodAffinityPriority",
				"LeastRequestedPriority",
				"NodeAffinityPriority",
				"NodePreferAvoidPodsPriority",
				"SelectorSpreadPriority",
				"TaintTolerationPriority",
			),
		},
		{
			policy: `{
				"kind" : "Policy",
				"apiVersion" : "v1",
				"predicates" : [],
				"priorities" : []
			}`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
			),
			expectedPrioritizers: sets.NewString(),
		},
		{
			policy: `apiVersion: v1
kind: Policy
predicates:
- name: PredicateOne
- name: PredicateTwo
priorities:
- name: PriorityOne
  weight: 1
- name: PriorityTwo
  weight: 5
`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
				"PredicateOne",
				"PredicateTwo",
			),
			expectedPrioritizers: sets.NewString(
				"PriorityOne",
				"PriorityTwo",
			),
		},
		{
			policy: `apiVersion: v1
kind: Policy
`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
				"CheckNodeDiskPressure",
				"CheckNodeMemoryPressure",
				"CheckNodePIDPressure",
				"CheckVolumeBinding",
				"GeneralPredicates",
				"MatchInterPodAffinity",
				"MaxAzureDiskVolumeCount",
				"MaxCSIVolumeCountPred",
				"MaxEBSVolumeCount",
				"MaxGCEPDVolumeCount",
				"NoDiskConflict",
				"NoVolumeZoneConflict",
				"PodToleratesNodeTaints",
			),
			expectedPrioritizers: sets.NewString(
				"BalancedResourceAllocation",
				"ImageLocalityPriority",
				"InterPodAffinityPriority",
				"LeastRequestedPriority",
				"NodeAffinityPriority",
				"NodePreferAvoidPodsPriority",
				"SelectorSpreadPriority",
				"TaintTolerationPriority",
			),
		},
		{
			policy: `apiVersion: v1
kind: Policy
predicates: []
priorities: []
`,
			expectedPredicates: sets.NewString(
				"CheckNodeCondition", // mandatory predicate
			),
			expectedPrioritizers: sets.NewString(),
		},
	} {
		// Add a ConfigMap object.
		configPolicyName := fmt.Sprintf("scheduler-custom-policy-config-%d", i)
		policyConfigMap := v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName},
			Data:       map[string]string{kubeschedulerconfig.SchedulerPolicyConfigMapKey: test.policy},
		}

		policyConfigMap.APIVersion = "v1"
		clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&policyConfigMap)
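
		// Build a scheduler whose algorithm source references the ConfigMap
		// created above; scheduler.New resolves and parses the policy during
		// construction, so a bad or missing policy surfaces as an error here.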
		eventBroadcaster := record.NewBroadcaster()
		eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})

		defaultBindTimeout := int64(30)

		sched, err := scheduler.New(clientSet,
			informerFactory.Core().V1().Nodes(),
			factory.NewPodInformer(clientSet, 0),
			informerFactory.Core().V1().PersistentVolumes(),
			informerFactory.Core().V1().PersistentVolumeClaims(),
			informerFactory.Core().V1().ReplicationControllers(),
			informerFactory.Apps().V1().ReplicaSets(),
			informerFactory.Apps().V1().StatefulSets(),
			informerFactory.Core().V1().Services(),
			informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
			informerFactory.Storage().V1().StorageClasses(),
			eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
			kubeschedulerconfig.SchedulerAlgorithmSource{
				Policy: &kubeschedulerconfig.SchedulerPolicySource{
					ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
						Namespace: policyConfigMap.Namespace,
						Name:      policyConfigMap.Name,
					},
				},
			},
			nil,
			schedulerframework.NewRegistry(),
			nil,
			[]kubeschedulerconfig.PluginConfig{},
			scheduler.WithName(v1.DefaultSchedulerName),
			scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
			scheduler.WithBindTimeoutSeconds(defaultBindTimeout),
		)
		if err != nil {
			t.Fatalf("couldn't make scheduler config: %v", err)
		}
		config := sched.Config()

		// Verify that the config is applied correctly.
		schedPredicates := sets.NewString()
		for k := range config.Algorithm.Predicates() {
			schedPredicates.Insert(k)
		}
		schedPrioritizers := sets.NewString()
		for _, p := range config.Algorithm.Prioritizers() {
			schedPrioritizers.Insert(p.Name)
		}
		if !schedPredicates.Equal(test.expectedPredicates) {
			t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
		}
		if !schedPrioritizers.Equal(test.expectedPrioritizers) {
			t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
		}
	}
}
// TestSchedulerCreationFromNonExistentConfigMap ensures that scheduler
// creation fails when the referenced policy ConfigMap does not exist.
func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
	_, s, closeFn := framework.RunAMaster(nil)
	defer closeFn()

	ns := framework.CreateTestingNamespace("configmap", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)

	clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	defer clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})

	informerFactory := informers.NewSharedInformerFactory(clientSet, 0)

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})

	defaultBindTimeout := int64(30)

	_, err := scheduler.New(clientSet,
		informerFactory.Core().V1().Nodes(),
		factory.NewPodInformer(clientSet, 0),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().ReplicationControllers(),
		informerFactory.Apps().V1().ReplicaSets(),
		informerFactory.Apps().V1().StatefulSets(),
		informerFactory.Core().V1().Services(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		informerFactory.Storage().V1().StorageClasses(),
		eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
		kubeschedulerconfig.SchedulerAlgorithmSource{
			Policy: &kubeschedulerconfig.SchedulerPolicySource{
				ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
					Namespace: "non-existent-config",
					Name:      "non-existent-config",
				},
			},
		},
		nil,
		schedulerframework.NewRegistry(),
		nil,
		[]kubeschedulerconfig.PluginConfig{},
		scheduler.WithName(v1.DefaultSchedulerName),
		scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
		scheduler.WithBindTimeoutSeconds(defaultBindTimeout))

	if err == nil {
		t.Fatalf("Scheduler creation should have failed: the policy ConfigMap does not exist.")
	}
}
func TestUnschedulableNodes(t *testing.T) {
	context := initTest(t, "unschedulable-nodes")
	defer cleanupTest(t, context)

	nodeLister := context.schedulerConfigFactory.GetNodeLister()
	// NOTE: This test cannot run in parallel, because it is creating and deleting
	// non-namespaced objects (Nodes).
	defer context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})

	goodCondition := v1.NodeCondition{
		Type:              v1.NodeReady,
		Status:            v1.ConditionTrue,
		Reason:            "schedulable condition",
		LastHeartbeatTime: metav1.Time{Time: time.Now()},
	}
	badCondition := v1.NodeCondition{
		Type:              v1.NodeReady,
		Status:            v1.ConditionUnknown,
		Reason:            "unschedulable condition",
		LastHeartbeatTime: metav1.Time{Time: time.Now()},
	}
	// Create a new schedulable node, since we're first going to apply
	// the unschedulable condition and verify that pods aren't scheduled.
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-scheduling-test-node"},
		Spec:       v1.NodeSpec{Unschedulable: false},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
			},
			Conditions: []v1.NodeCondition{goodCondition},
		},
	}
	nodeKey, err := cache.MetaNamespaceKeyFunc(node)
	if err != nil {
		t.Fatalf("Couldn't retrieve key for node %v", node.Name)
	}

	// The test does the following for each nodeStateManager in this list:
	// 1. Create a new node
	// 2. Apply the makeUnSchedulable function
	// 3. Create a new pod
	// 4. Check that the pod doesn't get assigned to the node
	// 5. Apply the makeSchedulable function
	// 6. Check that the pod *does* get assigned to the node
	// 7. Delete the pod and node.
	nodeModifications := []nodeStateManager{
		// Test node.Spec.Unschedulable=true/false
		{
			makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
				n.Spec.Unschedulable = true
				if _, err := c.CoreV1().Nodes().Update(n); err != nil {
					t.Fatalf("Failed to update node with unschedulable=true: %v", err)
				}
				err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
					// An unschedulable node should still be present in the store.
					// Nodes that are unschedulable, or not ready, or have their
					// disk full (Node.Status.Conditions) are excluded separately,
					// by the NodeConditionPredicate.
					return node != nil && node.(*v1.Node).Spec.Unschedulable == true
				})
				if err != nil {
					t.Fatalf("Failed to observe reflected update for setting unschedulable=true: %v", err)
				}
			},
			makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
				n.Spec.Unschedulable = false
				if _, err := c.CoreV1().Nodes().Update(n); err != nil {
					t.Fatalf("Failed to update node with unschedulable=false: %v", err)
				}
				err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
					return node != nil && node.(*v1.Node).Spec.Unschedulable == false
				})
				if err != nil {
					t.Fatalf("Failed to observe reflected update for setting unschedulable=false: %v", err)
				}
			},
		},
		// Test node.Status.Conditions=ConditionTrue/Unknown
		{
			makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
				n.Status = v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
					},
					Conditions: []v1.NodeCondition{badCondition},
				}
				if _, err = c.CoreV1().Nodes().UpdateStatus(n); err != nil {
					t.Fatalf("Failed to update node with bad status condition: %v", err)
				}
				err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
					return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionUnknown
				})
				if err != nil {
					t.Fatalf("Failed to observe reflected update for status condition update: %v", err)
				}
			},
			makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
				n.Status = v1.NodeStatus{
					Capacity: v1.ResourceList{
						v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
					},
					Conditions: []v1.NodeCondition{goodCondition},
				}
				if _, err = c.CoreV1().Nodes().UpdateStatus(n); err != nil {
					t.Fatalf("Failed to update node with healthy status condition: %v", err)
				}
				err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
					return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionTrue
				})
				if err != nil {
					t.Fatalf("Failed to observe reflected update for status condition update: %v", err)
				}
			},
		},
	}
	for i, mod := range nodeModifications {
		unSchedNode, err := context.clientSet.CoreV1().Nodes().Create(node)
		if err != nil {
			t.Fatalf("Failed to create node: %v", err)
		}

		// Apply the unschedulable modification to the node, and wait for the reflection.
		mod.makeUnSchedulable(t, unSchedNode, nodeLister, context.clientSet)

		// Create the new pod; note that this needs to happen after the
		// unschedulable modification, or we have a race in the test.
		myPod, err := createPausePodWithResource(context.clientSet, "node-scheduling-test-pod", context.ns.Name, nil)
		if err != nil {
			t.Fatalf("Failed to create pod: %v", err)
		}

		// There are no schedulable nodes - the pod shouldn't be scheduled.
		err = waitForPodToScheduleWithTimeout(context.clientSet, myPod, 2*time.Second)
		if err == nil {
			t.Errorf("Test %d: Pod scheduled successfully on unschedulable nodes", i)
		}
		if err != wait.ErrWaitTimeout {
			t.Errorf("Test %d: failed while trying to confirm the pod does not get scheduled on the node: %v", i, err)
		} else {
			t.Logf("Test %d: Pod did not get scheduled on an unschedulable node", i)
		}

		// Apply the schedulable modification to the node, and wait for the reflection.
		schedNode, err := context.clientSet.CoreV1().Nodes().Get(unSchedNode.Name, metav1.GetOptions{})
		if err != nil {
			t.Fatalf("Failed to get node: %v", err)
		}
		mod.makeSchedulable(t, schedNode, nodeLister, context.clientSet)

		// Wait until the pod is scheduled.
		if err := waitForPodToSchedule(context.clientSet, myPod); err != nil {
			t.Errorf("Test %d: failed to schedule a pod: %v", i, err)
		} else {
			t.Logf("Test %d: Pod got scheduled on a schedulable node", i)
		}

		// Clean up.
		if err := deletePod(context.clientSet, myPod.Name, myPod.Namespace); err != nil {
			t.Errorf("Failed to delete pod: %v", err)
		}
		err = context.clientSet.CoreV1().Nodes().Delete(schedNode.Name, nil)
		if err != nil {
			t.Errorf("Failed to delete node: %v", err)
		}
	}
}
func TestMultiScheduler(t *testing.T) {
	/*
		This integration test exercises the multi-scheduler feature in the following way:
		1. create a default scheduler
		2. create a node
		3. create 3 pods: testPodNoAnnotation, testPodWithAnnotationFitsDefault and testPodWithAnnotationFitsFoo
		   - note: the first two should be picked and scheduled by the default scheduler, while the last one
		     should be picked by the scheduler named "foo-scheduler", which does not exist yet.
		4. **check point-1**:
		   - testPodNoAnnotation, testPodWithAnnotationFitsDefault should be scheduled
		   - testPodWithAnnotationFitsFoo should NOT be scheduled
		5. create a scheduler with name "foo-scheduler"
		6. **check point-2**:
		   - testPodWithAnnotationFitsFoo should be scheduled
		7. stop the default scheduler
		8. create 2 pods: testPodNoAnnotation2 and testPodWithAnnotationFitsDefault2
		   - note: these two pods belong to the default scheduler, which no longer exists
		9. **check point-3**:
		   - testPodNoAnnotation2 and testPodWithAnnotationFitsDefault2 should NOT be scheduled
	*/
	// 1. create and start default-scheduler
	context := initTest(t, "multi-scheduler")
	defer cleanupTest(t, context)

	// 2. create a node
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-multi-scheduler-test-node"},
		Spec:       v1.NodeSpec{Unschedulable: false},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
			},
		},
	}
	context.clientSet.CoreV1().Nodes().Create(node)

	// 3. create 3 pods for testing
	t.Logf("create 3 pods for testing")
	testPod, err := createPausePodWithResource(context.clientSet, "pod-without-scheduler-name", context.ns.Name, nil)
	if err != nil {
		t.Fatalf("Failed to create pod: %v", err)
	}

	defaultScheduler := "default-scheduler"
	testPodFitsDefault, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler}))
	if err != nil {
		t.Fatalf("Failed to create pod: %v", err)
	}

	fooScheduler := "foo-scheduler"
	testPodFitsFoo, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler}))
	if err != nil {
		t.Fatalf("Failed to create pod: %v", err)
	}

	// 4. **check point-1**:
	//    - testPod, testPodFitsDefault should be scheduled
	//    - testPodFitsFoo should NOT be scheduled
	t.Logf("wait for pods scheduled")
	if err := waitForPodToSchedule(context.clientSet, testPod); err != nil {
		t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err)
	} else {
		t.Logf("Test MultiScheduler: %s Pod scheduled", testPod.Name)
	}

	if err := waitForPodToSchedule(context.clientSet, testPodFitsDefault); err != nil {
		t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPodFitsDefault.Name, err)
	} else {
		t.Logf("Test MultiScheduler: %s Pod scheduled", testPodFitsDefault.Name)
	}

	if err := waitForPodToScheduleWithTimeout(context.clientSet, testPodFitsFoo, time.Second*5); err == nil {
		t.Errorf("Test MultiScheduler: %s Pod got scheduled unexpectedly", testPodFitsFoo.Name)
	} else {
		t.Logf("Test MultiScheduler: %s Pod not scheduled", testPodFitsFoo.Name)
	}

	// 5. create and start a scheduler with name "foo-scheduler"
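	// Give "foo-scheduler" its own client, pod informer, and event
	// broadcaster, so that it schedules and reports events independently of
	// the default scheduler started by initTest.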
	clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
	podInformer2 := factory.NewPodInformer(context.clientSet, 0)

	stopCh := make(chan struct{})
	defer close(stopCh)

	schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(),
		nil, []kubeschedulerconfig.PluginConfig{}, stopCh)
	schedulerConfig2, err := schedulerConfigFactory2.Create()
	if err != nil {
		t.Errorf("Couldn't create scheduler config: %v", err)
	}
	eventBroadcaster2 := record.NewBroadcaster()
	schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
	eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")})
	sched2 := scheduler.NewFromConfig(schedulerConfig2)
	scheduler.AddAllEventHandlers(sched2,
		fooScheduler,
		context.informerFactory.Core().V1().Nodes(),
		podInformer2,
		context.informerFactory.Core().V1().PersistentVolumes(),
		context.informerFactory.Core().V1().PersistentVolumeClaims(),
		context.informerFactory.Core().V1().Services(),
		context.informerFactory.Storage().V1().StorageClasses(),
	)
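	// AddAllEventHandlers wires sched2 into the informers above; its pod
	// handlers filter on schedulerName, so sched2 only dequeues pods that
	// request "foo-scheduler".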
	go podInformer2.Informer().Run(stopCh)
	informerFactory2.Start(stopCh)
	sched2.Run()

	// 6. **check point-2**:
	//    - testPodFitsFoo should be scheduled
	err = waitForPodToSchedule(context.clientSet, testPodFitsFoo)
	if err != nil {
		t.Errorf("Test MultiScheduler: %s Pod not scheduled, %v", testPodFitsFoo.Name, err)
	} else {
		t.Logf("Test MultiScheduler: %s Pod scheduled", testPodFitsFoo.Name)
	}

	// 7. delete the pods that were scheduled by the default scheduler, and stop the default scheduler
	if err := deletePod(context.clientSet, testPod.Name, context.ns.Name); err != nil {
		t.Errorf("Failed to delete pod: %v", err)
	}
	if err := deletePod(context.clientSet, testPodFitsDefault.Name, context.ns.Name); err != nil {
		t.Errorf("Failed to delete pod: %v", err)
	}

	// The rest of this test assumes that closing StopEverything will cause the
	// scheduler thread to stop immediately. It won't, and in fact it will often
	// schedule 1 more pod before finally exiting. Comment out until we fix that.
	//
	// See https://github.com/kubernetes/kubernetes/issues/23715 for more details.
	/*
		close(schedulerConfig.StopEverything)

		// 8. create 2 pods: testPodNoAnnotation2 and testPodWithAnnotationFitsDefault2
		//    - note: these two pods belong to the default scheduler, which no longer exists
		podWithNoAnnotation2 := createPod("pod-with-no-annotation2", nil)
		podWithAnnotationFitsDefault2 := createPod("pod-with-annotation-fits-default2", schedulerAnnotationFitsDefault)
		testPodNoAnnotation2, err := clientSet.CoreV1().Pods(ns.Name).Create(podWithNoAnnotation2)
		if err != nil {
			t.Fatalf("Failed to create pod: %v", err)
		}
		testPodWithAnnotationFitsDefault2, err := clientSet.CoreV1().Pods(ns.Name).Create(podWithAnnotationFitsDefault2)
		if err != nil {
			t.Fatalf("Failed to create pod: %v", err)
		}

		// 9. **check point-3**:
		//    - testPodNoAnnotation2 and testPodWithAnnotationFitsDefault2 should NOT be scheduled
		err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPodNoAnnotation2.Namespace, testPodNoAnnotation2.Name))
		if err == nil {
			t.Errorf("Test MultiScheduler: %s Pod got scheduled, %v", testPodNoAnnotation2.Name, err)
		} else {
			t.Logf("Test MultiScheduler: %s Pod not scheduled", testPodNoAnnotation2.Name)
		}
		err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPodWithAnnotationFitsDefault2.Namespace, testPodWithAnnotationFitsDefault2.Name))
		if err == nil {
			t.Errorf("Test MultiScheduler: %s Pod got scheduled, %v", testPodWithAnnotationFitsDefault2.Name, err)
		} else {
			t.Logf("Test MultiScheduler: %s Pod not scheduled", testPodWithAnnotationFitsDefault2.Name)
		}
	*/
}
// TestAllocatable verifies that the scheduler works correctly whether or not
// the kubelet is allocatable-aware.
func TestAllocatable(t *testing.T) {
	context := initTest(t, "allocatable")
	defer cleanupTest(t, context)

	// 2. create a node without allocatable awareness
	nodeRes := &v1.ResourceList{
		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
		v1.ResourceCPU:    *resource.NewMilliQuantity(30, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(30, resource.BinarySI),
	}
	allocNode, err := createNode(context.clientSet, "node-allocatable-scheduler-test-node", nodeRes)
	if err != nil {
		t.Fatalf("Failed to create node: %v", err)
	}

	// 3. create a resource pod which requires less than Capacity
	podName := "pod-test-allocatable"
	podRes := &v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(20, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(20, resource.BinarySI),
	}
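	// The request (20m CPU, 20 bytes of memory) fits under the node's
	// Capacity of 30, but exceeds the Allocatable of 10 set later in the test.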
	testAllocPod, err := createPausePodWithResource(context.clientSet, podName, context.ns.Name, podRes)
	if err != nil {
		t.Fatalf("Test allocatable unawareness failed to create pod: %v", err)
	}

	// 4. Test: this test pod should be scheduled since the api-server will use Capacity as Allocatable
	err = waitForPodToScheduleWithTimeout(context.clientSet, testAllocPod, time.Second*5)
	if err != nil {
		t.Errorf("Test allocatable unawareness: %s Pod not scheduled: %v", testAllocPod.Name, err)
	} else {
		t.Logf("Test allocatable unawareness: %s Pod scheduled", testAllocPod.Name)
	}

	// 5. Change the node status to allocatable aware; note that Allocatable is less than the pod's requirement
	allocNode.Status = v1.NodeStatus{
		Capacity: v1.ResourceList{
			v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
			v1.ResourceCPU:    *resource.NewMilliQuantity(30, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(30, resource.BinarySI),
		},
		Allocatable: v1.ResourceList{
			v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
			v1.ResourceCPU:    *resource.NewMilliQuantity(10, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(10, resource.BinarySI),
		},
	}
	if _, err := context.clientSet.CoreV1().Nodes().UpdateStatus(allocNode); err != nil {
		t.Fatalf("Failed to update node with Status.Allocatable: %v", err)
	}

	if err := deletePod(context.clientSet, testAllocPod.Name, context.ns.Name); err != nil {
		t.Fatalf("Failed to remove the first pod: %v", err)
	}

	// 6. Make another pod with a different name but the same resource request
	podName2 := "pod-test-allocatable2"
	testAllocPod2, err := createPausePodWithResource(context.clientSet, podName2, context.ns.Name, podRes)
	if err != nil {
		t.Fatalf("Test allocatable awareness failed to create pod: %v", err)
	}

	// 7. Test: this test pod should not be scheduled since it requests more than Allocatable
	if err := waitForPodToScheduleWithTimeout(context.clientSet, testAllocPod2, time.Second*5); err == nil {
		t.Errorf("Test allocatable awareness: %s Pod got scheduled unexpectedly", testAllocPod2.Name)
	} else {
		t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name)
	}
}
// TestSchedulerInformers verifies that the scheduler receives informer events
// and updates its cache when pods are scheduled by other schedulers.
func TestSchedulerInformers(t *testing.T) {
	// Initialize scheduler.
	context := initTest(t, "scheduler-informer")
	defer cleanupTest(t, context)
	cs := context.clientSet

	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
	}
	defaultNodeRes := &v1.ResourceList{
		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
	}

	type nodeConfig struct {
		name string
		res  *v1.ResourceList
	}
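
	// The existing pods below have NodeName pre-set, so they are created as if
	// already placed by "foo-scheduler" and "bar-scheduler"; the scheduler
	// under test must learn about them solely through informer events.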
	tests := []struct {
		description         string
		nodes               []*nodeConfig
		existingPods        []*v1.Pod
		pod                 *v1.Pod
		preemptedPodIndexes map[int]struct{}
	}{
		{
			description: "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers",
			nodes:       []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
			existingPods: []*v1.Pod{
				initPausePod(context.clientSet, &pausePodConfig{
					Name:          "pod1",
					Namespace:     context.ns.Name,
					Resources:     defaultPodRes,
					Labels:        map[string]string{"foo": "bar"},
					NodeName:      "node-1",
					SchedulerName: "foo-scheduler",
				}),
				initPausePod(context.clientSet, &pausePodConfig{
					Name:          "pod2",
					Namespace:     context.ns.Name,
					Resources:     defaultPodRes,
					Labels:        map[string]string{"foo": "bar"},
					NodeName:      "node-1",
					SchedulerName: "bar-scheduler",
				}),
			},
			pod: initPausePod(cs, &pausePodConfig{
				Name:      "unschedulable-pod",
				Namespace: context.ns.Name,
				Resources: defaultPodRes,
			}),
			preemptedPodIndexes: map[int]struct{}{2: {}},
		},
	}
	for _, test := range tests {
		for _, nodeConf := range test.nodes {
			_, err := createNode(cs, nodeConf.name, nodeConf.res)
			if err != nil {
				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
			}
		}

		pods := make([]*v1.Pod, len(test.existingPods))
		var err error
		// Create and run existingPods.
		for i, p := range test.existingPods {
			if pods[i], err = runPausePod(cs, p); err != nil {
				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
			}
		}
		// Create the new "pod".
		unschedulable, err := createPausePod(cs, test.pod)
		if err != nil {
			t.Errorf("Error while creating new pod: %v", err)
		}
		if err := waitForPodUnschedulable(cs, unschedulable); err != nil {
			t.Errorf("Pod %v got scheduled: %v", unschedulable.Name, err)
		}

		// Cleanup
		pods = append(pods, unschedulable)
		cleanupPods(cs, t, pods)
		cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{})
		cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
	}
}