cache_based_manager.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "fmt"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. storageetcd "k8s.io/apiserver/pkg/storage/etcd"
  21. "k8s.io/kubernetes/pkg/kubelet/util"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/util/clock"
  26. "k8s.io/apimachinery/pkg/util/sets"
  27. )
  28. // GetObjectTTLFunc defines a function to get value of TTL.
  29. type GetObjectTTLFunc func() (time.Duration, bool)
  30. // GetObjectFunc defines a function to get object with a given namespace and name.
  31. type GetObjectFunc func(string, string, metav1.GetOptions) (runtime.Object, error)
  32. type objectKey struct {
  33. namespace string
  34. name string
  35. }
  36. // objectStoreItems is a single item stored in objectStore.
  37. type objectStoreItem struct {
  38. refCount int
  39. data *objectData
  40. }
  41. type objectData struct {
  42. sync.Mutex
  43. object runtime.Object
  44. err error
  45. lastUpdateTime time.Time
  46. }
  47. // objectStore is a local cache of objects.
  48. type objectStore struct {
  49. getObject GetObjectFunc
  50. clock clock.Clock
  51. lock sync.Mutex
  52. items map[objectKey]*objectStoreItem
  53. defaultTTL time.Duration
  54. getTTL GetObjectTTLFunc
  55. }
  56. // NewObjectStore returns a new ttl-based instance of Store interface.
  57. func NewObjectStore(getObject GetObjectFunc, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) Store {
  58. return &objectStore{
  59. getObject: getObject,
  60. clock: clock,
  61. items: make(map[objectKey]*objectStoreItem),
  62. defaultTTL: ttl,
  63. getTTL: getTTL,
  64. }
  65. }
  66. func isObjectOlder(newObject, oldObject runtime.Object) bool {
  67. if newObject == nil || oldObject == nil {
  68. return false
  69. }
  70. newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newObject)
  71. oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldObject)
  72. return newVersion < oldVersion
  73. }
  74. func (s *objectStore) AddReference(namespace, name string) {
  75. key := objectKey{namespace: namespace, name: name}
  76. // AddReference is called from RegisterPod, thus it needs to be efficient.
  77. // Thus Add() is only increasing refCount and generation of a given object.
  78. // Then Get() is responsible for fetching if needed.
  79. s.lock.Lock()
  80. defer s.lock.Unlock()
  81. item, exists := s.items[key]
  82. if !exists {
  83. item = &objectStoreItem{
  84. refCount: 0,
  85. data: &objectData{},
  86. }
  87. s.items[key] = item
  88. }
  89. item.refCount++
  90. // This will trigger fetch on the next Get() operation.
  91. item.data = nil
  92. }
  93. func (s *objectStore) DeleteReference(namespace, name string) {
  94. key := objectKey{namespace: namespace, name: name}
  95. s.lock.Lock()
  96. defer s.lock.Unlock()
  97. if item, ok := s.items[key]; ok {
  98. item.refCount--
  99. if item.refCount == 0 {
  100. delete(s.items, key)
  101. }
  102. }
  103. }
  104. // GetObjectTTLFromNodeFunc returns a function that returns TTL value
  105. // from a given Node object.
  106. func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
  107. return func() (time.Duration, bool) {
  108. node, err := getNode()
  109. if err != nil {
  110. return time.Duration(0), false
  111. }
  112. if node != nil && node.Annotations != nil {
  113. if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
  114. if intValue, err := strconv.Atoi(value); err == nil {
  115. return time.Duration(intValue) * time.Second, true
  116. }
  117. }
  118. }
  119. return time.Duration(0), false
  120. }
  121. }
  122. func (s *objectStore) isObjectFresh(data *objectData) bool {
  123. objectTTL := s.defaultTTL
  124. if ttl, ok := s.getTTL(); ok {
  125. objectTTL = ttl
  126. }
  127. return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
  128. }
  129. func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
  130. key := objectKey{namespace: namespace, name: name}
  131. data := func() *objectData {
  132. s.lock.Lock()
  133. defer s.lock.Unlock()
  134. item, exists := s.items[key]
  135. if !exists {
  136. return nil
  137. }
  138. if item.data == nil {
  139. item.data = &objectData{}
  140. }
  141. return item.data
  142. }()
  143. if data == nil {
  144. return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
  145. }
  146. // After updating data in objectStore, lock the data, fetch object if
  147. // needed and return data.
  148. data.Lock()
  149. defer data.Unlock()
  150. if data.err != nil || !s.isObjectFresh(data) {
  151. opts := metav1.GetOptions{}
  152. if data.object != nil && data.err == nil {
  153. // This is just a periodic refresh of an object we successfully fetched previously.
  154. // In this case, server data from apiserver cache to reduce the load on both
  155. // etcd and apiserver (the cache is eventually consistent).
  156. util.FromApiserverCache(&opts)
  157. }
  158. object, err := s.getObject(namespace, name, opts)
  159. if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil {
  160. // Couldn't fetch the latest object, but there is no cached data to return.
  161. // Return the fetch result instead.
  162. return object, err
  163. }
  164. if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) {
  165. // If the fetch succeeded with a newer version of the object, or if the
  166. // object could not be found in the apiserver, update the cached data to
  167. // reflect the current status.
  168. data.object = object
  169. data.err = err
  170. data.lastUpdateTime = s.clock.Now()
  171. }
  172. }
  173. return data.object, data.err
  174. }
  175. // cacheBasedManager keeps a store with objects necessary
  176. // for registered pods. Different implementations of the store
  177. // may result in different semantics for freshness of objects
  178. // (e.g. ttl-based implementation vs watch-based implementation).
  179. type cacheBasedManager struct {
  180. objectStore Store
  181. getReferencedObjects func(*v1.Pod) sets.String
  182. lock sync.Mutex
  183. registeredPods map[objectKey]*v1.Pod
  184. }
  185. func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
  186. return c.objectStore.Get(namespace, name)
  187. }
  188. func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
  189. names := c.getReferencedObjects(pod)
  190. c.lock.Lock()
  191. defer c.lock.Unlock()
  192. for name := range names {
  193. c.objectStore.AddReference(pod.Namespace, name)
  194. }
  195. var prev *v1.Pod
  196. key := objectKey{namespace: pod.Namespace, name: pod.Name}
  197. prev = c.registeredPods[key]
  198. c.registeredPods[key] = pod
  199. if prev != nil {
  200. for name := range c.getReferencedObjects(prev) {
  201. // On an update, the .Add() call above will have re-incremented the
  202. // ref count of any existing object, so any objects that are in both
  203. // names and prev need to have their ref counts decremented. Any that
  204. // are only in prev need to be completely removed. This unconditional
  205. // call takes care of both cases.
  206. c.objectStore.DeleteReference(prev.Namespace, name)
  207. }
  208. }
  209. }
  210. func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
  211. var prev *v1.Pod
  212. key := objectKey{namespace: pod.Namespace, name: pod.Name}
  213. c.lock.Lock()
  214. defer c.lock.Unlock()
  215. prev = c.registeredPods[key]
  216. delete(c.registeredPods, key)
  217. if prev != nil {
  218. for name := range c.getReferencedObjects(prev) {
  219. c.objectStore.DeleteReference(prev.Namespace, name)
  220. }
  221. }
  222. }
  223. // NewCacheBasedManager creates a manager that keeps a cache of all objects
  224. // necessary for registered pods.
  225. // It implements the following logic:
  226. // - whenever a pod is created or updated, the cached versions of all objects
  227. // is referencing are invalidated
  228. // - every GetObject() call tries to fetch the value from local cache; if it is
  229. // not there, invalidated or too old, we fetch it from apiserver and refresh the
  230. // value in cache; otherwise it is just fetched from cache
  231. func NewCacheBasedManager(objectStore Store, getReferencedObjects func(*v1.Pod) sets.String) Manager {
  232. return &cacheBasedManager{
  233. objectStore: objectStore,
  234. getReferencedObjects: getReferencedObjects,
  235. registeredPods: make(map[objectKey]*v1.Pod),
  236. }
  237. }