ttl_controller.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // The TTLController sets ttl annotations on nodes, based on cluster size.
  14. // The annotations are consumed by Kubelets as suggestions for how long
  15. // they can cache objects (e.g. secrets or config maps) before refetching
  16. // them from the apiserver.
  17. //
  18. // TODO: This is a temporary workaround for the Kubelet not being able to
  19. // send a "watch secrets attached to pods from my node" request. Once
  20. // sending such a request is possible, we will modify the Kubelet to
  21. // use it and get rid of this controller completely.
  22. package ttl
  23. import (
  24. "fmt"
  25. "math"
  26. "strconv"
  27. "sync"
  28. "time"
  29. "k8s.io/api/core/v1"
  30. apierrors "k8s.io/apimachinery/pkg/api/errors"
  31. "k8s.io/apimachinery/pkg/types"
  32. "k8s.io/apimachinery/pkg/util/json"
  33. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  34. "k8s.io/apimachinery/pkg/util/strategicpatch"
  35. "k8s.io/apimachinery/pkg/util/wait"
  36. informers "k8s.io/client-go/informers/core/v1"
  37. clientset "k8s.io/client-go/kubernetes"
  38. listers "k8s.io/client-go/listers/core/v1"
  39. "k8s.io/client-go/tools/cache"
  40. "k8s.io/client-go/util/workqueue"
  41. "k8s.io/kubernetes/pkg/controller"
  42. "k8s.io/klog"
  43. )
  44. type TTLController struct {
  45. kubeClient clientset.Interface
  46. // nodeStore is a local cache of nodes.
  47. nodeStore listers.NodeLister
  48. // Nodes that need to be synced.
  49. queue workqueue.RateLimitingInterface
  50. // Returns true if all underlying informers are synced.
  51. hasSynced func() bool
  52. lock sync.RWMutex
  53. // Number of nodes in the cluster.
  54. nodeCount int
  55. // Desired TTL for all nodes in the cluster.
  56. desiredTTLSeconds int
  57. // In which interval of cluster size we currently are.
  58. boundaryStep int
  59. }
  60. func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *TTLController {
  61. ttlc := &TTLController{
  62. kubeClient: kubeClient,
  63. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
  64. }
  65. nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  66. AddFunc: ttlc.addNode,
  67. UpdateFunc: ttlc.updateNode,
  68. DeleteFunc: ttlc.deleteNode,
  69. })
  70. ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
  71. ttlc.hasSynced = nodeInformer.Informer().HasSynced
  72. return ttlc
  73. }
  // ttlBoundary is one interval of cluster size together with the TTL that
  // applies while the node count stays inside it.
  type ttlBoundary struct {
  	sizeMin, sizeMax int
  	ttlSeconds       int
  }

  // ttlBoundaries lists the cluster-size intervals in ascending order.
  // Neighbouring intervals overlap (each sizeMin is below the previous sizeMax).
  // The controller steps up only when the count exceeds sizeMax and steps down
  // only when it drops below sizeMin. A cluster hovering near an edge therefore
  // does not flip between two TTLs on every add or delete.
  var ttlBoundaries = []ttlBoundary{
  	{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
  	{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
  	{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
  	{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
  	{sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300},
  	{sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600},
  }
  89. func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
  90. defer utilruntime.HandleCrash()
  91. defer ttlc.queue.ShutDown()
  92. klog.Infof("Starting TTL controller")
  93. defer klog.Infof("Shutting down TTL controller")
  94. if !controller.WaitForCacheSync("TTL", stopCh, ttlc.hasSynced) {
  95. return
  96. }
  97. for i := 0; i < workers; i++ {
  98. go wait.Until(ttlc.worker, time.Second, stopCh)
  99. }
  100. <-stopCh
  101. }
  102. func (ttlc *TTLController) addNode(obj interface{}) {
  103. node, ok := obj.(*v1.Node)
  104. if !ok {
  105. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  106. return
  107. }
  108. func() {
  109. ttlc.lock.Lock()
  110. defer ttlc.lock.Unlock()
  111. ttlc.nodeCount++
  112. if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
  113. ttlc.boundaryStep++
  114. ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
  115. }
  116. }()
  117. ttlc.enqueueNode(node)
  118. }
  119. func (ttlc *TTLController) updateNode(_, newObj interface{}) {
  120. node, ok := newObj.(*v1.Node)
  121. if !ok {
  122. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
  123. return
  124. }
  125. // Processing all updates of nodes guarantees that we will update
  126. // the ttl annotation, when cluster size changes.
  127. // We are relying on the fact that Kubelet is updating node status
  128. // every 10s (or generally every X seconds), which means that whenever
  129. // required, its ttl annotation should be updated within that period.
  130. ttlc.enqueueNode(node)
  131. }
  132. func (ttlc *TTLController) deleteNode(obj interface{}) {
  133. _, ok := obj.(*v1.Node)
  134. if !ok {
  135. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  136. if !ok {
  137. utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
  138. return
  139. }
  140. _, ok = tombstone.Obj.(*v1.Node)
  141. if !ok {
  142. utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
  143. return
  144. }
  145. }
  146. func() {
  147. ttlc.lock.Lock()
  148. defer ttlc.lock.Unlock()
  149. ttlc.nodeCount--
  150. if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
  151. ttlc.boundaryStep--
  152. ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
  153. }
  154. }()
  155. // We are not processing the node, as it no longer exists.
  156. }
  157. func (ttlc *TTLController) enqueueNode(node *v1.Node) {
  158. key, err := controller.KeyFunc(node)
  159. if err != nil {
  160. klog.Errorf("Couldn't get key for object %+v", node)
  161. return
  162. }
  163. ttlc.queue.Add(key)
  164. }
  165. func (ttlc *TTLController) worker() {
  166. for ttlc.processItem() {
  167. }
  168. }
  169. func (ttlc *TTLController) processItem() bool {
  170. key, quit := ttlc.queue.Get()
  171. if quit {
  172. return false
  173. }
  174. defer ttlc.queue.Done(key)
  175. err := ttlc.updateNodeIfNeeded(key.(string))
  176. if err == nil {
  177. ttlc.queue.Forget(key)
  178. return true
  179. }
  180. ttlc.queue.AddRateLimited(key)
  181. utilruntime.HandleError(err)
  182. return true
  183. }
  184. func (ttlc *TTLController) getDesiredTTLSeconds() int {
  185. ttlc.lock.RLock()
  186. defer ttlc.lock.RUnlock()
  187. return ttlc.desiredTTLSeconds
  188. }
  189. func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
  190. if node.Annotations == nil {
  191. return 0, false
  192. }
  193. annotationValue, ok := node.Annotations[annotationKey]
  194. if !ok {
  195. return 0, false
  196. }
  197. intValue, err := strconv.Atoi(annotationValue)
  198. if err != nil {
  199. klog.Warningf("Cannot convert the value %q with annotation key %q for the node %q",
  200. annotationValue, annotationKey, node.Name)
  201. return 0, false
  202. }
  203. return intValue, true
  204. }
  205. func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
  206. if node.Annotations == nil {
  207. node.Annotations = make(map[string]string)
  208. }
  209. node.Annotations[annotationKey] = strconv.Itoa(value)
  210. }
  211. func (ttlc *TTLController) patchNodeWithAnnotation(node *v1.Node, annotationKey string, value int) error {
  212. oldData, err := json.Marshal(node)
  213. if err != nil {
  214. return err
  215. }
  216. setIntAnnotation(node, annotationKey, value)
  217. newData, err := json.Marshal(node)
  218. if err != nil {
  219. return err
  220. }
  221. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
  222. if err != nil {
  223. return err
  224. }
  225. _, err = ttlc.kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
  226. if err != nil {
  227. klog.V(2).Infof("Failed to change ttl annotation for node %s: %v", node.Name, err)
  228. return err
  229. }
  230. klog.V(2).Infof("Changed ttl annotation for node %s to %d seconds", node.Name, value)
  231. return nil
  232. }
  233. func (ttlc *TTLController) updateNodeIfNeeded(key string) error {
  234. node, err := ttlc.nodeStore.Get(key)
  235. if err != nil {
  236. if apierrors.IsNotFound(err) {
  237. return nil
  238. }
  239. return err
  240. }
  241. desiredTTL := ttlc.getDesiredTTLSeconds()
  242. currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey)
  243. if ok && currentTTL == desiredTTL {
  244. return nil
  245. }
  246. return ttlc.patchNodeWithAnnotation(node.DeepCopy(), v1.ObjectTTLAnnotationKey, desiredTTL)
  247. }