status.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package status
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. apiv1 "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. clientset "k8s.io/client-go/kubernetes"
  22. utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
  23. "k8s.io/kubernetes/pkg/kubelet/metrics"
  24. nodeutil "k8s.io/kubernetes/pkg/util/node"
  25. )
  // Messages reported through Node.Status.Config when handling the Kubelet's
  // dynamic configuration fails.
  const (
  	// LoadError is reported when the Kubelet could not load the config checkpoint.
  	LoadError = "failed to load config, see Kubelet log for details"

  	// ValidateError is reported when the Kubelet could not validate the config checkpoint.
  	ValidateError = "failed to validate config, see Kubelet log for details"

  	// AllNilSubfieldsError is reported when none of the subfields of the
  	// NodeConfigSource are set. One way this can occur: an old client reads an
  	// object from a newer API server, and the subfield that is set is one the
  	// client does not know about.
  	AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"

  	// DownloadError is reported when downloading the config fails, e.g. because
  	// of network issues.
  	DownloadError = "failed to download config, see Kubelet log for details"

  	// InternalError is reported when an internal error occurs while trying to
  	// sync config, e.g. filesystem issues.
  	InternalError = "internal failure, see Kubelet log for details"

  	// SyncErrorFmt is the format string used when the config could not be
  	// synced, e.g. due to a malformed Node.Spec.ConfigSource or a download
  	// failure. It takes a single %s argument describing the cause.
  	SyncErrorFmt = "failed to sync: %s"
  )
  41. // NodeConfigStatus represents Node.Status.Config
  42. type NodeConfigStatus interface {
  43. // SetActive sets the active source in the status
  44. SetActive(source *apiv1.NodeConfigSource)
  45. // SetAssigned sets the assigned source in the status
  46. SetAssigned(source *apiv1.NodeConfigSource)
  47. // SetLastKnownGood sets the last-known-good source in the status
  48. SetLastKnownGood(source *apiv1.NodeConfigSource)
  49. // SetError sets the error associated with the status
  50. SetError(err string)
  51. // SetErrorOverride sets an error that overrides the base error set by SetError.
  52. // If the override is set to the empty string, the base error is reported in
  53. // the status, otherwise the override is reported.
  54. SetErrorOverride(err string)
  55. // Sync patches the current status into the Node identified by `nodeName` if an update is pending
  56. Sync(client clientset.Interface, nodeName string)
  57. }
  58. type nodeConfigStatus struct {
  59. // status is the core NodeConfigStatus that we report
  60. status apiv1.NodeConfigStatus
  61. // mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status
  62. mux sync.Mutex
  63. // errorOverride is sent in place of the usual error if it is non-empty
  64. errorOverride string
  65. // syncCh; write to this channel to indicate that the status needs to be synced to the API server
  66. syncCh chan bool
  67. }
  68. // NewNodeConfigStatus returns a new NodeConfigStatus interface
  69. func NewNodeConfigStatus() NodeConfigStatus {
  70. // channels must have capacity at least 1, since we signal with non-blocking writes
  71. syncCh := make(chan bool, 1)
  72. // prime new status managers to sync with the API server on the first call to Sync
  73. syncCh <- true
  74. return &nodeConfigStatus{
  75. syncCh: syncCh,
  76. }
  77. }
  78. // transact grabs the lock, performs the fn, records the need to sync, and releases the lock
  79. func (s *nodeConfigStatus) transact(fn func()) {
  80. s.mux.Lock()
  81. defer s.mux.Unlock()
  82. fn()
  83. s.sync()
  84. }
  85. func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) {
  86. s.transact(func() {
  87. s.status.Assigned = source
  88. })
  89. }
  90. func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) {
  91. s.transact(func() {
  92. s.status.Active = source
  93. })
  94. }
  95. func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) {
  96. s.transact(func() {
  97. s.status.LastKnownGood = source
  98. })
  99. }
  100. func (s *nodeConfigStatus) SetError(err string) {
  101. s.transact(func() {
  102. s.status.Error = err
  103. })
  104. }
  105. func (s *nodeConfigStatus) SetErrorOverride(err string) {
  106. s.transact(func() {
  107. s.errorOverride = err
  108. })
  109. }
  110. // sync notes that the status needs to be synced to the API server
  111. func (s *nodeConfigStatus) sync() {
  112. select {
  113. case s.syncCh <- true:
  114. default:
  115. }
  116. }
  117. // Sync attempts to sync the status with the Node object for this Kubelet,
  118. // if syncing fails, an error is logged, and work is queued for retry.
  119. func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
  120. select {
  121. case <-s.syncCh:
  122. default:
  123. // no work to be done, return
  124. return
  125. }
  126. utillog.Infof("updating Node.Status.Config")
  127. // grab the lock
  128. s.mux.Lock()
  129. defer s.mux.Unlock()
  130. // if the sync fails, we want to retry
  131. var err error
  132. defer func() {
  133. if err != nil {
  134. utillog.Errorf(err.Error())
  135. s.sync()
  136. }
  137. }()
  138. // get the Node so we can check the current status
  139. oldNode, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  140. if err != nil {
  141. err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err)
  142. return
  143. }
  144. status := &s.status
  145. // override error, if necessary
  146. if len(s.errorOverride) > 0 {
  147. // copy the status, so we don't overwrite the prior error
  148. // with the override
  149. status = status.DeepCopy()
  150. status.Error = s.errorOverride
  151. }
  152. // update metrics based on the status we will sync
  153. metrics.SetConfigError(len(status.Error) > 0)
  154. err = metrics.SetAssignedConfig(status.Assigned)
  155. if err != nil {
  156. err = fmt.Errorf("failed to update Assigned config metric, error: %v", err)
  157. return
  158. }
  159. err = metrics.SetActiveConfig(status.Active)
  160. if err != nil {
  161. err = fmt.Errorf("failed to update Active config metric, error: %v", err)
  162. return
  163. }
  164. err = metrics.SetLastKnownGoodConfig(status.LastKnownGood)
  165. if err != nil {
  166. err = fmt.Errorf("failed to update LastKnownGood config metric, error: %v", err)
  167. return
  168. }
  169. // apply the status to a copy of the node so we don't modify the object in the informer's store
  170. newNode := oldNode.DeepCopy()
  171. newNode.Status.Config = status
  172. // patch the node with the new status
  173. if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil {
  174. utillog.Errorf("failed to patch node status, error: %v", err)
  175. }
  176. }