123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package nodeipam
- import (
- "net"
- "time"
- "k8s.io/klog"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/api/core/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- corelisters "k8s.io/client-go/listers/core/v1"
- cloudprovider "k8s.io/cloud-provider"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
- nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
- "k8s.io/kubernetes/pkg/util/metrics"
- )
- const (
- // ipamResyncInterval is the amount of time between when the cloud and node
- // CIDR range assignments are synchronized.
- ipamResyncInterval = 30 * time.Second
- // ipamMaxBackoff is the maximum backoff for retrying synchronization of a
- // given in the error state.
- ipamMaxBackoff = 10 * time.Second
- // ipamInitialRetry is the initial retry interval for retrying synchronization of a
- // given in the error state.
- ipamInitialBackoff = 250 * time.Millisecond
- )
- // Controller is the controller that manages node ipam state.
- type Controller struct {
- allocatorType ipam.CIDRAllocatorType
- cloud cloudprovider.Interface
- clusterCIDR *net.IPNet
- serviceCIDR *net.IPNet
- kubeClient clientset.Interface
- // Method for easy mocking in unittest.
- lookupIP func(host string) ([]net.IP, error)
- nodeLister corelisters.NodeLister
- nodeInformerSynced cache.InformerSynced
- cidrAllocator ipam.CIDRAllocator
- forcefullyDeletePod func(*v1.Pod) error
- }
- // NewNodeIpamController returns a new node IP Address Management controller to
- // sync instances from cloudprovider.
- // This method returns an error if it is unable to initialize the CIDR bitmap with
- // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
- // currently, this should be handled as a fatal error.
- func NewNodeIpamController(
- nodeInformer coreinformers.NodeInformer,
- cloud cloudprovider.Interface,
- kubeClient clientset.Interface,
- clusterCIDR *net.IPNet,
- serviceCIDR *net.IPNet,
- nodeCIDRMaskSize int,
- allocatorType ipam.CIDRAllocatorType) (*Controller, error) {
- if kubeClient == nil {
- klog.Fatalf("kubeClient is nil when starting Controller")
- }
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- klog.Infof("Sending events to api server.")
- eventBroadcaster.StartRecordingToSink(
- &v1core.EventSinkImpl{
- Interface: kubeClient.CoreV1().Events(""),
- })
- if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- metrics.RegisterMetricAndTrackRateLimiterUsage("node_ipam_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
- }
- if allocatorType != ipam.CloudAllocatorType {
- // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
- if clusterCIDR == nil {
- klog.Fatal("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set")
- }
- if maskSize, _ := clusterCIDR.Mask.Size(); maskSize > nodeCIDRMaskSize {
- klog.Fatal("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than --node-cidr-mask-size")
- }
- }
- ic := &Controller{
- cloud: cloud,
- kubeClient: kubeClient,
- lookupIP: net.LookupIP,
- clusterCIDR: clusterCIDR,
- serviceCIDR: serviceCIDR,
- allocatorType: allocatorType,
- }
- // TODO: Abstract this check into a generic controller manager should run method.
- if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType {
- cfg := &ipam.Config{
- Resync: ipamResyncInterval,
- MaxBackoff: ipamMaxBackoff,
- InitialRetry: ipamInitialBackoff,
- }
- switch ic.allocatorType {
- case ipam.IPAMFromClusterAllocatorType:
- cfg.Mode = nodesync.SyncFromCluster
- case ipam.IPAMFromCloudAllocatorType:
- cfg.Mode = nodesync.SyncFromCloud
- }
- ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
- if err != nil {
- klog.Fatalf("Error creating ipam controller: %v", err)
- }
- if err := ipamc.Start(nodeInformer); err != nil {
- klog.Fatalf("Error trying to Init(): %v", err)
- }
- } else {
- var err error
- ic.cidrAllocator, err = ipam.New(
- kubeClient, cloud, nodeInformer, ic.allocatorType, ic.clusterCIDR, ic.serviceCIDR, nodeCIDRMaskSize)
- if err != nil {
- return nil, err
- }
- }
- ic.nodeLister = nodeInformer.Lister()
- ic.nodeInformerSynced = nodeInformer.Informer().HasSynced
- return ic, nil
- }
- // Run starts an asynchronous loop that monitors the status of cluster nodes.
- func (nc *Controller) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- klog.Infof("Starting ipam controller")
- defer klog.Infof("Shutting down ipam controller")
- if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) {
- return
- }
- if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
- go nc.cidrAllocator.Run(stopCh)
- }
- <-stopCh
- }
|