podlogs.go 8.5 KB


  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package podlogs enables live capturing of all events and log
  14. // messages for some or all pods in a namespace as they get generated.
  15. // This helps debugging both a running test (what is currently going
  16. // on?) and the output of a CI run (events appear in chronological
  17. // order and output that normally isn't available like the command
  18. // stdout messages are available).
  19. package podlogs
  20. import (
  21. "bufio"
  22. "bytes"
  23. "context"
  24. "fmt"
  25. "io"
  26. "os"
  27. "path"
  28. "regexp"
  29. "strings"
  30. "sync"
  31. "github.com/pkg/errors"
  32. "k8s.io/api/core/v1"
  33. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. )
  36. // LogsForPod starts reading the logs for a certain pod. If the pod has more than one
  37. // container, opts.Container must be set. Reading stops when the context is done.
  38. // The stream includes formatted error messages and ends with
  39. // rpc error: code = Unknown desc = Error: No such container: 41a...
  40. // when the pod gets deleted while streaming.
  41. func LogsForPod(ctx context.Context, cs clientset.Interface, ns, pod string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
  42. req := cs.CoreV1().Pods(ns).GetLogs(pod, opts)
  43. return req.Context(ctx).Stream()
  44. }
  // LogOutput describes the destinations used by CopyAllLogs.
  type LogOutput struct {
  	// StatusWriter receives error and warning messages about the log
  	// collection itself. Nothing is reported when it is nil.
  	StatusWriter io.Writer

  	// LogWriter, when not nil, receives all log output, with each line
  	// prefixed by "<pod>/<container>: ". When it is nil, per-container
  	// log files are written instead (see LogPathPrefix).
  	LogWriter io.Writer

  	// LogPathPrefix is prepended verbatim to the name of each
  	// per-container log file: the full path of a log file is
  	// <LogPathPrefix><pod>-<container>.log. It is only used when
  	// LogWriter is nil.
  	LogPathPrefix string
  }
  var (
  	// expectedErrors recognizes error messages that are harmless while
  	// following pod logs (they originate in pkg/kubelet/kubelet_pods.go):
  	// a container that is terminated, waiting to start or not available,
  	// and a resource the server could not find. Such errors are not
  	// reported to the status writer.
  	expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`)
  )
  57. // CopyAllLogs follows the logs of all containers in all pods,
  58. // including those that get created in the future, and writes each log
  59. // line as configured in the output options. It does that until the
  60. // context is done or until an error occurs.
  61. //
  62. // Beware that there is currently no way to force log collection
  63. // before removing pods, which means that there is a known race
  64. // between "stop pod" and "collecting log entries". The alternative
  65. // would be a blocking function with collects logs from all currently
  66. // running pods, but that then would have the disadvantage that
  67. // already deleted pods aren't covered.
  68. func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error {
  69. watcher, err := cs.CoreV1().Pods(ns).Watch(meta.ListOptions{})
  70. if err != nil {
  71. return errors.Wrap(err, "cannot create Pod event watcher")
  72. }
  73. go func() {
  74. var m sync.Mutex
  75. logging := map[string]bool{}
  76. check := func() {
  77. m.Lock()
  78. defer m.Unlock()
  79. pods, err := cs.CoreV1().Pods(ns).List(meta.ListOptions{})
  80. if err != nil {
  81. if to.StatusWriter != nil {
  82. fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err)
  83. }
  84. return
  85. }
  86. for _, pod := range pods.Items {
  87. for i, c := range pod.Spec.Containers {
  88. name := pod.ObjectMeta.Name + "/" + c.Name
  89. if logging[name] ||
  90. // sanity check, array should have entry for each container
  91. len(pod.Status.ContainerStatuses) <= i ||
  92. // Don't attempt to get logs for a container unless it is running or has terminated.
  93. // Trying to get a log would just end up with an error that we would have to suppress.
  94. (pod.Status.ContainerStatuses[i].State.Running == nil &&
  95. pod.Status.ContainerStatuses[i].State.Terminated == nil) {
  96. continue
  97. }
  98. readCloser, err := LogsForPod(ctx, cs, ns, pod.ObjectMeta.Name,
  99. &v1.PodLogOptions{
  100. Container: c.Name,
  101. Follow: true,
  102. })
  103. if err != nil {
  104. // We do get "normal" errors here, like trying to read too early.
  105. // We can ignore those.
  106. if to.StatusWriter != nil &&
  107. expectedErrors.FindStringIndex(err.Error()) == nil {
  108. fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err)
  109. }
  110. continue
  111. }
  112. // Determine where we write. If this fails, we intentionally return without clearing
  113. // the logging[name] flag, which prevents trying over and over again to
  114. // create the output file.
  115. var out io.Writer
  116. var closer io.Closer
  117. var prefix string
  118. if to.LogWriter != nil {
  119. out = to.LogWriter
  120. prefix = name + ": "
  121. } else {
  122. var err error
  123. filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log"
  124. err = os.MkdirAll(path.Dir(filename), 0755)
  125. if err != nil {
  126. if to.StatusWriter != nil {
  127. fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err)
  128. }
  129. return
  130. }
  131. // The test suite might run the same test multiple times,
  132. // so we have to append here.
  133. file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  134. if err != nil {
  135. if to.StatusWriter != nil {
  136. fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err)
  137. }
  138. return
  139. }
  140. closer = file
  141. out = file
  142. }
  143. go func() {
  144. if closer != nil {
  145. defer closer.Close()
  146. }
  147. defer func() {
  148. m.Lock()
  149. logging[name] = false
  150. m.Unlock()
  151. readCloser.Close()
  152. }()
  153. scanner := bufio.NewScanner(readCloser)
  154. first := true
  155. for scanner.Scan() {
  156. line := scanner.Text()
  157. // Filter out the expected "end of stream" error message,
  158. // it would just confuse developers who don't know about it.
  159. // Same for attempts to read logs from a container that
  160. // isn't ready (yet?!).
  161. if !strings.HasPrefix(line, "rpc error: code = Unknown desc = Error: No such container:") &&
  162. !strings.HasPrefix(line, "Unable to retrieve container logs for ") {
  163. if first {
  164. if to.LogWriter == nil {
  165. // Because the same log might be written to multiple times
  166. // in different test instances, log an extra line to separate them.
  167. // Also provides some useful extra information.
  168. fmt.Fprintf(out, "==== start of log for container %s ====\n", name)
  169. }
  170. first = false
  171. }
  172. fmt.Fprintf(out, "%s%s\n", prefix, scanner.Text())
  173. }
  174. }
  175. }()
  176. logging[name] = true
  177. }
  178. }
  179. }
  180. // Watch events to see whether we can start logging
  181. // and log interesting ones.
  182. check()
  183. for {
  184. select {
  185. case <-watcher.ResultChan():
  186. check()
  187. case <-ctx.Done():
  188. return
  189. }
  190. }
  191. }()
  192. return nil
  193. }
  194. // WatchPods prints pod status events for a certain namespace or all namespaces
  195. // when namespace name is empty.
  196. func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Writer) error {
  197. watcher, err := cs.CoreV1().Pods(ns).Watch(meta.ListOptions{})
  198. if err != nil {
  199. return errors.Wrap(err, "cannot create Pod event watcher")
  200. }
  201. go func() {
  202. defer watcher.Stop()
  203. for {
  204. select {
  205. case e := <-watcher.ResultChan():
  206. if e.Object == nil {
  207. continue
  208. }
  209. pod, ok := e.Object.(*v1.Pod)
  210. if !ok {
  211. continue
  212. }
  213. buffer := new(bytes.Buffer)
  214. fmt.Fprintf(buffer,
  215. "pod event: %s: %s/%s %s: %s %s\n",
  216. e.Type,
  217. pod.Namespace,
  218. pod.Name,
  219. pod.Status.Phase,
  220. pod.Status.Reason,
  221. pod.Status.Conditions,
  222. )
  223. for _, cst := range pod.Status.ContainerStatuses {
  224. fmt.Fprintf(buffer, " %s: ", cst.Name)
  225. if cst.State.Waiting != nil {
  226. fmt.Fprintf(buffer, "WAITING: %s - %s",
  227. cst.State.Waiting.Reason,
  228. cst.State.Waiting.Message,
  229. )
  230. } else if cst.State.Running != nil {
  231. fmt.Fprintf(buffer, "RUNNING")
  232. } else if cst.State.Waiting != nil {
  233. fmt.Fprintf(buffer, "TERMINATED: %s - %s",
  234. cst.State.Waiting.Reason,
  235. cst.State.Waiting.Message,
  236. )
  237. }
  238. fmt.Fprintf(buffer, "\n")
  239. }
  240. to.Write(buffer.Bytes())
  241. case <-ctx.Done():
  242. return
  243. }
  244. }
  245. }()
  246. return nil
  247. }