certificate_controller.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package certificates implements an abstract controller that is useful for
  14. // building controllers that manage CSRs
  15. package certificates
  16. import (
  17. "fmt"
  18. "time"
  19. "golang.org/x/time/rate"
  20. "k8s.io/klog"
  21. certificates "k8s.io/api/certificates/v1beta1"
  22. "k8s.io/apimachinery/pkg/api/errors"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. certificatesinformers "k8s.io/client-go/informers/certificates/v1beta1"
  26. clientset "k8s.io/client-go/kubernetes"
  27. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  28. certificateslisters "k8s.io/client-go/listers/certificates/v1beta1"
  29. "k8s.io/client-go/tools/cache"
  30. "k8s.io/client-go/tools/record"
  31. "k8s.io/client-go/util/workqueue"
  32. "k8s.io/kubernetes/pkg/controller"
  33. )
  34. type CertificateController struct {
  35. kubeClient clientset.Interface
  36. csrLister certificateslisters.CertificateSigningRequestLister
  37. csrsSynced cache.InformerSynced
  38. handler func(*certificates.CertificateSigningRequest) error
  39. queue workqueue.RateLimitingInterface
  40. }
  41. func NewCertificateController(
  42. kubeClient clientset.Interface,
  43. csrInformer certificatesinformers.CertificateSigningRequestInformer,
  44. handler func(*certificates.CertificateSigningRequest) error,
  45. ) *CertificateController {
  46. // Send events to the apiserver
  47. eventBroadcaster := record.NewBroadcaster()
  48. eventBroadcaster.StartLogging(klog.Infof)
  49. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  50. cc := &CertificateController{
  51. kubeClient: kubeClient,
  52. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
  53. workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
  54. // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
  55. &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  56. ), "certificate"),
  57. handler: handler,
  58. }
  59. // Manage the addition/update of certificate requests
  60. csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  61. AddFunc: func(obj interface{}) {
  62. csr := obj.(*certificates.CertificateSigningRequest)
  63. klog.V(4).Infof("Adding certificate request %s", csr.Name)
  64. cc.enqueueCertificateRequest(obj)
  65. },
  66. UpdateFunc: func(old, new interface{}) {
  67. oldCSR := old.(*certificates.CertificateSigningRequest)
  68. klog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
  69. cc.enqueueCertificateRequest(new)
  70. },
  71. DeleteFunc: func(obj interface{}) {
  72. csr, ok := obj.(*certificates.CertificateSigningRequest)
  73. if !ok {
  74. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  75. if !ok {
  76. klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
  77. return
  78. }
  79. csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
  80. if !ok {
  81. klog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
  82. return
  83. }
  84. }
  85. klog.V(4).Infof("Deleting certificate request %s", csr.Name)
  86. cc.enqueueCertificateRequest(obj)
  87. },
  88. })
  89. cc.csrLister = csrInformer.Lister()
  90. cc.csrsSynced = csrInformer.Informer().HasSynced
  91. return cc
  92. }
  93. // Run the main goroutine responsible for watching and syncing jobs.
  94. func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
  95. defer utilruntime.HandleCrash()
  96. defer cc.queue.ShutDown()
  97. klog.Infof("Starting certificate controller")
  98. defer klog.Infof("Shutting down certificate controller")
  99. if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) {
  100. return
  101. }
  102. for i := 0; i < workers; i++ {
  103. go wait.Until(cc.worker, time.Second, stopCh)
  104. }
  105. <-stopCh
  106. }
  107. // worker runs a thread that dequeues CSRs, handles them, and marks them done.
  108. func (cc *CertificateController) worker() {
  109. for cc.processNextWorkItem() {
  110. }
  111. }
  112. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  113. func (cc *CertificateController) processNextWorkItem() bool {
  114. cKey, quit := cc.queue.Get()
  115. if quit {
  116. return false
  117. }
  118. defer cc.queue.Done(cKey)
  119. if err := cc.syncFunc(cKey.(string)); err != nil {
  120. cc.queue.AddRateLimited(cKey)
  121. if _, ignorable := err.(ignorableError); !ignorable {
  122. utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
  123. } else {
  124. klog.V(4).Infof("Sync %v failed with : %v", cKey, err)
  125. }
  126. return true
  127. }
  128. cc.queue.Forget(cKey)
  129. return true
  130. }
  131. func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
  132. key, err := controller.KeyFunc(obj)
  133. if err != nil {
  134. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  135. return
  136. }
  137. cc.queue.Add(key)
  138. }
  139. // maybeSignCertificate will inspect the certificate request and, if it has
  140. // been approved and meets policy expectations, generate an X509 cert using the
  141. // cluster CA assets. If successful it will update the CSR approve subresource
  142. // with the signed certificate.
  143. func (cc *CertificateController) syncFunc(key string) error {
  144. startTime := time.Now()
  145. defer func() {
  146. klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime))
  147. }()
  148. csr, err := cc.csrLister.Get(key)
  149. if errors.IsNotFound(err) {
  150. klog.V(3).Infof("csr has been deleted: %v", key)
  151. return nil
  152. }
  153. if err != nil {
  154. return err
  155. }
  156. if csr.Status.Certificate != nil {
  157. // no need to do anything because it already has a cert
  158. return nil
  159. }
  160. // need to operate on a copy so we don't mutate the csr in the shared cache
  161. csr = csr.DeepCopy()
  162. return cc.handler(csr)
  163. }
  164. // IgnorableError returns an error that we shouldn't handle (i.e. log) because
  165. // it's spammy and usually user error. Instead we will log these errors at a
  166. // higher log level. We still need to throw these errors to signal that the
  167. // sync should be retried.
  168. func IgnorableError(s string, args ...interface{}) ignorableError {
  169. return ignorableError(fmt.Sprintf(s, args...))
  170. }
  171. type ignorableError string
  172. func (e ignorableError) Error() string {
  173. return string(e)
  174. }