123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package logs
- import (
- "compress/gzip"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "time"
- "k8s.io/klog"
- "k8s.io/apimachinery/pkg/api/resource"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/wait"
- internalapi "k8s.io/cri-api/pkg/apis"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
- )
- const (
- // logMonitorPeriod is the period container log manager monitors
- // container logs and performs log rotation.
- logMonitorPeriod = 10 * time.Second
- // timestampFormat is format of the timestamp suffix for rotated log.
- // See https://golang.org/pkg/time/#Time.Format.
- timestampFormat = "20060102-150405"
- // compressSuffix is the suffix for compressed log.
- compressSuffix = ".gz"
- // tmpSuffix is the suffix for temporary file.
- tmpSuffix = ".tmp"
- )
// ContainerLogManager manages the lifecycle of all container logs on the node.
//
// Implementation is thread-safe.
type ContainerLogManager interface {
	// TODO(random-liu): Add RotateLogs function and call it under disk pressure.

	// Start starts the container log manager.
	Start()
}
// LogRotatePolicy decides when container logs are rotated and how many log
// files are retained. A single policy applies to every container managed by
// kubelet.
type LogRotatePolicy struct {
	// MaxSize is the size in bytes at which a container log is rotated.
	// Negative number means to disable container log rotation.
	// NOTE(review): nothing in this file implements the "negative disables"
	// rule: parseMaxSize rejects negative sizes and rotateLogs compares the
	// log size against MaxSize directly. Confirm whether any caller relies on it.
	MaxSize int64
	// MaxFiles caps the number of log files kept per container, counting the
	// live log. When rotating would exceed it, the oldest files are removed.
	MaxFiles int
}
// GetAllLogs returns every in-use file belonging to the container log at
// path log: the rotated (possibly compressed) files matching "<log>.*" in
// oldest-to-newest (lexical) order, followed by log itself. Leftovers that
// filterUnusedLogs classifies as unused are omitted.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func GetAllLogs(log string) ([]string, error) {
	// Every rotated file is named "<log>.<suffix>".
	glob := log + ".*"
	matches, err := filepath.Glob(glob)
	if err != nil {
		return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", glob, err)
	}
	kept, _ := filterUnusedLogs(matches)
	sort.Strings(kept)
	kept = append(kept, log)
	return kept, nil
}
- // compressReadCloser wraps gzip.Reader with a function to close file handler.
- type compressReadCloser struct {
- f *os.File
- *gzip.Reader
- }
- func (rc *compressReadCloser) Close() error {
- ferr := rc.f.Close()
- rerr := rc.Reader.Close()
- if ferr != nil {
- return ferr
- }
- if rerr != nil {
- return rerr
- }
- return nil
- }
// UncompressLog opens a gzip-compressed rotated log and returns an
// io.ReadCloser that streams its uncompressed content.
//
// log must end in compressSuffix (".gz"); otherwise an error is returned
// without touching the file. The gzip header is read eagerly, so a file that
// is not valid gzip fails here rather than on the first Read. Closing the
// returned reader also closes the underlying file. On every error path the
// file is closed before returning.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
	if !strings.HasSuffix(log, compressSuffix) {
		return nil, fmt.Errorf("log is not compressed")
	}
	f, err := os.Open(log)
	if err != nil {
		return nil, fmt.Errorf("failed to open log: %v", err)
	}
	defer func() {
		// On success, ownership of f passes to the returned compressReadCloser.
		// Only release it here when we are returning an error.
		if retErr != nil {
			f.Close()
		}
	}()
	r, err := gzip.NewReader(f)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip reader: %v", err)
	}
	return &compressReadCloser{f: f, Reader: r}, nil
}
// parseMaxSize converts a resource quantity string (for example "10Mi") into
// a byte count. It fails if the string is not a valid quantity, if the
// quantity cannot be converted to an int64, or if the result is negative.
func parseMaxSize(size string) (int64, error) {
	q, err := resource.ParseQuantity(size)
	if err != nil {
		return 0, err
	}
	n, ok := q.AsInt64()
	switch {
	case !ok:
		return 0, fmt.Errorf("invalid max log size")
	case n < 0:
		return 0, fmt.Errorf("negative max log size %d", n)
	}
	return n, nil
}
- type containerLogManager struct {
- runtimeService internalapi.RuntimeService
- policy LogRotatePolicy
- clock clock.Clock
- }
// NewContainerLogManager builds a ContainerLogManager backed by runtimeService.
//
// maxSize is a resource quantity string (for example "10Mi") giving the log
// size in bytes that triggers rotation. It must parse to a non-negative int64.
// maxFiles is the number of log files kept per container and must be greater
// than 1. The returned manager does nothing until Start is called.
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
	if maxFiles <= 1 {
		return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
	}
	sizeBytes, err := parseMaxSize(maxSize)
	if err != nil {
		return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
	}
	mgr := &containerLogManager{
		runtimeService: runtimeService,
		clock:          clock.RealClock{},
	}
	mgr.policy = LogRotatePolicy{MaxSize: sizeBytes, MaxFiles: maxFiles}
	return mgr, nil
}
- // Start the container log manager.
- func (c *containerLogManager) Start() {
- // Start a goroutine periodically does container log rotation.
- go wait.Forever(func() {
- if err := c.rotateLogs(); err != nil {
- klog.Errorf("Failed to rotate container logs: %v", err)
- }
- }, logMonitorPeriod)
- }
// rotateLogs makes one pass over every container known to the runtime and
// rotates the log of each running container whose log has reached
// policy.MaxSize. Only a failure to list containers is returned. Per-container
// failures are logged so that one broken container does not block the others.
func (c *containerLogManager) rotateLogs() error {
	// TODO(#59998): Use kubelet pod cache.
	containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
	if err != nil {
		return fmt.Errorf("failed to list containers: %v", err)
	}
	// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
	for _, ctr := range containers {
		// Non-running containers produce no new output, so rotating them would
		// only leave an empty latest log behind.
		if ctr.GetState() == runtimeapi.ContainerState_CONTAINER_RUNNING {
			c.rotateContainerLogIfNeeded(ctr.GetId())
		}
	}
	return nil
}

// rotateContainerLogIfNeeded rotates the log of container id once it reaches
// policy.MaxSize. It first re-creates a missing log via ReopenContainerLog.
// All errors are logged rather than returned.
func (c *containerLogManager) rotateContainerLogIfNeeded(id string) {
	status, err := c.runtimeService.ContainerStatus(id)
	if err != nil {
		klog.Errorf("Failed to get container status for %q: %v", id, err)
		return
	}
	logPath := status.GetLogPath()
	fi, err := os.Stat(logPath)
	switch {
	case err == nil:
		// The log exists; go straight to the size check.
	case !os.IsNotExist(err):
		klog.Errorf("Failed to stat container log %q: %v", logPath, err)
		return
	default:
		// rotateLatestLog can leave a container without its log when
		// ReopenContainerLog fails, so try to get the runtime to recreate it.
		if reopenErr := c.runtimeService.ReopenContainerLog(id); reopenErr != nil {
			klog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, logPath, reopenErr)
			return
		}
		if fi, err = os.Stat(logPath); err != nil {
			klog.Errorf("Failed to stat container log %q after reopen: %v", logPath, err)
			return
		}
	}
	if fi.Size() < c.policy.MaxSize {
		return
	}
	if err := c.rotateLog(id, logPath); err != nil {
		klog.Errorf("Failed to rotate log %q for container %q: %v", logPath, id, err)
	}
}
// rotateLog performs one rotation of log, the live log of container id:
//  1. list the rotated siblings "<log>.*",
//  2. delete leftovers of earlier failed rotations,
//  3. trim the rotated set to at most MaxFiles-2 files,
//  4. gzip any rotated file that is not compressed yet, and
//  5. move the live log aside and have the runtime reopen it.
//
// The first failing step aborts the rotation and its error is returned.
func (c *containerLogManager) rotateLog(id, log string) error {
	pattern := log + ".*"
	rotated, err := filepath.Glob(pattern)
	if err != nil {
		return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
	}
	if rotated, err = c.cleanupUnusedLogs(rotated); err != nil {
		return fmt.Errorf("failed to cleanup logs: %v", err)
	}
	if rotated, err = c.removeExcessLogs(rotated); err != nil {
		return fmt.Errorf("failed to remove excess logs: %v", err)
	}
	for _, name := range rotated {
		if !strings.HasSuffix(name, compressSuffix) {
			if err := c.compressLog(name); err != nil {
				return fmt.Errorf("failed to compress log %q: %v", name, err)
			}
		}
	}
	if err := c.rotateLatestLog(id, log); err != nil {
		return fmt.Errorf("failed to rotate log %q: %v", log, err)
	}
	return nil
}
// cleanupUnusedLogs deletes files left behind by earlier failed rotations.
// These are the files filterUnusedLogs reports as unused: temporary files and
// uncompressed logs whose compressed copy already exists. It returns the files
// that are still in use. It stops at the first deletion that fails and returns
// that error.
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
	keep, discard := filterUnusedLogs(logs)
	for i := range discard {
		if err := os.Remove(discard[i]); err != nil {
			return nil, fmt.Errorf("failed to remove unused log %q: %v", discard[i], err)
		}
	}
	return keep, nil
}
- // filterUnusedLogs splits logs into 2 groups, the 1st group is in used logs,
- // the second group is unused logs.
- func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
- for _, l := range logs {
- if isInUse(l, logs) {
- inuse = append(inuse, l)
- } else {
- unused = append(unused, l)
- }
- }
- return inuse, unused
- }
// isInUse reports whether the log file l, one of logs, should be kept.
// Temporary files (tmpSuffix) are never in use. Compressed files
// (compressSuffix) always are. An uncompressed file is in use unless its
// compressed counterpart l+compressSuffix is also present in logs.
func isInUse(l string, logs []string) bool {
	switch {
	case strings.HasSuffix(l, tmpSuffix):
		// Output of a compression that never completed.
		return false
	case strings.HasSuffix(l, compressSuffix):
		return true
	}
	compressed := l + compressSuffix
	for _, other := range logs {
		if other == compressed {
			// Already compressed; this uncompressed copy is redundant.
			return false
		}
	}
	return true
}
// removeExcessLogs trims the rotated log files so that at most MaxFiles-2
// remain. The live log and the file about to be rotated account for the other
// two slots. logs is sorted in place, oldest to newest, and deletion starts
// from the oldest so that readers of newer files (e.g. an ongoing
// `kubectl logs`) are not disturbed. It returns the surviving files in
// sorted order. On the first failed deletion it returns that error and a nil
// slice.
func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
	sort.Strings(logs)
	keep := c.policy.MaxFiles - 2
	if keep < 0 {
		keep = 0
	}
	excess := len(logs) - keep
	for idx := 0; idx < excess; idx++ {
		if err := os.Remove(logs[idx]); err != nil {
			return nil, fmt.Errorf("failed to remove old log %q: %v", logs[idx], err)
		}
	}
	if excess < 0 {
		excess = 0
	}
	return logs[excess:], nil
}
// compressLog gzips log into log+compressSuffix and then deletes log.
//
// The compressed data is first written to log+tmpSuffix. The gzip stream and
// the temporary file are closed, with errors checked, BEFORE the file is
// renamed into place and the original is deleted. This ordering ensures:
//   - a flush/close failure (e.g. disk full) is reported instead of silently
//     producing a truncated archive while the only intact copy is removed;
//   - the complete gzip footer is on disk before the ".gz" name becomes
//     visible, so a concurrent reader (e.g. via GetAllLogs/UncompressLog)
//     never sees a partial archive;
//   - no file is renamed or removed while still open, which fails on Windows.
//
// The temporary file is removed on every return path.
func (c *containerLogManager) compressLog(log string) error {
	r, err := os.Open(log)
	if err != nil {
		return fmt.Errorf("failed to open log %q: %v", log, err)
	}
	// Safety net for early returns. On success r is closed explicitly below,
	// and a second Close only returns an ignored error.
	defer r.Close()
	tmpLog := log + tmpSuffix
	f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
	}
	defer func() {
		// Best effort cleanup of tmpLog. After a successful rename it no
		// longer exists and this is a no-op.
		os.Remove(tmpLog)
	}()
	// Safety nets for early returns. Both are closed explicitly on the success
	// path, and gzip.Writer.Close does nothing once the writer is closed.
	defer f.Close()
	w := gzip.NewWriter(f)
	defer w.Close()
	if _, err := io.Copy(w, r); err != nil {
		return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
	}
	// Write the gzip footer and flush the temporary file before renaming it.
	if err := w.Close(); err != nil {
		return fmt.Errorf("failed to finish compressing %q to %q: %v", log, tmpLog, err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("failed to close temporary log %q: %v", tmpLog, err)
	}
	compressedLog := log + compressSuffix
	if err := os.Rename(tmpLog, compressedLog); err != nil {
		return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
	}
	// Release the source before deleting it, since removing an open file fails
	// on Windows. A close error on a file opened read-only is not actionable.
	r.Close()
	if err := os.Remove(log); err != nil {
		return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
	}
	return nil
}
// rotateLatestLog renames the live log to "<log>.<timestamp>" and asks the
// runtime to reopen a fresh log at the original path. The rotated file is left
// uncompressed so that the container can still write to it and log collectors
// such as fluentd can finish reading it. If the reopen fails, the rotated file
// is renamed back so that the next round can retry.
func (c *containerLogManager) rotateLatestLog(id, log string) error {
	rotated := log + "." + c.clock.Now().Format(timestampFormat)
	if err := os.Rename(log, rotated); err != nil {
		return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
	}
	reopenErr := c.runtimeService.ReopenContainerLog(id)
	if reopenErr == nil {
		return nil
	}
	// If kubelet gets restarted at this point, we'll lose original log.
	if renameErr := os.Rename(rotated, log); renameErr != nil {
		// This shouldn't happen. Report it loudly, because the original log
		// is now lost.
		klog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, reopenErr)
	}
	return fmt.Errorf("failed to reopen container log %q: %v", id, reopenErr)
}
|