controller.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "net/http"
  19. "time"
  20. corev1 "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/util/intstr"
  24. utilnet "k8s.io/apimachinery/pkg/util/net"
  25. "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. genericapiserver "k8s.io/apiserver/pkg/server"
  28. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  29. "k8s.io/client-go/rest"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/master/reconcilers"
  32. "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
  33. corerest "k8s.io/kubernetes/pkg/registry/core/rest"
  34. servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
  35. portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
  36. "k8s.io/kubernetes/pkg/util/async"
  37. )
  38. const kubernetesServiceName = "kubernetes"
  39. // Controller is the controller manager for the core bootstrap Kubernetes
  40. // controller loops, which manage creating the "kubernetes" service, the
  41. // "default", "kube-system" and "kube-public" namespaces, and provide the IP
  42. // repair check on service IPs
  43. type Controller struct {
  44. ServiceClient corev1client.ServicesGetter
  45. NamespaceClient corev1client.NamespacesGetter
  46. EventClient corev1client.EventsGetter
  47. healthClient rest.Interface
  48. ServiceClusterIPRegistry rangeallocation.RangeRegistry
  49. ServiceClusterIPRange net.IPNet
  50. SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
  51. SecondaryServiceClusterIPRange net.IPNet
  52. ServiceClusterIPInterval time.Duration
  53. ServiceNodePortRegistry rangeallocation.RangeRegistry
  54. ServiceNodePortInterval time.Duration
  55. ServiceNodePortRange utilnet.PortRange
  56. EndpointReconciler reconcilers.EndpointReconciler
  57. EndpointInterval time.Duration
  58. SystemNamespaces []string
  59. SystemNamespacesInterval time.Duration
  60. PublicIP net.IP
  61. // ServiceIP indicates where the kubernetes service will live. It may not be nil.
  62. ServiceIP net.IP
  63. ServicePort int
  64. ExtraServicePorts []corev1.ServicePort
  65. ExtraEndpointPorts []corev1.EndpointPort
  66. PublicServicePort int
  67. KubernetesServiceNodePort int
  68. runner *async.Runner
  69. }
  70. // NewBootstrapController returns a controller for watching the core capabilities of the master
  71. func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller {
  72. _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
  73. if err != nil {
  74. klog.Fatalf("failed to get listener address: %v", err)
  75. }
  76. systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
  77. return &Controller{
  78. ServiceClient: serviceClient,
  79. NamespaceClient: nsClient,
  80. EventClient: eventClient,
  81. healthClient: healthClient,
  82. EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
  83. EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
  84. SystemNamespaces: systemNamespaces,
  85. SystemNamespacesInterval: 1 * time.Minute,
  86. ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
  87. ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
  88. SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
  89. SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
  90. ServiceClusterIPInterval: 3 * time.Minute,
  91. ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
  92. ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
  93. ServiceNodePortInterval: 3 * time.Minute,
  94. PublicIP: c.GenericConfig.PublicAddress,
  95. ServiceIP: c.ExtraConfig.APIServerServiceIP,
  96. ServicePort: c.ExtraConfig.APIServerServicePort,
  97. ExtraServicePorts: c.ExtraConfig.ExtraServicePorts,
  98. ExtraEndpointPorts: c.ExtraConfig.ExtraEndpointPorts,
  99. PublicServicePort: publicServicePort,
  100. KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
  101. }
  102. }
  103. // PostStartHook initiates the core controller loops that must exist for bootstrapping.
  104. func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
  105. c.Start()
  106. return nil
  107. }
  108. // PreShutdownHook triggers the actions needed to shut down the API Server cleanly.
  109. func (c *Controller) PreShutdownHook() error {
  110. c.Stop()
  111. return nil
  112. }
  113. // Start begins the core controller loops that must exist for bootstrapping
  114. // a cluster.
  115. func (c *Controller) Start() {
  116. if c.runner != nil {
  117. return
  118. }
  119. // Reconcile during first run removing itself until server is ready.
  120. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  121. if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
  122. klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
  123. }
  124. repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
  125. repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
  126. // run all of the controllers once prior to returning from Start.
  127. if err := repairClusterIPs.RunOnce(); err != nil {
  128. // If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
  129. klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
  130. }
  131. if err := repairNodePorts.RunOnce(); err != nil {
  132. // If we fail to repair node ports apiserver is useless. We should restart and retry.
  133. klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
  134. }
  135. c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
  136. c.runner.Start()
  137. }
  138. // Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
  139. func (c *Controller) Stop() {
  140. if c.runner != nil {
  141. c.runner.Stop()
  142. }
  143. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  144. finishedReconciling := make(chan struct{})
  145. go func() {
  146. defer close(finishedReconciling)
  147. klog.Infof("Shutting down kubernetes service endpoint reconciler")
  148. c.EndpointReconciler.StopReconciling()
  149. if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
  150. klog.Error(err)
  151. }
  152. }()
  153. select {
  154. case <-finishedReconciling:
  155. // done
  156. case <-time.After(2 * c.EndpointInterval):
  157. // don't block server shutdown forever if we can't reach etcd to remove ourselves
  158. klog.Warning("RemoveEndpoints() timed out")
  159. }
  160. }
  161. // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
  162. func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
  163. wait.Until(func() {
  164. // Loop the system namespace list, and create them if they do not exist
  165. for _, ns := range c.SystemNamespaces {
  166. if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
  167. runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
  168. }
  169. }
  170. }, c.SystemNamespacesInterval, ch)
  171. }
  172. // RunKubernetesService periodically updates the kubernetes service
  173. func (c *Controller) RunKubernetesService(ch chan struct{}) {
  174. // wait until process is ready
  175. wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
  176. var code int
  177. c.healthClient.Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&code)
  178. return code == http.StatusOK, nil
  179. }, ch)
  180. wait.NonSlidingUntil(func() {
  181. // Service definition is not reconciled after first
  182. // run, ports and type will be corrected only during
  183. // start.
  184. if err := c.UpdateKubernetesService(false); err != nil {
  185. runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
  186. }
  187. }, c.EndpointInterval, ch)
  188. }
  189. // UpdateKubernetesService attempts to update the default Kube service.
  190. func (c *Controller) UpdateKubernetesService(reconcile bool) error {
  191. // Update service & endpoint records.
  192. // TODO: when it becomes possible to change this stuff,
  193. // stop polling and start watching.
  194. // TODO: add endpoints of all replicas, not just the elected master.
  195. if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
  196. return err
  197. }
  198. servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
  199. if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
  200. return err
  201. }
  202. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  203. if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
  204. return err
  205. }
  206. return nil
  207. }
  208. // createPortAndServiceSpec creates an array of service ports.
  209. // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
  210. func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) {
  211. //Use the Cluster IP type for the service port if NodePort isn't provided.
  212. //Otherwise, we will be binding the master service to a NodePort.
  213. servicePorts := []corev1.ServicePort{{Protocol: corev1.ProtocolTCP,
  214. Port: int32(servicePort),
  215. Name: servicePortName,
  216. TargetPort: intstr.FromInt(targetServicePort)}}
  217. serviceType := corev1.ServiceTypeClusterIP
  218. if nodePort > 0 {
  219. servicePorts[0].NodePort = int32(nodePort)
  220. serviceType = corev1.ServiceTypeNodePort
  221. }
  222. if extraServicePorts != nil {
  223. servicePorts = append(servicePorts, extraServicePorts...)
  224. }
  225. return servicePorts, serviceType
  226. }
  227. // createEndpointPortSpec creates an array of endpoint ports
  228. func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []corev1.EndpointPort) []corev1.EndpointPort {
  229. endpointPorts := []corev1.EndpointPort{{Protocol: corev1.ProtocolTCP,
  230. Port: int32(endpointPort),
  231. Name: endpointPortName,
  232. }}
  233. if extraEndpointPorts != nil {
  234. endpointPorts = append(endpointPorts, extraEndpointPorts...)
  235. }
  236. return endpointPorts
  237. }
  238. // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
  239. // doesn't already exist.
  240. func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
  241. if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
  242. // The service already exists.
  243. if reconcile {
  244. if svc, updated := reconcilers.GetMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
  245. klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
  246. _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
  247. return err
  248. }
  249. }
  250. return nil
  251. }
  252. svc := &corev1.Service{
  253. ObjectMeta: metav1.ObjectMeta{
  254. Name: serviceName,
  255. Namespace: metav1.NamespaceDefault,
  256. Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
  257. },
  258. Spec: corev1.ServiceSpec{
  259. Ports: servicePorts,
  260. // maintained by this code, not by the pod selector
  261. Selector: nil,
  262. ClusterIP: serviceIP.String(),
  263. SessionAffinity: corev1.ServiceAffinityNone,
  264. Type: serviceType,
  265. },
  266. }
  267. _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
  268. if errors.IsAlreadyExists(err) {
  269. return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
  270. }
  271. return err
  272. }