controller.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "fmt"
  16. "net"
  17. "net/http"
  18. "time"
  19. corev1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/intstr"
  23. utilnet "k8s.io/apimachinery/pkg/util/net"
  24. "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. genericapiserver "k8s.io/apiserver/pkg/server"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  29. "k8s.io/client-go/rest"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/features"
  32. "k8s.io/kubernetes/pkg/master/reconcilers"
  33. "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
  34. corerest "k8s.io/kubernetes/pkg/registry/core/rest"
  35. servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
  36. portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
  37. "k8s.io/kubernetes/pkg/util/async"
  38. )
  // kubernetesServiceName is the name of the master service that this
  // controller creates and whose endpoints it reconciles and removes.
  const (
  	kubernetesServiceName = "kubernetes"
  )
  40. // Controller is the controller manager for the core bootstrap Kubernetes
  41. // controller loops, which manage creating the "kubernetes" service, the
  42. // "default", "kube-system" and "kube-public" namespaces, and provide the IP
  43. // repair check on service IPs
  44. type Controller struct {
  45. ServiceClient corev1client.ServicesGetter
  46. NamespaceClient corev1client.NamespacesGetter
  47. EventClient corev1client.EventsGetter
  48. healthClient rest.Interface
  49. ServiceClusterIPRegistry rangeallocation.RangeRegistry
  50. ServiceClusterIPInterval time.Duration
  51. ServiceClusterIPRange net.IPNet
  52. ServiceNodePortRegistry rangeallocation.RangeRegistry
  53. ServiceNodePortInterval time.Duration
  54. ServiceNodePortRange utilnet.PortRange
  55. EndpointReconciler reconcilers.EndpointReconciler
  56. EndpointInterval time.Duration
  57. SystemNamespaces []string
  58. SystemNamespacesInterval time.Duration
  59. PublicIP net.IP
  60. // ServiceIP indicates where the kubernetes service will live. It may not be nil.
  61. ServiceIP net.IP
  62. ServicePort int
  63. ExtraServicePorts []corev1.ServicePort
  64. ExtraEndpointPorts []corev1.EndpointPort
  65. PublicServicePort int
  66. KubernetesServiceNodePort int
  67. runner *async.Runner
  68. }
  69. // NewBootstrapController returns a controller for watching the core capabilities of the master
  70. func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller {
  71. _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
  72. if err != nil {
  73. klog.Fatalf("failed to get listener address: %v", err)
  74. }
  75. systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
  76. if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
  77. systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
  78. }
  79. return &Controller{
  80. ServiceClient: serviceClient,
  81. NamespaceClient: nsClient,
  82. EventClient: eventClient,
  83. healthClient: healthClient,
  84. EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
  85. EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
  86. SystemNamespaces: systemNamespaces,
  87. SystemNamespacesInterval: 1 * time.Minute,
  88. ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
  89. ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
  90. ServiceClusterIPInterval: 3 * time.Minute,
  91. ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
  92. ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
  93. ServiceNodePortInterval: 3 * time.Minute,
  94. PublicIP: c.GenericConfig.PublicAddress,
  95. ServiceIP: c.ExtraConfig.APIServerServiceIP,
  96. ServicePort: c.ExtraConfig.APIServerServicePort,
  97. ExtraServicePorts: c.ExtraConfig.ExtraServicePorts,
  98. ExtraEndpointPorts: c.ExtraConfig.ExtraEndpointPorts,
  99. PublicServicePort: publicServicePort,
  100. KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
  101. }
  102. }
  103. func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
  104. c.Start()
  105. return nil
  106. }
  107. func (c *Controller) PreShutdownHook() error {
  108. c.Stop()
  109. return nil
  110. }
  111. // Start begins the core controller loops that must exist for bootstrapping
  112. // a cluster.
  113. func (c *Controller) Start() {
  114. if c.runner != nil {
  115. return
  116. }
  117. // Reconcile during first run removing itself until server is ready.
  118. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  119. if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
  120. klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
  121. }
  122. repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
  123. repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
  124. // run all of the controllers once prior to returning from Start.
  125. if err := repairClusterIPs.RunOnce(); err != nil {
  126. // If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
  127. klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
  128. }
  129. if err := repairNodePorts.RunOnce(); err != nil {
  130. // If we fail to repair node ports apiserver is useless. We should restart and retry.
  131. klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
  132. }
  133. c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
  134. c.runner.Start()
  135. }
  136. func (c *Controller) Stop() {
  137. if c.runner != nil {
  138. c.runner.Stop()
  139. }
  140. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  141. finishedReconciling := make(chan struct{})
  142. go func() {
  143. defer close(finishedReconciling)
  144. klog.Infof("Shutting down kubernetes service endpoint reconciler")
  145. c.EndpointReconciler.StopReconciling()
  146. if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
  147. klog.Error(err)
  148. }
  149. }()
  150. select {
  151. case <-finishedReconciling:
  152. // done
  153. case <-time.After(2 * c.EndpointInterval):
  154. // don't block server shutdown forever if we can't reach etcd to remove ourselves
  155. klog.Warning("RemoveEndpoints() timed out")
  156. }
  157. }
  158. // RunKubernetesNamespaces periodically makes sure that all internal namespaces exist
  159. func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
  160. wait.Until(func() {
  161. // Loop the system namespace list, and create them if they do not exist
  162. for _, ns := range c.SystemNamespaces {
  163. if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
  164. runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
  165. }
  166. }
  167. }, c.SystemNamespacesInterval, ch)
  168. }
  169. // RunKubernetesService periodically updates the kubernetes service
  170. func (c *Controller) RunKubernetesService(ch chan struct{}) {
  171. // wait until process is ready
  172. wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
  173. var code int
  174. c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
  175. return code == http.StatusOK, nil
  176. }, ch)
  177. wait.NonSlidingUntil(func() {
  178. // Service definition is not reconciled after first
  179. // run, ports and type will be corrected only during
  180. // start.
  181. if err := c.UpdateKubernetesService(false); err != nil {
  182. runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
  183. }
  184. }, c.EndpointInterval, ch)
  185. }
  186. // UpdateKubernetesService attempts to update the default Kube service.
  187. func (c *Controller) UpdateKubernetesService(reconcile bool) error {
  188. // Update service & endpoint records.
  189. // TODO: when it becomes possible to change this stuff,
  190. // stop polling and start watching.
  191. // TODO: add endpoints of all replicas, not just the elected master.
  192. if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
  193. return err
  194. }
  195. servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
  196. if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
  197. return err
  198. }
  199. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
  200. if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
  201. return err
  202. }
  203. return nil
  204. }
  205. // createPortAndServiceSpec creates an array of service ports.
  206. // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
  207. func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) {
  208. //Use the Cluster IP type for the service port if NodePort isn't provided.
  209. //Otherwise, we will be binding the master service to a NodePort.
  210. servicePorts := []corev1.ServicePort{{Protocol: corev1.ProtocolTCP,
  211. Port: int32(servicePort),
  212. Name: servicePortName,
  213. TargetPort: intstr.FromInt(targetServicePort)}}
  214. serviceType := corev1.ServiceTypeClusterIP
  215. if nodePort > 0 {
  216. servicePorts[0].NodePort = int32(nodePort)
  217. serviceType = corev1.ServiceTypeNodePort
  218. }
  219. if extraServicePorts != nil {
  220. servicePorts = append(servicePorts, extraServicePorts...)
  221. }
  222. return servicePorts, serviceType
  223. }
  224. // createEndpointPortSpec creates an array of endpoint ports
  225. func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []corev1.EndpointPort) []corev1.EndpointPort {
  226. endpointPorts := []corev1.EndpointPort{{Protocol: corev1.ProtocolTCP,
  227. Port: int32(endpointPort),
  228. Name: endpointPortName,
  229. }}
  230. if extraEndpointPorts != nil {
  231. endpointPorts = append(endpointPorts, extraEndpointPorts...)
  232. }
  233. return endpointPorts
  234. }
  235. // CreateMasterServiceIfNeeded will create the specified service if it
  236. // doesn't already exist.
  237. func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
  238. if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}); err == nil {
  239. // The service already exists.
  240. if reconcile {
  241. if svc, updated := reconcilers.GetMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
  242. klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
  243. _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(svc)
  244. return err
  245. }
  246. }
  247. return nil
  248. }
  249. svc := &corev1.Service{
  250. ObjectMeta: metav1.ObjectMeta{
  251. Name: serviceName,
  252. Namespace: metav1.NamespaceDefault,
  253. Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
  254. },
  255. Spec: corev1.ServiceSpec{
  256. Ports: servicePorts,
  257. // maintained by this code, not by the pod selector
  258. Selector: nil,
  259. ClusterIP: serviceIP.String(),
  260. SessionAffinity: corev1.ServiceAffinityNone,
  261. Type: serviceType,
  262. },
  263. }
  264. _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Create(svc)
  265. if errors.IsAlreadyExists(err) {
  266. return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
  267. }
  268. return err
  269. }