123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // TODO: We did some scalability tests and using watchBasedManager
- // seems to help with apiserver performance at scale visibly.
- // We also observed no issues at the scale of ~200k watchers with a
- // single apiserver.
- // However, we need to perform more extensive testing before we
- // enable this in production setups.
- package manager
- import (
- "fmt"
- "sync"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/cache"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- )
- type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
- type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
- type newObjectFunc func() runtime.Object
- // objectCacheItem is a single item stored in objectCache.
- type objectCacheItem struct {
- refCount int
- store cache.Store
- hasSynced func() (bool, error)
- stopCh chan struct{}
- }
- // objectCache is a local cache of objects propagated via
- // individual watches.
- type objectCache struct {
- listObject listObjectFunc
- watchObject watchObjectFunc
- newObject newObjectFunc
- groupResource schema.GroupResource
- lock sync.Mutex
- items map[objectKey]*objectCacheItem
- }
- // NewObjectCache returns a new watch-based instance of Store interface.
- func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
- return &objectCache{
- listObject: listObject,
- watchObject: watchObject,
- newObject: newObject,
- groupResource: groupResource,
- items: make(map[objectKey]*objectCacheItem),
- }
- }
- func (c *objectCache) newStore() cache.Store {
- // TODO: We may consider created a dedicated store keeping just a single
- // item, instead of using a generic store implementation for this purpose.
- // However, simple benchmarks show that memory overhead in that case is
- // decrease from ~600B to ~300B per object. So we are not optimizing it
- // until we will see a good reason for that.
- return cache.NewStore(cache.MetaNamespaceKeyFunc)
- }
- func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
- fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
- listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
- options.FieldSelector = fieldSelector
- return c.listObject(namespace, options)
- }
- watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
- options.FieldSelector = fieldSelector
- return c.watchObject(namespace, options)
- }
- store := c.newStore()
- reflector := cache.NewNamedReflector(
- fmt.Sprintf("object-%q/%q", namespace, name),
- &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
- c.newObject(),
- store,
- 0,
- )
- stopCh := make(chan struct{})
- go reflector.Run(stopCh)
- return &objectCacheItem{
- refCount: 0,
- store: store,
- hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
- stopCh: stopCh,
- }
- }
- func (c *objectCache) AddReference(namespace, name string) {
- key := objectKey{namespace: namespace, name: name}
- // AddReference is called from RegisterPod thus it needs to be efficient.
- // Thus, it is only increaisng refCount and in case of first registration
- // of a given object it starts corresponding reflector.
- // It's responsibility of the first Get operation to wait until the
- // reflector propagated the store.
- c.lock.Lock()
- defer c.lock.Unlock()
- item, exists := c.items[key]
- if !exists {
- item = c.newReflector(namespace, name)
- c.items[key] = item
- }
- item.refCount++
- }
- func (c *objectCache) DeleteReference(namespace, name string) {
- key := objectKey{namespace: namespace, name: name}
- c.lock.Lock()
- defer c.lock.Unlock()
- if item, ok := c.items[key]; ok {
- item.refCount--
- if item.refCount == 0 {
- // Stop the underlying reflector.
- close(item.stopCh)
- delete(c.items, key)
- }
- }
- }
- // key returns key of an object with a given name and namespace.
- // This has to be in-sync with cache.MetaNamespaceKeyFunc.
- func (c *objectCache) key(namespace, name string) string {
- if len(namespace) > 0 {
- return namespace + "/" + name
- }
- return name
- }
- func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
- key := objectKey{namespace: namespace, name: name}
- c.lock.Lock()
- item, exists := c.items[key]
- c.lock.Unlock()
- if !exists {
- return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
- }
- if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
- return nil, fmt.Errorf("couldn't propagate object cache: %v", err)
- }
- obj, exists, err := item.store.GetByKey(c.key(namespace, name))
- if err != nil {
- return nil, err
- }
- if !exists {
- return nil, apierrors.NewNotFound(c.groupResource, name)
- }
- if object, ok := obj.(runtime.Object); ok {
- return object, nil
- }
- return nil, fmt.Errorf("unexpected object type: %v", obj)
- }
- // NewWatchBasedManager creates a manager that keeps a cache of all objects
- // necessary for registered pods.
- // It implements the following logic:
- // - whenever a pod is created or updated, we start individual watches for all
- // referenced objects that aren't referenced from other registered pods
- // - every GetObject() returns a value from local cache propagated via watches
- func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
- objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
- return NewCacheBasedManager(objectStore, getReferencedObjects)
- }
|