wait.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "fmt"
  16. "strings"
  17. "time"
  18. "k8s.io/apimachinery/pkg/util/wait"
  19. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  20. )
  21. // LogChecker is an interface for an entity that can check whether logging
  22. // backend contains all wanted log entries.
  23. type LogChecker interface {
  24. EntriesIngested() (bool, error)
  25. Timeout() error
  26. }
  27. // IngestionPred is a type of a function that checks whether all required
  28. // log entries were ingested.
  29. type IngestionPred func(string, []LogEntry) (bool, error)
  30. // UntilFirstEntry is a IngestionPred that checks that at least one entry was
  31. // ingested.
  32. var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
  33. return len(entries) > 0, nil
  34. }
  35. // UntilFirstEntryFromLog is a IngestionPred that checks that at least one
  36. // entry from the log with a given name was ingested.
  37. func UntilFirstEntryFromLog(log string) IngestionPred {
  38. return func(_ string, entries []LogEntry) (bool, error) {
  39. for _, e := range entries {
  40. if e.LogName == log {
  41. return true, nil
  42. }
  43. }
  44. return false, nil
  45. }
  46. }
  47. // UntilFirstEntryFromLocation is a IngestionPred that checks that at least one
  48. // entry from the log with a given name was ingested.
  49. func UntilFirstEntryFromLocation(location string) IngestionPred {
  50. return func(_ string, entries []LogEntry) (bool, error) {
  51. for _, e := range entries {
  52. if e.Location == location {
  53. return true, nil
  54. }
  55. }
  56. return false, nil
  57. }
  58. }
  59. // TimeoutFun is a function that is called when the waiting times out.
  60. type TimeoutFun func([]string, []bool) error
  61. // JustTimeout returns the error with the list of names for which backend is
  62. // still still missing logs.
  63. var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
  64. failedNames := []string{}
  65. for i, name := range names {
  66. if !ingested[i] {
  67. failedNames = append(failedNames, name)
  68. }
  69. }
  70. return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
  71. strings.Join(failedNames, ","))
  72. }
  73. var _ LogChecker = &logChecker{}
  74. type logChecker struct {
  75. provider LogProvider
  76. names []string
  77. ingested []bool
  78. ingestionPred IngestionPred
  79. timeoutFun TimeoutFun
  80. }
  81. // NewLogChecker constructs a LogChecker for a list of names from custom
  82. // IngestionPred and TimeoutFun.
  83. func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
  84. return &logChecker{
  85. provider: p,
  86. names: names,
  87. ingested: make([]bool, len(names)),
  88. ingestionPred: pred,
  89. timeoutFun: timeout,
  90. }
  91. }
  92. func (c *logChecker) EntriesIngested() (bool, error) {
  93. allIngested := true
  94. for i, name := range c.names {
  95. if c.ingested[i] {
  96. continue
  97. }
  98. entries := c.provider.ReadEntries(name)
  99. ingested, err := c.ingestionPred(name, entries)
  100. if err != nil {
  101. return false, err
  102. }
  103. if ingested {
  104. c.ingested[i] = true
  105. }
  106. allIngested = allIngested && ingested
  107. }
  108. return allIngested, nil
  109. }
  110. func (c *logChecker) Timeout() error {
  111. return c.timeoutFun(c.names, c.ingested)
  112. }
  113. // NumberedIngestionPred is a IngestionPred that takes into account sequential
  114. // numbers of ingested entries.
  115. type NumberedIngestionPred func(string, map[int]bool) (bool, error)
  116. // NumberedTimeoutFun is a TimeoutFun that takes into account sequential
  117. // numbers of ingested entries.
  118. type NumberedTimeoutFun func([]string, map[string]map[int]bool) error
  119. // NewNumberedLogChecker returns a log checker that works with numbered log
  120. // entries generated by load logging pods.
  121. func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
  122. timeout NumberedTimeoutFun, names ...string) LogChecker {
  123. occs := map[string]map[int]bool{}
  124. return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
  125. occ, ok := occs[name]
  126. if !ok {
  127. occ = map[int]bool{}
  128. occs[name] = occ
  129. }
  130. for _, entry := range entries {
  131. if no, ok := entry.TryGetEntryNumber(); ok {
  132. occ[no] = true
  133. }
  134. }
  135. return pred(name, occ)
  136. }, func(names []string, _ []bool) error {
  137. return timeout(names, occs)
  138. }, names...)
  139. }
  140. // NewFullIngestionPodLogChecker returns a log checks that works with numbered
  141. // log entries generated by load logging pods and waits until all entries are
  142. // ingested. If timeout is reached, fraction is lost logs up to slack is
  143. // considered tolerable.
  144. func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
  145. podsMap := map[string]FiniteLoggingPod{}
  146. for _, p := range pods {
  147. podsMap[p.Name()] = p
  148. }
  149. return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
  150. getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
  151. }
  152. func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
  153. return func(name string, occ map[int]bool) (bool, error) {
  154. p := podsMap[name]
  155. ok := len(occ) == p.ExpectedLineCount()
  156. return ok, nil
  157. }
  158. }
  159. func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
  160. return func(names []string, occs map[string]map[int]bool) error {
  161. totalGot, totalWant := 0, 0
  162. lossMsgs := []string{}
  163. for _, name := range names {
  164. got := len(occs[name])
  165. want := podsMap[name].ExpectedLineCount()
  166. if got != want {
  167. lossMsg := fmt.Sprintf("%s: %d lines", name, want-got)
  168. lossMsgs = append(lossMsgs, lossMsg)
  169. }
  170. totalGot += got
  171. totalWant += want
  172. }
  173. if len(lossMsgs) > 0 {
  174. e2elog.Logf("Still missing logs from:\n%s", strings.Join(lossMsgs, "\n"))
  175. }
  176. lostFrac := 1 - float64(totalGot)/float64(totalWant)
  177. if lostFrac > slack {
  178. return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
  179. lostFrac*100, slack*100)
  180. }
  181. e2elog.Logf("Missing %.2f%% of logs, which is lower than the threshold %.2f%%",
  182. lostFrac*100, slack*100)
  183. return nil
  184. }
  185. }
  186. // WaitForLogs checks that logs are ingested, as reported by the log checker
  187. // until the timeout has passed. Function sleeps for interval between two
  188. // log ingestion checks.
  189. func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
  190. err := wait.Poll(interval, timeout, func() (bool, error) {
  191. return c.EntriesIngested()
  192. })
  193. if err == wait.ErrWaitTimeout {
  194. return c.Timeout()
  195. }
  196. return err
  197. }
  198. func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
  199. names := []string{}
  200. for _, p := range pods {
  201. names = append(names, p.Name())
  202. }
  203. return names
  204. }