// +build !providerless

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipam

import (
    "fmt"
    "math/rand"
    "net"
    "sync"
    "time"

    "k8s.io/klog"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    informers "k8s.io/client-go/informers/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"

    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    cloudprovider "k8s.io/cloud-provider"
    nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
    utilnode "k8s.io/kubernetes/pkg/util/node"
    utiltaints "k8s.io/kubernetes/pkg/util/taints"
    "k8s.io/legacy-cloud-providers/gce"
)

// nodeProcessingInfo tracks information related to nodes that are currently
// being processed.
type nodeProcessingInfo struct {
    retries int
}

// cloudCIDRAllocator allocates node CIDRs according to IP address aliases
// assigned by the cloud provider. In this case, the allocation and
// deallocation is delegated to the external provider, and the controller
// merely takes the assignment and updates the node spec.
type cloudCIDRAllocator struct {
    client clientset.Interface
    cloud  *gce.Cloud

    // nodeLister is able to list/get nodes and is populated by the shared informer passed to
    // NewCloudCIDRAllocator.
    nodeLister corelisters.NodeLister
    // nodesSynced returns true if the node shared informer has been synced at least once.
    nodesSynced cache.InformerSynced

    // Channel used to pass nodes that need updating to the background workers.
    // This increases the throughput of CIDR assignment by parallelizing the
    // updates and by not blocking on long operations (which shouldn't be done
    // from event handlers anyway).
    nodeUpdateChannel chan string
    recorder          record.EventRecorder

    // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
    lock              sync.Mutex
    nodesInProcessing map[string]*nodeProcessingInfo
}

var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)

// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
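// A typical call site wires the allocator into the node IPAM controller roughly
// as follows (a minimal sketch; kubeClient, cloudProvider, nodeInformer, and
// stopCh are assumed to be provided by the surrounding controller setup):
//
//     allocator, err := NewCloudCIDRAllocator(kubeClient, cloudProvider, nodeInformer)
//     if err != nil {
//         klog.Fatalf("Failed to create cloud CIDR allocator: %v", err)
//     }
//     go allocator.Run(stopCh)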
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
    if client == nil {
        klog.Fatalf("kubeClient is nil when starting NodeController")
    }

    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
    eventBroadcaster.StartLogging(klog.Infof)
    klog.V(0).Infof("Sending events to api server.")
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    gceCloud, ok := cloud.(*gce.Cloud)
    if !ok {
        err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
        return nil, err
    }

    ca := &cloudCIDRAllocator{
        client:            client,
        cloud:             gceCloud,
        nodeLister:        nodeInformer.Lister(),
        nodesSynced:       nodeInformer.Informer().HasSynced,
        nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
        recorder:          recorder,
        nodesInProcessing: map[string]*nodeProcessingInfo{},
    }

    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: nodeutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
        UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
            if newNode.Spec.PodCIDR == "" {
                return ca.AllocateOrOccupyCIDR(newNode)
            }
            // Even if the PodCIDR is already assigned, the node still needs to be
            // processed if its NetworkUnavailable condition or network-unavailable
            // taint has not been cleared yet.
            networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
            _, cond := nodeutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
            if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
                return ca.AllocateOrOccupyCIDR(newNode)
            }
            return nil
        }),
        DeleteFunc: nodeutil.CreateDeleteNodeHandler(ca.ReleaseCIDR),
    })
    klog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
    return ca, nil
}

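// Run starts the background workers that process node CIDR updates. It waits
// for the node informer cache to sync and then blocks until stopCh is closed.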
func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    klog.Infof("Starting cloud CIDR allocator")
    defer klog.Infof("Shutting down cloud CIDR allocator")

    if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
        return
    }

    for i := 0; i < cidrUpdateWorkers; i++ {
        go ca.worker(stopCh)
    }

    <-stopCh
}

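// worker reads node names from nodeUpdateChannel and attempts to update their
// CIDR allocation, requeueing failed nodes with a backoff until the retry
// budget is exhausted. It exits when stopChan is closed.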
func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
    for {
        select {
        case workItem, ok := <-ca.nodeUpdateChannel:
            if !ok {
                klog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
                return
            }
            if err := ca.updateCIDRAllocation(workItem); err == nil {
                klog.V(3).Infof("Updated CIDR for %q", workItem)
            } else {
                klog.Errorf("Error updating CIDR for %q: %v", workItem, err)
                if canRetry, timeout := ca.retryParams(workItem); canRetry {
                    klog.V(2).Infof("Retrying update for %q after %v", workItem, timeout)
                    time.AfterFunc(timeout, func() {
                        // Requeue the failed node for update again.
                        ca.nodeUpdateChannel <- workItem
                    })
                    continue
                }
                klog.Errorf("Exceeded retry count for %q, dropping from queue", workItem)
            }
            ca.removeNodeFromProcessing(workItem)
        case <-stopChan:
            return
        }
    }
}

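// insertNodeToProcessing marks the node as being processed. It returns false
// if the node is already in the processing set.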
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
    ca.lock.Lock()
    defer ca.lock.Unlock()
    if _, found := ca.nodesInProcessing[nodeName]; found {
        return false
    }
    ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
    return true
}

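// retryParams reports whether the node may be retried and, if so, the backoff
// to wait before requeueing it. It returns false once updateMaxRetries is
// exceeded or if the node is not in the processing set.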
func (ca *cloudCIDRAllocator) retryParams(nodeName string) (bool, time.Duration) {
    ca.lock.Lock()
    defer ca.lock.Unlock()

    entry, ok := ca.nodesInProcessing[nodeName]
    if !ok {
        klog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
        return false, 0
    }

    count := entry.retries + 1
    if count > updateMaxRetries {
        return false, 0
    }
    ca.nodesInProcessing[nodeName].retries = count

    return true, nodeUpdateRetryTimeout(count)
}

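// nodeUpdateRetryTimeout computes the backoff for the given retry count:
// updateRetryTimeout doubled once per retry, capped at maxUpdateRetryTimeout,
// then jittered to a random duration between half and 1.5 times that value.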
func nodeUpdateRetryTimeout(count int) time.Duration {
    timeout := updateRetryTimeout
    for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
        timeout *= 2
    }

    if timeout > maxUpdateRetryTimeout {
        timeout = maxUpdateRetryTimeout
    }

    return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
}

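// removeNodeFromProcessing removes the node from the processing set, allowing
// it to be queued for CIDR assignment again.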
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
    ca.lock.Lock()
    defer ca.lock.Unlock()
    delete(ca.nodesInProcessing, nodeName)
}

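// AllocateOrOccupyCIDR marks the node as being processed and queues it for a
// CIDR update by the background workers; it does not block on the update itself.
//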
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
    if node == nil {
        return nil
    }
    if !ca.insertNodeToProcessing(node.Name) {
        klog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
        return nil
    }

    klog.V(4).Infof("Putting node %s into the work queue", node.Name)
    ca.nodeUpdateChannel <- node.Name
    return nil
}

// updateCIDRAllocation assigns a CIDR to the node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
    node, err := ca.nodeLister.Get(nodeName)
    if err != nil {
        if errors.IsNotFound(err) {
            return nil // node no longer available, skip processing
        }
        klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", nodeName, err)
        return err
    }

    cidrs, err := ca.cloud.AliasRanges(types.NodeName(nodeName))
    if err != nil {
        nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
        return fmt.Errorf("failed to allocate cidr: %v", err)
    }
    if len(cidrs) == 0 {
        nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
        return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
    }
    _, cidr, err := net.ParseCIDR(cidrs[0])
    if err != nil {
        return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err)
    }
    podCIDR := cidr.String()

    if node.Spec.PodCIDR == podCIDR {
        klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
        // We don't return here in order to set the NetworkUnavailable condition later below.
    } else {
        if node.Spec.PodCIDR != "" {
            klog.Errorf("PodCIDR being reassigned! Node %v spec has %v, but cloud provider has assigned %v", node.Name, node.Spec.PodCIDR, podCIDR)
            // We fall through and set the CIDR despite this error. This
            // implements the same logic as implemented in the
            // rangeAllocator.
            //
            // See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
        }
        for i := 0; i < cidrUpdateRetries; i++ {
            if err = utilnode.PatchNodeCIDR(ca.client, types.NodeName(node.Name), podCIDR); err == nil {
                klog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
                break
            }
        }
    }
    if err != nil {
        nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
        klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, podCIDR, err)
        return err
    }

    err = utilnode.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
        Type:               v1.NodeNetworkUnavailable,
        Status:             v1.ConditionFalse,
        Reason:             "RouteCreated",
        Message:            "NodeController create implicit route",
        LastTransitionTime: metav1.Now(),
    })
    if err != nil {
        klog.Errorf("Error setting route status for node %v: %v", node.Name, err)
    }
    return err
}

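// ReleaseCIDR is a no-op for the cloud CIDR allocator: the node's PodCIDR is
// released by the external cloud provider rather than by this controller.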
func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error {
    klog.V(2).Infof("Node %v PodCIDR (%v) will be released by external cloud provider (not managed by controller)",
        node.Name, node.Spec.PodCIDR)
    return nil
}