watch_based_manager.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // TODO: We did some scalability tests and using watchBasedManager
  14. // seems to help with apiserver performance at scale visibly.
// We also observed no issues at the scale of ~200k watchers with a
// single apiserver.
  17. // However, we need to perform more extensive testing before we
  18. // enable this in production setups.
  19. package manager
  20. import (
  21. "fmt"
  22. "sync"
  23. "time"
  24. "k8s.io/api/core/v1"
  25. "k8s.io/client-go/tools/cache"
  26. apierrors "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/fields"
  29. "k8s.io/apimachinery/pkg/runtime"
  30. "k8s.io/apimachinery/pkg/runtime/schema"
  31. "k8s.io/apimachinery/pkg/util/sets"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. "k8s.io/apimachinery/pkg/watch"
  34. )
// listObjectFunc lists objects in the given namespace, restricted by the
// supplied ListOptions (used here to select a single object by name).
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)

// watchObjectFunc opens a watch on objects in the given namespace,
// restricted by the supplied ListOptions.
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)

// newObjectFunc returns a new, empty instance of the cached object type,
// used as the expected type for the reflector.
type newObjectFunc func() runtime.Object
// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
	// refCount counts registered references to this object; it is mutated
	// only while holding objectCache.lock (see AddReference/DeleteReference).
	refCount int
	// store is the local cache populated by a dedicated reflector.
	store cache.Store
	// hasSynced reports whether the reflector has completed its initial
	// list, i.e. whether the store has been populated at least once.
	hasSynced func() (bool, error)
	// stopCh, when closed, stops the reflector goroutine.
	stopCh chan struct{}
}
// objectCache is a local cache of objects propagated via
// individual watches.
type objectCache struct {
	// listObject/watchObject/newObject abstract the API calls for the
	// concrete object kind (e.g. secrets or configmaps).
	listObject  listObjectFunc
	watchObject watchObjectFunc
	newObject   newObjectFunc
	// groupResource identifies the resource for NotFound errors.
	groupResource schema.GroupResource

	// lock guards items and the refCount fields of the stored items.
	lock  sync.Mutex
	items map[objectKey]*objectCacheItem
}
  55. // NewObjectCache returns a new watch-based instance of Store interface.
  56. func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
  57. return &objectCache{
  58. listObject: listObject,
  59. watchObject: watchObject,
  60. newObject: newObject,
  61. groupResource: groupResource,
  62. items: make(map[objectKey]*objectCacheItem),
  63. }
  64. }
// newStore returns an empty store keyed by namespace/name
// (cache.MetaNamespaceKeyFunc), used as the reflector's target.
func (c *objectCache) newStore() cache.Store {
	// TODO: We may consider creating a dedicated store keeping just a single
	// item, instead of using a generic store implementation for this purpose.
	// However, simple benchmarks show that memory overhead in that case
	// decreases from ~600B to ~300B per object. So we are not optimizing it
	// until we see a good reason for that.
	return cache.NewStore(cache.MetaNamespaceKeyFunc)
}
  73. func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
  74. fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
  75. listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
  76. options.FieldSelector = fieldSelector
  77. return c.listObject(namespace, options)
  78. }
  79. watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
  80. options.FieldSelector = fieldSelector
  81. return c.watchObject(namespace, options)
  82. }
  83. store := c.newStore()
  84. reflector := cache.NewNamedReflector(
  85. fmt.Sprintf("object-%q/%q", namespace, name),
  86. &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
  87. c.newObject(),
  88. store,
  89. 0,
  90. )
  91. stopCh := make(chan struct{})
  92. go reflector.Run(stopCh)
  93. return &objectCacheItem{
  94. refCount: 0,
  95. store: store,
  96. hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
  97. stopCh: stopCh,
  98. }
  99. }
  100. func (c *objectCache) AddReference(namespace, name string) {
  101. key := objectKey{namespace: namespace, name: name}
  102. // AddReference is called from RegisterPod thus it needs to be efficient.
  103. // Thus, it is only increaisng refCount and in case of first registration
  104. // of a given object it starts corresponding reflector.
  105. // It's responsibility of the first Get operation to wait until the
  106. // reflector propagated the store.
  107. c.lock.Lock()
  108. defer c.lock.Unlock()
  109. item, exists := c.items[key]
  110. if !exists {
  111. item = c.newReflector(namespace, name)
  112. c.items[key] = item
  113. }
  114. item.refCount++
  115. }
  116. func (c *objectCache) DeleteReference(namespace, name string) {
  117. key := objectKey{namespace: namespace, name: name}
  118. c.lock.Lock()
  119. defer c.lock.Unlock()
  120. if item, ok := c.items[key]; ok {
  121. item.refCount--
  122. if item.refCount == 0 {
  123. // Stop the underlying reflector.
  124. close(item.stopCh)
  125. delete(c.items, key)
  126. }
  127. }
  128. }
  129. // key returns key of an object with a given name and namespace.
  130. // This has to be in-sync with cache.MetaNamespaceKeyFunc.
  131. func (c *objectCache) key(namespace, name string) string {
  132. if len(namespace) > 0 {
  133. return namespace + "/" + name
  134. }
  135. return name
  136. }
  137. func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
  138. key := objectKey{namespace: namespace, name: name}
  139. c.lock.Lock()
  140. item, exists := c.items[key]
  141. c.lock.Unlock()
  142. if !exists {
  143. return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
  144. }
  145. if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
  146. return nil, fmt.Errorf("couldn't propagate object cache: %v", err)
  147. }
  148. obj, exists, err := item.store.GetByKey(c.key(namespace, name))
  149. if err != nil {
  150. return nil, err
  151. }
  152. if !exists {
  153. return nil, apierrors.NewNotFound(c.groupResource, name)
  154. }
  155. if object, ok := obj.(runtime.Object); ok {
  156. return object, nil
  157. }
  158. return nil, fmt.Errorf("unexpected object type: %v", obj)
  159. }
  160. // NewWatchBasedManager creates a manager that keeps a cache of all objects
  161. // necessary for registered pods.
  162. // It implements the following logic:
  163. // - whenever a pod is created or updated, we start individual watches for all
  164. // referenced objects that aren't referenced from other registered pods
  165. // - every GetObject() returns a value from local cache propagated via watches
  166. func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
  167. objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
  168. return NewCacheBasedManager(objectStore, getReferencedObjects)
  169. }