service_controller.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package service
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "reflect"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. coreinformers "k8s.io/client-go/informers/core/v1"
  27. clientset "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/kubernetes/scheme"
  29. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. "k8s.io/client-go/tools/cache"
  32. "k8s.io/client-go/tools/record"
  33. "k8s.io/client-go/util/workqueue"
  34. cloudprovider "k8s.io/cloud-provider"
  35. servicehelper "k8s.io/cloud-provider/service/helpers"
  36. "k8s.io/klog"
  37. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  38. "k8s.io/kubernetes/pkg/controller"
  39. kubefeatures "k8s.io/kubernetes/pkg/features"
  40. "k8s.io/kubernetes/pkg/util/metrics"
  41. "k8s.io/kubernetes/pkg/util/slice"
  42. )
  // Periodic synchronization intervals.
  const (
  	// serviceSyncPeriod is the resync period registered with the service
  	// informer's event handler.
  	serviceSyncPeriod = 30 * time.Second

  	// nodeSyncPeriod is how often the node sync loop re-evaluates the set of
  	// nodes backing the load balancers.
  	nodeSyncPeriod = 100 * time.Second
  )

  // Retry tuning.
  const (
  	// minRetryDelay and maxRetryDelay bound the exponential back-off applied
  	// before a failed service is processed again.
  	// If these change, the sleep in hack/jenkins/e2e.sh before downing a
  	// cluster should be changed appropriately.
  	minRetryDelay = 5 * time.Second
  	maxRetryDelay = 5 * time.Minute

  	// clientRetryCount and clientRetryInterval are not referenced by the code
  	// visible in this file; presumably they are consumed elsewhere in the
  	// package (TODO confirm).
  	clientRetryCount    = 5
  	clientRetryInterval = 5 * time.Second
  )

  // Node labels consulted when choosing load balancer backends.
  const (
  	// LabelNodeRoleMaster specifies that a node is a master.
  	// It's copied over to kubeadm until it's merged in core: https://github.com/kubernetes/kubernetes/pull/39112
  	LabelNodeRoleMaster = "node-role.kubernetes.io/master"

  	// LabelNodeRoleExcludeBalancer specifies that the node should be
  	// excluded from load balancers created by a cloud provider.
  	LabelNodeRoleExcludeBalancer = "alpha.service-controller.kubernetes.io/exclude-balancer"
  )
  62. type cachedService struct {
  63. // The cached state of the service
  64. state *v1.Service
  65. }
  66. type serviceCache struct {
  67. mu sync.Mutex // protects serviceMap
  68. serviceMap map[string]*cachedService
  69. }
  70. // ServiceController keeps cloud provider service resources
  71. // (like load balancers) in sync with the registry.
  72. type ServiceController struct {
  73. cloud cloudprovider.Interface
  74. knownHosts []*v1.Node
  75. servicesToUpdate []*v1.Service
  76. kubeClient clientset.Interface
  77. clusterName string
  78. balancer cloudprovider.LoadBalancer
  79. cache *serviceCache
  80. serviceLister corelisters.ServiceLister
  81. serviceListerSynced cache.InformerSynced
  82. eventBroadcaster record.EventBroadcaster
  83. eventRecorder record.EventRecorder
  84. nodeLister corelisters.NodeLister
  85. nodeListerSynced cache.InformerSynced
  86. // services that need to be synced
  87. queue workqueue.RateLimitingInterface
  88. }
  89. // New returns a new service controller to keep cloud provider service resources
  90. // (like load balancers) in sync with the registry.
  91. func New(
  92. cloud cloudprovider.Interface,
  93. kubeClient clientset.Interface,
  94. serviceInformer coreinformers.ServiceInformer,
  95. nodeInformer coreinformers.NodeInformer,
  96. clusterName string,
  97. ) (*ServiceController, error) {
  98. broadcaster := record.NewBroadcaster()
  99. broadcaster.StartLogging(klog.Infof)
  100. broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  101. recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
  102. if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
  103. if err := metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  104. return nil, err
  105. }
  106. }
  107. s := &ServiceController{
  108. cloud: cloud,
  109. knownHosts: []*v1.Node{},
  110. kubeClient: kubeClient,
  111. clusterName: clusterName,
  112. cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
  113. eventBroadcaster: broadcaster,
  114. eventRecorder: recorder,
  115. nodeLister: nodeInformer.Lister(),
  116. nodeListerSynced: nodeInformer.Informer().HasSynced,
  117. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
  118. }
  119. serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
  120. cache.ResourceEventHandlerFuncs{
  121. AddFunc: func(cur interface{}) {
  122. svc, ok := cur.(*v1.Service)
  123. if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
  124. s.enqueueService(cur)
  125. }
  126. },
  127. UpdateFunc: func(old, cur interface{}) {
  128. oldSvc, ok1 := old.(*v1.Service)
  129. curSvc, ok2 := cur.(*v1.Service)
  130. if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
  131. s.enqueueService(cur)
  132. }
  133. },
  134. DeleteFunc: func(old interface{}) {
  135. if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
  136. // No need to handle deletion event if finalizer feature gate is
  137. // enabled. Because the deletion would be handled by the update
  138. // path when the deletion timestamp is added.
  139. return
  140. }
  141. s.enqueueService(old)
  142. },
  143. },
  144. serviceSyncPeriod,
  145. )
  146. s.serviceLister = serviceInformer.Lister()
  147. s.serviceListerSynced = serviceInformer.Informer().HasSynced
  148. if err := s.init(); err != nil {
  149. return nil, err
  150. }
  151. return s, nil
  152. }
  153. // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
  154. func (s *ServiceController) enqueueService(obj interface{}) {
  155. key, err := controller.KeyFunc(obj)
  156. if err != nil {
  157. runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
  158. return
  159. }
  160. s.queue.Add(key)
  161. }
  162. // Run starts a background goroutine that watches for changes to services that
  163. // have (or had) LoadBalancers=true and ensures that they have
  164. // load balancers created and deleted appropriately.
  165. // serviceSyncPeriod controls how often we check the cluster's services to
  166. // ensure that the correct load balancers exist.
  167. // nodeSyncPeriod controls how often we check the cluster's nodes to determine
  168. // if load balancers need to be updated to point to a new set.
  169. //
  170. // It's an error to call Run() more than once for a given ServiceController
  171. // object.
  172. func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
  173. defer runtime.HandleCrash()
  174. defer s.queue.ShutDown()
  175. klog.Info("Starting service controller")
  176. defer klog.Info("Shutting down service controller")
  177. if !controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
  178. return
  179. }
  180. for i := 0; i < workers; i++ {
  181. go wait.Until(s.worker, time.Second, stopCh)
  182. }
  183. go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
  184. <-stopCh
  185. }
  186. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  187. // It enforces that the syncHandler is never invoked concurrently with the same key.
  188. func (s *ServiceController) worker() {
  189. for s.processNextWorkItem() {
  190. }
  191. }
  192. func (s *ServiceController) processNextWorkItem() bool {
  193. key, quit := s.queue.Get()
  194. if quit {
  195. return false
  196. }
  197. defer s.queue.Done(key)
  198. err := s.syncService(key.(string))
  199. if err == nil {
  200. s.queue.Forget(key)
  201. return true
  202. }
  203. runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
  204. s.queue.AddRateLimited(key)
  205. return true
  206. }
  207. func (s *ServiceController) init() error {
  208. if s.cloud == nil {
  209. return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
  210. }
  211. balancer, ok := s.cloud.LoadBalancer()
  212. if !ok {
  213. return fmt.Errorf("the cloud provider does not support external load balancers")
  214. }
  215. s.balancer = balancer
  216. return nil
  217. }
  218. // processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
  219. // Returns an error if processing the service update failed.
  220. func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, key string) error {
  221. // TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
  222. // path. Ref https://github.com/kubernetes/enhancements/issues/980.
  223. cachedService := s.cache.getOrCreate(key)
  224. if cachedService.state != nil && cachedService.state.UID != service.UID {
  225. // This happens only when a service is deleted and re-created
  226. // in a short period, which is only possible when it doesn't
  227. // contain finalizer.
  228. if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
  229. return err
  230. }
  231. }
  232. // Always cache the service, we need the info for service deletion in case
  233. // when load balancer cleanup is not handled via finalizer.
  234. cachedService.state = service
  235. op, err := s.syncLoadBalancerIfNeeded(service, key)
  236. if err != nil {
  237. s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
  238. return err
  239. }
  240. if op == deleteLoadBalancer {
  241. // Only delete the cache upon successful load balancer deletion.
  242. s.cache.delete(key)
  243. }
  244. return nil
  245. }
  // loadBalancerOperation identifies which action syncLoadBalancerIfNeeded chose
  // for a service.
  type loadBalancerOperation int

  const (
  	// deleteLoadBalancer means the load balancer was (to be) removed.
  	deleteLoadBalancer = loadBalancerOperation(iota)
  	// ensureLoadBalancer means the load balancer was (to be) created or updated.
  	ensureLoadBalancer
  )
  251. // syncLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
  252. // i.e. creates loadbalancer for service if requested and deletes loadbalancer if the service
  253. // doesn't want a loadbalancer no more. Returns whatever error occurred.
  254. func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
  255. // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
  256. // which may involve service interruption. Also, we would like user-friendly events.
  257. // Save the state so we can avoid a write if it doesn't change
  258. previousStatus := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
  259. var newStatus *v1.LoadBalancerStatus
  260. var op loadBalancerOperation
  261. var err error
  262. if !wantsLoadBalancer(service) || needsCleanup(service) {
  263. // Delete the load balancer if service no longer wants one, or if service needs cleanup.
  264. op = deleteLoadBalancer
  265. newStatus = &v1.LoadBalancerStatus{}
  266. _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
  267. if err != nil {
  268. return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
  269. }
  270. if exists {
  271. klog.V(2).Infof("Deleting existing load balancer for service %s", key)
  272. s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
  273. if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
  274. return op, fmt.Errorf("failed to delete load balancer: %v", err)
  275. }
  276. }
  277. // Always try to remove finalizer when load balancer is deleted.
  278. // It will be a no-op if finalizer does not exist.
  279. // Note this also clears up finalizer if the cluster is downgraded
  280. // from a version that attaches finalizer to a version that doesn't.
  281. if err := s.removeFinalizer(service); err != nil {
  282. return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
  283. }
  284. s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
  285. } else {
  286. // Create or update the load balancer if service wants one.
  287. op = ensureLoadBalancer
  288. klog.V(2).Infof("Ensuring load balancer for service %s", key)
  289. s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
  290. if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
  291. // Always try to add finalizer prior to load balancer creation.
  292. // It will be a no-op if finalizer already exists.
  293. // Note this also retrospectively puts on finalizer if the cluster
  294. // is upgraded from a version that doesn't attach finalizer to a
  295. // version that does.
  296. if err := s.addFinalizer(service); err != nil {
  297. return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
  298. }
  299. }
  300. newStatus, err = s.ensureLoadBalancer(service)
  301. if err != nil {
  302. return op, fmt.Errorf("failed to ensure load balancer: %v", err)
  303. }
  304. s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
  305. }
  306. if err := s.patchStatus(service, previousStatus, newStatus); err != nil {
  307. // Only retry error that isn't not found:
  308. // - Not found error mostly happens when service disappears right after
  309. // we remove the finalizer.
  310. // - We can't patch status on non-exist service anyway.
  311. if !errors.IsNotFound(err) {
  312. return op, fmt.Errorf("failed to update load balancer status: %v", err)
  313. }
  314. }
  315. return op, nil
  316. }
  317. func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
  318. nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
  319. if err != nil {
  320. return nil, err
  321. }
  322. // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
  323. if len(nodes) == 0 {
  324. s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
  325. }
  326. // - Only one protocol supported per service
  327. // - Not all cloud providers support all protocols and the next step is expected to return
  328. // an error for unsupported protocols
  329. return s.balancer.EnsureLoadBalancer(context.TODO(), s.clusterName, service, nodes)
  330. }
  331. // ListKeys implements the interface required by DeltaFIFO to list the keys we
  332. // already know about.
  333. func (s *serviceCache) ListKeys() []string {
  334. s.mu.Lock()
  335. defer s.mu.Unlock()
  336. keys := make([]string, 0, len(s.serviceMap))
  337. for k := range s.serviceMap {
  338. keys = append(keys, k)
  339. }
  340. return keys
  341. }
  342. // GetByKey returns the value stored in the serviceMap under the given key
  343. func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
  344. s.mu.Lock()
  345. defer s.mu.Unlock()
  346. if v, ok := s.serviceMap[key]; ok {
  347. return v, true, nil
  348. }
  349. return nil, false, nil
  350. }
  351. // ListKeys implements the interface required by DeltaFIFO to list the keys we
  352. // already know about.
  353. func (s *serviceCache) allServices() []*v1.Service {
  354. s.mu.Lock()
  355. defer s.mu.Unlock()
  356. services := make([]*v1.Service, 0, len(s.serviceMap))
  357. for _, v := range s.serviceMap {
  358. services = append(services, v.state)
  359. }
  360. return services
  361. }
  362. func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
  363. s.mu.Lock()
  364. defer s.mu.Unlock()
  365. service, ok := s.serviceMap[serviceName]
  366. return service, ok
  367. }
  368. func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
  369. s.mu.Lock()
  370. defer s.mu.Unlock()
  371. service, ok := s.serviceMap[serviceName]
  372. if !ok {
  373. service = &cachedService{}
  374. s.serviceMap[serviceName] = service
  375. }
  376. return service
  377. }
  378. func (s *serviceCache) set(serviceName string, service *cachedService) {
  379. s.mu.Lock()
  380. defer s.mu.Unlock()
  381. s.serviceMap[serviceName] = service
  382. }
  383. func (s *serviceCache) delete(serviceName string) {
  384. s.mu.Lock()
  385. defer s.mu.Unlock()
  386. delete(s.serviceMap, serviceName)
  387. }
  388. // needsCleanup checks if load balancer needs to be cleaned up as indicated by finalizer.
  389. func needsCleanup(service *v1.Service) bool {
  390. return service.ObjectMeta.DeletionTimestamp != nil && servicehelper.HasLBFinalizer(service)
  391. }
  392. // needsUpdate checks if load balancer needs to be updated due to change in attributes.
  393. func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
  394. if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
  395. return false
  396. }
  397. if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
  398. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
  399. oldService.Spec.Type, newService.Spec.Type)
  400. return true
  401. }
  402. if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
  403. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v",
  404. oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges)
  405. return true
  406. }
  407. if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
  408. return true
  409. }
  410. if !loadBalancerIPsAreEqual(oldService, newService) {
  411. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
  412. oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
  413. return true
  414. }
  415. if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
  416. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
  417. len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
  418. return true
  419. }
  420. for i := range oldService.Spec.ExternalIPs {
  421. if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
  422. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
  423. newService.Spec.ExternalIPs[i])
  424. return true
  425. }
  426. }
  427. if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
  428. return true
  429. }
  430. if oldService.UID != newService.UID {
  431. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
  432. oldService.UID, newService.UID)
  433. return true
  434. }
  435. if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy {
  436. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v",
  437. oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy)
  438. return true
  439. }
  440. if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort {
  441. s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v",
  442. oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort)
  443. return true
  444. }
  445. return false
  446. }
  447. func (s *ServiceController) loadBalancerName(service *v1.Service) string {
  448. return s.balancer.GetLoadBalancerName(context.TODO(), "", service)
  449. }
  450. func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
  451. var protocol v1.Protocol
  452. ports := []*v1.ServicePort{}
  453. for i := range service.Spec.Ports {
  454. sp := &service.Spec.Ports[i]
  455. // The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
  456. ports = append(ports, sp)
  457. if protocol == "" {
  458. protocol = sp.Protocol
  459. } else if protocol != sp.Protocol && wantsLoadBalancer(service) {
  460. // TODO: Convert error messages to use event recorder
  461. return nil, fmt.Errorf("mixed protocol external load balancers are not supported")
  462. }
  463. }
  464. return ports, nil
  465. }
  466. func portsEqualForLB(x, y *v1.Service) bool {
  467. xPorts, err := getPortsForLB(x)
  468. if err != nil {
  469. return false
  470. }
  471. yPorts, err := getPortsForLB(y)
  472. if err != nil {
  473. return false
  474. }
  475. return portSlicesEqualForLB(xPorts, yPorts)
  476. }
  477. func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
  478. if len(x) != len(y) {
  479. return false
  480. }
  481. for i := range x {
  482. if !portEqualForLB(x[i], y[i]) {
  483. return false
  484. }
  485. }
  486. return true
  487. }
  488. func portEqualForLB(x, y *v1.ServicePort) bool {
  489. // TODO: Should we check name? (In theory, an LB could expose it)
  490. if x.Name != y.Name {
  491. return false
  492. }
  493. if x.Protocol != y.Protocol {
  494. return false
  495. }
  496. if x.Port != y.Port {
  497. return false
  498. }
  499. if x.NodePort != y.NodePort {
  500. return false
  501. }
  502. // We don't check TargetPort; that is not relevant for load balancing
  503. // TODO: Should we blank it out? Or just check it anyway?
  504. return true
  505. }
  506. func nodeNames(nodes []*v1.Node) sets.String {
  507. ret := sets.NewString()
  508. for _, node := range nodes {
  509. ret.Insert(node.Name)
  510. }
  511. return ret
  512. }
  513. func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
  514. if len(x) != len(y) {
  515. return false
  516. }
  517. return nodeNames(x).Equal(nodeNames(y))
  518. }
  519. func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
  520. return func(node *v1.Node) bool {
  521. // We add the master to the node list, but its unschedulable. So we use this to filter
  522. // the master.
  523. if node.Spec.Unschedulable {
  524. return false
  525. }
  526. // As of 1.6, we will taint the master, but not necessarily mark it unschedulable.
  527. // Recognize nodes labeled as master, and filter them also, as we were doing previously.
  528. if _, hasMasterRoleLabel := node.Labels[LabelNodeRoleMaster]; hasMasterRoleLabel {
  529. return false
  530. }
  531. if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceNodeExclusion) {
  532. if _, hasExcludeBalancerLabel := node.Labels[LabelNodeRoleExcludeBalancer]; hasExcludeBalancerLabel {
  533. return false
  534. }
  535. }
  536. // If we have no info, don't accept
  537. if len(node.Status.Conditions) == 0 {
  538. return false
  539. }
  540. for _, cond := range node.Status.Conditions {
  541. // We consider the node for load balancing only when its NodeReady condition status
  542. // is ConditionTrue
  543. if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
  544. klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
  545. return false
  546. }
  547. }
  548. return true
  549. }
  550. }
  551. // nodeSyncLoop handles updating the hosts pointed to by all load
  552. // balancers whenever the set of nodes in the cluster changes.
  553. func (s *ServiceController) nodeSyncLoop() {
  554. newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
  555. if err != nil {
  556. runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
  557. return
  558. }
  559. if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
  560. // The set of nodes in the cluster hasn't changed, but we can retry
  561. // updating any services that we failed to update last time around.
  562. s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
  563. return
  564. }
  565. klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v",
  566. nodeNames(newHosts))
  567. // Try updating all services, and save the ones that fail to try again next
  568. // round.
  569. s.servicesToUpdate = s.cache.allServices()
  570. numServices := len(s.servicesToUpdate)
  571. s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
  572. klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
  573. numServices-len(s.servicesToUpdate), numServices)
  574. s.knownHosts = newHosts
  575. }
  576. // updateLoadBalancerHosts updates all existing load balancers so that
  577. // they will match the list of hosts provided.
  578. // Returns the list of services that couldn't be updated.
  579. func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
  580. for _, service := range services {
  581. func() {
  582. if service == nil {
  583. return
  584. }
  585. if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
  586. runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
  587. servicesToRetry = append(servicesToRetry, service)
  588. }
  589. }()
  590. }
  591. return servicesToRetry
  592. }
  593. // Updates the load balancer of a service, assuming we hold the mutex
  594. // associated with the service.
  595. func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
  596. if !wantsLoadBalancer(service) {
  597. return nil
  598. }
  599. // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
  600. err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
  601. if err == nil {
  602. // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
  603. if len(hosts) == 0 {
  604. s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
  605. } else {
  606. s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
  607. }
  608. return nil
  609. }
  610. // It's only an actual error if the load balancer still exists.
  611. if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
  612. runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
  613. } else if !exists {
  614. return nil
  615. }
  616. s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
  617. return err
  618. }
  619. func wantsLoadBalancer(service *v1.Service) bool {
  620. return service.Spec.Type == v1.ServiceTypeLoadBalancer
  621. }
  622. func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
  623. return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
  624. }
  625. // syncService will sync the Service with the given key if it has had its expectations fulfilled,
  626. // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
  627. // invoked concurrently with the same key.
  628. func (s *ServiceController) syncService(key string) error {
  629. startTime := time.Now()
  630. defer func() {
  631. klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
  632. }()
  633. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  634. if err != nil {
  635. return err
  636. }
  637. // service holds the latest service info from apiserver
  638. service, err := s.serviceLister.Services(namespace).Get(name)
  639. switch {
  640. case errors.IsNotFound(err):
  641. // service absence in store means watcher caught the deletion, ensure LB info is cleaned
  642. err = s.processServiceDeletion(key)
  643. case err != nil:
  644. runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
  645. default:
  646. err = s.processServiceCreateOrUpdate(service, key)
  647. }
  648. return err
  649. }
  650. func (s *ServiceController) processServiceDeletion(key string) error {
  651. cachedService, ok := s.cache.get(key)
  652. if !ok {
  653. // Cache does not contains the key means:
  654. // - We didn't create a Load Balancer for the deleted service at all.
  655. // - We already deleted the Load Balancer that was created for the service.
  656. // In both cases we have nothing left to do.
  657. return nil
  658. }
  659. klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
  660. if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
  661. return err
  662. }
  663. s.cache.delete(key)
  664. return nil
  665. }
  666. func (s *ServiceController) processLoadBalancerDelete(service *v1.Service, key string) error {
  667. // delete load balancer info only if the service type is LoadBalancer
  668. if !wantsLoadBalancer(service) {
  669. return nil
  670. }
  671. s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
  672. if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
  673. s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
  674. return err
  675. }
  676. s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
  677. return nil
  678. }
  679. // addFinalizer patches the service to add finalizer.
  680. func (s *ServiceController) addFinalizer(service *v1.Service) error {
  681. if servicehelper.HasLBFinalizer(service) {
  682. return nil
  683. }
  684. // Make a copy so we don't mutate the shared informer cache.
  685. updated := service.DeepCopy()
  686. updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
  687. klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
  688. _, err := patch(s.kubeClient.CoreV1(), service, updated)
  689. return err
  690. }
  691. // removeFinalizer patches the service to remove finalizer.
  692. func (s *ServiceController) removeFinalizer(service *v1.Service) error {
  693. if !servicehelper.HasLBFinalizer(service) {
  694. return nil
  695. }
  696. // Make a copy so we don't mutate the shared informer cache.
  697. updated := service.DeepCopy()
  698. updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer, nil)
  699. klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
  700. _, err := patch(s.kubeClient.CoreV1(), service, updated)
  701. return err
  702. }
  703. // patchStatus patches the service with the given LoadBalancerStatus.
  704. func (s *ServiceController) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
  705. if v1helper.LoadBalancerStatusEqual(previousStatus, newStatus) {
  706. return nil
  707. }
  708. // Make a copy so we don't mutate the shared informer cache.
  709. updated := service.DeepCopy()
  710. updated.Status.LoadBalancer = *newStatus
  711. klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name)
  712. _, err := patch(s.kubeClient.CoreV1(), service, updated)
  713. return err
  714. }