server.go 28 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app does all of the work necessary to create a Kubernetes
  14. // APIServer by binding together the API, master and APIServer infrastructure.
  15. // It can be configured and called directly or via the hyperkube framework.
  16. package app
  17. import (
  18. "crypto/tls"
  19. "fmt"
  20. "net"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "strconv"
  25. "strings"
  26. "time"
  27. "github.com/go-openapi/spec"
  28. "github.com/spf13/cobra"
  29. extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
  30. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  32. utilnet "k8s.io/apimachinery/pkg/util/net"
  33. "k8s.io/apimachinery/pkg/util/sets"
  34. utilwait "k8s.io/apimachinery/pkg/util/wait"
  35. "k8s.io/apiserver/pkg/admission"
  36. "k8s.io/apiserver/pkg/authentication/authenticator"
  37. "k8s.io/apiserver/pkg/authorization/authorizer"
  38. openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
  39. genericapiserver "k8s.io/apiserver/pkg/server"
  40. "k8s.io/apiserver/pkg/server/filters"
  41. serveroptions "k8s.io/apiserver/pkg/server/options"
  42. serverstorage "k8s.io/apiserver/pkg/server/storage"
  43. "k8s.io/apiserver/pkg/storage/etcd3/preflight"
  44. "k8s.io/apiserver/pkg/util/feature"
  45. utilfeature "k8s.io/apiserver/pkg/util/feature"
  46. "k8s.io/apiserver/pkg/util/term"
  47. "k8s.io/apiserver/pkg/util/webhook"
  48. clientgoinformers "k8s.io/client-go/informers"
  49. clientgoclientset "k8s.io/client-go/kubernetes"
  50. "k8s.io/client-go/util/keyutil"
  51. cloudprovider "k8s.io/cloud-provider"
  52. cliflag "k8s.io/component-base/cli/flag"
  53. "k8s.io/component-base/cli/globalflag"
  54. "k8s.io/component-base/metrics"
  55. _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
  56. "k8s.io/component-base/version"
  57. "k8s.io/component-base/version/verflag"
  58. "k8s.io/klog"
  59. aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
  60. aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
  61. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  62. "k8s.io/kubernetes/pkg/api/legacyscheme"
  63. "k8s.io/kubernetes/pkg/capabilities"
  64. serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
  65. "k8s.io/kubernetes/pkg/features"
  66. generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
  67. "k8s.io/kubernetes/pkg/kubeapiserver"
  68. kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
  69. kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
  70. "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
  71. kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
  72. kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server"
  73. "k8s.io/kubernetes/pkg/master"
  74. "k8s.io/kubernetes/pkg/master/reconcilers"
  75. "k8s.io/kubernetes/pkg/master/tunneler"
  76. "k8s.io/kubernetes/pkg/registry/cachesize"
  77. rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
  78. "k8s.io/kubernetes/pkg/serviceaccount"
  79. utilflag "k8s.io/kubernetes/pkg/util/flag"
  80. "k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
  81. )
  // Retry budget used while waiting for etcd to become reachable at startup.
  const (
  	// etcdRetryLimit is the number of retry intervals that make up the
  	// total time spent polling for an etcd connection.
  	etcdRetryLimit = 60
  	// etcdRetryInterval is the pause between two consecutive etcd
  	// connectivity checks.
  	etcdRetryInterval = time.Second
  )
  86. // NewAPIServerCommand creates a *cobra.Command object with default parameters
  87. func NewAPIServerCommand() *cobra.Command {
  88. s := options.NewServerRunOptions()
  89. cmd := &cobra.Command{
  90. Use: "kube-apiserver",
  91. Long: `The Kubernetes API server validates and configures data
  92. for the api objects which include pods, services, replicationcontrollers, and
  93. others. The API Server services REST operations and provides the frontend to the
  94. cluster's shared state through which all other components interact.`,
  95. RunE: func(cmd *cobra.Command, args []string) error {
  96. verflag.PrintAndExitIfRequested()
  97. utilflag.PrintFlags(cmd.Flags())
  98. // set default options
  99. completedOptions, err := Complete(s)
  100. if err != nil {
  101. return err
  102. }
  103. // validate options
  104. if errs := completedOptions.Validate(); len(errs) != 0 {
  105. return utilerrors.NewAggregate(errs)
  106. }
  107. return Run(completedOptions, genericapiserver.SetupSignalHandler())
  108. },
  109. }
  110. fs := cmd.Flags()
  111. namedFlagSets := s.Flags()
  112. verflag.AddFlags(namedFlagSets.FlagSet("global"))
  113. globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
  114. options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
  115. for _, f := range namedFlagSets.FlagSets {
  116. fs.AddFlagSet(f)
  117. }
  118. usageFmt := "Usage:\n %s\n"
  119. cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
  120. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  121. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
  122. cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
  123. return nil
  124. })
  125. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  126. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
  127. cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
  128. })
  129. return cmd
  130. }
  131. // Run runs the specified APIServer. This should never exit.
  132. func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  133. // To help debugging, immediately log version
  134. klog.Infof("Version: %+v", version.Get())
  135. server, err := CreateServerChain(completeOptions, stopCh)
  136. if err != nil {
  137. return err
  138. }
  139. prepared, err := server.PrepareRun()
  140. if err != nil {
  141. return err
  142. }
  143. return prepared.Run(stopCh)
  144. }
  145. // CreateServerChain creates the apiservers connected via delegation.
  146. func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  147. nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
  148. if err != nil {
  149. return nil, err
  150. }
  151. kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
  152. if err != nil {
  153. return nil, err
  154. }
  155. // If additional API servers are added, they should be gated.
  156. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
  157. serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
  158. if err != nil {
  159. return nil, err
  160. }
  161. apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
  162. if err != nil {
  163. return nil, err
  164. }
  165. kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
  166. if err != nil {
  167. return nil, err
  168. }
  169. // aggregator comes last in the chain
  170. aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
  171. if err != nil {
  172. return nil, err
  173. }
  174. aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
  175. if err != nil {
  176. // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
  177. return nil, err
  178. }
  179. if insecureServingInfo != nil {
  180. insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
  181. if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
  182. return nil, err
  183. }
  184. }
  185. return aggregatorServer, nil
  186. }
  187. // CreateKubeAPIServer creates and wires a workable kube-apiserver
  188. func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
  189. kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
  190. if err != nil {
  191. return nil, err
  192. }
  193. return kubeAPIServer, nil
  194. }
  195. // CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
  196. func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
  197. // Setup nodeTunneler if needed
  198. var nodeTunneler tunneler.Tunneler
  199. var proxyDialerFn utilnet.DialFunc
  200. if len(s.SSHUser) > 0 {
  201. // Get ssh key distribution func, if supported
  202. var installSSHKey tunneler.InstallSSHKey
  203. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
  204. if err != nil {
  205. return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
  206. }
  207. if cloud != nil {
  208. if instances, supported := cloud.Instances(); supported {
  209. installSSHKey = instances.AddSSHKeyToAllInstances
  210. }
  211. }
  212. if s.KubeletConfig.Port == 0 {
  213. return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
  214. }
  215. if s.KubeletConfig.ReadOnlyPort == 0 {
  216. return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
  217. }
  218. // Set up the nodeTunneler
  219. // TODO(cjcullen): If we want this to handle per-kubelet ports or other
  220. // kubelet listen-addresses, we need to plumb through options.
  221. healthCheckPath := &url.URL{
  222. Scheme: "http",
  223. Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
  224. Path: "healthz",
  225. }
  226. nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
  227. // Use the nodeTunneler's dialer when proxying to pods, services, and nodes
  228. proxyDialerFn = nodeTunneler.Dial
  229. }
  230. // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
  231. proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
  232. proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
  233. DialContext: proxyDialerFn,
  234. TLSClientConfig: proxyTLSClientConfig,
  235. })
  236. return nodeTunneler, proxyTransport, nil
  237. }
  238. // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
  239. func CreateKubeAPIServerConfig(
  240. s completedServerRunOptions,
  241. nodeTunneler tunneler.Tunneler,
  242. proxyTransport *http.Transport,
  243. ) (
  244. *master.Config,
  245. *genericapiserver.DeprecatedInsecureServingInfo,
  246. aggregatorapiserver.ServiceResolver,
  247. []admission.PluginInitializer,
  248. error,
  249. ) {
  250. genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
  251. if err != nil {
  252. return nil, nil, nil, nil, err
  253. }
  254. if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
  255. if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
  256. return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
  257. }
  258. }
  259. capabilities.Initialize(capabilities.Capabilities{
  260. AllowPrivileged: s.AllowPrivileged,
  261. // TODO(vmarmol): Implement support for HostNetworkSources.
  262. PrivilegedSources: capabilities.PrivilegedSources{
  263. HostNetworkSources: []string{},
  264. HostPIDSources: []string{},
  265. HostIPCSources: []string{},
  266. },
  267. PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
  268. })
  269. if len(s.ShowHiddenMetricsForVersion) > 0 {
  270. metrics.SetShowHidden()
  271. }
  272. serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
  273. if err != nil {
  274. return nil, nil, nil, nil, err
  275. }
  276. // defaults to empty range and ip
  277. var secondaryServiceIPRange net.IPNet
  278. // process secondary range only if provided by user
  279. if s.SecondaryServiceClusterIPRange.IP != nil {
  280. secondaryServiceIPRange, _, err = master.ServiceIPRange(s.SecondaryServiceClusterIPRange)
  281. if err != nil {
  282. return nil, nil, nil, nil, err
  283. }
  284. }
  285. config := &master.Config{
  286. GenericConfig: genericConfig,
  287. ExtraConfig: master.ExtraConfig{
  288. APIResourceConfigSource: storageFactory.APIResourceConfigSource,
  289. StorageFactory: storageFactory,
  290. EventTTL: s.EventTTL,
  291. KubeletClientConfig: s.KubeletConfig,
  292. EnableLogsSupport: s.EnableLogsHandler,
  293. ProxyTransport: proxyTransport,
  294. Tunneler: nodeTunneler,
  295. ServiceIPRange: serviceIPRange,
  296. APIServerServiceIP: apiServerServiceIP,
  297. SecondaryServiceIPRange: secondaryServiceIPRange,
  298. APIServerServicePort: 443,
  299. ServiceNodePortRange: s.ServiceNodePortRange,
  300. KubernetesServiceNodePort: s.KubernetesServiceNodePort,
  301. EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
  302. MasterCount: s.MasterCount,
  303. ServiceAccountIssuer: s.ServiceAccountIssuer,
  304. ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
  305. VersionedInformers: versionedInformers,
  306. },
  307. }
  308. clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
  309. if err != nil {
  310. return nil, nil, nil, nil, err
  311. }
  312. config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
  313. requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
  314. if err != nil {
  315. return nil, nil, nil, nil, err
  316. }
  317. if requestHeaderConfig != nil {
  318. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
  319. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
  320. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
  321. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
  322. config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
  323. }
  324. if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
  325. return nil, nil, nil, nil, err
  326. }
  327. if nodeTunneler != nil {
  328. // Use the nodeTunneler's dialer to connect to the kubelet
  329. config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
  330. }
  331. if config.GenericConfig.EgressSelector != nil {
  332. // Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
  333. config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
  334. }
  335. return config, insecureServingInfo, serviceResolver, pluginInitializers, nil
  336. }
  337. // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
  338. func buildGenericConfig(
  339. s *options.ServerRunOptions,
  340. proxyTransport *http.Transport,
  341. ) (
  342. genericConfig *genericapiserver.Config,
  343. versionedInformers clientgoinformers.SharedInformerFactory,
  344. insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo,
  345. serviceResolver aggregatorapiserver.ServiceResolver,
  346. pluginInitializers []admission.PluginInitializer,
  347. admissionPostStartHook genericapiserver.PostStartHookFunc,
  348. storageFactory *serverstorage.DefaultStorageFactory,
  349. lastErr error,
  350. ) {
  351. genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
  352. genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
  353. if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
  354. return
  355. }
  356. if lastErr = s.InsecureServing.ApplyTo(&insecureServingInfo, &genericConfig.LoopbackClientConfig); lastErr != nil {
  357. return
  358. }
  359. if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
  360. return
  361. }
  362. if lastErr = s.Authentication.ApplyTo(genericConfig); lastErr != nil {
  363. return
  364. }
  365. if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
  366. return
  367. }
  368. if lastErr = s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
  369. return
  370. }
  371. if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
  372. return
  373. }
  374. genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  375. genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
  376. genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
  377. sets.NewString("watch", "proxy"),
  378. sets.NewString("attach", "exec", "proxy", "log", "portforward"),
  379. )
  380. kubeVersion := version.Get()
  381. genericConfig.Version = &kubeVersion
  382. storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
  383. storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
  384. completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
  385. if err != nil {
  386. lastErr = err
  387. return
  388. }
  389. storageFactory, lastErr = completedStorageFactoryConfig.New()
  390. if lastErr != nil {
  391. return
  392. }
  393. if genericConfig.EgressSelector != nil {
  394. storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
  395. }
  396. if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
  397. return
  398. }
  399. // Use protobufs for self-communication.
  400. // Since not every generic apiserver has to support protobufs, we
  401. // cannot default to it in generic apiserver and need to explicitly
  402. // set it in kube-apiserver.
  403. genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
  404. // Disable compression for self-communication, since we are going to be
  405. // on a fast local network
  406. genericConfig.LoopbackClientConfig.DisableCompression = true
  407. kubeClientConfig := genericConfig.LoopbackClientConfig
  408. clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
  409. if err != nil {
  410. lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
  411. return
  412. }
  413. versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
  414. genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
  415. if err != nil {
  416. lastErr = fmt.Errorf("invalid authentication config: %v", err)
  417. return
  418. }
  419. genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
  420. if err != nil {
  421. lastErr = fmt.Errorf("invalid authorization config: %v", err)
  422. return
  423. }
  424. if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
  425. genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
  426. }
  427. admissionConfig := &kubeapiserveradmission.Config{
  428. ExternalInformers: versionedInformers,
  429. LoopbackClientConfig: genericConfig.LoopbackClientConfig,
  430. CloudConfigFile: s.CloudProvider.CloudConfigFile,
  431. }
  432. serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
  433. authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.EgressSelector, genericConfig.LoopbackClientConfig)
  434. lastErr = s.Audit.ApplyTo(
  435. genericConfig,
  436. genericConfig.LoopbackClientConfig,
  437. versionedInformers,
  438. serveroptions.NewProcessInfo("kube-apiserver", "kube-system"),
  439. &serveroptions.WebhookOptions{
  440. AuthInfoResolverWrapper: authInfoResolverWrapper,
  441. ServiceResolver: serviceResolver,
  442. },
  443. )
  444. if lastErr != nil {
  445. return
  446. }
  447. pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
  448. if err != nil {
  449. lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
  450. return
  451. }
  452. err = s.Admission.ApplyTo(
  453. genericConfig,
  454. versionedInformers,
  455. kubeClientConfig,
  456. feature.DefaultFeatureGate,
  457. pluginInitializers...)
  458. if err != nil {
  459. lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  460. }
  461. return
  462. }
  463. // BuildAuthenticator constructs the authenticator
  464. func BuildAuthenticator(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
  465. authenticatorConfig, err := s.Authentication.ToAuthenticationConfig()
  466. if err != nil {
  467. return nil, nil, err
  468. }
  469. if s.Authentication.ServiceAccounts.Lookup || utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
  470. authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromClient(
  471. extclient,
  472. versionedInformer.Core().V1().Secrets().Lister(),
  473. versionedInformer.Core().V1().ServiceAccounts().Lister(),
  474. versionedInformer.Core().V1().Pods().Lister(),
  475. )
  476. }
  477. authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
  478. versionedInformer.Core().V1().Secrets().Lister().Secrets(v1.NamespaceSystem),
  479. )
  480. return authenticatorConfig.New()
  481. }
  482. // BuildAuthorizer constructs the authorizer
  483. func BuildAuthorizer(s *options.ServerRunOptions, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
  484. authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
  485. return authorizationConfig.New()
  486. }
  487. // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
  488. type completedServerRunOptions struct {
  489. *options.ServerRunOptions
  490. }
  491. // Complete set default ServerRunOptions.
  492. // Should be called after kube-apiserver flags parsed.
  493. func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
  494. var options completedServerRunOptions
  495. // set defaults
  496. if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
  497. return options, err
  498. }
  499. if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil {
  500. return options, err
  501. }
  502. // process s.ServiceClusterIPRange from list to Primary and Secondary
  503. // we process secondary only if provided by user
  504. apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
  505. if err != nil {
  506. return options, err
  507. }
  508. s.PrimaryServiceClusterIPRange = primaryServiceIPRange
  509. s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
  510. if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
  511. return options, fmt.Errorf("error creating self-signed certificates: %v", err)
  512. }
  513. if len(s.GenericServerRunOptions.ExternalHost) == 0 {
  514. if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
  515. s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
  516. } else {
  517. if hostname, err := os.Hostname(); err == nil {
  518. s.GenericServerRunOptions.ExternalHost = hostname
  519. } else {
  520. return options, fmt.Errorf("error finding host name: %v", err)
  521. }
  522. }
  523. klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
  524. }
  525. s.Authentication.ApplyAuthorization(s.Authorization)
  526. // Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
  527. // TokenRequest functionality. This defaulting was convenient, but messed up
  528. // a lot of people when they rotated their serving cert with no idea it was
  529. // connected to their service account keys. We are taking this opportunity to
  530. // remove this problematic defaulting.
  531. if s.ServiceAccountSigningKeyFile == "" {
  532. // Default to the private server key for service account token signing
  533. if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
  534. if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
  535. s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
  536. } else {
  537. klog.Warning("No TLS key provided, service account token authentication disabled")
  538. }
  539. }
  540. }
  541. if s.ServiceAccountSigningKeyFile != "" && s.Authentication.ServiceAccounts.Issuer != "" {
  542. sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
  543. if err != nil {
  544. return options, fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err)
  545. }
  546. if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
  547. lowBound := time.Hour
  548. upBound := time.Duration(1<<32) * time.Second
  549. if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
  550. s.Authentication.ServiceAccounts.MaxExpiration > upBound {
  551. return options, fmt.Errorf("the serviceaccount max expiration must be between 1 hour to 2^32 seconds")
  552. }
  553. }
  554. s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuer, sk)
  555. if err != nil {
  556. return options, fmt.Errorf("failed to build token generator: %v", err)
  557. }
  558. s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
  559. }
  560. if s.Etcd.EnableWatchCache {
  561. klog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
  562. sizes := cachesize.NewHeuristicWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
  563. if userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes); err == nil {
  564. for resource, size := range userSpecified {
  565. sizes[resource] = size
  566. }
  567. }
  568. s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
  569. if err != nil {
  570. return options, err
  571. }
  572. }
  573. if s.APIEnablement.RuntimeConfig != nil {
  574. for key, value := range s.APIEnablement.RuntimeConfig {
  575. if key == "v1" || strings.HasPrefix(key, "v1/") ||
  576. key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
  577. delete(s.APIEnablement.RuntimeConfig, key)
  578. s.APIEnablement.RuntimeConfig["/v1"] = value
  579. }
  580. if key == "api/legacy" {
  581. delete(s.APIEnablement.RuntimeConfig, key)
  582. }
  583. }
  584. }
  585. options.ServerRunOptions = s
  586. return options, nil
  587. }
  588. func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
  589. var serviceResolver webhook.ServiceResolver
  590. if enabledAggregatorRouting {
  591. serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
  592. informer.Core().V1().Services().Lister(),
  593. informer.Core().V1().Endpoints().Lister(),
  594. )
  595. } else {
  596. serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
  597. informer.Core().V1().Services().Lister(),
  598. )
  599. }
  600. // resolve kubernetes.default.svc locally
  601. if localHost, err := url.Parse(hostname); err == nil {
  602. serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
  603. }
  604. return serviceResolver
  605. }
  606. func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
  607. serviceClusterIPRangeList := []string{}
  608. if serviceClusterIPRanges != "" {
  609. serviceClusterIPRangeList = strings.Split(serviceClusterIPRanges, ",")
  610. }
  611. var apiServerServiceIP net.IP
  612. var primaryServiceIPRange net.IPNet
  613. var secondaryServiceIPRange net.IPNet
  614. var err error
  615. // nothing provided by user, use default range (only applies to the Primary)
  616. if len(serviceClusterIPRangeList) == 0 {
  617. var primaryServiceClusterCIDR net.IPNet
  618. primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(primaryServiceClusterCIDR)
  619. if err != nil {
  620. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
  621. }
  622. return apiServerServiceIP, primaryServiceIPRange, net.IPNet{}, nil
  623. }
  624. if len(serviceClusterIPRangeList) > 0 {
  625. _, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0])
  626. if err != nil {
  627. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
  628. }
  629. primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(*(primaryServiceClusterCIDR))
  630. if err != nil {
  631. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
  632. }
  633. }
  634. // user provided at least two entries
  635. // note: validation asserts that the list is max of two dual stack entries
  636. if len(serviceClusterIPRangeList) > 1 {
  637. _, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1])
  638. if err != nil {
  639. return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
  640. }
  641. secondaryServiceIPRange = *secondaryServiceClusterCIDR
  642. }
  643. return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
  644. }