clusterroleaggregation_controller.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package clusterroleaggregation
  14. import (
  15. "context"
  16. "fmt"
  17. "sort"
  18. "time"
  19. "k8s.io/klog"
  20. rbacv1 "k8s.io/api/rbac/v1"
  21. "k8s.io/apimachinery/pkg/api/equality"
  22. "k8s.io/apimachinery/pkg/api/errors"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/labels"
  25. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. rbacinformers "k8s.io/client-go/informers/rbac/v1"
  28. rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
  29. rbaclisters "k8s.io/client-go/listers/rbac/v1"
  30. "k8s.io/client-go/tools/cache"
  31. "k8s.io/client-go/util/workqueue"
  32. "k8s.io/kubernetes/pkg/controller"
  33. )
  34. // ClusterRoleAggregationController is a controller to combine cluster roles
  35. type ClusterRoleAggregationController struct {
  36. clusterRoleClient rbacclient.ClusterRolesGetter
  37. clusterRoleLister rbaclisters.ClusterRoleLister
  38. clusterRolesSynced cache.InformerSynced
  39. syncHandler func(key string) error
  40. queue workqueue.RateLimitingInterface
  41. }
  42. // NewClusterRoleAggregation creates a new controller
  43. func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInformer, clusterRoleClient rbacclient.ClusterRolesGetter) *ClusterRoleAggregationController {
  44. c := &ClusterRoleAggregationController{
  45. clusterRoleClient: clusterRoleClient,
  46. clusterRoleLister: clusterRoleInformer.Lister(),
  47. clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,
  48. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
  49. }
  50. c.syncHandler = c.syncClusterRole
  51. clusterRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  52. AddFunc: func(obj interface{}) {
  53. c.enqueue()
  54. },
  55. UpdateFunc: func(old, cur interface{}) {
  56. c.enqueue()
  57. },
  58. DeleteFunc: func(uncast interface{}) {
  59. c.enqueue()
  60. },
  61. })
  62. return c
  63. }
  64. func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
  65. _, name, err := cache.SplitMetaNamespaceKey(key)
  66. if err != nil {
  67. return err
  68. }
  69. sharedClusterRole, err := c.clusterRoleLister.Get(name)
  70. if errors.IsNotFound(err) {
  71. return nil
  72. }
  73. if err != nil {
  74. return err
  75. }
  76. if sharedClusterRole.AggregationRule == nil {
  77. return nil
  78. }
  79. newPolicyRules := []rbacv1.PolicyRule{}
  80. for i := range sharedClusterRole.AggregationRule.ClusterRoleSelectors {
  81. selector := sharedClusterRole.AggregationRule.ClusterRoleSelectors[i]
  82. runtimeLabelSelector, err := metav1.LabelSelectorAsSelector(&selector)
  83. if err != nil {
  84. return err
  85. }
  86. clusterRoles, err := c.clusterRoleLister.List(runtimeLabelSelector)
  87. if err != nil {
  88. return err
  89. }
  90. sort.Sort(byName(clusterRoles))
  91. for i := range clusterRoles {
  92. if clusterRoles[i].Name == sharedClusterRole.Name {
  93. continue
  94. }
  95. for j := range clusterRoles[i].Rules {
  96. currRule := clusterRoles[i].Rules[j]
  97. if !ruleExists(newPolicyRules, currRule) {
  98. newPolicyRules = append(newPolicyRules, currRule)
  99. }
  100. }
  101. }
  102. }
  103. if equality.Semantic.DeepEqual(newPolicyRules, sharedClusterRole.Rules) {
  104. return nil
  105. }
  106. // we need to update
  107. clusterRole := sharedClusterRole.DeepCopy()
  108. clusterRole.Rules = nil
  109. for _, rule := range newPolicyRules {
  110. clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
  111. }
  112. _, err = c.clusterRoleClient.ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{})
  113. return err
  114. }
  115. func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
  116. for _, curr := range haystack {
  117. if equality.Semantic.DeepEqual(curr, needle) {
  118. return true
  119. }
  120. }
  121. return false
  122. }
  123. // Run starts the controller and blocks until stopCh is closed.
  124. func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) {
  125. defer utilruntime.HandleCrash()
  126. defer c.queue.ShutDown()
  127. klog.Infof("Starting ClusterRoleAggregator")
  128. defer klog.Infof("Shutting down ClusterRoleAggregator")
  129. if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
  130. return
  131. }
  132. for i := 0; i < workers; i++ {
  133. go wait.Until(c.runWorker, time.Second, stopCh)
  134. }
  135. <-stopCh
  136. }
  137. func (c *ClusterRoleAggregationController) runWorker() {
  138. for c.processNextWorkItem() {
  139. }
  140. }
  141. func (c *ClusterRoleAggregationController) processNextWorkItem() bool {
  142. dsKey, quit := c.queue.Get()
  143. if quit {
  144. return false
  145. }
  146. defer c.queue.Done(dsKey)
  147. err := c.syncHandler(dsKey.(string))
  148. if err == nil {
  149. c.queue.Forget(dsKey)
  150. return true
  151. }
  152. utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
  153. c.queue.AddRateLimited(dsKey)
  154. return true
  155. }
  156. func (c *ClusterRoleAggregationController) enqueue() {
  157. // this is unusual, but since the set of all clusterroles is small and we don't know the dependency
  158. // graph, just queue up every thing each time. This allows errors to be selectively retried if there
  159. // is a problem updating a single role
  160. allClusterRoles, err := c.clusterRoleLister.List(labels.Everything())
  161. if err != nil {
  162. utilruntime.HandleError(fmt.Errorf("Couldn't list all objects %v", err))
  163. return
  164. }
  165. for _, clusterRole := range allClusterRoles {
  166. // only queue ones that we may need to aggregate
  167. if clusterRole.AggregationRule == nil {
  168. continue
  169. }
  170. key, err := controller.KeyFunc(clusterRole)
  171. if err != nil {
  172. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", clusterRole, err))
  173. return
  174. }
  175. c.queue.Add(key)
  176. }
  177. }
  178. type byName []*rbacv1.ClusterRole
  179. func (a byName) Len() int { return len(a) }
  180. func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  181. func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }