controller.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	cloudprovider "k8s.io/cloud-provider"
	servicehelper "k8s.io/cloud-provider/service/helpers"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
)

const (
	// Interval of synchronizing service status from apiserver
	serviceSyncPeriod = 30 * time.Second
	// Interval of synchronizing node status from apiserver
	nodeSyncPeriod = 100 * time.Second

	// How long to wait before retrying the processing of a service change.
	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
	// should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second

	// labelNodeRoleMaster specifies that a node is a master. The use of this label within the
	// controller is deprecated and only considered when the LegacyNodeRoleBehavior feature gate
	// is on.
	labelNodeRoleMaster = "node-role.kubernetes.io/master"

	// labelNodeRoleExcludeBalancer specifies that the node should not be considered as a target
	// for external load-balancers which use nodes as a second hop (e.g. many cloud LBs which only
	// understand nodes). For services that use externalTrafficPolicy=Local, this may mean that
	// any backends on excluded nodes are not reachable by those external load-balancers.
	// Implementations of this exclusion may vary based on provider. This label is honored starting
	// in 1.16 when the ServiceNodeExclusion gate is on.
	labelNodeRoleExcludeBalancer = "node.kubernetes.io/exclude-from-external-load-balancers"

	// labelAlphaNodeRoleExcludeBalancer specifies that the node should be
	// excluded from load balancers created by a cloud provider. This label is deprecated and will
	// be removed in 1.18.
	labelAlphaNodeRoleExcludeBalancer = "alpha.service-controller.kubernetes.io/exclude-balancer"

	// serviceNodeExclusionFeature is the feature gate name that
	// enables nodes to exclude themselves from service load balancers.
	// Originated from: https://github.com/kubernetes/kubernetes/blob/28e800245e/pkg/features/kube_features.go#L178
	serviceNodeExclusionFeature = "ServiceNodeExclusion"

	// legacyNodeRoleBehaviorFeature is the feature gate name that enables legacy
	// behavior to vary cluster functionality on the node-role.kubernetes.io
	// labels.
	legacyNodeRoleBehaviorFeature = "LegacyNodeRoleBehavior"
)

type cachedService struct {
	// The cached state of the service
	state *v1.Service
}

type serviceCache struct {
	mu         sync.RWMutex // protects serviceMap
	serviceMap map[string]*cachedService
}

// Controller keeps cloud provider service resources
// (like load balancers) in sync with the registry.
type Controller struct {
	cloud            cloudprovider.Interface
	knownHosts       []*v1.Node
	servicesToUpdate []*v1.Service
	kubeClient       clientset.Interface
	clusterName      string
	balancer         cloudprovider.LoadBalancer
	// TODO(#85155): Stop relying on this and remove the cache completely.
	cache               *serviceCache
	serviceLister       corelisters.ServiceLister
	serviceListerSynced cache.InformerSynced
	eventBroadcaster    record.EventBroadcaster
	eventRecorder       record.EventRecorder
	nodeLister          corelisters.NodeLister
	nodeListerSynced    cache.InformerSynced
	// services that need to be synced
	queue workqueue.RateLimitingInterface
}

// New returns a new service controller to keep cloud provider service resources
// (like load balancers) in sync with the registry.
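//
// A minimal wiring sketch (illustrative only: the informer factory setup,
// stop channel, cluster name, and worker count below are assumptions by the
// caller, not part of this package):
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	ctrl, err := New(cloud, kubeClient,
//		factory.Core().V1().Services(), factory.Core().V1().Nodes(), "my-cluster")
//	if err != nil {
//		klog.Fatal(err)
//	}
//	factory.Start(stopCh)
//	go ctrl.Run(stopCh, 5)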
func New(
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	serviceInformer coreinformers.ServiceInformer,
	nodeInformer coreinformers.NodeInformer,
	clusterName string,
) (*Controller, error) {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}

	s := &Controller{
		cloud:            cloud,
		knownHosts:       []*v1.Node{},
		kubeClient:       kubeClient,
		clusterName:      clusterName,
		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
		eventBroadcaster: broadcaster,
		eventRecorder:    recorder,
		nodeLister:       nodeInformer.Lister(),
		nodeListerSynced: nodeInformer.Informer().HasSynced,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(cur interface{}) {
				svc, ok := cur.(*v1.Service)
				// Checking for cleanup here provides a remedy in case the controller
				// failed to handle changes before exiting (e.g. a crash or restart).
				if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
					s.enqueueService(cur)
				}
			},
			UpdateFunc: func(old, cur interface{}) {
				oldSvc, ok1 := old.(*v1.Service)
				curSvc, ok2 := cur.(*v1.Service)
				if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
					s.enqueueService(cur)
				}
			},
			// No need to handle deletion events because deletion is handled by
			// the update path once the deletion timestamp is added.
		},
		serviceSyncPeriod,
	)
	s.serviceLister = serviceInformer.Lister()
	s.serviceListerSynced = serviceInformer.Informer().HasSynced

	if err := s.init(); err != nil {
		return nil, err
	}

	return s, nil
}

// obj could be a *v1.Service, or a DeletionFinalStateUnknown marker item.
func (s *Controller) enqueueService(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
		return
	}
	s.queue.Add(key)
}

// Run starts background goroutines that watch for changes to services that
// have (or had) type LoadBalancer and ensure that load balancers are created
// and deleted appropriately.
//
// serviceSyncPeriod controls how often we check the cluster's services to
// ensure that the correct load balancers exist.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given Controller object.
func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
	defer runtime.HandleCrash()
	defer s.queue.ShutDown()

	klog.Info("Starting service controller")
	defer klog.Info("Shutting down service controller")

	if !cache.WaitForNamedCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(s.worker, time.Second, stopCh)
	}

	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)

	<-stopCh
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *Controller) worker() {
	for s.processNextWorkItem() {
	}
}
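
// processNextWorkItem reads a single key off the queue, syncs the corresponding
// service, and requeues the key with rate limiting if the sync fails.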
func (s *Controller) processNextWorkItem() bool {
	key, quit := s.queue.Get()
	if quit {
		return false
	}
	defer s.queue.Done(key)

	err := s.syncService(key.(string))
	if err == nil {
		s.queue.Forget(key)
		return true
	}

	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
	s.queue.AddRateLimited(key)
	return true
}
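
// init verifies that a cloud provider with load balancer support was supplied
// and stores its LoadBalancer implementation on the controller.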
func (s *Controller) init() error {
	if s.cloud == nil {
		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
	}

	balancer, ok := s.cloud.LoadBalancer()
	if !ok {
		return fmt.Errorf("the cloud provider does not support external load balancers")
	}
	s.balancer = balancer

	return nil
}

// processServiceCreateOrUpdate creates or updates the load balancer for the
// incoming service as needed. Returns an error if processing the service update failed.
func (s *Controller) processServiceCreateOrUpdate(service *v1.Service, key string) error {
	// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
	// path. Ref https://github.com/kubernetes/enhancements/issues/980.
	cachedService := s.cache.getOrCreate(key)
	if cachedService.state != nil && cachedService.state.UID != service.UID {
		// This happens only when a service is deleted and re-created
		// in a short period, which is only possible when it doesn't
		// contain a finalizer.
		if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
			return err
		}
	}
	// Always cache the service; we need the info for service deletion in case
	// the load balancer cleanup is not handled via finalizer.
	cachedService.state = service
	op, err := s.syncLoadBalancerIfNeeded(service, key)
	if err != nil {
		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
		return err
	}
	if op == deleteLoadBalancer {
		// Only delete the cache upon successful load balancer deletion.
		s.cache.delete(key)
	}

	return nil
}

type loadBalancerOperation int

const (
	deleteLoadBalancer loadBalancerOperation = iota
	ensureLoadBalancer
)

// syncLoadBalancerIfNeeded ensures that the service's status is synced with the
// load balancer, i.e. it creates a load balancer for the service if one is requested,
// and deletes the load balancer if the service no longer wants one.
// Returns whatever error occurred.
func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
	// which may involve service interruption. Also, we would like user-friendly events.

	// Save the state so we can avoid a write if it doesn't change
	previousStatus := service.Status.LoadBalancer.DeepCopy()
	var newStatus *v1.LoadBalancerStatus
	var op loadBalancerOperation
	var err error

	if !wantsLoadBalancer(service) || needsCleanup(service) {
		// Delete the load balancer if the service no longer wants one, or if the service needs cleanup.
		op = deleteLoadBalancer
		newStatus = &v1.LoadBalancerStatus{}
		_, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
		if err != nil {
			return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
		}
		if exists {
			klog.V(2).Infof("Deleting existing load balancer for service %s", key)
			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
			if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
				return op, fmt.Errorf("failed to delete load balancer: %v", err)
			}
		}
		// Always remove the finalizer when the load balancer is deleted; this ensures Services
		// can be deleted after all corresponding load balancer resources are deleted.
		if err := s.removeFinalizer(service); err != nil {
			return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
		}
		s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	} else {
		// Create or update the load balancer if the service wants one.
		op = ensureLoadBalancer
		klog.V(2).Infof("Ensuring load balancer for service %s", key)
		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
		// Always add a finalizer prior to creating load balancers; this ensures Services
		// can't be deleted until all corresponding load balancer resources are also deleted.
		if err := s.addFinalizer(service); err != nil {
			return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
		}
		newStatus, err = s.ensureLoadBalancer(service)
		if err != nil {
			if err == cloudprovider.ImplementedElsewhere {
				// ImplementedElsewhere indicates that ensureLoadBalancer is a nop and the
				// functionality is implemented by a different controller. In this case, we
				// return immediately without doing anything.
				klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, ignoring error", key, s.cloud.ProviderName())
				return op, nil
			}
			return op, fmt.Errorf("failed to ensure load balancer: %v", err)
		}
		if newStatus == nil {
			return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil")
		}

		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
	}

	if err := s.patchStatus(service, previousStatus, newStatus); err != nil {
		// Only retry errors other than NotFound:
		// - A NotFound error mostly happens when the service disappears right after
		//   we remove the finalizer.
		// - We can't patch the status of a non-existent service anyway.
		if !errors.IsNotFound(err) {
			return op, fmt.Errorf("failed to update load balancer status: %v", err)
		}
	}

	return op, nil
}
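
// ensureLoadBalancer lists the eligible nodes and asks the cloud provider to
// create or update the load balancer targeting them.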
func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
	nodes, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
	if err != nil {
		return nil, err
	}

	// If there are no available nodes for the LoadBalancer service, emit an EventTypeWarning event for it.
	if len(nodes) == 0 {
		s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
	}

	// - Only one protocol supported per service
	// - Not all cloud providers support all protocols and the next step is expected to return
	//   an error for unsupported protocols
	return s.balancer.EnsureLoadBalancer(context.TODO(), s.clusterName, service, nodes)
}

// ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about.
func (s *serviceCache) ListKeys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.serviceMap))
	for k := range s.serviceMap {
		keys = append(keys, k)
	}
	return keys
}

// GetByKey returns the value stored in the serviceMap under the given key
func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if v, ok := s.serviceMap[key]; ok {
		return v, true, nil
	}
	return nil, false, nil
}

// allServices returns a list of all the services held in the cache.
func (s *serviceCache) allServices() []*v1.Service {
	s.mu.RLock()
	defer s.mu.RUnlock()
	services := make([]*v1.Service, 0, len(s.serviceMap))
	for _, v := range s.serviceMap {
		services = append(services, v.state)
	}
	return services
}
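
// get returns the cached service for the given key, and whether it was found.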
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	service, ok := s.serviceMap[serviceName]
	return service, ok
}
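
// getOrCreate returns the cached service for the given key, creating an empty
// entry if none exists yet.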
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
	s.mu.Lock()
	defer s.mu.Unlock()
	service, ok := s.serviceMap[serviceName]
	if !ok {
		service = &cachedService{}
		s.serviceMap[serviceName] = service
	}
	return service
}
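
// set stores the given service in the cache under the given key.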
func (s *serviceCache) set(serviceName string, service *cachedService) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.serviceMap[serviceName] = service
}
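
// delete removes the entry for the given key from the cache, if present.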
func (s *serviceCache) delete(serviceName string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.serviceMap, serviceName)
}

// needsCleanup checks if the load balancer needs to be cleaned up, as indicated by the finalizer.
func needsCleanup(service *v1.Service) bool {
	if !servicehelper.HasLBFinalizer(service) {
		return false
	}

	if service.ObjectMeta.DeletionTimestamp != nil {
		return true
	}

	// A service that no longer wants a load balancer but still carries the
	// load balancer finalizer also needs to be cleaned up.
	if service.Spec.Type != v1.ServiceTypeLoadBalancer {
		return true
	}

	return false
}

// needsUpdate checks if the load balancer needs to be updated due to a change in attributes.
func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
		return false
	}
	if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
			oldService.Spec.Type, newService.Spec.Type)
		return true
	}

	if wantsLoadBalancer(newService) && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v",
			oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges)
		return true
	}

	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}

	if !reflect.DeepEqual(oldService.Spec.SessionAffinityConfig, newService.Spec.SessionAffinityConfig) {
		return true
	}

	if !loadBalancerIPsAreEqual(oldService, newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
		return true
	}

	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
		return true
	}
	for i := range oldService.Spec.ExternalIPs {
		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
			s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
				newService.Spec.ExternalIPs[i])
			return true
		}
	}
	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
		return true
	}
	if oldService.UID != newService.UID {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
			oldService.UID, newService.UID)
		return true
	}
	if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v",
			oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy)
		return true
	}
	if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v",
			oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort)
		return true
	}

	return false
}
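
// getPortsForLB returns pointers to all of the service's ports.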
func getPortsForLB(service *v1.Service) []*v1.ServicePort {
	ports := []*v1.ServicePort{}
	for i := range service.Spec.Ports {
		sp := &service.Spec.Ports[i]
		ports = append(ports, sp)
	}
	return ports
}
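
// portsEqualForLB reports whether two services expose the same ports, as far
// as a load balancer is concerned.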
func portsEqualForLB(x, y *v1.Service) bool {
	xPorts := getPortsForLB(x)
	yPorts := getPortsForLB(y)
	return portSlicesEqualForLB(xPorts, yPorts)
}
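
// portSlicesEqualForLB compares two port slices element-wise, in order.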
func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
	if len(x) != len(y) {
		return false
	}

	for i := range x {
		if !portEqualForLB(x[i], y[i]) {
			return false
		}
	}
	return true
}
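
// portEqualForLB reports whether two ports are equal in every field a load
// balancer cares about.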
func portEqualForLB(x, y *v1.ServicePort) bool {
	// TODO: Should we check name? (In theory, an LB could expose it)
	if x.Name != y.Name {
		return false
	}

	if x.Protocol != y.Protocol {
		return false
	}

	if x.Port != y.Port {
		return false
	}

	if x.NodePort != y.NodePort {
		return false
	}

	if x.TargetPort != y.TargetPort {
		return false
	}

	return true
}
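
// nodeNames returns the set of names of the given nodes.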
func nodeNames(nodes []*v1.Node) sets.String {
	ret := sets.NewString()
	for _, node := range nodes {
		ret.Insert(node.Name)
	}
	return ret
}
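
// nodeSlicesEqualForLB reports whether two node slices contain the same node names.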
func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
	if len(x) != len(y) {
		return false
	}
	return nodeNames(x).Equal(nodeNames(y))
}
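
// getNodeConditionPredicate returns the predicate used to decide whether a
// node is an eligible load balancer target: it must be schedulable, not be
// excluded by role or label (subject to feature gates), and be Ready.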
func getNodeConditionPredicate() NodeConditionPredicate {
	return func(node *v1.Node) bool {
		// We add the master to the node list, but it's unschedulable. So we use this to filter
		// the master.
		if node.Spec.Unschedulable {
			return false
		}

		if utilfeature.DefaultFeatureGate.Enabled(legacyNodeRoleBehaviorFeature) {
			// As of 1.6, we will taint the master, but not necessarily mark it unschedulable.
			// Recognize nodes labeled as master, and filter them also, as we were doing previously.
			if _, hasMasterRoleLabel := node.Labels[labelNodeRoleMaster]; hasMasterRoleLabel {
				return false
			}
		}

		if utilfeature.DefaultFeatureGate.Enabled(serviceNodeExclusionFeature) {
			// Will be removed in 1.18
			if _, hasExcludeBalancerLabel := node.Labels[labelAlphaNodeRoleExcludeBalancer]; hasExcludeBalancerLabel {
				return false
			}

			if _, hasExcludeBalancerLabel := node.Labels[labelNodeRoleExcludeBalancer]; hasExcludeBalancerLabel {
				return false
			}
		}

		// If we have no info, don't accept
		if len(node.Status.Conditions) == 0 {
			return false
		}
		for _, cond := range node.Status.Conditions {
			// We consider the node for load balancing only when its NodeReady condition status
			// is ConditionTrue
			if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
				klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
				return false
			}
		}
		return true
	}
}

// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *Controller) nodeSyncLoop() {
	newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
	if err != nil {
		runtime.HandleError(fmt.Errorf("failed to retrieve current set of nodes from node lister: %v", err))
		return
	}
	if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
		// The set of nodes in the cluster hasn't changed, but we can retry
		// updating any services that we failed to update last time around.
		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
		return
	}

	klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v",
		nodeNames(newHosts))

	// Try updating all services, and save the ones that fail to try again next
	// round.
	s.servicesToUpdate = s.cache.allServices()
	numServices := len(s.servicesToUpdate)
	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
		numServices-len(s.servicesToUpdate), numServices)

	s.knownHosts = newHosts
}

// updateLoadBalancerHosts updates all existing load balancers so that
// they will match the list of hosts provided.
// Returns the list of services that couldn't be updated.
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
	for _, service := range services {
		func() {
			if service == nil {
				return
			}
			if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
				runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
				servicesToRetry = append(servicesToRetry, service)
			}
		}()
	}
	return servicesToRetry
}

// lockedUpdateLoadBalancerHosts updates the load balancer of a service,
// assuming we hold the mutex associated with the service.
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
	if !wantsLoadBalancer(service) {
		return nil
	}

	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
	err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
	if err == nil {
		// If there are no available nodes for the LoadBalancer service, emit an EventTypeWarning event for it.
		if len(hosts) == 0 {
			s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
		} else {
			s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
		}
		return nil
	}
	if err == cloudprovider.ImplementedElsewhere {
		// ImplementedElsewhere indicates that UpdateLoadBalancer is a nop and the
		// functionality is implemented by a different controller. In this case, we
		// return immediately without doing anything.
		return nil
	}
	// It's only an actual error if the load balancer still exists.
	if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
		runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
	} else if !exists {
		return nil
	}

	s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
	return err
}
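
// wantsLoadBalancer reports whether the service asks for a load balancer.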
func wantsLoadBalancer(service *v1.Service) bool {
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
}
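
// loadBalancerIPsAreEqual reports whether two services request the same load balancer IP.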
func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}

// syncService syncs the Service with the given key, creating, updating, or
// deleting its load balancer as needed. This function is not meant to be
// invoked concurrently with the same key.
func (s *Controller) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// service holds the latest service info from apiserver
	service, err := s.serviceLister.Services(namespace).Get(name)
	switch {
	case errors.IsNotFound(err):
		// The service's absence from the store means the watcher caught the
		// deletion, so ensure the LB info is cleaned up.
		err = s.processServiceDeletion(key)
	case err != nil:
		runtime.HandleError(fmt.Errorf("unable to retrieve service %v from store: %v", key, err))
	default:
		err = s.processServiceCreateOrUpdate(service, key)
	}

	return err
}
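
// processServiceDeletion cleans up the load balancer for a deleted service,
// using the last state cached for it.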
func (s *Controller) processServiceDeletion(key string) error {
	cachedService, ok := s.cache.get(key)
	if !ok {
		// The key's absence from the cache means either:
		// - We didn't create a load balancer for the deleted service at all.
		// - We already deleted the load balancer that was created for the service.
		// In both cases we have nothing left to do.
		return nil
	}
	klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
	if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
		return err
	}
	s.cache.delete(key)
	return nil
}
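
// processLoadBalancerDelete deletes the load balancer for the given service,
// emitting events for the attempt and its outcome.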
func (s *Controller) processLoadBalancerDelete(service *v1.Service, key string) error {
	// Delete the load balancer info only if the service type is LoadBalancer.
	if !wantsLoadBalancer(service) {
		return nil
	}
	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
	if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
		return err
	}
	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	return nil
}

// addFinalizer patches the service to add a finalizer.
func (s *Controller) addFinalizer(service *v1.Service) error {
	if servicehelper.HasLBFinalizer(service) {
		return nil
	}

	// Make a copy so we don't mutate the shared informer cache.
	updated := service.DeepCopy()
	updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)

	klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
	// TODO(87447) use PatchService from k8s.io/cloud-provider/service/helpers
	_, err := patch(s.kubeClient.CoreV1(), service, updated)
	return err
}

// removeFinalizer patches the service to remove the finalizer.
func (s *Controller) removeFinalizer(service *v1.Service) error {
	if !servicehelper.HasLBFinalizer(service) {
		return nil
	}

	// Make a copy so we don't mutate the shared informer cache.
	updated := service.DeepCopy()
	updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)

	klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
	_, err := patch(s.kubeClient.CoreV1(), service, updated)
	return err
}

// removeString returns a newly created []string that contains all items from slice that
// are not equal to s.
func removeString(slice []string, s string) []string {
	var newSlice []string
	for _, item := range slice {
		if item != s {
			newSlice = append(newSlice, item)
		}
	}
	return newSlice
}

// patchStatus patches the service with the given LoadBalancerStatus.
func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
	if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
		return nil
	}
	// Make a copy so we don't mutate the shared informer cache.
	updated := service.DeepCopy()
	updated.Status.LoadBalancer = *newStatus

	klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name)
	_, err := patch(s.kubeClient.CoreV1(), service, updated)
	return err
}

// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
// some set of criteria defined by the function.
type NodeConditionPredicate func(node *v1.Node) bool

// listWithPredicate gets nodes that match the predicate function.
func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) {
	nodes, err := nodeLister.List(labels.Everything())
	if err != nil {
		return nil, err
	}

	var filtered []*v1.Node
	for i := range nodes {
		if predicate(nodes[i]) {
			filtered = append(filtered, nodes[i])
		}
	}

	return filtered, nil
}