123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- // +build linux
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package config
- import (
- "fmt"
- "os"
- "path/filepath"
- "strings"
- "time"
- "github.com/fsnotify/fsnotify"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/util/flowcontrol"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- )
// retryPeriod is the interval at which startWatch re-runs its watch loop
// via wait.Forever; it is also the first duration handed to
// flowcontrol.NewBackOff.
const retryPeriod = time.Second

// maxRetryPeriod is the second duration handed to flowcontrol.NewBackOff
// in startWatch.
const maxRetryPeriod = 20 * time.Second
// retryableError is an error type that startWatch recognises by type
// assertion: when doWatch returns one, the watch backoff is not advanced.
type retryableError struct {
	// message is the text returned by Error.
	message string
}

// Compile-time check that *retryableError satisfies the error interface.
var _ error = (*retryableError)(nil)

// Error returns the message stored in the error.
func (e *retryableError) Error() string {
	return e.message
}
- func (s *sourceFile) startWatch() {
- backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
- backOffID := "watch"
- go wait.Forever(func() {
- if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
- return
- }
- if err := s.doWatch(); err != nil {
- klog.Errorf("Unable to read config path %q: %v", s.path, err)
- if _, retryable := err.(*retryableError); !retryable {
- backOff.Next(backOffID, time.Now())
- }
- }
- }, retryPeriod)
- }
- func (s *sourceFile) doWatch() error {
- _, err := os.Stat(s.path)
- if err != nil {
- if !os.IsNotExist(err) {
- return err
- }
- // Emit an update with an empty PodList to allow FileSource to be marked as seen
- s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
- return &retryableError{"path does not exist, ignoring"}
- }
- w, err := fsnotify.NewWatcher()
- if err != nil {
- return fmt.Errorf("unable to create inotify: %v", err)
- }
- defer w.Close()
- err = w.Add(s.path)
- if err != nil {
- return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
- }
- for {
- select {
- case event := <-w.Events:
- if err = s.produceWatchEvent(&event); err != nil {
- return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
- }
- case err = <-w.Errors:
- return fmt.Errorf("error while watching %q: %v", s.path, err)
- }
- }
- }
- func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
- // Ignore file start with dots
- if strings.HasPrefix(filepath.Base(e.Name), ".") {
- klog.V(4).Infof("Ignored pod manifest: %s, because it starts with dots", e.Name)
- return nil
- }
- var eventType podEventType
- switch {
- case (e.Op & fsnotify.Create) > 0:
- eventType = podAdd
- case (e.Op & fsnotify.Write) > 0:
- eventType = podModify
- case (e.Op & fsnotify.Chmod) > 0:
- eventType = podModify
- case (e.Op & fsnotify.Remove) > 0:
- eventType = podDelete
- case (e.Op & fsnotify.Rename) > 0:
- eventType = podDelete
- default:
- // Ignore rest events
- return nil
- }
- s.watchEvents <- &watchEvent{e.Name, eventType}
- return nil
- }
- func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
- switch e.eventType {
- case podAdd, podModify:
- pod, err := s.extractFromFile(e.fileName)
- if err != nil {
- return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
- }
- return s.store.Add(pod)
- case podDelete:
- if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
- pod, podExist, err := s.store.GetByKey(objKey)
- if err != nil {
- return err
- } else if !podExist {
- return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
- } else {
- if err = s.store.Delete(pod); err != nil {
- return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
- }
- delete(s.fileKeyMapping, e.fileName)
- }
- }
- }
- return nil
- }
|