prometheus.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package monitoring
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "math"
  19. "time"
  20. "github.com/prometheus/common/model"
  21. "github.com/onsi/ginkgo"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. clientset "k8s.io/client-go/kubernetes"
  24. "k8s.io/kubernetes/test/e2e/common"
  25. "k8s.io/kubernetes/test/e2e/framework"
  26. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  27. instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
  28. )
  // Settings shared by the Prometheus monitoring e2e tests in this file.
  const (
  	// prometheusService is the service name queried through the API server proxy.
  	prometheusService = "prometheus"

  	// prometheusQueryStep is the step passed to every range query.
  	prometheusQueryStep = time.Minute
  	// prometheusRate is the window handed to prometheusCPUQuery.
  	prometheusRate = 2 * time.Minute
  	// prometheusMetricValidationDuration is how far back value validation looks.
  	prometheusMetricValidationDuration = 2 * time.Minute
  	// prometheusRequiredNodesUpDuration is how far back the per-node
  	// availability check looks.
  	prometheusRequiredNodesUpDuration = 5 * time.Minute

  	// prometheusTestTimeout bounds each retried validation.
  	prometheusTestTimeout = 5 * time.Minute
  	// prometheusSleepBetweenAttempts is the pause between validation attempts.
  	prometheusSleepBetweenAttempts = 30 * time.Second

  	// prometheusMetricErrorTolerance is the relative error at or above which a
  	// sample is rejected.
  	prometheusMetricErrorTolerance = 0.25

  	// customMetricValue and targetCPUUsage are deliberately left untyped:
  	// callers use them both as int and as float64 arguments.
  	customMetricValue = 1000
  	targetCPUUsage    = 0.1
  )
  41. var _ = instrumentation.SIGDescribe("[Feature:PrometheusMonitoring] Prometheus", func() {
  42. ginkgo.BeforeEach(func() {
  43. framework.SkipUnlessPrometheusMonitoringIsEnabled()
  44. })
  45. f := framework.NewDefaultFramework("prometheus-monitoring")
  46. ginkgo.It("should scrape container metrics from all nodes.", func() {
  47. expectedNodes, err := getAllNodes(f.ClientSet)
  48. framework.ExpectNoError(err)
  49. retryUntilSucceeds(func() error {
  50. return validateMetricAvailableForAllNodes(f.ClientSet, `container_cpu_usage_seconds_total`, expectedNodes)
  51. }, prometheusTestTimeout)
  52. })
  53. ginkgo.It("should successfully scrape all targets", func() {
  54. retryUntilSucceeds(func() error {
  55. return validateAllActiveTargetsAreHealthy(f.ClientSet)
  56. }, prometheusTestTimeout)
  57. })
  58. ginkgo.It("should contain correct container CPU metric.", func() {
  59. query := prometheusCPUQuery(f.Namespace.Name, "prometheus-cpu-consumer", prometheusRate)
  60. consumer := consumeCPUResources(f, "prometheus-cpu-consumer", targetCPUUsage*1000)
  61. defer consumer.CleanUp()
  62. retryUntilSucceeds(func() error {
  63. return validateQueryReturnsCorrectValues(f.ClientSet, query, targetCPUUsage, 3, prometheusMetricErrorTolerance)
  64. }, prometheusTestTimeout)
  65. })
  66. ginkgo.It("should scrape metrics from annotated pods.", func() {
  67. query := prometheusPodCustomMetricQuery(f.Namespace.Name, "prometheus-custom-pod-metric")
  68. consumer := exportCustomMetricFromPod(f, "prometheus-custom-pod-metric", customMetricValue)
  69. defer consumer.CleanUp()
  70. retryUntilSucceeds(func() error {
  71. return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
  72. }, prometheusTestTimeout)
  73. })
  74. ginkgo.It("should scrape metrics from annotated services.", func() {
  75. query := prometheusServiceCustomMetricQuery(f.Namespace.Name, "prometheus-custom-service-metric")
  76. consumer := exportCustomMetricFromService(f, "prometheus-custom-service-metric", customMetricValue)
  77. defer consumer.CleanUp()
  78. retryUntilSucceeds(func() error {
  79. return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
  80. }, prometheusTestTimeout)
  81. })
  82. })
  // prometheusCPUQuery builds the PromQL expression that sums the irate of
  // container_cpu_usage_seconds_total over every container (image != "") in
  // namespace whose pod_name starts with podNamePrefix.
  //
  // rate is the range-vector window. Whole-minute windows are rendered in
  // minutes (for example "[2m]"). Any other window is rendered in whole seconds
  // (for example "[90s]") instead of being truncated to minutes, which would
  // turn a sub-minute window into "[0m]".
  func prometheusCPUQuery(namespace, podNamePrefix string, rate time.Duration) string {
  	window := fmt.Sprintf("%dm", int64(rate/time.Minute))
  	if rate%time.Minute != 0 {
  		// Not a whole number of minutes: keep the precision by using seconds.
  		window = fmt.Sprintf("%ds", int64(rate/time.Second))
  	}
  	return fmt.Sprintf(`sum(irate(container_cpu_usage_seconds_total{namespace="%v",pod_name=~"%v.*",image!=""}[%v]))`,
  		namespace, podNamePrefix, window)
  }
  // prometheusServiceCustomMetricQuery returns the PromQL expression summing the
  // QPS metric over series labelled with the given kubernetes_namespace and
  // kubernetes_name.
  func prometheusServiceCustomMetricQuery(namespace, service string) string {
  	selector := fmt.Sprintf(`kubernetes_namespace="%s",kubernetes_name="%s"`, namespace, service)
  	return "sum(QPS{" + selector + "})"
  }
  // prometheusPodCustomMetricQuery returns the PromQL expression summing the QPS
  // metric over series in the given kubernetes_namespace whose
  // kubernetes_pod_name matches podNamePrefix followed by anything.
  func prometheusPodCustomMetricQuery(namespace, podNamePrefix string) string {
  	podPattern := podNamePrefix + ".*"
  	return fmt.Sprintf(`sum(QPS{kubernetes_namespace="%v",kubernetes_pod_name=~"%v"})`, namespace, podPattern)
  }
  93. func consumeCPUResources(f *framework.Framework, consumerName string, cpuUsage int) *common.ResourceConsumer {
  94. return common.NewDynamicResourceConsumer(consumerName, f.Namespace.Name, common.KindDeployment, 1, cpuUsage,
  95. memoryUsed, 0, int64(cpuUsage), memoryLimit, f.ClientSet, f.ScalesGetter)
  96. }
  97. func exportCustomMetricFromPod(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
  98. podAnnotations := map[string]string{
  99. "prometheus.io/scrape": "true",
  100. "prometheus.io/path": "/metrics",
  101. "prometheus.io/port": "8080",
  102. }
  103. return common.NewMetricExporter(consumerName, f.Namespace.Name, podAnnotations, nil, metricValue, f.ClientSet, f.ScalesGetter)
  104. }
  105. func exportCustomMetricFromService(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
  106. serviceAnnotations := map[string]string{
  107. "prometheus.io/scrape": "true",
  108. "prometheus.io/path": "/metrics",
  109. "prometheus.io/port": "8080",
  110. }
  111. return common.NewMetricExporter(consumerName, f.Namespace.Name, nil, serviceAnnotations, metricValue, f.ClientSet, f.ScalesGetter)
  112. }
  113. func validateMetricAvailableForAllNodes(c clientset.Interface, metric string, expectedNodesNames []string) error {
  114. instanceLabels, err := getInstanceLabelsAvailableForMetric(c, prometheusRequiredNodesUpDuration, metric)
  115. if err != nil {
  116. return err
  117. }
  118. nodesWithMetric := make(map[string]bool)
  119. for _, instance := range instanceLabels {
  120. nodesWithMetric[instance] = true
  121. }
  122. missedNodesCount := 0
  123. for _, nodeName := range expectedNodesNames {
  124. if _, found := nodesWithMetric[nodeName]; !found {
  125. missedNodesCount++
  126. }
  127. }
  128. if missedNodesCount > 0 {
  129. return fmt.Errorf("Metric not found for %v out of %v nodes", missedNodesCount, len(expectedNodesNames))
  130. }
  131. return nil
  132. }
  133. func validateAllActiveTargetsAreHealthy(c clientset.Interface) error {
  134. discovery, err := fetchPrometheusTargetDiscovery(c)
  135. if err != nil {
  136. return err
  137. }
  138. if len(discovery.ActiveTargets) == 0 {
  139. return fmt.Errorf("Prometheus is not scraping any targets, at least one target is required")
  140. }
  141. for _, target := range discovery.ActiveTargets {
  142. if target.Health != HealthGood {
  143. return fmt.Errorf("Target health not good. Target: %v", target)
  144. }
  145. }
  146. return nil
  147. }
  148. func validateQueryReturnsCorrectValues(c clientset.Interface, query string, expectedValue float64, minSamplesCount int, errorTolerance float64) error {
  149. samples, err := fetchQueryValues(c, query, prometheusMetricValidationDuration)
  150. if err != nil {
  151. return err
  152. }
  153. if len(samples) < minSamplesCount {
  154. return fmt.Errorf("Not enough samples for query '%v', got %v", query, samples)
  155. }
  156. e2elog.Logf("Executed query '%v' returned %v", query, samples)
  157. for _, value := range samples {
  158. error := math.Abs(value-expectedValue) / expectedValue
  159. if error >= errorTolerance {
  160. return fmt.Errorf("Query result values outside expected value tolerance. Expected error below %v, got %v", errorTolerance, error)
  161. }
  162. }
  163. return nil
  164. }
  165. func fetchQueryValues(c clientset.Interface, query string, duration time.Duration) ([]float64, error) {
  166. now := time.Now()
  167. response, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
  168. if err != nil {
  169. return nil, err
  170. }
  171. m, ok := response.(model.Matrix)
  172. if !ok {
  173. return nil, fmt.Errorf("Expected matric response, got: %T", response)
  174. }
  175. values := make([]float64, 0)
  176. for _, stream := range m {
  177. for _, sample := range stream.Values {
  178. values = append(values, float64(sample.Value))
  179. }
  180. }
  181. return values, nil
  182. }
  183. func getInstanceLabelsAvailableForMetric(c clientset.Interface, duration time.Duration, metric string) ([]string, error) {
  184. var instance model.LabelValue
  185. now := time.Now()
  186. query := fmt.Sprintf(`sum(%v)by(instance)`, metric)
  187. result, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
  188. if err != nil {
  189. return nil, err
  190. }
  191. instanceLabels := make([]string, 0)
  192. m, ok := result.(model.Matrix)
  193. if !ok {
  194. framework.Failf("Expected matrix response for query '%v', got: %T", query, result)
  195. return instanceLabels, nil
  196. }
  197. for _, stream := range m {
  198. if instance, ok = stream.Metric["instance"]; !ok {
  199. continue
  200. }
  201. instanceLabels = append(instanceLabels, string(instance))
  202. }
  203. return instanceLabels, nil
  204. }
  205. func fetchPrometheusTargetDiscovery(c clientset.Interface) (TargetDiscovery, error) {
  206. ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
  207. defer cancel()
  208. response, err := c.CoreV1().RESTClient().Get().
  209. Context(ctx).
  210. Namespace("kube-system").
  211. Resource("services").
  212. Name(prometheusService+":9090").
  213. SubResource("proxy").
  214. Suffix("api", "v1", "targets").
  215. Do().
  216. Raw()
  217. var qres promTargetsResponse
  218. if err != nil {
  219. e2elog.Logf(string(response))
  220. return qres.Data, err
  221. }
  222. err = json.Unmarshal(response, &qres)
  223. return qres.Data, nil
  224. }
  225. type promTargetsResponse struct {
  226. Status string `json:"status"`
  227. Data TargetDiscovery `json:"data"`
  228. }
  229. // TargetDiscovery has all the active targets.
  230. type TargetDiscovery struct {
  231. ActiveTargets []*Target `json:"activeTargets"`
  232. DroppedTargets []*DroppedTarget `json:"droppedTargets"`
  233. }
  234. // Target has the information for one target.
  235. type Target struct {
  236. DiscoveredLabels map[string]string `json:"discoveredLabels"`
  237. Labels map[string]string `json:"labels"`
  238. ScrapeURL string `json:"scrapeUrl"`
  239. LastError string `json:"lastError"`
  240. LastScrape time.Time `json:"lastScrape"`
  241. Health TargetHealth `json:"health"`
  242. }
  // DroppedTarget describes a single target that was dropped during relabelling.
  type DroppedTarget struct {
  	// DiscoveredLabels are the labels before any processing.
  	DiscoveredLabels map[string]string `json:"discoveredLabels"`
  }
  // TargetHealth describes the health state of a target.
  type TargetHealth string

  // The possible health states of a target based on the last performed scrape.
  const (
  	HealthGood    TargetHealth = "up"
  	HealthBad     TargetHealth = "down"
  	HealthUnknown TargetHealth = "unknown"
  )
  256. func queryPrometheus(c clientset.Interface, query string, start, end time.Time, step time.Duration) (model.Value, error) {
  257. ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
  258. defer cancel()
  259. response, err := c.CoreV1().RESTClient().Get().
  260. Context(ctx).
  261. Namespace("kube-system").
  262. Resource("services").
  263. Name(prometheusService+":9090").
  264. SubResource("proxy").
  265. Suffix("api", "v1", "query_range").
  266. Param("query", query).
  267. Param("start", fmt.Sprintf("%v", start.Unix())).
  268. Param("end", fmt.Sprintf("%v", end.Unix())).
  269. Param("step", fmt.Sprintf("%vs", step.Seconds())).
  270. Do().
  271. Raw()
  272. if err != nil {
  273. e2elog.Logf(string(response))
  274. return nil, err
  275. }
  276. var qres promQueryResponse
  277. err = json.Unmarshal(response, &qres)
  278. return model.Value(qres.Data.v), err
  279. }
  280. type promQueryResponse struct {
  281. Status string `json:"status"`
  282. Data responseData `json:"data"`
  283. }
  284. type responseData struct {
  285. Type model.ValueType `json:"resultType"`
  286. Result interface{} `json:"result"`
  287. // The decoded value.
  288. v model.Value
  289. }
  290. func (qr *responseData) UnmarshalJSON(b []byte) error {
  291. v := struct {
  292. Type model.ValueType `json:"resultType"`
  293. Result json.RawMessage `json:"result"`
  294. }{}
  295. err := json.Unmarshal(b, &v)
  296. if err != nil {
  297. return err
  298. }
  299. switch v.Type {
  300. case model.ValScalar:
  301. var sv model.Scalar
  302. err = json.Unmarshal(v.Result, &sv)
  303. qr.v = &sv
  304. case model.ValVector:
  305. var vv model.Vector
  306. err = json.Unmarshal(v.Result, &vv)
  307. qr.v = vv
  308. case model.ValMatrix:
  309. var mv model.Matrix
  310. err = json.Unmarshal(v.Result, &mv)
  311. qr.v = mv
  312. default:
  313. err = fmt.Errorf("unexpected value type %q", v.Type)
  314. }
  315. return err
  316. }
  317. func retryUntilSucceeds(validator func() error, timeout time.Duration) {
  318. startTime := time.Now()
  319. var err error
  320. for {
  321. err = validator()
  322. if err == nil {
  323. return
  324. }
  325. if time.Since(startTime) >= timeout {
  326. break
  327. }
  328. e2elog.Logf(err.Error())
  329. time.Sleep(prometheusSleepBetweenAttempts)
  330. }
  331. framework.Failf(err.Error())
  332. }
  333. func getAllNodes(c clientset.Interface) ([]string, error) {
  334. nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
  335. if err != nil {
  336. return nil, err
  337. }
  338. result := []string{}
  339. for _, node := range nodeList.Items {
  340. result = append(result, node.Name)
  341. }
  342. return result, nil
  343. }