resource_usage_gatherer.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	kubeletstatsv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
	"k8s.io/kubernetes/pkg/master/ports"
	"k8s.io/kubernetes/test/e2e/system"

	// TODO: Remove the following imports (ref: https://github.com/kubernetes/kubernetes/issues/81245)
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
)
// ResourceConstraint specifies the maximum CPU and memory usage allowed for
// a single container.
type ResourceConstraint struct {
	CPUConstraint    float64
	MemoryConstraint uint64
}

// SingleContainerSummary holds the resource usage summary of a single container.
type SingleContainerSummary struct {
	Name string
	CPU  float64
	Mem  uint64
}

// ContainerResourceUsage is a structure for gathering container resource usage.
type ContainerResourceUsage struct {
	Name                    string
	Timestamp               time.Time
	CPUUsageInCores         float64
	MemoryUsageInBytes      uint64
	MemoryWorkingSetInBytes uint64
	MemoryRSSInBytes        uint64
	// The interval used to calculate CPUUsageInCores.
	CPUInterval time.Duration
}

// ResourceUsagePerContainer maps a container name to its resource usage.
type ResourceUsagePerContainer map[string]*ContainerResourceUsage

// ResourceUsageSummary maps a percentile (as a string) to the per-container
// summaries at that percentile. We can't use int keys here, as JSON does not
// accept integer keys.
type ResourceUsageSummary map[string][]SingleContainerSummary

// PrintHumanReadable renders the resource usage summary as a human-readable table.
func (s *ResourceUsageSummary) PrintHumanReadable() string {
	buf := &bytes.Buffer{}
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	for perc, summaries := range *s {
		buf.WriteString(fmt.Sprintf("%v percentile:\n", perc))
		fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
		for _, summary := range summaries {
			fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", summary.Name, summary.CPU, float64(summary.Mem)/(1024*1024))
		}
		w.Flush()
	}
	return buf.String()
}

// PrintJSON renders the resource usage summary as JSON.
func (s *ResourceUsageSummary) PrintJSON() string {
	return PrettyPrintJSON(*s)
}

// SummaryKind returns the kind of the summary ("ResourceUsageSummary").
func (s *ResourceUsageSummary) SummaryKind() string {
	return "ResourceUsageSummary"
}

type uint64arr []uint64

func (a uint64arr) Len() int           { return len(a) }
func (a uint64arr) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }

type usageDataPerContainer struct {
	cpuData        []float64
	memUseData     []uint64
	memWorkSetData []uint64
}
// computePercentiles calculates the given percentiles of each container's
// CPU and memory usage across the collected time series.
func computePercentiles(timeSeries []ResourceUsagePerContainer, percentilesToCompute []int) map[int]ResourceUsagePerContainer {
	if len(timeSeries) == 0 {
		return make(map[int]ResourceUsagePerContainer)
	}
	dataMap := make(map[string]*usageDataPerContainer)
	for i := range timeSeries {
		for name, data := range timeSeries[i] {
			if dataMap[name] == nil {
				dataMap[name] = &usageDataPerContainer{
					cpuData:        make([]float64, 0, len(timeSeries)),
					memUseData:     make([]uint64, 0, len(timeSeries)),
					memWorkSetData: make([]uint64, 0, len(timeSeries)),
				}
			}
			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
		}
	}
	for _, v := range dataMap {
		sort.Float64s(v.cpuData)
		sort.Sort(uint64arr(v.memUseData))
		sort.Sort(uint64arr(v.memWorkSetData))
	}

	result := make(map[int]ResourceUsagePerContainer)
	for _, perc := range percentilesToCompute {
		data := make(ResourceUsagePerContainer)
		for k, v := range dataMap {
			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
			data[k] = &ContainerResourceUsage{
				Name:                    k,
				CPUUsageInCores:         v.cpuData[percentileIndex],
				MemoryUsageInBytes:      v.memUseData[percentileIndex],
				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
			}
		}
		result[perc] = data
	}
	return result
}
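
// A quick worked example of the percentile index arithmetic above
// (illustrative only): with 10 sorted samples and perc = 90,
// percentileIndex = ceil(10*90/100) - 1 = 8, i.e. the 9th smallest sample;
// with perc = 50 it is ceil(500/100) - 1 = 4. With a single sample, every
// percentile maps to index 0.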
// leftMergeData merges right into left: only percentiles present in left are
// kept, and on container-key collisions the value from right wins.
func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]ResourceUsagePerContainer {
	result := make(map[int]ResourceUsagePerContainer)
	for percentile, data := range left {
		result[percentile] = data
		if _, ok := right[percentile]; !ok {
			continue
		}
		for k, v := range right[percentile] {
			result[percentile][k] = v
		}
	}
	return result
}
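
// For example (illustrative only): if left = {90: {"a": u1}} and
// right = {90: {"a": u2, "b": u3}, 50: {...}}, the result is
// {90: {"a": u2, "b": u3}}: right's entry overwrites left's for "a", "b" is
// added, and percentile 50 is dropped because it is absent from left.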
type resourceGatherWorker struct {
	c                           clientset.Interface
	nodeName                    string
	wg                          *sync.WaitGroup
	containerIDs                []string
	stopCh                      chan struct{}
	dataSeries                  []ResourceUsagePerContainer
	finished                    bool
	inKubemark                  bool
	resourceDataGatheringPeriod time.Duration
	probeDuration               time.Duration
	printVerboseLogs            bool
}

// singleProbe collects one resource usage snapshot and appends it to the
// worker's data series.
func (w *resourceGatherWorker) singleProbe() {
	data := make(ResourceUsagePerContainer)
	if w.inKubemark {
		kubemarkData := getKubemarkMasterComponentsResourceUsage()
		if kubemarkData == nil {
			return
		}
		for k, v := range kubemarkData {
			data[k] = &ContainerResourceUsage{
				Name:                    v.Name,
				MemoryWorkingSetInBytes: v.MemoryWorkingSetInBytes,
				CPUUsageInCores:         v.CPUUsageInCores,
			}
		}
	} else {
		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
		if err != nil {
			Logf("Error while reading data from %v: %v", w.nodeName, err)
			return
		}
		for k, v := range nodeUsage {
			data[k] = v
			if w.printVerboseLogs {
				Logf("Get container %v usage on node %v. CPUUsageInCores: %v, MemoryUsageInBytes: %v, MemoryWorkingSetInBytes: %v", k, w.nodeName, v.CPUUsageInCores, v.MemoryUsageInBytes, v.MemoryWorkingSetInBytes)
			}
		}
	}
	w.dataSeries = append(w.dataSeries, data)
}
// getOneTimeResourceUsageOnNode queries the node's /stats/summary endpoint
// and returns the resource usage of all containerNames for the past
// cpuInterval.
// The acceptable range of the interval is 2s~120s. Be warned that as the
// interval (and #containers) increases, the size of kubelet's response
// could be significant. E.g., the 60s interval stats for ~20 containers is
// ~1.5MB. Don't hammer the node with frequent, heavy requests.
//
// cadvisor records cumulative cpu usage in nanoseconds, so we need two
// stats points to compute the cpu usage over the interval. Assuming cadvisor
// polls every second, we'd need to get N stats points for an N-second interval.
// Note that this is an approximation and may not be accurate, hence we also
// write the actual interval used for calculation (based on the timestamps of
// the stats points) in ContainerResourceUsage.CPUInterval.
//
// containerNames is a function returning the collection of container names
// the user is interested in.
func getOneTimeResourceUsageOnNode(
	c clientset.Interface,
	nodeName string,
	cpuInterval time.Duration,
	containerNames func() []string,
) (ResourceUsagePerContainer, error) {
	const (
		// cadvisor records stats about every second.
		cadvisorStatsPollingIntervalInSeconds float64 = 1.0
		// cadvisor caches up to 2 minutes of stats (configured by kubelet).
		maxNumStatsToRequest int = 120
	)

	numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
	if numStats < 2 || numStats > maxNumStatsToRequest {
		return nil, fmt.Errorf("numStats needs to be >= 2 and <= %d", maxNumStatsToRequest)
	}
	// Get information of all containers on the node.
	summary, err := getStatsSummary(c, nodeName)
	if err != nil {
		return nil, err
	}

	f := func(name string, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
		if newStats == nil || newStats.CPU == nil || newStats.Memory == nil {
			return nil
		}
		return &ContainerResourceUsage{
			Name:                    name,
			Timestamp:               newStats.StartTime.Time,
			CPUUsageInCores:         float64(removeUint64Ptr(newStats.CPU.UsageNanoCores)) / 1000000000,
			MemoryUsageInBytes:      removeUint64Ptr(newStats.Memory.UsageBytes),
			MemoryWorkingSetInBytes: removeUint64Ptr(newStats.Memory.WorkingSetBytes),
			MemoryRSSInBytes:        removeUint64Ptr(newStats.Memory.RSSBytes),
			CPUInterval:             0,
		}
	}
	// Process container infos that are relevant to us.
	containers := containerNames()
	usageMap := make(ResourceUsagePerContainer, len(containers))
	for _, pod := range summary.Pods {
		for _, container := range pod.Containers {
			isInteresting := false
			for _, interestingContainerName := range containers {
				if container.Name == interestingContainerName {
					isInteresting = true
					break
				}
			}
			if !isInteresting {
				continue
			}
			if usage := f(pod.PodRef.Name+"/"+container.Name, &container); usage != nil {
				usageMap[pod.PodRef.Name+"/"+container.Name] = usage
			}
		}
	}
	return usageMap, nil
}
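
// A quick worked example of the bounds check above (illustrative only): with
// cadvisor polling roughly once per second, a 60s cpuInterval maps to
// numStats = 60, which is within [2, 120]; a 1s interval maps to numStats = 1
// and is rejected, since two samples are the minimum needed to compute a rate.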
// getStatsSummary contacts kubelet for the container information.
func getStatsSummary(c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
	ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
	defer cancel()

	data, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
		Suffix("stats/summary").
		Do(ctx).Raw()
	if err != nil {
		return nil, err
	}

	summary := kubeletstatsv1alpha1.Summary{}
	err = json.Unmarshal(data, &summary)
	if err != nil {
		return nil, err
	}
	return &summary, nil
}
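
// The request built above goes through the apiserver's node proxy; assuming
// the default kubelet port (10250), the equivalent manual query is roughly:
//
//	kubectl get --raw /api/v1/nodes/<node-name>:10250/proxy/stats/summary
//
// This can be handy for inspecting the raw payload the gatherer parses.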
// removeUint64Ptr dereferences a *uint64, treating nil as 0.
func removeUint64Ptr(ptr *uint64) uint64 {
	if ptr == nil {
		return 0
	}
	return *ptr
}
// gather sleeps for initialSleep, then probes its node every
// resourceDataGatheringPeriod until the stop channel is closed.
func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
	defer utilruntime.HandleCrash()
	defer w.wg.Done()
	defer Logf("Closing worker for %v", w.nodeName)
	defer func() { w.finished = true }()
	select {
	case <-time.After(initialSleep):
		w.singleProbe()
		for {
			select {
			case <-time.After(w.resourceDataGatheringPeriod):
				w.singleProbe()
			case <-w.stopCh:
				return
			}
		}
	case <-w.stopCh:
		return
	}
}
// ContainerResourceGatherer gathers container resource usage from the nodes
// of a cluster.
type ContainerResourceGatherer struct {
	client       clientset.Interface
	stopCh       chan struct{}
	workers      []resourceGatherWorker
	workerWg     sync.WaitGroup
	containerIDs []string
	options      ResourceGathererOptions
}

// ResourceGathererOptions holds the options for a ContainerResourceGatherer.
type ResourceGathererOptions struct {
	InKubemark                  bool
	Nodes                       NodesSet
	ResourceDataGatheringPeriod time.Duration
	ProbeDuration               time.Duration
	PrintVerboseLogs            bool
}

// NodesSet selects which nodes the gatherer should track.
type NodesSet int

const (
	// AllNodes means all containers on all nodes.
	AllNodes NodesSet = 0
	// MasterNodes means all containers on Master nodes only.
	MasterNodes NodesSet = 1
	// MasterAndDNSNodes means all containers on Master nodes and DNS containers on other nodes.
	MasterAndDNSNodes NodesSet = 2
)
// NewResourceUsageGatherer returns a new ContainerResourceGatherer.
func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*ContainerResourceGatherer, error) {
	g := ContainerResourceGatherer{
		client:       c,
		stopCh:       make(chan struct{}),
		containerIDs: make([]string, 0),
		options:      options,
	}

	if options.InKubemark {
		g.workerWg.Add(1)
		g.workers = append(g.workers, resourceGatherWorker{
			inKubemark:                  true,
			stopCh:                      g.stopCh,
			wg:                          &g.workerWg,
			finished:                    false,
			resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
			probeDuration:               options.ProbeDuration,
			printVerboseLogs:            options.PrintVerboseLogs,
		})
		return &g, nil
	}

	// Tracks kube-system pods if no valid PodList is passed in.
	var err error
	if pods == nil {
		pods, err = c.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			Logf("Error while listing Pods: %v", err)
			return nil, err
		}
	}
	dnsNodes := make(map[string]bool)
	for _, pod := range pods.Items {
		if (options.Nodes == MasterNodes) && !system.DeprecatedMightBeMasterNode(pod.Spec.NodeName) {
			continue
		}
		if (options.Nodes == MasterAndDNSNodes) && !system.DeprecatedMightBeMasterNode(pod.Spec.NodeName) && pod.Labels["k8s-app"] != "kube-dns" {
			continue
		}
		for _, container := range pod.Status.InitContainerStatuses {
			g.containerIDs = append(g.containerIDs, container.Name)
		}
		for _, container := range pod.Status.ContainerStatuses {
			g.containerIDs = append(g.containerIDs, container.Name)
		}
		if options.Nodes == MasterAndDNSNodes {
			dnsNodes[pod.Spec.NodeName] = true
		}
	}
	nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		Logf("Error while listing Nodes: %v", err)
		return nil, err
	}

	for _, node := range nodeList.Items {
		if options.Nodes == AllNodes || system.DeprecatedMightBeMasterNode(node.Name) || dnsNodes[node.Name] {
			g.workerWg.Add(1)
			g.workers = append(g.workers, resourceGatherWorker{
				c:                           c,
				nodeName:                    node.Name,
				wg:                          &g.workerWg,
				containerIDs:                g.containerIDs,
				stopCh:                      g.stopCh,
				finished:                    false,
				inKubemark:                  false,
				resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
				probeDuration:               options.ProbeDuration,
				printVerboseLogs:            options.PrintVerboseLogs,
			})
			if options.Nodes == MasterNodes {
				break
			}
		}
	}
	return &g, nil
}
// StartGatheringData starts a stat gathering worker for each tracked node,
// staggering their start times evenly across the gathering period, and
// blocks until StopAndSummarize is called.
func (g *ContainerResourceGatherer) StartGatheringData() {
	if len(g.workers) == 0 {
		return
	}
	delayPeriod := g.options.ResourceDataGatheringPeriod / time.Duration(len(g.workers))
	delay := time.Duration(0)
	for i := range g.workers {
		go g.workers[i].gather(delay)
		delay += delayPeriod
	}
	g.workerWg.Wait()
}
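
// A quick worked example of the staggering above (illustrative only): with
// 3 workers and ResourceDataGatheringPeriod = 60s, the workers start their
// probe loops with initial delays of 0s, 20s, and 40s, so the kubelet queries
// are spread evenly across the period instead of arriving in bursts.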
// StopAndSummarize stops stat gathering workers, processes the collected stats,
// generates a resource summary for the passed-in percentiles, and returns the
// summary. It returns an error if the resource usage at any percentile exceeds
// the specified resource constraints.
func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
	close(g.stopCh)
	Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
	finished := make(chan struct{}, 1)
	go func() {
		g.workerWg.Wait()
		finished <- struct{}{}
	}()
	select {
	case <-finished:
		Logf("Waitgroup finished.")
	case <-time.After(2 * time.Minute):
		unfinished := make([]string, 0)
		for i := range g.workers {
			if !g.workers[i].finished {
				unfinished = append(unfinished, g.workers[i].nodeName)
			}
		}
		Logf("Timed out while waiting for waitgroup, some workers failed to finish: %v", unfinished)
	}

	if len(percentiles) == 0 {
		Logf("Warning! Empty percentile list for StopAndSummarize.")
		return &ResourceUsageSummary{}, fmt.Errorf("failed to get any resource usage data")
	}

	// Workers have been stopped. We need to gather the data stored in them.
	data := make(map[int]ResourceUsagePerContainer)
	for i := range g.workers {
		if g.workers[i].finished {
			stats := computePercentiles(g.workers[i].dataSeries, percentiles)
			data = leftMergeData(stats, data)
		}
	}

	sortedKeys := []string{}
	for name := range data[percentiles[0]] {
		sortedKeys = append(sortedKeys, name)
	}
	sort.Strings(sortedKeys)
	violatedConstraints := make([]string, 0)
	summary := make(ResourceUsageSummary)
	for _, perc := range percentiles {
		for _, name := range sortedKeys {
			usage := data[perc][name]
			summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
				Name: name,
				CPU:  usage.CPUUsageInCores,
				Mem:  usage.MemoryWorkingSetInBytes,
			})

			// Verify only the 99th percentile of resource usage against the constraints.
			if perc != 99 {
				continue
			}
			// Name has the form <pod_name>/<container_name>.
			containerName := strings.Split(name, "/")[1]
			constraint, ok := constraints[containerName]
			if !ok {
				continue
			}
			if usage.CPUUsageInCores > constraint.CPUConstraint {
				violatedConstraints = append(
					violatedConstraints,
					fmt.Sprintf("Container %v is using %v/%v CPU",
						name,
						usage.CPUUsageInCores,
						constraint.CPUConstraint,
					),
				)
			}
			if usage.MemoryWorkingSetInBytes > constraint.MemoryConstraint {
				violatedConstraints = append(
					violatedConstraints,
					fmt.Sprintf("Container %v is using %v/%v MB of memory",
						name,
						float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
						float64(constraint.MemoryConstraint)/(1024*1024),
					),
				)
			}
		}
	}
	if len(violatedConstraints) > 0 {
		return &summary, fmt.Errorf("%s", strings.Join(violatedConstraints, "\n"))
	}
	return &summary, nil
}
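
// A minimal usage sketch of the gatherer from a test (illustrative only; the
// periods, constraint names, and constraint values below are hypothetical):
//
//	g, err := NewResourceUsageGatherer(c, ResourceGathererOptions{
//		Nodes:                       MasterNodes,
//		ResourceDataGatheringPeriod: 10 * time.Second,
//		ProbeDuration:               15 * time.Second,
//	}, nil)
//	if err != nil {
//		Failf("Failed to create gatherer: %v", err)
//	}
//	go g.StartGatheringData()
//	// ... run the workload under test ...
//	summary, err := g.StopAndSummarize([]int{50, 90, 99}, map[string]ResourceConstraint{
//		"kube-apiserver": {CPUConstraint: 0.5, MemoryConstraint: 512 * 1024 * 1024},
//	})
//	if err != nil {
//		Failf("Resource constraints violated: %v", err)
//	}
//	Logf("%s", summary.PrintHumanReadable())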
// kubemarkResourceUsage tracks the resource usage of a single kubemark
// master component.
type kubemarkResourceUsage struct {
	Name                    string
	MemoryWorkingSetInBytes uint64
	CPUUsageInCores         float64
}

// getMasterUsageByPrefix returns the CPU and RSS usage (as reported by ps)
// of the master processes whose command line matches the given prefix.
func getMasterUsageByPrefix(prefix string) (string, error) {
	sshResult, err := e2essh.SSH(fmt.Sprintf("ps ax -o %%cpu,rss,command | tail -n +2 | grep %v | sed 's/\\s+/ /g'", prefix), GetMasterHost()+":22", TestContext.Provider)
	if err != nil {
		return "", err
	}
	return sshResult.Stdout, nil
}
// getKubemarkMasterComponentsResourceUsage returns the CPU and memory usage
// of each kubemark master component, keyed by pod_name/container_name.
func getKubemarkMasterComponentsResourceUsage() map[string]*kubemarkResourceUsage {
	result := make(map[string]*kubemarkResourceUsage)
	// Get kubernetes component resource usage
	sshResult, err := getMasterUsageByPrefix("kube")
	if err != nil {
		Logf("Error when trying to SSH to master machine. Skipping probe. %v", err)
		return nil
	}
	scanner := bufio.NewScanner(strings.NewReader(sshResult))
	for scanner.Scan() {
		var cpu float64
		var mem uint64
		var name string
		fmt.Sscanf(strings.TrimSpace(scanner.Text()), "%f %d /usr/local/bin/kube-%s", &cpu, &mem, &name)
		if name != "" {
			// Gatherer expects pod_name/container_name format
			fullName := name + "/" + name
			result[fullName] = &kubemarkResourceUsage{Name: fullName, MemoryWorkingSetInBytes: mem * 1024, CPUUsageInCores: cpu / 100}
		}
	}
	// Get etcd resource usage
	sshResult, err = getMasterUsageByPrefix("bin/etcd")
	if err != nil {
		Logf("Error when trying to SSH to master machine. Skipping probe. %v", err)
		return nil
	}
	scanner = bufio.NewScanner(strings.NewReader(sshResult))
	for scanner.Scan() {
		var cpu float64
		var mem uint64
		var etcdKind string
		fmt.Sscanf(strings.TrimSpace(scanner.Text()), "%f %d /bin/sh -c /usr/local/bin/etcd", &cpu, &mem)
		dataDirStart := strings.Index(scanner.Text(), "--data-dir")
		if dataDirStart < 0 {
			continue
		}
		fmt.Sscanf(scanner.Text()[dataDirStart:], "--data-dir=/var/%s", &etcdKind)
		if etcdKind != "" {
			// Gatherer expects pod_name/container_name format
			fullName := "etcd/" + etcdKind
			result[fullName] = &kubemarkResourceUsage{Name: fullName, MemoryWorkingSetInBytes: mem * 1024, CPUUsageInCores: cpu / 100}
		}
	}
	return result
}
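
// A quick worked example of the ps parsing above (illustrative only): given a
// line like " 2.5 123456 /usr/local/bin/kube-apiserver --flag=...", Sscanf
// yields cpu = 2.5, mem = 123456 (KB), and name = "apiserver", which is stored
// under "apiserver/apiserver" with MemoryWorkingSetInBytes = 123456 * 1024
// and CPUUsageInCores = 0.025.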