123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package route
- import (
- "context"
- "fmt"
- "net"
- "sync"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- clientretry "k8s.io/client-go/util/retry"
- cloudprovider "k8s.io/cloud-provider"
- "k8s.io/kubernetes/pkg/controller"
- nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
- "k8s.io/kubernetes/pkg/util/metrics"
- utilnode "k8s.io/kubernetes/pkg/util/node"
- )
// maxConcurrentRouteCreations bounds how many route-mutating cloud calls
// (CreateRoute, and DeleteRoute, which shares the same limiter in reconcile)
// may be in flight at the same time.
// TODO: This should be per-provider.
const maxConcurrentRouteCreations int = 200
- var updateNetworkConditionBackoff = wait.Backoff{
- Steps: 5, // Maximum number of retries.
- Duration: 100 * time.Millisecond,
- Jitter: 1.0,
- }
- type RouteController struct {
- routes cloudprovider.Routes
- kubeClient clientset.Interface
- clusterName string
- clusterCIDR *net.IPNet
- nodeLister corelisters.NodeLister
- nodeListerSynced cache.InformerSynced
- broadcaster record.EventBroadcaster
- recorder record.EventRecorder
- }
- func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDR *net.IPNet) *RouteController {
- if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
- }
- if clusterCIDR == nil {
- klog.Fatal("RouteController: Must specify clusterCIDR.")
- }
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
- rc := &RouteController{
- routes: routes,
- kubeClient: kubeClient,
- clusterName: clusterName,
- clusterCIDR: clusterCIDR,
- nodeLister: nodeInformer.Lister(),
- nodeListerSynced: nodeInformer.Informer().HasSynced,
- broadcaster: eventBroadcaster,
- recorder: recorder,
- }
- return rc
- }
- func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
- defer utilruntime.HandleCrash()
- klog.Info("Starting route controller")
- defer klog.Info("Shutting down route controller")
- if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) {
- return
- }
- if rc.broadcaster != nil {
- rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")})
- }
- // TODO: If we do just the full Resync every 5 minutes (default value)
- // that means that we may wait up to 5 minutes before even starting
- // creating a route for it. This is bad.
- // We should have a watch on node and if we observe a new node (with CIDR?)
- // trigger reconciliation for that node.
- go wait.NonSlidingUntil(func() {
- if err := rc.reconcileNodeRoutes(); err != nil {
- klog.Errorf("Couldn't reconcile node routes: %v", err)
- }
- }, syncPeriod, stopCh)
- <-stopCh
- }
- func (rc *RouteController) reconcileNodeRoutes() error {
- routeList, err := rc.routes.ListRoutes(context.TODO(), rc.clusterName)
- if err != nil {
- return fmt.Errorf("error listing routes: %v", err)
- }
- nodes, err := rc.nodeLister.List(labels.Everything())
- if err != nil {
- return fmt.Errorf("error listing nodes: %v", err)
- }
- return rc.reconcile(nodes, routeList)
- }
- func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error {
- // nodeCIDRs maps nodeName->nodeCIDR
- nodeCIDRs := make(map[types.NodeName]string)
- // routeMap maps routeTargetNode->route
- routeMap := make(map[types.NodeName]*cloudprovider.Route)
- for _, route := range routes {
- if route.TargetNode != "" {
- routeMap[route.TargetNode] = route
- }
- }
- wg := sync.WaitGroup{}
- rateLimiter := make(chan struct{}, maxConcurrentRouteCreations)
- for _, node := range nodes {
- // Skip if the node hasn't been assigned a CIDR yet.
- if node.Spec.PodCIDR == "" {
- continue
- }
- nodeName := types.NodeName(node.Name)
- // Check if we have a route for this node w/ the correct CIDR.
- r := routeMap[nodeName]
- if r == nil || r.DestinationCIDR != node.Spec.PodCIDR {
- // If not, create the route.
- route := &cloudprovider.Route{
- TargetNode: nodeName,
- DestinationCIDR: node.Spec.PodCIDR,
- }
- nameHint := string(node.UID)
- wg.Add(1)
- go func(nodeName types.NodeName, nameHint string, route *cloudprovider.Route) {
- defer wg.Done()
- err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
- startTime := time.Now()
- // Ensure that we don't have more than maxConcurrentRouteCreations
- // CreateRoute calls in flight.
- rateLimiter <- struct{}{}
- klog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
- err := rc.routes.CreateRoute(context.TODO(), rc.clusterName, nameHint, route)
- <-rateLimiter
- rc.updateNetworkingCondition(nodeName, err == nil)
- if err != nil {
- msg := fmt.Sprintf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Since(startTime), err)
- if rc.recorder != nil {
- rc.recorder.Eventf(
- &v1.ObjectReference{
- Kind: "Node",
- Name: string(nodeName),
- UID: types.UID(nodeName),
- Namespace: "",
- }, v1.EventTypeWarning, "FailedToCreateRoute", msg)
- }
- klog.V(4).Infof(msg)
- return err
- }
- klog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
- return nil
- })
- if err != nil {
- klog.Errorf("Could not create route %s %s for node %s: %v", nameHint, route.DestinationCIDR, nodeName, err)
- }
- }(nodeName, nameHint, route)
- } else {
- // Update condition only if it doesn't reflect the current state.
- _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeNetworkUnavailable)
- if condition == nil || condition.Status != v1.ConditionFalse {
- rc.updateNetworkingCondition(types.NodeName(node.Name), true)
- }
- }
- nodeCIDRs[nodeName] = node.Spec.PodCIDR
- }
- for _, route := range routes {
- if rc.isResponsibleForRoute(route) {
- // Check if this route is a blackhole, or applies to a node we know about & has an incorrect CIDR.
- if route.Blackhole || (nodeCIDRs[route.TargetNode] != route.DestinationCIDR) {
- wg.Add(1)
- // Delete the route.
- go func(route *cloudprovider.Route, startTime time.Time) {
- defer wg.Done()
- rateLimiter <- struct{}{}
- klog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
- if err := rc.routes.DeleteRoute(context.TODO(), rc.clusterName, route); err != nil {
- klog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Since(startTime), err)
- } else {
- klog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Since(startTime))
- }
- <-rateLimiter
- }(route, time.Now())
- }
- }
- }
- wg.Wait()
- return nil
- }
- func (rc *RouteController) updateNetworkingCondition(nodeName types.NodeName, routeCreated bool) error {
- err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
- var err error
- // Patch could also fail, even though the chance is very slim. So we still do
- // patch in the retry loop.
- currentTime := metav1.Now()
- if routeCreated {
- err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
- Type: v1.NodeNetworkUnavailable,
- Status: v1.ConditionFalse,
- Reason: "RouteCreated",
- Message: "RouteController created a route",
- LastTransitionTime: currentTime,
- })
- } else {
- err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
- Type: v1.NodeNetworkUnavailable,
- Status: v1.ConditionTrue,
- Reason: "NoRouteCreated",
- Message: "RouteController failed to create a route",
- LastTransitionTime: currentTime,
- })
- }
- if err != nil {
- klog.V(4).Infof("Error updating node %s, retrying: %v", nodeName, err)
- }
- return err
- })
- if err != nil {
- klog.Errorf("Error updating node %s: %v", nodeName, err)
- }
- return err
- }
- func (rc *RouteController) isResponsibleForRoute(route *cloudprovider.Route) bool {
- _, cidr, err := net.ParseCIDR(route.DestinationCIDR)
- if err != nil {
- klog.Errorf("Ignoring route %s, unparsable CIDR: %v", route.Name, err)
- return false
- }
- // Not responsible if this route's CIDR is not within our clusterCIDR
- lastIP := make([]byte, len(cidr.IP))
- for i := range lastIP {
- lastIP[i] = cidr.IP[i] | ^cidr.Mask[i]
- }
- if !rc.clusterCIDR.Contains(cidr.IP) || !rc.clusterCIDR.Contains(lastIP) {
- return false
- }
- return true
- }
|