fs_resource_analyzer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package stats
  14. import (
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. "k8s.io/apimachinery/pkg/types"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. "k8s.io/klog"
  21. )
  22. type statCache map[types.UID]*volumeStatCalculator
  23. // fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer
  24. type fsResourceAnalyzerInterface interface {
  25. GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool)
  26. }
  27. // fsResourceAnalyzer provides stats about fs resource usage
  28. type fsResourceAnalyzer struct {
  29. statsProvider Provider
  30. calcPeriod time.Duration
  31. cachedVolumeStats atomic.Value
  32. startOnce sync.Once
  33. }
  34. var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
  35. // newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
  36. func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
  37. r := &fsResourceAnalyzer{
  38. statsProvider: statsProvider,
  39. calcPeriod: calcVolumePeriod,
  40. }
  41. r.cachedVolumeStats.Store(make(statCache))
  42. return r
  43. }
  44. // Start eager background caching of volume stats.
  45. func (s *fsResourceAnalyzer) Start() {
  46. s.startOnce.Do(func() {
  47. if s.calcPeriod <= 0 {
  48. klog.Info("Volume stats collection disabled.")
  49. return
  50. }
  51. klog.Info("Starting FS ResourceAnalyzer")
  52. go wait.Forever(func() { s.updateCachedPodVolumeStats() }, s.calcPeriod)
  53. })
  54. }
  55. // updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet.
  56. func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
  57. oldCache := s.cachedVolumeStats.Load().(statCache)
  58. newCache := make(statCache)
  59. // Copy existing entries to new map, creating/starting new entries for pods missing from the cache
  60. for _, pod := range s.statsProvider.GetPods() {
  61. if value, found := oldCache[pod.GetUID()]; !found {
  62. newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce()
  63. } else {
  64. newCache[pod.GetUID()] = value
  65. }
  66. }
  67. // Stop entries for pods that have been deleted
  68. for uid, entry := range oldCache {
  69. if _, found := newCache[uid]; !found {
  70. entry.StopOnce()
  71. }
  72. }
  73. // Update the cache reference
  74. s.cachedVolumeStats.Store(newCache)
  75. }
  76. // GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that
  77. // is eagerly populated in the background, and never calculated on the fly.
  78. func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
  79. cache := s.cachedVolumeStats.Load().(statCache)
  80. statCalc, found := cache[uid]
  81. if !found {
  82. // TODO: Differentiate between stats being empty
  83. // See issue #20679
  84. return PodVolumeStats{}, false
  85. }
  86. return statCalc.GetLatest()
  87. }