download.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package checkpoint
  14. import (
  15. "fmt"
  16. "math/rand"
  17. "time"
  18. apiv1 "k8s.io/api/core/v1"
  19. apiequality "k8s.io/apimachinery/pkg/api/equality"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/fields"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/watch"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/tools/cache"
  26. kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
  27. kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  28. "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
  29. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
  30. utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
  31. utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
  32. )
  33. // Payload represents a local copy of a config source (payload) object
  34. type Payload interface {
  35. // UID returns a globally unique (space and time) identifier for the payload.
  36. // The return value is guaranteed non-empty.
  37. UID() string
  38. // ResourceVersion returns a resource version for the payload.
  39. // The return value is guaranteed non-empty.
  40. ResourceVersion() string
  41. // Files returns a map of filenames to file contents.
  42. Files() map[string]string
  43. // object returns the underlying checkpointed object.
  44. object() interface{}
  45. }
  46. // RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
  47. type RemoteConfigSource interface {
  48. // KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
  49. KubeletFilename() string
  50. // APIPath returns the API path to the remote resource, e.g. its SelfLink
  51. APIPath() string
  52. // UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
  53. UID() string
  54. // ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
  55. ResourceVersion() string
  56. // Download downloads the remote config source's target object and returns a Payload backed by the object,
  57. // or a sanitized failure reason and error if the download fails.
  58. // Download takes an optional store as an argument. If provided, Download will check this store for the
  59. // target object prior to contacting the API server.
  60. // Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
  61. Download(client clientset.Interface, store cache.Store) (Payload, string, error)
  62. // Informer returns an informer that can be used to detect changes to the remote config source
  63. Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
  64. // Encode returns a []byte representation of the object behind the RemoteConfigSource
  65. Encode() ([]byte, error)
  66. // NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
  67. // All RemoteConfigSources are expected to be backed by a NodeConfigSource,
  68. // though the convenience methods on the interface will target the source
  69. // type that was detected in a call to NewRemoteConfigSource.
  70. NodeConfigSource() *apiv1.NodeConfigSource
  71. }
  72. // NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object
  73. // You should only call this with a non-nil config source.
  74. // Note that the API server validates Node.Spec.ConfigSource.
  75. func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, string, error) {
  76. // NOTE: Even though the API server validates the config, we check whether all *known* fields are
  77. // nil here, so that if a new API server allows a new config source type, old clients can send
  78. // an error message rather than crashing due to a nil pointer dereference.
  79. // Exactly one reference subfield of the config source must be non-nil.
  80. // Currently ConfigMap is the only reference subfield.
  81. if source.ConfigMap == nil {
  82. return nil, status.AllNilSubfieldsError, fmt.Errorf("%s, NodeConfigSource was: %#v", status.AllNilSubfieldsError, source)
  83. }
  84. return &remoteConfigMap{source}, "", nil
  85. }
  86. // DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
  87. // e.g. the metadata stored by checkpoint/store/fsstore.go
  88. func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
  89. // decode the remote config source
  90. _, codecs, err := scheme.NewSchemeAndCodecs()
  91. if err != nil {
  92. return nil, err
  93. }
  94. obj, err := runtime.Decode(codecs.UniversalDecoder(), data)
  95. if err != nil {
  96. return nil, fmt.Errorf("failed to decode, error: %v", err)
  97. }
  98. // for now we assume we are trying to load an kubeletconfigv1beta1.SerializedNodeConfigSource,
  99. // this may need to be extended if e.g. a new version of the api is born
  100. cs, ok := obj.(*kubeletconfiginternal.SerializedNodeConfigSource)
  101. if !ok {
  102. return nil, fmt.Errorf("failed to cast decoded remote config source to *k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig.SerializedNodeConfigSource")
  103. }
  104. // we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
  105. source, _, err := NewRemoteConfigSource(&cs.Source)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return source, nil
  110. }
  111. // EqualRemoteConfigSources is a helper for comparing remote config sources by
  112. // comparing the underlying API objects for semantic equality.
  113. func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
  114. if a != nil && b != nil {
  115. return apiequality.Semantic.DeepEqual(a.NodeConfigSource(), b.NodeConfigSource())
  116. }
  117. return a == b
  118. }
  119. // remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
  120. type remoteConfigMap struct {
  121. source *apiv1.NodeConfigSource
  122. }
  123. var _ RemoteConfigSource = (*remoteConfigMap)(nil)
  124. func (r *remoteConfigMap) KubeletFilename() string {
  125. return r.source.ConfigMap.KubeletConfigKey
  126. }
  127. const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
  128. func (r *remoteConfigMap) APIPath() string {
  129. ref := r.source.ConfigMap
  130. return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
  131. }
  132. func (r *remoteConfigMap) UID() string {
  133. return string(r.source.ConfigMap.UID)
  134. }
  135. func (r *remoteConfigMap) ResourceVersion() string {
  136. return r.source.ConfigMap.ResourceVersion
  137. }
  138. func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
  139. var (
  140. cm *apiv1.ConfigMap
  141. err error
  142. )
  143. // check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
  144. if store != nil {
  145. utillog.Infof("checking in-memory store for %s", r.APIPath())
  146. cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
  147. if err != nil {
  148. // just log the error, we'll attempt a direct download instead
  149. utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
  150. } else if cm != nil {
  151. utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
  152. } else {
  153. utillog.Infof("did not find %s in in-memory store", r.APIPath())
  154. }
  155. }
  156. // if we didn't find the ConfigMap in the in-memory store, download it from the API server
  157. if cm == nil {
  158. utillog.Infof("attempting to download %s", r.APIPath())
  159. cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
  160. if err != nil {
  161. return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
  162. }
  163. utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
  164. } // Assert: Now we have a non-nil ConfigMap
  165. // construct Payload from the ConfigMap
  166. payload, err := NewConfigMapPayload(cm)
  167. if err != nil {
  168. // We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
  169. // never happen on objects in the informer's store, or objects downloaded from the API server
  170. // directly, so we report InternalError.
  171. return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
  172. }
  173. // update internal UID and ResourceVersion based on latest ConfigMap
  174. r.source.ConfigMap.UID = cm.UID
  175. r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
  176. return payload, "", nil
  177. }
  178. func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
  179. // select ConfigMap by name
  180. fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
  181. // add some randomness to resync period, which can help avoid controllers falling into lock-step
  182. minResyncPeriod := 15 * time.Minute
  183. factor := rand.Float64() + 1
  184. resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
  185. lw := &cache.ListWatch{
  186. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  187. return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
  188. FieldSelector: fieldselector.String(),
  189. })
  190. },
  191. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  192. return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
  193. FieldSelector: fieldselector.String(),
  194. ResourceVersion: options.ResourceVersion,
  195. })
  196. },
  197. }
  198. informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
  199. informer.AddEventHandler(handler)
  200. return informer
  201. }
  202. func (r *remoteConfigMap) Encode() ([]byte, error) {
  203. encoder, err := utilcodec.NewKubeletconfigYAMLEncoder(kubeletconfigv1beta1.SchemeGroupVersion)
  204. if err != nil {
  205. return nil, err
  206. }
  207. data, err := runtime.Encode(encoder, &kubeletconfigv1beta1.SerializedNodeConfigSource{Source: *r.source})
  208. if err != nil {
  209. return nil, err
  210. }
  211. return data, nil
  212. }
  213. func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
  214. return r.source.DeepCopy()
  215. }
  216. func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
  217. key := fmt.Sprintf("%s/%s", namespace, name)
  218. obj, ok, err := store.GetByKey(key)
  219. if err != nil || !ok {
  220. return nil, err
  221. }
  222. cm, ok := obj.(*apiv1.ConfigMap)
  223. if !ok {
  224. err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
  225. utillog.Errorf(err.Error())
  226. return nil, err
  227. }
  228. return cm, nil
  229. }