metrics_util.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/master/ports"
	schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/util/system"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
	"k8s.io/kubernetes/test/e2e/framework/metrics"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"
)

const (
	// NodeStartupThreshold is a rough estimate of the time allocated for a pod to start on a node.
	NodeStartupThreshold = 4 * time.Second

	// We set a 1s threshold for API calls even in small clusters to avoid flakes.
	// The problem is that if a long GC happens in a small cluster (where we have e.g.
	// 1-core master machines) and tests are pretty short, it may consume a significant
	// portion of CPU and basically stop all the real work.
	// Increasing the threshold to 1s is within our SLO and should solve this problem.
	apiCallLatencyThreshold time.Duration = 1 * time.Second

	// We use a higher threshold for list API calls if the cluster is big (i.e. having > 500 nodes),
	// as list response sizes are bigger in general for big clusters. We also use a higher threshold
	// for list calls at cluster scope (this includes non-namespaced and all-namespace calls).
	apiListCallLatencyThreshold      time.Duration = 5 * time.Second
	apiClusterScopeListCallThreshold time.Duration = 10 * time.Second
	bigClusterNodeCountThreshold                   = 500

	// Cluster Autoscaler metrics names
	caFunctionMetric      = "cluster_autoscaler_function_duration_seconds_bucket"
	caFunctionMetricLabel = "function"
)

// MetricsForE2E is the metrics collection of components.
type MetricsForE2E metrics.Collection

func (m *MetricsForE2E) filterMetrics() {
	apiServerMetrics := make(metrics.APIServerMetrics)
	for _, metric := range interestingAPIServerMetrics {
		apiServerMetrics[metric] = (*m).APIServerMetrics[metric]
	}
	controllerManagerMetrics := make(metrics.ControllerManagerMetrics)
	for _, metric := range interestingControllerManagerMetrics {
		controllerManagerMetrics[metric] = (*m).ControllerManagerMetrics[metric]
	}
	kubeletMetrics := make(map[string]metrics.KubeletMetrics)
	for kubelet, grabbed := range (*m).KubeletMetrics {
		kubeletMetrics[kubelet] = make(metrics.KubeletMetrics)
		for _, metric := range interestingKubeletMetrics {
			kubeletMetrics[kubelet][metric] = grabbed[metric]
		}
	}
	(*m).APIServerMetrics = apiServerMetrics
	(*m).ControllerManagerMetrics = controllerManagerMetrics
	(*m).KubeletMetrics = kubeletMetrics
}

func printSample(sample *model.Sample) string {
	buf := make([]string, 0)
	// Id is a VERY special label. For a 'normal' container it's useless, but it's necessary
	// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know which case it is
	// by checking whether the label "kubernetes_container_name" is present. It's hacky,
	// but it works...
	_, normalContainer := sample.Metric["kubernetes_container_name"]
	for k, v := range sample.Metric {
		if strings.HasPrefix(string(k), "__") {
			continue
		}
		if string(k) == "id" && normalContainer {
			continue
		}
		buf = append(buf, fmt.Sprintf("%v=%v", string(k), v))
	}
	return fmt.Sprintf("[%v] = %v", strings.Join(buf, ","), sample.Value)
}

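// Example (illustrative, not from the original file): for a sample with labels
// {resource="pods", verb="LIST"} and value 42, printSample produces a string like:
//
//	[resource=pods,verb=LIST] = 42
//
// Label ordering is not deterministic, since Go map iteration order is randomized.
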
// PrintHumanReadable returns e2e metrics in a human-readable format.
func (m *MetricsForE2E) PrintHumanReadable() string {
	buf := bytes.Buffer{}
	for _, interestingMetric := range interestingAPIServerMetrics {
		buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
		for _, sample := range (*m).APIServerMetrics[interestingMetric] {
			buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
		}
	}
	for _, interestingMetric := range interestingControllerManagerMetrics {
		buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
		for _, sample := range (*m).ControllerManagerMetrics[interestingMetric] {
			buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
		}
	}
	for _, interestingMetric := range interestingClusterAutoscalerMetrics {
		buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
		for _, sample := range (*m).ClusterAutoscalerMetrics[interestingMetric] {
			buf.WriteString(fmt.Sprintf("\t%v\n", printSample(sample)))
		}
	}
	for kubelet, grabbed := range (*m).KubeletMetrics {
		buf.WriteString(fmt.Sprintf("For %v:\n", kubelet))
		for _, interestingMetric := range interestingKubeletMetrics {
			buf.WriteString(fmt.Sprintf("\tFor %v:\n", interestingMetric))
			for _, sample := range grabbed[interestingMetric] {
				buf.WriteString(fmt.Sprintf("\t\t%v\n", printSample(sample)))
			}
		}
	}
	return buf.String()
}

// PrintJSON returns e2e metrics in JSON format, filtered down to the interesting metrics.
func (m *MetricsForE2E) PrintJSON() string {
	m.filterMetrics()
	return PrettyPrintJSON(m)
}

// SummaryKind returns the summary kind of e2e metrics.
func (m *MetricsForE2E) SummaryKind() string {
	return "MetricsForE2E"
}

var schedulingLatencyMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName)

var interestingAPIServerMetrics = []string{
	"apiserver_request_total",
	// TODO(krzysied): apiserver_request_latencies_summary is a deprecated metric.
	// It should be replaced with a new metric.
	"apiserver_request_latencies_summary",
	"apiserver_init_events_total",
}

var interestingControllerManagerMetrics = []string{
	"garbage_collector_attempt_to_delete_queue_latency",
	"garbage_collector_attempt_to_delete_work_duration",
	"garbage_collector_attempt_to_orphan_queue_latency",
	"garbage_collector_attempt_to_orphan_work_duration",
	"garbage_collector_dirty_processing_latency_microseconds",
	"garbage_collector_event_processing_latency_microseconds",
	"garbage_collector_graph_changes_queue_latency",
	"garbage_collector_graph_changes_work_duration",
	"garbage_collector_orphan_processing_latency_microseconds",
	"namespace_queue_latency",
	"namespace_queue_latency_sum",
	"namespace_queue_latency_count",
	"namespace_retries",
	"namespace_work_duration",
	"namespace_work_duration_sum",
	"namespace_work_duration_count",
}

var interestingKubeletMetrics = []string{
	"kubelet_docker_operations_errors_total",
	"kubelet_docker_operations_duration_seconds",
	"kubelet_pod_start_duration_seconds",
	"kubelet_pod_worker_duration_seconds",
	"kubelet_pod_worker_start_duration_seconds",
}

var interestingClusterAutoscalerMetrics = []string{
	"function_duration_seconds",
	"errors_total",
	"evicted_pods_total",
}

// LatencyMetric is a struct for dashboard metrics.
type LatencyMetric struct {
	Perc50  time.Duration `json:"Perc50"`
	Perc90  time.Duration `json:"Perc90"`
	Perc99  time.Duration `json:"Perc99"`
	Perc100 time.Duration `json:"Perc100"`
}

// PodStartupLatency is a struct for managing latency of pod startup.
type PodStartupLatency struct {
	CreateToScheduleLatency LatencyMetric `json:"createToScheduleLatency"`
	ScheduleToRunLatency    LatencyMetric `json:"scheduleToRunLatency"`
	RunToWatchLatency       LatencyMetric `json:"runToWatchLatency"`
	ScheduleToWatchLatency  LatencyMetric `json:"scheduleToWatchLatency"`
	E2ELatency              LatencyMetric `json:"e2eLatency"`
}

// SummaryKind returns the summary kind of pod startup latency.
func (l *PodStartupLatency) SummaryKind() string {
	return "PodStartupLatency"
}

// PrintHumanReadable returns pod startup latency in JSON format.
func (l *PodStartupLatency) PrintHumanReadable() string {
	return PrettyPrintJSON(l)
}

// PrintJSON returns pod startup latency in perf-data JSON format.
func (l *PodStartupLatency) PrintJSON() string {
	return PrettyPrintJSON(PodStartupLatencyToPerfData(l))
}

// SchedulingMetrics is a struct for managing scheduling metrics.
type SchedulingMetrics struct {
	PredicateEvaluationLatency  LatencyMetric `json:"predicateEvaluationLatency"`
	PriorityEvaluationLatency   LatencyMetric `json:"priorityEvaluationLatency"`
	PreemptionEvaluationLatency LatencyMetric `json:"preemptionEvaluationLatency"`
	BindingLatency              LatencyMetric `json:"bindingLatency"`
	ThroughputAverage           float64       `json:"throughputAverage"`
	ThroughputPerc50            float64       `json:"throughputPerc50"`
	ThroughputPerc90            float64       `json:"throughputPerc90"`
	ThroughputPerc99            float64       `json:"throughputPerc99"`
}

// SummaryKind returns the summary kind of scheduling metrics.
func (l *SchedulingMetrics) SummaryKind() string {
	return "SchedulingMetrics"
}

// PrintHumanReadable returns scheduling metrics in JSON format.
func (l *SchedulingMetrics) PrintHumanReadable() string {
	return PrettyPrintJSON(l)
}

// PrintJSON returns scheduling metrics in JSON format.
func (l *SchedulingMetrics) PrintJSON() string {
	return PrettyPrintJSON(l)
}

// Histogram is a struct for managing a histogram.
type Histogram struct {
	Labels  map[string]string `json:"labels"`
	Buckets map[string]int    `json:"buckets"`
}

// HistogramVec is an array of Histogram.
type HistogramVec []Histogram

func newHistogram(labels map[string]string) *Histogram {
	return &Histogram{
		Labels:  labels,
		Buckets: make(map[string]int),
	}
}

// EtcdMetrics is a struct for managing etcd metrics.
type EtcdMetrics struct {
	BackendCommitDuration     HistogramVec `json:"backendCommitDuration"`
	SnapshotSaveTotalDuration HistogramVec `json:"snapshotSaveTotalDuration"`
	PeerRoundTripTime         HistogramVec `json:"peerRoundTripTime"`
	WalFsyncDuration          HistogramVec `json:"walFsyncDuration"`
	MaxDatabaseSize           float64      `json:"maxDatabaseSize"`
}

func newEtcdMetrics() *EtcdMetrics {
	return &EtcdMetrics{
		BackendCommitDuration:     make(HistogramVec, 0),
		SnapshotSaveTotalDuration: make(HistogramVec, 0),
		PeerRoundTripTime:         make(HistogramVec, 0),
		WalFsyncDuration:          make(HistogramVec, 0),
	}
}

// SummaryKind returns the summary kind of etcd metrics.
func (l *EtcdMetrics) SummaryKind() string {
	return "EtcdMetrics"
}

// PrintHumanReadable returns etcd metrics in JSON format.
func (l *EtcdMetrics) PrintHumanReadable() string {
	return PrettyPrintJSON(l)
}

// PrintJSON returns etcd metrics in JSON format.
func (l *EtcdMetrics) PrintJSON() string {
	return PrettyPrintJSON(l)
}

// EtcdMetricsCollector is a struct for managing the etcd metrics collector.
type EtcdMetricsCollector struct {
	stopCh  chan struct{}
	wg      *sync.WaitGroup
	metrics *EtcdMetrics
}

// NewEtcdMetricsCollector creates a new etcd metrics collector.
func NewEtcdMetricsCollector() *EtcdMetricsCollector {
	return &EtcdMetricsCollector{
		stopCh:  make(chan struct{}),
		wg:      &sync.WaitGroup{},
		metrics: newEtcdMetrics(),
	}
}

func getEtcdMetrics() ([]*model.Sample, error) {
	// Etcd is only exposed on localhost, so we fetch its metrics over SSH to the master.
	if TestContext.Provider == "gke" || TestContext.Provider == "eks" {
		e2elog.Logf("Not grabbing etcd metrics through master SSH: unsupported for %s", TestContext.Provider)
		return nil, nil
	}
	cmd := "curl http://localhost:2379/metrics"
	sshResult, err := e2essh.SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
	if err != nil || sshResult.Code != 0 {
		return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
	}
	data := sshResult.Stdout
	return extractMetricSamples(data)
}

func getEtcdDatabaseSize() (float64, error) {
	samples, err := getEtcdMetrics()
	if err != nil {
		return 0, err
	}
	for _, sample := range samples {
		if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" {
			return float64(sample.Value), nil
		}
	}
	return 0, fmt.Errorf("couldn't find etcd database size metric")
}

// StartCollecting starts collecting the etcd db size metric periodically
// and updates MaxDatabaseSize accordingly.
func (mc *EtcdMetricsCollector) StartCollecting(interval time.Duration) {
	mc.wg.Add(1)
	go func() {
		defer mc.wg.Done()
		for {
			select {
			case <-time.After(interval):
				dbSize, err := getEtcdDatabaseSize()
				if err != nil {
					e2elog.Logf("Failed to collect etcd database size: %v", err)
					continue
				}
				mc.metrics.MaxDatabaseSize = math.Max(mc.metrics.MaxDatabaseSize, dbSize)
			case <-mc.stopCh:
				return
			}
		}
	}()
}

// StopAndSummarize stops the etcd metrics collector and summarizes the metrics.
func (mc *EtcdMetricsCollector) StopAndSummarize() error {
	close(mc.stopCh)
	mc.wg.Wait()

	// Do some one-off collection of metrics.
	samples, err := getEtcdMetrics()
	if err != nil {
		return err
	}
	for _, sample := range samples {
		switch sample.Metric[model.MetricNameLabel] {
		case "etcd_disk_backend_commit_duration_seconds_bucket":
			convertSampleToBucket(sample, &mc.metrics.BackendCommitDuration)
		case "etcd_debugging_snap_save_total_duration_seconds_bucket":
			convertSampleToBucket(sample, &mc.metrics.SnapshotSaveTotalDuration)
		case "etcd_disk_wal_fsync_duration_seconds_bucket":
			convertSampleToBucket(sample, &mc.metrics.WalFsyncDuration)
		case "etcd_network_peer_round_trip_time_seconds_bucket":
			convertSampleToBucket(sample, &mc.metrics.PeerRoundTripTime)
		}
	}
	return nil
}

// GetMetrics returns the metrics gathered by the etcd metrics collector.
func (mc *EtcdMetricsCollector) GetMetrics() *EtcdMetrics {
	return mc.metrics
}

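// Example (illustrative sketch, not from the original file): a typical collector
// lifecycle in a test. The interval and the surrounding test body are assumed
// context, not prescribed by this file.
//
//	collector := NewEtcdMetricsCollector()
//	collector.StartCollecting(30 * time.Second)
//	// ... run the measured workload ...
//	if err := collector.StopAndSummarize(); err != nil {
//		e2elog.Logf("failed to summarize etcd metrics: %v", err)
//	} else {
//		e2elog.Logf("etcd metrics:\n%s", collector.GetMetrics().PrintJSON())
//	}
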
// APICall is a struct for managing an API call.
type APICall struct {
	Resource    string        `json:"resource"`
	Subresource string        `json:"subresource"`
	Verb        string        `json:"verb"`
	Scope       string        `json:"scope"`
	Latency     LatencyMetric `json:"latency"`
	Count       int           `json:"count"`
}

// APIResponsiveness is a struct for managing multiple API calls.
type APIResponsiveness struct {
	APICalls []APICall `json:"apicalls"`
}

// SummaryKind returns the summary kind of API responsiveness.
func (a *APIResponsiveness) SummaryKind() string {
	return "APIResponsiveness"
}

// PrintHumanReadable returns metrics in JSON format.
func (a *APIResponsiveness) PrintHumanReadable() string {
	return PrettyPrintJSON(a)
}

// PrintJSON returns metrics as PerfData (50th, 90th and 99th percentiles) in JSON format.
func (a *APIResponsiveness) PrintJSON() string {
	return PrettyPrintJSON(APICallToPerfData(a))
}

func (a *APIResponsiveness) Len() int { return len(a.APICalls) }
func (a *APIResponsiveness) Swap(i, j int) {
	a.APICalls[i], a.APICalls[j] = a.APICalls[j], a.APICalls[i]
}
func (a *APIResponsiveness) Less(i, j int) bool {
	return a.APICalls[i].Latency.Perc99 < a.APICalls[j].Latency.Perc99
}

// Set request latency for a particular quantile in the APICall metric entry (creating one if necessary).
// 0 <= quantile <= 1 (e.g. 0.95 is the 95th percentile, 0.5 is the median).
// Only the 0.5, 0.9 and 0.99 quantiles are supported.
func (a *APIResponsiveness) addMetricRequestLatency(resource, subresource, verb, scope string, quantile float64, latency time.Duration) {
	for i, apicall := range a.APICalls {
		if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
			a.APICalls[i] = setQuantileAPICall(apicall, quantile, latency)
			return
		}
	}
	apicall := setQuantileAPICall(APICall{Resource: resource, Subresource: subresource, Verb: verb, Scope: scope}, quantile, latency)
	a.APICalls = append(a.APICalls, apicall)
}

// 0 <= quantile <= 1 (e.g. 0.95 is the 95th percentile, 0.5 is the median).
// Only the 0.5, 0.9 and 0.99 quantiles are supported.
func setQuantileAPICall(apicall APICall, quantile float64, latency time.Duration) APICall {
	setQuantile(&apicall.Latency, quantile, latency)
	return apicall
}

// Only the 0.5, 0.9 and 0.99 quantiles are supported; other values are ignored.
func setQuantile(metric *LatencyMetric, quantile float64, latency time.Duration) {
	switch quantile {
	case 0.5:
		metric.Perc50 = latency
	case 0.9:
		metric.Perc90 = latency
	case 0.99:
		metric.Perc99 = latency
	}
}

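// Example (illustrative, hypothetical values): recording the 99th-percentile
// latency of cluster-scoped pod LIST calls.
//
//	var a APIResponsiveness
//	a.addMetricRequestLatency("pods", "", "LIST", "cluster", 0.99, 1200*time.Millisecond)
//	// a.APICalls[0].Latency.Perc99 is now 1.2s; Perc50 and Perc90 remain zero.
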
// Add request count to the APICall metric entry (creating one if necessary).
func (a *APIResponsiveness) addMetricRequestCount(resource, subresource, verb, scope string, count int) {
	for i, apicall := range a.APICalls {
		if apicall.Resource == resource && apicall.Subresource == subresource && apicall.Verb == verb && apicall.Scope == scope {
			a.APICalls[i].Count += count
			return
		}
	}
	apicall := APICall{Resource: resource, Subresource: subresource, Verb: verb, Count: count, Scope: scope}
	a.APICalls = append(a.APICalls, apicall)
}

func readLatencyMetrics(c clientset.Interface) (*APIResponsiveness, error) {
	var a APIResponsiveness

	body, err := getMetrics(c)
	if err != nil {
		return nil, err
	}
	samples, err := extractMetricSamples(body)
	if err != nil {
		return nil, err
	}

	ignoredResources := sets.NewString("events")
	// TODO: figure out why we're getting non-capitalized proxy and fix this.
	ignoredVerbs := sets.NewString("WATCH", "WATCHLIST", "PROXY", "proxy", "CONNECT")

	for _, sample := range samples {
		// Example lines:
		// apiserver_request_latencies_summary{resource="namespaces",verb="LIST",quantile="0.99"} 908
		// apiserver_request_total{resource="pods",verb="LIST",client="kubectl",code="200",contentType="json"} 233
		if sample.Metric[model.MetricNameLabel] != "apiserver_request_latencies_summary" &&
			sample.Metric[model.MetricNameLabel] != "apiserver_request_total" {
			continue
		}

		resource := string(sample.Metric["resource"])
		subresource := string(sample.Metric["subresource"])
		verb := string(sample.Metric["verb"])
		scope := string(sample.Metric["scope"])
		if ignoredResources.Has(resource) || ignoredVerbs.Has(verb) {
			continue
		}

		switch sample.Metric[model.MetricNameLabel] {
		case "apiserver_request_latencies_summary":
			latency := sample.Value
			quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
			if err != nil {
				return nil, err
			}
			a.addMetricRequestLatency(resource, subresource, verb, scope, quantile, time.Duration(int64(latency))*time.Microsecond)
		case "apiserver_request_total":
			count := sample.Value
			a.addMetricRequestCount(resource, subresource, verb, scope, int(count))
		}
	}

	return &a, nil
}

// HighLatencyRequests prints summary metrics for the top five request types by latency,
// and returns the number of request types whose 99th-percentile latency is above the
// threshold. We use a higher threshold for list calls if nodeCount is above a given
// threshold (i.e. the cluster is big).
func HighLatencyRequests(c clientset.Interface, nodeCount int) (int, *APIResponsiveness, error) {
	isBigCluster := (nodeCount > bigClusterNodeCountThreshold)
	metrics, err := readLatencyMetrics(c)
	if err != nil {
		return 0, metrics, err
	}
	sort.Sort(sort.Reverse(metrics))
	badMetrics := 0
	top := 5
	for i := range metrics.APICalls {
		latency := metrics.APICalls[i].Latency.Perc99
		isListCall := (metrics.APICalls[i].Verb == "LIST")
		isClusterScopedCall := (metrics.APICalls[i].Scope == "cluster")
		isBad := false
		latencyThreshold := apiCallLatencyThreshold
		if isListCall && isBigCluster {
			latencyThreshold = apiListCallLatencyThreshold
			if isClusterScopedCall {
				latencyThreshold = apiClusterScopeListCallThreshold
			}
		}
		if latency > latencyThreshold {
			isBad = true
			badMetrics++
		}
		if top > 0 || isBad {
			top--
			prefix := ""
			if isBad {
				prefix = "WARNING "
			}
			e2elog.Logf("%vTop latency metric: %+v", prefix, metrics.APICalls[i])
		}
	}
	return badMetrics, metrics, nil
}

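// Example (illustrative sketch): clientSet and nodeCount are assumed to come
// from the caller's test context.
//
//	badCount, responsiveness, err := HighLatencyRequests(clientSet, nodeCount)
//	if err == nil && badCount > 0 {
//		e2elog.Logf("%d request types above latency threshold:\n%s", badCount, responsiveness.PrintHumanReadable())
//	}
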
// VerifyLatencyWithinThreshold verifies whether the 50th, 90th and 99th percentiles of a
// latency metric are within the expected threshold.
func VerifyLatencyWithinThreshold(threshold, actual LatencyMetric, metricName string) error {
	if actual.Perc50 > threshold.Perc50 {
		return fmt.Errorf("too high %v latency 50th percentile: %v", metricName, actual.Perc50)
	}
	if actual.Perc90 > threshold.Perc90 {
		return fmt.Errorf("too high %v latency 90th percentile: %v", metricName, actual.Perc90)
	}
	if actual.Perc99 > threshold.Perc99 {
		return fmt.Errorf("too high %v latency 99th percentile: %v", metricName, actual.Perc99)
	}
	return nil
}

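// Example (illustrative, hypothetical threshold values; podStartup is assumed to
// be a *PodStartupLatency gathered elsewhere):
//
//	threshold := LatencyMetric{Perc50: 500 * time.Millisecond, Perc90: time.Second, Perc99: 2 * time.Second}
//	if err := VerifyLatencyWithinThreshold(threshold, podStartup.E2ELatency, "pod startup"); err != nil {
//		e2elog.Logf("%v", err)
//	}
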
// ResetMetrics resets latency metrics in the apiserver.
func ResetMetrics(c clientset.Interface) error {
	e2elog.Logf("Resetting latency metrics in apiserver...")
	body, err := c.CoreV1().RESTClient().Delete().AbsPath("/metrics").DoRaw()
	if err != nil {
		return err
	}
	if string(body) != "metrics reset\n" {
		return fmt.Errorf("unexpected response: %q", string(body))
	}
	return nil
}

// Retrieves metrics information.
func getMetrics(c clientset.Interface) (string, error) {
	body, err := c.CoreV1().RESTClient().Get().AbsPath("/metrics").DoRaw()
	if err != nil {
		return "", err
	}
	return string(body), nil
}

// Sends a REST request to the kube-scheduler metrics endpoint.
func sendRestRequestToScheduler(c clientset.Interface, op string) (string, error) {
	opUpper := strings.ToUpper(op)
	if opUpper != "GET" && opUpper != "DELETE" {
		return "", fmt.Errorf("unknown REST request verb: %q", op)
	}

	nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
	ExpectNoError(err)

	var masterRegistered = false
	for _, node := range nodes.Items {
		if system.IsMasterNode(node.Name) {
			masterRegistered = true
		}
	}

	var responseText string
	if masterRegistered {
		ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
		defer cancel()

		body, err := c.CoreV1().RESTClient().Verb(opUpper).
			Context(ctx).
			Namespace(metav1.NamespaceSystem).
			Resource("pods").
			Name(fmt.Sprintf("kube-scheduler-%v:%v", TestContext.CloudConfig.MasterName, ports.InsecureSchedulerPort)).
			SubResource("proxy").
			Suffix("metrics").
			Do().Raw()
		ExpectNoError(err)
		responseText = string(body)
	} else {
		// If the master is not registered, fall back to the old method of using SSH.
		if TestContext.Provider == "gke" || TestContext.Provider == "eks" {
			e2elog.Logf("Not grabbing scheduler metrics through master SSH: unsupported for %s", TestContext.Provider)
			return "", nil
		}
		cmd := "curl -X " + opUpper + " http://localhost:10251/metrics"
		sshResult, err := e2essh.SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
		if err != nil || sshResult.Code != 0 {
			return "", fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
		}
		responseText = sshResult.Stdout
	}
	return responseText, nil
}

// Retrieves scheduler latency metrics.
func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
	result := SchedulingMetrics{}
	data, err := sendRestRequestToScheduler(c, "GET")
	if err != nil {
		return nil, err
	}
	samples, err := extractMetricSamples(data)
	if err != nil {
		return nil, err
	}

	for _, sample := range samples {
		if sample.Metric[model.MetricNameLabel] != schedulingLatencyMetricName {
			continue
		}

		var metric *LatencyMetric
		switch sample.Metric[schedulermetric.OperationLabel] {
		case schedulermetric.PredicateEvaluation:
			metric = &result.PredicateEvaluationLatency
		case schedulermetric.PriorityEvaluation:
			metric = &result.PriorityEvaluationLatency
		case schedulermetric.PreemptionEvaluation:
			metric = &result.PreemptionEvaluationLatency
		case schedulermetric.Binding:
			metric = &result.BindingLatency
		}
		if metric == nil {
			continue
		}

		quantile, err := strconv.ParseFloat(string(sample.Metric[model.QuantileLabel]), 64)
		if err != nil {
			return nil, err
		}
		setQuantile(metric, quantile, time.Duration(int64(float64(sample.Value)*float64(time.Second))))
	}
	return &result, nil
}

// VerifySchedulerLatency retrieves the scheduling latencies (currently it performs no
// verification itself; callers check and log the returned metrics).
func VerifySchedulerLatency(c clientset.Interface) (*SchedulingMetrics, error) {
	latency, err := getSchedulingLatency(c)
	if err != nil {
		return nil, err
	}
	return latency, nil
}

// ResetSchedulerMetrics sends a DELETE request to kube-scheduler to reset its metrics.
func ResetSchedulerMetrics(c clientset.Interface) error {
	responseText, err := sendRestRequestToScheduler(c, "DELETE")
	if err != nil {
		return fmt.Errorf("failed to reset scheduler metrics: %v (response: %q)", err, responseText)
	}
	return nil
}

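// Example (illustrative sketch; clientSet is assumed caller context): resetting
// scheduler metrics before a run and reading latencies after it.
//
//	if err := ResetSchedulerMetrics(clientSet); err != nil {
//		e2elog.Logf("%v", err)
//	}
//	// ... schedule the test pods ...
//	if latency, err := VerifySchedulerLatency(clientSet); err == nil {
//		e2elog.Logf("scheduling latency: %s", latency.PrintJSON())
//	}
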
func convertSampleToBucket(sample *model.Sample, h *HistogramVec) {
	labels := make(map[string]string)
	for k, v := range sample.Metric {
		if k != "le" {
			labels[string(k)] = string(v)
		}
	}
	var hist *Histogram
	for i := range *h {
		if reflect.DeepEqual(labels, (*h)[i].Labels) {
			hist = &((*h)[i])
			break
		}
	}
	if hist == nil {
		hist = newHistogram(labels)
		// Appending copies the struct, but Labels and Buckets are maps, so the
		// write below is still visible through the element stored in the slice.
		*h = append(*h, *hist)
	}
	hist.Buckets[string(sample.Metric["le"])] = int(sample.Value)
}

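// Example (illustrative): given a sample for
//
//	etcd_disk_wal_fsync_duration_seconds_bucket{le="0.256"} 42
//
// convertSampleToBucket records Buckets["0.256"] = 42 in the Histogram whose
// Labels match the sample's labels (minus "le"), creating the Histogram if needed.
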
// PrettyPrintJSON converts metrics to indented JSON format.
func PrettyPrintJSON(metrics interface{}) string {
	output := &bytes.Buffer{}
	if err := json.NewEncoder(output).Encode(metrics); err != nil {
		e2elog.Logf("Error encoding metrics: %v", err)
		return ""
	}
	formatted := &bytes.Buffer{}
	if err := json.Indent(formatted, output.Bytes(), "", " "); err != nil {
		e2elog.Logf("Error indenting: %v", err)
		return ""
	}
	return formatted.String()
}

// extractMetricSamples parses the Prometheus metric samples from the input string.
func extractMetricSamples(metricsBlob string) ([]*model.Sample, error) {
	dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText)
	decoder := expfmt.SampleDecoder{
		Dec:  dec,
		Opts: &expfmt.DecodeOptions{},
	}

	var samples []*model.Sample
	for {
		var v model.Vector
		if err := decoder.Decode(&v); err != nil {
			if err == io.EOF {
				// Expected loop termination condition.
				return samples, nil
			}
			return nil, err
		}
		samples = append(samples, v...)
	}
}

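// Example (illustrative, hypothetical input): parsing a small metrics blob in
// the Prometheus text format. The trailing newline matters to the text parser.
//
//	blob := "apiserver_request_total{verb=\"GET\",code=\"200\"} 7\n"
//	samples, err := extractMetricSamples(blob)
//	// On success, samples holds one *model.Sample whose labels include
//	// __name__="apiserver_request_total", verb="GET" and code="200", with Value 7.
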
// PodLatencyData encapsulates pod startup latency information.
type PodLatencyData struct {
	// Name of the pod
	Name string
	// Node this pod was running on
	Node string
	// Latency information related to pod startup time
	Latency time.Duration
}

// LatencySlice is an array of PodLatencyData which encapsulates pod startup latency information.
type LatencySlice []PodLatencyData

func (a LatencySlice) Len() int           { return len(a) }
func (a LatencySlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a LatencySlice) Less(i, j int) bool { return a[i].Latency < a[j].Latency }

// ExtractLatencyMetrics returns latency metrics for each percentile (50th, 90th, 99th and 100th).
// The input slice must be non-empty and already sorted in ascending latency order.
func ExtractLatencyMetrics(latencies []PodLatencyData) LatencyMetric {
	length := len(latencies)
	perc50 := latencies[int(math.Ceil(float64(length*50)/100))-1].Latency
	perc90 := latencies[int(math.Ceil(float64(length*90)/100))-1].Latency
	perc99 := latencies[int(math.Ceil(float64(length*99)/100))-1].Latency
	perc100 := latencies[length-1].Latency
	return LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99, Perc100: perc100}
}

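// Example (illustrative sketch; latencyData is assumed caller context): the
// percentile indexing above relies on sorted input, which is why callers
// typically go through LatencySlice first.
//
//	sort.Sort(LatencySlice(latencyData))
//	metric := ExtractLatencyMetrics(latencyData)
//	e2elog.Logf("pod startup perc99: %v", metric.Perc99)
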
// LogSuspiciousLatency logs metrics/docker errors from all nodes that had slow startup times.
// If latencyDataLag is nil, latencyData is used in its place.
func LogSuspiciousLatency(latencyData []PodLatencyData, latencyDataLag []PodLatencyData, nodeCount int, c clientset.Interface) {
	if latencyDataLag == nil {
		latencyDataLag = latencyData
	}
	for _, l := range latencyData {
		if l.Latency > NodeStartupThreshold {
			HighLatencyKubeletOperations(c, 1*time.Second, l.Node, e2elog.Logf)
		}
	}
	e2elog.Logf("Approx throughput: %v pods/min",
		float64(nodeCount)/(latencyDataLag[len(latencyDataLag)-1].Latency.Minutes()))
}

// PrintLatencies logs the slowest 10% of latencies (assuming sorted input) along
// with the 50th, 90th and 99th percentiles, in a readable format.
func PrintLatencies(latencies []PodLatencyData, header string) {
	metrics := ExtractLatencyMetrics(latencies)
	e2elog.Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:])
	e2elog.Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
}

// computeClusterAutoscalerMetricsDelta subtracts the "before" cumulative bucket counts
// from the current ones, leaving per-run deltas for the CA function duration histogram.
func (m *MetricsForE2E) computeClusterAutoscalerMetricsDelta(before metrics.Collection) {
	if beforeSamples, found := before.ClusterAutoscalerMetrics[caFunctionMetric]; found {
		if afterSamples, found := m.ClusterAutoscalerMetrics[caFunctionMetric]; found {
			beforeSamplesMap := make(map[string]*model.Sample)
			for _, bSample := range beforeSamples {
				beforeSamplesMap[makeKey(bSample.Metric[caFunctionMetricLabel], bSample.Metric["le"])] = bSample
			}
			for _, aSample := range afterSamples {
				if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[caFunctionMetricLabel], aSample.Metric["le"])]; found {
					aSample.Value = aSample.Value - bSample.Value
				}
			}
		}
	}
}

func makeKey(a, b model.LabelValue) string {
	return string(a) + "___" + string(b)
}