logs_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package logs
  14. import (
  15. "bytes"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "strings"
  21. "sync"
  22. "testing"
  23. "testing/iotest"
  24. "time"
  25. corev1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/cli-runtime/pkg/genericclioptions"
  29. restclient "k8s.io/client-go/rest"
  30. cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
  31. )
  32. func TestLog(t *testing.T) {
  33. tests := []struct {
  34. name string
  35. opts func(genericclioptions.IOStreams) *LogsOptions
  36. expectedErr string
  37. expectedOutSubstrings []string
  38. }{
  39. {
  40. name: "v1 - pod log",
  41. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  42. mock := &logTestMock{
  43. logsForObjectRequests: []restclient.ResponseWrapper{
  44. &responseWrapperMock{data: strings.NewReader("test log content\n")},
  45. },
  46. }
  47. o := NewLogsOptions(streams, false)
  48. o.LogsForObject = mock.mockLogsForObject
  49. o.ConsumeRequestFn = mock.mockConsumeRequest
  50. return o
  51. },
  52. expectedOutSubstrings: []string{"test log content\n"},
  53. },
  54. {
  55. name: "get logs from multiple requests sequentially",
  56. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  57. mock := &logTestMock{
  58. logsForObjectRequests: []restclient.ResponseWrapper{
  59. &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
  60. &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
  61. &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
  62. },
  63. }
  64. o := NewLogsOptions(streams, false)
  65. o.LogsForObject = mock.mockLogsForObject
  66. o.ConsumeRequestFn = mock.mockConsumeRequest
  67. return o
  68. },
  69. expectedOutSubstrings: []string{
  70. // Order in this case must always be the same, because we read requests sequentially
  71. "test log content from source 1\ntest log content from source 2\ntest log content from source 3\n",
  72. },
  73. },
  74. {
  75. name: "follow logs from multiple requests concurrently",
  76. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  77. wg := &sync.WaitGroup{}
  78. mock := &logTestMock{
  79. logsForObjectRequests: []restclient.ResponseWrapper{
  80. &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
  81. &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
  82. &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
  83. },
  84. wg: wg,
  85. }
  86. wg.Add(3)
  87. o := NewLogsOptions(streams, false)
  88. o.LogsForObject = mock.mockLogsForObject
  89. o.ConsumeRequestFn = mock.mockConsumeRequest
  90. o.Follow = true
  91. return o
  92. },
  93. expectedOutSubstrings: []string{
  94. "test log content from source 1\n",
  95. "test log content from source 2\n",
  96. "test log content from source 3\n",
  97. },
  98. },
  99. {
  100. name: "fail to follow logs from multiple requests when there are more logs sources then MaxFollowConcurency allows",
  101. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  102. wg := &sync.WaitGroup{}
  103. mock := &logTestMock{
  104. logsForObjectRequests: []restclient.ResponseWrapper{
  105. &responseWrapperMock{data: strings.NewReader("test log content\n")},
  106. &responseWrapperMock{data: strings.NewReader("test log content\n")},
  107. &responseWrapperMock{data: strings.NewReader("test log content\n")},
  108. },
  109. wg: wg,
  110. }
  111. wg.Add(3)
  112. o := NewLogsOptions(streams, false)
  113. o.LogsForObject = mock.mockLogsForObject
  114. o.ConsumeRequestFn = mock.mockConsumeRequest
  115. o.MaxFollowConcurency = 2
  116. o.Follow = true
  117. return o
  118. },
  119. expectedErr: "you are attempting to follow 3 log streams, but maximum allowed concurency is 2, use --max-log-requests to increase the limit",
  120. },
  121. {
  122. name: "fail if LogsForObject fails",
  123. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  124. o := NewLogsOptions(streams, false)
  125. o.LogsForObject = func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
  126. return nil, errors.New("Error from the LogsForObject")
  127. }
  128. return o
  129. },
  130. expectedErr: "Error from the LogsForObject",
  131. },
  132. {
  133. name: "fail to get logs, if ConsumeRequestFn fails",
  134. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  135. mock := &logTestMock{
  136. logsForObjectRequests: []restclient.ResponseWrapper{
  137. &responseWrapperMock{},
  138. &responseWrapperMock{},
  139. },
  140. }
  141. o := NewLogsOptions(streams, false)
  142. o.LogsForObject = mock.mockLogsForObject
  143. o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
  144. return errors.New("Error from the ConsumeRequestFn")
  145. }
  146. return o
  147. },
  148. expectedErr: "Error from the ConsumeRequestFn",
  149. },
  150. {
  151. name: "fail to follow logs from multiple requests, if ConsumeRequestFn fails",
  152. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  153. wg := &sync.WaitGroup{}
  154. mock := &logTestMock{
  155. logsForObjectRequests: []restclient.ResponseWrapper{
  156. &responseWrapperMock{},
  157. &responseWrapperMock{},
  158. &responseWrapperMock{},
  159. },
  160. wg: wg,
  161. }
  162. wg.Add(3)
  163. o := NewLogsOptions(streams, false)
  164. o.LogsForObject = mock.mockLogsForObject
  165. o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
  166. return errors.New("Error from the ConsumeRequestFn")
  167. }
  168. o.Follow = true
  169. return o
  170. },
  171. expectedErr: "Error from the ConsumeRequestFn",
  172. },
  173. {
  174. name: "fail to follow logs, if ConsumeRequestFn fails",
  175. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  176. mock := &logTestMock{
  177. logsForObjectRequests: []restclient.ResponseWrapper{&responseWrapperMock{}},
  178. }
  179. o := NewLogsOptions(streams, false)
  180. o.LogsForObject = mock.mockLogsForObject
  181. o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
  182. return errors.New("Error from the ConsumeRequestFn")
  183. }
  184. o.Follow = true
  185. return o
  186. },
  187. expectedErr: "Error from the ConsumeRequestFn",
  188. },
  189. }
  190. for _, test := range tests {
  191. t.Run(test.name, func(t *testing.T) {
  192. tf := cmdtesting.NewTestFactory().WithNamespace("test")
  193. defer tf.Cleanup()
  194. streams, _, buf, _ := genericclioptions.NewTestIOStreams()
  195. opts := test.opts(streams)
  196. opts.Namespace = "test"
  197. opts.Object = testPod()
  198. opts.Options = &corev1.PodLogOptions{}
  199. err := opts.RunLogs()
  200. if err == nil && len(test.expectedErr) > 0 {
  201. t.Fatalf("expected error %q, got none", test.expectedErr)
  202. }
  203. if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
  204. t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
  205. }
  206. bufStr := buf.String()
  207. if test.expectedOutSubstrings != nil {
  208. for _, substr := range test.expectedOutSubstrings {
  209. if !strings.Contains(bufStr, substr) {
  210. t.Errorf("%s: expected to contain %#v. Output: %#v", test.name, substr, bufStr)
  211. }
  212. }
  213. }
  214. })
  215. }
  216. }
  217. func testPod() *corev1.Pod {
  218. return &corev1.Pod{
  219. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test", ResourceVersion: "10"},
  220. Spec: corev1.PodSpec{
  221. RestartPolicy: corev1.RestartPolicyAlways,
  222. DNSPolicy: corev1.DNSClusterFirst,
  223. Containers: []corev1.Container{
  224. {
  225. Name: "bar",
  226. },
  227. },
  228. },
  229. }
  230. }
  231. func TestValidateLogOptions(t *testing.T) {
  232. f := cmdtesting.NewTestFactory()
  233. defer f.Cleanup()
  234. f.WithNamespace("")
  235. tests := []struct {
  236. name string
  237. args []string
  238. opts func(genericclioptions.IOStreams) *LogsOptions
  239. expected string
  240. }{
  241. {
  242. name: "since & since-time",
  243. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  244. o := NewLogsOptions(streams, false)
  245. o.SinceSeconds = time.Hour
  246. o.SinceTime = "2006-01-02T15:04:05Z"
  247. var err error
  248. o.Options, err = o.ToLogOptions()
  249. if err != nil {
  250. t.Fatalf("unexpected error: %v", err)
  251. }
  252. return o
  253. },
  254. args: []string{"foo"},
  255. expected: "at most one of `sinceTime` or `sinceSeconds` may be specified",
  256. },
  257. {
  258. name: "negative since-time",
  259. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  260. o := NewLogsOptions(streams, false)
  261. o.SinceSeconds = -1 * time.Second
  262. var err error
  263. o.Options, err = o.ToLogOptions()
  264. if err != nil {
  265. t.Fatalf("unexpected error: %v", err)
  266. }
  267. return o
  268. },
  269. args: []string{"foo"},
  270. expected: "must be greater than 0",
  271. },
  272. {
  273. name: "negative limit-bytes",
  274. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  275. o := NewLogsOptions(streams, false)
  276. o.LimitBytes = -100
  277. var err error
  278. o.Options, err = o.ToLogOptions()
  279. if err != nil {
  280. t.Fatalf("unexpected error: %v", err)
  281. }
  282. return o
  283. },
  284. args: []string{"foo"},
  285. expected: "must be greater than 0",
  286. },
  287. {
  288. name: "negative tail",
  289. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  290. o := NewLogsOptions(streams, false)
  291. o.Tail = -100
  292. var err error
  293. o.Options, err = o.ToLogOptions()
  294. if err != nil {
  295. t.Fatalf("unexpected error: %v", err)
  296. }
  297. return o
  298. },
  299. args: []string{"foo"},
  300. expected: "must be greater than or equal to 0",
  301. },
  302. {
  303. name: "container name combined with --all-containers",
  304. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  305. o := NewLogsOptions(streams, true)
  306. o.Container = "my-container"
  307. var err error
  308. o.Options, err = o.ToLogOptions()
  309. if err != nil {
  310. t.Fatalf("unexpected error: %v", err)
  311. }
  312. return o
  313. },
  314. args: []string{"my-pod", "my-container"},
  315. expected: "--all-containers=true should not be specified with container",
  316. },
  317. {
  318. name: "container name combined with second argument",
  319. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  320. o := NewLogsOptions(streams, false)
  321. o.Container = "my-container"
  322. o.ContainerNameSpecified = true
  323. var err error
  324. o.Options, err = o.ToLogOptions()
  325. if err != nil {
  326. t.Fatalf("unexpected error: %v", err)
  327. }
  328. return o
  329. },
  330. args: []string{"my-pod", "my-container"},
  331. expected: "only one of -c or an inline",
  332. },
  333. }
  334. for _, test := range tests {
  335. streams := genericclioptions.NewTestIOStreamsDiscard()
  336. o := test.opts(streams)
  337. o.Resources = test.args
  338. err := o.Validate()
  339. if err == nil {
  340. t.Fatalf("expected error %q, got none", test.expected)
  341. }
  342. if !strings.Contains(err.Error(), test.expected) {
  343. t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expected, err.Error())
  344. }
  345. }
  346. }
  347. func TestLogComplete(t *testing.T) {
  348. f := cmdtesting.NewTestFactory()
  349. defer f.Cleanup()
  350. tests := []struct {
  351. name string
  352. args []string
  353. opts func(genericclioptions.IOStreams) *LogsOptions
  354. expected string
  355. }{
  356. {
  357. name: "One args case",
  358. args: []string{"foo"},
  359. opts: func(streams genericclioptions.IOStreams) *LogsOptions {
  360. o := NewLogsOptions(streams, false)
  361. o.Selector = "foo"
  362. return o
  363. },
  364. expected: "only a selector (-l) or a POD name is allowed",
  365. },
  366. }
  367. for _, test := range tests {
  368. cmd := NewCmdLogs(f, genericclioptions.NewTestIOStreamsDiscard())
  369. out := ""
  370. // checkErr breaks tests in case of errors, plus we just
  371. // need to check errors returned by the command validation
  372. o := test.opts(genericclioptions.NewTestIOStreamsDiscard())
  373. err := o.Complete(f, cmd, test.args)
  374. if err == nil {
  375. t.Fatalf("expected error %q, got none", test.expected)
  376. }
  377. out = err.Error()
  378. if !strings.Contains(out, test.expected) {
  379. t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expected, out)
  380. }
  381. }
  382. }
  383. func TestDefaultConsumeRequest(t *testing.T) {
  384. tests := []struct {
  385. name string
  386. request restclient.ResponseWrapper
  387. expectedErr string
  388. expectedOut string
  389. }{
  390. {
  391. name: "error from request stream",
  392. request: &responseWrapperMock{
  393. err: errors.New("err from the stream"),
  394. },
  395. expectedErr: "err from the stream",
  396. },
  397. {
  398. name: "error while reading",
  399. request: &responseWrapperMock{
  400. data: iotest.TimeoutReader(strings.NewReader("Some data")),
  401. },
  402. expectedErr: iotest.ErrTimeout.Error(),
  403. expectedOut: "Some data",
  404. },
  405. {
  406. name: "read with empty string",
  407. request: &responseWrapperMock{
  408. data: strings.NewReader(""),
  409. },
  410. expectedOut: "",
  411. },
  412. {
  413. name: "read without new lines",
  414. request: &responseWrapperMock{
  415. data: strings.NewReader("some string without a new line"),
  416. },
  417. expectedOut: "some string without a new line",
  418. },
  419. {
  420. name: "read with newlines in the middle",
  421. request: &responseWrapperMock{
  422. data: strings.NewReader("foo\nbar"),
  423. },
  424. expectedOut: "foo\nbar",
  425. },
  426. {
  427. name: "read with newline at the end",
  428. request: &responseWrapperMock{
  429. data: strings.NewReader("foo\n"),
  430. },
  431. expectedOut: "foo\n",
  432. },
  433. }
  434. for _, test := range tests {
  435. t.Run(test.name, func(t *testing.T) {
  436. buf := &bytes.Buffer{}
  437. err := DefaultConsumeRequest(test.request, buf)
  438. if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
  439. t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
  440. }
  441. if buf.String() != test.expectedOut {
  442. t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String())
  443. }
  444. })
  445. }
  446. }
  // responseWrapperMock is a test double that serves canned data and/or a
  // canned error from DoRaw and Stream. The zero value is usable and behaves
  // like an empty, error-free response.
  type responseWrapperMock struct {
  	// data is the payload to serve; nil is treated as an empty payload.
  	data io.Reader
  	// err, when non-nil, is returned by both DoRaw and Stream.
  	err error
  }

  // DoRaw reads the whole payload and returns it together with the configured
  // error. If no error is configured, an error encountered while reading the
  // payload is returned instead of being dropped.
  func (r *responseWrapperMock) DoRaw() ([]byte, error) {
  	if r.data == nil {
  		// ioutil.ReadAll would panic on a nil reader.
  		return nil, r.err
  	}
  	data, readErr := ioutil.ReadAll(r.data)
  	if r.err != nil {
  		return data, r.err
  	}
  	return data, readErr
  }

  // Stream returns the payload wrapped in a no-op closer, together with the
  // configured error. The returned ReadCloser is never nil and is always safe
  // to read, even when no payload was configured.
  func (r *responseWrapperMock) Stream() (io.ReadCloser, error) {
  	if r.data == nil {
  		// Wrapping a nil reader would panic on the first Read.
  		return ioutil.NopCloser(strings.NewReader("")), r.err
  	}
  	return ioutil.NopCloser(r.data), r.err
  }
  458. type logTestMock struct {
  459. logsForObjectRequests []restclient.ResponseWrapper
  460. // We need a WaitGroup in some test cases to make sure that we fetch logs concurrently.
  461. // These test cases will finish successfully without the WaitGroup, but the WaitGroup
  462. // will help us to identify regression when someone accidentally changes
  463. // concurrent fetching to sequential
  464. wg *sync.WaitGroup
  465. }
  466. func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error {
  467. readCloser, err := request.Stream()
  468. if err != nil {
  469. return err
  470. }
  471. defer readCloser.Close()
  472. // Just copy everything for a test sake
  473. _, err = io.Copy(out, readCloser)
  474. if l.wg != nil {
  475. l.wg.Done()
  476. l.wg.Wait()
  477. }
  478. return err
  479. }
  480. func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
  481. switch object.(type) {
  482. case *corev1.Pod:
  483. _, ok := options.(*corev1.PodLogOptions)
  484. if !ok {
  485. return nil, errors.New("provided options object is not a PodLogOptions")
  486. }
  487. return l.logsForObjectRequests, nil
  488. default:
  489. return nil, fmt.Errorf("cannot get the logs from %T", object)
  490. }
  491. }