kmsgparser.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. Copyright 2016 Euan Kemp
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package kmsgparser implements a parser for the Linux `/dev/kmsg` format.
  14. // More information about this format may be found here:
  15. // https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
  16. // Some parts of it are slightly inspired by rsyslog's contrib module:
  17. // https://github.com/rsyslog/rsyslog/blob/v8.22.0/contrib/imkmsg/kmsg.c
  18. package kmsgparser
  19. import (
  20. "fmt"
  21. "io"
  22. "os"
  23. "strconv"
  24. "strings"
  25. "syscall"
  26. "time"
  27. )
  28. // Parser is a parser for the kernel ring buffer found at /dev/kmsg
  29. type Parser interface {
  30. // SeekEnd moves the parser to the end of the kmsg queue.
  31. SeekEnd() error
  32. // Parse provides a channel of messages read from the kernel ring buffer.
  33. // When first called, it will read the existing ringbuffer, after which it will emit new messages as they occur.
  34. Parse() <-chan Message
  35. // SetLogger sets the logger that will be used to report malformed kernel
  36. // ringbuffer lines or unexpected kmsg read errors.
  37. SetLogger(Logger)
  38. // Close closes the underlying kmsg reader for this parser
  39. Close() error
  40. }
  41. // Message represents a given kmsg logline, including its timestamp (as
  42. // calculated based on offset from boot time), its possibly multi-line body,
  43. // and so on. More information about these mssages may be found here:
  44. // https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
  45. type Message struct {
  46. Priority int
  47. SequenceNumber int
  48. Timestamp time.Time
  49. Message string
  50. }
  51. func NewParser() (Parser, error) {
  52. f, err := os.Open("/dev/kmsg")
  53. if err != nil {
  54. return nil, err
  55. }
  56. bootTime, err := getBootTime()
  57. if err != nil {
  58. return nil, err
  59. }
  60. return &parser{
  61. log: &StandardLogger{nil},
  62. kmsgReader: f,
  63. bootTime: bootTime,
  64. }, nil
  65. }
  66. type ReadSeekCloser interface {
  67. io.ReadCloser
  68. io.Seeker
  69. }
  70. type parser struct {
  71. log Logger
  72. kmsgReader ReadSeekCloser
  73. bootTime time.Time
  74. }
  75. func getBootTime() (time.Time, error) {
  76. var sysinfo syscall.Sysinfo_t
  77. err := syscall.Sysinfo(&sysinfo)
  78. if err != nil {
  79. return time.Time{}, fmt.Errorf("could not get boot time: %v", err)
  80. }
  81. // sysinfo only has seconds
  82. return time.Now().Add(-1 * (time.Duration(sysinfo.Uptime) * time.Second)), nil
  83. }
  84. func (p *parser) SetLogger(log Logger) {
  85. p.log = log
  86. }
  87. func (p *parser) Close() error {
  88. return p.kmsgReader.Close()
  89. }
  90. func (p *parser) SeekEnd() error {
  91. _, err := p.kmsgReader.Seek(0, os.SEEK_END)
  92. return err
  93. }
  94. // Parse will read from the provided reader and provide a channel of messages
  95. // parsed.
  96. // If the provided reader *is not* a proper Linux kmsg device, Parse might not
  97. // behave correctly since it relies on specific behavior of `/dev/kmsg`
  98. //
  99. // A goroutine is created to process the provided reader. The goroutine will
  100. // exit when the given reader is closed.
  101. // Closing the passed in reader will cause the goroutine to exit.
  102. func (p *parser) Parse() <-chan Message {
  103. output := make(chan Message, 1)
  104. go func() {
  105. defer close(output)
  106. msg := make([]byte, 8192)
  107. for {
  108. // Each read call gives us one full message.
  109. // https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
  110. n, err := p.kmsgReader.Read(msg)
  111. if err != nil {
  112. if err == syscall.EPIPE {
  113. p.log.Warningf("short read from kmsg; skipping")
  114. continue
  115. }
  116. if err == io.EOF {
  117. p.log.Infof("kmsg reader closed, shutting down")
  118. return
  119. }
  120. p.log.Errorf("error reading /dev/kmsg: %v", err)
  121. return
  122. }
  123. msgStr := string(msg[:n])
  124. message, err := p.parseMessage(msgStr)
  125. if err != nil {
  126. p.log.Warningf("unable to parse kmsg message %q: %v", msgStr, err)
  127. continue
  128. }
  129. output <- message
  130. }
  131. }()
  132. return output
  133. }
  134. func (p *parser) parseMessage(input string) (Message, error) {
  135. // Format:
  136. // PRIORITY,SEQUENCE_NUM,TIMESTAMP,-;MESSAGE
  137. parts := strings.SplitN(input, ";", 2)
  138. if len(parts) != 2 {
  139. return Message{}, fmt.Errorf("invalid kmsg; must contain a ';'")
  140. }
  141. metadata, message := parts[0], parts[1]
  142. metadataParts := strings.Split(metadata, ",")
  143. if len(metadataParts) < 3 {
  144. return Message{}, fmt.Errorf("invalid kmsg: must contain at least 3 ',' separated pieces at the start")
  145. }
  146. priority, sequence, timestamp := metadataParts[0], metadataParts[1], metadataParts[2]
  147. prioNum, err := strconv.Atoi(priority)
  148. if err != nil {
  149. return Message{}, fmt.Errorf("could not parse %q as priority: %v", priority, err)
  150. }
  151. sequenceNum, err := strconv.Atoi(sequence)
  152. if err != nil {
  153. return Message{}, fmt.Errorf("could not parse %q as sequence number: %v", priority, err)
  154. }
  155. timestampUsFromBoot, err := strconv.ParseInt(timestamp, 10, 64)
  156. if err != nil {
  157. return Message{}, fmt.Errorf("could not parse %q as timestamp: %v", priority, err)
  158. }
  159. // timestamp is offset in microsecond from boottime.
  160. msgTime := p.bootTime.Add(time.Duration(timestampUsFromBoot) * time.Microsecond)
  161. return Message{
  162. Priority: prioNum,
  163. SequenceNumber: sequenceNum,
  164. Timestamp: msgTime,
  165. Message: message,
  166. }, nil
  167. }