kubelet_stats.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "sort"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "text/tabwriter"
  24. "time"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  27. "k8s.io/apimachinery/pkg/util/sets"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. clientset "k8s.io/client-go/kubernetes"
  30. stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
  31. dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
  32. kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
  33. "k8s.io/kubernetes/pkg/master/ports"
  34. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  35. "k8s.io/kubernetes/test/e2e/framework/metrics"
  36. "github.com/prometheus/common/model"
  37. )
  38. // KubeletLatencyMetric stores metrics scraped from the kubelet server's /metric endpoint.
  39. // TODO: Get some more structure around the metrics and this type
  40. type KubeletLatencyMetric struct {
  41. // eg: list, info, create
  42. Operation string
  43. // eg: sync_pods, pod_worker
  44. Method string
  45. // 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
  46. Quantile float64
  47. Latency time.Duration
  48. }
  49. // KubeletLatencyMetrics implements sort.Interface for []KubeletMetric based on
  50. // the latency field.
  51. type KubeletLatencyMetrics []KubeletLatencyMetric
  52. func (a KubeletLatencyMetrics) Len() int { return len(a) }
  53. func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  54. func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
  55. // If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
  56. // or else, the function will try to get kubelet metrics directly from the node.
  57. func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) {
  58. if c == nil {
  59. return metrics.GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
  60. }
  61. grabber, err := metrics.NewMetricsGrabber(c, nil, true, false, false, false, false)
  62. if err != nil {
  63. return metrics.KubeletMetrics{}, err
  64. }
  65. return grabber.GrabFromKubelet(nodeName)
  66. }
  67. // getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
  68. // the subsystem prefix.
  69. func getKubeletMetrics(c clientset.Interface, nodeName string) (metrics.KubeletMetrics, error) {
  70. ms, err := getKubeletMetricsFromNode(c, nodeName)
  71. if err != nil {
  72. return metrics.KubeletMetrics{}, err
  73. }
  74. kubeletMetrics := make(metrics.KubeletMetrics)
  75. for name, samples := range ms {
  76. const prefix = kubeletmetrics.KubeletSubsystem + "_"
  77. if !strings.HasPrefix(name, prefix) {
  78. // Not a kubelet metric.
  79. continue
  80. }
  81. method := strings.TrimPrefix(name, prefix)
  82. kubeletMetrics[method] = samples
  83. }
  84. return kubeletMetrics, nil
  85. }
  86. // GetDefaultKubeletLatencyMetrics calls GetKubeletLatencyMetrics with a set of default metricNames
  87. // identifying common latency metrics.
  88. // Note that the KubeletMetrics passed in should not contain subsystem prefix.
  89. func GetDefaultKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
  90. latencyMetricNames := sets.NewString(
  91. kubeletmetrics.PodWorkerDurationKey,
  92. kubeletmetrics.PodWorkerStartDurationKey,
  93. kubeletmetrics.PodStartDurationKey,
  94. kubeletmetrics.CgroupManagerOperationsKey,
  95. dockermetrics.DockerOperationsLatencyKey,
  96. kubeletmetrics.PodWorkerStartDurationKey,
  97. kubeletmetrics.PLEGRelistDurationKey,
  98. )
  99. return GetKubeletLatencyMetrics(ms, latencyMetricNames)
  100. }
  101. // GetKubeletLatencyMetrics filters ms to include only those contained in the metricNames set,
  102. // then constructs a KubeletLatencyMetrics list based on the samples associated with those metrics.
  103. func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics {
  104. var latencyMetrics KubeletLatencyMetrics
  105. for name, samples := range ms {
  106. if !filterMetricNames.Has(name) {
  107. continue
  108. }
  109. for _, sample := range samples {
  110. latency := sample.Value
  111. operation := string(sample.Metric["operation_type"])
  112. var quantile float64
  113. if val, ok := sample.Metric[model.QuantileLabel]; ok {
  114. var err error
  115. if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
  116. continue
  117. }
  118. }
  119. latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
  120. Operation: operation,
  121. Method: name,
  122. Quantile: quantile,
  123. Latency: time.Duration(int64(latency)) * time.Microsecond,
  124. })
  125. }
  126. }
  127. return latencyMetrics
  128. }
  129. // RuntimeOperationMonitor is the tool getting and parsing docker operation metrics.
  130. type RuntimeOperationMonitor struct {
  131. client clientset.Interface
  132. nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
  133. }
  134. // NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
  135. type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
  136. // RuntimeOperationErrorRate is the error rate of a specified runtime operation.
  137. type RuntimeOperationErrorRate struct {
  138. TotalNumber float64
  139. ErrorRate float64
  140. TimeoutRate float64
  141. }
  142. // NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor.
  143. func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor {
  144. m := &RuntimeOperationMonitor{
  145. client: c,
  146. nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
  147. }
  148. nodes, err := m.client.CoreV1().Nodes().List(metav1.ListOptions{})
  149. if err != nil {
  150. Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
  151. }
  152. for _, node := range nodes.Items {
  153. m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
  154. }
  155. // Initialize the runtime operation error rate
  156. m.GetRuntimeOperationErrorRate()
  157. return m
  158. }
  159. // GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculate
  160. // error rates of all runtime operations.
  161. func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
  162. for node := range m.nodesRuntimeOps {
  163. nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
  164. if err != nil {
  165. e2elog.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
  166. continue
  167. }
  168. m.nodesRuntimeOps[node] = nodeResult
  169. }
  170. return m.nodesRuntimeOps
  171. }
  172. // GetLatestRuntimeOperationErrorRate gets latest error rate and timeout rate from last observed RuntimeOperationErrorRate.
  173. func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
  174. result := make(map[string]NodeRuntimeOperationErrorRate)
  175. for node := range m.nodesRuntimeOps {
  176. result[node] = make(NodeRuntimeOperationErrorRate)
  177. oldNodeResult := m.nodesRuntimeOps[node]
  178. curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
  179. if err != nil {
  180. e2elog.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
  181. continue
  182. }
  183. for op, cur := range curNodeResult {
  184. t := *cur
  185. if old, found := oldNodeResult[op]; found {
  186. t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
  187. t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
  188. t.TotalNumber -= old.TotalNumber
  189. }
  190. result[node][op] = &t
  191. }
  192. m.nodesRuntimeOps[node] = curNodeResult
  193. }
  194. return result
  195. }
  196. // FormatRuntimeOperationErrorRate formats the runtime operation error rate to string.
  197. func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
  198. lines := []string{}
  199. for node, nodeResult := range nodesResult {
  200. lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
  201. for op, result := range nodeResult {
  202. line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
  203. result.TotalNumber, result.ErrorRate, result.TimeoutRate)
  204. lines = append(lines, line)
  205. }
  206. lines = append(lines, fmt.Sprintln())
  207. }
  208. return strings.Join(lines, "\n")
  209. }
  210. // getNodeRuntimeOperationErrorRate gets runtime operation error rate from specified node.
  211. func getNodeRuntimeOperationErrorRate(c clientset.Interface, node string) (NodeRuntimeOperationErrorRate, error) {
  212. result := make(NodeRuntimeOperationErrorRate)
  213. ms, err := getKubeletMetrics(c, node)
  214. if err != nil {
  215. return result, err
  216. }
  217. // If no corresponding metrics are found, the returned samples will be empty. Then the following
  218. // loop will be skipped automatically.
  219. allOps := ms[dockermetrics.DockerOperationsKey]
  220. errOps := ms[dockermetrics.DockerOperationsErrorsKey]
  221. timeoutOps := ms[dockermetrics.DockerOperationsTimeoutKey]
  222. for _, sample := range allOps {
  223. operation := string(sample.Metric["operation_type"])
  224. result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
  225. }
  226. for _, sample := range errOps {
  227. operation := string(sample.Metric["operation_type"])
  228. // Should always find the corresponding item, just in case
  229. if _, found := result[operation]; found {
  230. result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
  231. }
  232. }
  233. for _, sample := range timeoutOps {
  234. operation := string(sample.Metric["operation_type"])
  235. if _, found := result[operation]; found {
  236. result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
  237. }
  238. }
  239. return result, nil
  240. }
  241. // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
  242. func HighLatencyKubeletOperations(c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
  243. ms, err := getKubeletMetrics(c, nodeName)
  244. if err != nil {
  245. return KubeletLatencyMetrics{}, err
  246. }
  247. latencyMetrics := GetDefaultKubeletLatencyMetrics(ms)
  248. sort.Sort(latencyMetrics)
  249. var badMetrics KubeletLatencyMetrics
  250. logFunc("\nLatency metrics for node %v", nodeName)
  251. for _, m := range latencyMetrics {
  252. if m.Latency > threshold {
  253. badMetrics = append(badMetrics, m)
  254. e2elog.Logf("%+v", m)
  255. }
  256. }
  257. return badMetrics, nil
  258. }
  259. // GetStatsSummary contacts kubelet for the container information.
  260. func GetStatsSummary(c clientset.Interface, nodeName string) (*stats.Summary, error) {
  261. ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
  262. defer cancel()
  263. data, err := c.CoreV1().RESTClient().Get().
  264. Context(ctx).
  265. Resource("nodes").
  266. SubResource("proxy").
  267. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  268. Suffix("stats/summary").
  269. Do().Raw()
  270. if err != nil {
  271. return nil, err
  272. }
  273. summary := stats.Summary{}
  274. err = json.Unmarshal(data, &summary)
  275. if err != nil {
  276. return nil, err
  277. }
  278. return &summary, nil
  279. }
  280. func removeUint64Ptr(ptr *uint64) uint64 {
  281. if ptr == nil {
  282. return 0
  283. }
  284. return *ptr
  285. }
  286. // getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
  287. // and returns the resource usage of all containerNames for the past
  288. // cpuInterval.
  289. // The acceptable range of the interval is 2s~120s. Be warned that as the
  290. // interval (and #containers) increases, the size of kubelet's response
  291. // could be significant. E.g., the 60s interval stats for ~20 containers is
  292. // ~1.5MB. Don't hammer the node with frequent, heavy requests.
  293. //
  294. // cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
  295. // stats points to compute the cpu usage over the interval. Assuming cadvisor
  296. // polls every second, we'd need to get N stats points for N-second interval.
  297. // Note that this is an approximation and may not be accurate, hence we also
  298. // write the actual interval used for calculation (based on the timestamps of
  299. // the stats points in ContainerResourceUsage.CPUInterval.
  300. //
  301. // containerNames is a function returning a collection of container names in which
  302. // user is interested in.
  303. func getOneTimeResourceUsageOnNode(
  304. c clientset.Interface,
  305. nodeName string,
  306. cpuInterval time.Duration,
  307. containerNames func() []string,
  308. ) (ResourceUsagePerContainer, error) {
  309. const (
  310. // cadvisor records stats about every second.
  311. cadvisorStatsPollingIntervalInSeconds float64 = 1.0
  312. // cadvisor caches up to 2 minutes of stats (configured by kubelet).
  313. maxNumStatsToRequest int = 120
  314. )
  315. numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
  316. if numStats < 2 || numStats > maxNumStatsToRequest {
  317. return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
  318. }
  319. // Get information of all containers on the node.
  320. summary, err := GetStatsSummary(c, nodeName)
  321. if err != nil {
  322. return nil, err
  323. }
  324. f := func(name string, newStats *stats.ContainerStats) *ContainerResourceUsage {
  325. if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
  326. return nil
  327. }
  328. return &ContainerResourceUsage{
  329. Name: name,
  330. Timestamp: newStats.StartTime.Time,
  331. CPUUsageInCores: float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
  332. MemoryUsageInBytes: removeUint64Ptr(newStats.Memory.UsageBytes),
  333. MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
  334. MemoryRSSInBytes: removeUint64Ptr(newStats.Memory.RSSBytes),
  335. CPUInterval: 0,
  336. }
  337. }
  338. // Process container infos that are relevant to us.
  339. containers := containerNames()
  340. usageMap := make(ResourceUsagePerContainer, len(containers))
  341. observedContainers := []string{}
  342. for _, pod := range summary.Pods {
  343. for _, container := range pod.Containers {
  344. isInteresting := false
  345. for _, interestingContainerName := range containers {
  346. if container.Name == interestingContainerName {
  347. isInteresting = true
  348. observedContainers = append(observedContainers, container.Name)
  349. break
  350. }
  351. }
  352. if !isInteresting {
  353. continue
  354. }
  355. if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
  356. usageMap[pod.PodRef.Name+"/"+container.Name] = usage
  357. }
  358. }
  359. }
  360. return usageMap, nil
  361. }
  362. func getNodeStatsSummary(c clientset.Interface, nodeName string) (*stats.Summary, error) {
  363. data, err := c.CoreV1().RESTClient().Get().
  364. Resource("nodes").
  365. SubResource("proxy").
  366. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  367. Suffix("stats/summary").
  368. SetHeader("Content-Type", "application/json").
  369. Do().Raw()
  370. if err != nil {
  371. return nil, err
  372. }
  373. var summary *stats.Summary
  374. err = json.Unmarshal(data, &summary)
  375. if err != nil {
  376. return nil, err
  377. }
  378. return summary, nil
  379. }
  380. func getSystemContainerStats(summary *stats.Summary) map[string]*stats.ContainerStats {
  381. statsList := summary.Node.SystemContainers
  382. statsMap := make(map[string]*stats.ContainerStats)
  383. for i := range statsList {
  384. statsMap[statsList[i].Name] = &statsList[i]
  385. }
  386. // Create a root container stats using information available in
  387. // stats.NodeStats. This is necessary since it is a different type.
  388. statsMap[rootContainerName] = &stats.ContainerStats{
  389. CPU: summary.Node.CPU,
  390. Memory: summary.Node.Memory,
  391. }
  392. return statsMap
  393. }
  394. const (
  395. rootContainerName = "/"
  396. )
  397. // TargetContainers returns a list of containers for which we want to collect resource usage.
  398. func TargetContainers() []string {
  399. return []string{
  400. rootContainerName,
  401. stats.SystemContainerRuntime,
  402. stats.SystemContainerKubelet,
  403. }
  404. }
  405. // ContainerResourceUsage is a structure for gathering container resource usage.
  406. type ContainerResourceUsage struct {
  407. Name string
  408. Timestamp time.Time
  409. CPUUsageInCores float64
  410. MemoryUsageInBytes uint64
  411. MemoryWorkingSetInBytes uint64
  412. MemoryRSSInBytes uint64
  413. // The interval used to calculate CPUUsageInCores.
  414. CPUInterval time.Duration
  415. }
  416. func (r *ContainerResourceUsage) isStrictlyGreaterThan(rhs *ContainerResourceUsage) bool {
  417. return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
  418. }
  419. // ResourceUsagePerContainer is map of ContainerResourceUsage
  420. type ResourceUsagePerContainer map[string]*ContainerResourceUsage
  421. // ResourceUsagePerNode is map of ResourceUsagePerContainer.
  422. type ResourceUsagePerNode map[string]ResourceUsagePerContainer
  423. func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
  424. // Example output:
  425. //
  426. // Resource usage for node "e2e-test-foo-node-abcde":
  427. // container cpu(cores) memory(MB)
  428. // "/" 0.363 2942.09
  429. // "/docker-daemon" 0.088 521.80
  430. // "/kubelet" 0.086 424.37
  431. // "/system" 0.007 119.88
  432. buf := &bytes.Buffer{}
  433. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  434. fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
  435. for name, s := range containerStats {
  436. fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
  437. }
  438. w.Flush()
  439. return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
  440. }
  441. type uint64arr []uint64
  442. func (a uint64arr) Len() int { return len(a) }
  443. func (a uint64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  444. func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }
  445. type usageDataPerContainer struct {
  446. cpuData []float64
  447. memUseData []uint64
  448. memWorkSetData []uint64
  449. }
  450. // GetKubeletHeapStats returns stats of kubelet heap.
  451. func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error) {
  452. client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap", ports.KubeletPort)
  453. if err != nil {
  454. return "", err
  455. }
  456. raw, errRaw := client.Raw()
  457. if errRaw != nil {
  458. return "", err
  459. }
  460. stats := string(raw)
  461. // Only dumping the runtime.MemStats numbers to avoid polluting the log.
  462. numLines := 23
  463. lines := strings.Split(stats, "\n")
  464. return strings.Join(lines[len(lines)-numLines:], "\n"), nil
  465. }
  466. // PrintAllKubeletPods outputs status of all kubelet pods into log.
  467. func PrintAllKubeletPods(c clientset.Interface, nodeName string) {
  468. podList, err := GetKubeletPods(c, nodeName)
  469. if err != nil {
  470. e2elog.Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
  471. return
  472. }
  473. for _, p := range podList.Items {
  474. e2elog.Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
  475. for _, c := range p.Status.ContainerStatuses {
  476. e2elog.Logf("\tContainer %v ready: %v, restart count %v",
  477. c.Name, c.Ready, c.RestartCount)
  478. }
  479. }
  480. }
  481. func computeContainerResourceUsage(name string, oldStats, newStats *stats.ContainerStats) *ContainerResourceUsage {
  482. return &ContainerResourceUsage{
  483. Name: name,
  484. Timestamp: newStats.CPU.Time.Time,
  485. CPUUsageInCores: float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
  486. MemoryUsageInBytes: *newStats.Memory.UsageBytes,
  487. MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
  488. MemoryRSSInBytes: *newStats.Memory.RSSBytes,
  489. CPUInterval: newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
  490. }
  491. }
  492. // resourceCollector periodically polls the node, collect stats for a given
  493. // list of containers, computes and cache resource usage up to
  494. // maxEntriesPerContainer for each container.
  495. type resourceCollector struct {
  496. lock sync.RWMutex
  497. node string
  498. containers []string
  499. client clientset.Interface
  500. buffers map[string][]*ContainerResourceUsage
  501. pollingInterval time.Duration
  502. stopCh chan struct{}
  503. }
  504. func newResourceCollector(c clientset.Interface, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
  505. buffers := make(map[string][]*ContainerResourceUsage)
  506. return &resourceCollector{
  507. node: nodeName,
  508. containers: containerNames,
  509. client: c,
  510. buffers: buffers,
  511. pollingInterval: pollingInterval,
  512. }
  513. }
  514. // Start starts a goroutine to Poll the node every pollingInterval.
  515. func (r *resourceCollector) Start() {
  516. r.stopCh = make(chan struct{}, 1)
  517. // Keep the last observed stats for comparison.
  518. oldStats := make(map[string]*stats.ContainerStats)
  519. go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
  520. }
  521. // Stop sends a signal to terminate the stats collecting goroutine.
  522. func (r *resourceCollector) Stop() {
  523. close(r.stopCh)
  524. }
  525. // collectStats gets the latest stats from kubelet stats summary API, computes
  526. // the resource usage, and pushes it to the buffer.
  527. func (r *resourceCollector) collectStats(oldStatsMap map[string]*stats.ContainerStats) {
  528. summary, err := getNodeStatsSummary(r.client, r.node)
  529. if err != nil {
  530. e2elog.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
  531. return
  532. }
  533. cStatsMap := getSystemContainerStats(summary)
  534. r.lock.Lock()
  535. defer r.lock.Unlock()
  536. for _, name := range r.containers {
  537. cStats, ok := cStatsMap[name]
  538. if !ok {
  539. e2elog.Logf("Missing info/stats for container %q on node %q", name, r.node)
  540. return
  541. }
  542. if oldStats, ok := oldStatsMap[name]; ok {
  543. if oldStats.CPU.Time.Equal(&cStats.CPU.Time) {
  544. // No change -> skip this stat.
  545. continue
  546. }
  547. r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
  548. }
  549. // Update the old stats.
  550. oldStatsMap[name] = cStats
  551. }
  552. }
  553. func (r *resourceCollector) GetLatest() (ResourceUsagePerContainer, error) {
  554. r.lock.RLock()
  555. defer r.lock.RUnlock()
  556. stats := make(ResourceUsagePerContainer)
  557. for _, name := range r.containers {
  558. contStats, ok := r.buffers[name]
  559. if !ok || len(contStats) == 0 {
  560. return nil, fmt.Errorf("Resource usage on node %q is not ready yet", r.node)
  561. }
  562. stats[name] = contStats[len(contStats)-1]
  563. }
  564. return stats, nil
  565. }
  566. // Reset frees the stats and start over.
  567. func (r *resourceCollector) Reset() {
  568. r.lock.Lock()
  569. defer r.lock.Unlock()
  570. for _, name := range r.containers {
  571. r.buffers[name] = []*ContainerResourceUsage{}
  572. }
  573. }
  574. type resourceUsageByCPU []*ContainerResourceUsage
  575. func (r resourceUsageByCPU) Len() int { return len(r) }
  576. func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
  577. func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
  578. // The percentiles to report.
  579. var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
  580. // GetBasicCPUStats returns the percentiles the cpu usage in cores for
  581. // containerName. This method examines all data currently in the buffer.
  582. func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
  583. r.lock.RLock()
  584. defer r.lock.RUnlock()
  585. result := make(map[float64]float64, len(percentiles))
  586. usages := r.buffers[containerName]
  587. sort.Sort(resourceUsageByCPU(usages))
  588. for _, q := range percentiles {
  589. index := int(float64(len(usages))*q) - 1
  590. if index < 0 {
  591. // We don't have enough data.
  592. result[q] = 0
  593. continue
  594. }
  595. result[q] = usages[index].CPUUsageInCores
  596. }
  597. return result
  598. }
  599. // ResourceMonitor manages a resourceCollector per node.
  600. type ResourceMonitor struct {
  601. client clientset.Interface
  602. containers []string
  603. pollingInterval time.Duration
  604. collectors map[string]*resourceCollector
  605. }
  606. // NewResourceMonitor returns a new ResourceMonitor.
  607. func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
  608. return &ResourceMonitor{
  609. containers: containerNames,
  610. client: c,
  611. pollingInterval: pollingInterval,
  612. }
  613. }
  614. // Start starts collectors.
  615. func (r *ResourceMonitor) Start() {
  616. // It should be OK to monitor unschedulable Nodes
  617. nodes, err := r.client.CoreV1().Nodes().List(metav1.ListOptions{})
  618. if err != nil {
  619. Failf("ResourceMonitor: unable to get list of nodes: %v", err)
  620. }
  621. r.collectors = make(map[string]*resourceCollector, 0)
  622. for _, node := range nodes.Items {
  623. collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
  624. r.collectors[node.Name] = collector
  625. collector.Start()
  626. }
  627. }
  628. // Stop stops collectors.
  629. func (r *ResourceMonitor) Stop() {
  630. for _, collector := range r.collectors {
  631. collector.Stop()
  632. }
  633. }
  634. // Reset resets collectors.
  635. func (r *ResourceMonitor) Reset() {
  636. for _, collector := range r.collectors {
  637. collector.Reset()
  638. }
  639. }
  640. // LogLatest outputs the latest resource usage into log.
  641. func (r *ResourceMonitor) LogLatest() {
  642. summary, err := r.GetLatest()
  643. if err != nil {
  644. e2elog.Logf("%v", err)
  645. }
  646. e2elog.Logf("%s", r.FormatResourceUsage(summary))
  647. }
  648. // FormatResourceUsage returns the formatted string for LogLatest().
  649. // TODO(oomichi): This can be made to local function after making test/e2e/node/kubelet_perf.go use LogLatest directly instead.
  650. func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
  651. summary := []string{}
  652. for node, usage := range s {
  653. summary = append(summary, formatResourceUsageStats(node, usage))
  654. }
  655. return strings.Join(summary, "\n")
  656. }
  657. // GetLatest returns the latest resource usage.
  658. func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
  659. result := make(ResourceUsagePerNode)
  660. errs := []error{}
  661. for key, collector := range r.collectors {
  662. s, err := collector.GetLatest()
  663. if err != nil {
  664. errs = append(errs, err)
  665. continue
  666. }
  667. result[key] = s
  668. }
  669. return result, utilerrors.NewAggregate(errs)
  670. }
  671. // GetMasterNodeLatest returns the latest resource usage of master and node.
  672. func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode {
  673. result := make(ResourceUsagePerNode)
  674. var masterUsage ResourceUsagePerContainer
  675. var nodesUsage []ResourceUsagePerContainer
  676. for node, usage := range usagePerNode {
  677. if strings.HasSuffix(node, "master") {
  678. masterUsage = usage
  679. } else {
  680. nodesUsage = append(nodesUsage, usage)
  681. }
  682. }
  683. nodeAvgUsage := make(ResourceUsagePerContainer)
  684. for _, nodeUsage := range nodesUsage {
  685. for c, usage := range nodeUsage {
  686. if _, found := nodeAvgUsage[c]; !found {
  687. nodeAvgUsage[c] = &ContainerResourceUsage{Name: usage.Name}
  688. }
  689. nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores
  690. nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes
  691. nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes
  692. nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes
  693. }
  694. }
  695. for c := range nodeAvgUsage {
  696. nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage))
  697. nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage))
  698. nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage))
  699. nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage))
  700. }
  701. result["master"] = masterUsage
  702. result["node"] = nodeAvgUsage
  703. return result
  704. }
  705. // ContainersCPUSummary is indexed by the container name with each entry a
  706. // (percentile, value) map.
  707. type ContainersCPUSummary map[string]map[float64]float64
  708. // NodesCPUSummary is indexed by the node name with each entry a
  709. // ContainersCPUSummary map.
  710. type NodesCPUSummary map[string]ContainersCPUSummary
  711. // FormatCPUSummary returns the string of human-readable CPU summary from the specified summary data.
  712. func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
  713. // Example output for a node (the percentiles may differ):
  714. // CPU usage of containers on node "e2e-test-foo-node-0vj7":
  715. // container 5th% 50th% 90th% 95th%
  716. // "/" 0.051 0.159 0.387 0.455
  717. // "/runtime 0.000 0.000 0.146 0.166
  718. // "/kubelet" 0.036 0.053 0.091 0.154
  719. // "/misc" 0.001 0.001 0.001 0.002
  720. var summaryStrings []string
  721. var header []string
  722. header = append(header, "container")
  723. for _, p := range percentiles {
  724. header = append(header, fmt.Sprintf("%.0fth%%", p*100))
  725. }
  726. for nodeName, containers := range summary {
  727. buf := &bytes.Buffer{}
  728. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  729. fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
  730. for _, containerName := range TargetContainers() {
  731. var s []string
  732. s = append(s, fmt.Sprintf("%q", containerName))
  733. data, ok := containers[containerName]
  734. for _, p := range percentiles {
  735. value := "N/A"
  736. if ok {
  737. value = fmt.Sprintf("%.3f", data[p])
  738. }
  739. s = append(s, value)
  740. }
  741. fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
  742. }
  743. w.Flush()
  744. summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q\n:%s", nodeName, buf.String()))
  745. }
  746. return strings.Join(summaryStrings, "\n")
  747. }
  748. // LogCPUSummary outputs summary of CPU into log.
  749. func (r *ResourceMonitor) LogCPUSummary() {
  750. summary := r.GetCPUSummary()
  751. e2elog.Logf("%s", r.FormatCPUSummary(summary))
  752. }
  753. // GetCPUSummary returns summary of CPU.
  754. func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
  755. result := make(NodesCPUSummary)
  756. for nodeName, collector := range r.collectors {
  757. result[nodeName] = make(ContainersCPUSummary)
  758. for _, containerName := range TargetContainers() {
  759. data := collector.GetBasicCPUStats(containerName)
  760. result[nodeName][containerName] = data
  761. }
  762. }
  763. return result
  764. }
  765. // GetMasterNodeCPUSummary returns summary of master node CPUs.
  766. func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
  767. result := make(NodesCPUSummary)
  768. var masterSummary ContainersCPUSummary
  769. var nodesSummaries []ContainersCPUSummary
  770. for node, summary := range summaryPerNode {
  771. if strings.HasSuffix(node, "master") {
  772. masterSummary = summary
  773. } else {
  774. nodesSummaries = append(nodesSummaries, summary)
  775. }
  776. }
  777. nodeAvgSummary := make(ContainersCPUSummary)
  778. for _, nodeSummary := range nodesSummaries {
  779. for c, summary := range nodeSummary {
  780. if _, found := nodeAvgSummary[c]; !found {
  781. nodeAvgSummary[c] = map[float64]float64{}
  782. }
  783. for perc, value := range summary {
  784. nodeAvgSummary[c][perc] += value
  785. }
  786. }
  787. }
  788. for c := range nodeAvgSummary {
  789. for perc := range nodeAvgSummary[c] {
  790. nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
  791. }
  792. }
  793. result["master"] = masterSummary
  794. result["node"] = nodeAvgSummary
  795. return result
  796. }