ttl_controller.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // The TTLController sets ttl annotations on nodes, based on cluster size.
  14. // The annotations are consumed by Kubelets as suggestions for how long
  15. // it can cache objects (e.g. secrets or config maps) before refetching
  16. // from apiserver again.
  17. //
  18. // TODO: This is a temporary workaround for the Kubelet not being able to
  19. // send "watch secrets attached to pods from my node" request. Once
  20. // sending such request will be possible, we will modify Kubelet to
  21. // use it and get rid of this controller completely.
  22. package ttl
  23. import (
  24. "context"
  25. "fmt"
  26. "math"
  27. "strconv"
  28. "sync"
  29. "time"
  30. "k8s.io/api/core/v1"
  31. apierrors "k8s.io/apimachinery/pkg/api/errors"
  32. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  33. "k8s.io/apimachinery/pkg/types"
  34. "k8s.io/apimachinery/pkg/util/json"
  35. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  36. "k8s.io/apimachinery/pkg/util/strategicpatch"
  37. "k8s.io/apimachinery/pkg/util/wait"
  38. informers "k8s.io/client-go/informers/core/v1"
  39. clientset "k8s.io/client-go/kubernetes"
  40. listers "k8s.io/client-go/listers/core/v1"
  41. "k8s.io/client-go/tools/cache"
  42. "k8s.io/client-go/util/workqueue"
  43. "k8s.io/kubernetes/pkg/controller"
  44. "k8s.io/klog"
  45. )
  46. type TTLController struct {
  47. kubeClient clientset.Interface
  48. // nodeStore is a local cache of nodes.
  49. nodeStore listers.NodeLister
  50. // Nodes that need to be synced.
  51. queue workqueue.RateLimitingInterface
  52. // Returns true if all underlying informers are synced.
  53. hasSynced func() bool
  54. lock sync.RWMutex
  55. // Number of nodes in the cluster.
  56. nodeCount int
  57. // Desired TTL for all nodes in the cluster.
  58. desiredTTLSeconds int
  59. // In which interval of cluster size we currently are.
  60. boundaryStep int
  61. }
  62. func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *TTLController {
  63. ttlc := &TTLController{
  64. kubeClient: kubeClient,
  65. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
  66. }
  67. nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  68. AddFunc: ttlc.addNode,
  69. UpdateFunc: ttlc.updateNode,
  70. DeleteFunc: ttlc.deleteNode,
  71. })
  72. ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
  73. ttlc.hasSynced = nodeInformer.Informer().HasSynced
  74. return ttlc
  75. }
  // ttlBoundary associates a range of cluster sizes with the TTL used while
  // the cluster is inside that range.
  type ttlBoundary struct {
  	// sizeMin and sizeMax are the node counts delimiting the interval;
  	// ttlSeconds is the TTL, in seconds, that applies within it.
  	sizeMin, sizeMax, ttlSeconds int
  }
  81. var (
  82. ttlBoundaries = []ttlBoundary{
  83. {sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
  84. {sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
  85. {sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
  86. {sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
  87. {sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300},
  88. {sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600},
  89. }
  90. )
  91. func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
  92. defer utilruntime.HandleCrash()
  93. defer ttlc.queue.ShutDown()
  94. klog.Infof("Starting TTL controller")
  95. defer klog.Infof("Shutting down TTL controller")
  96. if !cache.WaitForNamedCacheSync("TTL", stopCh, ttlc.hasSynced) {
  97. return
  98. }
  99. for i := 0; i < workers; i++ {
  100. go wait.Until(ttlc.worker, time.Second, stopCh)
  101. }
  102. <-stopCh
  103. }
  104. func (ttlc *TTLController) addNode(obj interface{}) {
  105. node, ok := obj.(*v1.Node)
  106. if !ok {
  107. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  108. return
  109. }
  110. func() {
  111. ttlc.lock.Lock()
  112. defer ttlc.lock.Unlock()
  113. ttlc.nodeCount++
  114. if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
  115. ttlc.boundaryStep++
  116. ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
  117. }
  118. }()
  119. ttlc.enqueueNode(node)
  120. }
  121. func (ttlc *TTLController) updateNode(_, newObj interface{}) {
  122. node, ok := newObj.(*v1.Node)
  123. if !ok {
  124. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  125. return
  126. }
  127. // Processing all updates of nodes guarantees that we will update
  128. // the ttl annotation, when cluster size changes.
  129. // We are relying on the fact that Kubelet is updating node status
  130. // every 10s (or generally every X seconds), which means that whenever
  131. // required, its ttl annotation should be updated within that period.
  132. ttlc.enqueueNode(node)
  133. }
  134. func (ttlc *TTLController) deleteNode(obj interface{}) {
  135. _, ok := obj.(*v1.Node)
  136. if !ok {
  137. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  138. if !ok {
  139. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  140. return
  141. }
  142. _, ok = tombstone.Obj.(*v1.Node)
  143. if !ok {
  144. utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
  145. return
  146. }
  147. }
  148. func() {
  149. ttlc.lock.Lock()
  150. defer ttlc.lock.Unlock()
  151. ttlc.nodeCount--
  152. if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
  153. ttlc.boundaryStep--
  154. ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
  155. }
  156. }()
  157. // We are not processing the node, as it no longer exists.
  158. }
  159. func (ttlc *TTLController) enqueueNode(node *v1.Node) {
  160. key, err := controller.KeyFunc(node)
  161. if err != nil {
  162. klog.Errorf("Couldn't get key for object %+v", node)
  163. return
  164. }
  165. ttlc.queue.Add(key)
  166. }
  167. func (ttlc *TTLController) worker() {
  168. for ttlc.processItem() {
  169. }
  170. }
  171. func (ttlc *TTLController) processItem() bool {
  172. key, quit := ttlc.queue.Get()
  173. if quit {
  174. return false
  175. }
  176. defer ttlc.queue.Done(key)
  177. err := ttlc.updateNodeIfNeeded(key.(string))
  178. if err == nil {
  179. ttlc.queue.Forget(key)
  180. return true
  181. }
  182. ttlc.queue.AddRateLimited(key)
  183. utilruntime.HandleError(err)
  184. return true
  185. }
  186. func (ttlc *TTLController) getDesiredTTLSeconds() int {
  187. ttlc.lock.RLock()
  188. defer ttlc.lock.RUnlock()
  189. return ttlc.desiredTTLSeconds
  190. }
  191. func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
  192. if node.Annotations == nil {
  193. return 0, false
  194. }
  195. annotationValue, ok := node.Annotations[annotationKey]
  196. if !ok {
  197. return 0, false
  198. }
  199. intValue, err := strconv.Atoi(annotationValue)
  200. if err != nil {
  201. klog.Warningf("Cannot convert the value %q with annotation key %q for the node %q",
  202. annotationValue, annotationKey, node.Name)
  203. return 0, false
  204. }
  205. return intValue, true
  206. }
  207. func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
  208. if node.Annotations == nil {
  209. node.Annotations = make(map[string]string)
  210. }
  211. node.Annotations[annotationKey] = strconv.Itoa(value)
  212. }
  213. func (ttlc *TTLController) patchNodeWithAnnotation(node *v1.Node, annotationKey string, value int) error {
  214. oldData, err := json.Marshal(node)
  215. if err != nil {
  216. return err
  217. }
  218. setIntAnnotation(node, annotationKey, value)
  219. newData, err := json.Marshal(node)
  220. if err != nil {
  221. return err
  222. }
  223. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
  224. if err != nil {
  225. return err
  226. }
  227. _, err = ttlc.kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
  228. if err != nil {
  229. klog.V(2).Infof("Failed to change ttl annotation for node %s: %v", node.Name, err)
  230. return err
  231. }
  232. klog.V(2).Infof("Changed ttl annotation for node %s to %d seconds", node.Name, value)
  233. return nil
  234. }
  235. func (ttlc *TTLController) updateNodeIfNeeded(key string) error {
  236. node, err := ttlc.nodeStore.Get(key)
  237. if err != nil {
  238. if apierrors.IsNotFound(err) {
  239. return nil
  240. }
  241. return err
  242. }
  243. desiredTTL := ttlc.getDesiredTTLSeconds()
  244. currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey)
  245. if ok && currentTTL == desiredTTL {
  246. return nil
  247. }
  248. return ttlc.patchNodeWithAnnotation(node.DeepCopy(), v1.ObjectTTLAnnotationKey, desiredTTL)
  249. }