123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package v1alpha1
- import (
- "fmt"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/scheduler/apis/config"
- "k8s.io/kubernetes/pkg/scheduler/internal/cache"
- )
- // framework is the component responsible for initializing and running scheduler
- // plugins.
- type framework struct {
- registry Registry
- nodeInfoSnapshot *cache.NodeInfoSnapshot
- waitingPods *waitingPodsMap
- plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
- queueSortPlugins []QueueSortPlugin
- reservePlugins []ReservePlugin
- prebindPlugins []PrebindPlugin
- postbindPlugins []PostbindPlugin
- unreservePlugins []UnreservePlugin
- permitPlugins []PermitPlugin
- }
// maxTimeout is the upper bound on how long a pod may wait at the permit
// stage; RunPermitPlugins never waits longer than this, whatever timeout a
// permit plugin returns.
const maxTimeout time.Duration = 15 * time.Minute
- var _ = Framework(&framework{})
- // NewFramework initializes plugins given the configuration and the registry.
- func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
- f := &framework{
- registry: r,
- nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
- plugins: make(map[string]Plugin),
- waitingPods: newWaitingPodsMap(),
- }
- if plugins == nil {
- return f, nil
- }
- // get needed plugins from config
- pg := pluginsNeeded(plugins)
- if len(pg) == 0 {
- return f, nil
- }
- pluginConfig := pluginNameToConfig(args)
- for name, factory := range r {
- // initialize only needed plugins
- if _, ok := pg[name]; !ok {
- continue
- }
- // find the config args of a plugin
- pc := pluginConfig[name]
- p, err := factory(pc, f)
- if err != nil {
- return nil, fmt.Errorf("error initializing plugin %v: %v", name, err)
- }
- f.plugins[name] = p
- }
- if plugins.Reserve != nil {
- for _, r := range plugins.Reserve.Enabled {
- if pg, ok := f.plugins[r.Name]; ok {
- p, ok := pg.(ReservePlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name)
- }
- f.reservePlugins = append(f.reservePlugins, p)
- } else {
- return nil, fmt.Errorf("reserve plugin %v does not exist", r.Name)
- }
- }
- }
- if plugins.PreBind != nil {
- for _, pb := range plugins.PreBind.Enabled {
- if pg, ok := f.plugins[pb.Name]; ok {
- p, ok := pg.(PrebindPlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name)
- }
- f.prebindPlugins = append(f.prebindPlugins, p)
- } else {
- return nil, fmt.Errorf("prebind plugin %v does not exist", pb.Name)
- }
- }
- }
- if plugins.PostBind != nil {
- for _, pb := range plugins.PostBind.Enabled {
- if pg, ok := f.plugins[pb.Name]; ok {
- p, ok := pg.(PostbindPlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend postbind plugin", pb.Name)
- }
- f.postbindPlugins = append(f.postbindPlugins, p)
- } else {
- return nil, fmt.Errorf("postbind plugin %v does not exist", pb.Name)
- }
- }
- }
- if plugins.Unreserve != nil {
- for _, ur := range plugins.Unreserve.Enabled {
- if pg, ok := f.plugins[ur.Name]; ok {
- p, ok := pg.(UnreservePlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name)
- }
- f.unreservePlugins = append(f.unreservePlugins, p)
- } else {
- return nil, fmt.Errorf("unreserve plugin %v does not exist", ur.Name)
- }
- }
- }
- if plugins.Permit != nil {
- for _, pr := range plugins.Permit.Enabled {
- if pg, ok := f.plugins[pr.Name]; ok {
- p, ok := pg.(PermitPlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name)
- }
- f.permitPlugins = append(f.permitPlugins, p)
- } else {
- return nil, fmt.Errorf("permit plugin %v does not exist", pr.Name)
- }
- }
- }
- if plugins.QueueSort != nil {
- for _, qs := range plugins.QueueSort.Enabled {
- if pg, ok := f.plugins[qs.Name]; ok {
- p, ok := pg.(QueueSortPlugin)
- if !ok {
- return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name)
- }
- f.queueSortPlugins = append(f.queueSortPlugins, p)
- if len(f.queueSortPlugins) > 1 {
- return nil, fmt.Errorf("only one queue sort plugin can be enabled")
- }
- } else {
- return nil, fmt.Errorf("queue sort plugin %v does not exist", qs.Name)
- }
- }
- }
- return f, nil
- }
- // QueueSortFunc returns the function to sort pods in scheduling queue
- func (f *framework) QueueSortFunc() LessFunc {
- if len(f.queueSortPlugins) == 0 {
- return nil
- }
- // Only one QueueSort plugin can be enabled.
- return f.queueSortPlugins[0].Less
- }
- // RunPrebindPlugins runs the set of configured prebind plugins. It returns a
- // failure (bool) if any of the plugins returns an error. It also returns an
- // error containing the rejection message or the error occurred in the plugin.
- func (f *framework) RunPrebindPlugins(
- pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
- for _, pl := range f.prebindPlugins {
- status := pl.Prebind(pc, pod, nodeName)
- if !status.IsSuccess() {
- if status.Code() == Unschedulable {
- msg := fmt.Sprintf("rejected by %v at prebind: %v", pl.Name(), status.Message())
- klog.V(4).Infof(msg)
- return NewStatus(status.Code(), msg)
- }
- msg := fmt.Sprintf("error while running %v prebind plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
- klog.Error(msg)
- return NewStatus(Error, msg)
- }
- }
- return nil
- }
- // RunPostbindPlugins runs the set of configured postbind plugins.
- func (f *framework) RunPostbindPlugins(
- pc *PluginContext, pod *v1.Pod, nodeName string) {
- for _, pl := range f.postbindPlugins {
- pl.Postbind(pc, pod, nodeName)
- }
- }
- // RunReservePlugins runs the set of configured reserve plugins. If any of these
- // plugins returns an error, it does not continue running the remaining ones and
- // returns the error. In such case, pod will not be scheduled.
- func (f *framework) RunReservePlugins(
- pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
- for _, pl := range f.reservePlugins {
- status := pl.Reserve(pc, pod, nodeName)
- if !status.IsSuccess() {
- msg := fmt.Sprintf("error while running %v reserve plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
- klog.Error(msg)
- return NewStatus(Error, msg)
- }
- }
- return nil
- }
- // RunUnreservePlugins runs the set of configured unreserve plugins.
- func (f *framework) RunUnreservePlugins(
- pc *PluginContext, pod *v1.Pod, nodeName string) {
- for _, pl := range f.unreservePlugins {
- pl.Unreserve(pc, pod, nodeName)
- }
- }
- // RunPermitPlugins runs the set of configured permit plugins. If any of these
- // plugins returns a status other than "Success" or "Wait", it does not continue
- // running the remaining plugins and returns an error. Otherwise, if any of the
- // plugins returns "Wait", then this function will block for the timeout period
- // returned by the plugin, if the time expires, then it will return an error.
- // Note that if multiple plugins asked to wait, then we wait for the minimum
- // timeout duration.
- func (f *framework) RunPermitPlugins(
- pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
- timeout := maxTimeout
- statusCode := Success
- for _, pl := range f.permitPlugins {
- status, d := pl.Permit(pc, pod, nodeName)
- if !status.IsSuccess() {
- if status.Code() == Unschedulable {
- msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message())
- klog.V(4).Infof(msg)
- return NewStatus(status.Code(), msg)
- }
- if status.Code() == Wait {
- // Use the minimum timeout duration.
- if timeout > d {
- timeout = d
- }
- statusCode = Wait
- } else {
- msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
- klog.Error(msg)
- return NewStatus(Error, msg)
- }
- }
- }
- // We now wait for the minimum duration if at least one plugin asked to
- // wait (and no plugin rejected the pod)
- if statusCode == Wait {
- w := newWaitingPod(pod)
- f.waitingPods.add(w)
- defer f.waitingPods.remove(pod.UID)
- timer := time.NewTimer(timeout)
- klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name)
- select {
- case <-timer.C:
- msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout)
- klog.V(4).Infof(msg)
- return NewStatus(Unschedulable, msg)
- case s := <-w.s:
- if !s.IsSuccess() {
- if s.Code() == Unschedulable {
- msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
- klog.V(4).Infof(msg)
- return NewStatus(s.Code(), msg)
- }
- msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message())
- klog.Error(msg)
- return NewStatus(Error, msg)
- }
- }
- }
- return nil
- }
- // NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
- // is taken at the beginning of a scheduling cycle and remains unchanged until a
- // pod finishes "Reserve". There is no guarantee that the information remains
- // unchanged after "Reserve".
- func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot {
- return f.nodeInfoSnapshot
- }
- // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
- func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
- f.waitingPods.iterate(callback)
- }
- // GetWaitingPod returns a reference to a WaitingPod given its UID.
- func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
- return f.waitingPods.get(uid)
- }
- func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
- pc := make(map[string]*runtime.Unknown, 0)
- for _, p := range args {
- pc[p.Name] = &p.Args
- }
- return pc
- }
- func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
- pgMap := make(map[string]struct{}, 0)
- if plugins == nil {
- return pgMap
- }
- find := func(pgs *config.PluginSet) {
- if pgs == nil {
- return
- }
- for _, pg := range pgs.Enabled {
- pgMap[pg.Name] = struct{}{}
- }
- }
- find(plugins.QueueSort)
- find(plugins.PreFilter)
- find(plugins.Filter)
- find(plugins.PostFilter)
- find(plugins.Score)
- find(plugins.NormalizeScore)
- find(plugins.Reserve)
- find(plugins.Permit)
- find(plugins.PreBind)
- find(plugins.Bind)
- find(plugins.PostBind)
- find(plugins.Unreserve)
- return pgMap
- }
|