controller.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodelease

import (
	"context"
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
	"k8s.io/utils/pointer"

	"k8s.io/klog"
)

const (
	// renewIntervalFraction is the fraction of lease duration to renew the lease
	renewIntervalFraction = 0.25
	// maxUpdateRetries is the number of immediate, successive retries the Kubelet will attempt
	// when renewing the lease before falling back to waiting out the renewal interval and
	// trying again, similar to what we do for node status retries
	maxUpdateRetries = 5
	// maxBackoff is the maximum sleep time during backoff (e.g. in backoffEnsureLease)
	maxBackoff = 7 * time.Second
)

// Controller manages creating and renewing the lease for this Kubelet
type Controller interface {
	Run(stopCh <-chan struct{})
}

type controller struct {
	client                     clientset.Interface
	leaseClient                coordclientset.LeaseInterface
	holderIdentity             string
	leaseDurationSeconds       int32
	renewInterval              time.Duration
	clock                      clock.Clock
	onRepeatedHeartbeatFailure func()

	// latestLease is the latest node lease which Kubelet updated or created
	latestLease *coordinationv1.Lease
}

// NewController constructs and returns a controller
func NewController(clock clock.Clock, client clientset.Interface, holderIdentity string, leaseDurationSeconds int32, onRepeatedHeartbeatFailure func()) Controller {
	var leaseClient coordclientset.LeaseInterface
	if client != nil {
		leaseClient = client.CoordinationV1().Leases(corev1.NamespaceNodeLease)
	}
	leaseDuration := time.Duration(leaseDurationSeconds) * time.Second
	return &controller{
		client:                     client,
		leaseClient:                leaseClient,
		holderIdentity:             holderIdentity,
		leaseDurationSeconds:       leaseDurationSeconds,
		renewInterval:              time.Duration(float64(leaseDuration) * renewIntervalFraction),
		clock:                      clock,
		onRepeatedHeartbeatFailure: onRepeatedHeartbeatFailure,
	}
}
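
// The sketch below is illustrative, not part of this file: it shows how a
// caller might wire up the controller. kubeClient, nodeName, and stopCh are
// assumed to exist in the caller's context.
//
//	// Renew a 40s lease every 10s (40s * renewIntervalFraction):
//	nodeLeaseController := NewController(clock.RealClock{}, kubeClient, string(nodeName), 40, nil)
//	go nodeLeaseController.Run(stopCh)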

// Run runs the controller
func (c *controller) Run(stopCh <-chan struct{}) {
	if c.leaseClient == nil {
		klog.Infof("node lease controller has nil lease client, will not claim or renew leases")
		return
	}
	wait.Until(c.sync, c.renewInterval, stopCh)
}

func (c *controller) sync() {
	if c.latestLease != nil {
		// As long as the node lease is not (or only very rarely) updated by any agent
		// other than the Kubelet, we can optimistically assume it didn't change since
		// our last update and try updating based on the version from that time. This
		// lets us skip the GET call and reduces load on etcd and kube-apiserver.
		// If at some point other agents also start updating the Lease object frequently,
		// this can degrade performance, because we will end up issuing an additional
		// GET/PUT - at that point this whole "if" should be removed.
		err := c.retryUpdateLease(c.newLease(c.latestLease))
		if err == nil {
			return
		}
		klog.Infof("failed to update lease using latest lease, fallback to ensure lease, err: %v", err)
	}

	lease, created := c.backoffEnsureLease()
	c.latestLease = lease
	// we don't need to update the lease if we just created it
	if !created && lease != nil {
		if err := c.retryUpdateLease(lease); err != nil {
			klog.Errorf("%v, will retry after %v", err, c.renewInterval)
		}
	}
}
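
// A note on the fallback above: retryUpdateLease already re-fetches the lease
// on update conflicts, so falling through to backoffEnsureLease mainly covers
// the cases where the cached lease was deleted or all retries failed outright.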

// backoffEnsureLease attempts to create the lease if it does not exist,
// and uses exponentially increasing waits to prevent overloading the API server
// with retries. Returns the lease, and true if this call created the lease,
// false otherwise.
func (c *controller) backoffEnsureLease() (*coordinationv1.Lease, bool) {
	var (
		lease   *coordinationv1.Lease
		created bool
		err     error
	)
	sleep := 100 * time.Millisecond
	for {
		lease, created, err = c.ensureLease()
		if err == nil {
			break
		}
		sleep = minDuration(2*sleep, maxBackoff)
		klog.Errorf("failed to ensure node lease exists, will retry in %v, error: %v", sleep, err)
		// backoff wait
		c.clock.Sleep(sleep)
	}
	return lease, created
}
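
// Since the 100ms seed is doubled before each wait, the resulting backoff
// schedule is 200ms, 400ms, 800ms, ... capped at maxBackoff (7s). The loop
// has no retry limit: it returns only once ensureLease succeeds.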

// ensureLease creates the lease if it does not exist. Returns the lease and
// a bool (true if this call created the lease), or any error that occurs.
func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
	lease, err := c.leaseClient.Get(context.TODO(), c.holderIdentity, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// lease does not exist, create it.
		leaseToCreate := c.newLease(nil)
		if len(leaseToCreate.OwnerReferences) == 0 {
			// We want to ensure that a lease will always have OwnerReferences set.
			// Thus, given that we weren't able to set it correctly, we simply
			// don't create the lease this time - we will retry in the next iteration.
			return nil, false, nil
		}
		lease, err := c.leaseClient.Create(context.TODO(), leaseToCreate, metav1.CreateOptions{})
		if err != nil {
			return nil, false, err
		}
		return lease, true, nil
	} else if err != nil {
		// unexpected error getting lease
		return nil, false, err
	}
	// lease already existed
	return lease, false, nil
}

// retryUpdateLease attempts to update the lease for maxUpdateRetries;
// call this once you're sure the lease has been created
func (c *controller) retryUpdateLease(base *coordinationv1.Lease) error {
	for i := 0; i < maxUpdateRetries; i++ {
		lease, err := c.leaseClient.Update(context.TODO(), c.newLease(base), metav1.UpdateOptions{})
		if err == nil {
			c.latestLease = lease
			return nil
		}
		klog.Errorf("failed to update node lease, error: %v", err)
		// OptimisticLockError requires getting the newer version of lease to proceed.
		if apierrors.IsConflict(err) {
			base, _ = c.backoffEnsureLease()
			continue
		}
		if i > 0 && c.onRepeatedHeartbeatFailure != nil {
			c.onRepeatedHeartbeatFailure()
		}
	}
	return fmt.Errorf("failed %d attempts to update node lease", maxUpdateRetries)
}
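
// A behavioral note on the retry loop above: on a conflict the base lease is
// refreshed via backoffEnsureLease and the callback is skipped, so
// onRepeatedHeartbeatFailure only fires for non-conflict errors, and only from
// the second failed attempt onward (i > 0).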

// newLease constructs a new lease if base is nil, or returns a copy of base
// with desired state asserted on the copy.
func (c *controller) newLease(base *coordinationv1.Lease) *coordinationv1.Lease {
	// Use the bare minimum set of fields; other fields exist for debugging/legacy,
	// but we don't need to make node heartbeats more complicated by using them.
	var lease *coordinationv1.Lease
	if base == nil {
		lease = &coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{
				Name:      c.holderIdentity,
				Namespace: corev1.NamespaceNodeLease,
			},
			Spec: coordinationv1.LeaseSpec{
				HolderIdentity:       pointer.StringPtr(c.holderIdentity),
				LeaseDurationSeconds: pointer.Int32Ptr(c.leaseDurationSeconds),
			},
		}
	} else {
		lease = base.DeepCopy()
	}
	lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}

	// Setting the owner reference requires the node's UID. Note that it is
	// different from kubelet.nodeRef.UID. When the lease is first created, the
	// connection between master and node may not be ready yet, so try to set
	// the owner reference on every renewal until it succeeds.
	if len(lease.OwnerReferences) == 0 {
		if node, err := c.client.CoreV1().Nodes().Get(context.TODO(), c.holderIdentity, metav1.GetOptions{}); err == nil {
			lease.OwnerReferences = []metav1.OwnerReference{
				{
					APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
					Kind:       corev1.SchemeGroupVersion.WithKind("Node").Kind,
					Name:       c.holderIdentity,
					UID:        node.UID,
				},
			}
		} else {
			klog.Errorf("failed to get node %q when trying to set owner ref to the node lease: %v", c.holderIdentity, err)
		}
	}

	return lease
}

func minDuration(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}
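
// A minimal test sketch (hypothetical, not this package's own test) for the
// renewal math: with a 40s lease duration and renewIntervalFraction of 0.25,
// NewController should compute a 10s renew interval.
//
//	c := NewController(clock.NewFakeClock(time.Now()), nil, "node-1", 40, nil).(*controller)
//	if c.renewInterval != 10*time.Second {
//		t.Fatalf("expected 10s renew interval, got %v", c.renewInterval)
//	}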