node_controller.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cloud
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/types"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. coreinformers "k8s.io/client-go/informers/core/v1"
  25. clientset "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/kubernetes/scheme"
  27. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  28. "k8s.io/client-go/tools/cache"
  29. "k8s.io/client-go/tools/record"
  30. clientretry "k8s.io/client-go/util/retry"
  31. cloudprovider "k8s.io/cloud-provider"
  32. "k8s.io/klog"
  33. kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
  34. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  35. nodeutil "k8s.io/kubernetes/pkg/util/node"
  36. )
  37. var UpdateNodeSpecBackoff = wait.Backoff{
  38. Steps: 20,
  39. Duration: 50 * time.Millisecond,
  40. Jitter: 1.0,
  41. }
  42. type CloudNodeController struct {
  43. nodeInformer coreinformers.NodeInformer
  44. kubeClient clientset.Interface
  45. recorder record.EventRecorder
  46. cloud cloudprovider.Interface
  47. nodeStatusUpdateFrequency time.Duration
  48. }
  49. // NewCloudNodeController creates a CloudNodeController object
  50. func NewCloudNodeController(
  51. nodeInformer coreinformers.NodeInformer,
  52. kubeClient clientset.Interface,
  53. cloud cloudprovider.Interface,
  54. nodeStatusUpdateFrequency time.Duration) *CloudNodeController {
  55. eventBroadcaster := record.NewBroadcaster()
  56. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
  57. eventBroadcaster.StartLogging(klog.Infof)
  58. if kubeClient != nil {
  59. klog.V(0).Infof("Sending events to api server.")
  60. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  61. } else {
  62. klog.V(0).Infof("No api server defined - no events will be sent to API server.")
  63. }
  64. cnc := &CloudNodeController{
  65. nodeInformer: nodeInformer,
  66. kubeClient: kubeClient,
  67. recorder: recorder,
  68. cloud: cloud,
  69. nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
  70. }
  71. // Use shared informer to listen to add/update of nodes. Note that any nodes
  72. // that exist before node controller starts will show up in the update method
  73. cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  74. AddFunc: cnc.AddCloudNode,
  75. UpdateFunc: cnc.UpdateCloudNode,
  76. })
  77. return cnc
  78. }
  79. // This controller updates newly registered nodes with information
  80. // from the cloud provider. This call is blocking so should be called
  81. // via a goroutine
  82. func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
  83. defer utilruntime.HandleCrash()
  84. // The following loops run communicate with the APIServer with a worst case complexity
  85. // of O(num_nodes) per cycle. These functions are justified here because these events fire
  86. // very infrequently. DO NOT MODIFY this to perform frequent operations.
  87. // Start a loop to periodically update the node addresses obtained from the cloud
  88. wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh)
  89. }
  90. // UpdateNodeStatus updates the node status, such as node addresses
  91. func (cnc *CloudNodeController) UpdateNodeStatus() {
  92. instances, ok := cnc.cloud.Instances()
  93. if !ok {
  94. utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
  95. return
  96. }
  97. nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
  98. if err != nil {
  99. klog.Errorf("Error monitoring node status: %v", err)
  100. return
  101. }
  102. for i := range nodes.Items {
  103. cnc.updateNodeAddress(&nodes.Items[i], instances)
  104. }
  105. }
  106. // UpdateNodeAddress updates the nodeAddress of a single node
  107. func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) {
  108. // Do not process nodes that are still tainted
  109. cloudTaint := getCloudTaint(node.Spec.Taints)
  110. if cloudTaint != nil {
  111. klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
  112. return
  113. }
  114. // Node that isn't present according to the cloud provider shouldn't have its address updated
  115. exists, err := ensureNodeExistsByProviderID(instances, node)
  116. if err != nil {
  117. // Continue to update node address when not sure the node is not exists
  118. klog.Errorf("%v", err)
  119. } else if !exists {
  120. klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name)
  121. return
  122. }
  123. nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node)
  124. if err != nil {
  125. klog.Errorf("%v", err)
  126. return
  127. }
  128. if len(nodeAddresses) == 0 {
  129. klog.V(5).Infof("Skipping node address update for node %q since cloud provider did not return any", node.Name)
  130. return
  131. }
  132. // Check if a hostname address exists in the cloud provided addresses
  133. hostnameExists := false
  134. for i := range nodeAddresses {
  135. if nodeAddresses[i].Type == v1.NodeHostName {
  136. hostnameExists = true
  137. }
  138. }
  139. // If hostname was not present in cloud provided addresses, use the hostname
  140. // from the existing node (populated by kubelet)
  141. if !hostnameExists {
  142. for _, addr := range node.Status.Addresses {
  143. if addr.Type == v1.NodeHostName {
  144. nodeAddresses = append(nodeAddresses, addr)
  145. }
  146. }
  147. }
  148. // If nodeIP was suggested by user, ensure that
  149. // it can be found in the cloud as well (consistent with the behaviour in kubelet)
  150. if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
  151. if nodeIP == nil {
  152. klog.Errorf("Specified Node IP not found in cloudprovider")
  153. return
  154. }
  155. }
  156. if !nodeAddressesChangeDetected(node.Status.Addresses, nodeAddresses) {
  157. return
  158. }
  159. newNode := node.DeepCopy()
  160. newNode.Status.Addresses = nodeAddresses
  161. _, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
  162. if err != nil {
  163. klog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
  164. }
  165. }
  166. func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) {
  167. node, ok := newObj.(*v1.Node)
  168. if !ok {
  169. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  170. return
  171. }
  172. cloudTaint := getCloudTaint(node.Spec.Taints)
  173. if cloudTaint == nil {
  174. // The node has already been initialized so nothing to do.
  175. return
  176. }
  177. cnc.initializeNode(node)
  178. }
  179. // AddCloudNode handles initializing new nodes registered with the cloud taint.
  180. func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
  181. node := obj.(*v1.Node)
  182. cloudTaint := getCloudTaint(node.Spec.Taints)
  183. if cloudTaint == nil {
  184. klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
  185. return
  186. }
  187. cnc.initializeNode(node)
  188. }
  189. // This processes nodes that were added into the cluster, and cloud initialize them if appropriate
  190. func (cnc *CloudNodeController) initializeNode(node *v1.Node) {
  191. instances, ok := cnc.cloud.Instances()
  192. if !ok {
  193. utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
  194. return
  195. }
  196. err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
  197. // TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
  198. // Since there are node taints, do we still need this?
  199. // This condition marks the node as unusable until routes are initialized in the cloud provider
  200. if cnc.cloud.ProviderName() == "gce" {
  201. if err := nodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
  202. Type: v1.NodeNetworkUnavailable,
  203. Status: v1.ConditionTrue,
  204. Reason: "NoRouteCreated",
  205. Message: "Node created without a route",
  206. LastTransitionTime: metav1.Now(),
  207. }); err != nil {
  208. return err
  209. }
  210. }
  211. curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
  212. if err != nil {
  213. return err
  214. }
  215. cloudTaint := getCloudTaint(curNode.Spec.Taints)
  216. if cloudTaint == nil {
  217. // Node object received from event had the cloud taint but was outdated,
  218. // the node has actually already been initialized.
  219. return nil
  220. }
  221. if curNode.Spec.ProviderID == "" {
  222. providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name))
  223. if err == nil {
  224. curNode.Spec.ProviderID = providerID
  225. } else {
  226. // we should attempt to set providerID on curNode, but
  227. // we can continue if we fail since we will attempt to set
  228. // node addresses given the node name in getNodeAddressesByProviderIDOrName
  229. klog.Errorf("failed to set node provider id: %v", err)
  230. }
  231. }
  232. nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode)
  233. if err != nil {
  234. return err
  235. }
  236. // If user provided an IP address, ensure that IP address is found
  237. // in the cloud provider before removing the taint on the node
  238. if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
  239. if nodeIP == nil {
  240. return errors.New("failed to find kubelet node IP from cloud provider")
  241. }
  242. }
  243. if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil {
  244. return err
  245. } else if instanceType != "" {
  246. klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
  247. curNode.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
  248. }
  249. if zones, ok := cnc.cloud.Zones(); ok {
  250. zone, err := getZoneByProviderIDOrName(zones, curNode)
  251. if err != nil {
  252. return fmt.Errorf("failed to get zone from cloud provider: %v", err)
  253. }
  254. if zone.FailureDomain != "" {
  255. klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
  256. curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
  257. }
  258. if zone.Region != "" {
  259. klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
  260. curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
  261. }
  262. }
  263. curNode.Spec.Taints = excludeCloudTaint(curNode.Spec.Taints)
  264. _, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode)
  265. if err != nil {
  266. return err
  267. }
  268. // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
  269. // So that users do not see any significant delay in IP addresses being filled into the node
  270. cnc.updateNodeAddress(curNode, instances)
  271. klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
  272. return nil
  273. })
  274. if err != nil {
  275. utilruntime.HandleError(err)
  276. return
  277. }
  278. }
  279. func getCloudTaint(taints []v1.Taint) *v1.Taint {
  280. for _, taint := range taints {
  281. if taint.Key == schedulerapi.TaintExternalCloudProvider {
  282. return &taint
  283. }
  284. }
  285. return nil
  286. }
  287. func excludeCloudTaint(taints []v1.Taint) []v1.Taint {
  288. newTaints := []v1.Taint{}
  289. for _, taint := range taints {
  290. if taint.Key == schedulerapi.TaintExternalCloudProvider {
  291. continue
  292. }
  293. newTaints = append(newTaints, taint)
  294. }
  295. return newTaints
  296. }
  297. // ensureNodeExistsByProviderID checks if the instance exists by the provider id,
  298. // If provider id in spec is empty it calls instanceId with node name to get provider id
  299. func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) {
  300. providerID := node.Spec.ProviderID
  301. if providerID == "" {
  302. var err error
  303. providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name))
  304. if err != nil {
  305. if err == cloudprovider.InstanceNotFound {
  306. return false, nil
  307. }
  308. return false, err
  309. }
  310. if providerID == "" {
  311. klog.Warningf("Cannot find valid providerID for node name %q, assuming non existence", node.Name)
  312. return false, nil
  313. }
  314. }
  315. return instances.InstanceExistsByProviderID(context.TODO(), providerID)
  316. }
  317. func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
  318. nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID)
  319. if err != nil {
  320. providerIDErr := err
  321. nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name))
  322. if err != nil {
  323. return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
  324. }
  325. }
  326. return nodeAddresses, nil
  327. }
  328. func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool {
  329. if len(addressSet1) != len(addressSet2) {
  330. return true
  331. }
  332. addressMap1 := map[v1.NodeAddressType]string{}
  333. for i := range addressSet1 {
  334. addressMap1[addressSet1[i].Type] = addressSet1[i].Address
  335. }
  336. for _, v := range addressSet2 {
  337. if addressMap1[v.Type] != v.Address {
  338. return true
  339. }
  340. }
  341. return false
  342. }
  343. func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (*v1.NodeAddress, bool) {
  344. var nodeIP *v1.NodeAddress
  345. nodeIPExists := false
  346. if providedIP, ok := node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr]; ok {
  347. nodeIPExists = true
  348. for i := range nodeAddresses {
  349. if nodeAddresses[i].Address == providedIP {
  350. nodeIP = &nodeAddresses[i]
  351. break
  352. }
  353. }
  354. }
  355. return nodeIP, nodeIPExists
  356. }
  357. func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) {
  358. instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID)
  359. if err != nil {
  360. providerIDErr := err
  361. instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name))
  362. if err != nil {
  363. return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
  364. }
  365. }
  366. return instanceType, err
  367. }
  368. // getZoneByProviderIDorName will attempt to get the zone of node using its providerID
  369. // then it's name. If both attempts fail, an error is returned
  370. func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) {
  371. zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID)
  372. if err != nil {
  373. providerIDErr := err
  374. zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name))
  375. if err != nil {
  376. return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
  377. }
  378. }
  379. return zone, nil
  380. }