cluster_authentication_trust_controller.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package clusterauthenticationtrust
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/x509"
  18. "encoding/json"
  19. "encoding/pem"
  20. "fmt"
  21. "reflect"
  22. "strings"
  23. "time"
  24. corev1 "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/api/equality"
  26. apierrors "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  29. "k8s.io/apimachinery/pkg/util/sets"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. "k8s.io/apiserver/pkg/authentication/request/headerrequest"
  32. "k8s.io/apiserver/pkg/server/dynamiccertificates"
  33. corev1informers "k8s.io/client-go/informers/core/v1"
  34. "k8s.io/client-go/kubernetes"
  35. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  36. corev1listers "k8s.io/client-go/listers/core/v1"
  37. "k8s.io/client-go/tools/cache"
  38. "k8s.io/client-go/util/cert"
  39. "k8s.io/client-go/util/workqueue"
  40. "k8s.io/klog"
  41. )
  // configMapNamespace is the namespace of the configmap maintained by this controller.
  const configMapNamespace = "kube-system"

  // configMapName is the name of the configmap maintained by this controller.
  const configMapName = "extension-apiserver-authentication"
  46. // Controller holds the running state for the controller
  47. type Controller struct {
  48. requiredAuthenticationData ClusterAuthenticationInfo
  49. configMapLister corev1listers.ConfigMapLister
  50. configMapClient corev1client.ConfigMapsGetter
  51. namespaceClient corev1client.NamespacesGetter
  52. // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors.
  53. // we only ever place one entry in here, but it is keyed as usual: namespace/name
  54. queue workqueue.RateLimitingInterface
  55. // kubeSystemConfigMapInformer is tracked so that we can start these on Run
  56. kubeSystemConfigMapInformer cache.SharedIndexInformer
  57. // preRunCaches are the caches to sync before starting the work of this control loop
  58. preRunCaches []cache.InformerSynced
  59. }
  60. // ClusterAuthenticationInfo holds the information that will included in public configmap.
  61. type ClusterAuthenticationInfo struct {
  62. // ClientCA is the CA that can be used to verify the identity of normal clients
  63. ClientCA dynamiccertificates.CAContentProvider
  64. // RequestHeaderUsernameHeaders are the headers used by this kube-apiserver to determine username
  65. RequestHeaderUsernameHeaders headerrequest.StringSliceProvider
  66. // RequestHeaderGroupHeaders are the headers used by this kube-apiserver to determine groups
  67. RequestHeaderGroupHeaders headerrequest.StringSliceProvider
  68. // RequestHeaderExtraHeaderPrefixes are the headers used by this kube-apiserver to determine user.extra
  69. RequestHeaderExtraHeaderPrefixes headerrequest.StringSliceProvider
  70. // RequestHeaderAllowedNames are the sujbects allowed to act as a front proxy
  71. RequestHeaderAllowedNames headerrequest.StringSliceProvider
  72. // RequestHeaderCA is the CA that can be used to verify the front proxy
  73. RequestHeaderCA dynamiccertificates.CAContentProvider
  74. }
  75. // NewClusterAuthenticationTrustController returns a controller that will maintain the kube-system configmap/extension-apiserver-authentication
  76. // that holds information about how to aggregated apiservers are recommended (but not required) to configure themselves.
  77. func NewClusterAuthenticationTrustController(requiredAuthenticationData ClusterAuthenticationInfo, kubeClient kubernetes.Interface) *Controller {
  78. // we construct our own informer because we need such a small subset of the information available. Just one namespace.
  79. kubeSystemConfigMapInformer := corev1informers.NewConfigMapInformer(kubeClient, configMapNamespace, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  80. c := &Controller{
  81. requiredAuthenticationData: requiredAuthenticationData,
  82. configMapLister: corev1listers.NewConfigMapLister(kubeSystemConfigMapInformer.GetIndexer()),
  83. configMapClient: kubeClient.CoreV1(),
  84. namespaceClient: kubeClient.CoreV1(),
  85. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster_authentication_trust_controller"),
  86. preRunCaches: []cache.InformerSynced{kubeSystemConfigMapInformer.HasSynced},
  87. kubeSystemConfigMapInformer: kubeSystemConfigMapInformer,
  88. }
  89. kubeSystemConfigMapInformer.AddEventHandler(cache.FilteringResourceEventHandler{
  90. FilterFunc: func(obj interface{}) bool {
  91. if cast, ok := obj.(*corev1.ConfigMap); ok {
  92. return cast.Name == configMapName
  93. }
  94. if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
  95. if cast, ok := tombstone.Obj.(*corev1.ConfigMap); ok {
  96. return cast.Name == configMapName
  97. }
  98. }
  99. return true // always return true just in case. The checks are fairly cheap
  100. },
  101. Handler: cache.ResourceEventHandlerFuncs{
  102. // we have a filter, so any time we're called, we may as well queue. We only ever check one configmap
  103. // so we don't have to be choosy about our key.
  104. AddFunc: func(obj interface{}) {
  105. c.queue.Add(keyFn())
  106. },
  107. UpdateFunc: func(oldObj, newObj interface{}) {
  108. c.queue.Add(keyFn())
  109. },
  110. DeleteFunc: func(obj interface{}) {
  111. c.queue.Add(keyFn())
  112. },
  113. },
  114. })
  115. return c
  116. }
  117. func (c *Controller) syncConfigMap() error {
  118. originalAuthConfigMap, err := c.configMapLister.ConfigMaps(configMapNamespace).Get(configMapName)
  119. if apierrors.IsNotFound(err) {
  120. originalAuthConfigMap = &corev1.ConfigMap{
  121. ObjectMeta: metav1.ObjectMeta{Namespace: configMapNamespace, Name: configMapName},
  122. }
  123. } else if err != nil {
  124. return err
  125. }
  126. // keep the original to diff against later before updating
  127. authConfigMap := originalAuthConfigMap.DeepCopy()
  128. existingAuthenticationInfo, err := getClusterAuthenticationInfoFor(originalAuthConfigMap.Data)
  129. if err != nil {
  130. return err
  131. }
  132. combinedInfo, err := combinedClusterAuthenticationInfo(existingAuthenticationInfo, c.requiredAuthenticationData)
  133. if err != nil {
  134. return err
  135. }
  136. authConfigMap.Data, err = getConfigMapDataFor(combinedInfo)
  137. if err != nil {
  138. return err
  139. }
  140. if equality.Semantic.DeepEqual(authConfigMap, originalAuthConfigMap) {
  141. klog.V(5).Info("no changes to configmap")
  142. return nil
  143. }
  144. klog.V(2).Infof("writing updated authentication info to %s configmaps/%s", configMapNamespace, configMapName)
  145. if err := createNamespaceIfNeeded(c.namespaceClient, authConfigMap.Namespace); err != nil {
  146. return err
  147. }
  148. if err := writeConfigMap(c.configMapClient, authConfigMap); err != nil {
  149. return err
  150. }
  151. return nil
  152. }
  153. func createNamespaceIfNeeded(nsClient corev1client.NamespacesGetter, ns string) error {
  154. if _, err := nsClient.Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}); err == nil {
  155. // the namespace already exists
  156. return nil
  157. }
  158. newNs := &corev1.Namespace{
  159. ObjectMeta: metav1.ObjectMeta{
  160. Name: ns,
  161. Namespace: "",
  162. },
  163. }
  164. _, err := nsClient.Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
  165. if err != nil && apierrors.IsAlreadyExists(err) {
  166. err = nil
  167. }
  168. return err
  169. }
  170. func writeConfigMap(configMapClient corev1client.ConfigMapsGetter, required *corev1.ConfigMap) error {
  171. _, err := configMapClient.ConfigMaps(required.Namespace).Update(context.TODO(), required, metav1.UpdateOptions{})
  172. if apierrors.IsNotFound(err) {
  173. _, err := configMapClient.ConfigMaps(required.Namespace).Create(context.TODO(), required, metav1.CreateOptions{})
  174. return err
  175. }
  176. // If the configmap is too big, clear the entire thing and count on this controller (or another one) to add the correct data back.
  177. // We return the original error which causes the controller to re-queue.
  178. // Too big means
  179. // 1. request is so big the generic request catcher finds it
  180. // 2. the content is so large that that the server sends a validation error "Too long: must have at most 1048576 characters"
  181. if apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) {
  182. if deleteErr := configMapClient.ConfigMaps(required.Namespace).Delete(context.TODO(), required.Name, nil); deleteErr != nil {
  183. return deleteErr
  184. }
  185. return err
  186. }
  187. return err
  188. }
  189. // combinedClusterAuthenticationInfo combines two sets of authentication information into a new one
  190. func combinedClusterAuthenticationInfo(lhs, rhs ClusterAuthenticationInfo) (ClusterAuthenticationInfo, error) {
  191. ret := ClusterAuthenticationInfo{
  192. RequestHeaderAllowedNames: combineUniqueStringSlices(lhs.RequestHeaderAllowedNames, rhs.RequestHeaderAllowedNames),
  193. RequestHeaderExtraHeaderPrefixes: combineUniqueStringSlices(lhs.RequestHeaderExtraHeaderPrefixes, rhs.RequestHeaderExtraHeaderPrefixes),
  194. RequestHeaderGroupHeaders: combineUniqueStringSlices(lhs.RequestHeaderGroupHeaders, rhs.RequestHeaderGroupHeaders),
  195. RequestHeaderUsernameHeaders: combineUniqueStringSlices(lhs.RequestHeaderUsernameHeaders, rhs.RequestHeaderUsernameHeaders),
  196. }
  197. var err error
  198. ret.ClientCA, err = combineCertLists(lhs.ClientCA, rhs.ClientCA)
  199. if err != nil {
  200. return ClusterAuthenticationInfo{}, err
  201. }
  202. ret.RequestHeaderCA, err = combineCertLists(lhs.RequestHeaderCA, rhs.RequestHeaderCA)
  203. if err != nil {
  204. return ClusterAuthenticationInfo{}, err
  205. }
  206. return ret, nil
  207. }
  208. func getConfigMapDataFor(authenticationInfo ClusterAuthenticationInfo) (map[string]string, error) {
  209. data := map[string]string{}
  210. if authenticationInfo.ClientCA != nil {
  211. if caBytes := authenticationInfo.ClientCA.CurrentCABundleContent(); len(caBytes) > 0 {
  212. data["client-ca-file"] = string(caBytes)
  213. }
  214. }
  215. if authenticationInfo.RequestHeaderCA == nil {
  216. return data, nil
  217. }
  218. if caBytes := authenticationInfo.RequestHeaderCA.CurrentCABundleContent(); len(caBytes) > 0 {
  219. var err error
  220. // encoding errors aren't going to get better, so just fail on them.
  221. data["requestheader-username-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderUsernameHeaders.Value())
  222. if err != nil {
  223. return nil, err
  224. }
  225. data["requestheader-group-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderGroupHeaders.Value())
  226. if err != nil {
  227. return nil, err
  228. }
  229. data["requestheader-extra-headers-prefix"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderExtraHeaderPrefixes.Value())
  230. if err != nil {
  231. return nil, err
  232. }
  233. data["requestheader-client-ca-file"] = string(caBytes)
  234. data["requestheader-allowed-names"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderAllowedNames.Value())
  235. if err != nil {
  236. return nil, err
  237. }
  238. }
  239. return data, nil
  240. }
  241. func getClusterAuthenticationInfoFor(data map[string]string) (ClusterAuthenticationInfo, error) {
  242. ret := ClusterAuthenticationInfo{}
  243. var err error
  244. ret.RequestHeaderGroupHeaders, err = jsonDeserializeStringSlice(data["requestheader-group-headers"])
  245. if err != nil {
  246. return ClusterAuthenticationInfo{}, err
  247. }
  248. ret.RequestHeaderExtraHeaderPrefixes, err = jsonDeserializeStringSlice(data["requestheader-extra-headers-prefix"])
  249. if err != nil {
  250. return ClusterAuthenticationInfo{}, err
  251. }
  252. ret.RequestHeaderAllowedNames, err = jsonDeserializeStringSlice(data["requestheader-allowed-names"])
  253. if err != nil {
  254. return ClusterAuthenticationInfo{}, err
  255. }
  256. ret.RequestHeaderUsernameHeaders, err = jsonDeserializeStringSlice(data["requestheader-username-headers"])
  257. if err != nil {
  258. return ClusterAuthenticationInfo{}, err
  259. }
  260. if caBundle := data["requestheader-client-ca-file"]; len(caBundle) > 0 {
  261. ret.RequestHeaderCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
  262. if err != nil {
  263. return ClusterAuthenticationInfo{}, err
  264. }
  265. }
  266. if caBundle := data["client-ca-file"]; len(caBundle) > 0 {
  267. ret.ClientCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
  268. if err != nil {
  269. return ClusterAuthenticationInfo{}, err
  270. }
  271. }
  272. return ret, nil
  273. }
  // jsonSerializeStringSlice returns the JSON encoding of in as a string. On a marshalling
  // error it returns the empty string and that error.
  func jsonSerializeStringSlice(in []string) (string, error) {
  	encoded, err := json.Marshal(in)
  	if err == nil {
  		return string(encoded), nil
  	}
  	return "", err
  }
  281. func jsonDeserializeStringSlice(in string) (headerrequest.StringSliceProvider, error) {
  282. if len(in) == 0 {
  283. return nil, nil
  284. }
  285. out := []string{}
  286. if err := json.Unmarshal([]byte(in), &out); err != nil {
  287. return nil, err
  288. }
  289. return headerrequest.StaticStringSlice(out), nil
  290. }
  291. func combineUniqueStringSlices(lhs, rhs headerrequest.StringSliceProvider) headerrequest.StringSliceProvider {
  292. ret := []string{}
  293. present := sets.String{}
  294. if lhs != nil {
  295. for _, curr := range lhs.Value() {
  296. if present.Has(curr) {
  297. continue
  298. }
  299. ret = append(ret, curr)
  300. present.Insert(curr)
  301. }
  302. }
  303. if rhs != nil {
  304. for _, curr := range rhs.Value() {
  305. if present.Has(curr) {
  306. continue
  307. }
  308. ret = append(ret, curr)
  309. present.Insert(curr)
  310. }
  311. }
  312. return headerrequest.StaticStringSlice(ret)
  313. }
  314. func combineCertLists(lhs, rhs dynamiccertificates.CAContentProvider) (dynamiccertificates.CAContentProvider, error) {
  315. certificates := []*x509.Certificate{}
  316. if lhs != nil {
  317. lhsCABytes := lhs.CurrentCABundleContent()
  318. lhsCAs, err := cert.ParseCertsPEM(lhsCABytes)
  319. if err != nil {
  320. return nil, err
  321. }
  322. certificates = append(certificates, lhsCAs...)
  323. }
  324. if rhs != nil {
  325. rhsCABytes := rhs.CurrentCABundleContent()
  326. rhsCAs, err := cert.ParseCertsPEM(rhsCABytes)
  327. if err != nil {
  328. return nil, err
  329. }
  330. certificates = append(certificates, rhsCAs...)
  331. }
  332. certificates = filterExpiredCerts(certificates...)
  333. finalCertificates := []*x509.Certificate{}
  334. // now check for duplicates. n^2, but super simple
  335. for i := range certificates {
  336. found := false
  337. for j := range finalCertificates {
  338. if reflect.DeepEqual(certificates[i].Raw, finalCertificates[j].Raw) {
  339. found = true
  340. break
  341. }
  342. }
  343. if !found {
  344. finalCertificates = append(finalCertificates, certificates[i])
  345. }
  346. }
  347. finalCABytes, err := encodeCertificates(finalCertificates...)
  348. if err != nil {
  349. return nil, err
  350. }
  351. if len(finalCABytes) == 0 {
  352. return nil, nil
  353. }
  354. // it makes sense for this list to be static because the combination of sources is only used just before writing and
  355. // is recalculated
  356. return dynamiccertificates.NewStaticCAContent("combined", finalCABytes)
  357. }
  // filterExpiredCerts returns, in their original order, the certificates from certs that
  // have not expired. Five minutes of slack is allowed for NotAfter comparisons: a
  // certificate is kept when its NotAfter is later than five minutes before now. The result
  // is nil when no certificate qualifies; no error is reported in that case.
  func filterExpiredCerts(certs ...*x509.Certificate) []*x509.Certificate {
  	cutoff := time.Now().Add(-5 * time.Minute)

  	var unexpired []*x509.Certificate
  	for i := range certs {
  		if !certs[i].NotAfter.After(cutoff) {
  			continue
  		}
  		unexpired = append(unexpired, certs[i])
  	}
  	return unexpired
  }
  371. // Enqueue a method to allow separate control loops to cause the controller to trigger and reconcile content.
  372. func (c *Controller) Enqueue() {
  373. c.queue.Add(keyFn())
  374. }
  375. // Run the controller until stopped.
  376. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
  377. defer utilruntime.HandleCrash()
  378. // make sure the work queue is shutdown which will trigger workers to end
  379. defer c.queue.ShutDown()
  380. klog.Infof("Starting cluster_authentication_trust_controller controller")
  381. defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
  382. // we have a personal informer that is narrowly scoped, start it.
  383. go c.kubeSystemConfigMapInformer.Run(stopCh)
  384. // wait for your secondary caches to fill before starting your work
  385. if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) {
  386. return
  387. }
  388. // only run one worker
  389. go wait.Until(c.runWorker, time.Second, stopCh)
  390. // checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again
  391. // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
  392. _ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
  393. c.queue.Add(keyFn())
  394. return false, nil
  395. }, stopCh)
  396. // wait until we're told to stop
  397. <-stopCh
  398. }
  399. func (c *Controller) runWorker() {
  400. // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
  401. // available, so we don't worry about secondary waits
  402. for c.processNextWorkItem() {
  403. }
  404. }
  405. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  406. func (c *Controller) processNextWorkItem() bool {
  407. // pull the next work item from queue. It should be a key we use to lookup something in a cache
  408. key, quit := c.queue.Get()
  409. if quit {
  410. return false
  411. }
  412. // you always have to indicate to the queue that you've completed a piece of work
  413. defer c.queue.Done(key)
  414. // do your work on the key. This method will contains your "do stuff" logic
  415. err := c.syncConfigMap()
  416. if err == nil {
  417. // if you had no error, tell the queue to stop tracking history for your key. This will
  418. // reset things like failure counts for per-item rate limiting
  419. c.queue.Forget(key)
  420. return true
  421. }
  422. // there was a failure so be sure to report it. This method allows for pluggable error handling
  423. // which can be used for things like cluster-monitoring
  424. utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
  425. // since we failed, we should requeue the item to work on later. This method will add a backoff
  426. // to avoid hotlooping on particular items (they're probably still not going to work right away)
  427. // and overall controller protection (everything I've done is broken, this controller needs to
  428. // calm down or it can starve other useful work) cases.
  429. c.queue.AddRateLimited(key)
  430. return true
  431. }
  432. func keyFn() string {
  433. // this format matches DeletionHandlingMetaNamespaceKeyFunc for our single key
  434. return configMapNamespace + "/" + configMapName
  435. }
  // encodeCertificates PEM-encodes the raw bytes of each certificate as a "CERTIFICATE" block
  // and returns the blocks concatenated in input order. With no certificates the result has
  // length zero. If a block cannot be encoded, an empty slice and the error are returned.
  func encodeCertificates(certs ...*x509.Certificate) ([]byte, error) {
  	var out bytes.Buffer
  	for i := range certs {
  		block := pem.Block{Type: "CERTIFICATE", Bytes: certs[i].Raw}
  		if err := pem.Encode(&out, &block); err != nil {
  			return []byte{}, err
  		}
  	}
  	return out.Bytes(), nil
  }
  444. }