123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package config
- import (
- "fmt"
- "time"
- "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- coreinformers "k8s.io/client-go/informers/core/v1"
- discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/klog"
- )
- // ServiceHandler is an abstract interface of objects which receive
- // notifications about service object changes.
- type ServiceHandler interface {
- // OnServiceAdd is called whenever creation of new service object
- // is observed.
- OnServiceAdd(service *v1.Service)
- // OnServiceUpdate is called whenever modification of an existing
- // service object is observed.
- OnServiceUpdate(oldService, service *v1.Service)
- // OnServiceDelete is called whenever deletion of an existing service
- // object is observed.
- OnServiceDelete(service *v1.Service)
- // OnServiceSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- OnServiceSynced()
- }
- // EndpointsHandler is an abstract interface of objects which receive
- // notifications about endpoints object changes.
- type EndpointsHandler interface {
- // OnEndpointsAdd is called whenever creation of new endpoints object
- // is observed.
- OnEndpointsAdd(endpoints *v1.Endpoints)
- // OnEndpointsUpdate is called whenever modification of an existing
- // endpoints object is observed.
- OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
- // OnEndpointsDelete is called whenever deletion of an existing endpoints
- // object is observed.
- OnEndpointsDelete(endpoints *v1.Endpoints)
- // OnEndpointsSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- OnEndpointsSynced()
- }
- // EndpointSliceHandler is an abstract interface of objects which receive
- // notifications about endpoint slice object changes.
- type EndpointSliceHandler interface {
- // OnEndpointSliceAdd is called whenever creation of new endpoint slice
- // object is observed.
- OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice)
- // OnEndpointSliceUpdate is called whenever modification of an existing
- // endpoint slice object is observed.
- OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice)
- // OnEndpointSliceDelete is called whenever deletion of an existing
- // endpoint slice object is observed.
- OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice)
- // OnEndpointSlicesSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- OnEndpointSlicesSynced()
- }
- // NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
- // implemented a full EndpointSliceHandler.
- type NoopEndpointSliceHandler struct{}
- // OnEndpointSliceAdd is a noop handler for EndpointSlice creates.
- func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {}
- // OnEndpointSliceUpdate is a noop handler for EndpointSlice updates.
- func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
- }
- // OnEndpointSliceDelete is a noop handler for EndpointSlice deletes.
- func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {}
- // OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs.
- func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {}
- var _ EndpointSliceHandler = &NoopEndpointSliceHandler{}
- // EndpointsConfig tracks a set of endpoints configurations.
- type EndpointsConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []EndpointsHandler
- }
- // NewEndpointsConfig creates a new EndpointsConfig.
- func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
- result := &EndpointsConfig{
- listerSynced: endpointsInformer.Informer().HasSynced,
- }
- endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddEndpoints,
- UpdateFunc: result.handleUpdateEndpoints,
- DeleteFunc: result.handleDeleteEndpoints,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every endpoints change.
- func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run waits for cache synced and invokes handlers after syncing.
- func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting endpoints config controller")
- if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) {
- return
- }
- for i := range c.eventHandlers {
- klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
- c.eventHandlers[i].OnEndpointsSynced()
- }
- }
- func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsAdd")
- c.eventHandlers[i].OnEndpointsAdd(endpoints)
- }
- }
- func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
- oldEndpoints, ok := oldObj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
- return
- }
- endpoints, ok := newObj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsUpdate")
- c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
- }
- }
- func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsDelete")
- c.eventHandlers[i].OnEndpointsDelete(endpoints)
- }
- }
- // EndpointSliceConfig tracks a set of endpoints configurations.
- type EndpointSliceConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []EndpointSliceHandler
- }
- // NewEndpointSliceConfig creates a new EndpointSliceConfig.
- func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
- result := &EndpointSliceConfig{
- listerSynced: endpointSliceInformer.Informer().HasSynced,
- }
- endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddEndpointSlice,
- UpdateFunc: result.handleUpdateEndpointSlice,
- DeleteFunc: result.handleDeleteEndpointSlice,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every endpoint slice change.
- func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run waits for cache synced and invokes handlers after syncing.
- func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting endpoint slice config controller")
- if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
- return
- }
- for _, h := range c.eventHandlers {
- klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
- h.OnEndpointSlicesSynced()
- }
- }
- func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
- endpointSlice, ok := obj.(*discovery.EndpointSlice)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
- return
- }
- for _, h := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointSliceAdd %+v", endpointSlice)
- h.OnEndpointSliceAdd(endpointSlice)
- }
- }
- func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
- oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
- return
- }
- newEndpointSlice, ok := newObj.(*discovery.EndpointSlice)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
- return
- }
- for _, h := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate")
- h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
- }
- }
- func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
- endpointSlice, ok := obj.(*discovery.EndpointSlice)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
- return
- }
- if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
- return
- }
- }
- for _, h := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsDelete")
- h.OnEndpointSliceDelete(endpointSlice)
- }
- }
- // ServiceConfig tracks a set of service configurations.
- type ServiceConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []ServiceHandler
- }
- // NewServiceConfig creates a new ServiceConfig.
- func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
- result := &ServiceConfig{
- listerSynced: serviceInformer.Informer().HasSynced,
- }
- serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddService,
- UpdateFunc: result.handleUpdateService,
- DeleteFunc: result.handleDeleteService,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every service change.
- func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run waits for cache synced and invokes handlers after syncing.
- func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting service config controller")
- if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
- return
- }
- for i := range c.eventHandlers {
- klog.V(3).Info("Calling handler.OnServiceSynced()")
- c.eventHandlers[i].OnServiceSynced()
- }
- }
- func (c *ServiceConfig) handleAddService(obj interface{}) {
- service, ok := obj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceAdd")
- c.eventHandlers[i].OnServiceAdd(service)
- }
- }
- func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
- oldService, ok := oldObj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
- return
- }
- service, ok := newObj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceUpdate")
- c.eventHandlers[i].OnServiceUpdate(oldService, service)
- }
- }
- func (c *ServiceConfig) handleDeleteService(obj interface{}) {
- service, ok := obj.(*v1.Service)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- if service, ok = tombstone.Obj.(*v1.Service); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceDelete")
- c.eventHandlers[i].OnServiceDelete(service)
- }
- }
- // NodeHandler is an abstract interface of objects which receive
- // notifications about node object changes.
- type NodeHandler interface {
- // OnNodeAdd is called whenever creation of new node object
- // is observed.
- OnNodeAdd(node *v1.Node)
- // OnNodeUpdate is called whenever modification of an existing
- // node object is observed.
- OnNodeUpdate(oldNode, node *v1.Node)
- // OnNodeDelete is called whever deletion of an existing node
- // object is observed.
- OnNodeDelete(node *v1.Node)
- // OnNodeSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- OnNodeSynced()
- }
- // NoopNodeHandler is a noop handler for proxiers that have not yet
- // implemented a full NodeHandler.
- type NoopNodeHandler struct{}
- // OnNodeAdd is a noop handler for Node creates.
- func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
- // OnNodeUpdate is a noop handler for Node updates.
- func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
- // OnNodeDelete is a noop handler for Node deletes.
- func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
- // OnNodeSynced is a noop handler for Node syncs.
- func (*NoopNodeHandler) OnNodeSynced() {}
- var _ NodeHandler = &NoopNodeHandler{}
- // NodeConfig tracks a set of node configurations.
- // It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
- type NodeConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []NodeHandler
- }
- // NewNodeConfig creates a new NodeConfig.
- func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
- result := &NodeConfig{
- listerSynced: nodeInformer.Informer().HasSynced,
- }
- nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddNode,
- UpdateFunc: result.handleUpdateNode,
- DeleteFunc: result.handleDeleteNode,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every node change.
- func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run starts the goroutine responsible for calling registered handlers.
- func (c *NodeConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting node config controller")
- if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
- return
- }
- for i := range c.eventHandlers {
- klog.V(3).Infof("Calling handler.OnNodeSynced()")
- c.eventHandlers[i].OnNodeSynced()
- }
- }
- func (c *NodeConfig) handleAddNode(obj interface{}) {
- node, ok := obj.(*v1.Node)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnNodeAdd")
- c.eventHandlers[i].OnNodeAdd(node)
- }
- }
- func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
- oldNode, ok := oldObj.(*v1.Node)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
- return
- }
- node, ok := newObj.(*v1.Node)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(5).Infof("Calling handler.OnNodeUpdate")
- c.eventHandlers[i].OnNodeUpdate(oldNode, node)
- }
- }
- func (c *NodeConfig) handleDeleteNode(obj interface{}) {
- node, ok := obj.(*v1.Node)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- if node, ok = tombstone.Obj.(*v1.Node); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnNodeDelete")
- c.eventHandlers[i].OnNodeDelete(node)
- }
- }
|