123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package checkpoint
- import (
- "fmt"
- "math/rand"
- "time"
- apiv1 "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/watch"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
- kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
- "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
- "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
- utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
- utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
- )
- // Payload represents a local copy of a config source (payload) object
- type Payload interface {
- // UID returns a globally unique (space and time) identifier for the payload.
- // The return value is guaranteed non-empty.
- UID() string
- // ResourceVersion returns a resource version for the payload.
- // The return value is guaranteed non-empty.
- ResourceVersion() string
- // Files returns a map of filenames to file contents.
- Files() map[string]string
- // object returns the underlying checkpointed object.
- object() interface{}
- }
- // RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
- type RemoteConfigSource interface {
- // KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
- KubeletFilename() string
- // APIPath returns the API path to the remote resource, e.g. its SelfLink
- APIPath() string
- // UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
- UID() string
- // ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
- ResourceVersion() string
- // Download downloads the remote config source's target object and returns a Payload backed by the object,
- // or a sanitized failure reason and error if the download fails.
- // Download takes an optional store as an argument. If provided, Download will check this store for the
- // target object prior to contacting the API server.
- // Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
- Download(client clientset.Interface, store cache.Store) (Payload, string, error)
- // Informer returns an informer that can be used to detect changes to the remote config source
- Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
- // Encode returns a []byte representation of the object behind the RemoteConfigSource
- Encode() ([]byte, error)
- // NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
- // All RemoteConfigSources are expected to be backed by a NodeConfigSource,
- // though the convenience methods on the interface will target the source
- // type that was detected in a call to NewRemoteConfigSource.
- NodeConfigSource() *apiv1.NodeConfigSource
- }
- // NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object
- // You should only call this with a non-nil config source.
- // Note that the API server validates Node.Spec.ConfigSource.
- func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, string, error) {
- // NOTE: Even though the API server validates the config, we check whether all *known* fields are
- // nil here, so that if a new API server allows a new config source type, old clients can send
- // an error message rather than crashing due to a nil pointer dereference.
- // Exactly one reference subfield of the config source must be non-nil.
- // Currently ConfigMap is the only reference subfield.
- if source.ConfigMap == nil {
- return nil, status.AllNilSubfieldsError, fmt.Errorf("%s, NodeConfigSource was: %#v", status.AllNilSubfieldsError, source)
- }
- return &remoteConfigMap{source}, "", nil
- }
- // DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
- // e.g. the metadata stored by checkpoint/store/fsstore.go
- func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
- // decode the remote config source
- _, codecs, err := scheme.NewSchemeAndCodecs()
- if err != nil {
- return nil, err
- }
- obj, err := runtime.Decode(codecs.UniversalDecoder(), data)
- if err != nil {
- return nil, fmt.Errorf("failed to decode, error: %v", err)
- }
- // for now we assume we are trying to load an kubeletconfigv1beta1.SerializedNodeConfigSource,
- // this may need to be extended if e.g. a new version of the api is born
- cs, ok := obj.(*kubeletconfiginternal.SerializedNodeConfigSource)
- if !ok {
- return nil, fmt.Errorf("failed to cast decoded remote config source to *k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig.SerializedNodeConfigSource")
- }
- // we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
- source, _, err := NewRemoteConfigSource(&cs.Source)
- if err != nil {
- return nil, err
- }
- return source, nil
- }
- // EqualRemoteConfigSources is a helper for comparing remote config sources by
- // comparing the underlying API objects for semantic equality.
- func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
- if a != nil && b != nil {
- return apiequality.Semantic.DeepEqual(a.NodeConfigSource(), b.NodeConfigSource())
- }
- return a == b
- }
- // remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
- type remoteConfigMap struct {
- source *apiv1.NodeConfigSource
- }
- var _ RemoteConfigSource = (*remoteConfigMap)(nil)
- func (r *remoteConfigMap) KubeletFilename() string {
- return r.source.ConfigMap.KubeletConfigKey
- }
- const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
- func (r *remoteConfigMap) APIPath() string {
- ref := r.source.ConfigMap
- return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
- }
- func (r *remoteConfigMap) UID() string {
- return string(r.source.ConfigMap.UID)
- }
- func (r *remoteConfigMap) ResourceVersion() string {
- return r.source.ConfigMap.ResourceVersion
- }
- func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
- var (
- cm *apiv1.ConfigMap
- err error
- )
- // check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
- if store != nil {
- utillog.Infof("checking in-memory store for %s", r.APIPath())
- cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
- if err != nil {
- // just log the error, we'll attempt a direct download instead
- utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
- } else if cm != nil {
- utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
- } else {
- utillog.Infof("did not find %s in in-memory store", r.APIPath())
- }
- }
- // if we didn't find the ConfigMap in the in-memory store, download it from the API server
- if cm == nil {
- utillog.Infof("attempting to download %s", r.APIPath())
- cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
- if err != nil {
- return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
- }
- utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
- } // Assert: Now we have a non-nil ConfigMap
- // construct Payload from the ConfigMap
- payload, err := NewConfigMapPayload(cm)
- if err != nil {
- // We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
- // never happen on objects in the informer's store, or objects downloaded from the API server
- // directly, so we report InternalError.
- return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
- }
- // update internal UID and ResourceVersion based on latest ConfigMap
- r.source.ConfigMap.UID = cm.UID
- r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
- return payload, "", nil
- }
- func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
- // select ConfigMap by name
- fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
- // add some randomness to resync period, which can help avoid controllers falling into lock-step
- minResyncPeriod := 15 * time.Minute
- factor := rand.Float64() + 1
- resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
- lw := &cache.ListWatch{
- ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
- return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
- FieldSelector: fieldselector.String(),
- })
- },
- WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
- return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
- FieldSelector: fieldselector.String(),
- ResourceVersion: options.ResourceVersion,
- })
- },
- }
- informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
- informer.AddEventHandler(handler)
- return informer
- }
- func (r *remoteConfigMap) Encode() ([]byte, error) {
- encoder, err := utilcodec.NewKubeletconfigYAMLEncoder(kubeletconfigv1beta1.SchemeGroupVersion)
- if err != nil {
- return nil, err
- }
- data, err := runtime.Encode(encoder, &kubeletconfigv1beta1.SerializedNodeConfigSource{Source: *r.source})
- if err != nil {
- return nil, err
- }
- return data, nil
- }
- func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
- return r.source.DeepCopy()
- }
- func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
- key := fmt.Sprintf("%s/%s", namespace, name)
- obj, ok, err := store.GetByKey(key)
- if err != nil || !ok {
- return nil, err
- }
- cm, ok := obj.(*apiv1.ConfigMap)
- if !ok {
- err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
- utillog.Errorf(err.Error())
- return nil, err
- }
- return cm, nil
- }
|