/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipam

import (
	"fmt"
	"net"
	"sync"

	"k8s.io/klog"

	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
	utilnode "k8s.io/kubernetes/pkg/util/node"
)
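
// rangeAllocator assigns pod CIDRs to nodes out of a single cluster CIDR,
// tracking which sub-ranges are in use via a cidrset.CidrSet and updating
// Node.Spec.PodCIDR through background workers.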
type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrset.CidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int

	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
	// NewCIDRRangeAllocator.
	nodeLister corelisters.NodeLister
	// nodesSynced returns true if the node shared informer has been synced at least once.
	nodesSynced cache.InformerSynced

	// Channel used to pass nodes with newly allocated CIDRs to the background workers
	// that update Node.Spec.PodCIDR. This increases the throughput of CIDR assignment
	// by not blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR

	recorder record.EventRecorder

	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	lock              sync.Mutex
	nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than the cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. nodeList is only nil in testing.
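//
// A minimal wiring sketch (illustrative assumptions only: the shared informer
// factory comes from "k8s.io/client-go/informers", and the cluster CIDR and
// mask size below are arbitrary example values, not taken from this file):
//
//	factory := sharedinformers.NewSharedInformerFactory(client, 0)
//	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
//	alloc, err := NewCIDRRangeAllocator(client, factory.Core().V1().Nodes(), clusterCIDR, nil, 24, nil)
//	if err != nil {
//		// handle error
//	}
//	go alloc.Run(stopCh)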
func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
	if client == nil {
		klog.Fatalf("kubeClient is nil when starting NodeController")
	}

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(klog.Infof)
	klog.V(0).Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	set, err := cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize)
	if err != nil {
		return nil, err
	}
	ra := &rangeAllocator{
		client:                client,
		cidrs:                 set,
		clusterCIDR:           clusterCIDR,
		nodeLister:            nodeInformer.Lister(),
		nodesSynced:           nodeInformer.Informer().HasSynced,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		klog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				klog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			}
			klog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. CIDR out of range: This means a node CIDR has changed.
				// This error will keep crashing controller-manager.
				return nil, err
			}
		}
	}

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			// If the PodCIDR is not empty we either:
			// - already processed a Node that already had a CIDR after NC restarted
			//   (cidr is marked as used),
			// - already processed a Node successfully and allocated a CIDR for it
			//   (cidr is marked as used),
			// - already processed a Node but saw a "timeout" response while the
			//   request eventually got through; in this case we haven't released
			//   the allocated CIDR (cidr is still marked as used).
			// There's a possible error here:
			// - NC sees a new Node and assigns CIDR X to it,
			// - the Update Node call fails with a timeout,
			// - the Node is updated by some other component, NC sees the update and
			//   assigns CIDR Y to the Node,
			// - both CIDR X and CIDR Y are marked as used in the local cache,
			//   even though the Node only has CIDR Y.
			// The problem here is that CIDR X is marked as used in the in-memory cache,
			// which prevents it from being assigned to any new node. The cluster
			// state itself is correct. Restarting NC fixes the issue.
			if newNode.Spec.PodCIDR == "" {
				return ra.AllocateOrOccupyCIDR(newNode)
			}
			return nil
		}),
		DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
	})

	return ra, nil
}
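
// Run starts the allocator: it waits for the node informer cache to sync,
// launches cidrUpdateWorkers background workers, and blocks until stopCh is closed.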
func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting range CIDR allocator")
	defer klog.Infof("Shutting down range CIDR allocator")

	if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
		return
	}

	for i := 0; i < cidrUpdateWorkers; i++ {
		go r.worker(stopCh)
	}

	<-stopCh
}
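
// worker pulls items off nodeCIDRUpdateChannel and writes the allocated CIDR
// to the corresponding Node object, requeueing the item if the update fails.
// It exits when stopChan is closed.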
func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
	for {
		select {
		case workItem, ok := <-r.nodeCIDRUpdateChannel:
			if !ok {
				klog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
				return
			}
			if err := r.updateCIDRAllocation(workItem); err != nil {
				// Requeue the failed node for update again.
				r.nodeCIDRUpdateChannel <- workItem
			}
		case <-stopChan:
			return
		}
	}
}
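
// insertNodeToProcessing marks a node as currently being processed.
// It returns false if the node is already being processed.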
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}
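
// removeNodeFromProcessing clears the in-processing mark for a node once work on it is done.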
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}
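
// occupyCIDR marks the node's existing Spec.PodCIDR as used in the allocator's CIDR set.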
func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse CIDR %s of node %s: %v", node.Spec.PodCIDR, node.Name, err)
	}
	if err := r.cidrs.Occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}
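
// AllocateOrOccupyCIDR marks the node's existing CIDR as used, or allocates a
// new CIDR for it and queues the node so a background worker can update
// Node.Spec.PodCIDR in the API server.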
// WARNING: If you're adding any return paths or deferring any more work from this
// function, you have to make sure nodesInProcessing is updated properly with the
// disposition of the node when the work is done.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		klog.V(2).Infof("Node %v is already in the process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}

	podCIDR, err := r.cidrs.AllocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}

	klog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}
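
// ReleaseCIDR parses the node's Spec.PodCIDR and returns it to the pool of assignable CIDRs.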
func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}

	klog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.Release(podCIDR); err != nil {
		return fmt.Errorf("error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return nil
}

// filterOutServiceRange marks all CIDRs with subNetMaskSize that belong to
// serviceCIDR as used, so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Checks if service CIDR has a nonempty intersection with cluster
	// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
	// clusterCIDR's Mask applied (this means that clusterCIDR contains
	// serviceCIDR) or vice versa (which means that serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}

	if err := r.cidrs.Occupy(serviceCIDR); err != nil {
		klog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *v1.Node
	defer r.removeNodeFromProcessing(data.nodeName)

	podCIDR := data.cidr.String()

	node, err = r.nodeLister.Get(data.nodeName)
	if err != nil {
		klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", data.nodeName, err)
		return err
	}

	if node.Spec.PodCIDR == podCIDR {
		klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
		return nil
	}

	if node.Spec.PodCIDR != "" {
		klog.Errorf("Node %v already has a CIDR allocated %v. Releasing the new one %v.", node.Name, node.Spec.PodCIDR, podCIDR)
		if err := r.cidrs.Release(data.cidr); err != nil {
			klog.Errorf("Error when releasing CIDR %v: %v", podCIDR, err)
		}
		return nil
	}

	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
	for i := 0; i < cidrUpdateRetries; i++ {
		if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
			klog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
			return nil
		}
	}
	klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, podCIDR, err)
	nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
	// We accept the fact that we may leak CIDRs here. This is safer than releasing
	// them when we don't know whether the request went through.
	// A NodeController restart will return all falsely allocated CIDRs to the pool.
	if !apierrors.IsServerTimeout(err) {
		klog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", node.Name, err)
		if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil {
			klog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr)
		}
	}
	return err
}