123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package utils
- import (
- "fmt"
- "strings"
- "time"
- "k8s.io/apimachinery/pkg/util/wait"
- e2elog "k8s.io/kubernetes/test/e2e/framework/log"
- )
- // LogChecker is an interface for an entity that can check whether logging
- // backend contains all wanted log entries.
- type LogChecker interface {
- EntriesIngested() (bool, error)
- Timeout() error
- }
- // IngestionPred is a type of a function that checks whether all required
- // log entries were ingested.
- type IngestionPred func(string, []LogEntry) (bool, error)
- // UntilFirstEntry is a IngestionPred that checks that at least one entry was
- // ingested.
- var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
- return len(entries) > 0, nil
- }
- // UntilFirstEntryFromLog is a IngestionPred that checks that at least one
- // entry from the log with a given name was ingested.
- func UntilFirstEntryFromLog(log string) IngestionPred {
- return func(_ string, entries []LogEntry) (bool, error) {
- for _, e := range entries {
- if e.LogName == log {
- return true, nil
- }
- }
- return false, nil
- }
- }
- // UntilFirstEntryFromLocation is a IngestionPred that checks that at least one
- // entry from the log with a given name was ingested.
- func UntilFirstEntryFromLocation(location string) IngestionPred {
- return func(_ string, entries []LogEntry) (bool, error) {
- for _, e := range entries {
- if e.Location == location {
- return true, nil
- }
- }
- return false, nil
- }
- }
- // TimeoutFun is a function that is called when the waiting times out.
- type TimeoutFun func([]string, []bool) error
- // JustTimeout returns the error with the list of names for which backend is
- // still still missing logs.
- var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
- failedNames := []string{}
- for i, name := range names {
- if !ingested[i] {
- failedNames = append(failedNames, name)
- }
- }
- return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
- strings.Join(failedNames, ","))
- }
- var _ LogChecker = &logChecker{}
- type logChecker struct {
- provider LogProvider
- names []string
- ingested []bool
- ingestionPred IngestionPred
- timeoutFun TimeoutFun
- }
- // NewLogChecker constructs a LogChecker for a list of names from custom
- // IngestionPred and TimeoutFun.
- func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
- return &logChecker{
- provider: p,
- names: names,
- ingested: make([]bool, len(names)),
- ingestionPred: pred,
- timeoutFun: timeout,
- }
- }
- func (c *logChecker) EntriesIngested() (bool, error) {
- allIngested := true
- for i, name := range c.names {
- if c.ingested[i] {
- continue
- }
- entries := c.provider.ReadEntries(name)
- ingested, err := c.ingestionPred(name, entries)
- if err != nil {
- return false, err
- }
- if ingested {
- c.ingested[i] = true
- }
- allIngested = allIngested && ingested
- }
- return allIngested, nil
- }
- func (c *logChecker) Timeout() error {
- return c.timeoutFun(c.names, c.ingested)
- }
- // NumberedIngestionPred is a IngestionPred that takes into account sequential
- // numbers of ingested entries.
- type NumberedIngestionPred func(string, map[int]bool) (bool, error)
- // NumberedTimeoutFun is a TimeoutFun that takes into account sequential
- // numbers of ingested entries.
- type NumberedTimeoutFun func([]string, map[string]map[int]bool) error
- // NewNumberedLogChecker returns a log checker that works with numbered log
- // entries generated by load logging pods.
- func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
- timeout NumberedTimeoutFun, names ...string) LogChecker {
- occs := map[string]map[int]bool{}
- return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
- occ, ok := occs[name]
- if !ok {
- occ = map[int]bool{}
- occs[name] = occ
- }
- for _, entry := range entries {
- if no, ok := entry.TryGetEntryNumber(); ok {
- occ[no] = true
- }
- }
- return pred(name, occ)
- }, func(names []string, _ []bool) error {
- return timeout(names, occs)
- }, names...)
- }
- // NewFullIngestionPodLogChecker returns a log checks that works with numbered
- // log entries generated by load logging pods and waits until all entries are
- // ingested. If timeout is reached, fraction is lost logs up to slack is
- // considered tolerable.
- func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
- podsMap := map[string]FiniteLoggingPod{}
- for _, p := range pods {
- podsMap[p.Name()] = p
- }
- return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
- getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
- }
- func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
- return func(name string, occ map[int]bool) (bool, error) {
- p := podsMap[name]
- ok := len(occ) == p.ExpectedLineCount()
- return ok, nil
- }
- }
- func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
- return func(names []string, occs map[string]map[int]bool) error {
- totalGot, totalWant := 0, 0
- lossMsgs := []string{}
- for _, name := range names {
- got := len(occs[name])
- want := podsMap[name].ExpectedLineCount()
- if got != want {
- lossMsg := fmt.Sprintf("%s: %d lines", name, want-got)
- lossMsgs = append(lossMsgs, lossMsg)
- }
- totalGot += got
- totalWant += want
- }
- if len(lossMsgs) > 0 {
- e2elog.Logf("Still missing logs from:\n%s", strings.Join(lossMsgs, "\n"))
- }
- lostFrac := 1 - float64(totalGot)/float64(totalWant)
- if lostFrac > slack {
- return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
- lostFrac*100, slack*100)
- }
- e2elog.Logf("Missing %.2f%% of logs, which is lower than the threshold %.2f%%",
- lostFrac*100, slack*100)
- return nil
- }
- }
- // WaitForLogs checks that logs are ingested, as reported by the log checker
- // until the timeout has passed. Function sleeps for interval between two
- // log ingestion checks.
- func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
- err := wait.Poll(interval, timeout, func() (bool, error) {
- return c.EntriesIngested()
- })
- if err == wait.ErrWaitTimeout {
- return c.Timeout()
- }
- return err
- }
- func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
- names := []string{}
- for _, p := range pods {
- names = append(names, p.Name())
- }
- return names
- }
|