/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
	dockermetrics "k8s.io/kubernetes/pkg/kubelet/dockershim/metrics"
	"k8s.io/kubernetes/pkg/master/ports"
	"k8s.io/kubernetes/test/e2e/framework"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)

const (
	// timeout for proxy requests.
	proxyTimeout = 2 * time.Minute
)

// ContainerResourceUsage is a structure for gathering container resource usage.
type ContainerResourceUsage struct {
	Name                    string
	Timestamp               time.Time
	CPUUsageInCores         float64
	MemoryUsageInBytes      uint64
	MemoryWorkingSetInBytes uint64
	MemoryRSSInBytes        uint64
	// The interval used to calculate CPUUsageInCores.
	CPUInterval time.Duration
}

// ResourceUsagePerContainer is a map of ContainerResourceUsage keyed by container name.
type ResourceUsagePerContainer map[string]*ContainerResourceUsage

// ResourceUsagePerNode is a map of ResourceUsagePerContainer keyed by node name.
type ResourceUsagePerNode map[string]ResourceUsagePerContainer

// ContainersCPUSummary is indexed by the container name with each entry a
// (percentile, value) map.
type ContainersCPUSummary map[string]map[float64]float64

// NodesCPUSummary is indexed by the node name with each entry a
// ContainersCPUSummary map.
type NodesCPUSummary map[string]ContainersCPUSummary

// RuntimeOperationMonitor is the tool for getting and parsing docker operation metrics.
type RuntimeOperationMonitor struct {
	client          clientset.Interface
	nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
}

// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate

// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
type RuntimeOperationErrorRate struct {
	TotalNumber float64
	ErrorRate   float64
	TimeoutRate float64
}

// ProxyRequest performs a GET on a node proxy endpoint given the node name and REST client.
func ProxyRequest(c clientset.Interface, node, endpoint string, port int) (restclient.Result, error) {
	// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. #22165
	var result restclient.Result
	finished := make(chan struct{}, 1)
	go func() {
		result = c.CoreV1().RESTClient().Get().
			Resource("nodes").
			SubResource("proxy").
			Name(fmt.Sprintf("%v:%v", node, port)).
			Suffix(endpoint).
			Do(context.TODO())
		finished <- struct{}{}
	}()
	select {
	case <-finished:
		return result, nil
	case <-time.After(proxyTimeout):
		return restclient.Result{}, fmt.Errorf("proxy request to node %q timed out after %v", node, proxyTimeout)
	}
}
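
// Example (illustrative sketch, not part of the original file): fetching the kubelet
// /configz payload through the API server proxy, assuming a clientset `c` and a node
// name `nodeName` are in scope.
//
//	result, err := ProxyRequest(c, nodeName, "configz", ports.KubeletPort)
//	if err != nil {
//		framework.Logf("proxy request failed: %v", err)
//		return
//	}
//	raw, err := result.Raw()
//	// ... inspect raw ...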

// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor.
func NewRuntimeOperationMonitor(c clientset.Interface) *RuntimeOperationMonitor {
	m := &RuntimeOperationMonitor{
		client:          c,
		nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
	}
	nodes, err := m.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		framework.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
	}
	for _, node := range nodes.Items {
		m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
	}
	// Initialize the runtime operation error rate.
	m.GetRuntimeOperationErrorRate()
	return m
}

// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculates
// error rates of all runtime operations.
func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
	for node := range m.nodesRuntimeOps {
		nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
		if err != nil {
			framework.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
			continue
		}
		m.nodesRuntimeOps[node] = nodeResult
	}
	return m.nodesRuntimeOps
}

// GetLatestRuntimeOperationErrorRate gets the latest error and timeout rates observed since the
// previous call, by diffing against the last recorded RuntimeOperationErrorRate.
func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
	result := make(map[string]NodeRuntimeOperationErrorRate)
	for node := range m.nodesRuntimeOps {
		result[node] = make(NodeRuntimeOperationErrorRate)
		oldNodeResult := m.nodesRuntimeOps[node]
		curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
		if err != nil {
			framework.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
			continue
		}
		for op, cur := range curNodeResult {
			t := *cur
			if old, found := oldNodeResult[op]; found {
				t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
				t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
				t.TotalNumber -= old.TotalNumber
			}
			result[node][op] = &t
		}
		m.nodesRuntimeOps[node] = curNodeResult
	}
	return result
}
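
// Worked example (illustrative, not from the original source): if a previous scrape
// saw TotalNumber=200 with ErrorRate=0.05 (10 errors) and the current scrape sees
// TotalNumber=250 with ErrorRate=0.06 (15 errors), the interval error rate computed
// above is (0.06*250 - 0.05*200) / (250 - 200) = 5/50 = 0.10, i.e. 5 new errors out
// of the 50 new operations since the last observation.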

// FormatRuntimeOperationErrorRate formats the runtime operation error rate as a string.
func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
	lines := []string{}
	for node, nodeResult := range nodesResult {
		lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
		for op, result := range nodeResult {
			line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
				result.TotalNumber, result.ErrorRate, result.TimeoutRate)
			lines = append(lines, line)
		}
		lines = append(lines, fmt.Sprintln())
	}
	return strings.Join(lines, "\n")
}
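
// Example workflow (illustrative sketch, not part of the original file): in an e2e
// test, a monitor is typically created before the measured phase and the per-interval
// rates are logged afterwards, assuming a clientset `c` is in scope:
//
//	monitor := NewRuntimeOperationMonitor(c)
//	// ... run the workload under test ...
//	rates := monitor.GetLatestRuntimeOperationErrorRate()
//	framework.Logf("%s", FormatRuntimeOperationErrorRate(rates))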

// getNodeRuntimeOperationErrorRate gets the runtime operation error rate from the specified node.
func getNodeRuntimeOperationErrorRate(c clientset.Interface, node string) (NodeRuntimeOperationErrorRate, error) {
	result := make(NodeRuntimeOperationErrorRate)
	ms, err := e2emetrics.GetKubeletMetrics(c, node)
	if err != nil {
		return result, err
	}
	// If no corresponding metrics are found, the returned samples will be empty and the
	// loops below are skipped automatically.
	allOps := ms[dockermetrics.DockerOperationsKey]
	errOps := ms[dockermetrics.DockerOperationsErrorsKey]
	timeoutOps := ms[dockermetrics.DockerOperationsTimeoutKey]
	for _, sample := range allOps {
		operation := string(sample.Metric["operation_type"])
		result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
	}
	for _, sample := range errOps {
		operation := string(sample.Metric["operation_type"])
		// The corresponding item should always exist; check just in case.
		if _, found := result[operation]; found {
			result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
		}
	}
	for _, sample := range timeoutOps {
		operation := string(sample.Metric["operation_type"])
		if _, found := result[operation]; found {
			result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
		}
	}
	return result, nil
}

// GetStatsSummary contacts kubelet for the container information.
func GetStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
	defer cancel()

	data, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
		Suffix("stats/summary").
		Do(ctx).Raw()
	if err != nil {
		return nil, err
	}

	summary := kubeletstatsv1alpha1.Summary{}
	err = json.Unmarshal(data, &summary)
	if err != nil {
		return nil, err
	}
	return &summary, nil
}
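
// getNodeStatsSummary fetches the kubelet /stats/summary endpoint for the given node
// through the API server proxy and decodes the response into a Summary object.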
func getNodeStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
	data, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
		Suffix("stats/summary").
		SetHeader("Content-Type", "application/json").
		Do(context.TODO()).Raw()
	if err != nil {
		return nil, err
	}

	var summary *kubeletstatsv1alpha1.Summary
	err = json.Unmarshal(data, &summary)
	if err != nil {
		return nil, err
	}
	return summary, nil
}
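
// getSystemContainerStats indexes the node's system container stats by container name
// and synthesizes a root ("/") entry from the node-level CPU and memory stats.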
func getSystemContainerStats(summary *kubeletstatsv1alpha1.Summary) map[string]*kubeletstatsv1alpha1.ContainerStats {
	statsList := summary.Node.SystemContainers
	statsMap := make(map[string]*kubeletstatsv1alpha1.ContainerStats)
	for i := range statsList {
		statsMap[statsList[i].Name] = &statsList[i]
	}

	// Create a root container stats using information available in
	// stats.NodeStats. This is necessary since it is a different type.
	statsMap[rootContainerName] = &kubeletstatsv1alpha1.ContainerStats{
		CPU:    summary.Node.CPU,
		Memory: summary.Node.Memory,
	}
	return statsMap
}

const (
	rootContainerName = "/"
)

// TargetContainers returns a list of containers for which we want to collect resource usage.
func TargetContainers() []string {
	return []string{
		rootContainerName,
		kubeletstatsv1alpha1.SystemContainerRuntime,
		kubeletstatsv1alpha1.SystemContainerKubelet,
	}
}

func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
	// Example output:
	//
	// Resource usage for node "e2e-test-foo-node-abcde":
	// container cpu(cores) memory(MB)
	// "/" 0.363 2942.09
	// "/docker-daemon" 0.088 521.80
	// "/kubelet" 0.086 424.37
	// "/system" 0.007 119.88
	buf := &bytes.Buffer{}
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
	for name, s := range containerStats {
		fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
	}
	w.Flush()
	return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
}

// GetKubeletHeapStats returns stats of the kubelet heap.
func GetKubeletHeapStats(c clientset.Interface, nodeName string) (string, error) {
	client, err := ProxyRequest(c, nodeName, "debug/pprof/heap", ports.KubeletPort)
	if err != nil {
		return "", err
	}
	raw, errRaw := client.Raw()
	if errRaw != nil {
		return "", errRaw
	}
	heapStats := string(raw)
	// Only dump the runtime.MemStats numbers to avoid polluting the log.
	numLines := 23
	lines := strings.Split(heapStats, "\n")
	return strings.Join(lines[len(lines)-numLines:], "\n"), nil
}

func computeContainerResourceUsage(name string, oldStats, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
	return &ContainerResourceUsage{
		Name:                    name,
		Timestamp:               newStats.CPU.Time.Time,
		CPUUsageInCores:         float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
		MemoryUsageInBytes:      *newStats.Memory.UsageBytes,
		MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
		MemoryRSSInBytes:        *newStats.Memory.RSSBytes,
		CPUInterval:             newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
	}
}
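
// Note (illustrative, not part of the original file): CPUUsageInCores above is the
// cumulative usage delta divided by the wall-clock interval, both in nanoseconds.
// For example, if UsageCoreNanoSeconds grew by 5e9 over a 10s (1e10 ns) interval,
// the container averaged 5e9 / 1e10 = 0.5 cores during that interval.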

// resourceCollector periodically polls the node, collects stats for a given
// list of containers, and computes and caches the resource usage for each
// container.
type resourceCollector struct {
	lock            sync.RWMutex
	node            string
	containers      []string
	client          clientset.Interface
	buffers         map[string][]*ContainerResourceUsage
	pollingInterval time.Duration
	stopCh          chan struct{}
}
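
// newResourceCollector returns a resourceCollector for the given node and container
// names; Start must be called before any stats are gathered.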
func newResourceCollector(c clientset.Interface, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
	buffers := make(map[string][]*ContainerResourceUsage)
	return &resourceCollector{
		node:            nodeName,
		containers:      containerNames,
		client:          c,
		buffers:         buffers,
		pollingInterval: pollingInterval,
	}
}

// Start starts a goroutine to poll the node every pollingInterval.
func (r *resourceCollector) Start() {
	r.stopCh = make(chan struct{}, 1)
	// Keep the last observed stats for comparison.
	oldStats := make(map[string]*kubeletstatsv1alpha1.ContainerStats)
	go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
}

// Stop sends a signal to terminate the stats collecting goroutine.
func (r *resourceCollector) Stop() {
	close(r.stopCh)
}

// collectStats gets the latest stats from the kubelet stats summary API, computes
// the resource usage, and pushes it to the buffer.
func (r *resourceCollector) collectStats(oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) {
	summary, err := getNodeStatsSummary(r.client, r.node)
	if err != nil {
		framework.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
		return
	}
	cStatsMap := getSystemContainerStats(summary)
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, name := range r.containers {
		cStats, ok := cStatsMap[name]
		if !ok {
			framework.Logf("Missing info/stats for container %q on node %q", name, r.node)
			return
		}

		if oldStats, ok := oldStatsMap[name]; ok {
			if oldStats.CPU == nil || cStats.CPU == nil || oldStats.Memory == nil || cStats.Memory == nil {
				continue
			}
			if oldStats.CPU.Time.Equal(&cStats.CPU.Time) {
				// No change -> skip this stat.
				continue
			}
			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
		}
		// Update the old stats.
		oldStatsMap[name] = cStats
	}
}
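
// GetLatest returns the most recent resource usage recorded for each target container,
// or an error if any container has no samples yet.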
func (r *resourceCollector) GetLatest() (ResourceUsagePerContainer, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	stats := make(ResourceUsagePerContainer)
	for _, name := range r.containers {
		contStats, ok := r.buffers[name]
		if !ok || len(contStats) == 0 {
			return nil, fmt.Errorf("resource usage on node %q is not ready yet", r.node)
		}
		stats[name] = contStats[len(contStats)-1]
	}
	return stats, nil
}

// Reset frees the stats and starts over.
func (r *resourceCollector) Reset() {
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, name := range r.containers {
		r.buffers[name] = []*ContainerResourceUsage{}
	}
}
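
// resourceUsageByCPU implements sort.Interface, ordering samples by CPU usage in cores.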
type resourceUsageByCPU []*ContainerResourceUsage

func (r resourceUsageByCPU) Len() int           { return len(r) }
func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }

// The percentiles to report.
var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}

// GetBasicCPUStats returns the percentiles of the CPU usage in cores for
// containerName. This method examines all data currently in the buffer.
func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
	r.lock.RLock()
	defer r.lock.RUnlock()
	result := make(map[float64]float64, len(percentiles))
	usages := r.buffers[containerName]
	sort.Sort(resourceUsageByCPU(usages))
	for _, q := range percentiles {
		index := int(float64(len(usages))*q) - 1
		if index < 0 {
			// We don't have enough data.
			result[q] = 0
			continue
		}
		result[q] = usages[index].CPUUsageInCores
	}
	return result
}
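
// Note (illustrative, not part of the original file): the percentile index above is
// computed as int(len*q) - 1 over the ascending-sorted samples. With 20 samples and
// q = 0.90, the index is int(20*0.9) - 1 = 17, so the 18th smallest sample is reported
// as the 90th percentile; with fewer than 1/q samples the reported value is 0.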

// ResourceMonitor manages a resourceCollector per node.
type ResourceMonitor struct {
	client          clientset.Interface
	containers      []string
	pollingInterval time.Duration
	collectors      map[string]*resourceCollector
}

// NewResourceMonitor returns a new ResourceMonitor.
func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
	return &ResourceMonitor{
		containers:      containerNames,
		client:          c,
		pollingInterval: pollingInterval,
	}
}
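
// Example workflow (illustrative sketch, not part of the original file): a resource
// monitor typically brackets the measured phase of an e2e test, assuming a clientset
// `c` is in scope:
//
//	monitor := NewResourceMonitor(c, TargetContainers(), 10*time.Second)
//	monitor.Start()
//	defer monitor.Stop()
//	// ... run the workload under test ...
//	monitor.LogLatest()
//	monitor.LogCPUSummary()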

// Start starts collectors.
func (r *ResourceMonitor) Start() {
	// It should be OK to monitor unschedulable Nodes
	nodes, err := r.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		framework.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
	}
	r.collectors = make(map[string]*resourceCollector, 0)
	for _, node := range nodes.Items {
		collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
		r.collectors[node.Name] = collector
		collector.Start()
	}
}

// Stop stops collectors.
func (r *ResourceMonitor) Stop() {
	for _, collector := range r.collectors {
		collector.Stop()
	}
}

// Reset resets collectors.
func (r *ResourceMonitor) Reset() {
	for _, collector := range r.collectors {
		collector.Reset()
	}
}

// LogLatest outputs the latest resource usage to the log.
func (r *ResourceMonitor) LogLatest() {
	summary, err := r.GetLatest()
	if err != nil {
		framework.Logf("%v", err)
	}
	framework.Logf("%s", r.FormatResourceUsage(summary))
}

// FormatResourceUsage returns the formatted string for LogLatest().
// TODO(oomichi): This can be made a local function after making test/e2e/node/kubelet_perf.go use LogLatest directly instead.
func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
	summary := []string{}
	for node, usage := range s {
		summary = append(summary, formatResourceUsageStats(node, usage))
	}
	return strings.Join(summary, "\n")
}

// GetLatest returns the latest resource usage per node.
func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
	result := make(ResourceUsagePerNode)
	errs := []error{}
	for key, collector := range r.collectors {
		s, err := collector.GetLatest()
		if err != nil {
			errs = append(errs, err)
			continue
		}
		result[key] = s
	}
	return result, utilerrors.NewAggregate(errs)
}

// GetMasterNodeLatest returns the latest resource usage of the master and the
// average usage across all other nodes.
func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode {
	result := make(ResourceUsagePerNode)
	var masterUsage ResourceUsagePerContainer
	var nodesUsage []ResourceUsagePerContainer
	for node, usage := range usagePerNode {
		if strings.HasSuffix(node, "master") {
			masterUsage = usage
		} else {
			nodesUsage = append(nodesUsage, usage)
		}
	}
	nodeAvgUsage := make(ResourceUsagePerContainer)
	for _, nodeUsage := range nodesUsage {
		for c, usage := range nodeUsage {
			if _, found := nodeAvgUsage[c]; !found {
				nodeAvgUsage[c] = &ContainerResourceUsage{Name: usage.Name}
			}
			nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores
			nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes
			nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes
			nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes
		}
	}
	for c := range nodeAvgUsage {
		nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage))
		nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage))
		nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage))
		nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage))
	}
	result["master"] = masterUsage
	result["node"] = nodeAvgUsage
	return result
}
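
// Note (illustrative, not part of the original file): the "node" entry above is a
// per-container mean over all non-master nodes. For example, if two nodes report
// kubelet CPU usage of 0.10 and 0.30 cores, the aggregated "node" entry reports
// 0.20 cores for the kubelet container; the memory fields are averaged the same
// way using integer division.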

// FormatCPUSummary returns a human-readable string of the CPU usage summary.
func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
	// Example output for a node (the percentiles may differ):
	// CPU usage of containers on node "e2e-test-foo-node-0vj7":
	// container 5th% 50th% 90th% 95th%
	// "/" 0.051 0.159 0.387 0.455
	// "/runtime" 0.000 0.000 0.146 0.166
	// "/kubelet" 0.036 0.053 0.091 0.154
	// "/misc" 0.001 0.001 0.001 0.002
	var summaryStrings []string
	var header []string
	header = append(header, "container")
	for _, p := range percentiles {
		header = append(header, fmt.Sprintf("%.0fth%%", p*100))
	}
	for nodeName, containers := range summary {
		buf := &bytes.Buffer{}
		w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
		fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
		for _, containerName := range TargetContainers() {
			var s []string
			s = append(s, fmt.Sprintf("%q", containerName))
			data, ok := containers[containerName]
			for _, p := range percentiles {
				value := "N/A"
				if ok {
					value = fmt.Sprintf("%.3f", data[p])
				}
				s = append(s, value)
			}
			fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
		}
		w.Flush()
		summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q:\n%s", nodeName, buf.String()))
	}
	return strings.Join(summaryStrings, "\n")
}

// LogCPUSummary outputs the CPU usage summary to the log.
func (r *ResourceMonitor) LogCPUSummary() {
	summary := r.GetCPUSummary()
	framework.Logf("%s", r.FormatCPUSummary(summary))
}

// GetCPUSummary returns the CPU usage summary for all nodes and target containers.
func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
	result := make(NodesCPUSummary)
	for nodeName, collector := range r.collectors {
		result[nodeName] = make(ContainersCPUSummary)
		for _, containerName := range TargetContainers() {
			data := collector.GetBasicCPUStats(containerName)
			result[nodeName][containerName] = data
		}
	}
	return result
}

// GetMasterNodeCPUSummary returns the CPU summary of the master and the averaged
// summary of all other nodes.
func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
	result := make(NodesCPUSummary)
	var masterSummary ContainersCPUSummary
	var nodesSummaries []ContainersCPUSummary
	for node, summary := range summaryPerNode {
		if strings.HasSuffix(node, "master") {
			masterSummary = summary
		} else {
			nodesSummaries = append(nodesSummaries, summary)
		}
	}
	nodeAvgSummary := make(ContainersCPUSummary)
	for _, nodeSummary := range nodesSummaries {
		for c, summary := range nodeSummary {
			if _, found := nodeAvgSummary[c]; !found {
				nodeAvgSummary[c] = map[float64]float64{}
			}
			for perc, value := range summary {
				nodeAvgSummary[c][perc] += value
			}
		}
	}
	for c := range nodeAvgSummary {
		for perc := range nodeAvgSummary[c] {
			nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
		}
	}
	result["master"] = masterSummary
	result["node"] = nodeAvgSummary
	return result
}