config.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package config
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. discovery "k8s.io/api/discovery/v1beta1"
  19. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  20. coreinformers "k8s.io/client-go/informers/core/v1"
  21. discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
  22. "k8s.io/client-go/tools/cache"
  23. "k8s.io/klog"
  24. )
  25. // ServiceHandler is an abstract interface of objects which receive
  26. // notifications about service object changes.
  27. type ServiceHandler interface {
  28. // OnServiceAdd is called whenever creation of new service object
  29. // is observed.
  30. OnServiceAdd(service *v1.Service)
  31. // OnServiceUpdate is called whenever modification of an existing
  32. // service object is observed.
  33. OnServiceUpdate(oldService, service *v1.Service)
  34. // OnServiceDelete is called whenever deletion of an existing service
  35. // object is observed.
  36. OnServiceDelete(service *v1.Service)
  37. // OnServiceSynced is called once all the initial event handlers were
  38. // called and the state is fully propagated to local cache.
  39. OnServiceSynced()
  40. }
  41. // EndpointsHandler is an abstract interface of objects which receive
  42. // notifications about endpoints object changes.
  43. type EndpointsHandler interface {
  44. // OnEndpointsAdd is called whenever creation of new endpoints object
  45. // is observed.
  46. OnEndpointsAdd(endpoints *v1.Endpoints)
  47. // OnEndpointsUpdate is called whenever modification of an existing
  48. // endpoints object is observed.
  49. OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
  50. // OnEndpointsDelete is called whenever deletion of an existing endpoints
  51. // object is observed.
  52. OnEndpointsDelete(endpoints *v1.Endpoints)
  53. // OnEndpointsSynced is called once all the initial event handlers were
  54. // called and the state is fully propagated to local cache.
  55. OnEndpointsSynced()
  56. }
  57. // EndpointSliceHandler is an abstract interface of objects which receive
  58. // notifications about endpoint slice object changes.
  59. type EndpointSliceHandler interface {
  60. // OnEndpointSliceAdd is called whenever creation of new endpoint slice
  61. // object is observed.
  62. OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice)
  63. // OnEndpointSliceUpdate is called whenever modification of an existing
  64. // endpoint slice object is observed.
  65. OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice)
  66. // OnEndpointSliceDelete is called whenever deletion of an existing
  67. // endpoint slice object is observed.
  68. OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice)
  69. // OnEndpointSlicesSynced is called once all the initial event handlers were
  70. // called and the state is fully propagated to local cache.
  71. OnEndpointSlicesSynced()
  72. }
  73. // NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
  74. // implemented a full EndpointSliceHandler.
  75. type NoopEndpointSliceHandler struct{}
  76. // OnEndpointSliceAdd is a noop handler for EndpointSlice creates.
  77. func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {}
  78. // OnEndpointSliceUpdate is a noop handler for EndpointSlice updates.
  79. func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
  80. }
  81. // OnEndpointSliceDelete is a noop handler for EndpointSlice deletes.
  82. func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {}
  83. // OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs.
  84. func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {}
  85. var _ EndpointSliceHandler = &NoopEndpointSliceHandler{}
  86. // EndpointsConfig tracks a set of endpoints configurations.
  87. type EndpointsConfig struct {
  88. listerSynced cache.InformerSynced
  89. eventHandlers []EndpointsHandler
  90. }
  91. // NewEndpointsConfig creates a new EndpointsConfig.
  92. func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
  93. result := &EndpointsConfig{
  94. listerSynced: endpointsInformer.Informer().HasSynced,
  95. }
  96. endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
  97. cache.ResourceEventHandlerFuncs{
  98. AddFunc: result.handleAddEndpoints,
  99. UpdateFunc: result.handleUpdateEndpoints,
  100. DeleteFunc: result.handleDeleteEndpoints,
  101. },
  102. resyncPeriod,
  103. )
  104. return result
  105. }
  106. // RegisterEventHandler registers a handler which is called on every endpoints change.
  107. func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
  108. c.eventHandlers = append(c.eventHandlers, handler)
  109. }
  110. // Run waits for cache synced and invokes handlers after syncing.
  111. func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
  112. klog.Info("Starting endpoints config controller")
  113. if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) {
  114. return
  115. }
  116. for i := range c.eventHandlers {
  117. klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
  118. c.eventHandlers[i].OnEndpointsSynced()
  119. }
  120. }
  121. func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
  122. endpoints, ok := obj.(*v1.Endpoints)
  123. if !ok {
  124. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  125. return
  126. }
  127. for i := range c.eventHandlers {
  128. klog.V(4).Infof("Calling handler.OnEndpointsAdd")
  129. c.eventHandlers[i].OnEndpointsAdd(endpoints)
  130. }
  131. }
  132. func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
  133. oldEndpoints, ok := oldObj.(*v1.Endpoints)
  134. if !ok {
  135. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
  136. return
  137. }
  138. endpoints, ok := newObj.(*v1.Endpoints)
  139. if !ok {
  140. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  141. return
  142. }
  143. for i := range c.eventHandlers {
  144. klog.V(4).Infof("Calling handler.OnEndpointsUpdate")
  145. c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
  146. }
  147. }
  148. func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
  149. endpoints, ok := obj.(*v1.Endpoints)
  150. if !ok {
  151. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  152. if !ok {
  153. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  154. return
  155. }
  156. if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
  157. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  158. return
  159. }
  160. }
  161. for i := range c.eventHandlers {
  162. klog.V(4).Infof("Calling handler.OnEndpointsDelete")
  163. c.eventHandlers[i].OnEndpointsDelete(endpoints)
  164. }
  165. }
  166. // EndpointSliceConfig tracks a set of endpoints configurations.
  167. type EndpointSliceConfig struct {
  168. listerSynced cache.InformerSynced
  169. eventHandlers []EndpointSliceHandler
  170. }
  171. // NewEndpointSliceConfig creates a new EndpointSliceConfig.
  172. func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
  173. result := &EndpointSliceConfig{
  174. listerSynced: endpointSliceInformer.Informer().HasSynced,
  175. }
  176. endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
  177. cache.ResourceEventHandlerFuncs{
  178. AddFunc: result.handleAddEndpointSlice,
  179. UpdateFunc: result.handleUpdateEndpointSlice,
  180. DeleteFunc: result.handleDeleteEndpointSlice,
  181. },
  182. resyncPeriod,
  183. )
  184. return result
  185. }
  186. // RegisterEventHandler registers a handler which is called on every endpoint slice change.
  187. func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
  188. c.eventHandlers = append(c.eventHandlers, handler)
  189. }
  190. // Run waits for cache synced and invokes handlers after syncing.
  191. func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
  192. klog.Info("Starting endpoint slice config controller")
  193. if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
  194. return
  195. }
  196. for _, h := range c.eventHandlers {
  197. klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
  198. h.OnEndpointSlicesSynced()
  199. }
  200. }
  201. func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
  202. endpointSlice, ok := obj.(*discovery.EndpointSlice)
  203. if !ok {
  204. utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
  205. return
  206. }
  207. for _, h := range c.eventHandlers {
  208. klog.V(4).Infof("Calling handler.OnEndpointSliceAdd %+v", endpointSlice)
  209. h.OnEndpointSliceAdd(endpointSlice)
  210. }
  211. }
  212. func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
  213. oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice)
  214. if !ok {
  215. utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
  216. return
  217. }
  218. newEndpointSlice, ok := newObj.(*discovery.EndpointSlice)
  219. if !ok {
  220. utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
  221. return
  222. }
  223. for _, h := range c.eventHandlers {
  224. klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate")
  225. h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
  226. }
  227. }
  228. func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
  229. endpointSlice, ok := obj.(*discovery.EndpointSlice)
  230. if !ok {
  231. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  232. if !ok {
  233. utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
  234. return
  235. }
  236. if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
  237. utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
  238. return
  239. }
  240. }
  241. for _, h := range c.eventHandlers {
  242. klog.V(4).Infof("Calling handler.OnEndpointsDelete")
  243. h.OnEndpointSliceDelete(endpointSlice)
  244. }
  245. }
  246. // ServiceConfig tracks a set of service configurations.
  247. type ServiceConfig struct {
  248. listerSynced cache.InformerSynced
  249. eventHandlers []ServiceHandler
  250. }
  251. // NewServiceConfig creates a new ServiceConfig.
  252. func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
  253. result := &ServiceConfig{
  254. listerSynced: serviceInformer.Informer().HasSynced,
  255. }
  256. serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
  257. cache.ResourceEventHandlerFuncs{
  258. AddFunc: result.handleAddService,
  259. UpdateFunc: result.handleUpdateService,
  260. DeleteFunc: result.handleDeleteService,
  261. },
  262. resyncPeriod,
  263. )
  264. return result
  265. }
  266. // RegisterEventHandler registers a handler which is called on every service change.
  267. func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
  268. c.eventHandlers = append(c.eventHandlers, handler)
  269. }
  270. // Run waits for cache synced and invokes handlers after syncing.
  271. func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
  272. klog.Info("Starting service config controller")
  273. if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
  274. return
  275. }
  276. for i := range c.eventHandlers {
  277. klog.V(3).Info("Calling handler.OnServiceSynced()")
  278. c.eventHandlers[i].OnServiceSynced()
  279. }
  280. }
  281. func (c *ServiceConfig) handleAddService(obj interface{}) {
  282. service, ok := obj.(*v1.Service)
  283. if !ok {
  284. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  285. return
  286. }
  287. for i := range c.eventHandlers {
  288. klog.V(4).Info("Calling handler.OnServiceAdd")
  289. c.eventHandlers[i].OnServiceAdd(service)
  290. }
  291. }
  292. func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
  293. oldService, ok := oldObj.(*v1.Service)
  294. if !ok {
  295. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
  296. return
  297. }
  298. service, ok := newObj.(*v1.Service)
  299. if !ok {
  300. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  301. return
  302. }
  303. for i := range c.eventHandlers {
  304. klog.V(4).Info("Calling handler.OnServiceUpdate")
  305. c.eventHandlers[i].OnServiceUpdate(oldService, service)
  306. }
  307. }
  308. func (c *ServiceConfig) handleDeleteService(obj interface{}) {
  309. service, ok := obj.(*v1.Service)
  310. if !ok {
  311. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  312. if !ok {
  313. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  314. return
  315. }
  316. if service, ok = tombstone.Obj.(*v1.Service); !ok {
  317. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  318. return
  319. }
  320. }
  321. for i := range c.eventHandlers {
  322. klog.V(4).Info("Calling handler.OnServiceDelete")
  323. c.eventHandlers[i].OnServiceDelete(service)
  324. }
  325. }
  326. // NodeHandler is an abstract interface of objects which receive
  327. // notifications about node object changes.
  328. type NodeHandler interface {
  329. // OnNodeAdd is called whenever creation of new node object
  330. // is observed.
  331. OnNodeAdd(node *v1.Node)
  332. // OnNodeUpdate is called whenever modification of an existing
  333. // node object is observed.
  334. OnNodeUpdate(oldNode, node *v1.Node)
  335. // OnNodeDelete is called whever deletion of an existing node
  336. // object is observed.
  337. OnNodeDelete(node *v1.Node)
  338. // OnNodeSynced is called once all the initial event handlers were
  339. // called and the state is fully propagated to local cache.
  340. OnNodeSynced()
  341. }
  342. // NoopNodeHandler is a noop handler for proxiers that have not yet
  343. // implemented a full NodeHandler.
  344. type NoopNodeHandler struct{}
  345. // OnNodeAdd is a noop handler for Node creates.
  346. func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
  347. // OnNodeUpdate is a noop handler for Node updates.
  348. func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
  349. // OnNodeDelete is a noop handler for Node deletes.
  350. func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
  351. // OnNodeSynced is a noop handler for Node syncs.
  352. func (*NoopNodeHandler) OnNodeSynced() {}
  353. var _ NodeHandler = &NoopNodeHandler{}
  354. // NodeConfig tracks a set of node configurations.
  355. // It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
  356. type NodeConfig struct {
  357. listerSynced cache.InformerSynced
  358. eventHandlers []NodeHandler
  359. }
  360. // NewNodeConfig creates a new NodeConfig.
  361. func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
  362. result := &NodeConfig{
  363. listerSynced: nodeInformer.Informer().HasSynced,
  364. }
  365. nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
  366. cache.ResourceEventHandlerFuncs{
  367. AddFunc: result.handleAddNode,
  368. UpdateFunc: result.handleUpdateNode,
  369. DeleteFunc: result.handleDeleteNode,
  370. },
  371. resyncPeriod,
  372. )
  373. return result
  374. }
  375. // RegisterEventHandler registers a handler which is called on every node change.
  376. func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
  377. c.eventHandlers = append(c.eventHandlers, handler)
  378. }
  379. // Run starts the goroutine responsible for calling registered handlers.
  380. func (c *NodeConfig) Run(stopCh <-chan struct{}) {
  381. klog.Info("Starting node config controller")
  382. if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
  383. return
  384. }
  385. for i := range c.eventHandlers {
  386. klog.V(3).Infof("Calling handler.OnNodeSynced()")
  387. c.eventHandlers[i].OnNodeSynced()
  388. }
  389. }
  390. func (c *NodeConfig) handleAddNode(obj interface{}) {
  391. node, ok := obj.(*v1.Node)
  392. if !ok {
  393. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  394. return
  395. }
  396. for i := range c.eventHandlers {
  397. klog.V(4).Infof("Calling handler.OnNodeAdd")
  398. c.eventHandlers[i].OnNodeAdd(node)
  399. }
  400. }
  401. func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
  402. oldNode, ok := oldObj.(*v1.Node)
  403. if !ok {
  404. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
  405. return
  406. }
  407. node, ok := newObj.(*v1.Node)
  408. if !ok {
  409. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  410. return
  411. }
  412. for i := range c.eventHandlers {
  413. klog.V(5).Infof("Calling handler.OnNodeUpdate")
  414. c.eventHandlers[i].OnNodeUpdate(oldNode, node)
  415. }
  416. }
  417. func (c *NodeConfig) handleDeleteNode(obj interface{}) {
  418. node, ok := obj.(*v1.Node)
  419. if !ok {
  420. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  421. if !ok {
  422. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  423. return
  424. }
  425. if node, ok = tombstone.Obj.(*v1.Node); !ok {
  426. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  427. return
  428. }
  429. }
  430. for i := range c.eventHandlers {
  431. klog.V(4).Infof("Calling handler.OnNodeDelete")
  432. c.eventHandlers[i].OnNodeDelete(node)
  433. }
  434. }