container_log_manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package logs
  14. import (
  15. "compress/gzip"
  16. "fmt"
  17. "io"
  18. "os"
  19. "path/filepath"
  20. "sort"
  21. "strings"
  22. "time"
  23. "k8s.io/klog"
  24. "k8s.io/apimachinery/pkg/api/resource"
  25. "k8s.io/apimachinery/pkg/util/clock"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. internalapi "k8s.io/cri-api/pkg/apis"
  28. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  29. )
  const (
	// compressSuffix is appended to a rotated log once it has been
	// gzip-compressed.
	compressSuffix = ".gz"
	// tmpSuffix is appended to temporary files (such as a compressed log that
	// is still being written). Files carrying it are never treated as in use.
	tmpSuffix = ".tmp"
	// timestampFormat is the time.Format layout of the suffix added to a log
	// when it is rotated. See https://golang.org/pkg/time/#Time.Format.
	timestampFormat = "20060102-150405"
	// logMonitorPeriod is how often the container log manager inspects
	// container logs and rotates the ones that need it.
	logMonitorPeriod = time.Second * 10
)
  // ContainerLogManager is responsible for the lifecycle of every container log.
//
// Implementation is thread-safe.
type ContainerLogManager interface {
	// TODO(random-liu): Add RotateLogs function and call it under disk pressure.

	// Start launches the container log manager.
	Start()
}
  // LogRotatePolicy describes how container logs are rotated. One policy is
// shared by all containers that kubelet manages.
type LogRotatePolicy struct {
	// MaxSize is the size in bytes a container log file may reach before it
	// is rotated. A negative value means container log rotation is disabled.
	MaxSize int64
	// MaxFiles caps how many log files may exist for a container. When a
	// rotation would produce more than that, the oldest files are removed.
	MaxFiles int
}
  60. // GetAllLogs gets all inuse (rotated/compressed) logs for a specific container log.
  61. // Returned logs are sorted in oldest to newest order.
  62. // TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
  63. func GetAllLogs(log string) ([]string, error) {
  64. // pattern is used to match all rotated files.
  65. pattern := fmt.Sprintf("%s.*", log)
  66. logs, err := filepath.Glob(pattern)
  67. if err != nil {
  68. return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
  69. }
  70. inuse, _ := filterUnusedLogs(logs)
  71. sort.Strings(inuse)
  72. return append(inuse, log), nil
  73. }
  // compressReadCloser reads decompressed data through the embedded gzip
// reader and, on Close, also releases the file f the gzip stream comes from.
type compressReadCloser struct {
	f *os.File
	*gzip.Reader
}

// Close closes the underlying file and then the gzip reader. Both are always
// closed; when both fail, the file's error is the one reported.
func (rc *compressReadCloser) Close() error {
	fileErr, gzErr := rc.f.Close(), rc.Reader.Close()
	if fileErr == nil {
		return gzErr
	}
	return fileErr
}
  90. // UncompressLog compresses a compressed log and return a readcloser for the
  91. // stream of the uncompressed content.
  92. // TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
  93. func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
  94. if !strings.HasSuffix(log, compressSuffix) {
  95. return nil, fmt.Errorf("log is not compressed")
  96. }
  97. f, err := os.Open(log)
  98. if err != nil {
  99. return nil, fmt.Errorf("failed to open log: %v", err)
  100. }
  101. defer func() {
  102. if retErr != nil {
  103. f.Close()
  104. }
  105. }()
  106. r, err := gzip.NewReader(f)
  107. if err != nil {
  108. return nil, fmt.Errorf("failed to create gzip reader: %v", err)
  109. }
  110. return &compressReadCloser{f: f, Reader: r}, nil
  111. }
  112. // parseMaxSize parses quantity string to int64 max size in bytes.
  113. func parseMaxSize(size string) (int64, error) {
  114. quantity, err := resource.ParseQuantity(size)
  115. if err != nil {
  116. return 0, err
  117. }
  118. maxSize, ok := quantity.AsInt64()
  119. if !ok {
  120. return 0, fmt.Errorf("invalid max log size")
  121. }
  122. if maxSize < 0 {
  123. return 0, fmt.Errorf("negative max log size %d", maxSize)
  124. }
  125. return maxSize, nil
  126. }
  127. type containerLogManager struct {
  128. runtimeService internalapi.RuntimeService
  129. policy LogRotatePolicy
  130. clock clock.Clock
  131. }
  132. // NewContainerLogManager creates a new container log manager.
  133. func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
  134. if maxFiles <= 1 {
  135. return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
  136. }
  137. parsedMaxSize, err := parseMaxSize(maxSize)
  138. if err != nil {
  139. return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
  140. }
  141. // policy LogRotatePolicy
  142. return &containerLogManager{
  143. runtimeService: runtimeService,
  144. policy: LogRotatePolicy{
  145. MaxSize: parsedMaxSize,
  146. MaxFiles: maxFiles,
  147. },
  148. clock: clock.RealClock{},
  149. }, nil
  150. }
  151. // Start the container log manager.
  152. func (c *containerLogManager) Start() {
  153. // Start a goroutine periodically does container log rotation.
  154. go wait.Forever(func() {
  155. if err := c.rotateLogs(); err != nil {
  156. klog.Errorf("Failed to rotate container logs: %v", err)
  157. }
  158. }, logMonitorPeriod)
  159. }
  160. func (c *containerLogManager) rotateLogs() error {
  161. // TODO(#59998): Use kubelet pod cache.
  162. containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
  163. if err != nil {
  164. return fmt.Errorf("failed to list containers: %v", err)
  165. }
  166. // NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
  167. for _, container := range containers {
  168. // Only rotate logs for running containers. Non-running containers won't
  169. // generate new output, it doesn't make sense to keep an empty latest log.
  170. if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
  171. continue
  172. }
  173. id := container.GetId()
  174. // Note that we should not block log rotate for an error of a single container.
  175. status, err := c.runtimeService.ContainerStatus(id)
  176. if err != nil {
  177. klog.Errorf("Failed to get container status for %q: %v", id, err)
  178. continue
  179. }
  180. path := status.GetLogPath()
  181. info, err := os.Stat(path)
  182. if err != nil {
  183. if !os.IsNotExist(err) {
  184. klog.Errorf("Failed to stat container log %q: %v", path, err)
  185. continue
  186. }
  187. // In rotateLatestLog, there are several cases that we may
  188. // lose original container log after ReopenContainerLog fails.
  189. // We try to recover it by reopening container log.
  190. if err := c.runtimeService.ReopenContainerLog(id); err != nil {
  191. klog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, path, err)
  192. continue
  193. }
  194. // The container log should be recovered.
  195. info, err = os.Stat(path)
  196. if err != nil {
  197. klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
  198. continue
  199. }
  200. }
  201. if info.Size() < c.policy.MaxSize {
  202. continue
  203. }
  204. // Perform log rotation.
  205. if err := c.rotateLog(id, path); err != nil {
  206. klog.Errorf("Failed to rotate log %q for container %q: %v", path, id, err)
  207. continue
  208. }
  209. }
  210. return nil
  211. }
  212. func (c *containerLogManager) rotateLog(id, log string) error {
  213. // pattern is used to match all rotated files.
  214. pattern := fmt.Sprintf("%s.*", log)
  215. logs, err := filepath.Glob(pattern)
  216. if err != nil {
  217. return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
  218. }
  219. logs, err = c.cleanupUnusedLogs(logs)
  220. if err != nil {
  221. return fmt.Errorf("failed to cleanup logs: %v", err)
  222. }
  223. logs, err = c.removeExcessLogs(logs)
  224. if err != nil {
  225. return fmt.Errorf("failed to remove excess logs: %v", err)
  226. }
  227. // Compress uncompressed log files.
  228. for _, l := range logs {
  229. if strings.HasSuffix(l, compressSuffix) {
  230. continue
  231. }
  232. if err := c.compressLog(l); err != nil {
  233. return fmt.Errorf("failed to compress log %q: %v", l, err)
  234. }
  235. }
  236. if err := c.rotateLatestLog(id, log); err != nil {
  237. return fmt.Errorf("failed to rotate log %q: %v", log, err)
  238. }
  239. return nil
  240. }
  241. // cleanupUnusedLogs cleans up temporary or unused log files generated by previous log rotation
  242. // failure.
  243. func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
  244. inuse, unused := filterUnusedLogs(logs)
  245. for _, l := range unused {
  246. if err := os.Remove(l); err != nil {
  247. return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
  248. }
  249. }
  250. return inuse, nil
  251. }
  252. // filterUnusedLogs splits logs into 2 groups, the 1st group is in used logs,
  253. // the second group is unused logs.
  254. func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
  255. for _, l := range logs {
  256. if isInUse(l, logs) {
  257. inuse = append(inuse, l)
  258. } else {
  259. unused = append(unused, l)
  260. }
  261. }
  262. return inuse, unused
  263. }
  264. // isInUse checks whether a container log file is still inuse.
  265. func isInUse(l string, logs []string) bool {
  266. // All temporary files are not in use.
  267. if strings.HasSuffix(l, tmpSuffix) {
  268. return false
  269. }
  270. // All compressed logs are in use.
  271. if strings.HasSuffix(l, compressSuffix) {
  272. return true
  273. }
  274. // Files has already been compressed are not in use.
  275. for _, another := range logs {
  276. if l+compressSuffix == another {
  277. return false
  278. }
  279. }
  280. return true
  281. }
  282. // removeExcessLogs removes old logs to make sure there are only at most MaxFiles log files.
  283. func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
  284. // Sort log files in oldest to newest order.
  285. sort.Strings(logs)
  286. // Container will create a new log file, and we'll rotate the latest log file.
  287. // Other than those 2 files, we can have at most MaxFiles-2 rotated log files.
  288. // Keep MaxFiles-2 files by removing old files.
  289. // We should remove from oldest to newest, so as not to break ongoing `kubectl logs`.
  290. maxRotatedFiles := c.policy.MaxFiles - 2
  291. if maxRotatedFiles < 0 {
  292. maxRotatedFiles = 0
  293. }
  294. i := 0
  295. for ; i < len(logs)-maxRotatedFiles; i++ {
  296. if err := os.Remove(logs[i]); err != nil {
  297. return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
  298. }
  299. }
  300. logs = logs[i:]
  301. return logs, nil
  302. }
  303. // compressLog compresses a log to log.gz with gzip.
  304. func (c *containerLogManager) compressLog(log string) error {
  305. r, err := os.Open(log)
  306. if err != nil {
  307. return fmt.Errorf("failed to open log %q: %v", log, err)
  308. }
  309. defer r.Close()
  310. tmpLog := log + tmpSuffix
  311. f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  312. if err != nil {
  313. return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
  314. }
  315. defer func() {
  316. // Best effort cleanup of tmpLog.
  317. os.Remove(tmpLog)
  318. }()
  319. defer f.Close()
  320. w := gzip.NewWriter(f)
  321. defer w.Close()
  322. if _, err := io.Copy(w, r); err != nil {
  323. return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
  324. }
  325. compressedLog := log + compressSuffix
  326. if err := os.Rename(tmpLog, compressedLog); err != nil {
  327. return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
  328. }
  329. // Remove old log file.
  330. if err := os.Remove(log); err != nil {
  331. return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
  332. }
  333. return nil
  334. }
  335. // rotateLatestLog rotates latest log without compression, so that container can still write
  336. // and fluentd can finish reading.
  337. func (c *containerLogManager) rotateLatestLog(id, log string) error {
  338. timestamp := c.clock.Now().Format(timestampFormat)
  339. rotated := fmt.Sprintf("%s.%s", log, timestamp)
  340. if err := os.Rename(log, rotated); err != nil {
  341. return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
  342. }
  343. if err := c.runtimeService.ReopenContainerLog(id); err != nil {
  344. // Rename the rotated log back, so that we can try rotating it again
  345. // next round.
  346. // If kubelet gets restarted at this point, we'll lose original log.
  347. if renameErr := os.Rename(rotated, log); renameErr != nil {
  348. // This shouldn't happen.
  349. // Report an error if this happens, because we will lose original
  350. // log.
  351. klog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, err)
  352. }
  353. return fmt.Errorf("failed to reopen container log %q: %v", id, err)
  354. }
  355. return nil
  356. }