config.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package config
  14. import (
  15. "fmt"
  16. "reflect"
  17. "sync"
  18. "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/apimachinery/pkg/util/sets"
  21. "k8s.io/client-go/tools/record"
  22. "k8s.io/klog"
  23. "k8s.io/kubernetes/pkg/kubelet/checkpoint"
  24. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  25. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  26. "k8s.io/kubernetes/pkg/kubelet/events"
  27. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  28. "k8s.io/kubernetes/pkg/kubelet/util/format"
  29. "k8s.io/kubernetes/pkg/util/config"
  30. )
  31. // PodConfigNotificationMode describes how changes are sent to the update channel.
  32. type PodConfigNotificationMode int
  33. const (
  34. // PodConfigNotificationUnknown is the default value for
  35. // PodConfigNotificationMode when uninitialized.
  36. PodConfigNotificationUnknown = iota
  37. // PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
  38. // any change occurs.
  39. PodConfigNotificationSnapshot
  40. // PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
  41. // changed, and a SET message if there are any additions or removals.
  42. PodConfigNotificationSnapshotAndUpdates
  43. // PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
  44. PodConfigNotificationIncremental
  45. )
  46. // PodConfig is a configuration mux that merges many sources of pod configuration into a single
  47. // consistent structure, and then delivers incremental change notifications to listeners
  48. // in order.
  49. type PodConfig struct {
  50. pods *podStorage
  51. mux *config.Mux
  52. // the channel of denormalized changes passed to listeners
  53. updates chan kubetypes.PodUpdate
  54. // contains the list of all configured sources
  55. sourcesLock sync.Mutex
  56. sources sets.String
  57. checkpointManager checkpointmanager.CheckpointManager
  58. }
  59. // NewPodConfig creates an object that can merge many configuration sources into a stream
  60. // of normalized updates to a pod configuration.
  61. func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
  62. updates := make(chan kubetypes.PodUpdate, 50)
  63. storage := newPodStorage(updates, mode, recorder)
  64. podConfig := &PodConfig{
  65. pods: storage,
  66. mux: config.NewMux(storage),
  67. updates: updates,
  68. sources: sets.String{},
  69. }
  70. return podConfig
  71. }
  72. // Channel creates or returns a config source channel. The channel
  73. // only accepts PodUpdates
  74. func (c *PodConfig) Channel(source string) chan<- interface{} {
  75. c.sourcesLock.Lock()
  76. defer c.sourcesLock.Unlock()
  77. c.sources.Insert(source)
  78. return c.mux.Channel(source)
  79. }
  80. // SeenAllSources returns true if seenSources contains all sources in the
  81. // config, and also this config has received a SET message from each source.
  82. func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
  83. if c.pods == nil {
  84. return false
  85. }
  86. klog.V(5).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
  87. return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
  88. }
  89. // Updates returns a channel of updates to the configuration, properly denormalized.
  90. func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
  91. return c.updates
  92. }
  93. // Sync requests the full configuration be delivered to the update channel.
  94. func (c *PodConfig) Sync() {
  95. c.pods.Sync()
  96. }
  97. // Restore restores pods from the checkpoint path, *once*
  98. func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
  99. if c.checkpointManager != nil {
  100. return nil
  101. }
  102. var err error
  103. c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
  104. if err != nil {
  105. return err
  106. }
  107. pods, err := checkpoint.LoadPods(c.checkpointManager)
  108. if err != nil {
  109. return err
  110. }
  111. updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
  112. return nil
  113. }
  114. // podStorage manages the current pod state at any point in time and ensures updates
  115. // to the channel are delivered in order. Note that this object is an in-memory source of
  116. // "truth" and on creation contains zero entries. Once all previously read sources are
  117. // available, then this object should be considered authoritative.
  118. type podStorage struct {
  119. podLock sync.RWMutex
  120. // map of source name to pod uid to pod reference
  121. pods map[string]map[types.UID]*v1.Pod
  122. mode PodConfigNotificationMode
  123. // ensures that updates are delivered in strict order
  124. // on the updates channel
  125. updateLock sync.Mutex
  126. updates chan<- kubetypes.PodUpdate
  127. // contains the set of all sources that have sent at least one SET
  128. sourcesSeenLock sync.RWMutex
  129. sourcesSeen sets.String
  130. // the EventRecorder to use
  131. recorder record.EventRecorder
  132. }
  133. // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
  134. // in the future, especially with multiple listeners.
  135. // TODO: allow initialization of the current state of the store with snapshotted version.
  136. func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
  137. return &podStorage{
  138. pods: make(map[string]map[types.UID]*v1.Pod),
  139. mode: mode,
  140. updates: updates,
  141. sourcesSeen: sets.String{},
  142. recorder: recorder,
  143. }
  144. }
  145. // Merge normalizes a set of incoming changes from different sources into a map of all Pods
  146. // and ensures that redundant changes are filtered out, and then pushes zero or more minimal
  147. // updates onto the update channel. Ensures that updates are delivered in order.
  148. func (s *podStorage) Merge(source string, change interface{}) error {
  149. s.updateLock.Lock()
  150. defer s.updateLock.Unlock()
  151. seenBefore := s.sourcesSeen.Has(source)
  152. adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
  153. firstSet := !seenBefore && s.sourcesSeen.Has(source)
  154. // deliver update notifications
  155. switch s.mode {
  156. case PodConfigNotificationIncremental:
  157. if len(removes.Pods) > 0 {
  158. s.updates <- *removes
  159. }
  160. if len(adds.Pods) > 0 {
  161. s.updates <- *adds
  162. }
  163. if len(updates.Pods) > 0 {
  164. s.updates <- *updates
  165. }
  166. if len(deletes.Pods) > 0 {
  167. s.updates <- *deletes
  168. }
  169. if len(restores.Pods) > 0 {
  170. s.updates <- *restores
  171. }
  172. if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
  173. // Send an empty update when first seeing the source and there are
  174. // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
  175. // the source is ready.
  176. s.updates <- *adds
  177. }
  178. // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
  179. if len(reconciles.Pods) > 0 {
  180. s.updates <- *reconciles
  181. }
  182. case PodConfigNotificationSnapshotAndUpdates:
  183. if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
  184. s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
  185. }
  186. if len(updates.Pods) > 0 {
  187. s.updates <- *updates
  188. }
  189. if len(deletes.Pods) > 0 {
  190. s.updates <- *deletes
  191. }
  192. case PodConfigNotificationSnapshot:
  193. if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
  194. s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
  195. }
  196. case PodConfigNotificationUnknown:
  197. fallthrough
  198. default:
  199. panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
  200. }
  201. return nil
  202. }
  203. func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
  204. s.podLock.Lock()
  205. defer s.podLock.Unlock()
  206. addPods := []*v1.Pod{}
  207. updatePods := []*v1.Pod{}
  208. deletePods := []*v1.Pod{}
  209. removePods := []*v1.Pod{}
  210. reconcilePods := []*v1.Pod{}
  211. restorePods := []*v1.Pod{}
  212. pods := s.pods[source]
  213. if pods == nil {
  214. pods = make(map[types.UID]*v1.Pod)
  215. }
  216. // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
  217. // After updated, new pod will be stored in the pod cache *pods*.
  218. // Notice that *pods* and *oldPods* could be the same cache.
  219. updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
  220. filtered := filterInvalidPods(newPods, source, s.recorder)
  221. for _, ref := range filtered {
  222. // Annotate the pod with the source before any comparison.
  223. if ref.Annotations == nil {
  224. ref.Annotations = make(map[string]string)
  225. }
  226. ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
  227. if existing, found := oldPods[ref.UID]; found {
  228. pods[ref.UID] = existing
  229. needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
  230. if needUpdate {
  231. updatePods = append(updatePods, existing)
  232. } else if needReconcile {
  233. reconcilePods = append(reconcilePods, existing)
  234. } else if needGracefulDelete {
  235. deletePods = append(deletePods, existing)
  236. }
  237. continue
  238. }
  239. recordFirstSeenTime(ref)
  240. pods[ref.UID] = ref
  241. addPods = append(addPods, ref)
  242. }
  243. }
  244. update := change.(kubetypes.PodUpdate)
  245. switch update.Op {
  246. case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
  247. if update.Op == kubetypes.ADD {
  248. klog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
  249. } else if update.Op == kubetypes.DELETE {
  250. klog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
  251. } else {
  252. klog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
  253. }
  254. updatePodsFunc(update.Pods, pods, pods)
  255. case kubetypes.REMOVE:
  256. klog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
  257. for _, value := range update.Pods {
  258. if existing, found := pods[value.UID]; found {
  259. // this is a delete
  260. delete(pods, value.UID)
  261. removePods = append(removePods, existing)
  262. continue
  263. }
  264. // this is a no-op
  265. }
  266. case kubetypes.SET:
  267. klog.V(4).Infof("Setting pods for source %s", source)
  268. s.markSourceSet(source)
  269. // Clear the old map entries by just creating a new map
  270. oldPods := pods
  271. pods = make(map[types.UID]*v1.Pod)
  272. updatePodsFunc(update.Pods, oldPods, pods)
  273. for uid, existing := range oldPods {
  274. if _, found := pods[uid]; !found {
  275. // this is a delete
  276. removePods = append(removePods, existing)
  277. }
  278. }
  279. case kubetypes.RESTORE:
  280. klog.V(4).Infof("Restoring pods for source %s", source)
  281. restorePods = append(restorePods, update.Pods...)
  282. default:
  283. klog.Warningf("Received invalid update type: %v", update)
  284. }
  285. s.pods[source] = pods
  286. adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
  287. updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
  288. deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
  289. removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
  290. reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
  291. restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}
  292. return adds, updates, deletes, removes, reconciles, restores
  293. }
  294. func (s *podStorage) markSourceSet(source string) {
  295. s.sourcesSeenLock.Lock()
  296. defer s.sourcesSeenLock.Unlock()
  297. s.sourcesSeen.Insert(source)
  298. }
  299. func (s *podStorage) seenSources(sources ...string) bool {
  300. s.sourcesSeenLock.RLock()
  301. defer s.sourcesSeenLock.RUnlock()
  302. return s.sourcesSeen.HasAll(sources...)
  303. }
  304. func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) {
  305. names := sets.String{}
  306. for i, pod := range pods {
  307. // Pods from each source are assumed to have passed validation individually.
  308. // This function only checks if there is any naming conflict.
  309. name := kubecontainer.GetPodFullName(pod)
  310. if names.Has(name) {
  311. klog.Warningf("Pod[%d] (%s) from %s failed validation due to duplicate pod name %q, ignoring", i+1, format.Pod(pod), source, pod.Name)
  312. recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s due to duplicate pod name %q, ignoring", format.Pod(pod), source, pod.Name)
  313. continue
  314. } else {
  315. names.Insert(name)
  316. }
  317. filtered = append(filtered, pod)
  318. }
  319. return
  320. }
  321. // Annotations that the kubelet adds to the pod.
  322. var localAnnotations = []string{
  323. kubetypes.ConfigSourceAnnotationKey,
  324. kubetypes.ConfigMirrorAnnotationKey,
  325. kubetypes.ConfigFirstSeenAnnotationKey,
  326. }
  327. func isLocalAnnotationKey(key string) bool {
  328. for _, localKey := range localAnnotations {
  329. if key == localKey {
  330. return true
  331. }
  332. }
  333. return false
  334. }
  335. // isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except
  336. // for local annotations.
  337. func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
  338. if candidateMap == nil {
  339. candidateMap = make(map[string]string)
  340. }
  341. for k, v := range candidateMap {
  342. if isLocalAnnotationKey(k) {
  343. continue
  344. }
  345. if existingValue, ok := existingMap[k]; ok && existingValue == v {
  346. continue
  347. }
  348. return false
  349. }
  350. for k := range existingMap {
  351. if isLocalAnnotationKey(k) {
  352. continue
  353. }
  354. // stale entry in existing map.
  355. if _, exists := candidateMap[k]; !exists {
  356. return false
  357. }
  358. }
  359. return true
  360. }
  361. // recordFirstSeenTime records the first seen time of this pod.
  362. func recordFirstSeenTime(pod *v1.Pod) {
  363. klog.V(4).Infof("Receiving a new pod %q", format.Pod(pod))
  364. pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = kubetypes.NewTimestamp().GetString()
  365. }
  366. // updateAnnotations returns an Annotation map containing the api annotation map plus
  367. // locally managed annotations
  368. func updateAnnotations(existing, ref *v1.Pod) {
  369. annotations := make(map[string]string, len(ref.Annotations)+len(localAnnotations))
  370. for k, v := range ref.Annotations {
  371. annotations[k] = v
  372. }
  373. for _, k := range localAnnotations {
  374. if v, ok := existing.Annotations[k]; ok {
  375. annotations[k] = v
  376. }
  377. }
  378. existing.Annotations = annotations
  379. }
  380. func podsDifferSemantically(existing, ref *v1.Pod) bool {
  381. if reflect.DeepEqual(existing.Spec, ref.Spec) &&
  382. reflect.DeepEqual(existing.Labels, ref.Labels) &&
  383. reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
  384. reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
  385. isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
  386. return false
  387. }
  388. return true
  389. }
  390. // checkAndUpdatePod updates existing, and:
  391. // * if ref makes a meaningful change, returns needUpdate=true
  392. // * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
  393. // * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
  394. // * else return all false
  395. // Now, needUpdate, needGracefulDelete and needReconcile should never be both true
  396. func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
  397. // 1. this is a reconcile
  398. // TODO: it would be better to update the whole object and only preserve certain things
  399. // like the source annotation or the UID (to ensure safety)
  400. if !podsDifferSemantically(existing, ref) {
  401. // this is not an update
  402. // Only check reconcile when it is not an update, because if the pod is going to
  403. // be updated, an extra reconcile is unnecessary
  404. if !reflect.DeepEqual(existing.Status, ref.Status) {
  405. // Pod with changed pod status needs reconcile, because kubelet should
  406. // be the source of truth of pod status.
  407. existing.Status = ref.Status
  408. needReconcile = true
  409. }
  410. return
  411. }
  412. // Overwrite the first-seen time with the existing one. This is our own
  413. // internal annotation, there is no need to update.
  414. ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
  415. existing.Spec = ref.Spec
  416. existing.Labels = ref.Labels
  417. existing.DeletionTimestamp = ref.DeletionTimestamp
  418. existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
  419. existing.Status = ref.Status
  420. updateAnnotations(existing, ref)
  421. // 2. this is an graceful delete
  422. if ref.DeletionTimestamp != nil {
  423. needGracefulDelete = true
  424. } else {
  425. // 3. this is an update
  426. needUpdate = true
  427. }
  428. return
  429. }
  430. // Sync sends a copy of the current state through the update channel.
  431. func (s *podStorage) Sync() {
  432. s.updateLock.Lock()
  433. defer s.updateLock.Unlock()
  434. s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
  435. }
  436. // Object implements config.Accessor
  437. func (s *podStorage) MergedState() interface{} {
  438. s.podLock.RLock()
  439. defer s.podLock.RUnlock()
  440. pods := make([]*v1.Pod, 0)
  441. for _, sourcePods := range s.pods {
  442. for _, podRef := range sourcePods {
  443. pods = append(pods, podRef.DeepCopy())
  444. }
  445. }
  446. return pods
  447. }
  448. func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
  449. pods := []*v1.Pod{}
  450. for _, source := range sourcePods {
  451. // Use a deep copy here just in case
  452. pods = append(pods, source.DeepCopy())
  453. }
  454. return pods
  455. }