kubelet_metrics.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package metrics
  14. import (
  15. "context"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "sort"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/component-base/metrics/testutil"
  26. dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
  27. kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
  28. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  29. )
  // proxyTimeout bounds how long getMetricsFromNode waits for a proxied metrics
  // request before giving up.
  const proxyTimeout = 2 * time.Minute
  33. // KubeletMetrics is metrics for kubelet
  34. type KubeletMetrics testutil.Metrics
  35. // Equal returns true if all metrics are the same as the arguments.
  36. func (m *KubeletMetrics) Equal(o KubeletMetrics) bool {
  37. return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
  38. }
  39. // NewKubeletMetrics returns new metrics which are initialized.
  40. func NewKubeletMetrics() KubeletMetrics {
  41. result := testutil.NewMetrics()
  42. return KubeletMetrics(result)
  43. }
  44. // GrabKubeletMetricsWithoutProxy retrieve metrics from the kubelet on the given node using a simple GET over http.
  45. // Currently only used in integration tests.
  46. func GrabKubeletMetricsWithoutProxy(nodeName, path string) (KubeletMetrics, error) {
  47. resp, err := http.Get(fmt.Sprintf("http://%s%s", nodeName, path))
  48. if err != nil {
  49. return KubeletMetrics{}, err
  50. }
  51. defer resp.Body.Close()
  52. body, err := ioutil.ReadAll(resp.Body)
  53. if err != nil {
  54. return KubeletMetrics{}, err
  55. }
  56. return parseKubeletMetrics(string(body))
  57. }
  58. func parseKubeletMetrics(data string) (KubeletMetrics, error) {
  59. result := NewKubeletMetrics()
  60. if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
  61. return KubeletMetrics{}, err
  62. }
  63. return result, nil
  64. }
  65. func (g *Grabber) getMetricsFromNode(nodeName string, kubeletPort int) (string, error) {
  66. // There's a problem with timing out during proxy. Wrapping this in a goroutine to prevent deadlock.
  67. finished := make(chan struct{}, 1)
  68. var err error
  69. var rawOutput []byte
  70. go func() {
  71. rawOutput, err = g.client.CoreV1().RESTClient().Get().
  72. Resource("nodes").
  73. SubResource("proxy").
  74. Name(fmt.Sprintf("%v:%v", nodeName, kubeletPort)).
  75. Suffix("metrics").
  76. Do(context.TODO()).Raw()
  77. finished <- struct{}{}
  78. }()
  79. select {
  80. case <-time.After(proxyTimeout):
  81. return "", fmt.Errorf("Timed out when waiting for proxy to gather metrics from %v", nodeName)
  82. case <-finished:
  83. if err != nil {
  84. return "", err
  85. }
  86. return string(rawOutput), nil
  87. }
  88. }
  // KubeletLatencyMetric is a single latency sample derived from the metrics
  // scraped from the kubelet server's /metrics endpoint.
  // TODO: Get some more structure around the metrics and this type
  type KubeletLatencyMetric struct {
  	// Operation names the operation the sample belongs to, e.g. list, info, create.
  	Operation string
  	// Method names the metric the sample came from, e.g. sync_pods, pod_worker.
  	Method string
  	// Quantile lies in [0, 1]; 0.95 is the 95th percentile and 0.5 the median.
  	Quantile float64
  	// Latency is the latency recorded for this sample.
  	Latency time.Duration
  }
  100. // KubeletLatencyMetrics implements sort.Interface for []KubeletMetric based on
  101. // the latency field.
  102. type KubeletLatencyMetrics []KubeletLatencyMetric
  103. func (a KubeletLatencyMetrics) Len() int { return len(a) }
  104. func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  105. func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
  106. // If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
  107. // or else, the function will try to get kubelet metrics directly from the node.
  108. func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
  109. if c == nil {
  110. return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
  111. }
  112. grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false)
  113. if err != nil {
  114. return KubeletMetrics{}, err
  115. }
  116. return grabber.GrabFromKubelet(nodeName)
  117. }
  118. // GetKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
  119. // the subsystem prefix.
  120. func GetKubeletMetrics(c clientset.Interface, nodeName string) (KubeletMetrics, error) {
  121. ms, err := getKubeletMetricsFromNode(c, nodeName)
  122. if err != nil {
  123. return KubeletMetrics{}, err
  124. }
  125. kubeletMetrics := make(KubeletMetrics)
  126. for name, samples := range ms {
  127. const prefix = kubeletmetrics.KubeletSubsystem + "_"
  128. if !strings.HasPrefix(name, prefix) {
  129. // Not a kubelet metric.
  130. continue
  131. }
  132. method := strings.TrimPrefix(name, prefix)
  133. kubeletMetrics[method] = samples
  134. }
  135. return kubeletMetrics, nil
  136. }
  137. // GetDefaultKubeletLatencyMetrics calls GetKubeletLatencyMetrics with a set of default metricNames
  138. // identifying common latency metrics.
  139. // Note that the KubeletMetrics passed in should not contain subsystem prefix.
  140. func GetDefaultKubeletLatencyMetrics(ms KubeletMetrics) KubeletLatencyMetrics {
  141. latencyMetricNames := sets.NewString(
  142. kubeletmetrics.PodWorkerDurationKey,
  143. kubeletmetrics.PodWorkerStartDurationKey,
  144. kubeletmetrics.PodStartDurationKey,
  145. kubeletmetrics.CgroupManagerOperationsKey,
  146. dockermetrics.DockerOperationsLatencyKey,
  147. kubeletmetrics.PodWorkerStartDurationKey,
  148. kubeletmetrics.PLEGRelistDurationKey,
  149. )
  150. return GetKubeletLatencyMetrics(ms, latencyMetricNames)
  151. }
  152. // GetKubeletLatencyMetrics filters ms to include only those contained in the metricNames set,
  153. // then constructs a KubeletLatencyMetrics list based on the samples associated with those metrics.
  154. func GetKubeletLatencyMetrics(ms KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics {
  155. var latencyMetrics KubeletLatencyMetrics
  156. for name, samples := range ms {
  157. if !filterMetricNames.Has(name) {
  158. continue
  159. }
  160. for _, sample := range samples {
  161. latency := sample.Value
  162. operation := string(sample.Metric["operation_type"])
  163. var quantile float64
  164. if val, ok := sample.Metric[testutil.QuantileLabel]; ok {
  165. var err error
  166. if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
  167. continue
  168. }
  169. }
  170. latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
  171. Operation: operation,
  172. Method: name,
  173. Quantile: quantile,
  174. Latency: time.Duration(int64(latency)) * time.Microsecond,
  175. })
  176. }
  177. }
  178. return latencyMetrics
  179. }
  180. // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
  181. func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
  182. ms, err := GetKubeletMetrics(c, nodeName)
  183. if err != nil {
  184. return KubeletLatencyMetrics{}, err
  185. }
  186. latencyMetrics := GetDefaultKubeletLatencyMetrics(ms)
  187. sort.Sort(latencyMetrics)
  188. var badMetrics KubeletLatencyMetrics
  189. logFunc("\nLatency metrics for node %v", nodeName)
  190. for _, m := range latencyMetrics {
  191. if m.Latency > threshold {
  192. badMetrics = append(badMetrics, m)
  193. e2elog.Logf("%+v", m)
  194. }
  195. }
  196. return badMetrics, nil
  197. }