node_lifecycle_controller.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cloud
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/labels"
  21. "k8s.io/apimachinery/pkg/types"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. coreinformers "k8s.io/client-go/informers/core/v1"
  25. clientset "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/kubernetes/scheme"
  27. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  28. v1lister "k8s.io/client-go/listers/core/v1"
  29. "k8s.io/client-go/tools/record"
  30. cloudprovider "k8s.io/cloud-provider"
  31. "k8s.io/klog"
  32. "k8s.io/kubernetes/pkg/controller"
  33. nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
  34. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  35. )
  // deleteNodeEvent identifies the event recorded when a node is deleted
  // because it no longer exists in the cloud provider.
  const deleteNodeEvent = "DeletingNode"
  39. var ShutdownTaint = &v1.Taint{
  40. Key: schedulerapi.TaintNodeShutdown,
  41. Effect: v1.TaintEffectNoSchedule,
  42. }
  43. // CloudNodeLifecycleController is responsible for deleting/updating kubernetes
  44. // nodes that have been deleted/shutdown on the cloud provider
  45. type CloudNodeLifecycleController struct {
  46. kubeClient clientset.Interface
  47. nodeLister v1lister.NodeLister
  48. recorder record.EventRecorder
  49. cloud cloudprovider.Interface
  50. // Value controlling NodeController monitoring period, i.e. how often does NodeController
  51. // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
  52. // set in controller-manager
  53. nodeMonitorPeriod time.Duration
  54. }
  55. func NewCloudNodeLifecycleController(
  56. nodeInformer coreinformers.NodeInformer,
  57. kubeClient clientset.Interface,
  58. cloud cloudprovider.Interface,
  59. nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {
  60. eventBroadcaster := record.NewBroadcaster()
  61. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
  62. eventBroadcaster.StartLogging(klog.Infof)
  63. klog.Info("Sending events to api server")
  64. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  65. if kubeClient == nil {
  66. return nil, errors.New("kubernetes client is nil")
  67. }
  68. if cloud == nil {
  69. return nil, errors.New("no cloud provider provided")
  70. }
  71. if _, ok := cloud.Instances(); !ok {
  72. return nil, errors.New("cloud provider does not support instances")
  73. }
  74. c := &CloudNodeLifecycleController{
  75. kubeClient: kubeClient,
  76. nodeLister: nodeInformer.Lister(),
  77. recorder: recorder,
  78. cloud: cloud,
  79. nodeMonitorPeriod: nodeMonitorPeriod,
  80. }
  81. return c, nil
  82. }
  83. // Run starts the main loop for this controller. Run is blocking so should
  84. // be called via a goroutine
  85. func (c *CloudNodeLifecycleController) Run(stopCh <-chan struct{}) {
  86. defer utilruntime.HandleCrash()
  87. // The following loops run communicate with the APIServer with a worst case complexity
  88. // of O(num_nodes) per cycle. These functions are justified here because these events fire
  89. // very infrequently. DO NOT MODIFY this to perform frequent operations.
  90. // Start a loop to periodically check if any nodes have been
  91. // deleted or shutdown from the cloudprovider
  92. wait.Until(c.MonitorNodes, c.nodeMonitorPeriod, stopCh)
  93. }
  94. // MonitorNodes checks to see if nodes in the cluster have been deleted
  95. // or shutdown. If deleeted, it deletes the node resource. If shutdown it
  96. // applies a shutdown taint to the node
  97. func (c *CloudNodeLifecycleController) MonitorNodes() {
  98. instances, ok := c.cloud.Instances()
  99. if !ok {
  100. utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
  101. return
  102. }
  103. nodes, err := c.nodeLister.List(labels.Everything())
  104. if err != nil {
  105. klog.Errorf("error listing nodes from cache: %s", err)
  106. return
  107. }
  108. for _, node := range nodes {
  109. // Default NodeReady status to v1.ConditionUnknown
  110. status := v1.ConditionUnknown
  111. if _, c := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
  112. status = c.Status
  113. }
  114. if status == v1.ConditionTrue {
  115. // if taint exist remove taint
  116. err = controller.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
  117. if err != nil {
  118. klog.Errorf("error patching node taints: %v", err)
  119. }
  120. continue
  121. }
  122. // we need to check this first to get taint working in similar in all cloudproviders
  123. // current problem is that shutdown nodes are not working in similar way ie. all cloudproviders
  124. // does not delete node from kubernetes cluster when instance it is shutdown see issue #46442
  125. shutdown, err := shutdownInCloudProvider(context.TODO(), c.cloud, node)
  126. if err != nil {
  127. klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
  128. }
  129. if shutdown && err == nil {
  130. // if node is shutdown add shutdown taint
  131. err = controller.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
  132. if err != nil {
  133. klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
  134. }
  135. // Continue checking the remaining nodes since the current one is shutdown.
  136. continue
  137. }
  138. // At this point the node has NotReady status, we need to check if the node has been removed
  139. // from the cloud provider. If node cannot be found in cloudprovider, then delete the node
  140. exists, err := ensureNodeExistsByProviderID(instances, node)
  141. if err != nil {
  142. klog.Errorf("error checking if node %s exists: %v", node.Name, err)
  143. continue
  144. }
  145. if exists {
  146. // Continue checking the remaining nodes since the current one is fine.
  147. continue
  148. }
  149. klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)
  150. ref := &v1.ObjectReference{
  151. Kind: "Node",
  152. Name: node.Name,
  153. UID: types.UID(node.UID),
  154. Namespace: "",
  155. }
  156. c.recorder.Eventf(ref, v1.EventTypeNormal,
  157. fmt.Sprintf("Deleting node %v because it does not exist in the cloud provider", node.Name),
  158. "Node %s event: %s", node.Name, deleteNodeEvent)
  159. if err := c.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
  160. klog.Errorf("unable to delete node %q: %v", node.Name, err)
  161. }
  162. }
  163. }
  164. // shutdownInCloudProvider returns true if the node is shutdown on the cloud provider
  165. func shutdownInCloudProvider(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
  166. instances, ok := cloud.Instances()
  167. if !ok {
  168. return false, errors.New("cloud provider does not support instances")
  169. }
  170. shutdown, err := instances.InstanceShutdownByProviderID(ctx, node.Spec.ProviderID)
  171. if err == cloudprovider.NotImplemented {
  172. return false, nil
  173. }
  174. return shutdown, err
  175. }