123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- /*
- Copyright 2016 Euan Kemp
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package kmsgparser implements a parser for the Linux `/dev/kmsg` format.
- // More information about this format may be found here:
- // https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
- // Some parts of it are slightly inspired by rsyslog's contrib module:
- // https://github.com/rsyslog/rsyslog/blob/v8.22.0/contrib/imkmsg/kmsg.c
- package kmsgparser
- import (
- "fmt"
- "io"
- "os"
- "strconv"
- "strings"
- "syscall"
- "time"
- )
- // Parser is a parser for the kernel ring buffer found at /dev/kmsg
- type Parser interface {
- // SeekEnd moves the parser to the end of the kmsg queue.
- SeekEnd() error
- // Parse provides a channel of messages read from the kernel ring buffer.
- // When first called, it will read the existing ringbuffer, after which it will emit new messages as they occur.
- Parse() <-chan Message
- // SetLogger sets the logger that will be used to report malformed kernel
- // ringbuffer lines or unexpected kmsg read errors.
- SetLogger(Logger)
- // Close closes the underlying kmsg reader for this parser
- Close() error
- }
// Message is a single decoded kmsg log line. More information about these
// messages may be found here:
// https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
type Message struct {
	// Priority is the numeric priority field of the record.
	Priority int
	// SequenceNumber is the sequence number field of the record.
	SequenceNumber int
	// Timestamp is the time of the record, calculated from its offset
	// relative to boot time.
	Timestamp time.Time
	// Message is the body of the record, which may span multiple lines.
	Message string
}
- func NewParser() (Parser, error) {
- f, err := os.Open("/dev/kmsg")
- if err != nil {
- return nil, err
- }
- bootTime, err := getBootTime()
- if err != nil {
- return nil, err
- }
- return &parser{
- log: &StandardLogger{nil},
- kmsgReader: f,
- bootTime: bootTime,
- }, nil
- }
// ReadSeekCloser groups the Read, Seek and Close methods that the parser
// requires of its kmsg source.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
- type parser struct {
- log Logger
- kmsgReader ReadSeekCloser
- bootTime time.Time
- }
// getBootTime estimates when the system booted by subtracting the uptime
// reported by the sysinfo system call from the current time. sysinfo only
// reports whole seconds, so the result has at best one-second precision.
//
// An error is returned if the sysinfo call fails.
func getBootTime() (time.Time, error) {
	var info syscall.Sysinfo_t
	if err := syscall.Sysinfo(&info); err != nil {
		return time.Time{}, fmt.Errorf("could not get boot time: %v", err)
	}

	// sysinfo only has seconds
	uptime := time.Duration(info.Uptime) * time.Second
	return time.Now().Add(-uptime), nil
}
- func (p *parser) SetLogger(log Logger) {
- p.log = log
- }
- func (p *parser) Close() error {
- return p.kmsgReader.Close()
- }
- func (p *parser) SeekEnd() error {
- _, err := p.kmsgReader.Seek(0, os.SEEK_END)
- return err
- }
- // Parse will read from the provided reader and provide a channel of messages
- // parsed.
- // If the provided reader *is not* a proper Linux kmsg device, Parse might not
- // behave correctly since it relies on specific behavior of `/dev/kmsg`
- //
- // A goroutine is created to process the provided reader. The goroutine will
- // exit when the given reader is closed.
- // Closing the passed in reader will cause the goroutine to exit.
- func (p *parser) Parse() <-chan Message {
- output := make(chan Message, 1)
- go func() {
- defer close(output)
- msg := make([]byte, 8192)
- for {
- // Each read call gives us one full message.
- // https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
- n, err := p.kmsgReader.Read(msg)
- if err != nil {
- if err == syscall.EPIPE {
- p.log.Warningf("short read from kmsg; skipping")
- continue
- }
- if err == io.EOF {
- p.log.Infof("kmsg reader closed, shutting down")
- return
- }
- p.log.Errorf("error reading /dev/kmsg: %v", err)
- return
- }
- msgStr := string(msg[:n])
- message, err := p.parseMessage(msgStr)
- if err != nil {
- p.log.Warningf("unable to parse kmsg message %q: %v", msgStr, err)
- continue
- }
- output <- message
- }
- }()
- return output
- }
- func (p *parser) parseMessage(input string) (Message, error) {
- // Format:
- // PRIORITY,SEQUENCE_NUM,TIMESTAMP,-;MESSAGE
- parts := strings.SplitN(input, ";", 2)
- if len(parts) != 2 {
- return Message{}, fmt.Errorf("invalid kmsg; must contain a ';'")
- }
- metadata, message := parts[0], parts[1]
- metadataParts := strings.Split(metadata, ",")
- if len(metadataParts) < 3 {
- return Message{}, fmt.Errorf("invalid kmsg: must contain at least 3 ',' separated pieces at the start")
- }
- priority, sequence, timestamp := metadataParts[0], metadataParts[1], metadataParts[2]
- prioNum, err := strconv.Atoi(priority)
- if err != nil {
- return Message{}, fmt.Errorf("could not parse %q as priority: %v", priority, err)
- }
- sequenceNum, err := strconv.Atoi(sequence)
- if err != nil {
- return Message{}, fmt.Errorf("could not parse %q as sequence number: %v", priority, err)
- }
- timestampUsFromBoot, err := strconv.ParseInt(timestamp, 10, 64)
- if err != nil {
- return Message{}, fmt.Errorf("could not parse %q as timestamp: %v", priority, err)
- }
- // timestamp is offset in microsecond from boottime.
- msgTime := p.bootTime.Add(time.Duration(timestampUsFromBoot) * time.Microsecond)
- return Message{
- Priority: prioNum,
- SequenceNumber: sequenceNum,
- Timestamp: msgTime,
- Message: message,
- }, nil
- }
|