extender_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package core
  14. import (
  15. "fmt"
  16. "reflect"
  17. "testing"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  23. "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
  24. "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
  25. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  26. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  27. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  28. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  29. schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
  30. "k8s.io/kubernetes/pkg/scheduler/util"
  31. )
  32. type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
  33. type priorityFunc func(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, error)
  34. type priorityConfig struct {
  35. function priorityFunc
  36. weight int
  37. }
  38. func errorPredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
  39. return false, fmt.Errorf("Some error")
  40. }
  41. func falsePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
  42. return false, nil
  43. }
  44. func truePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
  45. return true, nil
  46. }
  47. func machine1PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
  48. if node.Name == "machine1" {
  49. return true, nil
  50. }
  51. return false, nil
  52. }
  53. func machine2PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
  54. if node.Name == "machine2" {
  55. return true, nil
  56. }
  57. return false, nil
  58. }
  59. func errorPrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, error) {
  60. return &schedulerapi.HostPriorityList{}, fmt.Errorf("Some error")
  61. }
  62. func machine1PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, error) {
  63. result := schedulerapi.HostPriorityList{}
  64. for _, node := range nodes {
  65. score := 1
  66. if node.Name == "machine1" {
  67. score = 10
  68. }
  69. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: score})
  70. }
  71. return &result, nil
  72. }
  73. func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, error) {
  74. result := schedulerapi.HostPriorityList{}
  75. for _, node := range nodes {
  76. score := 1
  77. if node.Name == "machine2" {
  78. score = 10
  79. }
  80. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: score})
  81. }
  82. return &result, nil
  83. }
  84. func machine2Prioritizer(_ *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
  85. result := []schedulerapi.HostPriority{}
  86. for _, node := range nodes {
  87. score := 1
  88. if node.Name == "machine2" {
  89. score = 10
  90. }
  91. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: score})
  92. }
  93. return result, nil
  94. }
  95. type FakeExtender struct {
  96. predicates []fitPredicate
  97. prioritizers []priorityConfig
  98. weight int
  99. nodeCacheCapable bool
  100. filteredNodes []*v1.Node
  101. unInterested bool
  102. ignorable bool
  103. // Cached node information for fake extender
  104. cachedNodeNameToInfo map[string]*schedulernodeinfo.NodeInfo
  105. }
  106. func (f *FakeExtender) Name() string {
  107. return "FakeExtender"
  108. }
  109. func (f *FakeExtender) IsIgnorable() bool {
  110. return f.ignorable
  111. }
  112. func (f *FakeExtender) SupportsPreemption() bool {
  113. // Assume preempt verb is always defined.
  114. return true
  115. }
  116. func (f *FakeExtender) ProcessPreemption(
  117. pod *v1.Pod,
  118. nodeToVictims map[*v1.Node]*schedulerapi.Victims,
  119. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  120. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  121. nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{}
  122. // We don't want to change the original nodeToVictims
  123. for k, v := range nodeToVictims {
  124. // In real world implementation, extender's user should have their own way to get node object
  125. // by name if needed (e.g. query kube-apiserver etc).
  126. //
  127. // For test purpose, we just use node from parameters directly.
  128. nodeToVictimsCopy[k] = v
  129. }
  130. for node, victims := range nodeToVictimsCopy {
  131. // Try to do preemption on extender side.
  132. extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
  133. if err != nil {
  134. return nil, err
  135. }
  136. // If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
  137. // let's remove it from potential preemption nodes.
  138. if !fits {
  139. delete(nodeToVictimsCopy, node)
  140. } else {
  141. // Append new victims to original victims
  142. nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
  143. nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations
  144. }
  145. }
  146. return nodeToVictimsCopy, nil
  147. }
  148. // selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
  149. // Returns:
  150. // 1. More victim pods (if any) amended by preemption phase of extender.
  151. // 2. Number of violating victim (used to calculate PDB).
  152. // 3. Fits or not after preemption phase on extender's side.
  153. func (f *FakeExtender) selectVictimsOnNodeByExtender(
  154. pod *v1.Pod,
  155. node *v1.Node,
  156. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  157. ) ([]*v1.Pod, int, bool, error) {
  158. // If a extender support preemption but have no cached node info, let's run filter to make sure
  159. // default scheduler's decision still stand with given pod and node.
  160. if !f.nodeCacheCapable {
  161. fits, err := f.runPredicate(pod, node)
  162. if err != nil {
  163. return nil, 0, false, err
  164. }
  165. if !fits {
  166. return nil, 0, false, nil
  167. }
  168. return []*v1.Pod{}, 0, true, nil
  169. }
  170. // Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
  171. // and get cached node info by given node name.
  172. nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone()
  173. potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
  174. removePod := func(rp *v1.Pod) {
  175. nodeInfoCopy.RemovePod(rp)
  176. }
  177. addPod := func(ap *v1.Pod) {
  178. nodeInfoCopy.AddPod(ap)
  179. }
  180. // As the first step, remove all the lower priority pods from the node and
  181. // check if the given pod can be scheduled.
  182. podPriority := util.GetPodPriority(pod)
  183. for _, p := range nodeInfoCopy.Pods() {
  184. if util.GetPodPriority(p) < podPriority {
  185. potentialVictims.Items = append(potentialVictims.Items, p)
  186. removePod(p)
  187. }
  188. }
  189. potentialVictims.Sort()
  190. // If the new pod does not fit after removing all the lower priority pods,
  191. // we are almost done and this node is not suitable for preemption.
  192. fits, err := f.runPredicate(pod, nodeInfoCopy.Node())
  193. if err != nil {
  194. return nil, 0, false, err
  195. }
  196. if !fits {
  197. return nil, 0, false, nil
  198. }
  199. var victims []*v1.Pod
  200. // TODO(harry): handle PDBs in the future.
  201. numViolatingVictim := 0
  202. reprievePod := func(p *v1.Pod) bool {
  203. addPod(p)
  204. fits, _ := f.runPredicate(pod, nodeInfoCopy.Node())
  205. if !fits {
  206. removePod(p)
  207. victims = append(victims, p)
  208. }
  209. return fits
  210. }
  211. // For now, assume all potential victims to be non-violating.
  212. // Now we try to reprieve non-violating victims.
  213. for _, p := range potentialVictims.Items {
  214. reprievePod(p.(*v1.Pod))
  215. }
  216. return victims, numViolatingVictim, true, nil
  217. }
  218. // runPredicate run predicates of extender one by one for given pod and node.
  219. // Returns: fits or not.
  220. func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
  221. fits := true
  222. var err error
  223. for _, predicate := range f.predicates {
  224. fits, err = predicate(pod, node)
  225. if err != nil {
  226. return false, err
  227. }
  228. if !fits {
  229. break
  230. }
  231. }
  232. return fits, nil
  233. }
  234. func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
  235. filtered := []*v1.Node{}
  236. failedNodesMap := schedulerapi.FailedNodesMap{}
  237. for _, node := range nodes {
  238. fits, err := f.runPredicate(pod, node)
  239. if err != nil {
  240. return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err
  241. }
  242. if fits {
  243. filtered = append(filtered, node)
  244. } else {
  245. failedNodesMap[node.Name] = "FakeExtender failed"
  246. }
  247. }
  248. f.filteredNodes = filtered
  249. if f.nodeCacheCapable {
  250. return filtered, failedNodesMap, nil
  251. }
  252. return filtered, failedNodesMap, nil
  253. }
  254. func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
  255. result := schedulerapi.HostPriorityList{}
  256. combinedScores := map[string]int{}
  257. for _, prioritizer := range f.prioritizers {
  258. weight := prioritizer.weight
  259. if weight == 0 {
  260. continue
  261. }
  262. priorityFunc := prioritizer.function
  263. prioritizedList, err := priorityFunc(pod, nodes)
  264. if err != nil {
  265. return &schedulerapi.HostPriorityList{}, 0, err
  266. }
  267. for _, hostEntry := range *prioritizedList {
  268. combinedScores[hostEntry.Host] += hostEntry.Score * weight
  269. }
  270. }
  271. for host, score := range combinedScores {
  272. result = append(result, schedulerapi.HostPriority{Host: host, Score: score})
  273. }
  274. return &result, f.weight, nil
  275. }
  276. func (f *FakeExtender) Bind(binding *v1.Binding) error {
  277. if len(f.filteredNodes) != 0 {
  278. for _, node := range f.filteredNodes {
  279. if node.Name == binding.Target.Name {
  280. f.filteredNodes = nil
  281. return nil
  282. }
  283. }
  284. err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.filteredNodes)
  285. f.filteredNodes = nil
  286. return err
  287. }
  288. return nil
  289. }
  290. func (f *FakeExtender) IsBinder() bool {
  291. return true
  292. }
  293. func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
  294. return !f.unInterested
  295. }
  296. var _ algorithm.SchedulerExtender = &FakeExtender{}
  297. func TestGenericSchedulerWithExtenders(t *testing.T) {
  298. tests := []struct {
  299. name string
  300. predicates map[string]predicates.FitPredicate
  301. prioritizers []priorities.PriorityConfig
  302. extenders []FakeExtender
  303. nodes []string
  304. expectedResult ScheduleResult
  305. expectsErr bool
  306. }{
  307. {
  308. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  309. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  310. extenders: []FakeExtender{
  311. {
  312. predicates: []fitPredicate{truePredicateExtender},
  313. },
  314. {
  315. predicates: []fitPredicate{errorPredicateExtender},
  316. },
  317. },
  318. nodes: []string{"machine1", "machine2"},
  319. expectsErr: true,
  320. name: "test 1",
  321. },
  322. {
  323. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  324. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  325. extenders: []FakeExtender{
  326. {
  327. predicates: []fitPredicate{truePredicateExtender},
  328. },
  329. {
  330. predicates: []fitPredicate{falsePredicateExtender},
  331. },
  332. },
  333. nodes: []string{"machine1", "machine2"},
  334. expectsErr: true,
  335. name: "test 2",
  336. },
  337. {
  338. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  339. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  340. extenders: []FakeExtender{
  341. {
  342. predicates: []fitPredicate{truePredicateExtender},
  343. },
  344. {
  345. predicates: []fitPredicate{machine1PredicateExtender},
  346. },
  347. },
  348. nodes: []string{"machine1", "machine2"},
  349. expectedResult: ScheduleResult{
  350. SuggestedHost: "machine1",
  351. EvaluatedNodes: 2,
  352. FeasibleNodes: 1,
  353. },
  354. name: "test 3",
  355. },
  356. {
  357. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  358. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  359. extenders: []FakeExtender{
  360. {
  361. predicates: []fitPredicate{machine2PredicateExtender},
  362. },
  363. {
  364. predicates: []fitPredicate{machine1PredicateExtender},
  365. },
  366. },
  367. nodes: []string{"machine1", "machine2"},
  368. expectsErr: true,
  369. name: "test 4",
  370. },
  371. {
  372. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  373. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  374. extenders: []FakeExtender{
  375. {
  376. predicates: []fitPredicate{truePredicateExtender},
  377. prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
  378. weight: 1,
  379. },
  380. },
  381. nodes: []string{"machine1"},
  382. expectedResult: ScheduleResult{
  383. SuggestedHost: "machine1",
  384. EvaluatedNodes: 1,
  385. FeasibleNodes: 1,
  386. },
  387. name: "test 5",
  388. },
  389. {
  390. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  391. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  392. extenders: []FakeExtender{
  393. {
  394. predicates: []fitPredicate{truePredicateExtender},
  395. prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}},
  396. weight: 1,
  397. },
  398. {
  399. predicates: []fitPredicate{truePredicateExtender},
  400. prioritizers: []priorityConfig{{machine2PrioritizerExtender, 10}},
  401. weight: 5,
  402. },
  403. },
  404. nodes: []string{"machine1", "machine2"},
  405. expectedResult: ScheduleResult{
  406. SuggestedHost: "machine2",
  407. EvaluatedNodes: 2,
  408. FeasibleNodes: 2,
  409. },
  410. name: "test 6",
  411. },
  412. {
  413. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  414. prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 20}},
  415. extenders: []FakeExtender{
  416. {
  417. predicates: []fitPredicate{truePredicateExtender},
  418. prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}},
  419. weight: 1,
  420. },
  421. },
  422. nodes: []string{"machine1", "machine2"},
  423. expectedResult: ScheduleResult{
  424. SuggestedHost: "machine2",
  425. EvaluatedNodes: 2,
  426. FeasibleNodes: 2,
  427. }, // machine2 has higher score
  428. name: "test 7",
  429. },
  430. {
  431. // Scheduler is expected to not send pod to extender in
  432. // Filter/Prioritize phases if the extender is not interested in
  433. // the pod.
  434. //
  435. // If scheduler sends the pod by mistake, the test would fail
  436. // because of the errors from errorPredicateExtender and/or
  437. // errorPrioritizerExtender.
  438. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  439. prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 1}},
  440. extenders: []FakeExtender{
  441. {
  442. predicates: []fitPredicate{errorPredicateExtender},
  443. prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
  444. unInterested: true,
  445. },
  446. },
  447. nodes: []string{"machine1", "machine2"},
  448. expectsErr: false,
  449. expectedResult: ScheduleResult{
  450. SuggestedHost: "machine2",
  451. EvaluatedNodes: 2,
  452. FeasibleNodes: 2,
  453. }, // machine2 has higher score
  454. name: "test 8",
  455. },
  456. {
  457. // Scheduling is expected to not fail in
  458. // Filter/Prioritize phases if the extender is not available and ignorable.
  459. //
  460. // If scheduler did not ignore the extender, the test would fail
  461. // because of the errors from errorPredicateExtender.
  462. predicates: map[string]predicates.FitPredicate{"true": truePredicate},
  463. prioritizers: []priorities.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
  464. extenders: []FakeExtender{
  465. {
  466. predicates: []fitPredicate{errorPredicateExtender},
  467. ignorable: true,
  468. },
  469. {
  470. predicates: []fitPredicate{machine1PredicateExtender},
  471. },
  472. },
  473. nodes: []string{"machine1", "machine2"},
  474. expectsErr: false,
  475. expectedResult: ScheduleResult{
  476. SuggestedHost: "machine1",
  477. EvaluatedNodes: 2,
  478. FeasibleNodes: 1,
  479. },
  480. name: "test 9",
  481. },
  482. }
  483. for _, test := range tests {
  484. t.Run(test.name, func(t *testing.T) {
  485. extenders := []algorithm.SchedulerExtender{}
  486. for ii := range test.extenders {
  487. extenders = append(extenders, &test.extenders[ii])
  488. }
  489. cache := internalcache.New(time.Duration(0), wait.NeverStop)
  490. for _, name := range test.nodes {
  491. cache.AddNode(createNode(name))
  492. }
  493. queue := internalqueue.NewSchedulingQueue(nil, nil)
  494. scheduler := NewGenericScheduler(
  495. cache,
  496. queue,
  497. test.predicates,
  498. predicates.EmptyPredicateMetadataProducer,
  499. test.prioritizers,
  500. priorities.EmptyPriorityMetadataProducer,
  501. emptyFramework,
  502. extenders,
  503. nil,
  504. schedulertesting.FakePersistentVolumeClaimLister{},
  505. schedulertesting.FakePDBLister{},
  506. false,
  507. false,
  508. schedulerapi.DefaultPercentageOfNodesToScore,
  509. false)
  510. podIgnored := &v1.Pod{}
  511. result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
  512. if test.expectsErr {
  513. if err == nil {
  514. t.Errorf("Unexpected non-error, result %+v", result)
  515. }
  516. } else {
  517. if err != nil {
  518. t.Errorf("Unexpected error: %v", err)
  519. return
  520. }
  521. if !reflect.DeepEqual(result, test.expectedResult) {
  522. t.Errorf("Expected: %+v, Saw: %+v", test.expectedResult, result)
  523. }
  524. }
  525. })
  526. }
  527. }
  528. func createNode(name string) *v1.Node {
  529. return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
  530. }