123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package logs
- import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "strings"
- "sync"
- "testing"
- "testing/iotest"
- "time"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/cli-runtime/pkg/genericclioptions"
- restclient "k8s.io/client-go/rest"
- cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
- )
- func TestLog(t *testing.T) {
- tests := []struct {
- name string
- opts func(genericclioptions.IOStreams) *LogsOptions
- expectedErr string
- expectedOutSubstrings []string
- }{
- {
- name: "v1 - pod log",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{data: strings.NewReader("test log content\n")},
- },
- }
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = mock.mockConsumeRequest
- return o
- },
- expectedOutSubstrings: []string{"test log content\n"},
- },
- {
- name: "get logs from multiple requests sequentially",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
- &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
- &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
- },
- }
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = mock.mockConsumeRequest
- return o
- },
- expectedOutSubstrings: []string{
- // Order in this case must always be the same, because we read requests sequentially
- "test log content from source 1\ntest log content from source 2\ntest log content from source 3\n",
- },
- },
- {
- name: "follow logs from multiple requests concurrently",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- wg := &sync.WaitGroup{}
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
- &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
- &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
- },
- wg: wg,
- }
- wg.Add(3)
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = mock.mockConsumeRequest
- o.Follow = true
- return o
- },
- expectedOutSubstrings: []string{
- "test log content from source 1\n",
- "test log content from source 2\n",
- "test log content from source 3\n",
- },
- },
- {
- name: "fail to follow logs from multiple requests when there are more logs sources then MaxFollowConcurency allows",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- wg := &sync.WaitGroup{}
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{data: strings.NewReader("test log content\n")},
- &responseWrapperMock{data: strings.NewReader("test log content\n")},
- &responseWrapperMock{data: strings.NewReader("test log content\n")},
- },
- wg: wg,
- }
- wg.Add(3)
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = mock.mockConsumeRequest
- o.MaxFollowConcurency = 2
- o.Follow = true
- return o
- },
- expectedErr: "you are attempting to follow 3 log streams, but maximum allowed concurency is 2, use --max-log-requests to increase the limit",
- },
- {
- name: "fail if LogsForObject fails",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.LogsForObject = func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
- return nil, errors.New("Error from the LogsForObject")
- }
- return o
- },
- expectedErr: "Error from the LogsForObject",
- },
- {
- name: "fail to get logs, if ConsumeRequestFn fails",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{},
- &responseWrapperMock{},
- },
- }
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
- return errors.New("Error from the ConsumeRequestFn")
- }
- return o
- },
- expectedErr: "Error from the ConsumeRequestFn",
- },
- {
- name: "fail to follow logs from multiple requests, if ConsumeRequestFn fails",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- wg := &sync.WaitGroup{}
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{
- &responseWrapperMock{},
- &responseWrapperMock{},
- &responseWrapperMock{},
- },
- wg: wg,
- }
- wg.Add(3)
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
- return errors.New("Error from the ConsumeRequestFn")
- }
- o.Follow = true
- return o
- },
- expectedErr: "Error from the ConsumeRequestFn",
- },
- {
- name: "fail to follow logs, if ConsumeRequestFn fails",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- mock := &logTestMock{
- logsForObjectRequests: []restclient.ResponseWrapper{&responseWrapperMock{}},
- }
- o := NewLogsOptions(streams, false)
- o.LogsForObject = mock.mockLogsForObject
- o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
- return errors.New("Error from the ConsumeRequestFn")
- }
- o.Follow = true
- return o
- },
- expectedErr: "Error from the ConsumeRequestFn",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- tf := cmdtesting.NewTestFactory().WithNamespace("test")
- defer tf.Cleanup()
- streams, _, buf, _ := genericclioptions.NewTestIOStreams()
- opts := test.opts(streams)
- opts.Namespace = "test"
- opts.Object = testPod()
- opts.Options = &corev1.PodLogOptions{}
- err := opts.RunLogs()
- if err == nil && len(test.expectedErr) > 0 {
- t.Fatalf("expected error %q, got none", test.expectedErr)
- }
- if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
- t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
- }
- bufStr := buf.String()
- if test.expectedOutSubstrings != nil {
- for _, substr := range test.expectedOutSubstrings {
- if !strings.Contains(bufStr, substr) {
- t.Errorf("%s: expected to contain %#v. Output: %#v", test.name, substr, bufStr)
- }
- }
- }
- })
- }
- }
- func testPod() *corev1.Pod {
- return &corev1.Pod{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test", ResourceVersion: "10"},
- Spec: corev1.PodSpec{
- RestartPolicy: corev1.RestartPolicyAlways,
- DNSPolicy: corev1.DNSClusterFirst,
- Containers: []corev1.Container{
- {
- Name: "bar",
- },
- },
- },
- }
- }
- func TestValidateLogOptions(t *testing.T) {
- f := cmdtesting.NewTestFactory()
- defer f.Cleanup()
- f.WithNamespace("")
- tests := []struct {
- name string
- args []string
- opts func(genericclioptions.IOStreams) *LogsOptions
- expected string
- }{
- {
- name: "since & since-time",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.SinceSeconds = time.Hour
- o.SinceTime = "2006-01-02T15:04:05Z"
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"foo"},
- expected: "at most one of `sinceTime` or `sinceSeconds` may be specified",
- },
- {
- name: "negative since-time",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.SinceSeconds = -1 * time.Second
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"foo"},
- expected: "must be greater than 0",
- },
- {
- name: "negative limit-bytes",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.LimitBytes = -100
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"foo"},
- expected: "must be greater than 0",
- },
- {
- name: "negative tail",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.Tail = -100
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"foo"},
- expected: "must be greater than or equal to 0",
- },
- {
- name: "container name combined with --all-containers",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, true)
- o.Container = "my-container"
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"my-pod", "my-container"},
- expected: "--all-containers=true should not be specified with container",
- },
- {
- name: "container name combined with second argument",
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.Container = "my-container"
- o.ContainerNameSpecified = true
- var err error
- o.Options, err = o.ToLogOptions()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- return o
- },
- args: []string{"my-pod", "my-container"},
- expected: "only one of -c or an inline",
- },
- }
- for _, test := range tests {
- streams := genericclioptions.NewTestIOStreamsDiscard()
- o := test.opts(streams)
- o.Resources = test.args
- err := o.Validate()
- if err == nil {
- t.Fatalf("expected error %q, got none", test.expected)
- }
- if !strings.Contains(err.Error(), test.expected) {
- t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expected, err.Error())
- }
- }
- }
- func TestLogComplete(t *testing.T) {
- f := cmdtesting.NewTestFactory()
- defer f.Cleanup()
- tests := []struct {
- name string
- args []string
- opts func(genericclioptions.IOStreams) *LogsOptions
- expected string
- }{
- {
- name: "One args case",
- args: []string{"foo"},
- opts: func(streams genericclioptions.IOStreams) *LogsOptions {
- o := NewLogsOptions(streams, false)
- o.Selector = "foo"
- return o
- },
- expected: "only a selector (-l) or a POD name is allowed",
- },
- }
- for _, test := range tests {
- cmd := NewCmdLogs(f, genericclioptions.NewTestIOStreamsDiscard())
- out := ""
- // checkErr breaks tests in case of errors, plus we just
- // need to check errors returned by the command validation
- o := test.opts(genericclioptions.NewTestIOStreamsDiscard())
- err := o.Complete(f, cmd, test.args)
- if err == nil {
- t.Fatalf("expected error %q, got none", test.expected)
- }
- out = err.Error()
- if !strings.Contains(out, test.expected) {
- t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expected, out)
- }
- }
- }
- func TestDefaultConsumeRequest(t *testing.T) {
- tests := []struct {
- name string
- request restclient.ResponseWrapper
- expectedErr string
- expectedOut string
- }{
- {
- name: "error from request stream",
- request: &responseWrapperMock{
- err: errors.New("err from the stream"),
- },
- expectedErr: "err from the stream",
- },
- {
- name: "error while reading",
- request: &responseWrapperMock{
- data: iotest.TimeoutReader(strings.NewReader("Some data")),
- },
- expectedErr: iotest.ErrTimeout.Error(),
- expectedOut: "Some data",
- },
- {
- name: "read with empty string",
- request: &responseWrapperMock{
- data: strings.NewReader(""),
- },
- expectedOut: "",
- },
- {
- name: "read without new lines",
- request: &responseWrapperMock{
- data: strings.NewReader("some string without a new line"),
- },
- expectedOut: "some string without a new line",
- },
- {
- name: "read with newlines in the middle",
- request: &responseWrapperMock{
- data: strings.NewReader("foo\nbar"),
- },
- expectedOut: "foo\nbar",
- },
- {
- name: "read with newline at the end",
- request: &responseWrapperMock{
- data: strings.NewReader("foo\n"),
- },
- expectedOut: "foo\n",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- buf := &bytes.Buffer{}
- err := DefaultConsumeRequest(test.request, buf)
- if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
- t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
- }
- if buf.String() != test.expectedOut {
- t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String())
- }
- })
- }
- }
- type responseWrapperMock struct {
- data io.Reader
- err error
- }
- func (r *responseWrapperMock) DoRaw() ([]byte, error) {
- data, _ := ioutil.ReadAll(r.data)
- return data, r.err
- }
- func (r *responseWrapperMock) Stream() (io.ReadCloser, error) {
- return ioutil.NopCloser(r.data), r.err
- }
- type logTestMock struct {
- logsForObjectRequests []restclient.ResponseWrapper
- // We need a WaitGroup in some test cases to make sure that we fetch logs concurrently.
- // These test cases will finish successfully without the WaitGroup, but the WaitGroup
- // will help us to identify regression when someone accidentally changes
- // concurrent fetching to sequential
- wg *sync.WaitGroup
- }
- func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error {
- readCloser, err := request.Stream()
- if err != nil {
- return err
- }
- defer readCloser.Close()
- // Just copy everything for a test sake
- _, err = io.Copy(out, readCloser)
- if l.wg != nil {
- l.wg.Done()
- l.wg.Wait()
- }
- return err
- }
- func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
- switch object.(type) {
- case *corev1.Pod:
- _, ok := options.(*corev1.PodLogOptions)
- if !ok {
- return nil, errors.New("provided options object is not a PodLogOptions")
- }
- return l.logsForObjectRequests, nil
- default:
- return nil, fmt.Errorf("cannot get the logs from %T", object)
- }
- }
|