utils.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package elasticsearch
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "strconv"
  19. "time"
  20. meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/fields"
  22. api "k8s.io/kubernetes/pkg/apis/core"
  23. "k8s.io/kubernetes/test/e2e/framework"
  24. e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
  25. e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
  26. "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
  27. )
  // Tunables for talking to Elasticsearch through the API server proxy.
  const (
  	// esRetryTimeout bounds the total time spent retrying a single kind of
  	// request against Elasticsearch before giving up.
  	esRetryTimeout = 5 * time.Minute

  	// esRetryDelay is the pause between two consecutive attempts.
  	esRetryDelay = 5 * time.Second

  	// searchPageSize is the maximum number of hits requested from
  	// Elasticsearch in a single search call.
  	searchPageSize = 1000
  )
  36. var _ utils.LogProvider = &esLogProvider{}
  37. type esLogProvider struct {
  38. Framework *framework.Framework
  39. }
  40. func newEsLogProvider(f *framework.Framework) (*esLogProvider, error) {
  41. return &esLogProvider{Framework: f}, nil
  42. }
  43. // Ensures that elasticsearch is running and ready to serve requests
  44. func (p *esLogProvider) Init() error {
  45. f := p.Framework
  46. // Check for the existence of the Elasticsearch service.
  47. framework.Logf("Checking the Elasticsearch service exists.")
  48. s := f.ClientSet.CoreV1().Services(api.NamespaceSystem)
  49. // Make a few attempts to connect. This makes the test robust against
  50. // being run as the first e2e test just after the e2e cluster has been created.
  51. var err error
  52. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  53. if _, err = s.Get(context.TODO(), "elasticsearch-logging", meta_v1.GetOptions{}); err == nil {
  54. break
  55. }
  56. framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
  57. }
  58. if err != nil {
  59. return err
  60. }
  61. // Wait for the Elasticsearch pods to enter the running state.
  62. framework.Logf("Checking to make sure the Elasticsearch pods are running")
  63. labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
  64. options := meta_v1.ListOptions{LabelSelector: labelSelector}
  65. pods, err := f.ClientSet.CoreV1().Pods(api.NamespaceSystem).List(context.TODO(), options)
  66. if err != nil {
  67. return err
  68. }
  69. for _, pod := range pods.Items {
  70. err = e2epod.WaitForPodRunningInNamespace(f.ClientSet, &pod)
  71. if err != nil {
  72. return err
  73. }
  74. }
  75. framework.Logf("Checking to make sure we are talking to an Elasticsearch service.")
  76. // Perform a few checks to make sure this looks like an Elasticsearch cluster.
  77. var statusCode int
  78. err = nil
  79. var body []byte
  80. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  81. proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  82. if errProxy != nil {
  83. framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
  84. continue
  85. }
  86. // Query against the root URL for Elasticsearch.
  87. response := proxyRequest.Namespace(api.NamespaceSystem).
  88. Name("elasticsearch-logging").
  89. Do(context.TODO())
  90. err = response.Error()
  91. response.StatusCode(&statusCode)
  92. if err != nil {
  93. framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
  94. continue
  95. }
  96. if int(statusCode) != 200 {
  97. framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
  98. continue
  99. }
  100. break
  101. }
  102. if err != nil {
  103. return err
  104. }
  105. if int(statusCode) != 200 {
  106. framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
  107. }
  108. // Now assume we really are talking to an Elasticsearch instance.
  109. // Check the cluster health.
  110. framework.Logf("Checking health of Elasticsearch service.")
  111. healthy := false
  112. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  113. proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  114. if errProxy != nil {
  115. framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
  116. continue
  117. }
  118. body, err = proxyRequest.Namespace(api.NamespaceSystem).
  119. Name("elasticsearch-logging").
  120. Suffix("_cluster/health").
  121. Param("level", "indices").
  122. DoRaw(context.TODO())
  123. if err != nil {
  124. continue
  125. }
  126. health := make(map[string]interface{})
  127. err := json.Unmarshal(body, &health)
  128. if err != nil {
  129. framework.Logf("Bad json response from elasticsearch: %v", err)
  130. continue
  131. }
  132. statusIntf, ok := health["status"]
  133. if !ok {
  134. framework.Logf("No status field found in cluster health response: %v", health)
  135. continue
  136. }
  137. status := statusIntf.(string)
  138. if status != "green" && status != "yellow" {
  139. framework.Logf("Cluster health has bad status: %v", health)
  140. continue
  141. }
  142. if err == nil && ok {
  143. healthy = true
  144. break
  145. }
  146. }
  147. if !healthy {
  148. return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout)
  149. }
  150. return nil
  151. }
  152. func (p *esLogProvider) Cleanup() {
  153. // Nothing to do
  154. }
  155. func (p *esLogProvider) ReadEntries(name string) []utils.LogEntry {
  156. f := p.Framework
  157. proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  158. if errProxy != nil {
  159. framework.Logf("Failed to get services proxy request: %v", errProxy)
  160. return nil
  161. }
  162. query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", name, f.Namespace.Name)
  163. framework.Logf("Sending a search request to Elasticsearch with the following query: %s", query)
  164. // Ask Elasticsearch to return all the log lines that were tagged with the
  165. // pod name. Ask for ten times as many log lines because duplication is possible.
  166. body, err := proxyRequest.Namespace(api.NamespaceSystem).
  167. Name("elasticsearch-logging").
  168. Suffix("_search").
  169. Param("q", query).
  170. Param("size", strconv.Itoa(searchPageSize)).
  171. DoRaw(context.TODO())
  172. if err != nil {
  173. framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
  174. return nil
  175. }
  176. var response map[string]interface{}
  177. err = json.Unmarshal(body, &response)
  178. if err != nil {
  179. framework.Logf("Failed to unmarshal response: %v", err)
  180. return nil
  181. }
  182. hits, ok := response["hits"].(map[string]interface{})
  183. if !ok {
  184. framework.Logf("response[hits] not of the expected type: %T", response["hits"])
  185. return nil
  186. }
  187. h, ok := hits["hits"].([]interface{})
  188. if !ok {
  189. framework.Logf("Hits not of the expected type: %T", hits["hits"])
  190. return nil
  191. }
  192. entries := []utils.LogEntry{}
  193. // Iterate over the hits and populate the observed array.
  194. for _, e := range h {
  195. l, ok := e.(map[string]interface{})
  196. if !ok {
  197. framework.Logf("Element of hit not of expected type: %T", e)
  198. continue
  199. }
  200. source, ok := l["_source"].(map[string]interface{})
  201. if !ok {
  202. framework.Logf("_source not of the expected type: %T", l["_source"])
  203. continue
  204. }
  205. msg, ok := source["log"].(string)
  206. if ok {
  207. entries = append(entries, utils.LogEntry{TextPayload: msg})
  208. continue
  209. }
  210. obj, ok := source["log"].(map[string]interface{})
  211. if ok {
  212. entries = append(entries, utils.LogEntry{JSONPayload: obj})
  213. continue
  214. }
  215. framework.Logf("Log is of unknown type, got %v, want string or object in field 'log'", source)
  216. }
  217. return entries
  218. }
  219. func (p *esLogProvider) LoggingAgentName() string {
  220. return "fluentd-es"
  221. }