123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package elasticsearch
- import (
- "context"
- "encoding/json"
- "fmt"
- "strconv"
- "time"
- meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/test/e2e/framework"
- e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
- e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
- "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
- )
const (
	// esRetryTimeout bounds how long a request to Elasticsearch for status
	// information keeps being retried.
	esRetryTimeout = 5 * time.Minute

	// esRetryDelay is the pause between two consecutive attempts to send a
	// request to Elasticsearch.
	esRetryDelay = 5 * time.Second

	// searchPageSize is the number of entries requested from Elasticsearch
	// in a search.
	searchPageSize = 1000
)
- var _ utils.LogProvider = &esLogProvider{}
- type esLogProvider struct {
- Framework *framework.Framework
- }
- func newEsLogProvider(f *framework.Framework) (*esLogProvider, error) {
- return &esLogProvider{Framework: f}, nil
- }
- // Ensures that elasticsearch is running and ready to serve requests
- func (p *esLogProvider) Init() error {
- f := p.Framework
- // Check for the existence of the Elasticsearch service.
- framework.Logf("Checking the Elasticsearch service exists.")
- s := f.ClientSet.CoreV1().Services(api.NamespaceSystem)
- // Make a few attempts to connect. This makes the test robust against
- // being run as the first e2e test just after the e2e cluster has been created.
- var err error
- for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
- if _, err = s.Get(context.TODO(), "elasticsearch-logging", meta_v1.GetOptions{}); err == nil {
- break
- }
- framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
- }
- if err != nil {
- return err
- }
- // Wait for the Elasticsearch pods to enter the running state.
- framework.Logf("Checking to make sure the Elasticsearch pods are running")
- labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
- options := meta_v1.ListOptions{LabelSelector: labelSelector}
- pods, err := f.ClientSet.CoreV1().Pods(api.NamespaceSystem).List(context.TODO(), options)
- if err != nil {
- return err
- }
- for _, pod := range pods.Items {
- err = e2epod.WaitForPodRunningInNamespace(f.ClientSet, &pod)
- if err != nil {
- return err
- }
- }
- framework.Logf("Checking to make sure we are talking to an Elasticsearch service.")
- // Perform a few checks to make sure this looks like an Elasticsearch cluster.
- var statusCode int
- err = nil
- var body []byte
- for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
- proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
- if errProxy != nil {
- framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
- continue
- }
- // Query against the root URL for Elasticsearch.
- response := proxyRequest.Namespace(api.NamespaceSystem).
- Name("elasticsearch-logging").
- Do(context.TODO())
- err = response.Error()
- response.StatusCode(&statusCode)
- if err != nil {
- framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
- continue
- }
- if int(statusCode) != 200 {
- framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
- continue
- }
- break
- }
- if err != nil {
- return err
- }
- if int(statusCode) != 200 {
- framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
- }
- // Now assume we really are talking to an Elasticsearch instance.
- // Check the cluster health.
- framework.Logf("Checking health of Elasticsearch service.")
- healthy := false
- for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
- proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
- if errProxy != nil {
- framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
- continue
- }
- body, err = proxyRequest.Namespace(api.NamespaceSystem).
- Name("elasticsearch-logging").
- Suffix("_cluster/health").
- Param("level", "indices").
- DoRaw(context.TODO())
- if err != nil {
- continue
- }
- health := make(map[string]interface{})
- err := json.Unmarshal(body, &health)
- if err != nil {
- framework.Logf("Bad json response from elasticsearch: %v", err)
- continue
- }
- statusIntf, ok := health["status"]
- if !ok {
- framework.Logf("No status field found in cluster health response: %v", health)
- continue
- }
- status := statusIntf.(string)
- if status != "green" && status != "yellow" {
- framework.Logf("Cluster health has bad status: %v", health)
- continue
- }
- if err == nil && ok {
- healthy = true
- break
- }
- }
- if !healthy {
- return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout)
- }
- return nil
- }
- func (p *esLogProvider) Cleanup() {
- // Nothing to do
- }
- func (p *esLogProvider) ReadEntries(name string) []utils.LogEntry {
- f := p.Framework
- proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
- if errProxy != nil {
- framework.Logf("Failed to get services proxy request: %v", errProxy)
- return nil
- }
- query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", name, f.Namespace.Name)
- framework.Logf("Sending a search request to Elasticsearch with the following query: %s", query)
- // Ask Elasticsearch to return all the log lines that were tagged with the
- // pod name. Ask for ten times as many log lines because duplication is possible.
- body, err := proxyRequest.Namespace(api.NamespaceSystem).
- Name("elasticsearch-logging").
- Suffix("_search").
- Param("q", query).
- Param("size", strconv.Itoa(searchPageSize)).
- DoRaw(context.TODO())
- if err != nil {
- framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
- return nil
- }
- var response map[string]interface{}
- err = json.Unmarshal(body, &response)
- if err != nil {
- framework.Logf("Failed to unmarshal response: %v", err)
- return nil
- }
- hits, ok := response["hits"].(map[string]interface{})
- if !ok {
- framework.Logf("response[hits] not of the expected type: %T", response["hits"])
- return nil
- }
- h, ok := hits["hits"].([]interface{})
- if !ok {
- framework.Logf("Hits not of the expected type: %T", hits["hits"])
- return nil
- }
- entries := []utils.LogEntry{}
- // Iterate over the hits and populate the observed array.
- for _, e := range h {
- l, ok := e.(map[string]interface{})
- if !ok {
- framework.Logf("Element of hit not of expected type: %T", e)
- continue
- }
- source, ok := l["_source"].(map[string]interface{})
- if !ok {
- framework.Logf("_source not of the expected type: %T", l["_source"])
- continue
- }
- msg, ok := source["log"].(string)
- if ok {
- entries = append(entries, utils.LogEntry{TextPayload: msg})
- continue
- }
- obj, ok := source["log"].(map[string]interface{})
- if ok {
- entries = append(entries, utils.LogEntry{JSONPayload: obj})
- continue
- }
- framework.Logf("Log is of unknown type, got %v, want string or object in field 'log'", source)
- }
- return entries
- }
- func (p *esLogProvider) LoggingAgentName() string {
- return "fluentd-es"
- }
|