123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package app implements a Server object for running the scheduler.
- package app
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	goruntime "runtime"
	"time"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apiserver/pkg/authentication/authenticator"
	"k8s.io/apiserver/pkg/authorization/authorizer"
	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	genericfilters "k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
	"k8s.io/apiserver/pkg/server/routes"
	"k8s.io/apiserver/pkg/util/term"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/leaderelection"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/cli/globalflag"
	schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
	"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/util/configz"
	utilflag "k8s.io/kubernetes/pkg/util/flag"
	"k8s.io/kubernetes/pkg/version"
	"k8s.io/kubernetes/pkg/version/verflag"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/spf13/cobra"
	"k8s.io/klog"
)
- // NewSchedulerCommand creates a *cobra.Command object with default parameters
- func NewSchedulerCommand() *cobra.Command {
- opts, err := options.NewOptions()
- if err != nil {
- klog.Fatalf("unable to initialize command options: %v", err)
- }
- cmd := &cobra.Command{
- Use: "kube-scheduler",
- Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
- workload-specific function that significantly impacts availability, performance,
- and capacity. The scheduler needs to take into account individual and collective
- resource requirements, quality of service requirements, hardware/software/policy
- constraints, affinity and anti-affinity specifications, data locality, inter-workload
- interference, deadlines, and so on. Workload-specific requirements will be exposed
- through the API as necessary.`,
- Run: func(cmd *cobra.Command, args []string) {
- if err := runCommand(cmd, args, opts); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
- },
- }
- fs := cmd.Flags()
- namedFlagSets := opts.Flags()
- verflag.AddFlags(namedFlagSets.FlagSet("global"))
- globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
- for _, f := range namedFlagSets.FlagSets {
- fs.AddFlagSet(f)
- }
- usageFmt := "Usage:\n %s\n"
- cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
- cmd.SetUsageFunc(func(cmd *cobra.Command) error {
- fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
- cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
- return nil
- })
- cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
- fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
- cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
- })
- cmd.MarkFlagFilename("config", "yaml", "yml", "json")
- return cmd
- }
- // runCommand runs the scheduler.
- func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
- verflag.PrintAndExitIfRequested()
- utilflag.PrintFlags(cmd.Flags())
- if len(args) != 0 {
- fmt.Fprint(os.Stderr, "arguments are not supported\n")
- }
- if errs := opts.Validate(); len(errs) > 0 {
- fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
- os.Exit(1)
- }
- if len(opts.WriteConfigTo) > 0 {
- if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
- klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
- }
- c, err := opts.Config()
- if err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
- stopCh := make(chan struct{})
- // Get the completed config
- cc := c.Complete()
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
- // Apply algorithms based on feature gates.
- // TODO: make configurable?
- algorithmprovider.ApplyFeatureGates()
- // Configz registration.
- if cz, err := configz.New("componentconfig"); err == nil {
- cz.Set(cc.ComponentConfig)
- } else {
- return fmt.Errorf("unable to register configz: %s", err)
- }
- return Run(cc, stopCh)
- }
- // Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
- func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
- // To help debugging, immediately log version
- klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
- // Create the scheduler.
- sched, err := scheduler.New(cc.Client,
- cc.InformerFactory.Core().V1().Nodes(),
- cc.PodInformer,
- cc.InformerFactory.Core().V1().PersistentVolumes(),
- cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
- cc.InformerFactory.Core().V1().ReplicationControllers(),
- cc.InformerFactory.Apps().V1().ReplicaSets(),
- cc.InformerFactory.Apps().V1().StatefulSets(),
- cc.InformerFactory.Core().V1().Services(),
- cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
- cc.InformerFactory.Storage().V1().StorageClasses(),
- cc.Recorder,
- cc.ComponentConfig.AlgorithmSource,
- stopCh,
- framework.NewRegistry(),
- cc.ComponentConfig.Plugins,
- cc.ComponentConfig.PluginConfig,
- scheduler.WithName(cc.ComponentConfig.SchedulerName),
- scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
- scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
- scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
- scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
- if err != nil {
- return err
- }
- // Prepare the event broadcaster.
- if cc.Broadcaster != nil && cc.EventClient != nil {
- cc.Broadcaster.StartLogging(klog.V(6).Infof)
- cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.Events("")})
- }
- // Setup healthz checks.
- var checks []healthz.HealthzChecker
- if cc.ComponentConfig.LeaderElection.LeaderElect {
- checks = append(checks, cc.LeaderElection.WatchDog)
- }
- // Start up the healthz server.
- if cc.InsecureServing != nil {
- separateMetrics := cc.InsecureMetricsServing != nil
- handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
- if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
- return fmt.Errorf("failed to start healthz server: %v", err)
- }
- }
- if cc.InsecureMetricsServing != nil {
- handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
- if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
- return fmt.Errorf("failed to start metrics server: %v", err)
- }
- }
- if cc.SecureServing != nil {
- handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
- // TODO: handle stoppedCh returned by c.SecureServing.Serve
- if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
- // fail early for secure handlers, removing the old error loop from above
- return fmt.Errorf("failed to start secure server: %v", err)
- }
- }
- // Start all informers.
- go cc.PodInformer.Informer().Run(stopCh)
- cc.InformerFactory.Start(stopCh)
- // Wait for all caches to sync before scheduling.
- cc.InformerFactory.WaitForCacheSync(stopCh)
- // Prepare a reusable runCommand function.
- run := func(ctx context.Context) {
- sched.Run()
- <-ctx.Done()
- }
- ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
- defer cancel()
- go func() {
- select {
- case <-stopCh:
- cancel()
- case <-ctx.Done():
- }
- }()
- // If leader election is enabled, runCommand via LeaderElector until done and exit.
- if cc.LeaderElection != nil {
- cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
- OnStartedLeading: run,
- OnStoppedLeading: func() {
- utilruntime.HandleError(fmt.Errorf("lost master"))
- },
- }
- leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
- if err != nil {
- return fmt.Errorf("couldn't create leader elector: %v", err)
- }
- leaderElector.Run(ctx)
- return fmt.Errorf("lost lease")
- }
- // Leader election is disabled, so runCommand inline until done.
- run(ctx)
- return fmt.Errorf("finished without leader elect")
- }
- // buildHandlerChain wraps the given handler with the standard filters.
- func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
- requestInfoResolver := &apirequest.RequestInfoFactory{}
- failedHandler := genericapifilters.Unauthorized(legacyscheme.Codecs, false)
- handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
- handler = genericapifilters.WithAuthorization(handler, authz, legacyscheme.Codecs)
- handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
- handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
- handler = genericfilters.WithPanicRecovery(handler)
- return handler
- }
- func installMetricHandler(pathRecorderMux *mux.PathRecorderMux) {
- configz.InstallHandler(pathRecorderMux)
- defaultMetricsHandler := prometheus.Handler().ServeHTTP
- pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
- if req.Method == "DELETE" {
- metrics.Reset()
- io.WriteString(w, "metrics reset\n")
- return
- }
- defaultMetricsHandler(w, req)
- })
- }
- // newMetricsHandler builds a metrics server from the config.
- func newMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration) http.Handler {
- pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
- installMetricHandler(pathRecorderMux)
- if config.EnableProfiling {
- routes.Profiling{}.Install(pathRecorderMux)
- if config.EnableContentionProfiling {
- goruntime.SetBlockProfileRate(1)
- }
- }
- return pathRecorderMux
- }
- // newHealthzHandler creates a healthz server from the config, and will also
- // embed the metrics handler if the healthz and metrics address configurations
- // are the same.
- func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, separateMetrics bool, checks ...healthz.HealthzChecker) http.Handler {
- pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
- healthz.InstallHandler(pathRecorderMux, checks...)
- if !separateMetrics {
- installMetricHandler(pathRecorderMux)
- }
- if config.EnableProfiling {
- routes.Profiling{}.Install(pathRecorderMux)
- if config.EnableContentionProfiling {
- goruntime.SetBlockProfileRate(1)
- }
- }
- return pathRecorderMux
- }
|