generic_scheduler.go 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package core
  14. import (
  15. "context"
  16. "fmt"
  17. "math"
  18. "sort"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "k8s.io/klog"
  24. "github.com/iwita/kube-scheduler/customcache"
  25. v1 "k8s.io/api/core/v1"
  26. policy "k8s.io/api/policy/v1beta1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/labels"
  29. "k8s.io/apimachinery/pkg/util/errors"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. "k8s.io/client-go/util/workqueue"
  32. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  33. "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
  34. "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
  35. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  36. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  37. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  38. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  39. "k8s.io/kubernetes/pkg/scheduler/metrics"
  40. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  41. "k8s.io/kubernetes/pkg/scheduler/util"
  42. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  43. utiltrace "k8s.io/utils/trace"
  44. )
const (
	// minFeasibleNodesToFind is the minimum number of nodes that would be scored
	// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
	// certain minimum of nodes are checked for feasibility. This in turn helps
	// ensure a minimum level of spreading.
	// numFeasibleNodesToFind uses it both as the cluster-size threshold below
	// which every node is checked and as the floor of its result.
	minFeasibleNodesToFind = 100
	// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
	// would be scored in each scheduling cycle. This is a semi-arbitrary value
	// to ensure that a certain minimum of nodes are checked for feasibility.
	// This in turn helps ensure a minimum level of spreading.
	// numFeasibleNodesToFind uses it as the lower bound of the adaptive
	// percentage.
	minFeasibleNodesPercentageToFind = 5
)
// unresolvablePredicateFailureErrors is the set of predicate failure reasons
// that are treated as unresolvable. The map is used purely as a set: only key
// membership matters.
// NOTE(review): the consumer is outside this view; presumably it is consulted
// when deciding whether preempting pods on a node could make the pod fit —
// confirm against nodesWherePreemptionMightHelp.
var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
	predicates.ErrNodeSelectorNotMatch:      {},
	predicates.ErrPodAffinityRulesNotMatch:  {},
	predicates.ErrPodNotMatchHostName:       {},
	predicates.ErrTaintsTolerationsNotMatch: {},
	predicates.ErrNodeLabelPresenceViolated: {},
	// Node conditions won't change when scheduler simulates removal of preemption victims.
	// So, it is pointless to try nodes that have not been able to host the pod due to node
	// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
	predicates.ErrNodeNotReady:            {},
	predicates.ErrNodeNetworkUnavailable:  {},
	predicates.ErrNodeUnderDiskPressure:   {},
	predicates.ErrNodeUnderPIDPressure:    {},
	predicates.ErrNodeUnderMemoryPressure: {},
	predicates.ErrNodeUnschedulable:       {},
	predicates.ErrNodeUnknownCondition:    {},
	predicates.ErrVolumeZoneConflict:      {},
	predicates.ErrVolumeNodeConflict:      {},
	predicates.ErrVolumeBindConflict:      {},
}
// FailedPredicateMap maps a node name to the list of predicate failure
// reasons (predicates.PredicateFailureReason) recorded for that node.
type FailedPredicateMap map[string][]predicates.PredicateFailureReason
// FitError describes why a pod could not be fitted onto any node. It
// implements the error interface through its Error method.
type FitError struct {
	// Pod is the pod that failed to fit.
	Pod *v1.Pod
	// NumAllNodes is the number of nodes that were listed for the scheduling
	// attempt; it is reported as the "0/N nodes" total in the error message.
	NumAllNodes int
	// FailedPredicates holds, per node name, the reasons the pod did not fit.
	FailedPredicates FailedPredicateMap
}
// ErrNoNodesAvailable is returned by Schedule and Preempt when the node lister
// yields no nodes at all.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const (
	// NoNodeAvailableMsg is the format string used as the prefix of
	// FitError messages; its single verb receives the total number of nodes.
	NoNodeAvailableMsg = "0/%v nodes are available"
)
  91. // Error returns detailed information of why the pod failed to fit on each node
  92. func (f *FitError) Error() string {
  93. reasons := make(map[string]int)
  94. for _, predicates := range f.FailedPredicates {
  95. for _, pred := range predicates {
  96. reasons[pred.GetReason()]++
  97. }
  98. }
  99. sortReasonsHistogram := func() []string {
  100. reasonStrings := []string{}
  101. for k, v := range reasons {
  102. reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
  103. }
  104. sort.Strings(reasonStrings)
  105. return reasonStrings
  106. }
  107. reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
  108. return reasonMsg
  109. }
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
	// Schedule picks a node for the given pod out of the nodes provided by the
	// lister and returns the outcome as a ScheduleResult, or an error when no
	// node could be chosen.
	Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error)
	// Preempt receives scheduling errors for a pod and tries to create room for
	// the pod by preempting lower priority pods if possible.
	// It returns the node where preemption happened, a list of preempted pods, a
	// list of pods whose nominated node name should be removed, and error if any.
	Preempt(*v1.Pod, algorithm.NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
	// Predicates returns the map of predicate functions, keyed by name. This is
	// exposed for testing.
	Predicates() map[string]predicates.FitPredicate
	// Prioritizers returns a slice of priority config. This is exposed for
	// testing.
	Prioritizers() []priorities.PriorityConfig
}
// ScheduleResult represents the result of scheduling one pod. It contains
// the final selected node, along with intermediate information gathered
// while selecting it.
type ScheduleResult struct {
	// SuggestedHost is the name of the node the scheduler suggests for the pod.
	SuggestedHost string
	// EvaluatedNodes is the number of nodes the scheduler evaluated for the
	// pod: the feasible nodes plus the nodes recorded as failing predicates.
	EvaluatedNodes int
	// FeasibleNodes is the number of nodes that passed filtering for the pod.
	FeasibleNodes int
}
// genericScheduler is the ScheduleAlgorithm implementation defined in this
// file. It holds the scheduler cache and queue, the configured predicates and
// priorities, and the settings that steer filtering and preemption.
type genericScheduler struct {
	// cache provides the node tree and refreshes nodeInfoSnapshot.
	cache internalcache.Cache
	// schedulingQueue is queried for nominated pods and handed to the fit and
	// preemption helpers.
	schedulingQueue internalqueue.SchedulingQueue
	// predicates are the fit predicates, keyed by name.
	predicates map[string]predicates.FitPredicate
	// priorityMetaProducer builds the priority metadata once per Schedule call.
	priorityMetaProducer priorities.PriorityMetadataProducer
	// predicateMetaProducer builds the predicate metadata shared by all nodes
	// during filtering.
	predicateMetaProducer predicates.PredicateMetadataProducer
	// prioritizers is the configured priority list returned by Prioritizers().
	// Note that Schedule in this file scores with its own hard-coded custom
	// priority configs rather than with this list.
	prioritizers []priorities.PriorityConfig
	framework    framework.Framework
	// extenders are consulted during filtering, prioritizing and preemption.
	extenders []algorithm.SchedulerExtender
	// lastNodeIndex is the round-robin counter used to break ties between
	// equally scored nodes; selectHost and selectHostOnWinningSocket both
	// increment it.
	lastNodeIndex uint64
	// alwaysCheckAllPredicates is forwarded to podFitsOnNode.
	alwaysCheckAllPredicates bool
	// nodeInfoSnapshot is the per-cycle view of node infos, refreshed by snapshot().
	nodeInfoSnapshot *internalcache.NodeInfoSnapshot
	volumeBinder     *volumebinder.VolumeBinder
	// pvcLister is used by the basic pod checks at the start of Schedule.
	pvcLister corelisters.PersistentVolumeClaimLister
	// pdbLister lists the PodDisruptionBudgets considered during preemption.
	pdbLister         algorithm.PDBLister
	disablePreemption bool
	// percentageOfNodesToScore bounds how many feasible nodes are searched for;
	// values <= 0 select an adaptive percentage and values >= 100 check all
	// nodes (see numFeasibleNodesToFind).
	percentageOfNodesToScore int32
	// enableNonPreempting is forwarded to podEligibleToPreemptOthers.
	enableNonPreempting bool
}
  156. // snapshot snapshots scheduler cache and node infos for all fit and priority
  157. // functions.
  158. func (g *genericScheduler) snapshot() error {
  159. // Used for all fit and priority funcs.
  160. return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
  161. }
  162. // Schedule tries to schedule the given pod to one of the nodes in the node list.
  163. // If it succeeds, it will return the name of the node.
  164. // If it fails, it will return a FitError error with reasons.
  165. func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
  166. trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
  167. defer trace.LogIfLong(100 * time.Millisecond)
  168. if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
  169. return result, err
  170. }
  171. nodes, err := nodeLister.List()
  172. if err != nil {
  173. return result, err
  174. }
  175. if len(nodes) == 0 {
  176. return result, ErrNoNodesAvailable
  177. }
  178. if err := g.snapshot(); err != nil {
  179. return result, err
  180. }
  181. trace.Step("Computing predicates")
  182. startPredicateEvalTime := time.Now()
  183. filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
  184. if err != nil {
  185. return result, err
  186. }
  187. if len(filteredNodes) == 0 {
  188. return result, &FitError{
  189. Pod: pod,
  190. NumAllNodes: len(nodes),
  191. FailedPredicates: failedPredicateMap,
  192. }
  193. }
  194. metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
  195. metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
  196. metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
  197. metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
  198. //trace.Step("Prioritizing")
  199. //trace.Step("Prioritizing Sockets")
  200. startPriorityEvalTime := time.Now()
  201. // When only one node after predicate, just use it.
  202. if len(filteredNodes) == 1 {
  203. metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
  204. metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
  205. return ScheduleResult{
  206. SuggestedHost: filteredNodes[0].Name,
  207. EvaluatedNodes: 1 + len(failedPredicateMap),
  208. FeasibleNodes: 1,
  209. }, nil
  210. }
  211. metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
  212. // default
  213. //priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
  214. // default
  215. //start-custom
  216. select {
  217. // clean the cache if 10 seconds are passed
  218. case <-customcache.LabCache.Timeout.C:
  219. klog.Infof("Time to erase: %v", time.Now())
  220. //customcache.LabCache.Timeout.Stop()
  221. customcache.LabCache.CleanCache()
  222. klog.Infof("Cache: %v", customcache.LabCache.Cache)
  223. default:
  224. klog.Infof("Cache is Valid, Time: %v", customcache.LabCache.Timeout.C)
  225. }
  226. socketPrioritizers := []priorities.PriorityConfig{
  227. {
  228. Name: priorities.CustomRequestedPriority,
  229. Map: priorities.CustomRequestedPriorityMap,
  230. Weight: 100,
  231. },
  232. }
  233. klog.Infof("Selecting Socket")
  234. priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, socketPrioritizers, filteredNodes, g.extenders)
  235. //end-custom
  236. if err != nil {
  237. return result, err
  238. }
  239. metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
  240. metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
  241. metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
  242. metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
  243. // -----------------------------------------------------
  244. // ------------------START-CUSTOM-----------------------
  245. // -----------------------------------------------------
  246. //trace.Step("Selecting socket")
  247. hosts, err := g.selectHostOnWinningSocket(priorityList)
  248. //declare a subset of the snapshot of all available nodes
  249. // create a new map, containing only the subset of the nodes
  250. //var winningSocketNodes map[string]*schedulernodeinfo.NodeInfo
  251. var winningSocketNodes []*v1.Node
  252. for _, wn := range hosts {
  253. for _, n := range filteredNodes {
  254. if n.Name == wn {
  255. winningSocketNodes = append(winningSocketNodes, n)
  256. }
  257. }
  258. }
  259. //trace.Step("Selecting host")
  260. klog.Infof("Selecting host")
  261. nodePrioritizers := []priorities.PriorityConfig{
  262. {
  263. Name: priorities.NodeSelectionPriority,
  264. Map: priorities.NodeSelectionPriorityMap,
  265. Weight: 100,
  266. },
  267. }
  268. priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, nodePrioritizers, winningSocketNodes, g.extenders)
  269. // The winner host
  270. host, err := g.selectHost(priorityList)
  271. winningSocket := priorities.Sockets[host]
  272. winningUuid := priorities.Nodes[host]
  273. klog.Infof("Winning node: %v, Socket %v, UUID: %v", host, winningSocket, winningUuid)
  274. var tmp []string
  275. var socketNodes []string
  276. for key, val := range priorities.Nodes {
  277. if val == winningUuid {
  278. tmp = append(tmp, key)
  279. }
  280. }
  281. for _, n := range tmp {
  282. if priorities.Sockets[n] == winningSocket {
  283. socketNodes = append(socketNodes, n)
  284. }
  285. }
  286. // Add pod's information (average metrics to the winning nodes metrics) and cache them
  287. podName := pod.ObjectMeta.Name[0 : len(pod.ObjectMeta.Name)-19]
  288. var win bool
  289. for _, n := range socketNodes {
  290. if n == host {
  291. win = true
  292. } else {
  293. win = false
  294. }
  295. klog.Infof("Update Score for Node %v, using App: %v", n, podName)
  296. klog.Infof("App metrics: %v", priorities.Applications[podName].Metrics)
  297. numCores := len(priorities.Cores[n])
  298. customcache.LabCache.AddAppMetrics(priorities.Applications[podName].Metrics, n, numCores, win)
  299. }
  300. // -----------------------------------------------------
  301. // ------------------END-CUSTOM-----------------------
  302. // -----------------------------------------------------
  303. //trace.Step("Selecting host")
  304. klog.Infof("Return (generic_scheduler.go)")
  305. return ScheduleResult{
  306. SuggestedHost: host,
  307. EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
  308. FeasibleNodes: len(filteredNodes),
  309. }, err
  310. }
  311. // Prioritizers returns a slice containing all the scheduler's priority
  312. // functions and their config. It is exposed for testing only.
  313. func (g *genericScheduler) Prioritizers() []priorities.PriorityConfig {
  314. return g.prioritizers
  315. }
  316. // Predicates returns a map containing all the scheduler's predicate
  317. // functions. It is exposed for testing only.
  318. func (g *genericScheduler) Predicates() map[string]predicates.FitPredicate {
  319. return g.predicates
  320. }
  321. // findMaxScores returns the indexes of nodes in the "priorityList" that has the highest "Score".
  322. func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
  323. maxScoreIndexes := make([]int, 0, len(priorityList)/2)
  324. maxScore := priorityList[0].Score
  325. for i, hp := range priorityList {
  326. if hp.Score > maxScore {
  327. maxScore = hp.Score
  328. maxScoreIndexes = maxScoreIndexes[:0]
  329. maxScoreIndexes = append(maxScoreIndexes, i)
  330. } else if hp.Score == maxScore {
  331. maxScoreIndexes = append(maxScoreIndexes, i)
  332. }
  333. }
  334. return maxScoreIndexes
  335. }
  336. // selectHost takes a prioritized list of nodes and then picks one
  337. // in a round-robin manner from the nodes that had the highest score.
  338. func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
  339. if len(priorityList) == 0 {
  340. return "", fmt.Errorf("empty priorityList")
  341. }
  342. maxScores := findMaxScores(priorityList)
  343. ix := int(g.lastNodeIndex % uint64(len(maxScores)))
  344. g.lastNodeIndex++
  345. return priorityList[maxScores[ix]].Host, nil
  346. }
  347. //------------------------------------------------------------------------------------------------
  348. //------------------------------------------------------------------------------------------------
  349. // ---------START OF CUSTOMIZATION----------------------------------------------------------------
  350. //------------------------------------------------------------------------------------------------
  351. //------------------------------------------------------------------------------------------------
  352. // 1st level of filtering
  353. // returns a list of nodes that belong to the winning socket
  354. func (g *genericScheduler) selectHostOnWinningSocket(priorityList schedulerapi.HostPriorityList) ([]string, error) {
  355. var res []string
  356. if len(priorityList) == 0 {
  357. return res, fmt.Errorf("empty priorityList")
  358. }
  359. maxScores := findMaxScores(priorityList)
  360. //ix := int(g.lastNodeIndex % uint64(len(maxScores)))
  361. g.lastNodeIndex++
  362. for _, idx := range maxScores {
  363. res = append(res, priorityList[idx].Host)
  364. }
  365. return res, nil
  366. }
  367. // func (g *genericScheduler) customSelectHost(priorityList schedulerapi.CustomHostPriorityList) (string, error) {
  368. // if len(priorityList) == 0 {
  369. // return "", fmt.Errorf("empty priorityList")
  370. // }
  371. // maxScores := findMaxScores(priorityList)
  372. // ix := int(g.lastNodeIndex % uint64(len(maxScores)))
  373. // g.lastNodeIndex++
  374. // return priorityList[maxScores[ix]].Host, nil
  375. // }
  376. //------------------------------------------------------------------------------------------------
  377. //------------------------------------------------------------------------------------------------
  378. // ---------END OF CUSTOMIZATION----------------------------------------------------------------
  379. //------------------------------------------------------------------------------------------------
  380. //------------------------------------------------------------------------------------------------
  381. // preempt finds nodes with pods that can be preempted to make room for "pod" to
  382. // schedule. It chooses one of the nodes and preempts the pods on the node and
  383. // returns 1) the node, 2) the list of preempted pods if such a node is found,
  384. // 3) A list of pods whose nominated node name should be cleared, and 4) any
  385. // possible error.
  386. // Preempt does not update its snapshot. It uses the same snapshot used in the
  387. // scheduling cycle. This is to avoid a scenario where preempt finds feasible
  388. // nodes without preempting any pod. When there are many pending pods in the
  389. // scheduling queue a nominated pod will go back to the queue and behind
  390. // other pods with the same priority. The nominated pod prevents other pods from
  391. // using the nominated resources and the nominated pod could take a long time
  392. // before it is retried after many other pending pods.
  393. func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
  394. // Scheduler may return various types of errors. Consider preemption only if
  395. // the error is of type FitError.
  396. fitError, ok := scheduleErr.(*FitError)
  397. if !ok || fitError == nil {
  398. return nil, nil, nil, nil
  399. }
  400. if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
  401. klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
  402. return nil, nil, nil, nil
  403. }
  404. allNodes, err := nodeLister.List()
  405. if err != nil {
  406. return nil, nil, nil, err
  407. }
  408. if len(allNodes) == 0 {
  409. return nil, nil, nil, ErrNoNodesAvailable
  410. }
  411. potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
  412. if len(potentialNodes) == 0 {
  413. klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
  414. // In this case, we should clean-up any existing nominated node name of the pod.
  415. return nil, nil, []*v1.Pod{pod}, nil
  416. }
  417. pdbs, err := g.pdbLister.List(labels.Everything())
  418. if err != nil {
  419. return nil, nil, nil, err
  420. }
  421. nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
  422. g.predicateMetaProducer, g.schedulingQueue, pdbs)
  423. if err != nil {
  424. return nil, nil, nil, err
  425. }
  426. // We will only check nodeToVictims with extenders that support preemption.
  427. // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
  428. // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
  429. nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
  430. if err != nil {
  431. return nil, nil, nil, err
  432. }
  433. candidateNode := pickOneNodeForPreemption(nodeToVictims)
  434. if candidateNode == nil {
  435. return nil, nil, nil, nil
  436. }
  437. // Lower priority pods nominated to run on this node, may no longer fit on
  438. // this node. So, we should remove their nomination. Removing their
  439. // nomination updates these pods and moves them to the active queue. It
  440. // lets scheduler find another place for them.
  441. nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
  442. if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
  443. return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
  444. }
  445. return nil, nil, nil, fmt.Errorf(
  446. "preemption failed: the target node %s has been deleted from scheduler cache",
  447. candidateNode.Name)
  448. }
  449. // processPreemptionWithExtenders processes preemption with extenders
  450. func (g *genericScheduler) processPreemptionWithExtenders(
  451. pod *v1.Pod,
  452. nodeToVictims map[*v1.Node]*schedulerapi.Victims,
  453. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  454. if len(nodeToVictims) > 0 {
  455. for _, extender := range g.extenders {
  456. if extender.SupportsPreemption() && extender.IsInterested(pod) {
  457. newNodeToVictims, err := extender.ProcessPreemption(
  458. pod,
  459. nodeToVictims,
  460. g.nodeInfoSnapshot.NodeInfoMap,
  461. )
  462. if err != nil {
  463. if extender.IsIgnorable() {
  464. klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
  465. extender, err)
  466. continue
  467. }
  468. return nil, err
  469. }
  470. // Replace nodeToVictims with new result after preemption. So the
  471. // rest of extenders can continue use it as parameter.
  472. nodeToVictims = newNodeToVictims
  473. // If node list becomes empty, no preemption can happen regardless of other extenders.
  474. if len(nodeToVictims) == 0 {
  475. break
  476. }
  477. }
  478. }
  479. }
  480. return nodeToVictims, nil
  481. }
  482. // getLowerPriorityNominatedPods returns pods whose priority is smaller than the
  483. // priority of the given "pod" and are nominated to run on the given node.
  484. // Note: We could possibly check if the nominated lower priority pods still fit
  485. // and return those that no longer fit, but that would require lots of
  486. // manipulation of NodeInfo and PredicateMeta per nominated pod. It may not be
  487. // worth the complexity, especially because we generally expect to have a very
  488. // small number of nominated pods per node.
  489. func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
  490. pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
  491. if len(pods) == 0 {
  492. return nil
  493. }
  494. var lowerPriorityPods []*v1.Pod
  495. podPriority := util.GetPodPriority(pod)
  496. for _, p := range pods {
  497. if util.GetPodPriority(p) < podPriority {
  498. lowerPriorityPods = append(lowerPriorityPods, p)
  499. }
  500. }
  501. return lowerPriorityPods
  502. }
  503. // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
  504. // its search for more feasible nodes.
  505. func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
  506. if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
  507. return numAllNodes
  508. }
  509. adaptivePercentage := g.percentageOfNodesToScore
  510. if adaptivePercentage <= 0 {
  511. adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
  512. if adaptivePercentage < minFeasibleNodesPercentageToFind {
  513. adaptivePercentage = minFeasibleNodesPercentageToFind
  514. }
  515. }
  516. numNodes = numAllNodes * adaptivePercentage / 100
  517. if numNodes < minFeasibleNodesToFind {
  518. return minFeasibleNodesToFind
  519. }
  520. return numNodes
  521. }
  522. // Filters the nodes to find the ones that fit based on the given predicate functions
  523. // Each node is passed through the predicate functions to determine if it is a fit
  524. func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
  525. var filtered []*v1.Node
  526. failedPredicateMap := FailedPredicateMap{}
  527. if len(g.predicates) == 0 {
  528. filtered = nodes
  529. } else {
  530. allNodes := int32(g.cache.NodeTree().NumNodes())
  531. numNodesToFind := g.numFeasibleNodesToFind(allNodes)
  532. // Create filtered list with enough space to avoid growing it
  533. // and allow assigning.
  534. filtered = make([]*v1.Node, numNodesToFind)
  535. errs := errors.MessageCountMap{}
  536. var (
  537. predicateResultLock sync.Mutex
  538. filteredLen int32
  539. )
  540. ctx, cancel := context.WithCancel(context.Background())
  541. // We can use the same metadata producer for all nodes.
  542. meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
  543. checkNode := func(i int) {
  544. nodeName := g.cache.NodeTree().Next()
  545. fits, failedPredicates, err := podFitsOnNode(
  546. pod,
  547. meta,
  548. g.nodeInfoSnapshot.NodeInfoMap[nodeName],
  549. g.predicates,
  550. g.schedulingQueue,
  551. g.alwaysCheckAllPredicates,
  552. )
  553. if err != nil {
  554. predicateResultLock.Lock()
  555. errs[err.Error()]++
  556. predicateResultLock.Unlock()
  557. return
  558. }
  559. if fits {
  560. length := atomic.AddInt32(&filteredLen, 1)
  561. if length > numNodesToFind {
  562. cancel()
  563. atomic.AddInt32(&filteredLen, -1)
  564. } else {
  565. filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
  566. }
  567. } else {
  568. predicateResultLock.Lock()
  569. failedPredicateMap[nodeName] = failedPredicates
  570. predicateResultLock.Unlock()
  571. }
  572. }
  573. // Stops searching for more nodes once the configured number of feasible nodes
  574. // are found.
  575. workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
  576. filtered = filtered[:filteredLen]
  577. if len(errs) > 0 {
  578. return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
  579. }
  580. }
  581. if len(filtered) > 0 && len(g.extenders) != 0 {
  582. for _, extender := range g.extenders {
  583. if !extender.IsInterested(pod) {
  584. continue
  585. }
  586. filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
  587. if err != nil {
  588. if extender.IsIgnorable() {
  589. klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
  590. extender, err)
  591. continue
  592. } else {
  593. return []*v1.Node{}, FailedPredicateMap{}, err
  594. }
  595. }
  596. for failedNodeName, failedMsg := range failedMap {
  597. if _, found := failedPredicateMap[failedNodeName]; !found {
  598. failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
  599. }
  600. failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
  601. }
  602. filtered = filteredList
  603. if len(filtered) == 0 {
  604. break
  605. }
  606. }
  607. }
  608. return filtered, failedPredicateMap, nil
  609. }
  610. // addNominatedPods adds pods with equal or greater priority which are nominated
  611. // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
  612. // any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
  613. func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
  614. nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata,
  615. *schedulernodeinfo.NodeInfo) {
  616. if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
  617. // This may happen only in tests.
  618. return false, meta, nodeInfo
  619. }
  620. nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
  621. if nominatedPods == nil || len(nominatedPods) == 0 {
  622. return false, meta, nodeInfo
  623. }
  624. var metaOut predicates.PredicateMetadata
  625. if meta != nil {
  626. metaOut = meta.ShallowCopy()
  627. }
  628. nodeInfoOut := nodeInfo.Clone()
  629. for _, p := range nominatedPods {
  630. if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
  631. nodeInfoOut.AddPod(p)
  632. if metaOut != nil {
  633. metaOut.AddPod(p, nodeInfoOut)
  634. }
  635. }
  636. }
  637. return true, metaOut, nodeInfoOut
  638. }
  639. // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
  640. // For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
  641. // predicate results as possible.
  642. // This function is called from two different places: Schedule and Preempt.
  643. // When it is called from Schedule, we want to test whether the pod is schedulable
  644. // on the node with all the existing pods on the node plus higher and equal priority
  645. // pods nominated to run on the node.
  646. // When it is called from Preempt, we should remove the victims of preemption and
  647. // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
  648. // It removes victims from meta and NodeInfo before calling this function.
  649. func podFitsOnNode(
  650. pod *v1.Pod,
  651. meta predicates.PredicateMetadata,
  652. info *schedulernodeinfo.NodeInfo,
  653. predicateFuncs map[string]predicates.FitPredicate,
  654. queue internalqueue.SchedulingQueue,
  655. alwaysCheckAllPredicates bool,
  656. ) (bool, []predicates.PredicateFailureReason, error) {
  657. var failedPredicates []predicates.PredicateFailureReason
  658. podsAdded := false
  659. // We run predicates twice in some cases. If the node has greater or equal priority
  660. // nominated pods, we run them when those pods are added to meta and nodeInfo.
  661. // If all predicates succeed in this pass, we run them again when these
  662. // nominated pods are not added. This second pass is necessary because some
  663. // predicates such as inter-pod affinity may not pass without the nominated pods.
  664. // If there are no nominated pods for the node or if the first run of the
  665. // predicates fail, we don't run the second pass.
  666. // We consider only equal or higher priority pods in the first pass, because
  667. // those are the current "pod" must yield to them and not take a space opened
  668. // for running them. It is ok if the current "pod" take resources freed for
  669. // lower priority pods.
  670. // Requiring that the new pod is schedulable in both circumstances ensures that
  671. // we are making a conservative decision: predicates like resources and inter-pod
  672. // anti-affinity are more likely to fail when the nominated pods are treated
  673. // as running, while predicates like pod affinity are more likely to fail when
  674. // the nominated pods are treated as not running. We can't just assume the
  675. // nominated pods are running because they are not running right now and in fact,
  676. // they may end up getting scheduled to a different node.
  677. for i := 0; i < 2; i++ {
  678. metaToUse := meta
  679. nodeInfoToUse := info
  680. if i == 0 {
  681. podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
  682. } else if !podsAdded || len(failedPredicates) != 0 {
  683. break
  684. }
  685. for _, predicateKey := range predicates.Ordering() {
  686. var (
  687. fit bool
  688. reasons []predicates.PredicateFailureReason
  689. err error
  690. )
  691. //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
  692. if predicate, exist := predicateFuncs[predicateKey]; exist {
  693. fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
  694. if err != nil {
  695. return false, []predicates.PredicateFailureReason{}, err
  696. }
  697. if !fit {
  698. // eCache is available and valid, and predicates result is unfit, record the fail reasons
  699. failedPredicates = append(failedPredicates, reasons...)
  700. // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
  701. if !alwaysCheckAllPredicates {
  702. klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
  703. "evaluation is short circuited and there are chances " +
  704. "of other predicates failing as well.")
  705. break
  706. }
  707. }
  708. }
  709. }
  710. }
  711. return len(failedPredicates) == 0, failedPredicates, nil
  712. }
  713. // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
  714. // Each priority function is expected to set a score of 0-10
  715. // 0 is the lowest priority score (least preferred node) and 10 is the highest
  716. // Each priority function can also have its own weight
  717. // The node scores returned by the priority function are multiplied by the weights to get weighted scores
  718. // All scores are finally combined (added) to get the total weighted scores of all nodes
  719. func PrioritizeNodes(
  720. pod *v1.Pod,
  721. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  722. meta interface{},
  723. priorityConfigs []priorities.PriorityConfig,
  724. nodes []*v1.Node,
  725. extenders []algorithm.SchedulerExtender,
  726. ) (schedulerapi.HostPriorityList, error) {
  727. // If no priority configs are provided, then the EqualPriority function is applied
  728. // This is required to generate the priority list in the required format
  729. if len(priorityConfigs) == 0 && len(extenders) == 0 {
  730. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  731. for i := range nodes {
  732. hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
  733. if err != nil {
  734. return nil, err
  735. }
  736. result = append(result, hostPriority)
  737. }
  738. return result, nil
  739. }
  740. var (
  741. mu = sync.Mutex{}
  742. wg = sync.WaitGroup{}
  743. errs []error
  744. )
  745. appendError := func(err error) {
  746. mu.Lock()
  747. defer mu.Unlock()
  748. errs = append(errs, err)
  749. }
  750. results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
  751. // DEPRECATED: we can remove this when all priorityConfigs implement the
  752. // Map-Reduce pattern.
  753. for i := range priorityConfigs {
  754. if priorityConfigs[i].Function != nil {
  755. wg.Add(1)
  756. go func(index int) {
  757. defer wg.Done()
  758. var err error
  759. results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
  760. if err != nil {
  761. appendError(err)
  762. }
  763. }(i)
  764. } else {
  765. results[i] = make(schedulerapi.HostPriorityList, len(nodes))
  766. }
  767. }
  768. workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
  769. nodeInfo := nodeNameToInfo[nodes[index].Name]
  770. for i := range priorityConfigs {
  771. // The Function is nil if there is no Map-Reduce functionality provided
  772. if priorityConfigs[i].Function != nil {
  773. continue
  774. }
  775. var err error
  776. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
  777. if err != nil {
  778. appendError(err)
  779. results[i][index].Host = nodes[index].Name
  780. }
  781. }
  782. })
  783. for i := range priorityConfigs {
  784. if priorityConfigs[i].Reduce == nil {
  785. continue
  786. }
  787. wg.Add(1)
  788. go func(index int) {
  789. defer wg.Done()
  790. if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
  791. appendError(err)
  792. }
  793. if klog.V(10) {
  794. for _, hostPriority := range results[index] {
  795. klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
  796. }
  797. }
  798. }(i)
  799. }
  800. // Wait for all computations to be finished.
  801. wg.Wait()
  802. if len(errs) != 0 {
  803. return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
  804. }
  805. // Summarize all scores.
  806. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  807. for i := range nodes {
  808. result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
  809. for j := range priorityConfigs {
  810. result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
  811. }
  812. }
  813. if len(extenders) != 0 && nodes != nil {
  814. combinedScores := make(map[string]float64, len(nodeNameToInfo))
  815. for i := range extenders {
  816. if !extenders[i].IsInterested(pod) {
  817. continue
  818. }
  819. wg.Add(1)
  820. go func(extIndex int) {
  821. defer wg.Done()
  822. prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
  823. if err != nil {
  824. // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
  825. return
  826. }
  827. mu.Lock()
  828. for i := range *prioritizedList {
  829. host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
  830. if klog.V(10) {
  831. klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
  832. }
  833. combinedScores[host] += score * float64(weight)
  834. }
  835. mu.Unlock()
  836. }(i)
  837. }
  838. // wait for all go routines to finish
  839. wg.Wait()
  840. for i := range result {
  841. result[i].Score += combinedScores[result[i].Host]
  842. }
  843. }
  844. if klog.V(10) {
  845. for i := range result {
  846. klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
  847. }
  848. }
  849. return result, nil
  850. }
  851. //------------------------------------------------------------------
  852. //-------------------START-CUSTOM-BY-IWITA---------------------------------------------------------
  853. //------------------------------------------------------------------
  854. // func CustomPrioritizeNodes(
  855. // pod *v1.Pod,
  856. // nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  857. // meta interface{},
  858. // priorityConfigs []priorities.PriorityConfig,
  859. // nodes []*v1.Node,
  860. // extenders []algorithm.SchedulerExtender,
  861. // ) (schedulerapi.CustomHostPriorityList, error) {
  862. // // If no priority configs are provided, then the EqualPriority function is applied
  863. // // This is required to generate the priority list in the required format
  864. // // if len(priorityConfigs) == 0 && len(extenders) == 0 {
  865. // // result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
  866. // // for i := range nodes {
  867. // // // initializes nodes with Score = 1
  868. // // hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
  869. // // if err != nil {
  870. // // return nil, err
  871. // // }
  872. // // result = append(result, hostPriority)
  873. // // }
  874. // // return result, nil
  875. // // }
  876. // var (
  877. // mu = sync.Mutex{}
  878. // wg = sync.WaitGroup{}
  879. // errs []error
  880. // )
  881. // appendError := func(err error) {
  882. // mu.Lock()
  883. // defer mu.Unlock()
  884. // errs = append(errs, err)
  885. // }
  886. // results := make([]schedulerapi.CustomHostPriorityList, len(priorityConfigs), len(priorityConfigs))
  887. // // DEPRECATED: we can remove this when all priorityConfigs implement the
  888. // // Map-Reduce pattern.
  889. // for i := range priorityConfigs {
  890. // if priorityConfigs[i].CustomFunction != nil {
  891. // wg.Add(1)
  892. // go func(index int) {
  893. // defer wg.Done()
  894. // var err error
  895. // results[index], err = priorityConfigs[index].CustomFunction(pod, nodeNameToInfo, nodes)
  896. // if err != nil {
  897. // appendError(err)
  898. // }
  899. // }(i)
  900. // } else {
  901. // results[i] = make(schedulerapi.CustomHostPriorityList, len(nodes))
  902. // }
  903. // }
  904. // workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
  905. // nodeInfo := nodeNameToInfo[nodes[index].Name]
  906. // for i := range priorityConfigs {
  907. // if priorityConfigs[i].Function != nil {
  908. // continue
  909. // }
  910. // var err error
  911. // results[i][index], err = priorityConfigs[i].CustomMap(pod, meta, nodeInfo)
  912. // if err != nil {
  913. // appendError(err)
  914. // results[i][index].Host = nodes[index].Name
  915. // }
  916. // }
  917. // })
  918. // for i := range priorityConfigs {
  919. // if priorityConfigs[i].Reduce == nil {
  920. // continue
  921. // }
  922. // wg.Add(1)
  923. // go func(index int) {
  924. // defer wg.Done()
  925. // if err := priorityConfigs[index].CustomReduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
  926. // appendError(err)
  927. // }
  928. // if klog.V(10) {
  929. // for _, hostPriority := range results[index] {
  930. // klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
  931. // }
  932. // }
  933. // }(i)
  934. // }
  935. // // Wait for all computations to be finished.
  936. // wg.Wait()
  937. // if len(errs) != 0 {
  938. // return schedulerapi.CustomHostPriorityList{}, errors.NewAggregate(errs)
  939. // }
  940. // // Summarize all scores.
  941. // result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
  942. // for i := range nodes {
  943. // result = append(result, schedulerapi.CustomHostPriority{Host: nodes[i].Name, Score: 0})
  944. // for j := range priorityConfigs {
  945. // result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
  946. // }
  947. // }
  948. // if len(extenders) != 0 && nodes != nil {
  949. // combinedScores := make(map[string]float64, len(nodeNameToInfo))
  950. // for i := range extenders {
  951. // if !extenders[i].IsInterested(pod) {
  952. // continue
  953. // }
  954. // wg.Add(1)
  955. // go func(extIndex int) {
  956. // defer wg.Done()
  957. // prioritizedList, weight, err := extenders[extIndex].CustomPrioritize(pod, nodes)
  958. // if err != nil {
  959. // // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
  960. // return
  961. // }
  962. // mu.Lock()
  963. // for i := range *prioritizedList {
  964. // host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
  965. // if klog.V(10) {
  966. // klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
  967. // }
  968. // combinedScores[host] += score * float64(weight)
  969. // }
  970. // mu.Unlock()
  971. // }(i)
  972. // }
  973. // // wait for all go routines to finish
  974. // wg.Wait()
  975. // for i := range result {
  976. // result[i].Score += combinedScores[result[i].Host]
  977. // }
  978. // }
  979. // if klog.V(10) {
  980. // for i := range result {
  981. // klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
  982. // }
  983. // }
  984. // return result, nil
  985. // }
  986. //------------------------------------------------------------------
  987. // --------------END-CUSTOM-BY-IWITA--------------------------------
  988. //------------------------------------------------------------------
  989. // EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
  990. func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
  991. node := nodeInfo.Node()
  992. if node == nil {
  993. return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
  994. }
  995. return schedulerapi.HostPriority{
  996. Host: node.Name,
  997. Score: 1,
  998. }, nil
  999. }
  1000. // pickOneNodeForPreemption chooses one node among the given nodes. It assumes
  1001. // pods in each map entry are ordered by decreasing priority.
  1002. // It picks a node based on the following criteria:
  1003. // 1. A node with minimum number of PDB violations.
  1004. // 2. A node with minimum highest priority victim is picked.
  1005. // 3. Ties are broken by sum of priorities of all victims.
  1006. // 4. If there are still ties, node with the minimum number of victims is picked.
  1007. // 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
  1008. // 6. If there are still ties, the first such node is picked (sort of randomly).
  1009. // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
  1010. // allocation and garbage collection time.
  1011. func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
  1012. if len(nodesToVictims) == 0 {
  1013. return nil
  1014. }
  1015. minNumPDBViolatingPods := math.MaxInt32
  1016. var minNodes1 []*v1.Node
  1017. lenNodes1 := 0
  1018. for node, victims := range nodesToVictims {
  1019. if len(victims.Pods) == 0 {
  1020. // We found a node that doesn't need any preemption. Return it!
  1021. // This should happen rarely when one or more pods are terminated between
  1022. // the time that scheduler tries to schedule the pod and the time that
  1023. // preemption logic tries to find nodes for preemption.
  1024. return node
  1025. }
  1026. numPDBViolatingPods := victims.NumPDBViolations
  1027. if numPDBViolatingPods < minNumPDBViolatingPods {
  1028. minNumPDBViolatingPods = numPDBViolatingPods
  1029. minNodes1 = nil
  1030. lenNodes1 = 0
  1031. }
  1032. if numPDBViolatingPods == minNumPDBViolatingPods {
  1033. minNodes1 = append(minNodes1, node)
  1034. lenNodes1++
  1035. }
  1036. }
  1037. if lenNodes1 == 1 {
  1038. return minNodes1[0]
  1039. }
  1040. // There are more than one node with minimum number PDB violating pods. Find
  1041. // the one with minimum highest priority victim.
  1042. minHighestPriority := int32(math.MaxInt32)
  1043. var minNodes2 = make([]*v1.Node, lenNodes1)
  1044. lenNodes2 := 0
  1045. for i := 0; i < lenNodes1; i++ {
  1046. node := minNodes1[i]
  1047. victims := nodesToVictims[node]
  1048. // highestPodPriority is the highest priority among the victims on this node.
  1049. highestPodPriority := util.GetPodPriority(victims.Pods[0])
  1050. if highestPodPriority < minHighestPriority {
  1051. minHighestPriority = highestPodPriority
  1052. lenNodes2 = 0
  1053. }
  1054. if highestPodPriority == minHighestPriority {
  1055. minNodes2[lenNodes2] = node
  1056. lenNodes2++
  1057. }
  1058. }
  1059. if lenNodes2 == 1 {
  1060. return minNodes2[0]
  1061. }
  1062. // There are a few nodes with minimum highest priority victim. Find the
  1063. // smallest sum of priorities.
  1064. minSumPriorities := int64(math.MaxInt64)
  1065. lenNodes1 = 0
  1066. for i := 0; i < lenNodes2; i++ {
  1067. var sumPriorities int64
  1068. node := minNodes2[i]
  1069. for _, pod := range nodesToVictims[node].Pods {
  1070. // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
  1071. // needed so that a node with a few pods with negative priority is not
  1072. // picked over a node with a smaller number of pods with the same negative
  1073. // priority (and similar scenarios).
  1074. sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
  1075. }
  1076. if sumPriorities < minSumPriorities {
  1077. minSumPriorities = sumPriorities
  1078. lenNodes1 = 0
  1079. }
  1080. if sumPriorities == minSumPriorities {
  1081. minNodes1[lenNodes1] = node
  1082. lenNodes1++
  1083. }
  1084. }
  1085. if lenNodes1 == 1 {
  1086. return minNodes1[0]
  1087. }
  1088. // There are a few nodes with minimum highest priority victim and sum of priorities.
  1089. // Find one with the minimum number of pods.
  1090. minNumPods := math.MaxInt32
  1091. lenNodes2 = 0
  1092. for i := 0; i < lenNodes1; i++ {
  1093. node := minNodes1[i]
  1094. numPods := len(nodesToVictims[node].Pods)
  1095. if numPods < minNumPods {
  1096. minNumPods = numPods
  1097. lenNodes2 = 0
  1098. }
  1099. if numPods == minNumPods {
  1100. minNodes2[lenNodes2] = node
  1101. lenNodes2++
  1102. }
  1103. }
  1104. if lenNodes2 == 1 {
  1105. return minNodes2[0]
  1106. }
  1107. // There are a few nodes with same number of pods.
  1108. // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
  1109. latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
  1110. if latestStartTime == nil {
  1111. // If the earliest start time of all pods on the 1st node is nil, just return it,
  1112. // which is not expected to happen.
  1113. klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
  1114. return minNodes2[0]
  1115. }
  1116. nodeToReturn := minNodes2[0]
  1117. for i := 1; i < lenNodes2; i++ {
  1118. node := minNodes2[i]
  1119. // Get earliest start time of all pods on the current node.
  1120. earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
  1121. if earliestStartTimeOnNode == nil {
  1122. klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
  1123. continue
  1124. }
  1125. if earliestStartTimeOnNode.After(latestStartTime.Time) {
  1126. latestStartTime = earliestStartTimeOnNode
  1127. nodeToReturn = node
  1128. }
  1129. }
  1130. return nodeToReturn
  1131. }
  1132. // selectNodesForPreemption finds all the nodes with possible victims for
  1133. // preemption in parallel.
  1134. func selectNodesForPreemption(pod *v1.Pod,
  1135. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  1136. potentialNodes []*v1.Node,
  1137. fitPredicates map[string]predicates.FitPredicate,
  1138. metadataProducer predicates.PredicateMetadataProducer,
  1139. queue internalqueue.SchedulingQueue,
  1140. pdbs []*policy.PodDisruptionBudget,
  1141. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  1142. nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
  1143. var resultLock sync.Mutex
  1144. // We can use the same metadata producer for all nodes.
  1145. meta := metadataProducer(pod, nodeNameToInfo)
  1146. checkNode := func(i int) {
  1147. nodeName := potentialNodes[i].Name
  1148. var metaCopy predicates.PredicateMetadata
  1149. if meta != nil {
  1150. metaCopy = meta.ShallowCopy()
  1151. }
  1152. pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
  1153. if fits {
  1154. resultLock.Lock()
  1155. victims := schedulerapi.Victims{
  1156. Pods: pods,
  1157. NumPDBViolations: numPDBViolations,
  1158. }
  1159. nodeToVictims[potentialNodes[i]] = &victims
  1160. resultLock.Unlock()
  1161. }
  1162. }
  1163. workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
  1164. return nodeToVictims, nil
  1165. }
  1166. // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
  1167. // and "nonViolatingPods" based on whether their PDBs will be violated if they are
  1168. // preempted.
  1169. // This function is stable and does not change the order of received pods. So, if it
  1170. // receives a sorted list, grouping will preserve the order of the input list.
  1171. func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
  1172. for _, obj := range pods {
  1173. pod := obj.(*v1.Pod)
  1174. pdbForPodIsViolated := false
  1175. // A pod with no labels will not match any PDB. So, no need to check.
  1176. if len(pod.Labels) != 0 {
  1177. for _, pdb := range pdbs {
  1178. if pdb.Namespace != pod.Namespace {
  1179. continue
  1180. }
  1181. selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
  1182. if err != nil {
  1183. continue
  1184. }
  1185. // A PDB with a nil or empty selector matches nothing.
  1186. if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
  1187. continue
  1188. }
  1189. // We have found a matching PDB.
  1190. if pdb.Status.PodDisruptionsAllowed <= 0 {
  1191. pdbForPodIsViolated = true
  1192. break
  1193. }
  1194. }
  1195. }
  1196. if pdbForPodIsViolated {
  1197. violatingPods = append(violatingPods, pod)
  1198. } else {
  1199. nonViolatingPods = append(nonViolatingPods, pod)
  1200. }
  1201. }
  1202. return violatingPods, nonViolatingPods
  1203. }
  1204. // selectVictimsOnNode finds minimum set of pods on the given node that should
  1205. // be preempted in order to make enough room for "pod" to be scheduled. The
  1206. // minimum set selected is subject to the constraint that a higher-priority pod
  1207. // is never preempted when a lower-priority pod could be (higher/lower relative
  1208. // to one another, not relative to the preemptor "pod").
  1209. // The algorithm first checks if the pod can be scheduled on the node when all the
  1210. // lower priority pods are gone. If so, it sorts all the lower priority pods by
  1211. // their priority and then puts them into two groups of those whose PodDisruptionBudget
  1212. // will be violated if preempted and other non-violating pods. Both groups are
  1213. // sorted by priority. It first tries to reprieve as many PDB violating pods as
  1214. // possible and then does them same for non-PDB-violating pods while checking
  1215. // that the "pod" can still fit on the node.
  1216. // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
  1217. // due to pod affinity, node affinity, or node anti-affinity reasons. None of
  1218. // these predicates can be satisfied by removing more pods from the node.
  1219. func selectVictimsOnNode(
  1220. pod *v1.Pod,
  1221. meta predicates.PredicateMetadata,
  1222. nodeInfo *schedulernodeinfo.NodeInfo,
  1223. fitPredicates map[string]predicates.FitPredicate,
  1224. queue internalqueue.SchedulingQueue,
  1225. pdbs []*policy.PodDisruptionBudget,
  1226. ) ([]*v1.Pod, int, bool) {
  1227. if nodeInfo == nil {
  1228. return nil, 0, false
  1229. }
  1230. potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
  1231. nodeInfoCopy := nodeInfo.Clone()
  1232. removePod := func(rp *v1.Pod) {
  1233. nodeInfoCopy.RemovePod(rp)
  1234. if meta != nil {
  1235. meta.RemovePod(rp)
  1236. }
  1237. }
  1238. addPod := func(ap *v1.Pod) {
  1239. nodeInfoCopy.AddPod(ap)
  1240. if meta != nil {
  1241. meta.AddPod(ap, nodeInfoCopy)
  1242. }
  1243. }
  1244. // As the first step, remove all the lower priority pods from the node and
  1245. // check if the given pod can be scheduled.
  1246. podPriority := util.GetPodPriority(pod)
  1247. for _, p := range nodeInfoCopy.Pods() {
  1248. if util.GetPodPriority(p) < podPriority {
  1249. potentialVictims.Items = append(potentialVictims.Items, p)
  1250. removePod(p)
  1251. }
  1252. }
  1253. // If the new pod does not fit after removing all the lower priority pods,
  1254. // we are almost done and this node is not suitable for preemption. The only
  1255. // condition that we could check is if the "pod" is failing to schedule due to
  1256. // inter-pod affinity to one or more victims, but we have decided not to
  1257. // support this case for performance reasons. Having affinity to lower
  1258. // priority pods is not a recommended configuration anyway.
  1259. if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
  1260. if err != nil {
  1261. klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
  1262. }
  1263. return nil, 0, false
  1264. }
  1265. var victims []*v1.Pod
  1266. numViolatingVictim := 0
  1267. potentialVictims.Sort()
  1268. // Try to reprieve as many pods as possible. We first try to reprieve the PDB
  1269. // violating victims and then other non-violating ones. In both cases, we start
  1270. // from the highest priority victims.
  1271. violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
  1272. reprievePod := func(p *v1.Pod) bool {
  1273. addPod(p)
  1274. fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false)
  1275. if !fits {
  1276. removePod(p)
  1277. victims = append(victims, p)
  1278. klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
  1279. }
  1280. return fits
  1281. }
  1282. for _, p := range violatingVictims {
  1283. if !reprievePod(p) {
  1284. numViolatingVictim++
  1285. }
  1286. }
  1287. // Now we try to reprieve non-violating victims.
  1288. for _, p := range nonViolatingVictims {
  1289. reprievePod(p)
  1290. }
  1291. return victims, numViolatingVictim, true
  1292. }
  1293. // unresolvablePredicateExists checks whether failedPredicates has unresolvable predicate.
  1294. func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureReason) bool {
  1295. for _, failedPredicate := range failedPredicates {
  1296. if _, ok := unresolvablePredicateFailureErrors[failedPredicate]; ok {
  1297. return true
  1298. }
  1299. }
  1300. return false
  1301. }
  1302. // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
  1303. // that may be satisfied by removing pods from the node.
  1304. func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
  1305. potentialNodes := []*v1.Node{}
  1306. for _, node := range nodes {
  1307. failedPredicates, _ := failedPredicatesMap[node.Name]
  1308. // If we assume that scheduler looks at all nodes and populates the failedPredicateMap
  1309. // (which is the case today), the !found case should never happen, but we'd prefer
  1310. // to rely less on such assumptions in the code when checking does not impose
  1311. // significant overhead.
  1312. // Also, we currently assume all failures returned by extender as resolvable.
  1313. if !unresolvablePredicateExists(failedPredicates) {
  1314. klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
  1315. potentialNodes = append(potentialNodes, node)
  1316. }
  1317. }
  1318. return potentialNodes
  1319. }
  1320. // podEligibleToPreemptOthers determines whether this pod should be considered
  1321. // for preempting other pods or not. If this pod has already preempted other
  1322. // pods and those are in their graceful termination period, it shouldn't be
  1323. // considered for preemption.
  1324. // We look at the node that is nominated for this pod and as long as there are
  1325. // terminating pods on the node, we don't consider this for preempting more pods.
  1326. func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
  1327. if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
  1328. klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
  1329. return false
  1330. }
  1331. nomNodeName := pod.Status.NominatedNodeName
  1332. if len(nomNodeName) > 0 {
  1333. if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
  1334. podPriority := util.GetPodPriority(pod)
  1335. for _, p := range nodeInfo.Pods() {
  1336. if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
  1337. // There is a terminating pod on the nominated node.
  1338. return false
  1339. }
  1340. }
  1341. }
  1342. }
  1343. return true
  1344. }
  1345. // podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
  1346. func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
  1347. // Check PVCs used by the pod
  1348. namespace := pod.Namespace
  1349. manifest := &(pod.Spec)
  1350. for i := range manifest.Volumes {
  1351. volume := &manifest.Volumes[i]
  1352. if volume.PersistentVolumeClaim == nil {
  1353. // Volume is not a PVC, ignore
  1354. continue
  1355. }
  1356. pvcName := volume.PersistentVolumeClaim.ClaimName
  1357. pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
  1358. if err != nil {
  1359. // The error has already enough context ("persistentvolumeclaim "myclaim" not found")
  1360. return err
  1361. }
  1362. if pvc.DeletionTimestamp != nil {
  1363. return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
  1364. }
  1365. }
  1366. return nil
  1367. }
  1368. // NewGenericScheduler creates a genericScheduler object.
  1369. func NewGenericScheduler(
  1370. cache internalcache.Cache,
  1371. podQueue internalqueue.SchedulingQueue,
  1372. predicates map[string]predicates.FitPredicate,
  1373. predicateMetaProducer predicates.PredicateMetadataProducer,
  1374. prioritizers []priorities.PriorityConfig,
  1375. priorityMetaProducer priorities.PriorityMetadataProducer,
  1376. framework framework.Framework,
  1377. extenders []algorithm.SchedulerExtender,
  1378. volumeBinder *volumebinder.VolumeBinder,
  1379. pvcLister corelisters.PersistentVolumeClaimLister,
  1380. pdbLister algorithm.PDBLister,
  1381. alwaysCheckAllPredicates bool,
  1382. disablePreemption bool,
  1383. percentageOfNodesToScore int32,
  1384. enableNonPreempting bool,
  1385. ) ScheduleAlgorithm {
  1386. return &genericScheduler{
  1387. cache: cache,
  1388. schedulingQueue: podQueue,
  1389. predicates: predicates,
  1390. predicateMetaProducer: predicateMetaProducer,
  1391. prioritizers: prioritizers,
  1392. priorityMetaProducer: priorityMetaProducer,
  1393. framework: framework,
  1394. extenders: extenders,
  1395. nodeInfoSnapshot: framework.NodeInfoSnapshot(),
  1396. volumeBinder: volumeBinder,
  1397. pvcLister: pvcLister,
  1398. pdbLister: pdbLister,
  1399. alwaysCheckAllPredicates: alwaysCheckAllPredicates,
  1400. disablePreemption: disablePreemption,
  1401. percentageOfNodesToScore: percentageOfNodesToScore,
  1402. enableNonPreempting: enableNonPreempting,
  1403. }
  1404. }