preemption_test.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // This file tests preemption functionality of the scheduler.
  14. package scheduler
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. v1 "k8s.io/api/core/v1"
  21. policy "k8s.io/api/policy/v1beta1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/runtime/schema"
  26. "k8s.io/apimachinery/pkg/types"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. "k8s.io/client-go/informers"
  30. "k8s.io/client-go/kubernetes"
  31. clientset "k8s.io/client-go/kubernetes"
  32. restclient "k8s.io/client-go/rest"
  33. "k8s.io/klog"
  34. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  35. "k8s.io/kubernetes/pkg/apis/scheduling"
  36. "k8s.io/kubernetes/pkg/scheduler"
  37. schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
  38. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  39. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  40. "k8s.io/kubernetes/plugin/pkg/admission/priority"
  41. testutils "k8s.io/kubernetes/test/utils"
  42. )
// Priority values used throughout these tests; a pod with a higher value may
// preempt pods with lower values when resources are scarce.
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
  44. func waitForNominatedNodeNameWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  45. if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
  46. pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
  47. if err != nil {
  48. return false, err
  49. }
  50. if len(pod.Status.NominatedNodeName) > 0 {
  51. return true, nil
  52. }
  53. return false, err
  54. }); err != nil {
  55. return fmt.Errorf("Pod %v/%v annotation did not get set: %v", pod.Namespace, pod.Name, err)
  56. }
  57. return nil
  58. }
// waitForNominatedNodeName waits for the pod's NominatedNodeName to be set,
// using the default test timeout (wait.ForeverTestTimeout).
func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
	return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout)
}
  62. const tokenFilterName = "token-filter"
  63. type tokenFilter struct {
  64. Tokens int
  65. Unresolvable bool
  66. }
  67. // Name returns name of the plugin.
  68. func (fp *tokenFilter) Name() string {
  69. return tokenFilterName
  70. }
  71. func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
  72. nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  73. if fp.Tokens > 0 {
  74. fp.Tokens--
  75. return nil
  76. }
  77. status := framework.Unschedulable
  78. if fp.Unresolvable {
  79. status = framework.UnschedulableAndUnresolvable
  80. }
  81. return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
  82. }
  83. func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
  84. return nil
  85. }
  86. func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
  87. podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  88. fp.Tokens--
  89. return nil
  90. }
  91. func (fp *tokenFilter) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
  92. podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  93. fp.Tokens++
  94. return nil
  95. }
  96. func (fp *tokenFilter) PreFilterExtensions() framework.PreFilterExtensions {
  97. return fp
  98. }
  99. var _ framework.FilterPlugin = &tokenFilter{}
  100. // TestPreemption tests a few preemption scenarios.
  101. func TestPreemption(t *testing.T) {
  102. // Initialize scheduler with a filter plugin.
  103. var filter tokenFilter
  104. registry := make(framework.Registry)
  105. err := registry.Register(filterPluginName, func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
  106. return &filter, nil
  107. })
  108. if err != nil {
  109. t.Fatalf("Error registering a filter: %v", err)
  110. }
  111. plugins := &schedulerconfig.Plugins{
  112. Filter: &schedulerconfig.PluginSet{
  113. Enabled: []schedulerconfig.Plugin{
  114. {
  115. Name: filterPluginName,
  116. },
  117. },
  118. },
  119. PreFilter: &schedulerconfig.PluginSet{
  120. Enabled: []schedulerconfig.Plugin{
  121. {
  122. Name: filterPluginName,
  123. },
  124. },
  125. },
  126. }
  127. testCtx := initTestSchedulerWithOptions(t,
  128. initTestMaster(t, "preemptiom", nil),
  129. false, nil, time.Second,
  130. scheduler.WithFrameworkPlugins(plugins),
  131. scheduler.WithFrameworkOutOfTreeRegistry(registry))
  132. defer cleanupTest(t, testCtx)
  133. cs := testCtx.clientSet
  134. defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
  135. v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
  136. v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
  137. }
  138. maxTokens := 1000
  139. tests := []struct {
  140. description string
  141. existingPods []*v1.Pod
  142. pod *v1.Pod
  143. initTokens int
  144. unresolvable bool
  145. preemptedPodIndexes map[int]struct{}
  146. }{
  147. {
  148. description: "basic pod preemption",
  149. initTokens: maxTokens,
  150. existingPods: []*v1.Pod{
  151. initPausePod(testCtx.clientSet, &pausePodConfig{
  152. Name: "victim-pod",
  153. Namespace: testCtx.ns.Name,
  154. Priority: &lowPriority,
  155. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  156. v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
  157. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  158. },
  159. }),
  160. },
  161. pod: initPausePod(cs, &pausePodConfig{
  162. Name: "preemptor-pod",
  163. Namespace: testCtx.ns.Name,
  164. Priority: &highPriority,
  165. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  166. v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
  167. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  168. },
  169. }),
  170. preemptedPodIndexes: map[int]struct{}{0: {}},
  171. },
  172. {
  173. description: "basic pod preemption with filter",
  174. initTokens: 1,
  175. existingPods: []*v1.Pod{
  176. initPausePod(testCtx.clientSet, &pausePodConfig{
  177. Name: "victim-pod",
  178. Namespace: testCtx.ns.Name,
  179. Priority: &lowPriority,
  180. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  181. v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
  182. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  183. },
  184. }),
  185. },
  186. pod: initPausePod(cs, &pausePodConfig{
  187. Name: "preemptor-pod",
  188. Namespace: testCtx.ns.Name,
  189. Priority: &highPriority,
  190. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  191. v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
  192. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  193. },
  194. }),
  195. preemptedPodIndexes: map[int]struct{}{0: {}},
  196. },
  197. {
  198. // same as the previous test, but the filter is unresolvable.
  199. description: "basic pod preemption with unresolvable filter",
  200. initTokens: 1,
  201. unresolvable: true,
  202. existingPods: []*v1.Pod{
  203. initPausePod(testCtx.clientSet, &pausePodConfig{
  204. Name: "victim-pod",
  205. Namespace: testCtx.ns.Name,
  206. Priority: &lowPriority,
  207. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  208. v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
  209. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  210. },
  211. }),
  212. },
  213. pod: initPausePod(cs, &pausePodConfig{
  214. Name: "preemptor-pod",
  215. Namespace: testCtx.ns.Name,
  216. Priority: &highPriority,
  217. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  218. v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
  219. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  220. },
  221. }),
  222. preemptedPodIndexes: map[int]struct{}{},
  223. },
  224. {
  225. description: "preemption is performed to satisfy anti-affinity",
  226. initTokens: maxTokens,
  227. existingPods: []*v1.Pod{
  228. initPausePod(cs, &pausePodConfig{
  229. Name: "pod-0", Namespace: testCtx.ns.Name,
  230. Priority: &mediumPriority,
  231. Labels: map[string]string{"pod": "p0"},
  232. Resources: defaultPodRes,
  233. }),
  234. initPausePod(cs, &pausePodConfig{
  235. Name: "pod-1", Namespace: testCtx.ns.Name,
  236. Priority: &lowPriority,
  237. Labels: map[string]string{"pod": "p1"},
  238. Resources: defaultPodRes,
  239. Affinity: &v1.Affinity{
  240. PodAntiAffinity: &v1.PodAntiAffinity{
  241. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  242. {
  243. LabelSelector: &metav1.LabelSelector{
  244. MatchExpressions: []metav1.LabelSelectorRequirement{
  245. {
  246. Key: "pod",
  247. Operator: metav1.LabelSelectorOpIn,
  248. Values: []string{"preemptor"},
  249. },
  250. },
  251. },
  252. TopologyKey: "node",
  253. },
  254. },
  255. },
  256. },
  257. }),
  258. },
  259. // A higher priority pod with anti-affinity.
  260. pod: initPausePod(cs, &pausePodConfig{
  261. Name: "preemptor-pod",
  262. Namespace: testCtx.ns.Name,
  263. Priority: &highPriority,
  264. Labels: map[string]string{"pod": "preemptor"},
  265. Resources: defaultPodRes,
  266. Affinity: &v1.Affinity{
  267. PodAntiAffinity: &v1.PodAntiAffinity{
  268. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  269. {
  270. LabelSelector: &metav1.LabelSelector{
  271. MatchExpressions: []metav1.LabelSelectorRequirement{
  272. {
  273. Key: "pod",
  274. Operator: metav1.LabelSelectorOpIn,
  275. Values: []string{"p0"},
  276. },
  277. },
  278. },
  279. TopologyKey: "node",
  280. },
  281. },
  282. },
  283. },
  284. }),
  285. preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
  286. },
  287. {
  288. // This is similar to the previous case only pod-1 is high priority.
  289. description: "preemption is not performed when anti-affinity is not satisfied",
  290. initTokens: maxTokens,
  291. existingPods: []*v1.Pod{
  292. initPausePod(cs, &pausePodConfig{
  293. Name: "pod-0", Namespace: testCtx.ns.Name,
  294. Priority: &mediumPriority,
  295. Labels: map[string]string{"pod": "p0"},
  296. Resources: defaultPodRes,
  297. }),
  298. initPausePod(cs, &pausePodConfig{
  299. Name: "pod-1", Namespace: testCtx.ns.Name,
  300. Priority: &highPriority,
  301. Labels: map[string]string{"pod": "p1"},
  302. Resources: defaultPodRes,
  303. Affinity: &v1.Affinity{
  304. PodAntiAffinity: &v1.PodAntiAffinity{
  305. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  306. {
  307. LabelSelector: &metav1.LabelSelector{
  308. MatchExpressions: []metav1.LabelSelectorRequirement{
  309. {
  310. Key: "pod",
  311. Operator: metav1.LabelSelectorOpIn,
  312. Values: []string{"preemptor"},
  313. },
  314. },
  315. },
  316. TopologyKey: "node",
  317. },
  318. },
  319. },
  320. },
  321. }),
  322. },
  323. // A higher priority pod with anti-affinity.
  324. pod: initPausePod(cs, &pausePodConfig{
  325. Name: "preemptor-pod",
  326. Namespace: testCtx.ns.Name,
  327. Priority: &highPriority,
  328. Labels: map[string]string{"pod": "preemptor"},
  329. Resources: defaultPodRes,
  330. Affinity: &v1.Affinity{
  331. PodAntiAffinity: &v1.PodAntiAffinity{
  332. RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
  333. {
  334. LabelSelector: &metav1.LabelSelector{
  335. MatchExpressions: []metav1.LabelSelectorRequirement{
  336. {
  337. Key: "pod",
  338. Operator: metav1.LabelSelectorOpIn,
  339. Values: []string{"p0"},
  340. },
  341. },
  342. },
  343. TopologyKey: "node",
  344. },
  345. },
  346. },
  347. },
  348. }),
  349. preemptedPodIndexes: map[int]struct{}{},
  350. },
  351. }
  352. // Create a node with some resources and a label.
  353. nodeRes := &v1.ResourceList{
  354. v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
  355. v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
  356. v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
  357. }
  358. node, err := createNode(testCtx.clientSet, "node1", nodeRes)
  359. if err != nil {
  360. t.Fatalf("Error creating nodes: %v", err)
  361. }
  362. nodeLabels := map[string]string{"node": node.Name}
  363. if err = testutils.AddLabelsToNode(testCtx.clientSet, node.Name, nodeLabels); err != nil {
  364. t.Fatalf("Cannot add labels to node: %v", err)
  365. }
  366. if err = waitForNodeLabels(testCtx.clientSet, node.Name, nodeLabels); err != nil {
  367. t.Fatalf("Adding labels to node didn't succeed: %v", err)
  368. }
  369. for _, test := range tests {
  370. t.Logf("================ Running test: %v\n", test.description)
  371. filter.Tokens = test.initTokens
  372. filter.Unresolvable = test.unresolvable
  373. pods := make([]*v1.Pod, len(test.existingPods))
  374. // Create and run existingPods.
  375. for i, p := range test.existingPods {
  376. pods[i], err = runPausePod(cs, p)
  377. if err != nil {
  378. t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
  379. }
  380. }
  381. // Create the "pod".
  382. preemptor, err := createPausePod(cs, test.pod)
  383. if err != nil {
  384. t.Errorf("Error while creating high priority pod: %v", err)
  385. }
  386. // Wait for preemption of pods and make sure the other ones are not preempted.
  387. for i, p := range pods {
  388. if _, found := test.preemptedPodIndexes[i]; found {
  389. if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
  390. t.Errorf("Test [%v]: Pod %v/%v is not getting evicted.", test.description, p.Namespace, p.Name)
  391. }
  392. } else {
  393. if p.DeletionTimestamp != nil {
  394. t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name)
  395. }
  396. }
  397. }
  398. // Also check that the preemptor pod gets the NominatedNodeName field set.
  399. if len(test.preemptedPodIndexes) > 0 {
  400. if err := waitForNominatedNodeName(cs, preemptor); err != nil {
  401. t.Errorf("Test [%v]: NominatedNodeName field was not set for pod %v: %v", test.description, preemptor.Name, err)
  402. }
  403. }
  404. // Cleanup
  405. pods = append(pods, preemptor)
  406. cleanupPods(cs, t, pods)
  407. }
  408. }
// TestDisablePreemption tests disable pod preemption of scheduler works as expected.
// With preemption disabled, a high-priority pod that does not fit must remain
// unschedulable and must never be nominated onto a node.
func TestDisablePreemption(t *testing.T) {
	// Initialize scheduler, and disable preemption.
	testCtx := initTestDisablePreemption(t, "disable-preemption")
	defer cleanupTest(t, testCtx)
	cs := testCtx.clientSet
	tests := []struct {
		description  string
		existingPods []*v1.Pod
		pod          *v1.Pod
	}{
		{
			description: "pod preemption will not happen",
			// A low-priority victim holding 400m of the node's 500m CPU.
			existingPods: []*v1.Pod{
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
						v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
						v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
					},
				}),
			},
			// A high-priority pod that cannot fit unless the victim is removed.
			pod: initPausePod(cs, &pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.ns.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
		},
	}
	// Create a node with some resources and a label.
	nodeRes := &v1.ResourceList{
		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
	}
	_, err := createNode(testCtx.clientSet, "node1", nodeRes)
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}
	for _, test := range tests {
		pods := make([]*v1.Pod, len(test.existingPods))
		// Create and run existingPods.
		for i, p := range test.existingPods {
			pods[i], err = runPausePod(cs, p)
			if err != nil {
				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
			}
		}
		// Create the "pod".
		preemptor, err := createPausePod(cs, test.pod)
		if err != nil {
			t.Errorf("Error while creating high priority pod: %v", err)
		}
		// Ensure preemptor should keep unschedulable.
		if err := waitForPodUnschedulable(cs, preemptor); err != nil {
			t.Errorf("Test [%v]: Preemptor %v should not become scheduled",
				test.description, preemptor.Name)
		}
		// Ensure preemptor should not be nominated.
		// (A short timeout: we expect this wait to fail.)
		if err := waitForNominatedNodeNameWithTimeout(cs, preemptor, 5*time.Second); err == nil {
			t.Errorf("Test [%v]: Preemptor %v should not be nominated",
				test.description, preemptor.Name)
		}
		// Cleanup
		pods = append(pods, preemptor)
		cleanupPods(cs, t, pods)
	}
}
  483. // This test verifies that system critical priorities are created automatically and resolved properly.
  484. func TestPodPriorityResolution(t *testing.T) {
  485. admission := priority.NewPlugin()
  486. testCtx := initTestScheduler(t, initTestMaster(t, "preemption", admission), true, nil)
  487. defer cleanupTest(t, testCtx)
  488. cs := testCtx.clientSet
  489. // Build clientset and informers for controllers.
  490. externalClientset := kubernetes.NewForConfigOrDie(&restclient.Config{
  491. QPS: -1,
  492. Host: testCtx.httpServer.URL,
  493. ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  494. externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second)
  495. admission.SetExternalKubeClientSet(externalClientset)
  496. admission.SetExternalKubeInformerFactory(externalInformers)
  497. externalInformers.Start(testCtx.ctx.Done())
  498. externalInformers.WaitForCacheSync(testCtx.ctx.Done())
  499. tests := []struct {
  500. Name string
  501. PriorityClass string
  502. Pod *v1.Pod
  503. ExpectedPriority int32
  504. ExpectedError error
  505. }{
  506. {
  507. Name: "SystemNodeCritical priority class",
  508. PriorityClass: scheduling.SystemNodeCritical,
  509. ExpectedPriority: scheduling.SystemCriticalPriority + 1000,
  510. Pod: initPausePod(cs, &pausePodConfig{
  511. Name: fmt.Sprintf("pod1-%v", scheduling.SystemNodeCritical),
  512. Namespace: metav1.NamespaceSystem,
  513. PriorityClassName: scheduling.SystemNodeCritical,
  514. }),
  515. },
  516. {
  517. Name: "SystemClusterCritical priority class",
  518. PriorityClass: scheduling.SystemClusterCritical,
  519. ExpectedPriority: scheduling.SystemCriticalPriority,
  520. Pod: initPausePod(cs, &pausePodConfig{
  521. Name: fmt.Sprintf("pod2-%v", scheduling.SystemClusterCritical),
  522. Namespace: metav1.NamespaceSystem,
  523. PriorityClassName: scheduling.SystemClusterCritical,
  524. }),
  525. },
  526. {
  527. Name: "Invalid priority class should result in error",
  528. PriorityClass: "foo",
  529. ExpectedPriority: scheduling.SystemCriticalPriority,
  530. Pod: initPausePod(cs, &pausePodConfig{
  531. Name: fmt.Sprintf("pod3-%v", scheduling.SystemClusterCritical),
  532. Namespace: metav1.NamespaceSystem,
  533. PriorityClassName: "foo",
  534. }),
  535. ExpectedError: fmt.Errorf("Error creating pause pod: pods \"pod3-system-cluster-critical\" is forbidden: no PriorityClass with name foo was found"),
  536. },
  537. }
  538. // Create a node with some resources and a label.
  539. nodeRes := &v1.ResourceList{
  540. v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
  541. v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
  542. v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
  543. }
  544. _, err := createNode(testCtx.clientSet, "node1", nodeRes)
  545. if err != nil {
  546. t.Fatalf("Error creating nodes: %v", err)
  547. }
  548. pods := make([]*v1.Pod, 0, len(tests))
  549. for _, test := range tests {
  550. t.Logf("================ Running test: %v\n", test.Name)
  551. t.Run(test.Name, func(t *testing.T) {
  552. pod, err := runPausePod(cs, test.Pod)
  553. if err != nil {
  554. if test.ExpectedError == nil {
  555. t.Fatalf("Test [PodPriority/%v]: Error running pause pod: %v", test.PriorityClass, err)
  556. }
  557. if err.Error() != test.ExpectedError.Error() {
  558. t.Fatalf("Test [PodPriority/%v]: Expected error %v but got error %v", test.PriorityClass, test.ExpectedError, err)
  559. }
  560. return
  561. }
  562. pods = append(pods, pod)
  563. if pod.Spec.Priority != nil {
  564. if *pod.Spec.Priority != test.ExpectedPriority {
  565. t.Errorf("Expected pod %v to have priority %v but was %v", pod.Name, test.ExpectedPriority, pod.Spec.Priority)
  566. }
  567. } else {
  568. t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.PriorityClass)
  569. }
  570. })
  571. }
  572. cleanupPods(cs, t, pods)
  573. cleanupNodes(cs, t)
  574. }
  575. func mkPriorityPodWithGrace(tc *testContext, name string, priority int32, grace int64) *v1.Pod {
  576. defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
  577. v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
  578. v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
  579. }
  580. pod := initPausePod(tc.clientSet, &pausePodConfig{
  581. Name: name,
  582. Namespace: tc.ns.Name,
  583. Priority: &priority,
  584. Labels: map[string]string{"pod": name},
  585. Resources: defaultPodRes,
  586. })
  587. // Setting grace period to zero. Otherwise, we may never see the actual deletion
  588. // of the pods in integration tests.
  589. pod.Spec.TerminationGracePeriodSeconds = &grace
  590. return pod
  591. }
// This test ensures that while the preempting pod is waiting for the victims to
// terminate, other pending lower priority pods are not scheduled in the room created
// after preemption and while the higher priority pods is not scheduled yet.
func TestPreemptionStarvation(t *testing.T) {
	// Initialize scheduler.
	testCtx := initTest(t, "preemption")
	defer cleanupTest(t, testCtx)
	cs := testCtx.clientSet
	tests := []struct {
		description        string
		numExistingPod     int // total pods created before the preemptor
		numExpectedPending int // of those, how many should stay pending
		preemptor          *v1.Pod
	}{
		{
			// This test ensures that while the preempting pod is waiting for the victims
			// terminate, other lower priority pods are not scheduled in the room created
			// after preemption and while the higher priority pods is not scheduled yet.
			description:        "starvation test: higher priority pod is scheduled before the lower priority ones",
			numExistingPod:     10,
			numExpectedPending: 5,
			preemptor: initPausePod(cs, &pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.ns.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
		},
	}
	// Create a node with some resources and a label.
	nodeRes := &v1.ResourceList{
		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
	}
	_, err := createNode(testCtx.clientSet, "node1", nodeRes)
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}
	for _, test := range tests {
		pendingPods := make([]*v1.Pod, test.numExpectedPending)
		numRunningPods := test.numExistingPod - test.numExpectedPending
		runningPods := make([]*v1.Pod, numRunningPods)
		// Create and run existingPods.
		for i := 0; i < numRunningPods; i++ {
			runningPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0))
			if err != nil {
				t.Fatalf("Test [%v]: Error creating pause pod: %v", test.description, err)
			}
		}
		// make sure that runningPods are all scheduled.
		for _, p := range runningPods {
			if err := waitForPodToSchedule(cs, p); err != nil {
				t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
			}
		}
		// Create pending pods.
		for i := 0; i < test.numExpectedPending; i++ {
			pendingPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0))
			if err != nil {
				t.Fatalf("Test [%v]: Error creating pending pod: %v", test.description, err)
			}
		}
		// Make sure that all pending pods are being marked unschedulable.
		for _, p := range pendingPods {
			if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout,
				podUnschedulable(cs, p.Namespace, p.Name)); err != nil {
				t.Errorf("Pod %v/%v didn't get marked unschedulable: %v", p.Namespace, p.Name, err)
			}
		}
		// Create the preemptor.
		preemptor, err := createPausePod(cs, test.preemptor)
		if err != nil {
			t.Errorf("Error while creating the preempting pod: %v", err)
		}
		// Check that the preemptor pod gets the annotation for nominated node name.
		if err := waitForNominatedNodeName(cs, preemptor); err != nil {
			t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v/%v: %v", test.description, preemptor.Namespace, preemptor.Name, err)
		}
		// Make sure that preemptor is scheduled after preemptions.
		// The crucial assertion: the preemptor, not any pending pod, wins the
		// room freed by eviction.
		if err := waitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil {
			t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err)
		}
		// Cleanup
		klog.Info("Cleaning up all pods...")
		allPods := pendingPods
		allPods = append(allPods, runningPods...)
		allPods = append(allPods, preemptor)
		cleanupPods(cs, t, allPods)
	}
}
  686. // TestPreemptionRaces tests that other scheduling events and operations do not
  687. // race with the preemption process.
  688. func TestPreemptionRaces(t *testing.T) {
  689. // Initialize scheduler.
  690. testCtx := initTest(t, "preemption-race")
  691. defer cleanupTest(t, testCtx)
  692. cs := testCtx.clientSet
  693. tests := []struct {
  694. description string
  695. numInitialPods int // Pods created and executed before running preemptor
  696. numAdditionalPods int // Pods created after creating the preemptor
  697. numRepetitions int // Repeat the tests to check races
  698. preemptor *v1.Pod
  699. }{
  700. {
  701. // This test ensures that while the preempting pod is waiting for the victims
  702. // terminate, other lower priority pods are not scheduled in the room created
  703. // after preemption and while the higher priority pods is not scheduled yet.
  704. description: "ensures that other pods are not scheduled while preemptor is being marked as nominated (issue #72124)",
  705. numInitialPods: 2,
  706. numAdditionalPods: 50,
  707. numRepetitions: 10,
  708. preemptor: initPausePod(cs, &pausePodConfig{
  709. Name: "preemptor-pod",
  710. Namespace: testCtx.ns.Name,
  711. Priority: &highPriority,
  712. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  713. v1.ResourceCPU: *resource.NewMilliQuantity(4900, resource.DecimalSI),
  714. v1.ResourceMemory: *resource.NewQuantity(4900, resource.DecimalSI)},
  715. },
  716. }),
  717. },
  718. }
  719. // Create a node with some resources and a label.
  720. nodeRes := &v1.ResourceList{
  721. v1.ResourcePods: *resource.NewQuantity(100, resource.DecimalSI),
  722. v1.ResourceCPU: *resource.NewMilliQuantity(5000, resource.DecimalSI),
  723. v1.ResourceMemory: *resource.NewQuantity(5000, resource.DecimalSI),
  724. }
  725. _, err := createNode(testCtx.clientSet, "node1", nodeRes)
  726. if err != nil {
  727. t.Fatalf("Error creating nodes: %v", err)
  728. }
  729. for _, test := range tests {
  730. if test.numRepetitions <= 0 {
  731. test.numRepetitions = 1
  732. }
  733. for n := 0; n < test.numRepetitions; n++ {
  734. initialPods := make([]*v1.Pod, test.numInitialPods)
  735. additionalPods := make([]*v1.Pod, test.numAdditionalPods)
  736. // Create and run existingPods.
  737. for i := 0; i < test.numInitialPods; i++ {
  738. initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0))
  739. if err != nil {
  740. t.Fatalf("Test [%v]: Error creating pause pod: %v", test.description, err)
  741. }
  742. }
  743. // make sure that initial Pods are all scheduled.
  744. for _, p := range initialPods {
  745. if err := waitForPodToSchedule(cs, p); err != nil {
  746. t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
  747. }
  748. }
  749. // Create the preemptor.
  750. klog.Info("Creating the preemptor pod...")
  751. preemptor, err := createPausePod(cs, test.preemptor)
  752. if err != nil {
  753. t.Errorf("Error while creating the preempting pod: %v", err)
  754. }
  755. klog.Info("Creating additional pods...")
  756. for i := 0; i < test.numAdditionalPods; i++ {
  757. additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0))
  758. if err != nil {
  759. t.Fatalf("Test [%v]: Error creating pending pod: %v", test.description, err)
  760. }
  761. }
  762. // Check that the preemptor pod gets nominated node name.
  763. if err := waitForNominatedNodeName(cs, preemptor); err != nil {
  764. t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v/%v: %v", test.description, preemptor.Namespace, preemptor.Name, err)
  765. }
  766. // Make sure that preemptor is scheduled after preemptions.
  767. if err := waitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil {
  768. t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err)
  769. }
  770. klog.Info("Check unschedulable pods still exists and were never scheduled...")
  771. for _, p := range additionalPods {
  772. pod, err := cs.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
  773. if err != nil {
  774. t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err)
  775. }
  776. if len(pod.Spec.NodeName) > 0 {
  777. t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name)
  778. }
  779. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  780. if cond != nil && cond.Status != v1.ConditionFalse {
  781. t.Errorf("Pod %v/%v is no longer unschedulable: %v", p.Namespace, p.Name, err)
  782. }
  783. }
  784. // Cleanup
  785. klog.Info("Cleaning up all pods...")
  786. allPods := additionalPods
  787. allPods = append(allPods, initialPods...)
  788. allPods = append(allPods, preemptor)
  789. cleanupPods(cs, t, allPods)
  790. }
  791. }
  792. }
  793. // TestNominatedNodeCleanUp checks that when there are nominated pods on a
  794. // node and a higher priority pod is nominated to run on the node, the nominated
  795. // node name of the lower priority pods is cleared.
  796. // Test scenario:
  797. // 1. Create a few low priority pods with long grade period that fill up a node.
  798. // 2. Create a medium priority pod that preempt some of those pods.
  799. // 3. Check that nominated node name of the medium priority pod is set.
  800. // 4. Create a high priority pod that preempts some pods on that node.
  801. // 5. Check that nominated node name of the high priority pod is set and nominated
  802. // node name of the medium priority pod is cleared.
  803. func TestNominatedNodeCleanUp(t *testing.T) {
  804. // Initialize scheduler.
  805. testCtx := initTest(t, "preemption")
  806. defer cleanupTest(t, testCtx)
  807. cs := testCtx.clientSet
  808. defer cleanupPodsInNamespace(cs, t, testCtx.ns.Name)
  809. // Create a node with some resources and a label.
  810. nodeRes := &v1.ResourceList{
  811. v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
  812. v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
  813. v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
  814. }
  815. _, err := createNode(testCtx.clientSet, "node1", nodeRes)
  816. if err != nil {
  817. t.Fatalf("Error creating nodes: %v", err)
  818. }
  819. // Step 1. Create a few low priority pods.
  820. lowPriPods := make([]*v1.Pod, 4)
  821. for i := 0; i < len(lowPriPods); i++ {
  822. lowPriPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("lpod-%v", i), lowPriority, 60))
  823. if err != nil {
  824. t.Fatalf("Error creating pause pod: %v", err)
  825. }
  826. }
  827. // make sure that the pods are all scheduled.
  828. for _, p := range lowPriPods {
  829. if err := waitForPodToSchedule(cs, p); err != nil {
  830. t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
  831. }
  832. }
  833. // Step 2. Create a medium priority pod.
  834. podConf := initPausePod(cs, &pausePodConfig{
  835. Name: "medium-priority",
  836. Namespace: testCtx.ns.Name,
  837. Priority: &mediumPriority,
  838. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  839. v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
  840. v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
  841. },
  842. })
  843. medPriPod, err := createPausePod(cs, podConf)
  844. if err != nil {
  845. t.Errorf("Error while creating the medium priority pod: %v", err)
  846. }
  847. // Step 3. Check that nominated node name of the medium priority pod is set.
  848. if err := waitForNominatedNodeName(cs, medPriPod); err != nil {
  849. t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
  850. }
  851. // Step 4. Create a high priority pod.
  852. podConf = initPausePod(cs, &pausePodConfig{
  853. Name: "high-priority",
  854. Namespace: testCtx.ns.Name,
  855. Priority: &highPriority,
  856. Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
  857. v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
  858. v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
  859. },
  860. })
  861. highPriPod, err := createPausePod(cs, podConf)
  862. if err != nil {
  863. t.Errorf("Error while creating the high priority pod: %v", err)
  864. }
  865. // Step 5. Check that nominated node name of the high priority pod is set.
  866. if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
  867. t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
  868. }
  869. // And the nominated node name of the medium priority pod is cleared.
  870. if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  871. pod, err := cs.CoreV1().Pods(medPriPod.Namespace).Get(context.TODO(), medPriPod.Name, metav1.GetOptions{})
  872. if err != nil {
  873. t.Errorf("Error getting the medium priority pod info: %v", err)
  874. }
  875. if len(pod.Status.NominatedNodeName) == 0 {
  876. return true, nil
  877. }
  878. return false, err
  879. }); err != nil {
  880. t.Errorf("The nominated node name of the medium priority pod was not cleared: %v", err)
  881. }
  882. }
  883. func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget {
  884. intMinAvailable := intstr.FromInt(minAvailable)
  885. return &policy.PodDisruptionBudget{
  886. ObjectMeta: metav1.ObjectMeta{
  887. Name: name,
  888. Namespace: namespace,
  889. },
  890. Spec: policy.PodDisruptionBudgetSpec{
  891. MinAvailable: &intMinAvailable,
  892. Selector: &metav1.LabelSelector{MatchLabels: matchLabels},
  893. },
  894. }
  895. }
  896. func addPodConditionReady(pod *v1.Pod) {
  897. pod.Status = v1.PodStatus{
  898. Phase: v1.PodRunning,
  899. Conditions: []v1.PodCondition{
  900. {
  901. Type: v1.PodReady,
  902. Status: v1.ConditionTrue,
  903. },
  904. },
  905. }
  906. }
// TestPDBInPreemption tests PodDisruptionBudget support in preemption.
func TestPDBInPreemption(t *testing.T) {
	// Initialize scheduler.
	testCtx := initTest(t, "preemption-pdb")
	defer cleanupTest(t, testCtx)
	cs := testCtx.clientSet

	// The disruption controller must run so that PDB status (matched/healthy
	// pod counts) is kept up to date; waitForPDBsStable below depends on it.
	initDisruptionController(t, testCtx)

	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
	}
	defaultNodeRes := &v1.ResourceList{
		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
	}

	type nodeConfig struct {
		name string
		res  *v1.ResourceList
	}

	tests := []struct {
		description string
		nodes       []*nodeConfig
		pdbs        []*policy.PodDisruptionBudget
		// pdbPodNum[i] is the pod count expected to be matched by pdbs[i]
		// before the preemptor is created (passed to waitForPDBsStable).
		pdbPodNum    []int32
		existingPods []*v1.Pod
		// pod is the high-priority preemptor.
		pod *v1.Pod
		// preemptedPodIndexes holds the indexes into existingPods of the pods
		// expected to be evicted by preemption; all others must survive.
		preemptedPodIndexes map[int]struct{}
	}{
		{
			description: "A non-PDB violating pod is preempted despite its higher priority",
			nodes:       []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
			},
			pdbPodNum: []int32{2},
			existingPods: []*v1.Pod{
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod2",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "mid-pod3",
					Namespace: testCtx.ns.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
				}),
			},
			pod: initPausePod(cs, &pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.ns.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			// mid-pod3 (index 2) is the expected victim: the two low pods are
			// covered by pdb-1, so evicting either would violate the PDB.
			preemptedPodIndexes: map[int]struct{}{2: {}},
		},
		{
			description: "A node without any PDB violating pods is preferred for preemption",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
			},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
			},
			pdbPodNum: []int32{1},
			existingPods: []*v1.Pod{
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "mid-pod2",
					Namespace: testCtx.ns.Name,
					Priority:  &mediumPriority,
					NodeName:  "node-2",
					Resources: defaultPodRes,
				}),
			},
			pod: initPausePod(cs, &pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.ns.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			// mid-pod2 (index 1) on node-2 is the expected victim even though
			// it has higher priority than low-pod1, because low-pod1 is PDB-protected.
			preemptedPodIndexes: map[int]struct{}{1: {}},
		},
		{
			description: "A node with fewer PDB violating pods is preferred for preemption",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
				{name: "node-3", res: defaultNodeRes},
			},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo1": "bar"}),
				mkMinAvailablePDB("pdb-2", testCtx.ns.Name, types.UID("pdb-2-uid"), 2, map[string]string{"foo2": "bar"}),
			},
			pdbPodNum: []int32{1, 5},
			existingPods: []*v1.Pod{
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
					Labels:    map[string]string{"foo1": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "mid-pod1",
					Namespace: testCtx.ns.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod2",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-2",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "mid-pod2",
					Namespace: testCtx.ns.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
					NodeName:  "node-2",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod4",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod5",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(testCtx.clientSet, &pausePodConfig{
					Name:      "low-pod6",
					Namespace: testCtx.ns.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
			},
			pod: initPausePod(cs, &pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.ns.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
				},
			}),
			// The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2.
			preemptedPodIndexes: map[int]struct{}{4: {}, 5: {}, 6: {}},
		},
	}

	for _, test := range tests {
		t.Logf("================ Running test: %v\n", test.description)
		for _, nodeConf := range test.nodes {
			_, err := createNode(cs, nodeConf.name, nodeConf.res)
			if err != nil {
				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
			}
		}

		pods := make([]*v1.Pod, len(test.existingPods))
		var err error
		// Create and run existingPods.
		for i, p := range test.existingPods {
			if pods[i], err = runPausePod(cs, p); err != nil {
				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
			}
			// Add pod condition ready so that PDB is updated.
			addPodConditionReady(p)
			if _, err := testCtx.clientSet.CoreV1().Pods(testCtx.ns.Name).UpdateStatus(context.TODO(), p, metav1.UpdateOptions{}); err != nil {
				t.Fatal(err)
			}
		}
		// Wait for Pods to be stable in scheduler cache.
		if err := waitCachedPodsStable(testCtx, test.existingPods); err != nil {
			t.Fatalf("Not all pods are stable in the cache: %v", err)
		}

		// Create PDBs.
		for _, pdb := range test.pdbs {
			_, err := testCtx.clientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.ns.Name).Create(context.TODO(), pdb, metav1.CreateOptions{})
			if err != nil {
				t.Fatalf("Failed to create PDB: %v", err)
			}
		}
		// Wait for PDBs to become stable.
		if err := waitForPDBsStable(testCtx, test.pdbs, test.pdbPodNum); err != nil {
			t.Fatalf("Not all pdbs are stable in the cache: %v", err)
		}

		// Create the "pod".
		preemptor, err := createPausePod(cs, test.pod)
		if err != nil {
			t.Errorf("Error while creating high priority pod: %v", err)
		}
		// Wait for preemption of pods and make sure the other ones are not preempted.
		for i, p := range pods {
			if _, found := test.preemptedPodIndexes[i]; found {
				if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
					t.Errorf("Test [%v]: Pod %v/%v is not getting evicted.", test.description, p.Namespace, p.Name)
				}
			} else {
				if p.DeletionTimestamp != nil {
					t.Errorf("Test [%v]: Didn't expect pod %v/%v to get preempted.", test.description, p.Namespace, p.Name)
				}
			}
		}
		// Also check that the preemptor pod gets the annotation for nominated node name.
		if len(test.preemptedPodIndexes) > 0 {
			if err := waitForNominatedNodeName(cs, preemptor); err != nil {
				t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v/%v: %v", test.description, preemptor.Namespace, preemptor.Name, err)
			}
		}

		// Cleanup: best-effort deletion of the pods, PDBs and nodes created by
		// this case so the next iteration starts clean; collection-delete
		// errors are intentionally ignored.
		pods = append(pods, preemptor)
		cleanupPods(cs, t, pods)
		cs.PolicyV1beta1().PodDisruptionBudgets(testCtx.ns.Name).DeleteCollection(context.TODO(), nil, metav1.ListOptions{})
		cs.CoreV1().Nodes().DeleteCollection(context.TODO(), nil, metav1.ListOptions{})
	}
}