status.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package status
  14. import (
  15. "fmt"
  16. "sync"
  17. apiv1 "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. clientset "k8s.io/client-go/kubernetes"
  21. utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
  22. "k8s.io/kubernetes/pkg/kubelet/metrics"
  23. nodeutil "k8s.io/kubernetes/pkg/util/node"
  24. )
  25. const (
  26. // LoadError indicates that the Kubelet failed to load the config checkpoint
  27. LoadError = "failed to load config, see Kubelet log for details"
  28. // ValidateError indicates that the Kubelet failed to validate the config checkpoint
  29. ValidateError = "failed to validate config, see Kubelet log for details"
  30. // AllNilSubfieldsError is used when no subfields are set
  31. // This could happen in the case that an old client tries to read an object from a newer API server with a set subfield it does not know about
  32. AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
  33. // DownloadError is used when the download fails, e.g. due to network issues
  34. DownloadError = "failed to download config, see Kubelet log for details"
  35. // InternalError indicates that some internal error happened while trying to sync config, e.g. filesystem issues
  36. InternalError = "internal failure, see Kubelet log for details"
  37. // SyncErrorFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
  38. SyncErrorFmt = "failed to sync: %s"
  39. )
  40. // NodeConfigStatus represents Node.Status.Config
  41. type NodeConfigStatus interface {
  42. // SetActive sets the active source in the status
  43. SetActive(source *apiv1.NodeConfigSource)
  44. // SetAssigned sets the assigned source in the status
  45. SetAssigned(source *apiv1.NodeConfigSource)
  46. // SetLastKnownGood sets the last-known-good source in the status
  47. SetLastKnownGood(source *apiv1.NodeConfigSource)
  48. // SetError sets the error associated with the status
  49. SetError(err string)
  50. // SetErrorOverride sets an error that overrides the base error set by SetError.
  51. // If the override is set to the empty string, the base error is reported in
  52. // the status, otherwise the override is reported.
  53. SetErrorOverride(err string)
  54. // Sync patches the current status into the Node identified by `nodeName` if an update is pending
  55. Sync(client clientset.Interface, nodeName string)
  56. }
  57. type nodeConfigStatus struct {
  58. // status is the core NodeConfigStatus that we report
  59. status apiv1.NodeConfigStatus
  60. // mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status
  61. mux sync.Mutex
  62. // errorOverride is sent in place of the usual error if it is non-empty
  63. errorOverride string
  64. // syncCh; write to this channel to indicate that the status needs to be synced to the API server
  65. syncCh chan bool
  66. }
  67. // NewNodeConfigStatus returns a new NodeConfigStatus interface
  68. func NewNodeConfigStatus() NodeConfigStatus {
  69. // channels must have capacity at least 1, since we signal with non-blocking writes
  70. syncCh := make(chan bool, 1)
  71. // prime new status managers to sync with the API server on the first call to Sync
  72. syncCh <- true
  73. return &nodeConfigStatus{
  74. syncCh: syncCh,
  75. }
  76. }
  77. // transact grabs the lock, performs the fn, records the need to sync, and releases the lock
  78. func (s *nodeConfigStatus) transact(fn func()) {
  79. s.mux.Lock()
  80. defer s.mux.Unlock()
  81. fn()
  82. s.sync()
  83. }
  84. func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) {
  85. s.transact(func() {
  86. s.status.Assigned = source
  87. })
  88. }
  89. func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) {
  90. s.transact(func() {
  91. s.status.Active = source
  92. })
  93. }
  94. func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) {
  95. s.transact(func() {
  96. s.status.LastKnownGood = source
  97. })
  98. }
  99. func (s *nodeConfigStatus) SetError(err string) {
  100. s.transact(func() {
  101. s.status.Error = err
  102. })
  103. }
  104. func (s *nodeConfigStatus) SetErrorOverride(err string) {
  105. s.transact(func() {
  106. s.errorOverride = err
  107. })
  108. }
  109. // sync notes that the status needs to be synced to the API server
  110. func (s *nodeConfigStatus) sync() {
  111. select {
  112. case s.syncCh <- true:
  113. default:
  114. }
  115. }
  116. // Sync attempts to sync the status with the Node object for this Kubelet,
  117. // if syncing fails, an error is logged, and work is queued for retry.
  118. func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
  119. select {
  120. case <-s.syncCh:
  121. default:
  122. // no work to be done, return
  123. return
  124. }
  125. utillog.Infof("updating Node.Status.Config")
  126. // grab the lock
  127. s.mux.Lock()
  128. defer s.mux.Unlock()
  129. // if the sync fails, we want to retry
  130. var err error
  131. defer func() {
  132. if err != nil {
  133. utillog.Errorf(err.Error())
  134. s.sync()
  135. }
  136. }()
  137. // get the Node so we can check the current status
  138. oldNode, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  139. if err != nil {
  140. err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err)
  141. return
  142. }
  143. status := &s.status
  144. // override error, if necessary
  145. if len(s.errorOverride) > 0 {
  146. // copy the status, so we don't overwrite the prior error
  147. // with the override
  148. status = status.DeepCopy()
  149. status.Error = s.errorOverride
  150. }
  151. // update metrics based on the status we will sync
  152. metrics.SetConfigError(len(status.Error) > 0)
  153. err = metrics.SetAssignedConfig(status.Assigned)
  154. if err != nil {
  155. err = fmt.Errorf("failed to update Assigned config metric, error: %v", err)
  156. return
  157. }
  158. err = metrics.SetActiveConfig(status.Active)
  159. if err != nil {
  160. err = fmt.Errorf("failed to update Active config metric, error: %v", err)
  161. return
  162. }
  163. err = metrics.SetLastKnownGoodConfig(status.LastKnownGood)
  164. if err != nil {
  165. err = fmt.Errorf("failed to update LastKnownGood config metric, error: %v", err)
  166. return
  167. }
  168. // apply the status to a copy of the node so we don't modify the object in the informer's store
  169. newNode := oldNode.DeepCopy()
  170. newNode.Status.Config = status
  171. // patch the node with the new status
  172. if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil {
  173. utillog.Errorf("failed to patch node status, error: %v", err)
  174. }
  175. }