endpoints_controller.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
	"context"
	"fmt"
	"reflect"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/api/v1/endpoints"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	api "k8s.io/kubernetes/pkg/apis/core"
	helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/controller"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
	"k8s.io/kubernetes/pkg/features"
	utillabels "k8s.io/kubernetes/pkg/util/labels"
	utilnet "k8s.io/utils/net"
)

const (
	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
	// sequence of delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
	// controller should go ahead and create endpoints for unready pods. This annotation is
	// currently only used by StatefulSets, where we need the pod to be DNS
	// resolvable during initialization and termination. In this situation we
	// create a headless Service just for the StatefulSet, and clients shouldn't
	// be using this Service for anything so unready endpoints don't matter.
	// Endpoints of these Services retain their DNS records and continue
	// receiving traffic for the Service from the moment the kubelet starts all
	// containers in the pod and marks it "Running", till the kubelet stops all
	// containers and deletes the pod from the apiserver.
	// This annotation is deprecated. v1.Service.PublishNotReadyAddresses will replace it
	// in subsequent releases. It will be removed no sooner than 1.13.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)

// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *EndpointController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.CoreV1().RESTClient().GetRateLimiter())
	}

	e := &EndpointController{
		client:           client,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
		workerLoopPeriod: time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: e.onServiceUpdate,
		UpdateFunc: func(old, cur interface{}) {
			e.onServiceUpdate(cur)
		},
		DeleteFunc: e.onServiceDelete,
	})
	e.serviceLister = serviceInformer.Lister()
	e.servicesSynced = serviceInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    e.addPod,
		UpdateFunc: e.updatePod,
		DeleteFunc: e.deletePod,
	})
	e.podLister = podInformer.Lister()
	e.podsSynced = podInformer.Informer().HasSynced

	e.endpointsLister = endpointsInformer.Lister()
	e.endpointsSynced = endpointsInformer.Informer().HasSynced

	e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
	e.eventBroadcaster = broadcaster
	e.eventRecorder = recorder

	e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

	e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()

	return e
}

// EndpointController manages selector-based service endpoints.
type EndpointController struct {
	client           clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// serviceLister is able to list/get services and is populated by the shared informer passed to
	// NewEndpointController.
	serviceLister corelisters.ServiceLister
	// servicesSynced returns true if the service shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	servicesSynced cache.InformerSynced

	// podLister is able to list/get pods and is populated by the shared informer passed to
	// NewEndpointController.
	podLister corelisters.PodLister
	// podsSynced returns true if the pod shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podsSynced cache.InformerSynced

	// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
	// NewEndpointController.
	endpointsLister corelisters.EndpointsLister
	// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	endpointsSynced cache.InformerSynced

	// Services that need to be updated. A channel is inappropriate here,
	// because it allows services with lots of pods to be serviced much
	// more often than services with few pods; it also would cause a
	// service that's inserted multiple times to be processed more than
	// necessary.
	queue workqueue.RateLimitingInterface

	// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
	workerLoopPeriod time.Duration

	// triggerTimeTracker is a utility used to compute and export the EndpointsLastChangeTriggerTime
	// annotation.
	triggerTimeTracker *endpointutil.TriggerTimeTracker

	endpointUpdatesBatchPeriod time.Duration

	// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
	// to AsSelectorPreValidated (see #73527).
	serviceSelectorCache *endpointutil.ServiceSelectorCache
}

// Run will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	klog.Infof("Starting endpoint controller")
	defer klog.Infof("Shutting down endpoint controller")

	if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
	}
}

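// podToEndpointAddressForService returns an EndpointAddress for the given pod.
// When the IPv6DualStack feature gate is enabled, it picks the pod IP whose
// family (IPv4/IPv6) matches the family of the service's ClusterIP.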
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
		return podToEndpointAddress(pod), nil
	}

	// The api-server service controller has already ensured that the service got the
	// correct IP family according to the user's setup; here we only need to match the
	// endpoint IPs' family to the service's actual IP family. In other words, we don't
	// need to check service.IPFamily.
	ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
	for _, podIP := range pod.Status.PodIPs {
		ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
		// same family?
		// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
		// we will have to return multiple endpoint addresses
		if ipv6ClusterIP == ipv6PodIP {
			return &v1.EndpointAddress{
				IP:       podIP.IP,
				NodeName: &pod.Spec.NodeName,
				TargetRef: &v1.ObjectReference{
					Kind:            "Pod",
					Namespace:       pod.ObjectMeta.Namespace,
					Name:            pod.ObjectMeta.Name,
					UID:             pod.ObjectMeta.UID,
					ResourceVersion: pod.ObjectMeta.ResourceVersion,
				}}, nil
		}
	}
	return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}

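// podToEndpointAddress converts a pod's primary IP into an EndpointAddress,
// recording the pod's node and a TargetRef back to the pod object.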
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
	return &v1.EndpointAddress{
		IP:       pod.Status.PodIP,
		NodeName: &pod.Spec.NodeName,
		TargetRef: &v1.ObjectReference{
			Kind:            "Pod",
			Namespace:       pod.ObjectMeta.Namespace,
			Name:            pod.ObjectMeta.Name,
			UID:             pod.ObjectMeta.UID,
			ResourceVersion: pod.ObjectMeta.ResourceVersion,
		}}
}

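// endpointChanged reports whether two versions of a pod would produce
// different endpoint addresses, ignoring the pods' resource versions.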
func endpointChanged(pod1, pod2 *v1.Pod) bool {
	endpointAddress1 := podToEndpointAddress(pod1)
	endpointAddress2 := podToEndpointAddress(pod2)

	endpointAddress1.TargetRef.ResourceVersion = ""
	endpointAddress2.TargetRef.ResourceVersion = ""

	return !reflect.DeepEqual(endpointAddress1, endpointAddress2)
}

// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
	services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged)
	for key := range services {
		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
	}
}

// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
	pod := endpointutil.GetPodFromDeleteAction(obj)
	if pod != nil {
		e.addPod(pod)
	}
}

// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
func (e *EndpointController) onServiceUpdate(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
	e.queue.Add(key)
}

// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (e *EndpointController) onServiceDelete(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	e.serviceSelectorCache.Delete(key)
	e.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *EndpointController) worker() {
	for e.processNextWorkItem() {
	}
}

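// processNextWorkItem dequeues a single service key, syncs it, and reports
// whether the worker loop should keep running.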
func (e *EndpointController) processNextWorkItem() bool {
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	err := e.syncService(eKey.(string))
	e.handleErr(err, eKey)

	return true
}

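// handleErr requeues a service with rate limiting while it is still within
// maxRetries; after that it drops the service from the queue for good.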
func (e *EndpointController) handleErr(err error, key interface{}) {
	if err == nil {
		e.queue.Forget(key)
		return
	}

	if e.queue.NumRequeues(key) < maxRetries {
		klog.V(2).Infof("Error syncing endpoints for service %q, retrying. Error: %v", key, err)
		e.queue.AddRateLimited(key)
		return
	}

	klog.Warningf("Dropping service %q out of the queue: %v", key, err)
	e.queue.Forget(key)
	utilruntime.HandleError(err)
}

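// syncService reconciles the Endpoints object for the service identified by
// key: it lists the pods matching the service's selector, builds the desired
// endpoint subsets, and creates or updates the Endpoints object if anything
// changed. If the service no longer exists, the Endpoints object is deleted.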
func (e *EndpointController) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}

		// Delete the corresponding endpoint, as the service has been deleted.
		// TODO: Please note that this will delete an endpoint when a
		// service is deleted. However, if we're down at the time when
		// the service is deleted, we will miss that deletion, so this
		// doesn't completely solve the problem. See #6877.
		err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteService(namespace, name)
		return nil
	}

	if service.Spec.Selector == nil {
		// services without a selector receive no endpoints from this controller;
		// these services will receive the endpoints that are created out-of-band via the REST API.
		return nil
	}

	klog.V(5).Infof("About to update endpoints for service %q", key)
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		// Since we're getting stuff from a local cache, it is
		// basically impossible to get this error.
		return err
	}

	// If the user specified the older (deprecated) annotation, we have to respect it.
	tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
	if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
		b, err := strconv.ParseBool(v)
		if err == nil {
			tolerateUnreadyEndpoints = b
		} else {
			utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
		}
	}

	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
	// state of the trigger time tracker gets updated even if the sync turns out
	// to be no-op and we don't update the endpoints object.
	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	subsets := []v1.EndpointSubset{}
	var totalReadyEps int
	var totalNotReadyEps int

	for _, pod := range pods {
		if len(pod.Status.PodIP) == 0 {
			klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
			continue
		}
		if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
			klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
			continue
		}

		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			// This will happen if the cluster runs with some nodes configured as
			// dual-stack and some as not, e.g. during an upgrade.
			klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
			continue
		}

		epa := *ep
		if endpointutil.ShouldSetHostname(pod, service) {
			epa.Hostname = pod.Spec.Hostname
		}

		// Allow headless service not to have ports.
		if len(service.Spec.Ports) == 0 {
			if service.Spec.ClusterIP == api.ClusterIPNone {
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
				// No need to repack subsets for headless service without ports.
			}
		} else {
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portName := servicePort.Name
				portProto := servicePort.Protocol
				portNum, err := podutil.FindPort(pod, servicePort)
				if err != nil {
					klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
					continue
				}

				var readyEps, notReadyEps int
				epp := &v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
				totalReadyEps = totalReadyEps + readyEps
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	subsets = endpoints.RepackSubsets(subsets)

	// See if there's actually an update here.
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			currentEndpoints = &v1.Endpoints{
				ObjectMeta: metav1.ObjectMeta{
					Name:   service.Name,
					Labels: service.Labels,
				},
			}
		} else {
			return err
		}
	}

	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	if !createEndpoints &&
		apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
		apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
		klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
		return nil
	}
	newEndpoints := currentEndpoints.DeepCopy()
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels
	if newEndpoints.Annotations == nil {
		newEndpoints.Annotations = make(map[string]string)
	}

	if !endpointsLastChangeTriggerTime.IsZero() {
		newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
			endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
	} else { // No new trigger time, clear the annotation.
		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
	}

	if newEndpoints.Labels == nil {
		newEndpoints.Labels = make(map[string]string)
	}

	if !helper.IsServiceIPSet(service) {
		newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
	} else {
		newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
	}

	klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
	if createEndpoints {
		// No previous endpoints, create them
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
	} else {
		// Pre-existing
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
	}
	if err != nil {
		if createEndpoints && errors.IsForbidden(err) {
			// A request is forbidden primarily for two reasons:
			// 1. namespace is terminating, endpoint creation is not allowed by default.
			// 2. policy is misconfigured, in which case no service would function anywhere.
			// Given the frequency of 1, we log at a lower level.
			klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

			// If the namespace is terminating, creates will continue to fail. Simply drop the item.
			if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
				return nil
			}
		}

		if createEndpoints {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
		} else {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
		}

		return err
	}
	return nil
}

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *EndpointController) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// Skip endpoints that carry a leader-election record: they have no
			// corresponding service by design, and when multiple
			// controller-manager instances run, deleting them here has been
			// observed to remove the leader-election endpoints after 5min and
			// cause a re-election.
			continue
		}
		key, err := controller.KeyFunc(ep)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
			continue
		}
		e.queue.Add(key)
	}
}

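// addEndpointSubset appends a subset for the given address (and optional port)
// to subsets, placing the address under Addresses or NotReadyAddresses based
// on pod readiness, and returns the updated slice together with the number of
// ready and not-ready endpoints that were added.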
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
	var readyEps int
	var notReadyEps int
	ports := []v1.EndpointPort{}
	if epp != nil {
		ports = append(ports, *epp)
	}
	if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
		subsets = append(subsets, v1.EndpointSubset{
			Addresses: []v1.EndpointAddress{epa},
			Ports:     ports,
		})
		readyEps++
	} else if shouldPodBeInEndpoints(pod) {
		klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
		subsets = append(subsets, v1.EndpointSubset{
			NotReadyAddresses: []v1.EndpointAddress{epa},
			Ports:             ports,
		})
		notReadyEps++
	}
	return subsets, readyEps, notReadyEps
}

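// shouldPodBeInEndpoints returns true if an unready pod should still be listed
// as a not-ready address: pods whose restart policy means they have terminally
// exited (Failed/Succeeded and never restarting) are excluded, since they will
// never serve traffic again.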
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
	switch pod.Spec.RestartPolicy {
	case v1.RestartPolicyNever:
		return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
	case v1.RestartPolicyOnFailure:
		return pod.Status.Phase != v1.PodSucceeded
	default:
		return true
	}
}