config.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package config
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  19. coreinformers "k8s.io/client-go/informers/core/v1"
  20. "k8s.io/client-go/tools/cache"
  21. "k8s.io/klog"
  22. "k8s.io/kubernetes/pkg/controller"
  23. )
  24. // ServiceHandler is an abstract interface of objects which receive
  25. // notifications about service object changes.
  26. type ServiceHandler interface {
  27. // OnServiceAdd is called whenever creation of new service object
  28. // is observed.
  29. OnServiceAdd(service *v1.Service)
  30. // OnServiceUpdate is called whenever modification of an existing
  31. // service object is observed.
  32. OnServiceUpdate(oldService, service *v1.Service)
  33. // OnServiceDelete is called whenever deletion of an existing service
  34. // object is observed.
  35. OnServiceDelete(service *v1.Service)
  36. // OnServiceSynced is called once all the initial even handlers were
  37. // called and the state is fully propagated to local cache.
  38. OnServiceSynced()
  39. }
  40. // EndpointsHandler is an abstract interface of objects which receive
  41. // notifications about endpoints object changes.
  42. type EndpointsHandler interface {
  43. // OnEndpointsAdd is called whenever creation of new endpoints object
  44. // is observed.
  45. OnEndpointsAdd(endpoints *v1.Endpoints)
  46. // OnEndpointsUpdate is called whenever modification of an existing
  47. // endpoints object is observed.
  48. OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
  49. // OnEndpointsDelete is called whever deletion of an existing endpoints
  50. // object is observed.
  51. OnEndpointsDelete(endpoints *v1.Endpoints)
  52. // OnEndpointsSynced is called once all the initial event handlers were
  53. // called and the state is fully propagated to local cache.
  54. OnEndpointsSynced()
  55. }
  56. // EndpointsConfig tracks a set of endpoints configurations.
  57. type EndpointsConfig struct {
  58. listerSynced cache.InformerSynced
  59. eventHandlers []EndpointsHandler
  60. }
  61. // NewEndpointsConfig creates a new EndpointsConfig.
  62. func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
  63. result := &EndpointsConfig{
  64. listerSynced: endpointsInformer.Informer().HasSynced,
  65. }
  66. endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
  67. cache.ResourceEventHandlerFuncs{
  68. AddFunc: result.handleAddEndpoints,
  69. UpdateFunc: result.handleUpdateEndpoints,
  70. DeleteFunc: result.handleDeleteEndpoints,
  71. },
  72. resyncPeriod,
  73. )
  74. return result
  75. }
  76. // RegisterEventHandler registers a handler which is called on every endpoints change.
  77. func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
  78. c.eventHandlers = append(c.eventHandlers, handler)
  79. }
  80. // Run waits for cache synced and invokes handlers after syncing.
  81. func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
  82. klog.Info("Starting endpoints config controller")
  83. if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) {
  84. return
  85. }
  86. for i := range c.eventHandlers {
  87. klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
  88. c.eventHandlers[i].OnEndpointsSynced()
  89. }
  90. }
  91. func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
  92. endpoints, ok := obj.(*v1.Endpoints)
  93. if !ok {
  94. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  95. return
  96. }
  97. for i := range c.eventHandlers {
  98. klog.V(4).Infof("Calling handler.OnEndpointsAdd")
  99. c.eventHandlers[i].OnEndpointsAdd(endpoints)
  100. }
  101. }
  102. func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
  103. oldEndpoints, ok := oldObj.(*v1.Endpoints)
  104. if !ok {
  105. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
  106. return
  107. }
  108. endpoints, ok := newObj.(*v1.Endpoints)
  109. if !ok {
  110. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  111. return
  112. }
  113. for i := range c.eventHandlers {
  114. klog.V(4).Infof("Calling handler.OnEndpointsUpdate")
  115. c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
  116. }
  117. }
  118. func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
  119. endpoints, ok := obj.(*v1.Endpoints)
  120. if !ok {
  121. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  122. if !ok {
  123. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  124. return
  125. }
  126. if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
  127. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  128. return
  129. }
  130. }
  131. for i := range c.eventHandlers {
  132. klog.V(4).Infof("Calling handler.OnEndpointsDelete")
  133. c.eventHandlers[i].OnEndpointsDelete(endpoints)
  134. }
  135. }
  136. // ServiceConfig tracks a set of service configurations.
  137. type ServiceConfig struct {
  138. listerSynced cache.InformerSynced
  139. eventHandlers []ServiceHandler
  140. }
  141. // NewServiceConfig creates a new ServiceConfig.
  142. func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
  143. result := &ServiceConfig{
  144. listerSynced: serviceInformer.Informer().HasSynced,
  145. }
  146. serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
  147. cache.ResourceEventHandlerFuncs{
  148. AddFunc: result.handleAddService,
  149. UpdateFunc: result.handleUpdateService,
  150. DeleteFunc: result.handleDeleteService,
  151. },
  152. resyncPeriod,
  153. )
  154. return result
  155. }
  156. // RegisterEventHandler registers a handler which is called on every service change.
  157. func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
  158. c.eventHandlers = append(c.eventHandlers, handler)
  159. }
  160. // Run waits for cache synced and invokes handlers after syncing.
  161. func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
  162. klog.Info("Starting service config controller")
  163. if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
  164. return
  165. }
  166. for i := range c.eventHandlers {
  167. klog.V(3).Info("Calling handler.OnServiceSynced()")
  168. c.eventHandlers[i].OnServiceSynced()
  169. }
  170. }
  171. func (c *ServiceConfig) handleAddService(obj interface{}) {
  172. service, ok := obj.(*v1.Service)
  173. if !ok {
  174. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  175. return
  176. }
  177. for i := range c.eventHandlers {
  178. klog.V(4).Info("Calling handler.OnServiceAdd")
  179. c.eventHandlers[i].OnServiceAdd(service)
  180. }
  181. }
  182. func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
  183. oldService, ok := oldObj.(*v1.Service)
  184. if !ok {
  185. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
  186. return
  187. }
  188. service, ok := newObj.(*v1.Service)
  189. if !ok {
  190. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  191. return
  192. }
  193. for i := range c.eventHandlers {
  194. klog.V(4).Info("Calling handler.OnServiceUpdate")
  195. c.eventHandlers[i].OnServiceUpdate(oldService, service)
  196. }
  197. }
  198. func (c *ServiceConfig) handleDeleteService(obj interface{}) {
  199. service, ok := obj.(*v1.Service)
  200. if !ok {
  201. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  202. if !ok {
  203. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  204. return
  205. }
  206. if service, ok = tombstone.Obj.(*v1.Service); !ok {
  207. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  208. return
  209. }
  210. }
  211. for i := range c.eventHandlers {
  212. klog.V(4).Info("Calling handler.OnServiceDelete")
  213. c.eventHandlers[i].OnServiceDelete(service)
  214. }
  215. }