server.go 30 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app does all of the work necessary to create a Kubernetes
  14. // APIServer by binding together the API, master and APIServer infrastructure.
  15. // It can be configured and called directly or via the hyperkube framework.
  16. package app
  17. import (
  18. "crypto/tls"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "strconv"
  25. "strings"
  26. "time"
  27. "github.com/go-openapi/spec"
  28. "github.com/spf13/cobra"
  29. extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
  30. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  32. utilnet "k8s.io/apimachinery/pkg/util/net"
  33. "k8s.io/apimachinery/pkg/util/sets"
  34. utilwait "k8s.io/apimachinery/pkg/util/wait"
  35. "k8s.io/apiserver/pkg/admission"
  36. "k8s.io/apiserver/pkg/authentication/authenticator"
  37. "k8s.io/apiserver/pkg/authorization/authorizer"
  38. openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
  39. genericfeatures "k8s.io/apiserver/pkg/features"
  40. genericapiserver "k8s.io/apiserver/pkg/server"
  41. "k8s.io/apiserver/pkg/server/egressselector"
  42. "k8s.io/apiserver/pkg/server/filters"
  43. serveroptions "k8s.io/apiserver/pkg/server/options"
  44. serverstorage "k8s.io/apiserver/pkg/server/storage"
  45. "k8s.io/apiserver/pkg/storage/etcd3/preflight"
  46. "k8s.io/apiserver/pkg/util/feature"
  47. utilfeature "k8s.io/apiserver/pkg/util/feature"
  48. utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
  49. "k8s.io/apiserver/pkg/util/term"
  50. "k8s.io/apiserver/pkg/util/webhook"
  51. clientgoinformers "k8s.io/client-go/informers"
  52. clientgoclientset "k8s.io/client-go/kubernetes"
  53. "k8s.io/client-go/util/keyutil"
  54. cloudprovider "k8s.io/cloud-provider"
  55. cliflag "k8s.io/component-base/cli/flag"
  56. "k8s.io/component-base/cli/globalflag"
  57. "k8s.io/component-base/metrics"
  58. _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
  59. "k8s.io/component-base/version"
  60. "k8s.io/component-base/version/verflag"
  61. "k8s.io/klog"
  62. aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
  63. aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
  64. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  65. "k8s.io/kubernetes/pkg/api/legacyscheme"
  66. "k8s.io/kubernetes/pkg/capabilities"
  67. serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
  68. "k8s.io/kubernetes/pkg/features"
  69. generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
  70. "k8s.io/kubernetes/pkg/kubeapiserver"
  71. kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
  72. kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
  73. "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
  74. kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
  75. kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server"
  76. "k8s.io/kubernetes/pkg/master"
  77. "k8s.io/kubernetes/pkg/master/reconcilers"
  78. "k8s.io/kubernetes/pkg/master/tunneler"
  79. "k8s.io/kubernetes/pkg/registry/cachesize"
  80. rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
  81. "k8s.io/kubernetes/pkg/serviceaccount"
  82. utilflag "k8s.io/kubernetes/pkg/util/flag"
  83. "k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
  84. )
  85. const (
  86. etcdRetryLimit = 60
  87. etcdRetryInterval = 1 * time.Second
  88. )
  89. // NewAPIServerCommand creates a *cobra.Command object with default parameters
  90. func NewAPIServerCommand() *cobra.Command {
  91. s := options.NewServerRunOptions()
  92. cmd := &cobra.Command{
  93. Use: "kube-apiserver",
  94. Long: `The Kubernetes API server validates and configures data
  95. for the api objects which include pods, services, replicationcontrollers, and
  96. others. The API Server services REST operations and provides the frontend to the
  97. cluster's shared state through which all other components interact.`,
  98. RunE: func(cmd *cobra.Command, args []string) error {
  99. verflag.PrintAndExitIfRequested()
  100. utilflag.PrintFlags(cmd.Flags())
  101. // set default options
  102. completedOptions, err := Complete(s)
  103. if err != nil {
  104. return err
  105. }
  106. // validate options
  107. if errs := completedOptions.Validate(); len(errs) != 0 {
  108. return utilerrors.NewAggregate(errs)
  109. }
  110. return Run(completedOptions, genericapiserver.SetupSignalHandler())
  111. },
  112. }
  113. fs := cmd.Flags()
  114. namedFlagSets := s.Flags()
  115. verflag.AddFlags(namedFlagSets.FlagSet("global"))
  116. globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
  117. options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
  118. for _, f := range namedFlagSets.FlagSets {
  119. fs.AddFlagSet(f)
  120. }
  121. usageFmt := "Usage:\n %s\n"
  122. cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
  123. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  124. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
  125. cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
  126. return nil
  127. })
  128. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  129. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
  130. cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
  131. })
  132. return cmd
  133. }
  134. // Run runs the specified APIServer. This should never exit.
  135. func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  136. // To help debugging, immediately log version
  137. klog.Infof("Version: %+v", version.Get())
  138. server, err := CreateServerChain(completeOptions, stopCh)
  139. if err != nil {
  140. return err
  141. }
  142. prepared, err := server.PrepareRun()
  143. if err != nil {
  144. return err
  145. }
  146. return prepared.Run(stopCh)
  147. }
  148. // CreateServerChain creates the apiservers connected via delegation.
  149. func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  150. nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
  151. if err != nil {
  152. return nil, err
  153. }
  154. kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
  155. if err != nil {
  156. return nil, err
  157. }
  158. // If additional API servers are added, they should be gated.
  159. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
  160. serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
  161. if err != nil {
  162. return nil, err
  163. }
  164. apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
  165. if err != nil {
  166. return nil, err
  167. }
  168. kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
  169. if err != nil {
  170. return nil, err
  171. }
  172. // aggregator comes last in the chain
  173. aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
  174. if err != nil {
  175. return nil, err
  176. }
  177. aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
  178. if err != nil {
  179. // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
  180. return nil, err
  181. }
  182. if insecureServingInfo != nil {
  183. insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
  184. if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
  185. return nil, err
  186. }
  187. }
  188. return aggregatorServer, nil
  189. }
  190. // CreateKubeAPIServer creates and wires a workable kube-apiserver
  191. func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
  192. kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
  193. if err != nil {
  194. return nil, err
  195. }
  196. return kubeAPIServer, nil
  197. }
  198. // CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
  199. func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
  200. // Setup nodeTunneler if needed
  201. var nodeTunneler tunneler.Tunneler
  202. var proxyDialerFn utilnet.DialFunc
  203. if len(s.SSHUser) > 0 {
  204. // Get ssh key distribution func, if supported
  205. var installSSHKey tunneler.InstallSSHKey
  206. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
  207. if err != nil {
  208. return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
  209. }
  210. if cloud != nil {
  211. if instances, supported := cloud.Instances(); supported {
  212. installSSHKey = instances.AddSSHKeyToAllInstances
  213. }
  214. }
  215. if s.KubeletConfig.Port == 0 {
  216. return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
  217. }
  218. if s.KubeletConfig.ReadOnlyPort == 0 {
  219. return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
  220. }
  221. // Set up the nodeTunneler
  222. // TODO(cjcullen): If we want this to handle per-kubelet ports or other
  223. // kubelet listen-addresses, we need to plumb through options.
  224. healthCheckPath := &url.URL{
  225. Scheme: "http",
  226. Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
  227. Path: "healthz",
  228. }
  229. nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
  230. // Use the nodeTunneler's dialer when proxying to pods, services, and nodes
  231. proxyDialerFn = nodeTunneler.Dial
  232. }
  233. // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
  234. proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
  235. proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
  236. DialContext: proxyDialerFn,
  237. TLSClientConfig: proxyTLSClientConfig,
  238. })
  239. return nodeTunneler, proxyTransport, nil
  240. }
  241. // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
  242. func CreateKubeAPIServerConfig(
  243. s completedServerRunOptions,
  244. nodeTunneler tunneler.Tunneler,
  245. proxyTransport *http.Transport,
  246. ) (
  247. *master.Config,
  248. *genericapiserver.DeprecatedInsecureServingInfo,
  249. aggregatorapiserver.ServiceResolver,
  250. []admission.PluginInitializer,
  251. error,
  252. ) {
  253. genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
  254. if err != nil {
  255. return nil, nil, nil, nil, err
  256. }
  257. if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
  258. if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
  259. return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
  260. }
  261. }
  262. capabilities.Initialize(capabilities.Capabilities{
  263. AllowPrivileged: s.AllowPrivileged,
  264. // TODO(vmarmol): Implement support for HostNetworkSources.
  265. PrivilegedSources: capabilities.PrivilegedSources{
  266. HostNetworkSources: []string{},
  267. HostPIDSources: []string{},
  268. HostIPCSources: []string{},
  269. },
  270. PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
  271. })
  272. if len(s.ShowHiddenMetricsForVersion) > 0 {
  273. metrics.SetShowHidden()
  274. }
  275. serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
  276. if err != nil {
  277. return nil, nil, nil, nil, err
  278. }
  279. // defaults to empty range and ip
  280. var secondaryServiceIPRange net.IPNet
  281. // process secondary range only if provided by user
  282. if s.SecondaryServiceClusterIPRange.IP != nil {
  283. secondaryServiceIPRange, _, err = master.ServiceIPRange(s.SecondaryServiceClusterIPRange)
  284. if err != nil {
  285. return nil, nil, nil, nil, err
  286. }
  287. }
  288. config := &master.Config{
  289. GenericConfig: genericConfig,
  290. ExtraConfig: master.ExtraConfig{
  291. APIResourceConfigSource: storageFactory.APIResourceConfigSource,
  292. StorageFactory: storageFactory,
  293. EventTTL: s.EventTTL,
  294. KubeletClientConfig: s.KubeletConfig,
  295. EnableLogsSupport: s.EnableLogsHandler,
  296. ProxyTransport: proxyTransport,
  297. Tunneler: nodeTunneler,
  298. ServiceIPRange: serviceIPRange,
  299. APIServerServiceIP: apiServerServiceIP,
  300. SecondaryServiceIPRange: secondaryServiceIPRange,
  301. APIServerServicePort: 443,
  302. ServiceNodePortRange: s.ServiceNodePortRange,
  303. KubernetesServiceNodePort: s.KubernetesServiceNodePort,
  304. EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
  305. MasterCount: s.MasterCount,
  306. ServiceAccountIssuer: s.ServiceAccountIssuer,
  307. ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
  308. VersionedInformers: versionedInformers,
  309. },
  310. }
  311. clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
  312. if err != nil {
  313. return nil, nil, nil, nil, err
  314. }
  315. config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
  316. requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
  317. if err != nil {
  318. return nil, nil, nil, nil, err
  319. }
  320. if requestHeaderConfig != nil {
  321. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
  322. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
  323. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
  324. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
  325. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
  326. }
  327. if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
  328. return nil, nil, nil, nil, err
  329. }
  330. if nodeTunneler != nil {
  331. // Use the nodeTunneler's dialer to connect to the kubelet
  332. config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
  333. }
  334. if config.GenericConfig.EgressSelector != nil {
  335. // Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
  336. config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
  337. // Use the config.GenericConfig.EgressSelector lookup as the transport used by the "proxy" subresources.
  338. networkContext := egressselector.Cluster.AsNetworkContext()
  339. dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
  340. if err != nil {
  341. return nil, nil, nil, nil, err
  342. }
  343. c := proxyTransport.Clone()
  344. c.DialContext = dialer
  345. config.ExtraConfig.ProxyTransport = c
  346. }
  347. if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountIssuerDiscovery) {
  348. // Load the public keys.
  349. var pubKeys []interface{}
  350. for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
  351. keys, err := keyutil.PublicKeysFromFile(f)
  352. if err != nil {
  353. return nil, nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
  354. }
  355. pubKeys = append(pubKeys, keys...)
  356. }
  357. // Plumb the required metadata through ExtraConfig.
  358. config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
  359. config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
  360. config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
  361. }
  362. return config, insecureServingInfo, serviceResolver, pluginInitializers, nil
  363. }
  364. // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
  365. func buildGenericConfig(
  366. s *options.ServerRunOptions,
  367. proxyTransport *http.Transport,
  368. ) (
  369. genericConfig *genericapiserver.Config,
  370. versionedInformers clientgoinformers.SharedInformerFactory,
  371. insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo,
  372. serviceResolver aggregatorapiserver.ServiceResolver,
  373. pluginInitializers []admission.PluginInitializer,
  374. admissionPostStartHook genericapiserver.PostStartHookFunc,
  375. storageFactory *serverstorage.DefaultStorageFactory,
  376. lastErr error,
  377. ) {
  378. genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
  379. genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
  380. if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
  381. return
  382. }
  383. if lastErr = s.InsecureServing.ApplyTo(&insecureServingInfo, &genericConfig.LoopbackClientConfig); lastErr != nil {
  384. return
  385. }
  386. if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
  387. return
  388. }
  389. if lastErr = s.Authentication.ApplyTo(genericConfig); lastErr != nil {
  390. return
  391. }
  392. if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
  393. return
  394. }
  395. if lastErr = s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
  396. return
  397. }
  398. if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
  399. return
  400. }
  401. genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  402. genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
  403. genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
  404. sets.NewString("watch", "proxy"),
  405. sets.NewString("attach", "exec", "proxy", "log", "portforward"),
  406. )
  407. kubeVersion := version.Get()
  408. genericConfig.Version = &kubeVersion
  409. storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
  410. storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
  411. completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
  412. if err != nil {
  413. lastErr = err
  414. return
  415. }
  416. storageFactory, lastErr = completedStorageFactoryConfig.New()
  417. if lastErr != nil {
  418. return
  419. }
  420. if genericConfig.EgressSelector != nil {
  421. storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
  422. }
  423. if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
  424. return
  425. }
  426. // Use protobufs for self-communication.
  427. // Since not every generic apiserver has to support protobufs, we
  428. // cannot default to it in generic apiserver and need to explicitly
  429. // set it in kube-apiserver.
  430. genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
  431. // Disable compression for self-communication, since we are going to be
  432. // on a fast local network
  433. genericConfig.LoopbackClientConfig.DisableCompression = true
  434. kubeClientConfig := genericConfig.LoopbackClientConfig
  435. clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
  436. if err != nil {
  437. lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
  438. return
  439. }
  440. versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
  441. genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
  442. if err != nil {
  443. lastErr = fmt.Errorf("invalid authentication config: %v", err)
  444. return
  445. }
  446. genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
  447. if err != nil {
  448. lastErr = fmt.Errorf("invalid authorization config: %v", err)
  449. return
  450. }
  451. if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
  452. genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
  453. }
  454. admissionConfig := &kubeapiserveradmission.Config{
  455. ExternalInformers: versionedInformers,
  456. LoopbackClientConfig: genericConfig.LoopbackClientConfig,
  457. CloudConfigFile: s.CloudProvider.CloudConfigFile,
  458. }
  459. serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
  460. authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.EgressSelector, genericConfig.LoopbackClientConfig)
  461. lastErr = s.Audit.ApplyTo(
  462. genericConfig,
  463. genericConfig.LoopbackClientConfig,
  464. versionedInformers,
  465. serveroptions.NewProcessInfo("kube-apiserver", "kube-system"),
  466. &serveroptions.WebhookOptions{
  467. AuthInfoResolverWrapper: authInfoResolverWrapper,
  468. ServiceResolver: serviceResolver,
  469. },
  470. )
  471. if lastErr != nil {
  472. return
  473. }
  474. pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
  475. if err != nil {
  476. lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
  477. return
  478. }
  479. err = s.Admission.ApplyTo(
  480. genericConfig,
  481. versionedInformers,
  482. kubeClientConfig,
  483. feature.DefaultFeatureGate,
  484. pluginInitializers...)
  485. if err != nil {
  486. lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  487. }
  488. if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnableInflightQuotaHandler {
  489. genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
  490. }
  491. return
  492. }
  493. // BuildAuthenticator constructs the authenticator
  494. func BuildAuthenticator(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
  495. authenticatorConfig, err := s.Authentication.ToAuthenticationConfig()
  496. if err != nil {
  497. return nil, nil, err
  498. }
  499. if s.Authentication.ServiceAccounts.Lookup || utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
  500. authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromClient(
  501. extclient,
  502. versionedInformer.Core().V1().Secrets().Lister(),
  503. versionedInformer.Core().V1().ServiceAccounts().Lister(),
  504. versionedInformer.Core().V1().Pods().Lister(),
  505. )
  506. }
  507. authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
  508. versionedInformer.Core().V1().Secrets().Lister().Secrets(v1.NamespaceSystem),
  509. )
  510. return authenticatorConfig.New()
  511. }
  512. // BuildAuthorizer constructs the authorizer
  513. func BuildAuthorizer(s *options.ServerRunOptions, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
  514. authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
  515. return authorizationConfig.New()
  516. }
  517. // BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
  518. func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) utilflowcontrol.Interface {
  519. return utilflowcontrol.New(
  520. versionedInformer,
  521. extclient.FlowcontrolV1alpha1(),
  522. s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
  523. s.GenericServerRunOptions.RequestTimeout/4,
  524. )
  525. }
  526. // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
  527. type completedServerRunOptions struct {
  528. *options.ServerRunOptions
  529. }
  530. // Complete set default ServerRunOptions.
  531. // Should be called after kube-apiserver flags parsed.
  532. func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
  533. var options completedServerRunOptions
  534. // set defaults
  535. if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
  536. return options, err
  537. }
  538. if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil {
  539. return options, err
  540. }
  541. // process s.ServiceClusterIPRange from list to Primary and Secondary
  542. // we process secondary only if provided by user
  543. apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
  544. if err != nil {
  545. return options, err
  546. }
  547. s.PrimaryServiceClusterIPRange = primaryServiceIPRange
  548. s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
  549. if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
  550. return options, fmt.Errorf("error creating self-signed certificates: %v", err)
  551. }
  552. if len(s.GenericServerRunOptions.ExternalHost) == 0 {
  553. if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
  554. s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
  555. } else {
  556. if hostname, err := os.Hostname(); err == nil {
  557. s.GenericServerRunOptions.ExternalHost = hostname
  558. } else {
  559. return options, fmt.Errorf("error finding host name: %v", err)
  560. }
  561. }
  562. klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
  563. }
  564. s.Authentication.ApplyAuthorization(s.Authorization)
  565. // Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
  566. // TokenRequest functionality. This defaulting was convenient, but messed up
  567. // a lot of people when they rotated their serving cert with no idea it was
  568. // connected to their service account keys. We are taking this opportunity to
  569. // remove this problematic defaulting.
  570. if s.ServiceAccountSigningKeyFile == "" {
  571. // Default to the private server key for service account token signing
  572. if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
  573. if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
  574. s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
  575. } else {
  576. klog.Warning("No TLS key provided, service account token authentication disabled")
  577. }
  578. }
  579. }
  580. if s.ServiceAccountSigningKeyFile != "" && s.Authentication.ServiceAccounts.Issuer != "" {
  581. sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
  582. if err != nil {
  583. return options, fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err)
  584. }
  585. if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
  586. lowBound := time.Hour
  587. upBound := time.Duration(1<<32) * time.Second
  588. if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
  589. s.Authentication.ServiceAccounts.MaxExpiration > upBound {
  590. return options, fmt.Errorf("the serviceaccount max expiration must be between 1 hour to 2^32 seconds")
  591. }
  592. }
  593. s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuer, sk)
  594. if err != nil {
  595. return options, fmt.Errorf("failed to build token generator: %v", err)
  596. }
  597. s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
  598. }
  599. if s.Etcd.EnableWatchCache {
  600. klog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
  601. sizes := cachesize.NewHeuristicWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
  602. if userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes); err == nil {
  603. for resource, size := range userSpecified {
  604. sizes[resource] = size
  605. }
  606. }
  607. s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
  608. if err != nil {
  609. return options, err
  610. }
  611. }
  612. if s.APIEnablement.RuntimeConfig != nil {
  613. for key, value := range s.APIEnablement.RuntimeConfig {
  614. if key == "v1" || strings.HasPrefix(key, "v1/") ||
  615. key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
  616. delete(s.APIEnablement.RuntimeConfig, key)
  617. s.APIEnablement.RuntimeConfig["/v1"] = value
  618. }
  619. if key == "api/legacy" {
  620. delete(s.APIEnablement.RuntimeConfig, key)
  621. }
  622. }
  623. }
  624. options.ServerRunOptions = s
  625. return options, nil
  626. }
  627. func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
  628. var serviceResolver webhook.ServiceResolver
  629. if enabledAggregatorRouting {
  630. serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
  631. informer.Core().V1().Services().Lister(),
  632. informer.Core().V1().Endpoints().Lister(),
  633. )
  634. } else {
  635. serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
  636. informer.Core().V1().Services().Lister(),
  637. )
  638. }
  639. // resolve kubernetes.default.svc locally
  640. if localHost, err := url.Parse(hostname); err == nil {
  641. serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
  642. }
  643. return serviceResolver
  644. }
  645. func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
  646. serviceClusterIPRangeList := []string{}
  647. if serviceClusterIPRanges != "" {
  648. serviceClusterIPRangeList = strings.Split(serviceClusterIPRanges, ",")
  649. }
  650. var apiServerServiceIP net.IP
  651. var primaryServiceIPRange net.IPNet
  652. var secondaryServiceIPRange net.IPNet
  653. var err error
  654. // nothing provided by user, use default range (only applies to the Primary)
  655. if len(serviceClusterIPRangeList) == 0 {
  656. var primaryServiceClusterCIDR net.IPNet
  657. primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(primaryServiceClusterCIDR)
  658. if err != nil {
  659. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
  660. }
  661. return apiServerServiceIP, primaryServiceIPRange, net.IPNet{}, nil
  662. }
  663. if len(serviceClusterIPRangeList) > 0 {
  664. _, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0])
  665. if err != nil {
  666. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
  667. }
  668. primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(*(primaryServiceClusterCIDR))
  669. if err != nil {
  670. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
  671. }
  672. }
  673. // user provided at least two entries
  674. // note: validation asserts that the list is max of two dual stack entries
  675. if len(serviceClusterIPRangeList) > 1 {
  676. _, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1])
  677. if err != nil {
  678. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
  679. }
  680. secondaryServiceIPRange = *secondaryServiceClusterCIDR
  681. }
  682. return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
  683. }