watch_based_manager.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/api/core/v1"
  19. "k8s.io/client-go/tools/cache"
  20. "k8s.io/klog"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/fields"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/runtime/schema"
  26. "k8s.io/apimachinery/pkg/util/sets"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. "k8s.io/apimachinery/pkg/watch"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. "k8s.io/kubernetes/pkg/features"
  31. )
  32. type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
  33. type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
  34. type newObjectFunc func() runtime.Object
  35. type isImmutableFunc func(runtime.Object) bool
  36. // objectCacheItem is a single item stored in objectCache.
  37. type objectCacheItem struct {
  38. refCount int
  39. store cache.Store
  40. hasSynced func() (bool, error)
  41. // lock is protecting from closing stopCh multiple times.
  42. lock sync.Mutex
  43. stopCh chan struct{}
  44. }
  45. func (i *objectCacheItem) stop() bool {
  46. i.lock.Lock()
  47. defer i.lock.Unlock()
  48. select {
  49. case <-i.stopCh:
  50. // This means that channel is already closed.
  51. return false
  52. default:
  53. close(i.stopCh)
  54. return true
  55. }
  56. }
  57. // objectCache is a local cache of objects propagated via
  58. // individual watches.
  59. type objectCache struct {
  60. listObject listObjectFunc
  61. watchObject watchObjectFunc
  62. newObject newObjectFunc
  63. isImmutable isImmutableFunc
  64. groupResource schema.GroupResource
  65. lock sync.RWMutex
  66. items map[objectKey]*objectCacheItem
  67. }
  68. // NewObjectCache returns a new watch-based instance of Store interface.
  69. func NewObjectCache(
  70. listObject listObjectFunc,
  71. watchObject watchObjectFunc,
  72. newObject newObjectFunc,
  73. isImmutable isImmutableFunc,
  74. groupResource schema.GroupResource) Store {
  75. return &objectCache{
  76. listObject: listObject,
  77. watchObject: watchObject,
  78. newObject: newObject,
  79. isImmutable: isImmutable,
  80. groupResource: groupResource,
  81. items: make(map[objectKey]*objectCacheItem),
  82. }
  83. }
  84. func (c *objectCache) newStore() cache.Store {
  85. // TODO: We may consider created a dedicated store keeping just a single
  86. // item, instead of using a generic store implementation for this purpose.
  87. // However, simple benchmarks show that memory overhead in that case is
  88. // decrease from ~600B to ~300B per object. So we are not optimizing it
  89. // until we will see a good reason for that.
  90. return cache.NewStore(cache.MetaNamespaceKeyFunc)
  91. }
  92. func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
  93. fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
  94. listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
  95. options.FieldSelector = fieldSelector
  96. return c.listObject(namespace, options)
  97. }
  98. watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
  99. options.FieldSelector = fieldSelector
  100. return c.watchObject(namespace, options)
  101. }
  102. store := c.newStore()
  103. reflector := cache.NewNamedReflector(
  104. fmt.Sprintf("object-%q/%q", namespace, name),
  105. &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
  106. c.newObject(),
  107. store,
  108. 0,
  109. )
  110. stopCh := make(chan struct{})
  111. go reflector.Run(stopCh)
  112. return &objectCacheItem{
  113. refCount: 0,
  114. store: store,
  115. hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
  116. stopCh: stopCh,
  117. }
  118. }
  119. func (c *objectCache) AddReference(namespace, name string) {
  120. key := objectKey{namespace: namespace, name: name}
  121. // AddReference is called from RegisterPod thus it needs to be efficient.
  122. // Thus, it is only increasing refCount and in case of first registration
  123. // of a given object it starts corresponding reflector.
  124. // It's responsibility of the first Get operation to wait until the
  125. // reflector propagated the store.
  126. c.lock.Lock()
  127. defer c.lock.Unlock()
  128. item, exists := c.items[key]
  129. if !exists {
  130. item = c.newReflector(namespace, name)
  131. c.items[key] = item
  132. }
  133. item.refCount++
  134. }
  135. func (c *objectCache) DeleteReference(namespace, name string) {
  136. key := objectKey{namespace: namespace, name: name}
  137. c.lock.Lock()
  138. defer c.lock.Unlock()
  139. if item, ok := c.items[key]; ok {
  140. item.refCount--
  141. if item.refCount == 0 {
  142. // Stop the underlying reflector.
  143. item.stop()
  144. delete(c.items, key)
  145. }
  146. }
  147. }
  148. // key returns key of an object with a given name and namespace.
  149. // This has to be in-sync with cache.MetaNamespaceKeyFunc.
  150. func (c *objectCache) key(namespace, name string) string {
  151. if len(namespace) > 0 {
  152. return namespace + "/" + name
  153. }
  154. return name
  155. }
  156. func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
  157. key := objectKey{namespace: namespace, name: name}
  158. c.lock.RLock()
  159. item, exists := c.items[key]
  160. c.lock.RUnlock()
  161. if !exists {
  162. return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
  163. }
  164. if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
  165. return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
  166. }
  167. obj, exists, err := item.store.GetByKey(c.key(namespace, name))
  168. if err != nil {
  169. return nil, err
  170. }
  171. if !exists {
  172. return nil, apierrors.NewNotFound(c.groupResource, name)
  173. }
  174. if object, ok := obj.(runtime.Object); ok {
  175. // If the returned object is immutable, stop the reflector.
  176. //
  177. // NOTE: we may potentially not even start the reflector if the object is
  178. // already immutable. However, given that:
  179. // - we want to also handle the case when object is marked as immutable later
  180. // - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
  181. // - doing that wouldn't provide visible scalability/performance gain - we
  182. // already have it from here
  183. // - doing that would require significant refactoring to reflector
  184. // we limit ourselves to just quickly stop the reflector here.
  185. if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
  186. if item.stop() {
  187. klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
  188. }
  189. }
  190. return object, nil
  191. }
  192. return nil, fmt.Errorf("unexpected object type: %v", obj)
  193. }
  194. // NewWatchBasedManager creates a manager that keeps a cache of all objects
  195. // necessary for registered pods.
  196. // It implements the following logic:
  197. // - whenever a pod is created or updated, we start individual watches for all
  198. // referenced objects that aren't referenced from other registered pods
  199. // - every GetObject() returns a value from local cache propagated via watches
  200. func NewWatchBasedManager(
  201. listObject listObjectFunc,
  202. watchObject watchObjectFunc,
  203. newObject newObjectFunc,
  204. isImmutable isImmutableFunc,
  205. groupResource schema.GroupResource,
  206. getReferencedObjects func(*v1.Pod) sets.String) Manager {
  207. objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
  208. return NewCacheBasedManager(objectStore, getReferencedObjects)
  209. }