123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- package cloud
- import (
- "context"
- "errors"
- "fmt"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- v1lister "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/record"
- cloudprovider "k8s.io/cloud-provider"
- cloudnodeutil "k8s.io/cloud-provider/node/helpers"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/controller"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- )
// deleteNodeEvent is the event identifier recorded when a node is removed
// because it no longer exists in the cloud provider.
const deleteNodeEvent = "DeletingNode"
- var ShutdownTaint = &v1.Taint{
- Key: schedulerapi.TaintNodeShutdown,
- Effect: v1.TaintEffectNoSchedule,
- }
- type CloudNodeLifecycleController struct {
- kubeClient clientset.Interface
- nodeLister v1lister.NodeLister
- recorder record.EventRecorder
- cloud cloudprovider.Interface
-
-
-
- nodeMonitorPeriod time.Duration
- }
- func NewCloudNodeLifecycleController(
- nodeInformer coreinformers.NodeInformer,
- kubeClient clientset.Interface,
- cloud cloudprovider.Interface,
- nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {
- eventBroadcaster := record.NewBroadcaster()
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
- eventBroadcaster.StartLogging(klog.Infof)
- klog.Info("Sending events to api server")
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- if kubeClient == nil {
- return nil, errors.New("kubernetes client is nil")
- }
- if cloud == nil {
- return nil, errors.New("no cloud provider provided")
- }
- if _, ok := cloud.Instances(); !ok {
- return nil, errors.New("cloud provider does not support instances")
- }
- c := &CloudNodeLifecycleController{
- kubeClient: kubeClient,
- nodeLister: nodeInformer.Lister(),
- recorder: recorder,
- cloud: cloud,
- nodeMonitorPeriod: nodeMonitorPeriod,
- }
- return c, nil
- }
- func (c *CloudNodeLifecycleController) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
-
-
-
-
-
- wait.Until(c.MonitorNodes, c.nodeMonitorPeriod, stopCh)
- }
- func (c *CloudNodeLifecycleController) MonitorNodes() {
- instances, ok := c.cloud.Instances()
- if !ok {
- utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
- return
- }
- nodes, err := c.nodeLister.List(labels.Everything())
- if err != nil {
- klog.Errorf("error listing nodes from cache: %s", err)
- return
- }
- for _, node := range nodes {
-
- status := v1.ConditionUnknown
- if _, c := cloudnodeutil.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
- status = c.Status
- }
- if status == v1.ConditionTrue {
-
- err = controller.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
- if err != nil {
- klog.Errorf("error patching node taints: %v", err)
- }
- continue
- }
-
-
-
- shutdown, err := shutdownInCloudProvider(context.TODO(), c.cloud, node)
- if err != nil {
- klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
- }
- if shutdown && err == nil {
-
- err = controller.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
- if err != nil {
- klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
- }
-
- continue
- }
-
-
- exists, err := ensureNodeExistsByProviderID(context.TODO(), instances, node)
- if err != nil {
- klog.Errorf("error checking if node %s exists: %v", node.Name, err)
- continue
- }
- if exists {
-
- continue
- }
- klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)
- ref := &v1.ObjectReference{
- Kind: "Node",
- Name: node.Name,
- UID: types.UID(node.UID),
- Namespace: "",
- }
- c.recorder.Eventf(ref, v1.EventTypeNormal,
- fmt.Sprintf("Deleting node %v because it does not exist in the cloud provider", node.Name),
- "Node %s event: %s", node.Name, deleteNodeEvent)
- if err := c.kubeClient.CoreV1().Nodes().Delete(context.TODO(), node.Name, nil); err != nil {
- klog.Errorf("unable to delete node %q: %v", node.Name, err)
- }
- }
- }
- func shutdownInCloudProvider(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
- instances, ok := cloud.Instances()
- if !ok {
- return false, errors.New("cloud provider does not support instances")
- }
- shutdown, err := instances.InstanceShutdownByProviderID(ctx, node.Spec.ProviderID)
- if err == cloudprovider.NotImplemented {
- return false, nil
- }
- return shutdown, err
- }
|