certificate_controller.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package certificates implements an abstract controller that is useful for
  14. // building controllers that manage CSRs
  15. package certificates
  16. import (
  17. "fmt"
  18. "time"
  19. "golang.org/x/time/rate"
  20. certificates "k8s.io/api/certificates/v1beta1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. certificatesinformers "k8s.io/client-go/informers/certificates/v1beta1"
  25. clientset "k8s.io/client-go/kubernetes"
  26. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  27. certificateslisters "k8s.io/client-go/listers/certificates/v1beta1"
  28. "k8s.io/client-go/tools/cache"
  29. "k8s.io/client-go/tools/record"
  30. "k8s.io/client-go/util/workqueue"
  31. "k8s.io/klog"
  32. "k8s.io/kubernetes/pkg/controller"
  33. )
  34. type CertificateController struct {
  35. // name is an identifier for this particular controller instance.
  36. name string
  37. kubeClient clientset.Interface
  38. csrLister certificateslisters.CertificateSigningRequestLister
  39. csrsSynced cache.InformerSynced
  40. handler func(*certificates.CertificateSigningRequest) error
  41. queue workqueue.RateLimitingInterface
  42. }
  43. func NewCertificateController(
  44. name string,
  45. kubeClient clientset.Interface,
  46. csrInformer certificatesinformers.CertificateSigningRequestInformer,
  47. handler func(*certificates.CertificateSigningRequest) error,
  48. ) *CertificateController {
  49. // Send events to the apiserver
  50. eventBroadcaster := record.NewBroadcaster()
  51. eventBroadcaster.StartLogging(klog.Infof)
  52. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  53. cc := &CertificateController{
  54. name: name,
  55. kubeClient: kubeClient,
  56. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
  57. workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
  58. // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
  59. &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  60. ), "certificate"),
  61. handler: handler,
  62. }
  63. // Manage the addition/update of certificate requests
  64. csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  65. AddFunc: func(obj interface{}) {
  66. csr := obj.(*certificates.CertificateSigningRequest)
  67. klog.V(4).Infof("Adding certificate request %s", csr.Name)
  68. cc.enqueueCertificateRequest(obj)
  69. },
  70. UpdateFunc: func(old, new interface{}) {
  71. oldCSR := old.(*certificates.CertificateSigningRequest)
  72. klog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
  73. cc.enqueueCertificateRequest(new)
  74. },
  75. DeleteFunc: func(obj interface{}) {
  76. csr, ok := obj.(*certificates.CertificateSigningRequest)
  77. if !ok {
  78. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  79. if !ok {
  80. klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
  81. return
  82. }
  83. csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
  84. if !ok {
  85. klog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
  86. return
  87. }
  88. }
  89. klog.V(4).Infof("Deleting certificate request %s", csr.Name)
  90. cc.enqueueCertificateRequest(obj)
  91. },
  92. })
  93. cc.csrLister = csrInformer.Lister()
  94. cc.csrsSynced = csrInformer.Informer().HasSynced
  95. return cc
  96. }
  97. // Run the main goroutine responsible for watching and syncing jobs.
  98. func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
  99. defer utilruntime.HandleCrash()
  100. defer cc.queue.ShutDown()
  101. klog.Infof("Starting certificate controller %q", cc.name)
  102. defer klog.Infof("Shutting down certificate controller %q", cc.name)
  103. if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) {
  104. return
  105. }
  106. for i := 0; i < workers; i++ {
  107. go wait.Until(cc.worker, time.Second, stopCh)
  108. }
  109. <-stopCh
  110. }
  111. // worker runs a thread that dequeues CSRs, handles them, and marks them done.
  112. func (cc *CertificateController) worker() {
  113. for cc.processNextWorkItem() {
  114. }
  115. }
  116. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  117. func (cc *CertificateController) processNextWorkItem() bool {
  118. cKey, quit := cc.queue.Get()
  119. if quit {
  120. return false
  121. }
  122. defer cc.queue.Done(cKey)
  123. if err := cc.syncFunc(cKey.(string)); err != nil {
  124. cc.queue.AddRateLimited(cKey)
  125. if _, ignorable := err.(ignorableError); !ignorable {
  126. utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
  127. } else {
  128. klog.V(4).Infof("Sync %v failed with : %v", cKey, err)
  129. }
  130. return true
  131. }
  132. cc.queue.Forget(cKey)
  133. return true
  134. }
  135. func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
  136. key, err := controller.KeyFunc(obj)
  137. if err != nil {
  138. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  139. return
  140. }
  141. cc.queue.Add(key)
  142. }
  143. // maybeSignCertificate will inspect the certificate request and, if it has
  144. // been approved and meets policy expectations, generate an X509 cert using the
  145. // cluster CA assets. If successful it will update the CSR approve subresource
  146. // with the signed certificate.
  147. func (cc *CertificateController) syncFunc(key string) error {
  148. startTime := time.Now()
  149. defer func() {
  150. klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime))
  151. }()
  152. csr, err := cc.csrLister.Get(key)
  153. if errors.IsNotFound(err) {
  154. klog.V(3).Infof("csr has been deleted: %v", key)
  155. return nil
  156. }
  157. if err != nil {
  158. return err
  159. }
  160. if csr.Status.Certificate != nil {
  161. // no need to do anything because it already has a cert
  162. return nil
  163. }
  164. // need to operate on a copy so we don't mutate the csr in the shared cache
  165. csr = csr.DeepCopy()
  166. return cc.handler(csr)
  167. }
  168. // IgnorableError returns an error that we shouldn't handle (i.e. log) because
  169. // it's spammy and usually user error. Instead we will log these errors at a
  170. // higher log level. We still need to throw these errors to signal that the
  171. // sync should be retried.
  172. func IgnorableError(s string, args ...interface{}) ignorableError {
  173. return ignorableError(fmt.Sprintf(s, args...))
  174. }
  175. type ignorableError string
  176. func (e ignorableError) Error() string {
  177. return string(e)
  178. }