core.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app implements a server that runs a set of active
  14. // components. This includes replication controllers, service endpoints and
  15. // nodes.
  16. //
  17. package app
  18. import (
  19. "errors"
  20. "fmt"
  21. "net"
  22. "net/http"
  23. "strings"
  24. "time"
  25. "k8s.io/klog"
  26. "k8s.io/api/core/v1"
  27. "k8s.io/apimachinery/pkg/runtime/schema"
  28. utilfeature "k8s.io/apiserver/pkg/util/feature"
  29. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  30. storagev1informer "k8s.io/client-go/informers/storage/v1"
  31. storagev1beta1informer "k8s.io/client-go/informers/storage/v1beta1"
  32. clientset "k8s.io/client-go/kubernetes"
  33. "k8s.io/client-go/metadata"
  34. restclient "k8s.io/client-go/rest"
  35. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  36. csitrans "k8s.io/csi-translation-lib"
  37. "k8s.io/kubernetes/pkg/controller"
  38. cloudcontroller "k8s.io/kubernetes/pkg/controller/cloud"
  39. endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
  40. "k8s.io/kubernetes/pkg/controller/garbagecollector"
  41. namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
  42. nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"
  43. nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config"
  44. "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
  45. lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle"
  46. "k8s.io/kubernetes/pkg/controller/podgc"
  47. replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
  48. resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
  49. routecontroller "k8s.io/kubernetes/pkg/controller/route"
  50. servicecontroller "k8s.io/kubernetes/pkg/controller/service"
  51. serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
  52. ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
  53. "k8s.io/kubernetes/pkg/controller/ttlafterfinished"
  54. "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
  55. "k8s.io/kubernetes/pkg/controller/volume/expand"
  56. persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
  57. "k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
  58. "k8s.io/kubernetes/pkg/controller/volume/pvprotection"
  59. "k8s.io/kubernetes/pkg/features"
  60. "k8s.io/kubernetes/pkg/quota/v1/generic"
  61. quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
  62. "k8s.io/kubernetes/pkg/volume/csimigration"
  63. netutils "k8s.io/utils/net"
  64. )
  65. const (
  66. // defaultNodeMaskCIDRIPv4 is default mask size for IPv4 node cidr
  67. defaultNodeMaskCIDRIPv4 = 24
  68. // defaultNodeMaskCIDRIPv6 is default mask size for IPv6 node cidr
  69. defaultNodeMaskCIDRIPv6 = 64
  70. )
  71. func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
  72. serviceController, err := servicecontroller.New(
  73. ctx.Cloud,
  74. ctx.ClientBuilder.ClientOrDie("service-controller"),
  75. ctx.InformerFactory.Core().V1().Services(),
  76. ctx.InformerFactory.Core().V1().Nodes(),
  77. ctx.ComponentConfig.KubeCloudShared.ClusterName,
  78. )
  79. if err != nil {
  80. // This error shouldn't fail. It lives like this as a legacy.
  81. klog.Errorf("Failed to start service controller: %v", err)
  82. return nil, false, nil
  83. }
  84. go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
  85. return nil, true, nil
  86. }
  87. func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error) {
  88. var serviceCIDR *net.IPNet
  89. var secondaryServiceCIDR *net.IPNet
  90. // should we start nodeIPAM
  91. if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
  92. return nil, false, nil
  93. }
  94. // failure: bad cidrs in config
  95. clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
  96. if err != nil {
  97. return nil, false, err
  98. }
  99. // failure: more than one cidr and dual stack is not enabled
  100. if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
  101. return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
  102. }
  103. // failure: more than one cidr but they are not configured as dual stack
  104. if len(clusterCIDRs) > 1 && !dualStack {
  105. return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
  106. }
  107. // failure: more than cidrs is not allowed even with dual stack
  108. if len(clusterCIDRs) > 2 {
  109. return nil, false, fmt.Errorf("len of clusters is:%v > more than max allowed of 2", len(clusterCIDRs))
  110. }
  111. // service cidr processing
  112. if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
  113. _, serviceCIDR, err = net.ParseCIDR(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)
  114. if err != nil {
  115. klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
  116. }
  117. }
  118. if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
  119. _, secondaryServiceCIDR, err = net.ParseCIDR(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
  120. if err != nil {
  121. klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
  122. }
  123. }
  124. // the following checks are triggered if both serviceCIDR and secondaryServiceCIDR are provided
  125. if serviceCIDR != nil && secondaryServiceCIDR != nil {
  126. // should have dual stack flag enabled
  127. if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
  128. return nil, false, fmt.Errorf("secondary service cidr is provided and IPv6DualStack feature is not enabled")
  129. }
  130. // should be dual stack (from different IPFamilies)
  131. dualstackServiceCIDR, err := netutils.IsDualStackCIDRs([]*net.IPNet{serviceCIDR, secondaryServiceCIDR})
  132. if err != nil {
  133. return nil, false, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error:%v", err)
  134. }
  135. if !dualstackServiceCIDR {
  136. return nil, false, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)")
  137. }
  138. }
  139. var nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6 int
  140. if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
  141. // only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters.
  142. // --node-cidr-mask-size flag is incompatible with dual stack clusters.
  143. nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizesDualStack(ctx.ComponentConfig.NodeIPAMController)
  144. } else {
  145. // only --node-cidr-mask-size supported with single stack clusters.
  146. // --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 flags are incompatible with dual stack clusters.
  147. nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizes(ctx.ComponentConfig.NodeIPAMController)
  148. }
  149. if err != nil {
  150. return nil, false, err
  151. }
  152. // get list of node cidr mask sizes
  153. nodeCIDRMaskSizes := getNodeCIDRMaskSizes(clusterCIDRs, nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6)
  154. nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
  155. ctx.InformerFactory.Core().V1().Nodes(),
  156. ctx.Cloud,
  157. ctx.ClientBuilder.ClientOrDie("node-controller"),
  158. clusterCIDRs,
  159. serviceCIDR,
  160. secondaryServiceCIDR,
  161. nodeCIDRMaskSizes,
  162. ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
  163. )
  164. if err != nil {
  165. return nil, true, err
  166. }
  167. go nodeIpamController.Run(ctx.Stop)
  168. return nil, true, nil
  169. }
  170. func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
  171. lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
  172. ctx.InformerFactory.Coordination().V1().Leases(),
  173. ctx.InformerFactory.Core().V1().Pods(),
  174. ctx.InformerFactory.Core().V1().Nodes(),
  175. ctx.InformerFactory.Apps().V1().DaemonSets(),
  176. // node lifecycle controller uses existing cluster role from node-controller
  177. ctx.ClientBuilder.ClientOrDie("node-controller"),
  178. ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
  179. ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
  180. ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
  181. ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
  182. ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
  183. ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
  184. ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
  185. ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
  186. ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
  187. utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
  188. )
  189. if err != nil {
  190. return nil, true, err
  191. }
  192. go lifecycleController.Run(ctx.Stop)
  193. return nil, true, nil
  194. }
  195. func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
  196. cloudNodeLifecycleController, err := cloudcontroller.NewCloudNodeLifecycleController(
  197. ctx.InformerFactory.Core().V1().Nodes(),
  198. // cloud node lifecycle controller uses existing cluster role from node-controller
  199. ctx.ClientBuilder.ClientOrDie("node-controller"),
  200. ctx.Cloud,
  201. ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
  202. )
  203. if err != nil {
  204. // the controller manager should continue to run if the "Instances" interface is not
  205. // supported, though it's unlikely for a cloud provider to not support it
  206. klog.Errorf("failed to start cloud node lifecycle controller: %v", err)
  207. return nil, false, nil
  208. }
  209. go cloudNodeLifecycleController.Run(ctx.Stop)
  210. return nil, true, nil
  211. }
  212. func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
  213. if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
  214. klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
  215. return nil, false, nil
  216. }
  217. if ctx.Cloud == nil {
  218. klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
  219. return nil, false, nil
  220. }
  221. routes, ok := ctx.Cloud.Routes()
  222. if !ok {
  223. klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
  224. return nil, false, nil
  225. }
  226. // failure: bad cidrs in config
  227. clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
  228. if err != nil {
  229. return nil, false, err
  230. }
  231. // failure: more than one cidr and dual stack is not enabled
  232. if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
  233. return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
  234. }
  235. // failure: more than one cidr but they are not configured as dual stack
  236. if len(clusterCIDRs) > 1 && !dualStack {
  237. return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
  238. }
  239. // failure: more than cidrs is not allowed even with dual stack
  240. if len(clusterCIDRs) > 2 {
  241. return nil, false, fmt.Errorf("length of clusterCIDRs is:%v more than max allowed of 2", len(clusterCIDRs))
  242. }
  243. routeController := routecontroller.New(routes,
  244. ctx.ClientBuilder.ClientOrDie("route-controller"),
  245. ctx.InformerFactory.Core().V1().Nodes(),
  246. ctx.ComponentConfig.KubeCloudShared.ClusterName,
  247. clusterCIDRs)
  248. go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
  249. return nil, true, nil
  250. }
  251. func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
  252. plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
  253. if err != nil {
  254. return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
  255. }
  256. params := persistentvolumecontroller.ControllerParameters{
  257. KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
  258. SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
  259. VolumePlugins: plugins,
  260. Cloud: ctx.Cloud,
  261. ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName,
  262. VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
  263. ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
  264. ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
  265. PodInformer: ctx.InformerFactory.Core().V1().Pods(),
  266. NodeInformer: ctx.InformerFactory.Core().V1().Nodes(),
  267. EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
  268. }
  269. volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
  270. if volumeControllerErr != nil {
  271. return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
  272. }
  273. go volumeController.Run(ctx.Stop)
  274. return nil, true, nil
  275. }
  276. func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, error) {
  277. if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
  278. return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
  279. }
  280. var (
  281. csiNodeInformer storagev1informer.CSINodeInformer
  282. csiDriverInformer storagev1beta1informer.CSIDriverInformer
  283. )
  284. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  285. csiNodeInformer = ctx.InformerFactory.Storage().V1().CSINodes()
  286. }
  287. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  288. csiDriverInformer = ctx.InformerFactory.Storage().V1beta1().CSIDrivers()
  289. }
  290. plugins, err := ProbeAttachableVolumePlugins()
  291. if err != nil {
  292. return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err)
  293. }
  294. attachDetachController, attachDetachControllerErr :=
  295. attachdetach.NewAttachDetachController(
  296. ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
  297. ctx.InformerFactory.Core().V1().Pods(),
  298. ctx.InformerFactory.Core().V1().Nodes(),
  299. ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
  300. ctx.InformerFactory.Core().V1().PersistentVolumes(),
  301. csiNodeInformer,
  302. csiDriverInformer,
  303. ctx.Cloud,
  304. plugins,
  305. GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
  306. ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync,
  307. ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration,
  308. attachdetach.DefaultTimerConfig,
  309. )
  310. if attachDetachControllerErr != nil {
  311. return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
  312. }
  313. go attachDetachController.Run(ctx.Stop)
  314. return nil, true, nil
  315. }
  316. func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, error) {
  317. if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
  318. plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
  319. if err != nil {
  320. return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
  321. }
  322. csiTranslator := csitrans.New()
  323. expandController, expandControllerErr := expand.NewExpandController(
  324. ctx.ClientBuilder.ClientOrDie("expand-controller"),
  325. ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
  326. ctx.InformerFactory.Core().V1().PersistentVolumes(),
  327. ctx.InformerFactory.Storage().V1().StorageClasses(),
  328. ctx.Cloud,
  329. plugins,
  330. csiTranslator,
  331. csimigration.NewPluginManager(csiTranslator))
  332. if expandControllerErr != nil {
  333. return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
  334. }
  335. go expandController.Run(ctx.Stop)
  336. return nil, true, nil
  337. }
  338. return nil, false, nil
  339. }
  340. func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
  341. go endpointcontroller.NewEndpointController(
  342. ctx.InformerFactory.Core().V1().Pods(),
  343. ctx.InformerFactory.Core().V1().Services(),
  344. ctx.InformerFactory.Core().V1().Endpoints(),
  345. ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
  346. ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
  347. ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
  348. return nil, true, nil
  349. }
  350. func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
  351. go replicationcontroller.NewReplicationManager(
  352. ctx.InformerFactory.Core().V1().Pods(),
  353. ctx.InformerFactory.Core().V1().ReplicationControllers(),
  354. ctx.ClientBuilder.ClientOrDie("replication-controller"),
  355. replicationcontroller.BurstReplicas,
  356. ).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
  357. return nil, true, nil
  358. }
  359. func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
  360. go podgc.NewPodGC(
  361. ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
  362. ctx.InformerFactory.Core().V1().Pods(),
  363. ctx.InformerFactory.Core().V1().Nodes(),
  364. int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
  365. ).Run(ctx.Stop)
  366. return nil, true, nil
  367. }
  368. func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, error) {
  369. resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
  370. discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources
  371. listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource)
  372. quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
  373. resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
  374. QuotaClient: resourceQuotaControllerClient.CoreV1(),
  375. ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
  376. ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
  377. InformerFactory: ctx.ObjectOrMetadataInformerFactory,
  378. ReplenishmentResyncPeriod: ctx.ResyncPeriod,
  379. DiscoveryFunc: discoveryFunc,
  380. IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
  381. InformersStarted: ctx.InformersStarted,
  382. Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
  383. }
  384. if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
  385. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  386. return nil, true, err
  387. }
  388. }
  389. resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions)
  390. if err != nil {
  391. return nil, false, err
  392. }
  393. go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop)
  394. // Periodically the quota controller to detect new resource types
  395. go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop)
  396. return nil, true, nil
  397. }
  398. func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error) {
  399. // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
  400. // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
  401. // including events), takes ~10 seconds by default.
  402. nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller")
  403. nsKubeconfig.QPS *= 20
  404. nsKubeconfig.Burst *= 100
  405. namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
  406. return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig)
  407. }
  408. func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (http.Handler, bool, error) {
  409. metadataClient, err := metadata.NewForConfig(nsKubeconfig)
  410. if err != nil {
  411. return nil, true, err
  412. }
  413. discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
  414. namespaceController := namespacecontroller.NewNamespaceController(
  415. namespaceKubeClient,
  416. metadataClient,
  417. discoverResourcesFn,
  418. ctx.InformerFactory.Core().V1().Namespaces(),
  419. ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
  420. v1.FinalizerKubernetes,
  421. )
  422. go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop)
  423. return nil, true, nil
  424. }
  425. func startServiceAccountController(ctx ControllerContext) (http.Handler, bool, error) {
  426. sac, err := serviceaccountcontroller.NewServiceAccountsController(
  427. ctx.InformerFactory.Core().V1().ServiceAccounts(),
  428. ctx.InformerFactory.Core().V1().Namespaces(),
  429. ctx.ClientBuilder.ClientOrDie("service-account-controller"),
  430. serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
  431. )
  432. if err != nil {
  433. return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
  434. }
  435. go sac.Run(1, ctx.Stop)
  436. return nil, true, nil
  437. }
  438. func startTTLController(ctx ControllerContext) (http.Handler, bool, error) {
  439. go ttlcontroller.NewTTLController(
  440. ctx.InformerFactory.Core().V1().Nodes(),
  441. ctx.ClientBuilder.ClientOrDie("ttl-controller"),
  442. ).Run(5, ctx.Stop)
  443. return nil, true, nil
  444. }
  445. func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
  446. if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
  447. return nil, false, nil
  448. }
  449. gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
  450. discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
  451. config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
  452. metadataClient, err := metadata.NewForConfig(config)
  453. if err != nil {
  454. return nil, true, err
  455. }
  456. // Get an initial set of deletable resources to prime the garbage collector.
  457. deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
  458. ignoredResources := make(map[schema.GroupResource]struct{})
  459. for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
  460. ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
  461. }
  462. garbageCollector, err := garbagecollector.NewGarbageCollector(
  463. metadataClient,
  464. ctx.RESTMapper,
  465. deletableResources,
  466. ignoredResources,
  467. ctx.ObjectOrMetadataInformerFactory,
  468. ctx.InformersStarted,
  469. )
  470. if err != nil {
  471. return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
  472. }
  473. // Start the garbage collector.
  474. workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
  475. go garbageCollector.Run(workers, ctx.Stop)
  476. // Periodically refresh the RESTMapper with new discovery information and sync
  477. // the garbage collector.
  478. go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
  479. return garbagecollector.NewDebugHandler(garbageCollector), true, nil
  480. }
  481. func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
  482. go pvcprotection.NewPVCProtectionController(
  483. ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
  484. ctx.InformerFactory.Core().V1().Pods(),
  485. ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
  486. utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
  487. ).Run(1, ctx.Stop)
  488. return nil, true, nil
  489. }
  490. func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
  491. go pvprotection.NewPVProtectionController(
  492. ctx.InformerFactory.Core().V1().PersistentVolumes(),
  493. ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
  494. utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
  495. ).Run(1, ctx.Stop)
  496. return nil, true, nil
  497. }
  498. func startTTLAfterFinishedController(ctx ControllerContext) (http.Handler, bool, error) {
  499. if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
  500. return nil, false, nil
  501. }
  502. go ttlafterfinished.New(
  503. ctx.InformerFactory.Batch().V1().Jobs(),
  504. ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
  505. ).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop)
  506. return nil, true, nil
  507. }
  508. // processCIDRs is a helper function that works on a comma separated cidrs and returns
  509. // a list of typed cidrs
  510. // a flag if cidrs represents a dual stack
  511. // error if failed to parse any of the cidrs
  512. func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
  513. cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
  514. cidrs, err := netutils.ParseCIDRs(cidrsSplit)
  515. if err != nil {
  516. return nil, false, err
  517. }
  518. // if cidrs has an error then the previous call will fail
  519. // safe to ignore error checking on next call
  520. dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
  521. return cidrs, dualstack, nil
  522. }
  523. // setNodeCIDRMaskSizes returns the IPv4 and IPv6 node cidr mask sizes.
  524. // If --node-cidr-mask-size not set, then it will return default IPv4 and IPv6 cidr mask sizes.
  525. func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) {
  526. ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
  527. // NodeCIDRMaskSizeIPv4 and NodeCIDRMaskSizeIPv6 can be used only for dual-stack clusters
  528. if cfg.NodeCIDRMaskSizeIPv4 != 0 || cfg.NodeCIDRMaskSizeIPv6 != 0 {
  529. return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 are not allowed with non dual-stack clusters")
  530. }
  531. if cfg.NodeCIDRMaskSize != 0 {
  532. ipv4Mask = int(cfg.NodeCIDRMaskSize)
  533. ipv6Mask = int(cfg.NodeCIDRMaskSize)
  534. }
  535. return ipv4Mask, ipv6Mask, nil
  536. }
  537. // setNodeCIDRMaskSizesDualStack returns the IPv4 and IPv6 node cidr mask sizes to the value provided
  538. // for --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 respectively. If value not provided,
  539. // then it will return default IPv4 and IPv6 cidr mask sizes.
  540. func setNodeCIDRMaskSizesDualStack(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) {
  541. ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
  542. // NodeCIDRMaskSize can be used only for single stack clusters
  543. if cfg.NodeCIDRMaskSize != 0 {
  544. return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters")
  545. }
  546. if cfg.NodeCIDRMaskSizeIPv4 != 0 {
  547. ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
  548. }
  549. if cfg.NodeCIDRMaskSizeIPv6 != 0 {
  550. ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
  551. }
  552. return ipv4Mask, ipv6Mask, nil
  553. }
  554. // getNodeCIDRMaskSizes is a helper function that helps the generate the node cidr mask
  555. // sizes slice based on the cluster cidr slice
  556. func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6 int) []int {
  557. nodeMaskCIDRs := make([]int, len(clusterCIDRs))
  558. for idx, clusterCIDR := range clusterCIDRs {
  559. if netutils.IsIPv6CIDR(clusterCIDR) {
  560. nodeMaskCIDRs[idx] = maskSizeIPv6
  561. } else {
  562. nodeMaskCIDRs[idx] = maskSizeIPv4
  563. }
  564. }
  565. return nodeMaskCIDRs
  566. }