resource_usage_gatherer.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
	"bytes"
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/util/system"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
)

// ResourceConstraint specifies the CPU and memory usage constraints for a single container.
type ResourceConstraint struct {
	CPUConstraint    float64
	MemoryConstraint uint64
}

// SingleContainerSummary holds the resource usage summary of a single container.
type SingleContainerSummary struct {
	Name string
	CPU  float64
	Mem  uint64
}

// ResourceUsageSummary maps a percentile (as a string) to the per-container summaries
// at that percentile. We can't use int keys here, as JSON does not accept integer map keys.
type ResourceUsageSummary map[string][]SingleContainerSummary

// NoCPUConstraint is the CPU constraint value to use when CPU usage should not be checked.
const NoCPUConstraint = math.MaxFloat64
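
// A minimal sketch (not part of the original file) of how callers typically
// build the constraints map consumed by StopAndSummarize below; the container
// name and limits are illustrative assumptions, not values taken from this code:
//
//	constraints := map[string]ResourceConstraint{
//		"kube-apiserver": {
//			CPUConstraint:    NoCPUConstraint,   // skip the CPU check for this container
//			MemoryConstraint: 400 * 1024 * 1024, // 400 MB of working set memory
//		},
//	}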

// PrintHumanReadable returns a human-readable representation of the resource usage summary.
func (s *ResourceUsageSummary) PrintHumanReadable() string {
	buf := &bytes.Buffer{}
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	for perc, summaries := range *s {
		buf.WriteString(fmt.Sprintf("%v percentile:\n", perc))
		fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
		for _, summary := range summaries {
			fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", summary.Name, summary.CPU, float64(summary.Mem)/(1024*1024))
		}
		w.Flush()
	}
	return buf.String()
}

// PrintJSON returns the resource usage summary as pretty-printed JSON.
func (s *ResourceUsageSummary) PrintJSON() string {
	return PrettyPrintJSON(*s)
}

// SummaryKind returns the kind of this summary, "ResourceUsageSummary".
func (s *ResourceUsageSummary) SummaryKind() string {
	return "ResourceUsageSummary"
}

// computePercentiles aggregates a time series of per-container usage samples and
// returns, for each requested percentile, the usage at that percentile for every container.
func computePercentiles(timeSeries []ResourceUsagePerContainer, percentilesToCompute []int) map[int]ResourceUsagePerContainer {
	if len(timeSeries) == 0 {
		return make(map[int]ResourceUsagePerContainer)
	}
	dataMap := make(map[string]*usageDataPerContainer)
	for i := range timeSeries {
		for name, data := range timeSeries[i] {
			if dataMap[name] == nil {
				dataMap[name] = &usageDataPerContainer{
					cpuData:        make([]float64, 0, len(timeSeries)),
					memUseData:     make([]uint64, 0, len(timeSeries)),
					memWorkSetData: make([]uint64, 0, len(timeSeries)),
				}
			}
			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
		}
	}
	for _, v := range dataMap {
		sort.Float64s(v.cpuData)
		sort.Sort(uint64arr(v.memUseData))
		sort.Sort(uint64arr(v.memWorkSetData))
	}

	result := make(map[int]ResourceUsagePerContainer)
	for _, perc := range percentilesToCompute {
		data := make(ResourceUsagePerContainer)
		for k, v := range dataMap {
			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
			data[k] = &ContainerResourceUsage{
				Name:                    k,
				CPUUsageInCores:         v.cpuData[percentileIndex],
				MemoryUsageInBytes:      v.memUseData[percentileIndex],
				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
			}
		}
		result[perc] = data
	}
	return result
}
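
// Worked example for the index math in computePercentiles above (illustrative,
// not from the original source): with 10 samples per container, perc=90 gives
// percentileIndex = ceil(10*90/100) - 1 = 8, i.e. the 9th smallest sample, and
// perc=100 selects index 9, the largest sample.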

// leftMergeData merges right's per-container entries into left, keeping only
// the percentiles that are present in left.
func leftMergeData(left, right map[int]ResourceUsagePerContainer) map[int]ResourceUsagePerContainer {
	result := make(map[int]ResourceUsagePerContainer)
	for percentile, data := range left {
		result[percentile] = data
		if _, ok := right[percentile]; !ok {
			continue
		}
		for k, v := range right[percentile] {
			result[percentile][k] = v
		}
	}
	return result
}
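
// Illustrative example for leftMergeData above (the values are hypothetical):
//
//	left  = {50: {"a": A1}}
//	right = {50: {"a": A2, "b": B1}, 90: {"a": A3}}
//
// leftMergeData(left, right) returns {50: {"a": A2, "b": B1}}: only percentiles
// present in left survive, and right's entries win for shared container keys.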

type resourceGatherWorker struct {
	c                           clientset.Interface
	nodeName                    string
	wg                          *sync.WaitGroup
	containerIDs                []string
	stopCh                      chan struct{}
	dataSeries                  []ResourceUsagePerContainer
	finished                    bool
	inKubemark                  bool
	resourceDataGatheringPeriod time.Duration
	probeDuration               time.Duration
	printVerboseLogs            bool
}

func (w *resourceGatherWorker) singleProbe() {
	data := make(ResourceUsagePerContainer)
	if w.inKubemark {
		kubemarkData := GetKubemarkMasterComponentsResourceUsage()
		if kubemarkData == nil {
			return
		}
		for k, v := range kubemarkData {
			data[k] = &ContainerResourceUsage{
				Name:                    v.Name,
				MemoryWorkingSetInBytes: v.MemoryWorkingSetInBytes,
				CPUUsageInCores:         v.CPUUsageInCores,
			}
		}
	} else {
		nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, w.probeDuration, func() []string { return w.containerIDs })
		if err != nil {
			e2elog.Logf("Error while reading data from %v: %v", w.nodeName, err)
			return
		}
		for k, v := range nodeUsage {
			data[k] = v
			if w.printVerboseLogs {
				e2elog.Logf("Get container %v usage on node %v. CPUUsageInCores: %v, MemoryUsageInBytes: %v, MemoryWorkingSetInBytes: %v", k, w.nodeName, v.CPUUsageInCores, v.MemoryUsageInBytes, v.MemoryWorkingSetInBytes)
			}
		}
	}
	w.dataSeries = append(w.dataSeries, data)
}

func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
	defer utilruntime.HandleCrash()
	defer w.wg.Done()
	defer e2elog.Logf("Closing worker for %v", w.nodeName)
	defer func() { w.finished = true }()
	select {
	case <-time.After(initialSleep):
		w.singleProbe()
		for {
			select {
			case <-time.After(w.resourceDataGatheringPeriod):
				w.singleProbe()
			case <-w.stopCh:
				return
			}
		}
	case <-w.stopCh:
		return
	}
}

// ContainerResourceGatherer gathers container resource usage from a set of nodes.
type ContainerResourceGatherer struct {
	client       clientset.Interface
	stopCh       chan struct{}
	workers      []resourceGatherWorker
	workerWg     sync.WaitGroup
	containerIDs []string
	options      ResourceGathererOptions
}

// ResourceGathererOptions holds the options for a ContainerResourceGatherer.
type ResourceGathererOptions struct {
	InKubemark                  bool
	Nodes                       NodesSet
	ResourceDataGatheringPeriod time.Duration
	ProbeDuration               time.Duration
	PrintVerboseLogs            bool
}

// NodesSet selects which nodes' containers are tracked.
type NodesSet int

const (
	// AllNodes means all containers on all nodes.
	AllNodes NodesSet = 0
	// MasterNodes means all containers on Master nodes only.
	MasterNodes NodesSet = 1
	// MasterAndDNSNodes means all containers on Master nodes and DNS containers on other nodes.
	MasterAndDNSNodes NodesSet = 2
)

// NewResourceUsageGatherer returns a new ContainerResourceGatherer.
func NewResourceUsageGatherer(c clientset.Interface, options ResourceGathererOptions, pods *v1.PodList) (*ContainerResourceGatherer, error) {
	g := ContainerResourceGatherer{
		client:       c,
		stopCh:       make(chan struct{}),
		containerIDs: make([]string, 0),
		options:      options,
	}

	if options.InKubemark {
		g.workerWg.Add(1)
		g.workers = append(g.workers, resourceGatherWorker{
			inKubemark:                  true,
			stopCh:                      g.stopCh,
			wg:                          &g.workerWg,
			finished:                    false,
			resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
			probeDuration:               options.ProbeDuration,
			printVerboseLogs:            options.PrintVerboseLogs,
		})
	} else {
		// Tracks kube-system pods if no valid PodList is passed in.
		var err error
		if pods == nil {
			pods, err = c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
			if err != nil {
				e2elog.Logf("Error while listing Pods: %v", err)
				return nil, err
			}
		}
		dnsNodes := make(map[string]bool)
		for _, pod := range pods.Items {
			if (options.Nodes == MasterNodes) && !system.IsMasterNode(pod.Spec.NodeName) {
				continue
			}
			if (options.Nodes == MasterAndDNSNodes) && !system.IsMasterNode(pod.Spec.NodeName) && pod.Labels["k8s-app"] != "kube-dns" {
				continue
			}
			for _, container := range pod.Status.InitContainerStatuses {
				g.containerIDs = append(g.containerIDs, container.Name)
			}
			for _, container := range pod.Status.ContainerStatuses {
				g.containerIDs = append(g.containerIDs, container.Name)
			}
			if options.Nodes == MasterAndDNSNodes {
				dnsNodes[pod.Spec.NodeName] = true
			}
		}
		nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		if err != nil {
			e2elog.Logf("Error while listing Nodes: %v", err)
			return nil, err
		}
		for _, node := range nodeList.Items {
			if options.Nodes == AllNodes || system.IsMasterNode(node.Name) || dnsNodes[node.Name] {
				g.workerWg.Add(1)
				g.workers = append(g.workers, resourceGatherWorker{
					c:                           c,
					nodeName:                    node.Name,
					wg:                          &g.workerWg,
					containerIDs:                g.containerIDs,
					stopCh:                      g.stopCh,
					finished:                    false,
					inKubemark:                  false,
					resourceDataGatheringPeriod: options.ResourceDataGatheringPeriod,
					probeDuration:               options.ProbeDuration,
					printVerboseLogs:            options.PrintVerboseLogs,
				})
				if options.Nodes == MasterNodes {
					break
				}
			}
		}
	}
	return &g, nil
}

// StartGatheringData starts a stat-gathering worker for each tracked node
// and blocks until StopAndSummarize is called.
func (g *ContainerResourceGatherer) StartGatheringData() {
	if len(g.workers) == 0 {
		return
	}
	delayPeriod := g.options.ResourceDataGatheringPeriod / time.Duration(len(g.workers))
	delay := time.Duration(0)
	for i := range g.workers {
		go g.workers[i].gather(delay)
		delay += delayPeriod
	}
	g.workerWg.Wait()
}
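
// Worked example for the staggering above (the numbers are illustrative): with
// 4 workers and a ResourceDataGatheringPeriod of 10s, delayPeriod is 2.5s, so
// the workers start probing at 0s, 2.5s, 5s and 7.5s, spreading the probe load
// across the period instead of firing every probe at once.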

// StopAndSummarize stops the stat-gathering workers, processes the collected stats,
// generates a resource usage summary for the passed-in percentiles, and returns it.
// It returns an error if the 99th-percentile usage of any container exceeds its
// specified resource constraint.
func (g *ContainerResourceGatherer) StopAndSummarize(percentiles []int, constraints map[string]ResourceConstraint) (*ResourceUsageSummary, error) {
	close(g.stopCh)
	e2elog.Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
	finished := make(chan struct{})
	go func() {
		g.workerWg.Wait()
		finished <- struct{}{}
	}()
	select {
	case <-finished:
		e2elog.Logf("Waitgroup finished.")
	case <-time.After(2 * time.Minute):
		unfinished := make([]string, 0)
		for i := range g.workers {
			if !g.workers[i].finished {
				unfinished = append(unfinished, g.workers[i].nodeName)
			}
		}
		e2elog.Logf("Timed out while waiting for waitgroup, some workers failed to finish: %v", unfinished)
	}

	if len(percentiles) == 0 {
		e2elog.Logf("Warning! Empty percentile list for stopAndPrintData.")
		return &ResourceUsageSummary{}, fmt.Errorf("Failed to get any resource usage data")
	}

	// The workers have been stopped; merge the data each of them collected.
	data := make(map[int]ResourceUsagePerContainer)
	for i := range g.workers {
		if g.workers[i].finished {
			stats := computePercentiles(g.workers[i].dataSeries, percentiles)
			data = leftMergeData(stats, data)
		}
	}

	sortedKeys := []string{}
	for name := range data[percentiles[0]] {
		sortedKeys = append(sortedKeys, name)
	}
	sort.Strings(sortedKeys)
	violatedConstraints := make([]string, 0)
	summary := make(ResourceUsageSummary)
	for _, perc := range percentiles {
		for _, name := range sortedKeys {
			usage := data[perc][name]
			summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
				Name: name,
				CPU:  usage.CPUUsageInCores,
				Mem:  usage.MemoryWorkingSetInBytes,
			})
			// Verify the 99th percentile of resource usage against the constraints.
			if perc == 99 {
				// Name has the form <pod_name>/<container_name>.
				containerName := strings.Split(name, "/")[1]
				if constraint, ok := constraints[containerName]; ok {
					if usage.CPUUsageInCores > constraint.CPUConstraint {
						violatedConstraints = append(
							violatedConstraints,
							fmt.Sprintf("Container %v is using %v/%v CPU",
								name,
								usage.CPUUsageInCores,
								constraint.CPUConstraint,
							),
						)
					}
					if usage.MemoryWorkingSetInBytes > constraint.MemoryConstraint {
						violatedConstraints = append(
							violatedConstraints,
							fmt.Sprintf("Container %v is using %v/%v MB of memory",
								name,
								float64(usage.MemoryWorkingSetInBytes)/(1024*1024),
								float64(constraint.MemoryConstraint)/(1024*1024),
							),
						)
					}
				}
			}
		}
	}
	if len(violatedConstraints) > 0 {
		return &summary, fmt.Errorf("%s", strings.Join(violatedConstraints, "\n"))
	}
	return &summary, nil
}
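
// A minimal usage sketch, assuming an e2e test that already has a clientset c
// in hand; the option values, percentiles and constraint below are illustrative
// assumptions, not values mandated by this file:
//
//	gatherer, err := NewResourceUsageGatherer(c, ResourceGathererOptions{
//		Nodes:                       MasterNodes,
//		ResourceDataGatheringPeriod: 10 * time.Second,
//		ProbeDuration:               15 * time.Second,
//	}, nil)
//	if err != nil {
//		// handle the error
//	}
//	// StartGatheringData blocks until StopAndSummarize is called, so run it
//	// in its own goroutine while the workload under test executes.
//	go gatherer.StartGatheringData()
//	// ... run the workload under test ...
//	summary, err := gatherer.StopAndSummarize([]int{50, 90, 99}, map[string]ResourceConstraint{
//		"kube-apiserver": {CPUConstraint: 1.0, MemoryConstraint: 400 * 1024 * 1024},
//	})
//	if err != nil {
//		// a constraint was violated or no data was gathered
//	}
//	e2elog.Logf("%s", summary.PrintHumanReadable())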