resource_collector.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. // +build linux
  2. /*
  3. Copyright 2015 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package e2e_node
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "os"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "text/tabwriter"
  26. "time"
  27. cadvisorclient "github.com/google/cadvisor/client/v2"
  28. cadvisorapiv2 "github.com/google/cadvisor/info/v2"
  29. "github.com/opencontainers/runc/libcontainer/cgroups"
  30. "k8s.io/api/core/v1"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/labels"
  33. "k8s.io/apimachinery/pkg/util/runtime"
  34. "k8s.io/apimachinery/pkg/util/uuid"
  35. "k8s.io/apimachinery/pkg/util/wait"
  36. stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
  37. "k8s.io/kubernetes/pkg/util/procfs"
  38. "k8s.io/kubernetes/test/e2e/framework"
  39. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  40. "k8s.io/kubernetes/test/e2e_node/perftype"
  41. . "github.com/onsi/ginkgo"
  42. . "github.com/onsi/gomega"
  43. )
  44. const (
  45. // resource monitoring
  46. cadvisorImageName = "google/cadvisor:latest"
  47. cadvisorPodName = "cadvisor"
  48. cadvisorPort = 8090
  49. // housekeeping interval of Cadvisor (second)
  50. houseKeepingInterval = 1
  51. )
  52. var (
  53. systemContainers map[string]string
  54. )
  55. type ResourceCollector struct {
  56. client *cadvisorclient.Client
  57. request *cadvisorapiv2.RequestOptions
  58. pollingInterval time.Duration
  59. buffers map[string][]*framework.ContainerResourceUsage
  60. lock sync.RWMutex
  61. stopCh chan struct{}
  62. }
  63. // NewResourceCollector creates a resource collector object which collects
  64. // resource usage periodically from Cadvisor
  65. func NewResourceCollector(interval time.Duration) *ResourceCollector {
  66. buffers := make(map[string][]*framework.ContainerResourceUsage)
  67. return &ResourceCollector{
  68. pollingInterval: interval,
  69. buffers: buffers,
  70. }
  71. }
  72. // Start starts resource collector and connects to the standalone Cadvisor pod
  73. // then repeatedly runs collectStats.
  74. func (r *ResourceCollector) Start() {
  75. // Get the cgroup container names for kubelet and runtime
  76. kubeletContainer, err1 := getContainerNameForProcess(kubeletProcessName, "")
  77. runtimeContainer, err2 := getContainerNameForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
  78. if err1 == nil && err2 == nil {
  79. systemContainers = map[string]string{
  80. stats.SystemContainerKubelet: kubeletContainer,
  81. stats.SystemContainerRuntime: runtimeContainer,
  82. }
  83. } else {
  84. framework.Failf("Failed to get runtime container name in test-e2e-node resource collector.")
  85. }
  86. wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
  87. var err error
  88. r.client, err = cadvisorclient.NewClient(fmt.Sprintf("http://localhost:%d/", cadvisorPort))
  89. if err == nil {
  90. return true, nil
  91. }
  92. return false, err
  93. })
  94. Expect(r.client).NotTo(BeNil(), "cadvisor client not ready")
  95. r.request = &cadvisorapiv2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
  96. r.stopCh = make(chan struct{})
  97. oldStatsMap := make(map[string]*cadvisorapiv2.ContainerStats)
  98. go wait.Until(func() { r.collectStats(oldStatsMap) }, r.pollingInterval, r.stopCh)
  99. }
  100. // Stop stops resource collector collecting stats. It does not clear the buffer
  101. func (r *ResourceCollector) Stop() {
  102. close(r.stopCh)
  103. }
  104. // Reset clears the stats buffer of resource collector.
  105. func (r *ResourceCollector) Reset() {
  106. r.lock.Lock()
  107. defer r.lock.Unlock()
  108. for _, name := range systemContainers {
  109. r.buffers[name] = []*framework.ContainerResourceUsage{}
  110. }
  111. }
  112. // GetCPUSummary gets CPU usage in percentile.
  113. func (r *ResourceCollector) GetCPUSummary() framework.ContainersCPUSummary {
  114. result := make(framework.ContainersCPUSummary)
  115. for key, name := range systemContainers {
  116. data := r.GetBasicCPUStats(name)
  117. result[key] = data
  118. }
  119. return result
  120. }
  121. // LogLatest logs the latest resource usage.
  122. func (r *ResourceCollector) LogLatest() {
  123. summary, err := r.GetLatest()
  124. if err != nil {
  125. e2elog.Logf("%v", err)
  126. }
  127. e2elog.Logf("%s", formatResourceUsageStats(summary))
  128. }
  129. // collectStats collects resource usage from Cadvisor.
  130. func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.ContainerStats) {
  131. for _, name := range systemContainers {
  132. ret, err := r.client.Stats(name, r.request)
  133. if err != nil {
  134. e2elog.Logf("Error getting container stats, err: %v", err)
  135. return
  136. }
  137. cStats, ok := ret[name]
  138. if !ok {
  139. e2elog.Logf("Missing info/stats for container %q", name)
  140. return
  141. }
  142. newStats := cStats.Stats[0]
  143. if oldStats, ok := oldStatsMap[name]; ok && oldStats.Timestamp.Before(newStats.Timestamp) {
  144. r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, newStats))
  145. }
  146. oldStatsMap[name] = newStats
  147. }
  148. }
  149. // computeContainerResourceUsage computes resource usage based on new data sample.
  150. func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapiv2.ContainerStats) *framework.ContainerResourceUsage {
  151. return &framework.ContainerResourceUsage{
  152. Name: name,
  153. Timestamp: newStats.Timestamp,
  154. CPUUsageInCores: float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
  155. MemoryUsageInBytes: newStats.Memory.Usage,
  156. MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
  157. MemoryRSSInBytes: newStats.Memory.RSS,
  158. CPUInterval: newStats.Timestamp.Sub(oldStats.Timestamp),
  159. }
  160. }
  161. // GetLatest gets the latest resource usage from stats buffer.
  162. func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, error) {
  163. r.lock.RLock()
  164. defer r.lock.RUnlock()
  165. stats := make(framework.ResourceUsagePerContainer)
  166. for key, name := range systemContainers {
  167. contStats, ok := r.buffers[name]
  168. if !ok || len(contStats) == 0 {
  169. return nil, fmt.Errorf("No resource usage data for %s container (%s)", key, name)
  170. }
  171. stats[key] = contStats[len(contStats)-1]
  172. }
  173. return stats, nil
  174. }
  175. type resourceUsageByCPU []*framework.ContainerResourceUsage
  176. func (r resourceUsageByCPU) Len() int { return len(r) }
  177. func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
  178. func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
  179. // The percentiles to report.
  180. var percentiles = [...]float64{0.50, 0.90, 0.95, 0.99, 1.00}
  181. // GetBasicCPUStats returns the percentiles the cpu usage in cores for
  182. // containerName. This method examines all data currently in the buffer.
  183. func (r *ResourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
  184. r.lock.RLock()
  185. defer r.lock.RUnlock()
  186. result := make(map[float64]float64, len(percentiles))
  187. // We must make a copy of array, otherwise the timeseries order is changed.
  188. usages := make([]*framework.ContainerResourceUsage, 0)
  189. usages = append(usages, r.buffers[containerName]...)
  190. sort.Sort(resourceUsageByCPU(usages))
  191. for _, q := range percentiles {
  192. index := int(float64(len(usages))*q) - 1
  193. if index < 0 {
  194. // We don't have enough data.
  195. result[q] = 0
  196. continue
  197. }
  198. result[q] = usages[index].CPUUsageInCores
  199. }
  200. return result
  201. }
  202. func formatResourceUsageStats(containerStats framework.ResourceUsagePerContainer) string {
  203. // Example output:
  204. //
  205. // Resource usage:
  206. //container cpu(cores) memory_working_set(MB) memory_rss(MB)
  207. //"kubelet" 0.068 27.92 15.43
  208. //"runtime" 0.664 89.88 68.13
  209. buf := &bytes.Buffer{}
  210. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  211. fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
  212. for name, s := range containerStats {
  213. fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
  214. }
  215. w.Flush()
  216. return fmt.Sprintf("Resource usage:\n%s", buf.String())
  217. }
  218. func formatCPUSummary(summary framework.ContainersCPUSummary) string {
  219. // Example output for a node (the percentiles may differ):
  220. // CPU usage of containers:
  221. // container 5th% 50th% 90th% 95th%
  222. // "/" 0.051 0.159 0.387 0.455
  223. // "/runtime 0.000 0.000 0.146 0.166
  224. // "/kubelet" 0.036 0.053 0.091 0.154
  225. // "/misc" 0.001 0.001 0.001 0.002
  226. var summaryStrings []string
  227. var header []string
  228. header = append(header, "container")
  229. for _, p := range percentiles {
  230. header = append(header, fmt.Sprintf("%.0fth%%", p*100))
  231. }
  232. buf := &bytes.Buffer{}
  233. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  234. fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
  235. for _, containerName := range framework.TargetContainers() {
  236. var s []string
  237. s = append(s, fmt.Sprintf("%q", containerName))
  238. data, ok := summary[containerName]
  239. for _, p := range percentiles {
  240. value := "N/A"
  241. if ok {
  242. value = fmt.Sprintf("%.3f", data[p])
  243. }
  244. s = append(s, value)
  245. }
  246. fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
  247. }
  248. w.Flush()
  249. summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers:\n%s", buf.String()))
  250. return strings.Join(summaryStrings, "\n")
  251. }
  252. // createCadvisorPod creates a standalone cadvisor pod for fine-grain resource monitoring.
  253. func getCadvisorPod() *v1.Pod {
  254. return &v1.Pod{
  255. ObjectMeta: metav1.ObjectMeta{
  256. Name: cadvisorPodName,
  257. },
  258. Spec: v1.PodSpec{
  259. // It uses a host port for the tests to collect data.
  260. // Currently we can not use port mapping in test-e2e-node.
  261. HostNetwork: true,
  262. SecurityContext: &v1.PodSecurityContext{},
  263. Containers: []v1.Container{
  264. {
  265. Image: cadvisorImageName,
  266. Name: cadvisorPodName,
  267. Ports: []v1.ContainerPort{
  268. {
  269. Name: "http",
  270. HostPort: cadvisorPort,
  271. ContainerPort: cadvisorPort,
  272. Protocol: v1.ProtocolTCP,
  273. },
  274. },
  275. VolumeMounts: []v1.VolumeMount{
  276. {
  277. Name: "sys",
  278. ReadOnly: true,
  279. MountPath: "/sys",
  280. },
  281. {
  282. Name: "var-run",
  283. ReadOnly: false,
  284. MountPath: "/var/run",
  285. },
  286. {
  287. Name: "docker",
  288. ReadOnly: true,
  289. MountPath: "/var/lib/docker/",
  290. },
  291. {
  292. Name: "rootfs",
  293. ReadOnly: true,
  294. MountPath: "/rootfs",
  295. },
  296. },
  297. Args: []string{
  298. "--profiling",
  299. fmt.Sprintf("--housekeeping_interval=%ds", houseKeepingInterval),
  300. fmt.Sprintf("--port=%d", cadvisorPort),
  301. },
  302. },
  303. },
  304. Volumes: []v1.Volume{
  305. {
  306. Name: "rootfs",
  307. VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/"}},
  308. },
  309. {
  310. Name: "var-run",
  311. VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/var/run"}},
  312. },
  313. {
  314. Name: "sys",
  315. VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/sys"}},
  316. },
  317. {
  318. Name: "docker",
  319. VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/var/lib/docker"}},
  320. },
  321. },
  322. },
  323. }
  324. }
  325. // deletePodsSync deletes a list of pods and block until pods disappear.
  326. func deletePodsSync(f *framework.Framework, pods []*v1.Pod) {
  327. var wg sync.WaitGroup
  328. for _, pod := range pods {
  329. wg.Add(1)
  330. go func(pod *v1.Pod) {
  331. defer GinkgoRecover()
  332. defer wg.Done()
  333. err := f.PodClient().Delete(pod.ObjectMeta.Name, metav1.NewDeleteOptions(30))
  334. Expect(err).NotTo(HaveOccurred())
  335. Expect(framework.WaitForPodToDisappear(f.ClientSet, f.Namespace.Name, pod.ObjectMeta.Name, labels.Everything(),
  336. 30*time.Second, 10*time.Minute)).NotTo(HaveOccurred())
  337. }(pod)
  338. }
  339. wg.Wait()
  340. return
  341. }
  342. // newTestPods creates a list of pods (specification) for test.
  343. func newTestPods(numPods int, volume bool, imageName, podType string) []*v1.Pod {
  344. var pods []*v1.Pod
  345. for i := 0; i < numPods; i++ {
  346. podName := "test-" + string(uuid.NewUUID())
  347. labels := map[string]string{
  348. "type": podType,
  349. "name": podName,
  350. }
  351. if volume {
  352. pods = append(pods,
  353. &v1.Pod{
  354. ObjectMeta: metav1.ObjectMeta{
  355. Name: podName,
  356. Labels: labels,
  357. },
  358. Spec: v1.PodSpec{
  359. // Restart policy is always (default).
  360. Containers: []v1.Container{
  361. {
  362. Image: imageName,
  363. Name: podName,
  364. VolumeMounts: []v1.VolumeMount{
  365. {MountPath: "/test-volume-mnt", Name: podName + "-volume"},
  366. },
  367. },
  368. },
  369. Volumes: []v1.Volume{
  370. {Name: podName + "-volume", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
  371. },
  372. },
  373. })
  374. } else {
  375. pods = append(pods,
  376. &v1.Pod{
  377. ObjectMeta: metav1.ObjectMeta{
  378. Name: podName,
  379. Labels: labels,
  380. },
  381. Spec: v1.PodSpec{
  382. // Restart policy is always (default).
  383. Containers: []v1.Container{
  384. {
  385. Image: imageName,
  386. Name: podName,
  387. },
  388. },
  389. },
  390. })
  391. }
  392. }
  393. return pods
  394. }
  395. // GetResourceSeriesWithLabels gets the time series of resource usage of each container.
  396. func (r *ResourceCollector) GetResourceTimeSeries() map[string]*perftype.ResourceSeries {
  397. resourceSeries := make(map[string]*perftype.ResourceSeries)
  398. for key, name := range systemContainers {
  399. newSeries := &perftype.ResourceSeries{Units: map[string]string{
  400. "cpu": "mCPU",
  401. "memory": "MB",
  402. }}
  403. resourceSeries[key] = newSeries
  404. for _, usage := range r.buffers[name] {
  405. newSeries.Timestamp = append(newSeries.Timestamp, usage.Timestamp.UnixNano())
  406. newSeries.CPUUsageInMilliCores = append(newSeries.CPUUsageInMilliCores, int64(usage.CPUUsageInCores*1000))
  407. newSeries.MemoryRSSInMegaBytes = append(newSeries.MemoryRSSInMegaBytes, int64(float64(usage.MemoryUsageInBytes)/(1024*1024)))
  408. }
  409. }
  410. return resourceSeries
  411. }
  412. const kubeletProcessName = "kubelet"
  413. func getPidsForProcess(name, pidFile string) ([]int, error) {
  414. if len(pidFile) > 0 {
  415. if pid, err := getPidFromPidFile(pidFile); err == nil {
  416. return []int{pid}, nil
  417. } else {
  418. // log the error and fall back to pidof
  419. runtime.HandleError(err)
  420. }
  421. }
  422. return procfs.PidOf(name)
  423. }
  424. func getPidFromPidFile(pidFile string) (int, error) {
  425. file, err := os.Open(pidFile)
  426. if err != nil {
  427. return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
  428. }
  429. defer file.Close()
  430. data, err := ioutil.ReadAll(file)
  431. if err != nil {
  432. return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
  433. }
  434. pid, err := strconv.Atoi(string(data))
  435. if err != nil {
  436. return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
  437. }
  438. return pid, nil
  439. }
  440. func getContainerNameForProcess(name, pidFile string) (string, error) {
  441. pids, err := getPidsForProcess(name, pidFile)
  442. if err != nil {
  443. return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
  444. }
  445. if len(pids) == 0 {
  446. return "", nil
  447. }
  448. cont, err := getContainer(pids[0])
  449. if err != nil {
  450. return "", err
  451. }
  452. return cont, nil
  453. }
  454. // getContainer returns the cgroup associated with the specified pid.
  455. // It enforces a unified hierarchy for memory and cpu cgroups.
  456. // On systemd environments, it uses the name=systemd cgroup for the specified pid.
  457. func getContainer(pid int) (string, error) {
  458. cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
  459. if err != nil {
  460. return "", err
  461. }
  462. cpu, found := cgs["cpu"]
  463. if !found {
  464. return "", cgroups.NewNotFoundError("cpu")
  465. }
  466. memory, found := cgs["memory"]
  467. if !found {
  468. return "", cgroups.NewNotFoundError("memory")
  469. }
  470. // since we use this container for accounting, we need to ensure it is a unified hierarchy.
  471. if cpu != memory {
  472. return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory)
  473. }
  474. // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
  475. // cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
  476. // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
  477. // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
  478. // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
  479. // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
  480. // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
  481. // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
  482. // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
  483. if systemd, found := cgs["name=systemd"]; found {
  484. if systemd != cpu {
  485. log.Printf("CPUAccounting not enabled for pid: %d", pid)
  486. }
  487. if systemd != memory {
  488. log.Printf("MemoryAccounting not enabled for pid: %d", pid)
  489. }
  490. return systemd, nil
  491. }
  492. return cpu, nil
  493. }