/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
- package monitoring
- import (
- "context"
- "encoding/json"
- "fmt"
- "math"
- "time"
- "github.com/prometheus/common/model"
- "github.com/onsi/ginkgo"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/kubernetes/test/e2e/common"
- "k8s.io/kubernetes/test/e2e/framework"
- e2elog "k8s.io/kubernetes/test/e2e/framework/log"
- instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
- )
// Tunables shared by the Prometheus monitoring tests in this file.
const (
	// prometheusService is the name of the kube-system Service (port 9090)
	// through which the tests proxy their Prometheus API calls.
	prometheusService = "prometheus"

	// prometheusQueryStep is the resolution step sent with every range query.
	prometheusQueryStep = time.Minute
	// prometheusRate is the range window used by the CPU usage query.
	prometheusRate = 2 * time.Minute
	// prometheusMetricValidationDuration is how far back value-validation
	// queries look.
	prometheusMetricValidationDuration = 2 * time.Minute
	// prometheusRequiredNodesUpDuration is how far back the per-node metric
	// availability query looks.
	prometheusRequiredNodesUpDuration = 5 * time.Minute

	// prometheusTestTimeout bounds how long a validation is retried.
	prometheusTestTimeout = 5 * time.Minute
	// prometheusSleepBetweenAttempts is the pause between retry attempts.
	prometheusSleepBetweenAttempts = 30 * time.Second

	// prometheusMetricErrorTolerance is the relative deviation at or above
	// which a sample is rejected.
	prometheusMetricErrorTolerance = 0.25

	// customMetricValue is the value exported by the custom-metric workloads
	// and expected back from Prometheus. It is left untyped because it is
	// used both as an int and as a float64.
	customMetricValue = 1000
	// targetCPUUsage is the CPU usage expected from the CPU query; the
	// consumer is started with this value multiplied by 1000. It is left
	// untyped for the same reason as customMetricValue.
	targetCPUUsage = 0.1
)
- var _ = instrumentation.SIGDescribe("[Feature:PrometheusMonitoring] Prometheus", func() {
- ginkgo.BeforeEach(func() {
- framework.SkipUnlessPrometheusMonitoringIsEnabled()
- })
- f := framework.NewDefaultFramework("prometheus-monitoring")
- ginkgo.It("should scrape container metrics from all nodes.", func() {
- expectedNodes, err := getAllNodes(f.ClientSet)
- framework.ExpectNoError(err)
- retryUntilSucceeds(func() error {
- return validateMetricAvailableForAllNodes(f.ClientSet, `container_cpu_usage_seconds_total`, expectedNodes)
- }, prometheusTestTimeout)
- })
- ginkgo.It("should successfully scrape all targets", func() {
- retryUntilSucceeds(func() error {
- return validateAllActiveTargetsAreHealthy(f.ClientSet)
- }, prometheusTestTimeout)
- })
- ginkgo.It("should contain correct container CPU metric.", func() {
- query := prometheusCPUQuery(f.Namespace.Name, "prometheus-cpu-consumer", prometheusRate)
- consumer := consumeCPUResources(f, "prometheus-cpu-consumer", targetCPUUsage*1000)
- defer consumer.CleanUp()
- retryUntilSucceeds(func() error {
- return validateQueryReturnsCorrectValues(f.ClientSet, query, targetCPUUsage, 3, prometheusMetricErrorTolerance)
- }, prometheusTestTimeout)
- })
- ginkgo.It("should scrape metrics from annotated pods.", func() {
- query := prometheusPodCustomMetricQuery(f.Namespace.Name, "prometheus-custom-pod-metric")
- consumer := exportCustomMetricFromPod(f, "prometheus-custom-pod-metric", customMetricValue)
- defer consumer.CleanUp()
- retryUntilSucceeds(func() error {
- return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
- }, prometheusTestTimeout)
- })
- ginkgo.It("should scrape metrics from annotated services.", func() {
- query := prometheusServiceCustomMetricQuery(f.Namespace.Name, "prometheus-custom-service-metric")
- consumer := exportCustomMetricFromService(f, "prometheus-custom-service-metric", customMetricValue)
- defer consumer.CleanUp()
- retryUntilSucceeds(func() error {
- return validateQueryReturnsCorrectValues(f.ClientSet, query, customMetricValue, 1, prometheusMetricErrorTolerance)
- }, prometheusTestTimeout)
- })
- })
// prometheusCPUQuery builds a PromQL expression that sums the irate of
// container_cpu_usage_seconds_total over all containers with a non-empty
// image, in the given namespace, whose pod_name starts with podNamePrefix.
//
// rate is the range-vector window. A whole number of minutes is rendered as
// "<n>m". Any other duration is rendered in seconds, rounded up to the next
// whole second, so that sub-minute or fractional-minute windows are not
// truncated to a shorter (or zero-length, hence invalid) range.
func prometheusCPUQuery(namespace, podNamePrefix string, rate time.Duration) string {
	window := fmt.Sprintf("%dm", int64(rate/time.Minute))
	if rate%time.Minute != 0 {
		seconds := int64((rate + time.Second - 1) / time.Second)
		window = fmt.Sprintf("%ds", seconds)
	}
	return fmt.Sprintf(`sum(irate(container_cpu_usage_seconds_total{namespace="%v",pod_name=~"%v.*",image!=""}[%v]))`,
		namespace, podNamePrefix, window)
}
// prometheusServiceCustomMetricQuery builds a PromQL expression summing the
// QPS series whose kubernetes_namespace and kubernetes_name labels equal the
// given namespace and service.
func prometheusServiceCustomMetricQuery(namespace, service string) string {
	selector := fmt.Sprintf(`kubernetes_namespace="%s",kubernetes_name="%s"`, namespace, service)
	return "sum(QPS{" + selector + "})"
}
// prometheusPodCustomMetricQuery builds a PromQL expression summing the QPS
// series in the given namespace whose kubernetes_pod_name label starts with
// podNamePrefix.
func prometheusPodCustomMetricQuery(namespace, podNamePrefix string) string {
	const queryTemplate = `sum(QPS{kubernetes_namespace="%s",kubernetes_pod_name=~"%s.*"})`
	return fmt.Sprintf(queryTemplate, namespace, podNamePrefix)
}
- func consumeCPUResources(f *framework.Framework, consumerName string, cpuUsage int) *common.ResourceConsumer {
- return common.NewDynamicResourceConsumer(consumerName, f.Namespace.Name, common.KindDeployment, 1, cpuUsage,
- memoryUsed, 0, int64(cpuUsage), memoryLimit, f.ClientSet, f.ScalesGetter)
- }
- func exportCustomMetricFromPod(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
- podAnnotations := map[string]string{
- "prometheus.io/scrape": "true",
- "prometheus.io/path": "/metrics",
- "prometheus.io/port": "8080",
- }
- return common.NewMetricExporter(consumerName, f.Namespace.Name, podAnnotations, nil, metricValue, f.ClientSet, f.ScalesGetter)
- }
- func exportCustomMetricFromService(f *framework.Framework, consumerName string, metricValue int) *common.ResourceConsumer {
- serviceAnnotations := map[string]string{
- "prometheus.io/scrape": "true",
- "prometheus.io/path": "/metrics",
- "prometheus.io/port": "8080",
- }
- return common.NewMetricExporter(consumerName, f.Namespace.Name, nil, serviceAnnotations, metricValue, f.ClientSet, f.ScalesGetter)
- }
- func validateMetricAvailableForAllNodes(c clientset.Interface, metric string, expectedNodesNames []string) error {
- instanceLabels, err := getInstanceLabelsAvailableForMetric(c, prometheusRequiredNodesUpDuration, metric)
- if err != nil {
- return err
- }
- nodesWithMetric := make(map[string]bool)
- for _, instance := range instanceLabels {
- nodesWithMetric[instance] = true
- }
- missedNodesCount := 0
- for _, nodeName := range expectedNodesNames {
- if _, found := nodesWithMetric[nodeName]; !found {
- missedNodesCount++
- }
- }
- if missedNodesCount > 0 {
- return fmt.Errorf("Metric not found for %v out of %v nodes", missedNodesCount, len(expectedNodesNames))
- }
- return nil
- }
- func validateAllActiveTargetsAreHealthy(c clientset.Interface) error {
- discovery, err := fetchPrometheusTargetDiscovery(c)
- if err != nil {
- return err
- }
- if len(discovery.ActiveTargets) == 0 {
- return fmt.Errorf("Prometheus is not scraping any targets, at least one target is required")
- }
- for _, target := range discovery.ActiveTargets {
- if target.Health != HealthGood {
- return fmt.Errorf("Target health not good. Target: %v", target)
- }
- }
- return nil
- }
- func validateQueryReturnsCorrectValues(c clientset.Interface, query string, expectedValue float64, minSamplesCount int, errorTolerance float64) error {
- samples, err := fetchQueryValues(c, query, prometheusMetricValidationDuration)
- if err != nil {
- return err
- }
- if len(samples) < minSamplesCount {
- return fmt.Errorf("Not enough samples for query '%v', got %v", query, samples)
- }
- e2elog.Logf("Executed query '%v' returned %v", query, samples)
- for _, value := range samples {
- error := math.Abs(value-expectedValue) / expectedValue
- if error >= errorTolerance {
- return fmt.Errorf("Query result values outside expected value tolerance. Expected error below %v, got %v", errorTolerance, error)
- }
- }
- return nil
- }
- func fetchQueryValues(c clientset.Interface, query string, duration time.Duration) ([]float64, error) {
- now := time.Now()
- response, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
- if err != nil {
- return nil, err
- }
- m, ok := response.(model.Matrix)
- if !ok {
- return nil, fmt.Errorf("Expected matric response, got: %T", response)
- }
- values := make([]float64, 0)
- for _, stream := range m {
- for _, sample := range stream.Values {
- values = append(values, float64(sample.Value))
- }
- }
- return values, nil
- }
- func getInstanceLabelsAvailableForMetric(c clientset.Interface, duration time.Duration, metric string) ([]string, error) {
- var instance model.LabelValue
- now := time.Now()
- query := fmt.Sprintf(`sum(%v)by(instance)`, metric)
- result, err := queryPrometheus(c, query, now.Add(-duration), now, prometheusQueryStep)
- if err != nil {
- return nil, err
- }
- instanceLabels := make([]string, 0)
- m, ok := result.(model.Matrix)
- if !ok {
- framework.Failf("Expected matrix response for query '%v', got: %T", query, result)
- return instanceLabels, nil
- }
- for _, stream := range m {
- if instance, ok = stream.Metric["instance"]; !ok {
- continue
- }
- instanceLabels = append(instanceLabels, string(instance))
- }
- return instanceLabels, nil
- }
- func fetchPrometheusTargetDiscovery(c clientset.Interface) (TargetDiscovery, error) {
- ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
- defer cancel()
- response, err := c.CoreV1().RESTClient().Get().
- Context(ctx).
- Namespace("kube-system").
- Resource("services").
- Name(prometheusService+":9090").
- SubResource("proxy").
- Suffix("api", "v1", "targets").
- Do().
- Raw()
- var qres promTargetsResponse
- if err != nil {
- e2elog.Logf(string(response))
- return qres.Data, err
- }
- err = json.Unmarshal(response, &qres)
- return qres.Data, nil
- }
- type promTargetsResponse struct {
- Status string `json:"status"`
- Data TargetDiscovery `json:"data"`
- }
- // TargetDiscovery has all the active targets.
- type TargetDiscovery struct {
- ActiveTargets []*Target `json:"activeTargets"`
- DroppedTargets []*DroppedTarget `json:"droppedTargets"`
- }
- // Target has the information for one target.
- type Target struct {
- DiscoveredLabels map[string]string `json:"discoveredLabels"`
- Labels map[string]string `json:"labels"`
- ScrapeURL string `json:"scrapeUrl"`
- LastError string `json:"lastError"`
- LastScrape time.Time `json:"lastScrape"`
- Health TargetHealth `json:"health"`
- }
// DroppedTarget describes a single target that was dropped during
// relabelling.
type DroppedTarget struct {
	// DiscoveredLabels are the labels before any processing.
	DiscoveredLabels map[string]string `json:"discoveredLabels"`
}
// TargetHealth describes the health state of a target.
type TargetHealth string

// The possible health states of a target based on the last performed scrape.
const (
	// HealthUnknown is the "unknown" state.
	HealthUnknown TargetHealth = "unknown"
	// HealthGood is the "up" state.
	HealthGood TargetHealth = "up"
	// HealthBad is the "down" state.
	HealthBad TargetHealth = "down"
)
- func queryPrometheus(c clientset.Interface, query string, start, end time.Time, step time.Duration) (model.Value, error) {
- ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
- defer cancel()
- response, err := c.CoreV1().RESTClient().Get().
- Context(ctx).
- Namespace("kube-system").
- Resource("services").
- Name(prometheusService+":9090").
- SubResource("proxy").
- Suffix("api", "v1", "query_range").
- Param("query", query).
- Param("start", fmt.Sprintf("%v", start.Unix())).
- Param("end", fmt.Sprintf("%v", end.Unix())).
- Param("step", fmt.Sprintf("%vs", step.Seconds())).
- Do().
- Raw()
- if err != nil {
- e2elog.Logf(string(response))
- return nil, err
- }
- var qres promQueryResponse
- err = json.Unmarshal(response, &qres)
- return model.Value(qres.Data.v), err
- }
- type promQueryResponse struct {
- Status string `json:"status"`
- Data responseData `json:"data"`
- }
- type responseData struct {
- Type model.ValueType `json:"resultType"`
- Result interface{} `json:"result"`
- // The decoded value.
- v model.Value
- }
- func (qr *responseData) UnmarshalJSON(b []byte) error {
- v := struct {
- Type model.ValueType `json:"resultType"`
- Result json.RawMessage `json:"result"`
- }{}
- err := json.Unmarshal(b, &v)
- if err != nil {
- return err
- }
- switch v.Type {
- case model.ValScalar:
- var sv model.Scalar
- err = json.Unmarshal(v.Result, &sv)
- qr.v = &sv
- case model.ValVector:
- var vv model.Vector
- err = json.Unmarshal(v.Result, &vv)
- qr.v = vv
- case model.ValMatrix:
- var mv model.Matrix
- err = json.Unmarshal(v.Result, &mv)
- qr.v = mv
- default:
- err = fmt.Errorf("unexpected value type %q", v.Type)
- }
- return err
- }
- func retryUntilSucceeds(validator func() error, timeout time.Duration) {
- startTime := time.Now()
- var err error
- for {
- err = validator()
- if err == nil {
- return
- }
- if time.Since(startTime) >= timeout {
- break
- }
- e2elog.Logf(err.Error())
- time.Sleep(prometheusSleepBetweenAttempts)
- }
- framework.Failf(err.Error())
- }
- func getAllNodes(c clientset.Interface) ([]string, error) {
- nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
- return nil, err
- }
- result := []string{}
- for _, node := range nodeList.Items {
- result = append(result, node.Name)
- }
- return result, nil
- }
|