serviceaccounts_controller.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package serviceaccount
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. "k8s.io/api/core/v1"
  19. apierrors "k8s.io/apimachinery/pkg/api/errors"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. coreinformers "k8s.io/client-go/informers/core/v1"
  25. clientset "k8s.io/client-go/kubernetes"
  26. corelisters "k8s.io/client-go/listers/core/v1"
  27. "k8s.io/client-go/tools/cache"
  28. "k8s.io/client-go/util/workqueue"
  29. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  30. "k8s.io/klog"
  31. )
  32. // ServiceAccountsControllerOptions contains options for running a ServiceAccountsController
  33. type ServiceAccountsControllerOptions struct {
  34. // ServiceAccounts is the list of service accounts to ensure exist in every namespace
  35. ServiceAccounts []v1.ServiceAccount
  36. // ServiceAccountResync is the interval between full resyncs of ServiceAccounts.
  37. // If non-zero, all service accounts will be re-listed this often.
  38. // Otherwise, re-list will be delayed as long as possible (until the watch is closed or times out).
  39. ServiceAccountResync time.Duration
  40. // NamespaceResync is the interval between full resyncs of Namespaces.
  41. // If non-zero, all namespaces will be re-listed this often.
  42. // Otherwise, re-list will be delayed as long as possible (until the watch is closed or times out).
  43. NamespaceResync time.Duration
  44. }
  45. // DefaultServiceAccountsControllerOptions returns the default options for creating a ServiceAccountsController.
  46. func DefaultServiceAccountsControllerOptions() ServiceAccountsControllerOptions {
  47. return ServiceAccountsControllerOptions{
  48. ServiceAccounts: []v1.ServiceAccount{
  49. {ObjectMeta: metav1.ObjectMeta{Name: "default"}},
  50. },
  51. }
  52. }
  53. // NewServiceAccountsController returns a new *ServiceAccountsController.
  54. func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) (*ServiceAccountsController, error) {
  55. e := &ServiceAccountsController{
  56. client: cl,
  57. serviceAccountsToEnsure: options.ServiceAccounts,
  58. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"),
  59. }
  60. if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
  61. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  62. return nil, err
  63. }
  64. }
  65. saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
  66. DeleteFunc: e.serviceAccountDeleted,
  67. }, options.ServiceAccountResync)
  68. e.saLister = saInformer.Lister()
  69. e.saListerSynced = saInformer.Informer().HasSynced
  70. nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
  71. AddFunc: e.namespaceAdded,
  72. UpdateFunc: e.namespaceUpdated,
  73. }, options.NamespaceResync)
  74. e.nsLister = nsInformer.Lister()
  75. e.nsListerSynced = nsInformer.Informer().HasSynced
  76. e.syncHandler = e.syncNamespace
  77. return e, nil
  78. }
  79. // ServiceAccountsController manages ServiceAccount objects inside Namespaces
  80. type ServiceAccountsController struct {
  81. client clientset.Interface
  82. serviceAccountsToEnsure []v1.ServiceAccount
  83. // To allow injection for testing.
  84. syncHandler func(key string) error
  85. saLister corelisters.ServiceAccountLister
  86. saListerSynced cache.InformerSynced
  87. nsLister corelisters.NamespaceLister
  88. nsListerSynced cache.InformerSynced
  89. queue workqueue.RateLimitingInterface
  90. }
  91. // Run runs the ServiceAccountsController blocks until receiving signal from stopCh.
  92. func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
  93. defer utilruntime.HandleCrash()
  94. defer c.queue.ShutDown()
  95. klog.Infof("Starting service account controller")
  96. defer klog.Infof("Shutting down service account controller")
  97. if !cache.WaitForNamedCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) {
  98. return
  99. }
  100. for i := 0; i < workers; i++ {
  101. go wait.Until(c.runWorker, time.Second, stopCh)
  102. }
  103. <-stopCh
  104. }
  105. // serviceAccountDeleted reacts to a ServiceAccount deletion by recreating a default ServiceAccount in the namespace if needed
  106. func (c *ServiceAccountsController) serviceAccountDeleted(obj interface{}) {
  107. sa, ok := obj.(*v1.ServiceAccount)
  108. if !ok {
  109. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  110. if !ok {
  111. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  112. return
  113. }
  114. sa, ok = tombstone.Obj.(*v1.ServiceAccount)
  115. if !ok {
  116. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ServiceAccount %#v", obj))
  117. return
  118. }
  119. }
  120. c.queue.Add(sa.Namespace)
  121. }
  122. // namespaceAdded reacts to a Namespace creation by creating a default ServiceAccount object
  123. func (c *ServiceAccountsController) namespaceAdded(obj interface{}) {
  124. namespace := obj.(*v1.Namespace)
  125. c.queue.Add(namespace.Name)
  126. }
  127. // namespaceUpdated reacts to a Namespace update (or re-list) by creating a default ServiceAccount in the namespace if needed
  128. func (c *ServiceAccountsController) namespaceUpdated(oldObj interface{}, newObj interface{}) {
  129. newNamespace := newObj.(*v1.Namespace)
  130. c.queue.Add(newNamespace.Name)
  131. }
  132. func (c *ServiceAccountsController) runWorker() {
  133. for c.processNextWorkItem() {
  134. }
  135. }
  136. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  137. func (c *ServiceAccountsController) processNextWorkItem() bool {
  138. key, quit := c.queue.Get()
  139. if quit {
  140. return false
  141. }
  142. defer c.queue.Done(key)
  143. err := c.syncHandler(key.(string))
  144. if err == nil {
  145. c.queue.Forget(key)
  146. return true
  147. }
  148. utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
  149. c.queue.AddRateLimited(key)
  150. return true
  151. }
  152. func (c *ServiceAccountsController) syncNamespace(key string) error {
  153. startTime := time.Now()
  154. defer func() {
  155. klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
  156. }()
  157. ns, err := c.nsLister.Get(key)
  158. if apierrors.IsNotFound(err) {
  159. return nil
  160. }
  161. if err != nil {
  162. return err
  163. }
  164. if ns.Status.Phase != v1.NamespaceActive {
  165. // If namespace is not active, we shouldn't try to create anything
  166. return nil
  167. }
  168. createFailures := []error{}
  169. for _, sa := range c.serviceAccountsToEnsure {
  170. switch _, err := c.saLister.ServiceAccounts(ns.Name).Get(sa.Name); {
  171. case err == nil:
  172. continue
  173. case apierrors.IsNotFound(err):
  174. case err != nil:
  175. return err
  176. }
  177. // this is only safe because we never read it and we always write it
  178. // TODO eliminate this once the fake client can handle creation without NS
  179. sa.Namespace = ns.Name
  180. if _, err := c.client.CoreV1().ServiceAccounts(ns.Name).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
  181. // we can safely ignore terminating namespace errors
  182. if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  183. createFailures = append(createFailures, err)
  184. }
  185. }
  186. }
  187. return utilerrors.Flatten(utilerrors.NewAggregate(createFailures))
  188. }