aggregator.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  /*
  Copyright 2017 The Kubernetes Authors.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  */
  // Package app does all of the work necessary to create a Kubernetes
  // APIServer by binding together the API, master and APIServer infrastructure.
  // It can be configured and called directly or via the hyperkube framework.
  16. package app
  17. import (
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. "sync"
  23. "k8s.io/klog"
  24. apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/runtime"
  27. "k8s.io/apimachinery/pkg/runtime/schema"
  28. "k8s.io/apimachinery/pkg/util/sets"
  29. "k8s.io/apiserver/pkg/admission"
  30. "k8s.io/apiserver/pkg/features"
  31. genericapiserver "k8s.io/apiserver/pkg/server"
  32. "k8s.io/apiserver/pkg/server/healthz"
  33. genericoptions "k8s.io/apiserver/pkg/server/options"
  34. "k8s.io/apiserver/pkg/util/feature"
  35. utilfeature "k8s.io/apiserver/pkg/util/feature"
  36. kubeexternalinformers "k8s.io/client-go/informers"
  37. "k8s.io/client-go/tools/cache"
  38. v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
  39. v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
  40. "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
  41. aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
  42. aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
  43. apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
  44. informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
  45. "k8s.io/kube-aggregator/pkg/controllers/autoregister"
  46. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  47. "k8s.io/kubernetes/pkg/master/controller/crdregistration"
  48. )
  49. func createAggregatorConfig(
  50. kubeAPIServerConfig genericapiserver.Config,
  51. commandOptions *options.ServerRunOptions,
  52. externalInformers kubeexternalinformers.SharedInformerFactory,
  53. serviceResolver aggregatorapiserver.ServiceResolver,
  54. proxyTransport *http.Transport,
  55. pluginInitializers []admission.PluginInitializer,
  56. ) (*aggregatorapiserver.Config, error) {
  57. // make a shallow copy to let us twiddle a few things
  58. // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
  59. genericConfig := kubeAPIServerConfig
  60. genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
  61. genericConfig.RESTOptionsGetter = nil
  62. // override genericConfig.AdmissionControl with kube-aggregator's scheme,
  63. // because aggregator apiserver should use its own scheme to convert its own resources.
  64. err := commandOptions.Admission.ApplyTo(
  65. &genericConfig,
  66. externalInformers,
  67. genericConfig.LoopbackClientConfig,
  68. feature.DefaultFeatureGate,
  69. pluginInitializers...)
  70. if err != nil {
  71. return nil, err
  72. }
  73. // copy the etcd options so we don't mutate originals.
  74. etcdOptions := *commandOptions.Etcd
  75. etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
  76. etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
  77. etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
  78. genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
  79. // override MergedResourceConfig with aggregator defaults and registry
  80. if err := commandOptions.APIEnablement.ApplyTo(
  81. &genericConfig,
  82. aggregatorapiserver.DefaultAPIResourceConfigSource(),
  83. aggregatorscheme.Scheme); err != nil {
  84. return nil, err
  85. }
  86. var certBytes, keyBytes []byte
  87. if len(commandOptions.ProxyClientCertFile) > 0 && len(commandOptions.ProxyClientKeyFile) > 0 {
  88. certBytes, err = ioutil.ReadFile(commandOptions.ProxyClientCertFile)
  89. if err != nil {
  90. return nil, err
  91. }
  92. keyBytes, err = ioutil.ReadFile(commandOptions.ProxyClientKeyFile)
  93. if err != nil {
  94. return nil, err
  95. }
  96. }
  97. aggregatorConfig := &aggregatorapiserver.Config{
  98. GenericConfig: &genericapiserver.RecommendedConfig{
  99. Config: genericConfig,
  100. SharedInformerFactory: externalInformers,
  101. },
  102. ExtraConfig: aggregatorapiserver.ExtraConfig{
  103. ProxyClientCert: certBytes,
  104. ProxyClientKey: keyBytes,
  105. ServiceResolver: serviceResolver,
  106. ProxyTransport: proxyTransport,
  107. },
  108. }
  109. // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
  110. aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
  111. return aggregatorConfig, nil
  112. }
  113. func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
  114. aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
  115. if err != nil {
  116. return nil, err
  117. }
  118. // create controllers for auto-registration
  119. apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
  120. if err != nil {
  121. return nil, err
  122. }
  123. autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
  124. apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
  125. crdRegistrationController := crdregistration.NewCRDRegistrationController(
  126. apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
  127. autoRegistrationController)
  128. err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
  129. go crdRegistrationController.Run(5, context.StopCh)
  130. go func() {
  131. // let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
  132. // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
  133. // we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
  134. if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
  135. crdRegistrationController.WaitForInitialSync()
  136. }
  137. autoRegistrationController.Run(5, context.StopCh)
  138. }()
  139. return nil
  140. })
  141. if err != nil {
  142. return nil, err
  143. }
  144. err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
  145. makeAPIServiceAvailableHealthCheck(
  146. "autoregister-completion",
  147. apiServices,
  148. aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
  149. ),
  150. )
  151. if err != nil {
  152. return nil, err
  153. }
  154. return aggregatorServer, nil
  155. }
  156. func makeAPIService(gv schema.GroupVersion) *v1.APIService {
  157. apiServicePriority, ok := apiVersionPriorities[gv]
  158. if !ok {
  159. // if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
  160. // being permanently stuck in the APIServices list.
  161. klog.Infof("Skipping APIService creation for %v", gv)
  162. return nil
  163. }
  164. return &v1.APIService{
  165. ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
  166. Spec: v1.APIServiceSpec{
  167. Group: gv.Group,
  168. Version: gv.Version,
  169. GroupPriorityMinimum: apiServicePriority.group,
  170. VersionPriority: apiServicePriority.version,
  171. },
  172. }
  173. }
  174. // makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
  175. // once all of the specified services have been observed to be available at least once.
  176. func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker {
  177. // Track the auto-registered API services that have not been observed to be available yet
  178. pendingServiceNamesLock := &sync.RWMutex{}
  179. pendingServiceNames := sets.NewString()
  180. for _, service := range apiServices {
  181. pendingServiceNames.Insert(service.Name)
  182. }
  183. // When an APIService in the list is seen as available, remove it from the pending list
  184. handleAPIServiceChange := func(service *v1.APIService) {
  185. pendingServiceNamesLock.Lock()
  186. defer pendingServiceNamesLock.Unlock()
  187. if !pendingServiceNames.Has(service.Name) {
  188. return
  189. }
  190. if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
  191. pendingServiceNames.Delete(service.Name)
  192. }
  193. }
  194. // Watch add/update events for APIServices
  195. apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  196. AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
  197. UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
  198. })
  199. // Don't return healthy until the pending list is empty
  200. return healthz.NamedCheck(name, func(r *http.Request) error {
  201. pendingServiceNamesLock.RLock()
  202. defer pendingServiceNamesLock.RUnlock()
  203. if pendingServiceNames.Len() > 0 {
  204. return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
  205. }
  206. return nil
  207. })
  208. }
  // priority holds the discovery ordering values for one API group version.
  // These values control where the group appears in kubectl output.
  type priority struct {
  	// group orders this API group relative to other groups.
  	group int32
  	// version orders this version relative to the other versions of its group.
  	version int32
  }
  217. // The proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
  218. // is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
  219. // This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
  220. // That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of being built up during storage.
  221. var apiVersionPriorities = map[schema.GroupVersion]priority{
  222. {Group: "", Version: "v1"}: {group: 18000, version: 1},
  223. // extensions is above the rest for CLI compatibility, though the level of unqualified resource compatibility we
  224. // can reasonably expect seems questionable.
  225. {Group: "extensions", Version: "v1beta1"}: {group: 17900, version: 1},
  226. // to my knowledge, nothing below here collides
  227. {Group: "apps", Version: "v1"}: {group: 17800, version: 15},
  228. {Group: "events.k8s.io", Version: "v1beta1"}: {group: 17750, version: 5},
  229. {Group: "authentication.k8s.io", Version: "v1"}: {group: 17700, version: 15},
  230. {Group: "authentication.k8s.io", Version: "v1beta1"}: {group: 17700, version: 9},
  231. {Group: "authorization.k8s.io", Version: "v1"}: {group: 17600, version: 15},
  232. {Group: "authorization.k8s.io", Version: "v1beta1"}: {group: 17600, version: 9},
  233. {Group: "autoscaling", Version: "v1"}: {group: 17500, version: 15},
  234. {Group: "autoscaling", Version: "v2beta1"}: {group: 17500, version: 9},
  235. {Group: "autoscaling", Version: "v2beta2"}: {group: 17500, version: 1},
  236. {Group: "batch", Version: "v1"}: {group: 17400, version: 15},
  237. {Group: "batch", Version: "v1beta1"}: {group: 17400, version: 9},
  238. {Group: "batch", Version: "v2alpha1"}: {group: 17400, version: 9},
  239. {Group: "certificates.k8s.io", Version: "v1beta1"}: {group: 17300, version: 9},
  240. {Group: "networking.k8s.io", Version: "v1"}: {group: 17200, version: 15},
  241. {Group: "networking.k8s.io", Version: "v1beta1"}: {group: 17200, version: 9},
  242. {Group: "policy", Version: "v1beta1"}: {group: 17100, version: 9},
  243. {Group: "rbac.authorization.k8s.io", Version: "v1"}: {group: 17000, version: 15},
  244. {Group: "rbac.authorization.k8s.io", Version: "v1beta1"}: {group: 17000, version: 12},
  245. {Group: "rbac.authorization.k8s.io", Version: "v1alpha1"}: {group: 17000, version: 9},
  246. {Group: "settings.k8s.io", Version: "v1alpha1"}: {group: 16900, version: 9},
  247. {Group: "storage.k8s.io", Version: "v1"}: {group: 16800, version: 15},
  248. {Group: "storage.k8s.io", Version: "v1beta1"}: {group: 16800, version: 9},
  249. {Group: "storage.k8s.io", Version: "v1alpha1"}: {group: 16800, version: 1},
  250. {Group: "apiextensions.k8s.io", Version: "v1"}: {group: 16700, version: 15},
  251. {Group: "apiextensions.k8s.io", Version: "v1beta1"}: {group: 16700, version: 9},
  252. {Group: "admissionregistration.k8s.io", Version: "v1"}: {group: 16700, version: 15},
  253. {Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {group: 16700, version: 12},
  254. {Group: "scheduling.k8s.io", Version: "v1"}: {group: 16600, version: 15},
  255. {Group: "scheduling.k8s.io", Version: "v1beta1"}: {group: 16600, version: 12},
  256. {Group: "scheduling.k8s.io", Version: "v1alpha1"}: {group: 16600, version: 9},
  257. {Group: "coordination.k8s.io", Version: "v1"}: {group: 16500, version: 15},
  258. {Group: "coordination.k8s.io", Version: "v1beta1"}: {group: 16500, version: 9},
  259. {Group: "auditregistration.k8s.io", Version: "v1alpha1"}: {group: 16400, version: 1},
  260. {Group: "node.k8s.io", Version: "v1alpha1"}: {group: 16300, version: 1},
  261. {Group: "node.k8s.io", Version: "v1beta1"}: {group: 16300, version: 9},
  262. {Group: "discovery.k8s.io", Version: "v1beta1"}: {group: 16200, version: 12},
  263. {Group: "discovery.k8s.io", Version: "v1alpha1"}: {group: 16200, version: 9},
  264. {Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1"}: {group: 16100, version: 9},
  265. // Append a new group to the end of the list if unsure.
  266. // You can use min(existing group)-100 as the initial value for a group.
  267. // Version can be set to 9 (to have space around) for a new group.
  268. }
  269. func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
  270. apiServices := []*v1.APIService{}
  271. for _, curr := range delegateAPIServer.ListedPaths() {
  272. if curr == "/api/v1" {
  273. apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
  274. registration.AddAPIServiceToSyncOnStart(apiService)
  275. apiServices = append(apiServices, apiService)
  276. continue
  277. }
  278. if !strings.HasPrefix(curr, "/apis/") {
  279. continue
  280. }
  281. // this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
  282. tokens := strings.Split(curr, "/")
  283. if len(tokens) != 4 {
  284. continue
  285. }
  286. apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
  287. if apiService == nil {
  288. continue
  289. }
  290. registration.AddAPIServiceToSyncOnStart(apiService)
  291. apiServices = append(apiServices, apiService)
  292. }
  293. return apiServices
  294. }