123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package config
- import (
- "fmt"
- "time"
- "k8s.io/api/core/v1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- coreinformers "k8s.io/client-go/informers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/controller"
- )
- // ServiceHandler is an abstract interface of objects which receive
- // notifications about service object changes.
- type ServiceHandler interface {
- // OnServiceAdd is called whenever creation of new service object
- // is observed.
- OnServiceAdd(service *v1.Service)
- // OnServiceUpdate is called whenever modification of an existing
- // service object is observed.
- OnServiceUpdate(oldService, service *v1.Service)
- // OnServiceDelete is called whenever deletion of an existing service
- // object is observed.
- OnServiceDelete(service *v1.Service)
- // OnServiceSynced is called once all the initial even handlers were
- // called and the state is fully propagated to local cache.
- OnServiceSynced()
- }
- // EndpointsHandler is an abstract interface of objects which receive
- // notifications about endpoints object changes.
- type EndpointsHandler interface {
- // OnEndpointsAdd is called whenever creation of new endpoints object
- // is observed.
- OnEndpointsAdd(endpoints *v1.Endpoints)
- // OnEndpointsUpdate is called whenever modification of an existing
- // endpoints object is observed.
- OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
- // OnEndpointsDelete is called whever deletion of an existing endpoints
- // object is observed.
- OnEndpointsDelete(endpoints *v1.Endpoints)
- // OnEndpointsSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- OnEndpointsSynced()
- }
- // EndpointsConfig tracks a set of endpoints configurations.
- type EndpointsConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []EndpointsHandler
- }
- // NewEndpointsConfig creates a new EndpointsConfig.
- func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
- result := &EndpointsConfig{
- listerSynced: endpointsInformer.Informer().HasSynced,
- }
- endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddEndpoints,
- UpdateFunc: result.handleUpdateEndpoints,
- DeleteFunc: result.handleDeleteEndpoints,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every endpoints change.
- func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run waits for cache synced and invokes handlers after syncing.
- func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting endpoints config controller")
- if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) {
- return
- }
- for i := range c.eventHandlers {
- klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
- c.eventHandlers[i].OnEndpointsSynced()
- }
- }
- func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsAdd")
- c.eventHandlers[i].OnEndpointsAdd(endpoints)
- }
- }
- func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
- oldEndpoints, ok := oldObj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
- return
- }
- endpoints, ok := newObj.(*v1.Endpoints)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsUpdate")
- c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
- }
- }
- func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
- endpoints, ok := obj.(*v1.Endpoints)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- }
- for i := range c.eventHandlers {
- klog.V(4).Infof("Calling handler.OnEndpointsDelete")
- c.eventHandlers[i].OnEndpointsDelete(endpoints)
- }
- }
- // ServiceConfig tracks a set of service configurations.
- type ServiceConfig struct {
- listerSynced cache.InformerSynced
- eventHandlers []ServiceHandler
- }
- // NewServiceConfig creates a new ServiceConfig.
- func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
- result := &ServiceConfig{
- listerSynced: serviceInformer.Informer().HasSynced,
- }
- serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: result.handleAddService,
- UpdateFunc: result.handleUpdateService,
- DeleteFunc: result.handleDeleteService,
- },
- resyncPeriod,
- )
- return result
- }
- // RegisterEventHandler registers a handler which is called on every service change.
- func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
- c.eventHandlers = append(c.eventHandlers, handler)
- }
- // Run waits for cache synced and invokes handlers after syncing.
- func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
- klog.Info("Starting service config controller")
- if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
- return
- }
- for i := range c.eventHandlers {
- klog.V(3).Info("Calling handler.OnServiceSynced()")
- c.eventHandlers[i].OnServiceSynced()
- }
- }
- func (c *ServiceConfig) handleAddService(obj interface{}) {
- service, ok := obj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceAdd")
- c.eventHandlers[i].OnServiceAdd(service)
- }
- }
- func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
- oldService, ok := oldObj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
- return
- }
- service, ok := newObj.(*v1.Service)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
- return
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceUpdate")
- c.eventHandlers[i].OnServiceUpdate(oldService, service)
- }
- }
- func (c *ServiceConfig) handleDeleteService(obj interface{}) {
- service, ok := obj.(*v1.Service)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- if service, ok = tombstone.Obj.(*v1.Service); !ok {
- utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
- return
- }
- }
- for i := range c.eventHandlers {
- klog.V(4).Info("Calling handler.OnServiceDelete")
- c.eventHandlers[i].OnServiceDelete(service)
- }
- }
|