generic_scheduler.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/klog"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1beta1"
	"k8s.io/client-go/util/workqueue"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/listers"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
	utiltrace "k8s.io/utils/trace"
)
const (
	// minFeasibleNodesToFind is the minimum number of nodes that would be scored
	// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
	// certain minimum of nodes are checked for feasibility. This in turn helps
	// ensure a minimum level of spreading.
	minFeasibleNodesToFind = 100

	// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
	// would be scored in each scheduling cycle. This is a semi-arbitrary value
	// to ensure that a certain minimum of nodes are checked for feasibility.
	// This in turn helps ensure a minimum level of spreading.
	minFeasibleNodesPercentageToFind = 5
)
// FitError describes a fit error of a pod.
type FitError struct {
	Pod                   *v1.Pod
	NumAllNodes           int
	FilteredNodesStatuses framework.NodeToStatusMap
}

// ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")

const (
	// NoNodeAvailableMsg is used to format the message when no nodes are available.
	NoNodeAvailableMsg = "0/%v nodes are available"
)

// Error returns detailed information about why the pod failed to fit on each node.
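// The per-node reasons are aggregated into a histogram and joined into a single message
// of the form "0/<N> nodes are available: <count> <reason>, ...". The reason strings come
// from the filter plugin statuses, so the exact wording shown here is plugin-defined.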
func (f *FitError) Error() string {
	reasons := make(map[string]int)
	for _, status := range f.FilteredNodesStatuses {
		for _, reason := range status.Reasons() {
			reasons[reason]++
		}
	}

	sortReasonsHistogram := func() []string {
		var reasonStrings []string
		for k, v := range reasons {
			reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
		}
		sort.Strings(reasonStrings)
		return reasonStrings
	}
	reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
	return reasonMsg
}
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
	Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
	// Preempt receives scheduling errors for a pod and tries to create room for
	// the pod by preempting lower priority pods if possible.
	// It returns the node where preemption happened, a list of preempted pods, a
	// list of pods whose nominated node name should be removed, and error if any.
	Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
	// Extenders returns the registered scheduler extenders. This is exposed for
	// testing.
	Extenders() []SchedulerExtender
	// Snapshot snapshots scheduler cache and node infos. This is needed
	// for cluster autoscaler integration.
	// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
	Snapshot() error
	// Framework returns the scheduler framework instance. This is needed for cluster autoscaler integration.
	// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
	Framework() framework.Framework
}
// ScheduleResult represents the result of scheduling one pod. It contains
// the final selected node, along with intermediate scheduling information.
type ScheduleResult struct {
	// Name of the host that the scheduler suggests for the pod.
	SuggestedHost string
	// Number of nodes the scheduler evaluated while scheduling the pod.
	EvaluatedNodes int
	// Number of nodes that were found feasible for the pod.
	FeasibleNodes int
}
type genericScheduler struct {
	cache                    internalcache.Cache
	schedulingQueue          internalqueue.SchedulingQueue
	framework                framework.Framework
	extenders                []SchedulerExtender
	nodeInfoSnapshot         *internalcache.Snapshot
	volumeBinder             *volumebinder.VolumeBinder
	pvcLister                corelisters.PersistentVolumeClaimLister
	pdbLister                policylisters.PodDisruptionBudgetLister
	disablePreemption        bool
	percentageOfNodesToScore int32
	enableNonPreempting      bool
	nextStartNodeIndex       int
}

// Snapshot snapshots scheduler cache and node infos for all fit and priority
// functions.
func (g *genericScheduler) Snapshot() error {
	// Used for all fit and priority funcs.
	return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
}

// Framework returns the framework instance.
func (g *genericScheduler) Framework() framework.Framework {
	// Used for all fit and priority funcs.
	return g.framework
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
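// The overall flow is: basic PVC checks, snapshot the scheduler cache, run PreFilter
// plugins, find feasible nodes via Filter plugins and extenders, run PreScore and Score
// plugins (plus prioritizing extenders), and finally pick a host among the
// highest-scoring feasible nodes.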
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
	trace.Step("Basic checks done")

	if err := g.Snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	// Run "prefilter" plugins.
	preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
	if !preFilterStatus.IsSuccess() {
		return result, preFilterStatus.AsError()
	}
	trace.Step("Running prefilter plugins done")

	startPredicateEvalTime := time.Now()
	filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(filteredNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}

	// Run "prescore" plugins.
	prescoreStatus := g.framework.RunPreScorePlugins(ctx, state, pod, filteredNodes)
	if !prescoreStatus.IsSuccess() {
		return result, prescoreStatus.AsError()
	}
	trace.Step("Running prescore plugins done")

	metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

	startPriorityEvalTime := time.Now()
	// If only one node passed the filters, just use it.
	if len(filteredNodes) == 1 {
		metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := g.prioritizeNodes(ctx, state, pod, filteredNodes)
	if err != nil {
		return result, err
	}

	metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
func (g *genericScheduler) Extenders() []SchedulerExtender {
	return g.extenders
}
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
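// For example, if three nodes tie at the maximum score, the i-th tied node encountered
// replaces the current candidate with probability 1/i, so each of the three ends up
// selected with probability 1/3. This keeps the choice uniform over the top-scoring
// nodes without materializing them in a separate slice.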
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore.
				selected = ns.Name
			}
		}
	}
	return selected, nil
}
// Preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes, preempts the pods on that node, and
// returns 1) the node, 2) the list of preempted pods if such a node is found,
// 3) a list of pods whose nominated node name should be cleared, and 4) any
// possible error.
// Preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue, a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources, and the nominated pod could take a long time
// before it is retried after many other pending pods.
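// The overall flow is: check that the pod is eligible to preempt, narrow the node list
// to nodes where preemption might help, select victims on those nodes in parallel, let
// preemption-aware extenders adjust the result, pick one candidate node, and finally
// collect the lower-priority nominated pods on that node whose nomination should be
// cleared.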
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos(), g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var pdbs []*policy.PodDisruptionBudget
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent the preemptor from being scheduled on the nominated
	// node. In that case, the scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node may no longer fit on
	// it. So, we should remove their nomination. Removing their nomination
	// updates these pods and moves them to the active queue. It lets the
	// scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
}
// processPreemptionWithExtenders processes preemption with extenders.
func (g *genericScheduler) processPreemptionWithExtenders(
	pod *v1.Pod,
	nodeToVictims map[*v1.Node]*extenderv1.Victims,
) (map[*v1.Node]*extenderv1.Victims, error) {
	if len(nodeToVictims) > 0 {
		for _, extender := range g.extenders {
			if extender.SupportsPreemption() && extender.IsInterested(pod) {
				newNodeToVictims, err := extender.ProcessPreemption(
					pod,
					nodeToVictims,
					g.nodeInfoSnapshot.NodeInfos(),
				)
				if err != nil {
					if extender.IsIgnorable() {
						klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
							extender, err)
						continue
					}
					return nil, err
				}

				// Replace nodeToVictims with the new result after preemption, so that the
				// remaining extenders can continue to use it as a parameter.
				nodeToVictims = newNodeToVictims

				// If the node list becomes empty, no preemption can happen regardless of other extenders.
				if len(nodeToVictims) == 0 {
					break
				}
			}
		}
	}

	return nodeToVictims, nil
}
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := g.schedulingQueue.NominatedPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := podutil.GetPodPriority(pod)
	for _, p := range pods {
		if podutil.GetPodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}
// numFeasibleNodesToFind returns the number of feasible nodes that, once found,
// causes the scheduler to stop searching for more feasible nodes.
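// For example, with percentageOfNodesToScore left unset (<= 0) and 5000 nodes in the
// cluster, the adaptive percentage is 50 - 5000/125 = 10, so the search stops after
// finding 5000 * 10 / 100 = 500 feasible nodes. The result never drops below
// minFeasibleNodesToFind, and clusters smaller than that are always searched in full.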
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}

	return numNodes
}
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
	filteredNodesStatuses := make(framework.NodeToStatusMap)
	filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}

	filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
	return filtered, filteredNodesStatuses, nil
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
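// The search is bounded: it stops once numFeasibleNodesToFind feasible nodes have been
// found, and it starts from nextStartNodeIndex, wrapping around the node list, so that
// successive scheduling cycles examine different nodes and every node gets an even
// chance of being considered across pods.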
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, err
	}

	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))

	// Create the filtered list with enough space to avoid growing it
	// and to allow assigning by index.
	filtered := make([]*v1.Node, numNodesToFind)

	if !g.framework.HasFilterPlugins() {
		for i := range filtered {
			filtered[i] = allNodes[i].Node()
		}
		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % len(allNodes)
		return filtered, nil
	}

	errCh := util.NewErrorChannel()
	var statusesLock sync.Mutex
	var filteredLen int32
	ctx, cancel := context.WithCancel(ctx)
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle;
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
		fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
		if err != nil {
			errCh.SendErrorWithCancel(err, cancel)
			return
		}
		if fits {
			length := atomic.AddInt32(&filteredLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&filteredLen, -1)
			} else {
				filtered[length-1] = nodeInfo.Node()
			}
		} else {
			statusesLock.Lock()
			if !status.IsSuccess() {
				statuses[nodeInfo.Node().Name] = status
			}
			statusesLock.Unlock()
		}
	}

	beginCheckNode := time.Now()
	statusCode := framework.Success
	defer func() {
		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
		metrics.FrameworkExtensionPointDuration.WithLabelValues(framework.Filter, statusCode.String()).Observe(metrics.SinceInSeconds(beginCheckNode))
	}()

	// Stop searching for more nodes once the configured number of feasible nodes
	// is found.
	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
	processedNodes := int(filteredLen) + len(statuses)
	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)

	filtered = filtered[:filteredLen]
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return nil, err
	}
	return filtered, nil
}
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	for _, extender := range g.extenders {
		if len(filtered) == 0 {
			break
		}
		if !extender.IsInterested(pod) {
			continue
		}
		filteredList, failedMap, err := extender.Filter(pod, filtered)
		if err != nil {
			if extender.IsIgnorable() {
				klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
					extender, err)
				continue
			}
			return nil, err
		}

		for failedNodeName, failedMsg := range failedMap {
			if _, found := statuses[failedNodeName]; !found {
				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
			} else {
				statuses[failedNodeName].AppendReason(failedMsg)
			}
		}
		filtered = filteredList
	}
	return filtered, nil
}
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, state *framework.CycleState,
	nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
	if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
		// This may happen only in tests.
		return false, state, nodeInfo, nil
	}
	nominatedPods := g.schedulingQueue.NominatedPodsForNode(nodeInfo.Node().Name)
	if len(nominatedPods) == 0 {
		return false, state, nodeInfo, nil
	}
	nodeInfoOut := nodeInfo.Clone()
	stateOut := state.Clone()
	podsAdded := false
	for _, p := range nominatedPods {
		if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
			nodeInfoOut.AddPod(p)
			status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
			if !status.IsSuccess() {
				return false, state, nodeInfo, status.AsError()
			}
			podsAdded = true
		}
	}
	return podsAdded, stateOut, nodeInfoOut, nil
}
// podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
// filter plugins.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is
// schedulable on the node with all the existing pods on the node plus higher
// and equal priority pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption
// and add the nominated pods. Removal of the victims is done by
// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
// NodeInfo before calling this function.
func (g *genericScheduler) podPassesFiltersOnNode(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	info *schedulernodeinfo.NodeInfo,
) (bool, *framework.Status, error) {
	var status *framework.Status

	podsAdded := false
	// We run filters twice in some cases. If the node has greater or equal priority
	// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
	// If all filters succeed in this pass, we run them again when these
	// nominated pods are not added. This second pass is necessary because some
	// filters such as inter-pod affinity may not pass without the nominated pods.
	// If there are no nominated pods for the node or if the first run of the
	// filters fails, we don't run the second pass.
	// We consider only equal or higher priority pods in the first pass, because
	// the current "pod" must yield to them and must not take a space opened
	// for running them. It is OK if the current "pod" takes resources freed up by
	// lower priority pods.
	// Requiring that the new pod is schedulable in both circumstances ensures that
	// we are making a conservative decision: filters like resources and inter-pod
	// anti-affinity are more likely to fail when the nominated pods are treated
	// as running, while filters like pod affinity are more likely to fail when
	// the nominated pods are treated as not running. We can't just assume the
	// nominated pods are running because they are not running right now and in fact,
	// they may end up getting scheduled to a different node.
	for i := 0; i < 2; i++ {
		stateToUse := state
		nodeInfoToUse := info
		if i == 0 {
			var err error
			podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, state, info)
			if err != nil {
				return false, nil, err
			}
		} else if !podsAdded || !status.IsSuccess() {
			break
		}

		statusMap := g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		status = statusMap.Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status, status.AsError()
		}
	}

	return status.IsSuccess(), status, nil
}
// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes.
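// In other words, for each node the final score is
//
//	sum(plugin scores) + sum(extender score * extender weight) * (MaxNodeScore / MaxExtenderPriority)
//
// where the last factor rescales extender priorities to the framework's node score range.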
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format.
	if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
		result := make(framework.NodeScoreList, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodeScore{
				Name:  nodes[i].Name,
				Score: 1,
			})
		}
		return result, nil
	}

	// Run the Score plugins.
	scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return framework.NodeScoreList{}, scoreStatus.AsError()
	}

	// Summarize all scores.
	result := make(framework.NodeScoreList, 0, len(nodes))

	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		var mu sync.Mutex
		var wg sync.WaitGroup
		combinedScores := make(map[string]int64, len(nodes))
		for i := range g.extenders {
			if !g.extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
				defer func() {
					metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritization errors from an extender can be ignored; let k8s/other extenders determine the priorities.
					return
				}
				mu.Lock()
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					if klog.V(10) {
						klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
					}
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
		// Wait for all goroutines to finish.
		wg.Wait()
		for i := range result {
			// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
			// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
		}
	}

	if klog.V(10) {
		for i := range result {
			klog.Infof("Host %s => Score %d", result[i].Name, result[i].Score)
		}
	}
	return result, nil
}
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with minimum number of PDB violations.
// 2. A node with minimum highest priority victim is picked.
// 3. Ties are broken by sum of priorities of all victims.
// 4. If there are still ties, node with the minimum number of victims is picked.
// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
	if len(nodesToVictims) == 0 {
		return nil
	}
	minNumPDBViolatingPods := int64(math.MaxInt32)
	var minNodes1 []*v1.Node
	lenNodes1 := 0
	for node, victims := range nodesToVictims {
		if len(victims.Pods) == 0 {
			// We found a node that doesn't need any preemption. Return it!
			// This should happen rarely when one or more pods are terminated between
			// the time that scheduler tries to schedule the pod and the time that
			// preemption logic tries to find nodes for preemption.
			return node
		}
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods {
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods {
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There are more than one node with minimum number of PDB violating pods. Find
	// the one with minimum highest priority victim.
	minHighestPriority := int32(math.MaxInt32)
	var minNodes2 = make([]*v1.Node, lenNodes1)
	lenNodes2 := 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
		if highestPodPriority == minHighestPriority {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with minimum highest priority victim. Find the
	// smallest sum of priorities.
	minSumPriorities := int64(math.MaxInt64)
	lenNodes1 = 0
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
		for _, pod := range nodesToVictims[node].Pods {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There are a few nodes with minimum highest priority victim and sum of priorities.
	// Find one with the minimum number of pods.
	minNumPods := math.MaxInt32
	lenNodes2 = 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with the same number of pods.
	// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)).
	latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
	if latestStartTime == nil {
		// If the earliest start time of all pods on the 1st node is nil, just return it,
		// which is not expected to happen.
		klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
		return minNodes2[0]
	}
	nodeToReturn := minNodes2[0]
	for i := 1; i < lenNodes2; i++ {
		node := minNodes2[i]
		// Get the earliest start time of all pods on the current node.
		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
		if earliestStartTimeOnNode == nil {
			klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
			continue
		}
		if earliestStartTimeOnNode.After(latestStartTime.Time) {
			latestStartTime = earliestStartTimeOnNode
			nodeToReturn = node
		}
	}

	return nodeToReturn
}
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func (g *genericScheduler) selectNodesForPreemption(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	potentialNodes []*schedulernodeinfo.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*extenderv1.Victims, error) {
	nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
	var resultLock sync.Mutex

	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[i].Clone()
		stateCopy := state.Clone()
		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
		if fits {
			resultLock.Lock()
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			nodeToVictims[potentialNodes[i].Node()] = &victims
			resultLock.Unlock()
		}
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
	return nodeToVictims, nil
}
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
// and "nonViolatingPods" based on whether their PDBs will be violated if they are
// preempted.
// This function is stable and does not change the order of received pods. So, if it
// receives a sorted list, grouping will preserve the order of the input list.
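// For example, if a PDB currently allows one more disruption and matches three of the
// given pods, the first matching pod consumes the remaining budget and lands in
// nonViolatingPods, while the other two land in violatingPods.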
func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
	pdbsAllowed := make([]int32, len(pdbs))
	for i, pdb := range pdbs {
		pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
	}

	for _, obj := range pods {
		pod := obj
		pdbForPodIsViolated := false
		// A pod with no labels will not match any PDB. So, no need to check.
		if len(pod.Labels) != 0 {
			for i, pdb := range pdbs {
				if pdb.Namespace != pod.Namespace {
					continue
				}
				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
				if err != nil {
					continue
				}
				// A PDB with a nil or empty selector matches nothing.
				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
					continue
				}
				// We have found a matching PDB.
				if pdbsAllowed[i] <= 0 {
					pdbForPodIsViolated = true
					break
				} else {
					pdbsAllowed[i]--
				}
			}
		}
		if pdbForPodIsViolated {
			violatingPods = append(violatingPods, pod)
		} else {
			nonViolatingPods = append(nonViolatingPods, pod)
		}
	}
	return violatingPods, nonViolatingPods
}
// selectVictimsOnNode finds a minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
// their priority and then puts them into two groups: those whose PodDisruptionBudget
// will be violated if preempted, and the other non-violating pods. Both groups are
// sorted by priority. It first tries to reprieve as many PDB-violating pods as
// possible and then does the same for non-PDB-violating pods while checking
// that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func (g *genericScheduler) selectVictimsOnNode(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *schedulernodeinfo.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
	var potentialVictims []*v1.Pod

	removePod := func(rp *v1.Pod) error {
		if err := nodeInfo.RemovePod(rp); err != nil {
			return err
		}
		status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	addPod := func(ap *v1.Pod) error {
		nodeInfo.AddPod(ap)
		status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	// As the first step, remove all the lower priority pods from the node and
	// check if the given pod can be scheduled.
	podPriority := podutil.GetPodPriority(pod)
	for _, p := range nodeInfo.Pods() {
		if podutil.GetPodPriority(p) < podPriority {
			potentialVictims = append(potentialVictims, p)
			if err := removePod(p); err != nil {
				return nil, 0, false
			}
		}
	}

	// If the new pod does not fit after removing all the lower priority pods,
	// we are almost done and this node is not suitable for preemption. The only
	// condition that we could check is if the "pod" is failing to schedule due to
	// inter-pod affinity to one or more victims, but we have decided not to
	// support this case for performance reasons. Having affinity to lower
	// priority pods is not a recommended configuration anyway.
	if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits {
		if err != nil {
			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
		}

		return nil, 0, false
	}
	var victims []*v1.Pod
	numViolatingVictim := 0
	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
	// Try to reprieve as many pods as possible. We first try to reprieve the PDB
	// violating victims and then other non-violating ones. In both cases, we start
	// from the highest priority victims.
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
	reprievePod := func(p *v1.Pod) (bool, error) {
		if err := addPod(p); err != nil {
			return false, err
		}
		fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
		if !fits {
			if err := removePod(p); err != nil {
				return false, err
			}
			victims = append(victims, p)
			klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
		}
		return fits, nil
	}
	for _, p := range violatingVictims {
		if fits, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		} else if !fits {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, p := range nonViolatingVictims {
		if _, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		}
	}
	return victims, numViolatingVictim, true
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *FitError) []*schedulernodeinfo.NodeInfo {
	var potentialNodes []*schedulernodeinfo.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		// We rely on the status reported by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
		// - to determine whether preemption may help or not on the node.
		if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		klog.V(3).Infof("Node %v is a potential node for preemption.", name)
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}
// podEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod, and as long as there are
// terminating pods on that node, we don't consider this pod for preempting more pods.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos listers.NodeInfoLister, enableNonPreempting bool) bool {
	if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
		klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
		return false
	}
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
			podPriority := podutil.GetPodPriority(pod)
			for _, p := range nodeInfo.Pods() {
				if p.DeletionTimestamp != nil && podutil.GetPodPriority(p) < podPriority {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}
// podPassesBasicChecks performs basic sanity checks to determine whether the pod can be scheduled.
func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
	// Check the PVCs used by the pod.
	namespace := pod.Namespace
	manifest := &(pod.Spec)
	for i := range manifest.Volumes {
		volume := &manifest.Volumes[i]
		if volume.PersistentVolumeClaim == nil {
			// Volume is not a PVC, ignore.
			continue
		}
		pvcName := volume.PersistentVolumeClaim.ClaimName
		pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
		if err != nil {
			// The error already has enough context ("persistentvolumeclaim "myclaim" not found").
			return err
		}

		if pvc.DeletionTimestamp != nil {
			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
		}
	}

	return nil
}
// NewGenericScheduler creates a genericScheduler object.
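// A minimal wiring sketch (the surrounding objects - cache, queue, snapshot, framework,
// listers - are assumed to already exist; the variable names here are illustrative):
//
//	algo := NewGenericScheduler(
//		schedulerCache, schedulingQueue, snapshot, fwk,
//		nil, volumeBinder, pvcLister, pdbLister,
//		false, 50, true)
//	result, err := algo.Schedule(ctx, framework.NewCycleState(), pod)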
func NewGenericScheduler(
	cache internalcache.Cache,
	podQueue internalqueue.SchedulingQueue,
	nodeInfoSnapshot *internalcache.Snapshot,
	framework framework.Framework,
	extenders []SchedulerExtender,
	volumeBinder *volumebinder.VolumeBinder,
	pvcLister corelisters.PersistentVolumeClaimLister,
	pdbLister policylisters.PodDisruptionBudgetLister,
	disablePreemption bool,
	percentageOfNodesToScore int32,
	enableNonPreempting bool) ScheduleAlgorithm {
	return &genericScheduler{
		cache:                    cache,
		schedulingQueue:          podQueue,
		framework:                framework,
		extenders:                extenders,
		nodeInfoSnapshot:         nodeInfoSnapshot,
		volumeBinder:             volumeBinder,
		pvcLister:                pvcLister,
		pdbLister:                pdbLister,
		disablePreemption:        disablePreemption,
		percentageOfNodesToScore: percentageOfNodesToScore,
		enableNonPreempting:      enableNonPreempting,
	}
}