endpointslice_controller.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpointslice

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	discoverylisters "k8s.io/client-go/listers/discovery/v1beta1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/controller"
	endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

const (
	// maxRetries is the number of times a service will be retried before it is
	// dropped out of the queue. Any sync error, such as a failure to create or
	// update an EndpointSlice, could trigger a retry. With the current
	// rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers
	// represent the sequence of delays between successive queuings of a
	// service:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s,
	// 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// controllerName is a unique value used with LabelManagedBy to indicate
	// the component managing an EndpointSlice.
	controllerName = "endpointslice-controller.k8s.io"
)

// NewController creates and initializes a new Controller
func NewController(podInformer coreinformers.PodInformer,
	serviceInformer coreinformers.ServiceInformer,
	nodeInformer coreinformers.NodeInformer,
	endpointSliceInformer discoveryinformers.EndpointSliceInformer,
	maxEndpointsPerSlice int32,
	client clientset.Interface,
) *Controller {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1beta1().RESTClient().GetRateLimiter())
	}

	endpointslicemetrics.RegisterMetrics()

	c := &Controller{
		client:           client,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint_slice"),
		workerLoopPeriod: time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: c.onServiceUpdate,
		UpdateFunc: func(old, cur interface{}) {
			c.onServiceUpdate(cur)
		},
		DeleteFunc: c.onServiceDelete,
	})
	c.serviceLister = serviceInformer.Lister()
	c.servicesSynced = serviceInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addPod,
		UpdateFunc: c.updatePod,
		DeleteFunc: c.deletePod,
	})
	c.podLister = podInformer.Lister()
	c.podsSynced = podInformer.Informer().HasSynced

	c.nodeLister = nodeInformer.Lister()
	c.nodesSynced = nodeInformer.Informer().HasSynced

	endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onEndpointSliceAdd,
		UpdateFunc: c.onEndpointSliceUpdate,
		DeleteFunc: c.onEndpointSliceDelete,
	})
	c.endpointSliceLister = endpointSliceInformer.Lister()
	c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
	c.endpointSliceTracker = newEndpointSliceTracker()

	c.maxEndpointsPerSlice = maxEndpointsPerSlice

	c.reconciler = &reconciler{
		client:               c.client,
		nodeLister:           c.nodeLister,
		maxEndpointsPerSlice: c.maxEndpointsPerSlice,
		endpointSliceTracker: c.endpointSliceTracker,
		metricsCache:         endpointslicemetrics.NewCache(maxEndpointsPerSlice),
	}
	c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()

	c.eventBroadcaster = broadcaster
	c.eventRecorder = recorder

	c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()

	return c
}

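// NewController is normally wired up inside the kube-controller-manager. The
// following is a minimal standalone sketch, assuming a client-go
// SharedInformerFactory; the names factory, resyncPeriod, maxEndpointsPerSlice,
// workers, and stopCh below are illustrative placeholders, not part of this
// package:
//
//	factory := informers.NewSharedInformerFactory(client, resyncPeriod)
//	c := endpointslice.NewController(
//		factory.Core().V1().Pods(),
//		factory.Core().V1().Services(),
//		factory.Core().V1().Nodes(),
//		factory.Discovery().V1beta1().EndpointSlices(),
//		maxEndpointsPerSlice,
//		client,
//	)
//	factory.Start(stopCh)
//	go c.Run(workers, stopCh)
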
// Controller manages selector-based service endpoint slices
type Controller struct {
	client           clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// serviceLister is able to list/get services and is populated by the
	// shared informer passed to NewController
	serviceLister corelisters.ServiceLister
	// servicesSynced returns true if the service shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	servicesSynced cache.InformerSynced

	// podLister is able to list/get pods and is populated by the
	// shared informer passed to NewController
	podLister corelisters.PodLister
	// podsSynced returns true if the pod shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podsSynced cache.InformerSynced

	// endpointSliceLister is able to list/get endpoint slices and is populated by the
	// shared informer passed to NewController
	endpointSliceLister discoverylisters.EndpointSliceLister
	// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	endpointSlicesSynced cache.InformerSynced
	// endpointSliceTracker tracks the list of EndpointSlices and associated
	// resource versions expected for each Service. It can help determine if a
	// cached EndpointSlice is out of date.
	endpointSliceTracker *endpointSliceTracker

	// nodeLister is able to list/get nodes and is populated by the
	// shared informer passed to NewController
	nodeLister corelisters.NodeLister
	// nodesSynced returns true if the node shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	nodesSynced cache.InformerSynced

	// reconciler is a utility used to reconcile EndpointSlice changes.
	reconciler *reconciler

	// triggerTimeTracker is a utility used to compute and export the
	// EndpointsLastChangeTriggerTime annotation.
	triggerTimeTracker *endpointutil.TriggerTimeTracker

	// Services that need to be updated. A channel is inappropriate here,
	// because it allows services with lots of pods to be serviced much
	// more often than services with few pods; it also would cause a
	// service that's inserted multiple times to be processed more than
	// necessary.
	queue workqueue.RateLimitingInterface

	// maxEndpointsPerSlice references the maximum number of endpoints that
	// should be added to an EndpointSlice
	maxEndpointsPerSlice int32

	// workerLoopPeriod is the time between worker runs. The workers
	// process the queue of service and pod changes
	workerLoopPeriod time.Duration

	// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
	// to AsSelectorPreValidated (see #73527)
	serviceSelectorCache *endpointutil.ServiceSelectorCache
}

// Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting endpoint slice controller")
	defer klog.Infof("Shutting down endpoint slice controller")

	if !cache.WaitForNamedCacheSync("endpoint_slice", stopCh, c.podsSynced, c.servicesSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
	}()

	<-stopCh
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time
func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}

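// processNextWorkItem pulls the next service key off the queue, syncs it, and
// hands the result to handleErr. It returns false only when the queue is
// shutting down.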
func (c *Controller) processNextWorkItem() bool {
	cKey, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(cKey)

	err := c.syncService(cKey.(string))
	c.handleErr(err, cKey)

	return true
}

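// handleErr applies the retry policy: a successfully synced key has its
// rate-limiter history forgotten, while a failing key is requeued with rate
// limiting up to maxRetries times before being dropped from the queue.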
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	if c.queue.NumRequeues(key) < maxRetries {
		klog.Warningf("Error syncing endpoint slices for service %q, retrying. Error: %v", key, err)
		c.queue.AddRateLimited(key)
		return
	}

	klog.Warningf("Retry budget exceeded, dropping service %q out of the queue: %v", key, err)
	c.queue.Forget(key)
	utilruntime.HandleError(err)
}

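// syncService reconciles the EndpointSlices for the Service identified by the
// given namespace/name key: it lists the Pods matching the Service selector
// and the EndpointSlices managed by this controller, then hands both to the
// reconciler.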
func (c *Controller) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q endpoint slices. (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	service, err := c.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			c.triggerTimeTracker.DeleteService(namespace, name)
			c.reconciler.deleteService(namespace, name)
			// The service has been deleted, return nil so that it won't be retried.
			return nil
		}
		return err
	}

	if service.Spec.Selector == nil {
		// services without a selector receive no endpoint slices from this controller;
		// these services will receive endpoint slices that are created out-of-band via the REST API.
		return nil
	}

	klog.V(5).Infof("About to update endpoint slices for service %q", key)

	podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
	pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
	if err != nil {
		// Since we're getting stuff from a local cache, it is basically
		// impossible to get this error.
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
			"Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	esLabelSelector := labels.Set(map[string]string{
		discovery.LabelServiceName: service.Name,
		discovery.LabelManagedBy:   controllerName,
	}).AsSelectorPreValidated()
	endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
	if err != nil {
		// Since we're getting stuff from a local cache, it is basically
		// impossible to get this error.
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
			"Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
	// state of the trigger time tracker gets updated even if the sync turns out
	// to be no-op and we don't update the EndpointSlice objects.
	lastChangeTriggerTime := c.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	err = c.reconciler.reconcile(service, pods, endpointSlices, lastChangeTriggerTime)
	if err != nil {
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
			"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	return nil
}

// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
func (c *Controller) onServiceUpdate(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	_ = c.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
	c.queue.Add(key)
}

// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (c *Controller) onServiceDelete(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	c.serviceSelectorCache.Delete(key)
	c.queue.Add(key)
}

// onEndpointSliceAdd queues a sync for the relevant Service if the
// EndpointSlice resource version does not match the expected version in the
// endpointSliceTracker.
func (c *Controller) onEndpointSliceAdd(obj interface{}) {
	endpointSlice := obj.(*discovery.EndpointSlice)
	if endpointSlice == nil {
		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
		return
	}
	if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
		c.queueServiceForEndpointSlice(endpointSlice)
	}
}

// onEndpointSliceUpdate queues a sync for the relevant Service if the
// EndpointSlice resource version does not match the expected version in the
// endpointSliceTracker or the managed-by value of the EndpointSlice has changed
// from or to this controller.
func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
	prevEndpointSlice := prevObj.(*discovery.EndpointSlice)
	endpointSlice := obj.(*discovery.EndpointSlice)
	if endpointSlice == nil || prevEndpointSlice == nil {
		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
		return
	}
	if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
		c.queueServiceForEndpointSlice(endpointSlice)
	}
}

// onEndpointSliceDelete queues a sync for the relevant Service if the deleted
// EndpointSlice is managed by this controller and is still tracked in the
// endpointSliceTracker.
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
	endpointSlice := getEndpointSliceFromDeleteAction(obj)
	if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
		c.queueServiceForEndpointSlice(endpointSlice)
	}
}

// queueServiceForEndpointSlice attempts to queue the corresponding Service for
// the provided EndpointSlice.
func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
	key, err := serviceControllerKey(endpointSlice)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
		return
	}
	c.queue.Add(key)
}

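// addPod queues every Service whose selector matches the added Pod.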
func (c *Controller) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		c.queue.Add(key)
	}
}

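// updatePod queues the Services whose endpoints may be affected by a Pod
// update, as determined by podEndpointChanged.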
func (c *Controller) updatePod(old, cur interface{}) {
	services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged)
	for key := range services {
		c.queue.Add(key)
	}
}

// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (c *Controller) deletePod(obj interface{}) {
	pod := endpointutil.GetPodFromDeleteAction(obj)
	if pod != nil {
		c.addPod(pod)
	}
}