123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package app does all of the work necessary to create a Kubernetes
- // APIServer by binding together the API, master and APIServer infrastructure.
- // It can be configured and called directly or via the hyperkube framework.
- package app
- import (
- "crypto/tls"
- "fmt"
- "net"
- "net/http"
- "net/url"
- "os"
- "strconv"
- "strings"
- "time"
- "github.com/go-openapi/spec"
- "github.com/spf13/cobra"
- extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
- v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/sets"
- utilwait "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apiserver/pkg/admission"
- "k8s.io/apiserver/pkg/authentication/authenticator"
- "k8s.io/apiserver/pkg/authorization/authorizer"
- openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
- genericfeatures "k8s.io/apiserver/pkg/features"
- genericapiserver "k8s.io/apiserver/pkg/server"
- "k8s.io/apiserver/pkg/server/egressselector"
- "k8s.io/apiserver/pkg/server/filters"
- serveroptions "k8s.io/apiserver/pkg/server/options"
- serverstorage "k8s.io/apiserver/pkg/server/storage"
- "k8s.io/apiserver/pkg/storage/etcd3/preflight"
- "k8s.io/apiserver/pkg/util/feature"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
- "k8s.io/apiserver/pkg/util/term"
- "k8s.io/apiserver/pkg/util/webhook"
- clientgoinformers "k8s.io/client-go/informers"
- clientgoclientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/util/keyutil"
- cloudprovider "k8s.io/cloud-provider"
- cliflag "k8s.io/component-base/cli/flag"
- "k8s.io/component-base/cli/globalflag"
- "k8s.io/component-base/metrics"
- _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
- "k8s.io/component-base/version"
- "k8s.io/component-base/version/verflag"
- "k8s.io/klog"
- aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
- aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
- "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- "k8s.io/kubernetes/pkg/capabilities"
- serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
- "k8s.io/kubernetes/pkg/features"
- generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
- "k8s.io/kubernetes/pkg/kubeapiserver"
- kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
- kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
- "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
- kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
- kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server"
- "k8s.io/kubernetes/pkg/master"
- "k8s.io/kubernetes/pkg/master/reconcilers"
- "k8s.io/kubernetes/pkg/master/tunneler"
- "k8s.io/kubernetes/pkg/registry/cachesize"
- rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
- "k8s.io/kubernetes/pkg/serviceaccount"
- utilflag "k8s.io/kubernetes/pkg/util/flag"
- "k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
- )
- const (
- etcdRetryLimit = 60
- etcdRetryInterval = 1 * time.Second
- )
- // NewAPIServerCommand creates a *cobra.Command object with default parameters
- func NewAPIServerCommand() *cobra.Command {
- s := options.NewServerRunOptions()
- cmd := &cobra.Command{
- Use: "kube-apiserver",
- Long: `The Kubernetes API server validates and configures data
- for the api objects which include pods, services, replicationcontrollers, and
- others. The API Server services REST operations and provides the frontend to the
- cluster's shared state through which all other components interact.`,
- RunE: func(cmd *cobra.Command, args []string) error {
- verflag.PrintAndExitIfRequested()
- utilflag.PrintFlags(cmd.Flags())
- // set default options
- completedOptions, err := Complete(s)
- if err != nil {
- return err
- }
- // validate options
- if errs := completedOptions.Validate(); len(errs) != 0 {
- return utilerrors.NewAggregate(errs)
- }
- return Run(completedOptions, genericapiserver.SetupSignalHandler())
- },
- }
- fs := cmd.Flags()
- namedFlagSets := s.Flags()
- verflag.AddFlags(namedFlagSets.FlagSet("global"))
- globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
- options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
- for _, f := range namedFlagSets.FlagSets {
- fs.AddFlagSet(f)
- }
- usageFmt := "Usage:\n %s\n"
- cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
- cmd.SetUsageFunc(func(cmd *cobra.Command) error {
- fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
- cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
- return nil
- })
- cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
- fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
- cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
- })
- return cmd
- }
- // Run runs the specified APIServer. This should never exit.
- func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
- server, err := CreateServerChain(completeOptions, stopCh)
- if err != nil {
- return err
- }
- prepared, err := server.PrepareRun()
- if err != nil {
- return err
- }
- return prepared.Run(stopCh)
- }
- // CreateServerChain creates the apiservers connected via delegation.
- func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
- nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
- if err != nil {
- return nil, err
- }
- kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
- if err != nil {
- return nil, err
- }
- // If additional API servers are added, they should be gated.
- apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
- serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
- if err != nil {
- return nil, err
- }
- apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
- if err != nil {
- return nil, err
- }
- kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
- if err != nil {
- return nil, err
- }
- // aggregator comes last in the chain
- aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
- if err != nil {
- return nil, err
- }
- aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
- if err != nil {
- // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
- return nil, err
- }
- if insecureServingInfo != nil {
- insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
- if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
- return nil, err
- }
- }
- return aggregatorServer, nil
- }
- // CreateKubeAPIServer creates and wires a workable kube-apiserver
- func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
- kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
- if err != nil {
- return nil, err
- }
- return kubeAPIServer, nil
- }
- // CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
- func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
- // Setup nodeTunneler if needed
- var nodeTunneler tunneler.Tunneler
- var proxyDialerFn utilnet.DialFunc
- if len(s.SSHUser) > 0 {
- // Get ssh key distribution func, if supported
- var installSSHKey tunneler.InstallSSHKey
- cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
- if err != nil {
- return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
- }
- if cloud != nil {
- if instances, supported := cloud.Instances(); supported {
- installSSHKey = instances.AddSSHKeyToAllInstances
- }
- }
- if s.KubeletConfig.Port == 0 {
- return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
- }
- if s.KubeletConfig.ReadOnlyPort == 0 {
- return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
- }
- // Set up the nodeTunneler
- // TODO(cjcullen): If we want this to handle per-kubelet ports or other
- // kubelet listen-addresses, we need to plumb through options.
- healthCheckPath := &url.URL{
- Scheme: "http",
- Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
- Path: "healthz",
- }
- nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
- // Use the nodeTunneler's dialer when proxying to pods, services, and nodes
- proxyDialerFn = nodeTunneler.Dial
- }
- // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
- proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
- proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
- DialContext: proxyDialerFn,
- TLSClientConfig: proxyTLSClientConfig,
- })
- return nodeTunneler, proxyTransport, nil
- }
- // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
- func CreateKubeAPIServerConfig(
- s completedServerRunOptions,
- nodeTunneler tunneler.Tunneler,
- proxyTransport *http.Transport,
- ) (
- *master.Config,
- *genericapiserver.DeprecatedInsecureServingInfo,
- aggregatorapiserver.ServiceResolver,
- []admission.PluginInitializer,
- error,
- ) {
- genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
- if err != nil {
- return nil, nil, nil, nil, err
- }
- if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
- if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
- return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
- }
- }
- capabilities.Initialize(capabilities.Capabilities{
- AllowPrivileged: s.AllowPrivileged,
- // TODO(vmarmol): Implement support for HostNetworkSources.
- PrivilegedSources: capabilities.PrivilegedSources{
- HostNetworkSources: []string{},
- HostPIDSources: []string{},
- HostIPCSources: []string{},
- },
- PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
- })
- if len(s.ShowHiddenMetricsForVersion) > 0 {
- metrics.SetShowHidden()
- }
- serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
- if err != nil {
- return nil, nil, nil, nil, err
- }
- // defaults to empty range and ip
- var secondaryServiceIPRange net.IPNet
- // process secondary range only if provided by user
- if s.SecondaryServiceClusterIPRange.IP != nil {
- secondaryServiceIPRange, _, err = master.ServiceIPRange(s.SecondaryServiceClusterIPRange)
- if err != nil {
- return nil, nil, nil, nil, err
- }
- }
- config := &master.Config{
- GenericConfig: genericConfig,
- ExtraConfig: master.ExtraConfig{
- APIResourceConfigSource: storageFactory.APIResourceConfigSource,
- StorageFactory: storageFactory,
- EventTTL: s.EventTTL,
- KubeletClientConfig: s.KubeletConfig,
- EnableLogsSupport: s.EnableLogsHandler,
- ProxyTransport: proxyTransport,
- Tunneler: nodeTunneler,
- ServiceIPRange: serviceIPRange,
- APIServerServiceIP: apiServerServiceIP,
- SecondaryServiceIPRange: secondaryServiceIPRange,
- APIServerServicePort: 443,
- ServiceNodePortRange: s.ServiceNodePortRange,
- KubernetesServiceNodePort: s.KubernetesServiceNodePort,
- EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
- MasterCount: s.MasterCount,
- ServiceAccountIssuer: s.ServiceAccountIssuer,
- ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
- VersionedInformers: versionedInformers,
- },
- }
- clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
- if err != nil {
- return nil, nil, nil, nil, err
- }
- config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
- requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
- if err != nil {
- return nil, nil, nil, nil, err
- }
- if requestHeaderConfig != nil {
- config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
- config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
- config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
- config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
- config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
- }
- if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
- return nil, nil, nil, nil, err
- }
- if nodeTunneler != nil {
- // Use the nodeTunneler's dialer to connect to the kubelet
- config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
- }
- if config.GenericConfig.EgressSelector != nil {
- // Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
- config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
- // Use the config.GenericConfig.EgressSelector lookup as the transport used by the "proxy" subresources.
- networkContext := egressselector.Cluster.AsNetworkContext()
- dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
- if err != nil {
- return nil, nil, nil, nil, err
- }
- c := proxyTransport.Clone()
- c.DialContext = dialer
- config.ExtraConfig.ProxyTransport = c
- }
- if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountIssuerDiscovery) {
- // Load the public keys.
- var pubKeys []interface{}
- for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
- keys, err := keyutil.PublicKeysFromFile(f)
- if err != nil {
- return nil, nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
- }
- pubKeys = append(pubKeys, keys...)
- }
- // Plumb the required metadata through ExtraConfig.
- config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
- config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
- config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
- }
- return config, insecureServingInfo, serviceResolver, pluginInitializers, nil
- }
- // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
- func buildGenericConfig(
- s *options.ServerRunOptions,
- proxyTransport *http.Transport,
- ) (
- genericConfig *genericapiserver.Config,
- versionedInformers clientgoinformers.SharedInformerFactory,
- insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo,
- serviceResolver aggregatorapiserver.ServiceResolver,
- pluginInitializers []admission.PluginInitializer,
- admissionPostStartHook genericapiserver.PostStartHookFunc,
- storageFactory *serverstorage.DefaultStorageFactory,
- lastErr error,
- ) {
- genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
- genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
- if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
- return
- }
- if lastErr = s.InsecureServing.ApplyTo(&insecureServingInfo, &genericConfig.LoopbackClientConfig); lastErr != nil {
- return
- }
- if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
- return
- }
- if lastErr = s.Authentication.ApplyTo(genericConfig); lastErr != nil {
- return
- }
- if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
- return
- }
- if lastErr = s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
- return
- }
- if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
- return
- }
- genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
- genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
- genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
- sets.NewString("watch", "proxy"),
- sets.NewString("attach", "exec", "proxy", "log", "portforward"),
- )
- kubeVersion := version.Get()
- genericConfig.Version = &kubeVersion
- storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
- storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
- completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
- if err != nil {
- lastErr = err
- return
- }
- storageFactory, lastErr = completedStorageFactoryConfig.New()
- if lastErr != nil {
- return
- }
- if genericConfig.EgressSelector != nil {
- storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
- }
- if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
- return
- }
- // Use protobufs for self-communication.
- // Since not every generic apiserver has to support protobufs, we
- // cannot default to it in generic apiserver and need to explicitly
- // set it in kube-apiserver.
- genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
- // Disable compression for self-communication, since we are going to be
- // on a fast local network
- genericConfig.LoopbackClientConfig.DisableCompression = true
- kubeClientConfig := genericConfig.LoopbackClientConfig
- clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
- if err != nil {
- lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
- return
- }
- versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
- genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
- if err != nil {
- lastErr = fmt.Errorf("invalid authentication config: %v", err)
- return
- }
- genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
- if err != nil {
- lastErr = fmt.Errorf("invalid authorization config: %v", err)
- return
- }
- if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
- genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
- }
- admissionConfig := &kubeapiserveradmission.Config{
- ExternalInformers: versionedInformers,
- LoopbackClientConfig: genericConfig.LoopbackClientConfig,
- CloudConfigFile: s.CloudProvider.CloudConfigFile,
- }
- serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
- authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.EgressSelector, genericConfig.LoopbackClientConfig)
- lastErr = s.Audit.ApplyTo(
- genericConfig,
- genericConfig.LoopbackClientConfig,
- versionedInformers,
- serveroptions.NewProcessInfo("kube-apiserver", "kube-system"),
- &serveroptions.WebhookOptions{
- AuthInfoResolverWrapper: authInfoResolverWrapper,
- ServiceResolver: serviceResolver,
- },
- )
- if lastErr != nil {
- return
- }
- pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
- if err != nil {
- lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
- return
- }
- err = s.Admission.ApplyTo(
- genericConfig,
- versionedInformers,
- kubeClientConfig,
- feature.DefaultFeatureGate,
- pluginInitializers...)
- if err != nil {
- lastErr = fmt.Errorf("failed to initialize admission: %v", err)
- }
- if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnableInflightQuotaHandler {
- genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
- }
- return
- }
- // BuildAuthenticator constructs the authenticator
- func BuildAuthenticator(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
- authenticatorConfig, err := s.Authentication.ToAuthenticationConfig()
- if err != nil {
- return nil, nil, err
- }
- if s.Authentication.ServiceAccounts.Lookup || utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) {
- authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromClient(
- extclient,
- versionedInformer.Core().V1().Secrets().Lister(),
- versionedInformer.Core().V1().ServiceAccounts().Lister(),
- versionedInformer.Core().V1().Pods().Lister(),
- )
- }
- authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
- versionedInformer.Core().V1().Secrets().Lister().Secrets(v1.NamespaceSystem),
- )
- return authenticatorConfig.New()
- }
- // BuildAuthorizer constructs the authorizer
- func BuildAuthorizer(s *options.ServerRunOptions, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
- authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
- return authorizationConfig.New()
- }
- // BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
- func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) utilflowcontrol.Interface {
- return utilflowcontrol.New(
- versionedInformer,
- extclient.FlowcontrolV1alpha1(),
- s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
- s.GenericServerRunOptions.RequestTimeout/4,
- )
- }
- // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
- type completedServerRunOptions struct {
- *options.ServerRunOptions
- }
- // Complete set default ServerRunOptions.
- // Should be called after kube-apiserver flags parsed.
- func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
- var options completedServerRunOptions
- // set defaults
- if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
- return options, err
- }
- if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil {
- return options, err
- }
- // process s.ServiceClusterIPRange from list to Primary and Secondary
- // we process secondary only if provided by user
- apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
- if err != nil {
- return options, err
- }
- s.PrimaryServiceClusterIPRange = primaryServiceIPRange
- s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
- if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
- return options, fmt.Errorf("error creating self-signed certificates: %v", err)
- }
- if len(s.GenericServerRunOptions.ExternalHost) == 0 {
- if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
- s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
- } else {
- if hostname, err := os.Hostname(); err == nil {
- s.GenericServerRunOptions.ExternalHost = hostname
- } else {
- return options, fmt.Errorf("error finding host name: %v", err)
- }
- }
- klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
- }
- s.Authentication.ApplyAuthorization(s.Authorization)
- // Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
- // TokenRequest functionality. This defaulting was convenient, but messed up
- // a lot of people when they rotated their serving cert with no idea it was
- // connected to their service account keys. We are taking this opportunity to
- // remove this problematic defaulting.
- if s.ServiceAccountSigningKeyFile == "" {
- // Default to the private server key for service account token signing
- if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
- if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
- s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
- } else {
- klog.Warning("No TLS key provided, service account token authentication disabled")
- }
- }
- }
- if s.ServiceAccountSigningKeyFile != "" && s.Authentication.ServiceAccounts.Issuer != "" {
- sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
- if err != nil {
- return options, fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err)
- }
- if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
- lowBound := time.Hour
- upBound := time.Duration(1<<32) * time.Second
- if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
- s.Authentication.ServiceAccounts.MaxExpiration > upBound {
- return options, fmt.Errorf("the serviceaccount max expiration must be between 1 hour to 2^32 seconds")
- }
- }
- s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuer, sk)
- if err != nil {
- return options, fmt.Errorf("failed to build token generator: %v", err)
- }
- s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
- }
- if s.Etcd.EnableWatchCache {
- klog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
- sizes := cachesize.NewHeuristicWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
- if userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes); err == nil {
- for resource, size := range userSpecified {
- sizes[resource] = size
- }
- }
- s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
- if err != nil {
- return options, err
- }
- }
- if s.APIEnablement.RuntimeConfig != nil {
- for key, value := range s.APIEnablement.RuntimeConfig {
- if key == "v1" || strings.HasPrefix(key, "v1/") ||
- key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
- delete(s.APIEnablement.RuntimeConfig, key)
- s.APIEnablement.RuntimeConfig["/v1"] = value
- }
- if key == "api/legacy" {
- delete(s.APIEnablement.RuntimeConfig, key)
- }
- }
- }
- options.ServerRunOptions = s
- return options, nil
- }
- func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
- var serviceResolver webhook.ServiceResolver
- if enabledAggregatorRouting {
- serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
- informer.Core().V1().Services().Lister(),
- informer.Core().V1().Endpoints().Lister(),
- )
- } else {
- serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
- informer.Core().V1().Services().Lister(),
- )
- }
- // resolve kubernetes.default.svc locally
- if localHost, err := url.Parse(hostname); err == nil {
- serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
- }
- return serviceResolver
- }
- func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
- serviceClusterIPRangeList := []string{}
- if serviceClusterIPRanges != "" {
- serviceClusterIPRangeList = strings.Split(serviceClusterIPRanges, ",")
- }
- var apiServerServiceIP net.IP
- var primaryServiceIPRange net.IPNet
- var secondaryServiceIPRange net.IPNet
- var err error
- // nothing provided by user, use default range (only applies to the Primary)
- if len(serviceClusterIPRangeList) == 0 {
- var primaryServiceClusterCIDR net.IPNet
- primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(primaryServiceClusterCIDR)
- if err != nil {
- return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
- }
- return apiServerServiceIP, primaryServiceIPRange, net.IPNet{}, nil
- }
- if len(serviceClusterIPRangeList) > 0 {
- _, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0])
- if err != nil {
- return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
- }
- primaryServiceIPRange, apiServerServiceIP, err = master.ServiceIPRange(*(primaryServiceClusterCIDR))
- if err != nil {
- return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
- }
- }
- // user provided at least two entries
- // note: validation asserts that the list is max of two dual stack entries
- if len(serviceClusterIPRangeList) > 1 {
- _, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1])
- if err != nil {
- return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
- }
- secondaryServiceIPRange = *secondaryServiceClusterCIDR
- }
- return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
- }
|