123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package options
- import (
- "fmt"
- "net"
- "os"
- "strconv"
- "time"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/uuid"
- apiserveroptions "k8s.io/apiserver/pkg/server/options"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
- "k8s.io/client-go/tools/leaderelection"
- "k8s.io/client-go/tools/leaderelection/resourcelock"
- "k8s.io/client-go/tools/record"
- cliflag "k8s.io/component-base/cli/flag"
- componentbaseconfig "k8s.io/component-base/config"
- "k8s.io/klog"
- kubeschedulerconfigv1alpha1 "k8s.io/kube-scheduler/config/v1alpha1"
- schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
- "k8s.io/kubernetes/pkg/client/leaderelectionconfig"
- "k8s.io/kubernetes/pkg/master/ports"
- kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
- kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
- "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
- "k8s.io/kubernetes/pkg/scheduler/factory"
- )
- // Options has all the params needed to run a Scheduler
- type Options struct {
- // The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
- ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
- SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
- CombinedInsecureServing *CombinedInsecureServingOptions
- Authentication *apiserveroptions.DelegatingAuthenticationOptions
- Authorization *apiserveroptions.DelegatingAuthorizationOptions
- Deprecated *DeprecatedOptions
- // ConfigFile is the location of the scheduler server's configuration file.
- ConfigFile string
- // WriteConfigTo is the path where the default configuration will be written.
- WriteConfigTo string
- Master string
- }
- // NewOptions returns default scheduler app options.
- func NewOptions() (*Options, error) {
- cfg, err := newDefaultComponentConfig()
- if err != nil {
- return nil, err
- }
- hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
- if err != nil {
- return nil, err
- }
- o := &Options{
- ComponentConfig: *cfg,
- SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
- CombinedInsecureServing: &CombinedInsecureServingOptions{
- Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{
- BindNetwork: "tcp",
- }).WithLoopback(),
- Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{
- BindNetwork: "tcp",
- }).WithLoopback(),
- BindPort: hport,
- BindAddress: hhost,
- },
- Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
- Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
- Deprecated: &DeprecatedOptions{
- UseLegacyPolicyConfig: false,
- PolicyConfigMapNamespace: metav1.NamespaceSystem,
- },
- }
- o.Authentication.TolerateInClusterLookupFailure = true
- o.Authentication.RemoteKubeConfigFileOptional = true
- o.Authorization.RemoteKubeConfigFileOptional = true
- o.Authorization.AlwaysAllowPaths = []string{"/healthz"}
- // Set the PairName but leave certificate directory blank to generate in-memory by default
- o.SecureServing.ServerCert.CertDirectory = ""
- o.SecureServing.ServerCert.PairName = "kube-scheduler"
- o.SecureServing.BindPort = ports.KubeSchedulerPort
- return o, nil
- }
// splitHostIntPort splits an address of the form "host:port" into its host
// part and its port parsed as an integer.
//
// On any failure (the address cannot be split, or the port is not a decimal
// integer) it returns an empty host, a zero port and the underlying error.
func splitHostIntPort(s string) (string, int, error) {
	host, portText, splitErr := net.SplitHostPort(s)
	if splitErr != nil {
		return "", 0, splitErr
	}

	portNum, convErr := strconv.Atoi(portText)
	if convErr != nil {
		return "", 0, convErr
	}

	return host, portNum, nil
}
- func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
- cfgv1alpha1 := kubeschedulerconfigv1alpha1.KubeSchedulerConfiguration{}
- kubeschedulerscheme.Scheme.Default(&cfgv1alpha1)
- cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
- if err := kubeschedulerscheme.Scheme.Convert(&cfgv1alpha1, &cfg, nil); err != nil {
- return nil, err
- }
- return &cfg, nil
- }
- // Flags returns flags for a specific scheduler by section name
- func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
- fs := nfs.FlagSet("misc")
- fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
- fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
- fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
- o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
- o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
- o.Authentication.AddFlags(nfs.FlagSet("authentication"))
- o.Authorization.AddFlags(nfs.FlagSet("authorization"))
- o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
- leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
- utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
- return nfs
- }
- // ApplyTo applies the scheduler options to the given scheduler app configuration.
- func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
- if len(o.ConfigFile) == 0 {
- c.ComponentConfig = o.ComponentConfig
- // only apply deprecated flags if no config file is loaded (this is the old behaviour).
- if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
- return err
- }
- if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
- return err
- }
- } else {
- cfg, err := loadConfigFromFile(o.ConfigFile)
- if err != nil {
- return err
- }
- // use the loaded config file only, with the exception of --address and --port. This means that
- // none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
- // behaviour of the flags we have to keep.
- c.ComponentConfig = *cfg
- if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
- return err
- }
- }
- if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
- return err
- }
- if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
- if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
- return err
- }
- if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
- return err
- }
- }
- return nil
- }
- // Validate validates all the required options.
- func (o *Options) Validate() []error {
- var errs []error
- if err := validation.ValidateKubeSchedulerConfiguration(&o.ComponentConfig).ToAggregate(); err != nil {
- errs = append(errs, err.Errors()...)
- }
- errs = append(errs, o.SecureServing.Validate()...)
- errs = append(errs, o.CombinedInsecureServing.Validate()...)
- errs = append(errs, o.Authentication.Validate()...)
- errs = append(errs, o.Authorization.Validate()...)
- errs = append(errs, o.Deprecated.Validate()...)
- return errs
- }
- // Config return a scheduler config object
- func (o *Options) Config() (*schedulerappconfig.Config, error) {
- if o.SecureServing != nil {
- if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
- return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
- }
- }
- c := &schedulerappconfig.Config{}
- if err := o.ApplyTo(c); err != nil {
- return nil, err
- }
- // Prepare kube clients.
- client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
- if err != nil {
- return nil, err
- }
- // Prepare event clients.
- eventBroadcaster := record.NewBroadcaster()
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
- // Set up leader election if enabled.
- var leaderElectionConfig *leaderelection.LeaderElectionConfig
- if c.ComponentConfig.LeaderElection.LeaderElect {
- leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
- if err != nil {
- return nil, err
- }
- }
- c.Client = client
- c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
- c.PodInformer = factory.NewPodInformer(client, 0)
- c.EventClient = eventClient
- c.Recorder = recorder
- c.Broadcaster = eventBroadcaster
- c.LeaderElection = leaderElectionConfig
- return c, nil
- }
- // makeLeaderElectionConfig builds a leader election configuration. It will
- // create a new resource lock associated with the configuration.
- func makeLeaderElectionConfig(config kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
- hostname, err := os.Hostname()
- if err != nil {
- return nil, fmt.Errorf("unable to get hostname: %v", err)
- }
- // add a uniquifier so that two processes on the same host don't accidentally both become active
- id := hostname + "_" + string(uuid.NewUUID())
- rl, err := resourcelock.New(config.ResourceLock,
- config.LockObjectNamespace,
- config.LockObjectName,
- client.CoreV1(),
- client.CoordinationV1(),
- resourcelock.ResourceLockConfig{
- Identity: id,
- EventRecorder: recorder,
- })
- if err != nil {
- return nil, fmt.Errorf("couldn't create resource lock: %v", err)
- }
- return &leaderelection.LeaderElectionConfig{
- Lock: rl,
- LeaseDuration: config.LeaseDuration.Duration,
- RenewDeadline: config.RenewDeadline.Duration,
- RetryPeriod: config.RetryPeriod.Duration,
- WatchDog: leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
- Name: "kube-scheduler",
- }, nil
- }
- // createClients creates a kube client and an event client from the given config and masterOverride.
- // TODO remove masterOverride when CLI flags are removed.
- func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
- if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
- klog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
- }
- // This creates a client, first loading any specified kubeconfig
- // file, and then overriding the Master flag, if non-empty.
- kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
- &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
- &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
- if err != nil {
- return nil, nil, nil, err
- }
- kubeConfig.AcceptContentTypes = config.AcceptContentTypes
- kubeConfig.ContentType = config.ContentType
- kubeConfig.QPS = config.QPS
- //TODO make config struct use int instead of int32?
- kubeConfig.Burst = int(config.Burst)
- client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
- if err != nil {
- return nil, nil, nil, err
- }
- // shallow copy, do not modify the kubeConfig.Timeout.
- restConfig := *kubeConfig
- restConfig.Timeout = timeout
- leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(&restConfig, "leader-election"))
- if err != nil {
- return nil, nil, nil, err
- }
- eventClient, err := clientset.NewForConfig(kubeConfig)
- if err != nil {
- return nil, nil, nil, err
- }
- return client, leaderElectionClient, eventClient.CoreV1(), nil
- }
|