options.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package options
  14. import (
  15. "fmt"
  16. "net"
  17. "os"
  18. "strconv"
  19. "time"
  20. corev1 "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/uuid"
  23. apiserveroptions "k8s.io/apiserver/pkg/server/options"
  24. utilfeature "k8s.io/apiserver/pkg/util/feature"
  25. "k8s.io/client-go/informers"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/client-go/kubernetes/scheme"
  28. restclient "k8s.io/client-go/rest"
  29. "k8s.io/client-go/tools/clientcmd"
  30. clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
  31. "k8s.io/client-go/tools/leaderelection"
  32. "k8s.io/client-go/tools/leaderelection/resourcelock"
  33. "k8s.io/client-go/tools/record"
  34. cliflag "k8s.io/component-base/cli/flag"
  35. componentbaseconfig "k8s.io/component-base/config"
  36. "k8s.io/klog"
  37. kubeschedulerconfigv1alpha2 "k8s.io/kube-scheduler/config/v1alpha2"
  38. schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
  39. "k8s.io/kubernetes/pkg/client/leaderelectionconfig"
  40. "k8s.io/kubernetes/pkg/master/ports"
  41. "k8s.io/kubernetes/pkg/scheduler"
  42. kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
  43. kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
  44. "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
  45. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
  46. )
  47. // Options has all the params needed to run a Scheduler
  48. type Options struct {
  49. // The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
  50. ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
  51. SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
  52. CombinedInsecureServing *CombinedInsecureServingOptions
  53. Authentication *apiserveroptions.DelegatingAuthenticationOptions
  54. Authorization *apiserveroptions.DelegatingAuthorizationOptions
  55. Deprecated *DeprecatedOptions
  56. // ConfigFile is the location of the scheduler server's configuration file.
  57. ConfigFile string
  58. // WriteConfigTo is the path where the default configuration will be written.
  59. WriteConfigTo string
  60. Master string
  61. }
  62. // NewOptions returns default scheduler app options.
  63. func NewOptions() (*Options, error) {
  64. cfg, err := newDefaultComponentConfig()
  65. if err != nil {
  66. return nil, err
  67. }
  68. hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
  69. if err != nil {
  70. return nil, err
  71. }
  72. o := &Options{
  73. ComponentConfig: *cfg,
  74. SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
  75. CombinedInsecureServing: &CombinedInsecureServingOptions{
  76. Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{
  77. BindNetwork: "tcp",
  78. }).WithLoopback(),
  79. Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{
  80. BindNetwork: "tcp",
  81. }).WithLoopback(),
  82. BindPort: hport,
  83. BindAddress: hhost,
  84. },
  85. Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
  86. Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
  87. Deprecated: &DeprecatedOptions{
  88. UseLegacyPolicyConfig: false,
  89. PolicyConfigMapNamespace: metav1.NamespaceSystem,
  90. SchedulerName: corev1.DefaultSchedulerName,
  91. HardPodAffinitySymmetricWeight: interpodaffinity.DefaultHardPodAffinityWeight,
  92. },
  93. }
  94. o.Authentication.TolerateInClusterLookupFailure = true
  95. o.Authentication.RemoteKubeConfigFileOptional = true
  96. o.Authorization.RemoteKubeConfigFileOptional = true
  97. o.Authorization.AlwaysAllowPaths = []string{"/healthz"}
  98. // Set the PairName but leave certificate directory blank to generate in-memory by default
  99. o.SecureServing.ServerCert.CertDirectory = ""
  100. o.SecureServing.ServerCert.PairName = "kube-scheduler"
  101. o.SecureServing.BindPort = ports.KubeSchedulerPort
  102. return o, nil
  103. }
  104. func splitHostIntPort(s string) (string, int, error) {
  105. host, port, err := net.SplitHostPort(s)
  106. if err != nil {
  107. return "", 0, err
  108. }
  109. portInt, err := strconv.Atoi(port)
  110. if err != nil {
  111. return "", 0, err
  112. }
  113. return host, portInt, err
  114. }
  115. func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
  116. versionedCfg := kubeschedulerconfigv1alpha2.KubeSchedulerConfiguration{}
  117. kubeschedulerscheme.Scheme.Default(&versionedCfg)
  118. cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
  119. if err := kubeschedulerscheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
  120. return nil, err
  121. }
  122. return &cfg, nil
  123. }
  124. // Flags returns flags for a specific scheduler by section name
  125. func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
  126. fs := nfs.FlagSet("misc")
  127. fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
  128. fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
  129. fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
  130. o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
  131. o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
  132. o.Authentication.AddFlags(nfs.FlagSet("authentication"))
  133. o.Authorization.AddFlags(nfs.FlagSet("authorization"))
  134. o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
  135. leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
  136. utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
  137. return nfs
  138. }
  139. // ApplyTo applies the scheduler options to the given scheduler app configuration.
  140. func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
  141. if len(o.ConfigFile) == 0 {
  142. c.ComponentConfig = o.ComponentConfig
  143. // only apply deprecated flags if no config file is loaded (this is the old behaviour).
  144. if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
  145. return err
  146. }
  147. if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
  148. return err
  149. }
  150. } else {
  151. cfg, err := loadConfigFromFile(o.ConfigFile)
  152. if err != nil {
  153. return err
  154. }
  155. if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
  156. return err
  157. }
  158. // use the loaded config file only, with the exception of --address and --port. This means that
  159. // none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
  160. // behaviour of the flags we have to keep.
  161. c.ComponentConfig = *cfg
  162. if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
  163. return err
  164. }
  165. }
  166. if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
  167. return err
  168. }
  169. if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
  170. if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
  171. return err
  172. }
  173. if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
  174. return err
  175. }
  176. }
  177. return nil
  178. }
  179. // Validate validates all the required options.
  180. func (o *Options) Validate() []error {
  181. var errs []error
  182. if err := validation.ValidateKubeSchedulerConfiguration(&o.ComponentConfig).ToAggregate(); err != nil {
  183. errs = append(errs, err.Errors()...)
  184. }
  185. errs = append(errs, o.SecureServing.Validate()...)
  186. errs = append(errs, o.CombinedInsecureServing.Validate()...)
  187. errs = append(errs, o.Authentication.Validate()...)
  188. errs = append(errs, o.Authorization.Validate()...)
  189. errs = append(errs, o.Deprecated.Validate()...)
  190. return errs
  191. }
  192. // Config return a scheduler config object
  193. func (o *Options) Config() (*schedulerappconfig.Config, error) {
  194. if o.SecureServing != nil {
  195. if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
  196. return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
  197. }
  198. }
  199. c := &schedulerappconfig.Config{}
  200. if err := o.ApplyTo(c); err != nil {
  201. return nil, err
  202. }
  203. // Prepare kube clients.
  204. client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
  205. if err != nil {
  206. return nil, err
  207. }
  208. coreBroadcaster := record.NewBroadcaster()
  209. // Set up leader election if enabled.
  210. var leaderElectionConfig *leaderelection.LeaderElectionConfig
  211. if c.ComponentConfig.LeaderElection.LeaderElect {
  212. // Use the scheduler name in the first profile to record leader election.
  213. coreRecorder := coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.Profiles[0].SchedulerName})
  214. leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
  215. if err != nil {
  216. return nil, err
  217. }
  218. }
  219. c.Client = client
  220. c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
  221. c.PodInformer = scheduler.NewPodInformer(client, 0)
  222. c.EventClient = eventClient.EventsV1beta1()
  223. c.CoreEventClient = eventClient.CoreV1()
  224. c.CoreBroadcaster = coreBroadcaster
  225. c.LeaderElection = leaderElectionConfig
  226. return c, nil
  227. }
  228. // makeLeaderElectionConfig builds a leader election configuration. It will
  229. // create a new resource lock associated with the configuration.
  230. func makeLeaderElectionConfig(config kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
  231. hostname, err := os.Hostname()
  232. if err != nil {
  233. return nil, fmt.Errorf("unable to get hostname: %v", err)
  234. }
  235. // add a uniquifier so that two processes on the same host don't accidentally both become active
  236. id := hostname + "_" + string(uuid.NewUUID())
  237. rl, err := resourcelock.New(config.ResourceLock,
  238. config.ResourceNamespace,
  239. config.ResourceName,
  240. client.CoreV1(),
  241. client.CoordinationV1(),
  242. resourcelock.ResourceLockConfig{
  243. Identity: id,
  244. EventRecorder: recorder,
  245. })
  246. if err != nil {
  247. return nil, fmt.Errorf("couldn't create resource lock: %v", err)
  248. }
  249. return &leaderelection.LeaderElectionConfig{
  250. Lock: rl,
  251. LeaseDuration: config.LeaseDuration.Duration,
  252. RenewDeadline: config.RenewDeadline.Duration,
  253. RetryPeriod: config.RetryPeriod.Duration,
  254. WatchDog: leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
  255. Name: "kube-scheduler",
  256. }, nil
  257. }
  258. // createClients creates a kube client and an event client from the given config and masterOverride.
  259. // TODO remove masterOverride when CLI flags are removed.
  260. func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (clientset.Interface, clientset.Interface, clientset.Interface, error) {
  261. if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
  262. klog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
  263. }
  264. // This creates a client, first loading any specified kubeconfig
  265. // file, and then overriding the Master flag, if non-empty.
  266. kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  267. &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
  268. &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
  269. if err != nil {
  270. return nil, nil, nil, err
  271. }
  272. kubeConfig.DisableCompression = true
  273. kubeConfig.AcceptContentTypes = config.AcceptContentTypes
  274. kubeConfig.ContentType = config.ContentType
  275. kubeConfig.QPS = config.QPS
  276. //TODO make config struct use int instead of int32?
  277. kubeConfig.Burst = int(config.Burst)
  278. client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
  279. if err != nil {
  280. return nil, nil, nil, err
  281. }
  282. // shallow copy, do not modify the kubeConfig.Timeout.
  283. restConfig := *kubeConfig
  284. restConfig.Timeout = timeout
  285. leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(&restConfig, "leader-election"))
  286. if err != nil {
  287. return nil, nil, nil, err
  288. }
  289. eventClient, err := clientset.NewForConfig(kubeConfig)
  290. if err != nil {
  291. return nil, nil, nil, err
  292. }
  293. return client, leaderElectionClient, eventClient, nil
  294. }