endpoints_controller.go
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint

import (
	"fmt"
	"reflect"
	"strconv"
	"time"

	"k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/api/v1/endpoints"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/metrics"
)

const (
	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
	// sequence of delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
	// controller should go ahead and create endpoints for unready pods. This annotation is
	// currently only used by StatefulSets, where we need the pod to be DNS
	// resolvable during initialization and termination. In this situation we
	// create a headless Service just for the StatefulSet, and clients shouldn't
	// be using this Service for anything, so unready endpoints don't matter.
	// Endpoints of these Services retain their DNS records and continue
	// receiving traffic for the Service from the moment the kubelet starts all
	// containers in the pod and marks it "Running", till the kubelet stops all
	// containers and deletes the pod from the apiserver.
	// This annotation is deprecated. v1.Service.PublishNotReadyAddresses will replace it
	// in subsequent releases. It will be removed no sooner than 1.13.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)
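
// For illustration only (not in the upstream file): a hypothetical Service
// manifest opting into unready endpoints via the deprecated annotation, whose
// value is parsed with strconv.ParseBool in syncService below:
//
//	metadata:
//	  annotations:
//	    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"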

// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.CoreV1().RESTClient().GetRateLimiter())
	}

	e := &EndpointController{
		client:           client,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
		workerLoopPeriod: time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: e.enqueueService,
		UpdateFunc: func(old, cur interface{}) {
			e.enqueueService(cur)
		},
		DeleteFunc: e.enqueueService,
	})
	e.serviceLister = serviceInformer.Lister()
	e.servicesSynced = serviceInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    e.addPod,
		UpdateFunc: e.updatePod,
		DeleteFunc: e.deletePod,
	})
	e.podLister = podInformer.Lister()
	e.podsSynced = podInformer.Informer().HasSynced

	e.endpointsLister = endpointsInformer.Lister()
	e.endpointsSynced = endpointsInformer.Informer().HasSynced

	e.triggerTimeTracker = NewTriggerTimeTracker()
	e.eventBroadcaster = broadcaster
	e.eventRecorder = recorder

	return e
}
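
// A minimal wiring sketch (illustrative, not part of the upstream file): the
// informers are typically built from a shared informer factory (import
// "k8s.io/client-go/informers") and started before Run; `client` and `stopCh`
// are assumed to exist in the caller.
//
//	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
//	e := NewEndpointController(
//		factory.Core().V1().Pods(),
//		factory.Core().V1().Services(),
//		factory.Core().V1().Endpoints(),
//		client,
//	)
//	factory.Start(stopCh)
//	go e.Run(5, stopCh)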

// EndpointController manages selector-based service endpoints.
type EndpointController struct {
	client           clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// serviceLister is able to list/get services and is populated by the shared informer passed to
	// NewEndpointController.
	serviceLister corelisters.ServiceLister
	// servicesSynced returns true if the service shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	servicesSynced cache.InformerSynced

	// podLister is able to list/get pods and is populated by the shared informer passed to
	// NewEndpointController.
	podLister corelisters.PodLister
	// podsSynced returns true if the pod shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podsSynced cache.InformerSynced

	// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
	// NewEndpointController.
	endpointsLister corelisters.EndpointsLister
	// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	endpointsSynced cache.InformerSynced

	// Services that need to be updated. A channel is inappropriate here,
	// because it allows services with lots of pods to be serviced much
	// more often than services with few pods; it also would cause a
	// service that's inserted multiple times to be processed more than
	// necessary.
	queue workqueue.RateLimitingInterface

	// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
	workerLoopPeriod time.Duration

	// triggerTimeTracker is a utility used to compute and export the EndpointsLastChangeTriggerTime
	// annotation.
	triggerTimeTracker *TriggerTimeTracker
}

// Run will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	klog.Infof("Starting endpoint controller")
	defer klog.Infof("Shutting down endpoint controller")

	if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

func (e *EndpointController) getPodServiceMemberships(pod *v1.Pod) (sets.String, error) {
	set := sets.String{}
	services, err := e.serviceLister.GetPodServices(pod)
	if err != nil {
		// don't log this error because GetPodServices returns an error
		// whenever no services select the pod, which is expected and harmless.
		return set, nil
	}
	for i := range services {
		key, err := controller.KeyFunc(services[i])
		if err != nil {
			return nil, err
		}
		set.Insert(key)
	}
	return set, nil
}
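
// The keys produced by getPodServiceMemberships are the usual
// "<namespace>/<name>" strings from controller.KeyFunc, matching what
// enqueueService adds to the workqueue and what syncService later splits with
// cache.SplitMetaNamespaceKey.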

// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.getPodServiceMemberships(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.Add(key)
	}
}

func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
	return &v1.EndpointAddress{
		IP:       pod.Status.PodIP,
		NodeName: &pod.Spec.NodeName,
		TargetRef: &v1.ObjectReference{
			Kind:            "Pod",
			Namespace:       pod.ObjectMeta.Namespace,
			Name:            pod.ObjectMeta.Name,
			UID:             pod.ObjectMeta.UID,
			ResourceVersion: pod.ObjectMeta.ResourceVersion,
		},
	}
}
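
// For example (hypothetical values), a pod "web-0" in namespace "default" with
// PodIP 10.0.0.5 scheduled on "node-1" maps to:
//
//	v1.EndpointAddress{
//		IP:        "10.0.0.5",
//		NodeName:  &pod.Spec.NodeName, // "node-1"
//		TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "web-0", ...},
//	}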

func podChanged(oldPod, newPod *v1.Pod) bool {
	// If the pod's deletion timestamp is set, remove endpoint from ready address.
	if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
		return true
	}
	// If the pod's readiness has changed, the associated endpoint address
	// will move from the unready endpoints set to the ready endpoints.
	// So for the purposes of an endpoint, a readiness change on a pod
	// means we have a changed pod.
	if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
		return true
	}
	// Convert the pod to an EndpointAddress, clear inert fields,
	// and see if they are the same.
	newEndpointAddress := podToEndpointAddress(newPod)
	oldEndpointAddress := podToEndpointAddress(oldPod)
	// Ignore the ResourceVersion because it changes
	// with every pod update. This allows the comparison to
	// show equality if all other relevant fields match.
	newEndpointAddress.TargetRef.ResourceVersion = ""
	oldEndpointAddress.TargetRef.ResourceVersion = ""
	if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) {
		// The pod has not changed in any way that impacts the endpoints
		return false
	}
	return true
}

func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
	if podChanged {
		// if the pod changed, all services it belongs to, old and new,
		// need to be updated
		services = services.Union(oldServices)
	} else {
		// if only the labels changed, services not common to
		// both the new and old service set (i.e. the symmetric
		// difference) need to be updated
		services = services.Difference(oldServices).Union(oldServices.Difference(services))
	}
	return services
}
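
// Worked example: with oldServices = {"ns/a", "ns/b"} and services = {"ns/b", "ns/c"},
// a changed pod enqueues the union {"ns/a", "ns/b", "ns/c"}, while a pure label
// change enqueues only the symmetric difference {"ns/a", "ns/c"}: "ns/b" selects
// the pod both before and after, so its endpoints are unaffected.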

// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
	newPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if newPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	podChangedFlag := podChanged(oldPod, newPod)

	// Check if the pod labels (or the hostname/subdomain, which feed the
	// endpoint address) have changed, indicating a possible change in the
	// service membership
	labelsChanged := false
	if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
		!hostNameAndDomainAreEqual(newPod, oldPod) {
		labelsChanged = true
	}

	// If both the pod and labels are unchanged, no update is needed
	if !podChangedFlag && !labelsChanged {
		return
	}

	services, err := e.getPodServiceMemberships(newPod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
		return
	}

	if labelsChanged {
		oldServices, err := e.getPodServiceMemberships(oldPod)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
			return
		}
		services = determineNeededServiceUpdates(oldServices, services, podChangedFlag)
	}

	for key := range services {
		e.queue.Add(key)
	}
}

func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
	return pod1.Spec.Hostname == pod2.Spec.Hostname &&
		pod1.Spec.Subdomain == pod2.Spec.Subdomain
}

// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
	if _, ok := obj.(*v1.Pod); ok {
		// Enqueue all the services that the pod used to be a member
		// of. This happens to be exactly the same thing we do when a
		// pod is added.
		e.addPod(obj)
		return
	}
	// If we reached here it means the pod was deleted but its final state is unrecorded.
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
		return
	}
	pod, ok := tombstone.Obj.(*v1.Pod)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
		return
	}
	klog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
	e.addPod(pod)
}

// obj could be a *v1.Service, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) enqueueService(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}
	e.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *EndpointController) worker() {
	for e.processNextWorkItem() {
	}
}

func (e *EndpointController) processNextWorkItem() bool {
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	err := e.syncService(eKey.(string))
	e.handleErr(err, eKey)

	return true
}
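
// Note on the workqueue contract (client-go behavior, stated here for context):
// Get blocks until an item is available or the queue shuts down, Done must be
// called for every Get, and a key re-added while it is being processed is only
// handed out again after Done, so a given service is never synced concurrently.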

func (e *EndpointController) handleErr(err error, key interface{}) {
	if err == nil {
		e.queue.Forget(key)
		return
	}

	if e.queue.NumRequeues(key) < maxRetries {
		klog.V(2).Infof("Error syncing endpoints for service %q, retrying. Error: %v", key, err)
		e.queue.AddRateLimited(key)
		return
	}

	klog.Warningf("Dropping service %q out of the queue: %v", key, err)
	e.queue.Forget(key)
	utilruntime.HandleError(err)
}

func (e *EndpointController) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		// Delete the corresponding endpoint, as the service has been deleted.
		// TODO: Please note that this will delete an endpoint when a
		// service is deleted. However, if we're down at the time when
		// the service is deleted, we will miss that deletion, so this
		// doesn't completely solve the problem. See #6877.
		err = e.client.CoreV1().Endpoints(namespace).Delete(name, nil)
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteEndpoints(namespace, name)
		return nil
	}

	if service.Spec.Selector == nil {
		// services without a selector receive no endpoints from this controller;
		// these services will receive the endpoints that are created out-of-band via the REST API.
		return nil
	}

	klog.V(5).Infof("About to update endpoints for service %q", key)
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		// Since we're getting stuff from a local cache, it is
		// basically impossible to get this error.
		return err
	}

	// If the user specified the older (deprecated) annotation, we have to respect it.
	tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
	if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
		b, err := strconv.ParseBool(v)
		if err == nil {
			tolerateUnreadyEndpoints = b
		} else {
			utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
		}
	}
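
	// Note (illustrative): when the deprecated annotation is present and
	// parseable, it overrides spec.publishNotReadyAddresses; a Service with
	// publishNotReadyAddresses=false but the annotation set to "true" is
	// treated here as tolerating unready endpoints.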

	// We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger
	// time tracker gets updated even if the sync turns out to be no-op and we don't update the
	// endpoints object.
	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
		ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods)

	subsets := []v1.EndpointSubset{}
	var totalReadyEps int
	var totalNotReadyEps int

	for _, pod := range pods {
		if len(pod.Status.PodIP) == 0 {
			klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
			continue
		}
		if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
			klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
			continue
		}

		epa := *podToEndpointAddress(pod)

		hostname := pod.Spec.Hostname
		if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
			epa.Hostname = hostname
		}

		// Allow headless service not to have ports.
		if len(service.Spec.Ports) == 0 {
			if service.Spec.ClusterIP == api.ClusterIPNone {
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
				// No need to repack subsets for headless service without ports.
			}
		} else {
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]

				portName := servicePort.Name
				portProto := servicePort.Protocol
				portNum, err := podutil.FindPort(pod, servicePort)
				if err != nil {
					klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
					continue
				}

				var readyEps, notReadyEps int
				epp := &v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
				totalReadyEps = totalReadyEps + readyEps
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	subsets = endpoints.RepackSubsets(subsets)

	// See if there's actually an update here.
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			currentEndpoints = &v1.Endpoints{
				ObjectMeta: metav1.ObjectMeta{
					Name:   service.Name,
					Labels: service.Labels,
				},
			}
		} else {
			return err
		}
	}

	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	if !createEndpoints &&
		apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
		apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
		klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
		return nil
	}
	newEndpoints := currentEndpoints.DeepCopy()
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels
	if newEndpoints.Annotations == nil {
		newEndpoints.Annotations = make(map[string]string)
	}

	if !endpointsLastChangeTriggerTime.IsZero() {
		newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
			endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
	} else { // No new trigger time, clear the annotation.
		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
	}

	klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
	if createEndpoints {
		// No previous endpoints, create them
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(newEndpoints)
	} else {
		// Pre-existing
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(newEndpoints)
	}
	if err != nil {
		if createEndpoints && errors.IsForbidden(err) {
			// A request is forbidden primarily for two reasons:
			// 1. namespace is terminating, endpoint creation is not allowed by default.
			// 2. policy is misconfigured, in which case no service would function anywhere.
			// Given the frequency of 1, we log at a lower level.
			klog.V(5).Infof("Forbidden from creating endpoints: %v", err)
		}

		if createEndpoints {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
		} else {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
		}

		return err
	}
	return nil
}

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *EndpointController) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
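
	// Skip Endpoints that carry a leader-election record. In practice this
	// matches objects such as the "kube-controller-manager" and
	// "kube-scheduler" Endpoints in kube-system, which legitimately exist
	// without a backing Service.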
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// Leader-election Endpoints have no backing Service; deleting
			// them here would, with multiple controller-manager instances,
			// remove the record after 5 minutes and force a needless
			// re-election, so skip them.
			continue
		}
		key, err := controller.KeyFunc(ep)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
			continue
		}
		e.queue.Add(key)
	}
}

func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
	var readyEps int
	var notReadyEps int
	ports := []v1.EndpointPort{}
	if epp != nil {
		ports = append(ports, *epp)
	}
	if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
		subsets = append(subsets, v1.EndpointSubset{
			Addresses: []v1.EndpointAddress{epa},
			Ports:     ports,
		})
		readyEps++
	} else if shouldPodBeInEndpoints(pod) {
		klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
		subsets = append(subsets, v1.EndpointSubset{
			NotReadyAddresses: []v1.EndpointAddress{epa},
			Ports:             ports,
		})
		notReadyEps++
	}
	return subsets, readyEps, notReadyEps
}
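
// Before endpoints.RepackSubsets runs in syncService, each (pod, port) pair
// handled above yields its own single-address subset; RepackSubsets then merges
// addresses that share an identical port list, so two ready pods behind the
// same port collapse into one subset with two addresses.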

func shouldPodBeInEndpoints(pod *v1.Pod) bool {
	switch pod.Spec.RestartPolicy {
	case v1.RestartPolicyNever:
		return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
	case v1.RestartPolicyOnFailure:
		return pod.Status.Phase != v1.PodSucceeded
	default:
		return true
	}
}
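
// For example, a completed Job pod (RestartPolicyNever, phase Succeeded) is
// excluded from the Endpoints object entirely rather than listed as not-ready,
// while an unready pod of a Deployment (RestartPolicyAlways) stays in
// NotReadyAddresses until it is deleted.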