publisher.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rootcacertpublisher
  14. import (
  15. "context"
  16. "fmt"
  17. "reflect"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. coreinformers "k8s.io/client-go/informers/core/v1"
  25. clientset "k8s.io/client-go/kubernetes"
  26. corelisters "k8s.io/client-go/listers/core/v1"
  27. "k8s.io/client-go/tools/cache"
  28. "k8s.io/client-go/util/workqueue"
  29. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  30. "k8s.io/klog"
  31. )
  32. // RootCACertConfigMapName is name of the configmap which stores certificates
  33. // to access api-server
  34. const RootCACertConfigMapName = "kube-root-ca.crt"
  35. // NewPublisher construct a new controller which would manage the configmap
  36. // which stores certificates in each namespace. It will make sure certificate
  37. // configmap exists in each namespace.
  38. func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
  39. e := &Publisher{
  40. client: cl,
  41. rootCA: rootCA,
  42. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root_ca_cert_publisher"),
  43. }
  44. if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
  45. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("root_ca_cert_publisher", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  46. return nil, err
  47. }
  48. }
  49. cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  50. DeleteFunc: e.configMapDeleted,
  51. UpdateFunc: e.configMapUpdated,
  52. })
  53. e.cmLister = cmInformer.Lister()
  54. e.cmListerSynced = cmInformer.Informer().HasSynced
  55. nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  56. AddFunc: e.namespaceAdded,
  57. UpdateFunc: e.namespaceUpdated,
  58. })
  59. e.nsListerSynced = nsInformer.Informer().HasSynced
  60. e.syncHandler = e.syncNamespace
  61. return e, nil
  62. }
  63. // Publisher manages certificate ConfigMap objects inside Namespaces
  64. type Publisher struct {
  65. client clientset.Interface
  66. rootCA []byte
  67. // To allow injection for testing.
  68. syncHandler func(key string) error
  69. cmLister corelisters.ConfigMapLister
  70. cmListerSynced cache.InformerSynced
  71. nsListerSynced cache.InformerSynced
  72. queue workqueue.RateLimitingInterface
  73. }
  74. // Run starts process
  75. func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
  76. defer utilruntime.HandleCrash()
  77. defer c.queue.ShutDown()
  78. klog.Infof("Starting root CA certificate configmap publisher")
  79. defer klog.Infof("Shutting down root CA certificate configmap publisher")
  80. if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) {
  81. return
  82. }
  83. for i := 0; i < workers; i++ {
  84. go wait.Until(c.runWorker, time.Second, stopCh)
  85. }
  86. <-stopCh
  87. }
  88. func (c *Publisher) configMapDeleted(obj interface{}) {
  89. cm, err := convertToCM(obj)
  90. if err != nil {
  91. utilruntime.HandleError(err)
  92. return
  93. }
  94. if cm.Name != RootCACertConfigMapName {
  95. return
  96. }
  97. c.queue.Add(cm.Namespace)
  98. }
  99. func (c *Publisher) configMapUpdated(_, newObj interface{}) {
  100. cm, err := convertToCM(newObj)
  101. if err != nil {
  102. utilruntime.HandleError(err)
  103. return
  104. }
  105. if cm.Name != RootCACertConfigMapName {
  106. return
  107. }
  108. c.queue.Add(cm.Namespace)
  109. }
  110. func (c *Publisher) namespaceAdded(obj interface{}) {
  111. namespace := obj.(*v1.Namespace)
  112. c.queue.Add(namespace.Name)
  113. }
  114. func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) {
  115. newNamespace := newObj.(*v1.Namespace)
  116. if newNamespace.Status.Phase != v1.NamespaceActive {
  117. return
  118. }
  119. c.queue.Add(newNamespace.Name)
  120. }
  121. func (c *Publisher) runWorker() {
  122. for c.processNextWorkItem() {
  123. }
  124. }
  125. // processNextWorkItem deals with one key off the queue. It returns false when
  126. // it's time to quit.
  127. func (c *Publisher) processNextWorkItem() bool {
  128. key, quit := c.queue.Get()
  129. if quit {
  130. return false
  131. }
  132. defer c.queue.Done(key)
  133. if err := c.syncHandler(key.(string)); err != nil {
  134. utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
  135. c.queue.AddRateLimited(key)
  136. return true
  137. }
  138. c.queue.Forget(key)
  139. return true
  140. }
  141. func (c *Publisher) syncNamespace(ns string) error {
  142. startTime := time.Now()
  143. defer func() {
  144. klog.V(4).Infof("Finished syncing namespace %q (%v)", ns, time.Since(startTime))
  145. }()
  146. cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName)
  147. switch {
  148. case apierrors.IsNotFound(err):
  149. _, err := c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{
  150. ObjectMeta: metav1.ObjectMeta{
  151. Name: RootCACertConfigMapName,
  152. },
  153. Data: map[string]string{
  154. "ca.crt": string(c.rootCA),
  155. },
  156. }, metav1.CreateOptions{})
  157. return err
  158. case err != nil:
  159. return err
  160. }
  161. data := map[string]string{
  162. "ca.crt": string(c.rootCA),
  163. }
  164. if reflect.DeepEqual(cm.Data, data) {
  165. return nil
  166. }
  167. cm.Data = data
  168. _, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{})
  169. return err
  170. }
  171. func convertToCM(obj interface{}) (*v1.ConfigMap, error) {
  172. cm, ok := obj.(*v1.ConfigMap)
  173. if !ok {
  174. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  175. if !ok {
  176. return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
  177. }
  178. cm, ok = tombstone.Obj.(*v1.ConfigMap)
  179. if !ok {
  180. return nil, fmt.Errorf("tombstone contained object that is not a ConfigMap %#v", obj)
  181. }
  182. }
  183. return cm, nil
  184. }