framework.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package v1alpha1
  14. import (
  15. "context"
  16. "fmt"
  17. "reflect"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. "k8s.io/apimachinery/pkg/types"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/client-go/informers"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/util/workqueue"
  26. "k8s.io/klog"
  27. "k8s.io/kubernetes/pkg/scheduler/apis/config"
  28. schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
  29. "k8s.io/kubernetes/pkg/scheduler/metrics"
  30. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  31. schedutil "k8s.io/kubernetes/pkg/scheduler/util"
  32. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  33. )
  34. const (
  35. // Filter is the name of the filter extension point.
  36. Filter = "Filter"
  37. // Specifies the maximum timeout a permit plugin can return.
  38. maxTimeout time.Duration = 15 * time.Minute
  39. preFilter = "PreFilter"
  40. preFilterExtensionAddPod = "PreFilterExtensionAddPod"
  41. preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
  42. preScore = "PreScore"
  43. score = "Score"
  44. scoreExtensionNormalize = "ScoreExtensionNormalize"
  45. preBind = "PreBind"
  46. bind = "Bind"
  47. postBind = "PostBind"
  48. reserve = "Reserve"
  49. unreserve = "Unreserve"
  50. permit = "Permit"
  51. )
// framework is the component responsible for initializing and running scheduler
// plugins. NewFramework fills one plugin slice per extension point and copies
// the shared handles (client set, informer factory, volume binder, metrics
// recorder, snapshot lister) from the supplied options.
type framework struct {
	// registry is the Registry that was passed to NewFramework.
	registry             Registry
	snapshotSharedLister schedulerlisters.SharedLister
	// waitingPods is initialized with newWaitingPodsMap() in NewFramework.
	waitingPods *waitingPodsMap
	// pluginNameToWeightMap holds the weight of every initialized plugin, keyed
	// by the plugin's configured name. A configured weight of zero is stored as
	// 1. RunScorePlugins multiplies each node score by this weight.
	pluginNameToWeightMap map[string]int
	// Plugin implementations, one slice per extension point. They are populated
	// by updatePluginList through the pointers returned by getExtensionPoints.
	queueSortPlugins []QueueSortPlugin
	preFilterPlugins []PreFilterPlugin
	filterPlugins    []FilterPlugin
	preScorePlugins  []PreScorePlugin
	scorePlugins     []ScorePlugin
	reservePlugins   []ReservePlugin
	preBindPlugins   []PreBindPlugin
	bindPlugins      []BindPlugin
	postBindPlugins  []PostBindPlugin
	unreservePlugins []UnreservePlugin
	permitPlugins    []PermitPlugin
	clientSet        clientset.Interface
	informerFactory  informers.SharedInformerFactory
	volumeBinder     *volumebinder.VolumeBinder
	// metricsRecorder receives per-plugin durations from the run* helpers when
	// the cycle state requests plugin metrics.
	metricsRecorder *metricsRecorder
	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
	// after the first failure.
	runAllFilters bool
}
// extensionPoint encapsulates desired and applied set of plugins at a specific extension
// point. This is used to simplify iterating over all extension points supported by the
// framework.
//
// Field order matters: getExtensionPoints builds values of this type with
// positional (unkeyed) composite literals.
type extensionPoint struct {
	// the set of plugins to be configured at this extension point.
	plugins *config.PluginSet
	// a pointer to the slice storing plugins implementations that will run at this
	// extension point. updatePluginList dereferences it via reflection and appends
	// to the slice it points at.
	slicePtr interface{}
}
  88. func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
  89. return []extensionPoint{
  90. {plugins.PreFilter, &f.preFilterPlugins},
  91. {plugins.Filter, &f.filterPlugins},
  92. {plugins.Reserve, &f.reservePlugins},
  93. {plugins.PreScore, &f.preScorePlugins},
  94. {plugins.Score, &f.scorePlugins},
  95. {plugins.PreBind, &f.preBindPlugins},
  96. {plugins.Bind, &f.bindPlugins},
  97. {plugins.PostBind, &f.postBindPlugins},
  98. {plugins.Unreserve, &f.unreservePlugins},
  99. {plugins.Permit, &f.permitPlugins},
  100. {plugins.QueueSort, &f.queueSortPlugins},
  101. }
  102. }
// frameworkOptions collects the optional settings of a framework. NewFramework
// starts from a copy of defaultFrameworkOptions, applies every Option to it and
// copies the resulting fields into the framework it builds.
type frameworkOptions struct {
	clientSet            clientset.Interface
	informerFactory      informers.SharedInformerFactory
	snapshotSharedLister schedulerlisters.SharedLister
	metricsRecorder      *metricsRecorder
	volumeBinder         *volumebinder.VolumeBinder
	// runAllFilters makes RunFilterPlugins keep going after the first
	// unschedulable status instead of returning immediately.
	runAllFilters bool
}

// Option for the framework. Each Option mutates the frameworkOptions it is
// given; NewFramework applies them in the order they are passed.
type Option func(*frameworkOptions)
  113. // WithClientSet sets clientSet for the scheduling framework.
  114. func WithClientSet(clientSet clientset.Interface) Option {
  115. return func(o *frameworkOptions) {
  116. o.clientSet = clientSet
  117. }
  118. }
  119. // WithInformerFactory sets informer factory for the scheduling framework.
  120. func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
  121. return func(o *frameworkOptions) {
  122. o.informerFactory = informerFactory
  123. }
  124. }
  125. // WithSnapshotSharedLister sets the SharedLister of the snapshot.
  126. func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister) Option {
  127. return func(o *frameworkOptions) {
  128. o.snapshotSharedLister = snapshotSharedLister
  129. }
  130. }
  131. // WithRunAllFilters sets the runAllFilters flag, which means RunFilterPlugins accumulates
  132. // all failure Statuses.
  133. func WithRunAllFilters(runAllFilters bool) Option {
  134. return func(o *frameworkOptions) {
  135. o.runAllFilters = runAllFilters
  136. }
  137. }
  138. // withMetricsRecorder is only used in tests.
  139. func withMetricsRecorder(recorder *metricsRecorder) Option {
  140. return func(o *frameworkOptions) {
  141. o.metricsRecorder = recorder
  142. }
  143. }
  144. // WithVolumeBinder sets volume binder for the scheduling framework.
  145. func WithVolumeBinder(binder *volumebinder.VolumeBinder) Option {
  146. return func(o *frameworkOptions) {
  147. o.volumeBinder = binder
  148. }
  149. }
  150. var defaultFrameworkOptions = frameworkOptions{
  151. metricsRecorder: newMetricsRecorder(1000, time.Second),
  152. }
  153. var _ Framework = &framework{}
  154. // NewFramework initializes plugins given the configuration and the registry.
  155. func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig, opts ...Option) (Framework, error) {
  156. options := defaultFrameworkOptions
  157. for _, opt := range opts {
  158. opt(&options)
  159. }
  160. f := &framework{
  161. registry: r,
  162. snapshotSharedLister: options.snapshotSharedLister,
  163. pluginNameToWeightMap: make(map[string]int),
  164. waitingPods: newWaitingPodsMap(),
  165. clientSet: options.clientSet,
  166. informerFactory: options.informerFactory,
  167. volumeBinder: options.volumeBinder,
  168. metricsRecorder: options.metricsRecorder,
  169. runAllFilters: options.runAllFilters,
  170. }
  171. if plugins == nil {
  172. return f, nil
  173. }
  174. // get needed plugins from config
  175. pg := f.pluginsNeeded(plugins)
  176. if len(pg) == 0 {
  177. return f, nil
  178. }
  179. pluginConfig := make(map[string]*runtime.Unknown, 0)
  180. for i := range args {
  181. pluginConfig[args[i].Name] = &args[i].Args
  182. }
  183. pluginsMap := make(map[string]Plugin)
  184. var totalPriority int64
  185. for name, factory := range r {
  186. // initialize only needed plugins.
  187. if _, ok := pg[name]; !ok {
  188. continue
  189. }
  190. p, err := factory(pluginConfig[name], f)
  191. if err != nil {
  192. return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
  193. }
  194. pluginsMap[name] = p
  195. // a weight of zero is not permitted, plugins can be disabled explicitly
  196. // when configured.
  197. f.pluginNameToWeightMap[name] = int(pg[name].Weight)
  198. if f.pluginNameToWeightMap[name] == 0 {
  199. f.pluginNameToWeightMap[name] = 1
  200. }
  201. // Checks totalPriority against MaxTotalScore to avoid overflow
  202. if int64(f.pluginNameToWeightMap[name])*MaxNodeScore > MaxTotalScore-totalPriority {
  203. return nil, fmt.Errorf("total score of Score plugins could overflow")
  204. }
  205. totalPriority += int64(f.pluginNameToWeightMap[name]) * MaxNodeScore
  206. }
  207. for _, e := range f.getExtensionPoints(plugins) {
  208. if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
  209. return nil, err
  210. }
  211. }
  212. // Verifying the score weights again since Plugin.Name() could return a different
  213. // value from the one used in the configuration.
  214. for _, scorePlugin := range f.scorePlugins {
  215. if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
  216. return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
  217. }
  218. }
  219. if len(f.queueSortPlugins) == 0 {
  220. return nil, fmt.Errorf("no queue sort plugin is enabled")
  221. }
  222. if len(f.queueSortPlugins) > 1 {
  223. return nil, fmt.Errorf("only one queue sort plugin can be enabled")
  224. }
  225. if len(f.bindPlugins) == 0 {
  226. return nil, fmt.Errorf("at least one bind plugin is needed")
  227. }
  228. return f, nil
  229. }
  230. func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error {
  231. if pluginSet == nil {
  232. return nil
  233. }
  234. plugins := reflect.ValueOf(pluginList).Elem()
  235. pluginType := plugins.Type().Elem()
  236. set := sets.NewString()
  237. for _, ep := range pluginSet.Enabled {
  238. pg, ok := pluginsMap[ep.Name]
  239. if !ok {
  240. return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
  241. }
  242. if !reflect.TypeOf(pg).Implements(pluginType) {
  243. return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
  244. }
  245. if set.Has(ep.Name) {
  246. return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
  247. }
  248. set.Insert(ep.Name)
  249. newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
  250. plugins.Set(newPlugins)
  251. }
  252. return nil
  253. }
  254. // QueueSortFunc returns the function to sort pods in scheduling queue
  255. func (f *framework) QueueSortFunc() LessFunc {
  256. if f == nil {
  257. // If framework is nil, simply keep their order unchanged.
  258. // NOTE: this is primarily for tests.
  259. return func(_, _ *PodInfo) bool { return false }
  260. }
  261. if len(f.queueSortPlugins) == 0 {
  262. panic("No QueueSort plugin is registered in the framework.")
  263. }
  264. // Only one QueueSort plugin can be enabled.
  265. return f.queueSortPlugins[0].Less
  266. }
  267. // RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
  268. // *Status and its code is set to non-success if any of the plugins returns
  269. // anything but Success. If a non-success status is returned, then the scheduling
  270. // cycle is aborted.
  271. func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
  272. startTime := time.Now()
  273. defer func() {
  274. metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  275. }()
  276. for _, pl := range f.preFilterPlugins {
  277. status = f.runPreFilterPlugin(ctx, pl, state, pod)
  278. if !status.IsSuccess() {
  279. if status.IsUnschedulable() {
  280. msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
  281. klog.V(4).Infof(msg)
  282. return NewStatus(status.Code(), msg)
  283. }
  284. msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
  285. klog.Error(msg)
  286. return NewStatus(Error, msg)
  287. }
  288. }
  289. return nil
  290. }
  291. func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status {
  292. if !state.ShouldRecordPluginMetrics() {
  293. return pl.PreFilter(ctx, state, pod)
  294. }
  295. startTime := time.Now()
  296. status := pl.PreFilter(ctx, state, pod)
  297. f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
  298. return status
  299. }
  300. // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
  301. // PreFilter plugins. It returns directly if any of the plugins return any
  302. // status other than Success.
  303. func (f *framework) RunPreFilterExtensionAddPod(
  304. ctx context.Context,
  305. state *CycleState,
  306. podToSchedule *v1.Pod,
  307. podToAdd *v1.Pod,
  308. nodeInfo *schedulernodeinfo.NodeInfo,
  309. ) (status *Status) {
  310. for _, pl := range f.preFilterPlugins {
  311. if pl.PreFilterExtensions() == nil {
  312. continue
  313. }
  314. status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo)
  315. if !status.IsSuccess() {
  316. msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
  317. pl.Name(), podToSchedule.Name, status.Message())
  318. klog.Error(msg)
  319. return NewStatus(Error, msg)
  320. }
  321. }
  322. return nil
  323. }
  324. func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
  325. if !state.ShouldRecordPluginMetrics() {
  326. return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
  327. }
  328. startTime := time.Now()
  329. status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
  330. f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime))
  331. return status
  332. }
  333. // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
  334. // PreFilter plugins. It returns directly if any of the plugins return any
  335. // status other than Success.
  336. func (f *framework) RunPreFilterExtensionRemovePod(
  337. ctx context.Context,
  338. state *CycleState,
  339. podToSchedule *v1.Pod,
  340. podToRemove *v1.Pod,
  341. nodeInfo *schedulernodeinfo.NodeInfo,
  342. ) (status *Status) {
  343. for _, pl := range f.preFilterPlugins {
  344. if pl.PreFilterExtensions() == nil {
  345. continue
  346. }
  347. status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo)
  348. if !status.IsSuccess() {
  349. msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
  350. pl.Name(), podToSchedule.Name, status.Message())
  351. klog.Error(msg)
  352. return NewStatus(Error, msg)
  353. }
  354. }
  355. return nil
  356. }
  357. func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
  358. if !state.ShouldRecordPluginMetrics() {
  359. return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
  360. }
  361. startTime := time.Now()
  362. status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
  363. f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
  364. return status
  365. }
  366. // RunFilterPlugins runs the set of configured Filter plugins for pod on
  367. // the given node. If any of these plugins doesn't return "Success", the
  368. // given node is not suitable for running pod.
  369. // Meanwhile, the failure message and status are set for the given node.
  370. func (f *framework) RunFilterPlugins(
  371. ctx context.Context,
  372. state *CycleState,
  373. pod *v1.Pod,
  374. nodeInfo *schedulernodeinfo.NodeInfo,
  375. ) PluginToStatus {
  376. var firstFailedStatus *Status
  377. statuses := make(PluginToStatus)
  378. for _, pl := range f.filterPlugins {
  379. pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
  380. if len(statuses) == 0 {
  381. firstFailedStatus = pluginStatus
  382. }
  383. if !pluginStatus.IsSuccess() {
  384. if !pluginStatus.IsUnschedulable() {
  385. // Filter plugins are not supposed to return any status other than
  386. // Success or Unschedulable.
  387. firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
  388. return map[string]*Status{pl.Name(): firstFailedStatus}
  389. }
  390. statuses[pl.Name()] = pluginStatus
  391. if !f.runAllFilters {
  392. // Exit early if we don't need to run all filters.
  393. return statuses
  394. }
  395. }
  396. }
  397. return statuses
  398. }
  399. func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
  400. if !state.ShouldRecordPluginMetrics() {
  401. return pl.Filter(ctx, state, pod, nodeInfo)
  402. }
  403. startTime := time.Now()
  404. status := pl.Filter(ctx, state, pod, nodeInfo)
  405. f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
  406. return status
  407. }
  408. // RunPreScorePlugins runs the set of configured pre-score plugins. If any
  409. // of these plugins returns any status other than "Success", the given pod is rejected.
  410. func (f *framework) RunPreScorePlugins(
  411. ctx context.Context,
  412. state *CycleState,
  413. pod *v1.Pod,
  414. nodes []*v1.Node,
  415. ) (status *Status) {
  416. startTime := time.Now()
  417. defer func() {
  418. metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  419. }()
  420. for _, pl := range f.preScorePlugins {
  421. status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
  422. if !status.IsSuccess() {
  423. msg := fmt.Sprintf("error while running %q prescore plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
  424. klog.Error(msg)
  425. return NewStatus(Error, msg)
  426. }
  427. }
  428. return nil
  429. }
  430. func (f *framework) runPreScorePlugin(ctx context.Context, pl PreScorePlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status {
  431. if !state.ShouldRecordPluginMetrics() {
  432. return pl.PreScore(ctx, state, pod, nodes)
  433. }
  434. startTime := time.Now()
  435. status := pl.PreScore(ctx, state, pod, nodes)
  436. f.metricsRecorder.observePluginDurationAsync(preScore, pl.Name(), status, metrics.SinceInSeconds(startTime))
  437. return status
  438. }
  439. // RunScorePlugins runs the set of configured scoring plugins. It returns a list that
  440. // stores for each scoring plugin name the corresponding NodeScoreList(s).
  441. // It also returns *Status, which is set to non-success if any of the plugins returns
  442. // a non-success status.
  443. func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
  444. startTime := time.Now()
  445. defer func() {
  446. metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  447. }()
  448. pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
  449. for _, pl := range f.scorePlugins {
  450. pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
  451. }
  452. ctx, cancel := context.WithCancel(ctx)
  453. errCh := schedutil.NewErrorChannel()
  454. // Run Score method for each node in parallel.
  455. workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
  456. for _, pl := range f.scorePlugins {
  457. nodeName := nodes[index].Name
  458. s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
  459. if !status.IsSuccess() {
  460. errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
  461. return
  462. }
  463. pluginToNodeScores[pl.Name()][index] = NodeScore{
  464. Name: nodeName,
  465. Score: int64(s),
  466. }
  467. }
  468. })
  469. if err := errCh.ReceiveError(); err != nil {
  470. msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
  471. klog.Error(msg)
  472. return nil, NewStatus(Error, msg)
  473. }
  474. // Run NormalizeScore method for each ScorePlugin in parallel.
  475. workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
  476. pl := f.scorePlugins[index]
  477. nodeScoreList := pluginToNodeScores[pl.Name()]
  478. if pl.ScoreExtensions() == nil {
  479. return
  480. }
  481. status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
  482. if !status.IsSuccess() {
  483. err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
  484. errCh.SendErrorWithCancel(err, cancel)
  485. return
  486. }
  487. })
  488. if err := errCh.ReceiveError(); err != nil {
  489. msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
  490. klog.Error(msg)
  491. return nil, NewStatus(Error, msg)
  492. }
  493. // Apply score defaultWeights for each ScorePlugin in parallel.
  494. workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
  495. pl := f.scorePlugins[index]
  496. // Score plugins' weight has been checked when they are initialized.
  497. weight := f.pluginNameToWeightMap[pl.Name()]
  498. nodeScoreList := pluginToNodeScores[pl.Name()]
  499. for i, nodeScore := range nodeScoreList {
  500. // return error if score plugin returns invalid score.
  501. if nodeScore.Score > int64(MaxNodeScore) || nodeScore.Score < int64(MinNodeScore) {
  502. err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore)
  503. errCh.SendErrorWithCancel(err, cancel)
  504. return
  505. }
  506. nodeScoreList[i].Score = nodeScore.Score * int64(weight)
  507. }
  508. })
  509. if err := errCh.ReceiveError(); err != nil {
  510. msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
  511. klog.Error(msg)
  512. return nil, NewStatus(Error, msg)
  513. }
  514. return pluginToNodeScores, nil
  515. }
  516. func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
  517. if !state.ShouldRecordPluginMetrics() {
  518. return pl.Score(ctx, state, pod, nodeName)
  519. }
  520. startTime := time.Now()
  521. s, status := pl.Score(ctx, state, pod, nodeName)
  522. f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime))
  523. return s, status
  524. }
  525. func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status {
  526. if !state.ShouldRecordPluginMetrics() {
  527. return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
  528. }
  529. startTime := time.Now()
  530. status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
  531. f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime))
  532. return status
  533. }
  534. // RunPreBindPlugins runs the set of configured prebind plugins. It returns a
  535. // failure (bool) if any of the plugins returns an error. It also returns an
  536. // error containing the rejection message or the error occurred in the plugin.
  537. func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
  538. startTime := time.Now()
  539. defer func() {
  540. metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  541. }()
  542. for _, pl := range f.preBindPlugins {
  543. status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
  544. if !status.IsSuccess() {
  545. msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
  546. klog.Error(msg)
  547. return NewStatus(Error, msg)
  548. }
  549. }
  550. return nil
  551. }
  552. func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
  553. if !state.ShouldRecordPluginMetrics() {
  554. return pl.PreBind(ctx, state, pod, nodeName)
  555. }
  556. startTime := time.Now()
  557. status := pl.PreBind(ctx, state, pod, nodeName)
  558. f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
  559. return status
  560. }
  561. // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
  562. func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
  563. startTime := time.Now()
  564. defer func() {
  565. metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  566. }()
  567. if len(f.bindPlugins) == 0 {
  568. return NewStatus(Skip, "")
  569. }
  570. for _, bp := range f.bindPlugins {
  571. status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
  572. if status != nil && status.Code() == Skip {
  573. continue
  574. }
  575. if !status.IsSuccess() {
  576. msg := fmt.Sprintf("plugin %q failed to bind pod \"%v/%v\": %v", bp.Name(), pod.Namespace, pod.Name, status.Message())
  577. klog.Error(msg)
  578. return NewStatus(Error, msg)
  579. }
  580. return status
  581. }
  582. return status
  583. }
  584. func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
  585. if !state.ShouldRecordPluginMetrics() {
  586. return bp.Bind(ctx, state, pod, nodeName)
  587. }
  588. startTime := time.Now()
  589. status := bp.Bind(ctx, state, pod, nodeName)
  590. f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
  591. return status
  592. }
  593. // RunPostBindPlugins runs the set of configured postbind plugins.
  594. func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
  595. startTime := time.Now()
  596. defer func() {
  597. metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, Success.String()).Observe(metrics.SinceInSeconds(startTime))
  598. }()
  599. for _, pl := range f.postBindPlugins {
  600. f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
  601. }
  602. }
  603. func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) {
  604. if !state.ShouldRecordPluginMetrics() {
  605. pl.PostBind(ctx, state, pod, nodeName)
  606. return
  607. }
  608. startTime := time.Now()
  609. pl.PostBind(ctx, state, pod, nodeName)
  610. f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
  611. }
  612. // RunReservePlugins runs the set of configured reserve plugins. If any of these
  613. // plugins returns an error, it does not continue running the remaining ones and
  614. // returns the error. In such case, pod will not be scheduled.
  615. func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
  616. startTime := time.Now()
  617. defer func() {
  618. metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  619. }()
  620. for _, pl := range f.reservePlugins {
  621. status = f.runReservePlugin(ctx, pl, state, pod, nodeName)
  622. if !status.IsSuccess() {
  623. msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
  624. klog.Error(msg)
  625. return NewStatus(Error, msg)
  626. }
  627. }
  628. return nil
  629. }
  630. func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
  631. if !state.ShouldRecordPluginMetrics() {
  632. return pl.Reserve(ctx, state, pod, nodeName)
  633. }
  634. startTime := time.Now()
  635. status := pl.Reserve(ctx, state, pod, nodeName)
  636. f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime))
  637. return status
  638. }
  639. // RunUnreservePlugins runs the set of configured unreserve plugins.
  640. func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
  641. startTime := time.Now()
  642. defer func() {
  643. metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, Success.String()).Observe(metrics.SinceInSeconds(startTime))
  644. }()
  645. for _, pl := range f.unreservePlugins {
  646. f.runUnreservePlugin(ctx, pl, state, pod, nodeName)
  647. }
  648. }
  649. func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) {
  650. if !state.ShouldRecordPluginMetrics() {
  651. pl.Unreserve(ctx, state, pod, nodeName)
  652. return
  653. }
  654. startTime := time.Now()
  655. pl.Unreserve(ctx, state, pod, nodeName)
  656. f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime))
  657. }
  658. // RunPermitPlugins runs the set of configured permit plugins. If any of these
  659. // plugins returns a status other than "Success" or "Wait", it does not continue
  660. // running the remaining plugins and returns an error. Otherwise, if any of the
  661. // plugins returns "Wait", then this function will create and add waiting pod
  662. // to a map of currently waiting pods and return status with "Wait" code.
  663. // Pod will remain waiting pod for the minimum duration returned by the permit plugins.
  664. func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
  665. startTime := time.Now()
  666. defer func() {
  667. metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  668. }()
  669. pluginsWaitTime := make(map[string]time.Duration)
  670. statusCode := Success
  671. for _, pl := range f.permitPlugins {
  672. status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
  673. if !status.IsSuccess() {
  674. if status.IsUnschedulable() {
  675. msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
  676. klog.V(4).Infof(msg)
  677. return NewStatus(status.Code(), msg)
  678. }
  679. if status.Code() == Wait {
  680. // Not allowed to be greater than maxTimeout.
  681. if timeout > maxTimeout {
  682. timeout = maxTimeout
  683. }
  684. pluginsWaitTime[pl.Name()] = timeout
  685. statusCode = Wait
  686. } else {
  687. msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
  688. klog.Error(msg)
  689. return NewStatus(Error, msg)
  690. }
  691. }
  692. }
  693. if statusCode == Wait {
  694. waitingPod := newWaitingPod(pod, pluginsWaitTime)
  695. f.waitingPods.add(waitingPod)
  696. msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
  697. klog.V(4).Infof(msg)
  698. return NewStatus(Wait, msg)
  699. }
  700. return nil
  701. }
  702. func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) {
  703. if !state.ShouldRecordPluginMetrics() {
  704. return pl.Permit(ctx, state, pod, nodeName)
  705. }
  706. startTime := time.Now()
  707. status, timeout := pl.Permit(ctx, state, pod, nodeName)
  708. f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime))
  709. return status, timeout
  710. }
  711. // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
  712. func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
  713. waitingPod := f.waitingPods.get(pod.UID)
  714. if waitingPod == nil {
  715. return nil
  716. }
  717. defer f.waitingPods.remove(pod.UID)
  718. klog.V(4).Infof("pod %q waiting on permit", pod.Name)
  719. startTime := time.Now()
  720. s := <-waitingPod.s
  721. metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  722. if !s.IsSuccess() {
  723. if s.IsUnschedulable() {
  724. msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
  725. klog.V(4).Infof(msg)
  726. return NewStatus(s.Code(), msg)
  727. }
  728. msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
  729. klog.Error(msg)
  730. return NewStatus(Error, msg)
  731. }
  732. return nil
  733. }
  734. // SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
  735. // snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
  736. // unchanged until a pod finishes "Reserve". There is no guarantee that the information
  737. // remains unchanged after "Reserve".
  738. func (f *framework) SnapshotSharedLister() schedulerlisters.SharedLister {
  739. return f.snapshotSharedLister
  740. }
  741. // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
  742. func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
  743. f.waitingPods.iterate(callback)
  744. }
  745. // GetWaitingPod returns a reference to a WaitingPod given its UID.
  746. func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
  747. if wp := f.waitingPods.get(uid); wp != nil {
  748. return wp
  749. }
  750. return nil // Returning nil instead of *waitingPod(nil).
  751. }
  752. // RejectWaitingPod rejects a WaitingPod given its UID.
  753. func (f *framework) RejectWaitingPod(uid types.UID) {
  754. waitingPod := f.waitingPods.get(uid)
  755. if waitingPod != nil {
  756. waitingPod.Reject("removed")
  757. }
  758. }
  759. // HasFilterPlugins returns true if at least one filter plugin is defined.
  760. func (f *framework) HasFilterPlugins() bool {
  761. return len(f.filterPlugins) > 0
  762. }
  763. // HasScorePlugins returns true if at least one score plugin is defined.
  764. func (f *framework) HasScorePlugins() bool {
  765. return len(f.scorePlugins) > 0
  766. }
  767. // ListPlugins returns a map of extension point name to plugin names configured at each extension
  768. // point. Returns nil if no plugins where configred.
  769. func (f *framework) ListPlugins() map[string][]config.Plugin {
  770. m := make(map[string][]config.Plugin)
  771. for _, e := range f.getExtensionPoints(&config.Plugins{}) {
  772. plugins := reflect.ValueOf(e.slicePtr).Elem()
  773. extName := plugins.Type().Elem().Name()
  774. var cfgs []config.Plugin
  775. for i := 0; i < plugins.Len(); i++ {
  776. name := plugins.Index(i).Interface().(Plugin).Name()
  777. p := config.Plugin{Name: name}
  778. if extName == "ScorePlugin" {
  779. // Weights apply only to score plugins.
  780. p.Weight = int32(f.pluginNameToWeightMap[name])
  781. }
  782. cfgs = append(cfgs, p)
  783. }
  784. if len(cfgs) > 0 {
  785. m[extName] = cfgs
  786. }
  787. }
  788. if len(m) > 0 {
  789. return m
  790. }
  791. return nil
  792. }
  793. // ClientSet returns a kubernetes clientset.
  794. func (f *framework) ClientSet() clientset.Interface {
  795. return f.clientSet
  796. }
  797. // SharedInformerFactory returns a shared informer factory.
  798. func (f *framework) SharedInformerFactory() informers.SharedInformerFactory {
  799. return f.informerFactory
  800. }
  801. // VolumeBinder returns the volume binder used by scheduler.
  802. func (f *framework) VolumeBinder() *volumebinder.VolumeBinder {
  803. return f.volumeBinder
  804. }
  805. func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
  806. pgMap := make(map[string]config.Plugin)
  807. if plugins == nil {
  808. return pgMap
  809. }
  810. find := func(pgs *config.PluginSet) {
  811. if pgs == nil {
  812. return
  813. }
  814. for _, pg := range pgs.Enabled {
  815. pgMap[pg.Name] = pg
  816. }
  817. }
  818. for _, e := range f.getExtensionPoints(plugins) {
  819. find(e.plugins)
  820. }
  821. return pgMap
  822. }