123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package master
- import (
- "fmt"
- "net"
- "net/http"
- "time"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- genericapiserver "k8s.io/apiserver/pkg/server"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/client-go/rest"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/master/reconcilers"
- "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
- corerest "k8s.io/kubernetes/pkg/registry/core/rest"
- servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
- portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
- "k8s.io/kubernetes/pkg/util/async"
- )
// kubernetesServiceName is the name this file passes to the service client
// and to the endpoint reconciler when managing the master service.
const (
	kubernetesServiceName = "kubernetes"
)
- // Controller is the controller manager for the core bootstrap Kubernetes
- // controller loops, which manage creating the "kubernetes" service, the
- // "default", "kube-system" and "kube-public" namespaces, and provide the IP
- // repair check on service IPs
- type Controller struct {
- ServiceClient corev1client.ServicesGetter
- NamespaceClient corev1client.NamespacesGetter
- EventClient corev1client.EventsGetter
- healthClient rest.Interface
- ServiceClusterIPRegistry rangeallocation.RangeRegistry
- ServiceClusterIPInterval time.Duration
- ServiceClusterIPRange net.IPNet
- ServiceNodePortRegistry rangeallocation.RangeRegistry
- ServiceNodePortInterval time.Duration
- ServiceNodePortRange utilnet.PortRange
- EndpointReconciler reconcilers.EndpointReconciler
- EndpointInterval time.Duration
- SystemNamespaces []string
- SystemNamespacesInterval time.Duration
- PublicIP net.IP
- // ServiceIP indicates where the kubernetes service will live. It may not be nil.
- ServiceIP net.IP
- ServicePort int
- ExtraServicePorts []corev1.ServicePort
- ExtraEndpointPorts []corev1.EndpointPort
- PublicServicePort int
- KubernetesServiceNodePort int
- runner *async.Runner
- }
- // NewBootstrapController returns a controller for watching the core capabilities of the master
- func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller {
- _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
- if err != nil {
- klog.Fatalf("failed to get listener address: %v", err)
- }
- systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
- if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
- systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
- }
- return &Controller{
- ServiceClient: serviceClient,
- NamespaceClient: nsClient,
- EventClient: eventClient,
- healthClient: healthClient,
- EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
- EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
- SystemNamespaces: systemNamespaces,
- SystemNamespacesInterval: 1 * time.Minute,
- ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
- ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
- ServiceClusterIPInterval: 3 * time.Minute,
- ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
- ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
- ServiceNodePortInterval: 3 * time.Minute,
- PublicIP: c.GenericConfig.PublicAddress,
- ServiceIP: c.ExtraConfig.APIServerServiceIP,
- ServicePort: c.ExtraConfig.APIServerServicePort,
- ExtraServicePorts: c.ExtraConfig.ExtraServicePorts,
- ExtraEndpointPorts: c.ExtraConfig.ExtraEndpointPorts,
- PublicServicePort: publicServicePort,
- KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
- }
- }
- func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
- c.Start()
- return nil
- }
- func (c *Controller) PreShutdownHook() error {
- c.Stop()
- return nil
- }
- // Start begins the core controller loops that must exist for bootstrapping
- // a cluster.
- func (c *Controller) Start() {
- if c.runner != nil {
- return
- }
- // Reconcile during first run removing itself until server is ready.
- endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
- if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
- klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
- }
- repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
- repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
- // run all of the controllers once prior to returning from Start.
- if err := repairClusterIPs.RunOnce(); err != nil {
- // If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
- klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
- }
- if err := repairNodePorts.RunOnce(); err != nil {
- // If we fail to repair node ports apiserver is useless. We should restart and retry.
- klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
- }
- c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
- c.runner.Start()
- }
- func (c *Controller) Stop() {
- if c.runner != nil {
- c.runner.Stop()
- }
- endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
- finishedReconciling := make(chan struct{})
- go func() {
- defer close(finishedReconciling)
- klog.Infof("Shutting down kubernetes service endpoint reconciler")
- c.EndpointReconciler.StopReconciling()
- if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
- klog.Error(err)
- }
- }()
- select {
- case <-finishedReconciling:
- // done
- case <-time.After(2 * c.EndpointInterval):
- // don't block server shutdown forever if we can't reach etcd to remove ourselves
- klog.Warning("RemoveEndpoints() timed out")
- }
- }
- // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
- func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
- wait.Until(func() {
- // Loop the system namespace list, and create them if they do not exist
- for _, ns := range c.SystemNamespaces {
- if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
- runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
- }
- }
- }, c.SystemNamespacesInterval, ch)
- }
- // RunKubernetesService periodically updates the kubernetes service
- func (c *Controller) RunKubernetesService(ch chan struct{}) {
- // wait until process is ready
- wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
- var code int
- c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
- return code == http.StatusOK, nil
- }, ch)
- wait.NonSlidingUntil(func() {
- // Service definition is not reconciled after first
- // run, ports and type will be corrected only during
- // start.
- if err := c.UpdateKubernetesService(false); err != nil {
- runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
- }
- }, c.EndpointInterval, ch)
- }
- // UpdateKubernetesService attempts to update the default Kube service.
- func (c *Controller) UpdateKubernetesService(reconcile bool) error {
- // Update service & endpoint records.
- // TODO: when it becomes possible to change this stuff,
- // stop polling and start watching.
- // TODO: add endpoints of all replicas, not just the elected master.
- if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
- return err
- }
- servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
- if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
- return err
- }
- endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
- if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
- return err
- }
- return nil
- }
- // createPortAndServiceSpec creates an array of service ports.
- // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
- func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) {
- //Use the Cluster IP type for the service port if NodePort isn't provided.
- //Otherwise, we will be binding the master service to a NodePort.
- servicePorts := []corev1.ServicePort{{Protocol: corev1.ProtocolTCP,
- Port: int32(servicePort),
- Name: servicePortName,
- TargetPort: intstr.FromInt(targetServicePort)}}
- serviceType := corev1.ServiceTypeClusterIP
- if nodePort > 0 {
- servicePorts[0].NodePort = int32(nodePort)
- serviceType = corev1.ServiceTypeNodePort
- }
- if extraServicePorts != nil {
- servicePorts = append(servicePorts, extraServicePorts...)
- }
- return servicePorts, serviceType
- }
- // createEndpointPortSpec creates an array of endpoint ports
- func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []corev1.EndpointPort) []corev1.EndpointPort {
- endpointPorts := []corev1.EndpointPort{{Protocol: corev1.ProtocolTCP,
- Port: int32(endpointPort),
- Name: endpointPortName,
- }}
- if extraEndpointPorts != nil {
- endpointPorts = append(endpointPorts, extraEndpointPorts...)
- }
- return endpointPorts
- }
- // CreateMasterServiceIfNeeded will create the specified service if it
- // doesn't already exist.
- func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
- if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}); err == nil {
- // The service already exists.
- if reconcile {
- if svc, updated := reconcilers.GetMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
- klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
- _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(svc)
- return err
- }
- }
- return nil
- }
- svc := &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: serviceName,
- Namespace: metav1.NamespaceDefault,
- Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
- },
- Spec: corev1.ServiceSpec{
- Ports: servicePorts,
- // maintained by this code, not by the pod selector
- Selector: nil,
- ClusterIP: serviceIP.String(),
- SessionAffinity: corev1.ServiceAffinityNone,
- Type: serviceType,
- },
- }
- _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Create(svc)
- if errors.IsAlreadyExists(err) {
- return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
- }
- return err
- }
|