utils.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package elasticsearch
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strconv"
  18. "time"
  19. meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/fields"
  21. api "k8s.io/kubernetes/pkg/apis/core"
  22. "k8s.io/kubernetes/test/e2e/framework"
  23. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  24. "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
  25. )
  // Tunables for the requests this provider sends to Elasticsearch.
  const (
  	// esRetryTimeout bounds the total time spent retrying a request to
  	// Elasticsearch for status information.
  	esRetryTimeout = 5 * time.Minute

  	// esRetryDelay is the pause between two consecutive attempts to send a
  	// request to Elasticsearch.
  	esRetryDelay = 5 * time.Second

  	// searchPageSize is the number of entries requested from Elasticsearch
  	// in a single search.
  	searchPageSize = 1000
  )
  34. var _ utils.LogProvider = &esLogProvider{}
  35. type esLogProvider struct {
  36. Framework *framework.Framework
  37. }
  38. func newEsLogProvider(f *framework.Framework) (*esLogProvider, error) {
  39. return &esLogProvider{Framework: f}, nil
  40. }
  41. // Ensures that elasticsearch is running and ready to serve requests
  42. func (p *esLogProvider) Init() error {
  43. f := p.Framework
  44. // Check for the existence of the Elasticsearch service.
  45. e2elog.Logf("Checking the Elasticsearch service exists.")
  46. s := f.ClientSet.CoreV1().Services(api.NamespaceSystem)
  47. // Make a few attempts to connect. This makes the test robust against
  48. // being run as the first e2e test just after the e2e cluster has been created.
  49. var err error
  50. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  51. if _, err = s.Get("elasticsearch-logging", meta_v1.GetOptions{}); err == nil {
  52. break
  53. }
  54. e2elog.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
  55. }
  56. if err != nil {
  57. return err
  58. }
  59. // Wait for the Elasticsearch pods to enter the running state.
  60. e2elog.Logf("Checking to make sure the Elasticsearch pods are running")
  61. labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
  62. options := meta_v1.ListOptions{LabelSelector: labelSelector}
  63. pods, err := f.ClientSet.CoreV1().Pods(api.NamespaceSystem).List(options)
  64. if err != nil {
  65. return err
  66. }
  67. for _, pod := range pods.Items {
  68. err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
  69. if err != nil {
  70. return err
  71. }
  72. }
  73. e2elog.Logf("Checking to make sure we are talking to an Elasticsearch service.")
  74. // Perform a few checks to make sure this looks like an Elasticsearch cluster.
  75. var statusCode int
  76. err = nil
  77. var body []byte
  78. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  79. proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  80. if errProxy != nil {
  81. e2elog.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
  82. continue
  83. }
  84. // Query against the root URL for Elasticsearch.
  85. response := proxyRequest.Namespace(api.NamespaceSystem).
  86. Name("elasticsearch-logging").
  87. Do()
  88. err = response.Error()
  89. response.StatusCode(&statusCode)
  90. if err != nil {
  91. e2elog.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
  92. continue
  93. }
  94. if int(statusCode) != 200 {
  95. e2elog.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
  96. continue
  97. }
  98. break
  99. }
  100. if err != nil {
  101. return err
  102. }
  103. if int(statusCode) != 200 {
  104. framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
  105. }
  106. // Now assume we really are talking to an Elasticsearch instance.
  107. // Check the cluster health.
  108. e2elog.Logf("Checking health of Elasticsearch service.")
  109. healthy := false
  110. for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
  111. proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  112. if errProxy != nil {
  113. e2elog.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
  114. continue
  115. }
  116. body, err = proxyRequest.Namespace(api.NamespaceSystem).
  117. Name("elasticsearch-logging").
  118. Suffix("_cluster/health").
  119. Param("level", "indices").
  120. DoRaw()
  121. if err != nil {
  122. continue
  123. }
  124. health := make(map[string]interface{})
  125. err := json.Unmarshal(body, &health)
  126. if err != nil {
  127. e2elog.Logf("Bad json response from elasticsearch: %v", err)
  128. continue
  129. }
  130. statusIntf, ok := health["status"]
  131. if !ok {
  132. e2elog.Logf("No status field found in cluster health response: %v", health)
  133. continue
  134. }
  135. status := statusIntf.(string)
  136. if status != "green" && status != "yellow" {
  137. e2elog.Logf("Cluster health has bad status: %v", health)
  138. continue
  139. }
  140. if err == nil && ok {
  141. healthy = true
  142. break
  143. }
  144. }
  145. if !healthy {
  146. return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout)
  147. }
  148. return nil
  149. }
  150. func (p *esLogProvider) Cleanup() {
  151. // Nothing to do
  152. }
  153. func (p *esLogProvider) ReadEntries(name string) []utils.LogEntry {
  154. f := p.Framework
  155. proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.CoreV1().RESTClient().Get())
  156. if errProxy != nil {
  157. e2elog.Logf("Failed to get services proxy request: %v", errProxy)
  158. return nil
  159. }
  160. query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", name, f.Namespace.Name)
  161. e2elog.Logf("Sending a search request to Elasticsearch with the following query: %s", query)
  162. // Ask Elasticsearch to return all the log lines that were tagged with the
  163. // pod name. Ask for ten times as many log lines because duplication is possible.
  164. body, err := proxyRequest.Namespace(api.NamespaceSystem).
  165. Name("elasticsearch-logging").
  166. Suffix("_search").
  167. Param("q", query).
  168. // Ask for more in case we included some unrelated records in our query
  169. Param("size", strconv.Itoa(searchPageSize)).
  170. DoRaw()
  171. if err != nil {
  172. e2elog.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
  173. return nil
  174. }
  175. var response map[string]interface{}
  176. err = json.Unmarshal(body, &response)
  177. if err != nil {
  178. e2elog.Logf("Failed to unmarshal response: %v", err)
  179. return nil
  180. }
  181. hits, ok := response["hits"].(map[string]interface{})
  182. if !ok {
  183. e2elog.Logf("response[hits] not of the expected type: %T", response["hits"])
  184. return nil
  185. }
  186. h, ok := hits["hits"].([]interface{})
  187. if !ok {
  188. e2elog.Logf("Hits not of the expected type: %T", hits["hits"])
  189. return nil
  190. }
  191. entries := []utils.LogEntry{}
  192. // Iterate over the hits and populate the observed array.
  193. for _, e := range h {
  194. l, ok := e.(map[string]interface{})
  195. if !ok {
  196. e2elog.Logf("Element of hit not of expected type: %T", e)
  197. continue
  198. }
  199. source, ok := l["_source"].(map[string]interface{})
  200. if !ok {
  201. e2elog.Logf("_source not of the expected type: %T", l["_source"])
  202. continue
  203. }
  204. msg, ok := source["log"].(string)
  205. if ok {
  206. entries = append(entries, utils.LogEntry{TextPayload: msg})
  207. continue
  208. }
  209. obj, ok := source["log"].(map[string]interface{})
  210. if ok {
  211. entries = append(entries, utils.LogEntry{JSONPayload: obj})
  212. continue
  213. }
  214. e2elog.Logf("Log is of unknown type, got %v, want string or object in field 'log'", source)
  215. }
  216. return entries
  217. }
  218. func (p *esLogProvider) LoggingAgentName() string {
  219. return "fluentd-es"
  220. }