kubelet_node_status.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"net"
	goruntime "runtime"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/klog"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
	"k8s.io/kubernetes/pkg/kubelet/util"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	taintutil "k8s.io/kubernetes/pkg/util/taints"
	volutil "k8s.io/kubernetes/pkg/volume/util"
)

// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithAPIServer() {
	if kl.registrationCompleted {
		return
	}
	step := 100 * time.Millisecond

	for {
		time.Sleep(step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}

		node, err := kl.initialNode(context.TODO())
		if err != nil {
			klog.Errorf("Unable to construct v1.Node object for kubelet: %v", err)
			continue
		}

		klog.Infof("Attempting to register node %s", node.Name)
		registered := kl.tryRegisterWithAPIServer(node)
		if registered {
			klog.Infof("Successfully registered node %s", node.Name)
			kl.registrationCompleted = true
			return
		}
	}
}
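
// Registration backoff illustration (not part of the original source): the loop
// above sleeps before every attempt, doubling the delay and capping it at 7s, so
// successive attempts are spaced roughly
//
//	100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, 7s, 7s, ...
//
// i.e. exponential backoff with a 7-second ceiling between registration attempts.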

// tryRegisterWithAPIServer makes an attempt to register the given node with
// the API server, returning a boolean indicating whether the attempt was
// successful. If a node with the same name already exists, it reconciles the
// value of the annotation for controller-managed attach-detach of attachable
// persistent volumes for the node.
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
	_, err := kl.kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
	if err == nil {
		return true
	}

	if !apierrors.IsAlreadyExists(err) {
		klog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err)
		return false
	}

	existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), metav1.GetOptions{})
	if err != nil {
		klog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
		return false
	}
	if existingNode == nil {
		klog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName)
		return false
	}

	originalNode := existingNode.DeepCopy()
	if originalNode == nil {
		klog.Errorf("Nil %q node object", kl.nodeName)
		return false
	}

	klog.Infof("Node %s was previously registered", kl.nodeName)

	// Edge case: the node was previously registered; reconcile
	// the value of the controller-managed attach-detach
	// annotation.
	requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
	if requiresUpdate {
		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
			klog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
			return false
		}
	}

	return true
}
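
// Note on the already-registered path (summary, not part of the original source):
// when Create fails with an AlreadyExists error, the kubelet fetches the existing
// Node, reconciles the attach-detach annotation, default labels, and extended
// resources against the freshly constructed node, and issues a PatchNodeStatus
// call only if one of the reconcile helpers reported a change.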

// reconcileExtendedResource zeroes out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
	requiresUpdate := false
	// Check with the device manager to see if the node has been recreated, in which
	// case extended resources should be zeroed until they are available again.
	if kl.containerManager.ShouldResetExtendedResourceCapacity() {
		for k := range node.Status.Capacity {
			if v1helper.IsExtendedResourceName(k) {
				klog.Infof("Zero out resource %s capacity in existing node.", k)
				node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				requiresUpdate = true
			}
		}
	}
	return requiresUpdate
}
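
// Illustration (hypothetical resource name, not from the original source): if the
// device manager reports that the node object was recreated, a device-plugin
// resource such as "example.com/gpu" would have both its capacity and allocatable
// reset to 0 here until the plugin re-registers and reports it again.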

// updateDefaultLabels will set the default labels on the node
func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
	defaultLabels := []string{
		v1.LabelHostname,
		v1.LabelZoneFailureDomainStable,
		v1.LabelZoneRegionStable,
		v1.LabelZoneFailureDomain,
		v1.LabelZoneRegion,
		v1.LabelInstanceTypeStable,
		v1.LabelInstanceType,
		v1.LabelOSStable,
		v1.LabelArchStable,
		v1.LabelWindowsBuild,
		kubeletapis.LabelOS,
		kubeletapis.LabelArch,
	}

	needsUpdate := false
	if existingNode.Labels == nil {
		existingNode.Labels = make(map[string]string)
	}
	// Set default labels but make sure to not set labels with empty values.
	for _, label := range defaultLabels {
		if _, hasInitialValue := initialNode.Labels[label]; !hasInitialValue {
			continue
		}

		if existingNode.Labels[label] != initialNode.Labels[label] {
			existingNode.Labels[label] = initialNode.Labels[label]
			needsUpdate = true
		}

		if existingNode.Labels[label] == "" {
			delete(existingNode.Labels, label)
		}
	}

	return needsUpdate
}
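
// Worked example (hypothetical values, not from the original source): if the freshly
// constructed node carries v1.LabelZoneRegionStable="us-east-1" while the existing
// node still has "us-west-2" for that key, the existing node is updated to
// "us-east-1" and needsUpdate becomes true; a default label that is absent from the
// initial node is left untouched, and one whose initial value is empty is removed
// from the existing node.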

// reconcileCMADAnnotationWithExistingNode reconciles the controller-managed
// attach-detach annotation on a new node and the existing node, returning
// whether the existing node must be updated.
func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *v1.Node) bool {
	var (
		existingCMAAnnotation    = existingNode.Annotations[volutil.ControllerManagedAttachAnnotation]
		newCMAAnnotation, newSet = node.Annotations[volutil.ControllerManagedAttachAnnotation]
	)

	if newCMAAnnotation == existingCMAAnnotation {
		return false
	}

	// If the just-constructed node and the existing node do
	// not have the same value, update the existing node with
	// the correct value of the annotation.
	if !newSet {
		klog.Info("Controller attach-detach setting changed to false; updating existing Node")
		delete(existingNode.Annotations, volutil.ControllerManagedAttachAnnotation)
	} else {
		klog.Info("Controller attach-detach setting changed to true; updating existing Node")
		if existingNode.Annotations == nil {
			existingNode.Annotations = make(map[string]string)
		}
		existingNode.Annotations[volutil.ControllerManagedAttachAnnotation] = newCMAAnnotation
	}

	return true
}
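
// Annotation reconcile cases (summary, not from the original source):
//   - annotation unset on the new node but present on the existing node:
//     controller-managed attach-detach was turned off, so the annotation is deleted;
//   - annotation set to "true" on the new node but missing or different on the
//     existing node: the existing node is updated to carry the new value;
//   - values already equal: no update is required and false is returned.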

// initialNode constructs the initial v1.Node for this Kubelet, incorporating node
// labels, information from the cloud provider, and Kubelet configuration.
func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(kl.nodeName),
			Labels: map[string]string{
				v1.LabelHostname:      kl.hostname,
				v1.LabelOSStable:      goruntime.GOOS,
				v1.LabelArchStable:    goruntime.GOARCH,
				kubeletapis.LabelOS:   goruntime.GOOS,
				kubeletapis.LabelArch: goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: !kl.registerSchedulable,
		},
	}
	osLabels, err := getOSSpecificLabels()
	if err != nil {
		return nil, err
	}
	for label, value := range osLabels {
		node.Labels[label] = value
	}

	nodeTaints := make([]v1.Taint, 0)
	if len(kl.registerWithTaints) > 0 {
		taints := make([]v1.Taint, len(kl.registerWithTaints))
		for i := range kl.registerWithTaints {
			if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
				return nil, err
			}
		}
		nodeTaints = append(nodeTaints, taints...)
	}

	unschedulableTaint := v1.Taint{
		Key:    v1.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	}

	// Taint node with TaintNodeUnschedulable when initializing
	// node to avoid race condition; refer to #63897 for more detail.
	if node.Spec.Unschedulable &&
		!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
		nodeTaints = append(nodeTaints, unschedulableTaint)
	}

	if kl.externalCloudProvider {
		taint := v1.Taint{
			Key:    schedulerapi.TaintExternalCloudProvider,
			Value:  "true",
			Effect: v1.TaintEffectNoSchedule,
		}
		nodeTaints = append(nodeTaints, taint)
	}
	if len(nodeTaints) > 0 {
		node.Spec.Taints = nodeTaints
	}

	// Initially, set NodeNetworkUnavailable to true.
	if kl.providerRequiresNetworkingConfiguration() {
		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
			Type:               v1.NodeNetworkUnavailable,
			Status:             v1.ConditionTrue,
			Reason:             "NoRouteCreated",
			Message:            "Node created without a route",
			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
		})
	}

	if kl.enableControllerAttachDetach {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.Infof("Setting node annotation to enable volume controller attach/detach")
		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
	} else {
		klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
	}

	if kl.keepTerminatedPodVolumes {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node")
		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
	}

	// @question: should this be placed after the call to the cloud provider, which also applies labels?
	for k, v := range kl.nodeLabels {
		if cv, found := node.ObjectMeta.Labels[k]; found {
			klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
		}
		node.ObjectMeta.Labels[k] = v
	}

	if kl.providerID != "" {
		node.Spec.ProviderID = kl.providerID
	}

	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		var err error
		if node.Spec.ProviderID == "" {
			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(ctx, kl.cloud, kl.nodeName)
			if err != nil {
				return nil, err
			}
		}

		instanceType, err := instances.InstanceType(ctx, kl.nodeName)
		if err != nil {
			return nil, err
		}
		if instanceType != "" {
			klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
			klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
		}

		// If the cloud has zone information, label the node with the zone information.
		zones, ok := kl.cloud.Zones()
		if ok {
			zone, err := zones.GetZone(ctx)
			if err != nil {
				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
				node.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
				klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
				node.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region
			}
		}
	}
	kl.setNodeStatus(node)

	return node, nil
}
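
// Label provenance in initialNode (summary, not from the original source): labels are
// applied in order — the built-in hostname/OS/arch labels, OS-specific labels from
// getOSSpecificLabels, user-supplied labels from kl.nodeLabels (which log a warning
// when they overwrite a default), and finally instance-type and zone labels from the
// cloud provider, which, given this ordering, can overwrite user-supplied values.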

// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock()
	defer kl.syncNodeStatusMux.Unlock()

	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	if err := kl.updateNodeStatus(); err != nil {
		klog.Errorf("Unable to update node status: %v", err)
	}
}

// updateNodeStatus updates node status to master with retries if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).Infof("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := kl.tryUpdateNodeStatus(i); err != nil {
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.Errorf("Error updating node status, will retry: %v", err)
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	// In large clusters, GET and PUT operations on Node objects coming
	// from here are the majority of load on apiserver and etcd.
	// To reduce the load on etcd, we are serving GET operations from
	// apiserver cache (the data might be slightly delayed but it doesn't
	// seem to cause more conflict - the delays are pretty small).
	// If it results in a conflict, all retries are served directly from etcd.
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			klog.Errorf(err.Error())
		}
	}

	kl.setNodeStatus(node)

	now := kl.clock.Now()
	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			// We must mark the volumes as ReportedInUse in volume manager's dsw even
			// if no changes were made to the node status (no volumes were added or removed
			// from the VolumesInUse list).
			//
			// The reason is that on a kubelet restart, the volume manager's dsw is
			// repopulated and the volume ReportedInUse is initialized to false, while the
			// VolumesInUse list from the Node object still contains the state from the
			// previous kubelet instantiation.
			//
			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
			// synced from the VolumesInUse list in the Node.Status.
			//
			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
			// because it does not have access to the Node object.
			// This also cannot be populated on node status manager init because the volume
			// may not have been added to dsw at that time.
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// Patch the current status on the API server.
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status.
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}
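
// Heartbeat semantics (summary, not from the original source): the node status is
// PATCHed only when the pod CIDR changed, the freshly computed status differs from
// the one read from the API server (heartbeat timestamps excluded), or
// nodeStatusReportFrequency has elapsed since the last successful report; otherwise
// the function returns without writing, having only performed the initial GET.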

// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
	klog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
	// TODO: This requires a transaction, either both node status is updated
	// and event is recorded or neither should happen, see issue #6055.
	kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
}

// recordEvent records an event for this node, the Kubelet's nodeRef is passed to the recorder
func (kl *Kubelet) recordEvent(eventType, event, message string) {
	kl.recorder.Eventf(kl.nodeRef, eventType, event, message)
}

// recordNodeSchedulableEvent records an event whenever the node's schedulable state changes.
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) error {
	kl.lastNodeUnschedulableLock.Lock()
	defer kl.lastNodeUnschedulableLock.Unlock()
	if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
		if node.Spec.Unschedulable {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotSchedulable)
		} else {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeSchedulable)
		}
		kl.lastNodeUnschedulable = node.Spec.Unschedulable
	}
	return nil
}

// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condition code out to a different file.
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).Infof("Setting node status at position %v", i)
		if err := f(node); err != nil {
			klog.Errorf("Failed to set some node status fields: %s", err)
		}
	}
}

func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
	kl.lastObservedNodeAddressesMux.Lock()
	defer kl.lastObservedNodeAddressesMux.Unlock()
	kl.lastObservedNodeAddresses = addresses
}

func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
	kl.lastObservedNodeAddressesMux.RLock()
	defer kl.lastObservedNodeAddressesMux.RUnlock()
	return kl.lastObservedNodeAddresses
}

// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	// Volume limits
	setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}
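
// Each setter has the signature func(*v1.Node) error and mutates the node in place;
// setNodeStatus runs them in the order they appear above. A minimal sketch of an
// additional setter (hypothetical label key, not part of the original source):
//
//	func exampleSetter(node *v1.Node) error {
//		if node.Labels == nil {
//			node.Labels = map[string]string{}
//		}
//		node.Labels["example.com/hypothetical"] = "value"
//		return nil
//	}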

// validateNodeIP validates that the given node IP belongs to the current host.
func validateNodeIP(nodeIP net.IP) error {
	// Honor IP limitations set in setNodeStatus()
	if nodeIP.To4() == nil && nodeIP.To16() == nil {
		return fmt.Errorf("nodeIP must be a valid IP address")
	}
	if nodeIP.IsLoopback() {
		return fmt.Errorf("nodeIP can't be loopback address")
	}
	if nodeIP.IsMulticast() {
		return fmt.Errorf("nodeIP can't be a multicast address")
	}
	if nodeIP.IsLinkLocalUnicast() {
		return fmt.Errorf("nodeIP can't be a link-local unicast address")
	}
	if nodeIP.IsUnspecified() {
		return fmt.Errorf("nodeIP can't be an all zeros address")
	}

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return err
	}
	for _, addr := range addrs {
		var ip net.IP
		switch v := addr.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		if ip != nil && ip.Equal(nodeIP) {
			return nil
		}
	}
	return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
}
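
// Examples (illustrative, not from the original source): 127.0.0.1 is rejected as a
// loopback address, 0.0.0.0 as unspecified, fe80::1 as link-local unicast, and any
// other well-formed address is accepted only if it is currently assigned to one of
// the host's network interfaces.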

// nodeStatusHasChanged compares the original node and current node's status and
// returns true if any change happens. The heartbeat timestamp is ignored.
func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
	if originalStatus == nil && status == nil {
		return false
	}
	if originalStatus == nil || status == nil {
		return true
	}

	// Compare node conditions here because we need to ignore the heartbeat timestamp.
	if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
		return true
	}

	// Compare other fields of NodeStatus.
	originalStatusCopy := originalStatus.DeepCopy()
	statusCopy := status.DeepCopy()
	originalStatusCopy.Conditions = nil
	statusCopy.Conditions = nil
	return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
}

// nodeConditionsHaveChanged compares the original node and current node's
// conditions and returns true if any change happens. The heartbeat timestamp is
// ignored.
func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
	if len(originalConditions) != len(conditions) {
		return true
	}

	originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
	originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
	conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
	conditionsCopy = append(conditionsCopy, conditions...)

	sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
	sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })

	replacedheartbeatTime := metav1.Time{}
	for i := range conditionsCopy {
		originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
			return true
		}
	}
	return false
}
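
// Comparison note (summary, not from the original source): both condition slices are
// copied and sorted by Type, and LastHeartbeatTime is zeroed on each pair before the
// semantic comparison, so a heartbeat that only refreshes timestamps does not count
// as a status change, while a flipped condition Status or changed Reason/Message does.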