clusterroleaggregation_controller.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package clusterroleaggregation
  14. import (
  15. "fmt"
  16. "sort"
  17. "time"
  18. "k8s.io/klog"
  19. rbacv1 "k8s.io/api/rbac/v1"
  20. "k8s.io/apimachinery/pkg/api/equality"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. rbacinformers "k8s.io/client-go/informers/rbac/v1"
  27. rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
  28. rbaclisters "k8s.io/client-go/listers/rbac/v1"
  29. "k8s.io/client-go/tools/cache"
  30. "k8s.io/client-go/util/workqueue"
  31. "k8s.io/kubernetes/pkg/controller"
  32. )
  33. // ClusterRoleAggregationController is a controller to combine cluster roles
  34. type ClusterRoleAggregationController struct {
  35. clusterRoleClient rbacclient.ClusterRolesGetter
  36. clusterRoleLister rbaclisters.ClusterRoleLister
  37. clusterRolesSynced cache.InformerSynced
  38. syncHandler func(key string) error
  39. queue workqueue.RateLimitingInterface
  40. }
  41. // NewClusterRoleAggregation creates a new controller
  42. func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInformer, clusterRoleClient rbacclient.ClusterRolesGetter) *ClusterRoleAggregationController {
  43. c := &ClusterRoleAggregationController{
  44. clusterRoleClient: clusterRoleClient,
  45. clusterRoleLister: clusterRoleInformer.Lister(),
  46. clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,
  47. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
  48. }
  49. c.syncHandler = c.syncClusterRole
  50. clusterRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  51. AddFunc: func(obj interface{}) {
  52. c.enqueue()
  53. },
  54. UpdateFunc: func(old, cur interface{}) {
  55. c.enqueue()
  56. },
  57. DeleteFunc: func(uncast interface{}) {
  58. c.enqueue()
  59. },
  60. })
  61. return c
  62. }
  63. func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
  64. _, name, err := cache.SplitMetaNamespaceKey(key)
  65. if err != nil {
  66. return err
  67. }
  68. sharedClusterRole, err := c.clusterRoleLister.Get(name)
  69. if errors.IsNotFound(err) {
  70. return nil
  71. }
  72. if err != nil {
  73. return err
  74. }
  75. if sharedClusterRole.AggregationRule == nil {
  76. return nil
  77. }
  78. newPolicyRules := []rbacv1.PolicyRule{}
  79. for i := range sharedClusterRole.AggregationRule.ClusterRoleSelectors {
  80. selector := sharedClusterRole.AggregationRule.ClusterRoleSelectors[i]
  81. runtimeLabelSelector, err := metav1.LabelSelectorAsSelector(&selector)
  82. if err != nil {
  83. return err
  84. }
  85. clusterRoles, err := c.clusterRoleLister.List(runtimeLabelSelector)
  86. if err != nil {
  87. return err
  88. }
  89. sort.Sort(byName(clusterRoles))
  90. for i := range clusterRoles {
  91. if clusterRoles[i].Name == sharedClusterRole.Name {
  92. continue
  93. }
  94. for j := range clusterRoles[i].Rules {
  95. currRule := clusterRoles[i].Rules[j]
  96. if !ruleExists(newPolicyRules, currRule) {
  97. newPolicyRules = append(newPolicyRules, currRule)
  98. }
  99. }
  100. }
  101. }
  102. if equality.Semantic.DeepEqual(newPolicyRules, sharedClusterRole.Rules) {
  103. return nil
  104. }
  105. // we need to update
  106. clusterRole := sharedClusterRole.DeepCopy()
  107. clusterRole.Rules = nil
  108. for _, rule := range newPolicyRules {
  109. clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
  110. }
  111. _, err = c.clusterRoleClient.ClusterRoles().Update(clusterRole)
  112. return err
  113. }
  114. func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
  115. for _, curr := range haystack {
  116. if equality.Semantic.DeepEqual(curr, needle) {
  117. return true
  118. }
  119. }
  120. return false
  121. }
  122. // Run starts the controller and blocks until stopCh is closed.
  123. func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) {
  124. defer utilruntime.HandleCrash()
  125. defer c.queue.ShutDown()
  126. klog.Infof("Starting ClusterRoleAggregator")
  127. defer klog.Infof("Shutting down ClusterRoleAggregator")
  128. if !controller.WaitForCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
  129. return
  130. }
  131. for i := 0; i < workers; i++ {
  132. go wait.Until(c.runWorker, time.Second, stopCh)
  133. }
  134. <-stopCh
  135. }
  136. func (c *ClusterRoleAggregationController) runWorker() {
  137. for c.processNextWorkItem() {
  138. }
  139. }
  140. func (c *ClusterRoleAggregationController) processNextWorkItem() bool {
  141. dsKey, quit := c.queue.Get()
  142. if quit {
  143. return false
  144. }
  145. defer c.queue.Done(dsKey)
  146. err := c.syncHandler(dsKey.(string))
  147. if err == nil {
  148. c.queue.Forget(dsKey)
  149. return true
  150. }
  151. utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
  152. c.queue.AddRateLimited(dsKey)
  153. return true
  154. }
  155. func (c *ClusterRoleAggregationController) enqueue() {
  156. // this is unusual, but since the set of all clusterroles is small and we don't know the dependency
  157. // graph, just queue up every thing each time. This allows errors to be selectively retried if there
  158. // is a problem updating a single role
  159. allClusterRoles, err := c.clusterRoleLister.List(labels.Everything())
  160. if err != nil {
  161. utilruntime.HandleError(fmt.Errorf("Couldn't list all objects %v", err))
  162. return
  163. }
  164. for _, clusterRole := range allClusterRoles {
  165. // only queue ones that we may need to aggregate
  166. if clusterRole.AggregationRule == nil {
  167. continue
  168. }
  169. key, err := controller.KeyFunc(clusterRole)
  170. if err != nil {
  171. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", clusterRole, err))
  172. return
  173. }
  174. c.queue.Add(key)
  175. }
  176. }
  177. type byName []*rbacv1.ClusterRole
  178. func (a byName) Len() int { return len(a) }
  179. func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  180. func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }