123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package config
- import (
- "fmt"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/tools/cache"
- api "k8s.io/kubernetes/pkg/apis/core"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- utilio "k8s.io/utils/io"
- )
- type podEventType int
- const (
- podAdd podEventType = iota
- podModify
- podDelete
- eventBufferLen = 10
- )
- type watchEvent struct {
- fileName string
- eventType podEventType
- }
- type sourceFile struct {
- path string
- nodeName types.NodeName
- period time.Duration
- store cache.Store
- fileKeyMapping map[string]string
- updates chan<- interface{}
- watchEvents chan *watchEvent
- }
- // NewSourceFile watches a config file for changes.
- func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
- // "github.com/sigma/go-inotify" requires a path without trailing "/"
- path = strings.TrimRight(path, string(os.PathSeparator))
- config := newSourceFile(path, nodeName, period, updates)
- klog.V(1).Infof("Watching path %q", path)
- config.run()
- }
- func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
- send := func(objs []interface{}) {
- var pods []*v1.Pod
- for _, o := range objs {
- pods = append(pods, o.(*v1.Pod))
- }
- updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
- }
- store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
- return &sourceFile{
- path: path,
- nodeName: nodeName,
- period: period,
- store: store,
- fileKeyMapping: map[string]string{},
- updates: updates,
- watchEvents: make(chan *watchEvent, eventBufferLen),
- }
- }
- func (s *sourceFile) run() {
- listTicker := time.NewTicker(s.period)
- go func() {
- // Read path immediately to speed up startup.
- if err := s.listConfig(); err != nil {
- klog.Errorf("Unable to read config path %q: %v", s.path, err)
- }
- for {
- select {
- case <-listTicker.C:
- if err := s.listConfig(); err != nil {
- klog.Errorf("Unable to read config path %q: %v", s.path, err)
- }
- case e := <-s.watchEvents:
- if err := s.consumeWatchEvent(e); err != nil {
- klog.Errorf("Unable to process watch event: %v", err)
- }
- }
- }
- }()
- s.startWatch()
- }
- func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
- return applyDefaults(pod, source, true, s.nodeName)
- }
- func (s *sourceFile) listConfig() error {
- path := s.path
- statInfo, err := os.Stat(path)
- if err != nil {
- if !os.IsNotExist(err) {
- return err
- }
- // Emit an update with an empty PodList to allow FileSource to be marked as seen
- s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
- return fmt.Errorf("path does not exist, ignoring")
- }
- switch {
- case statInfo.Mode().IsDir():
- pods, err := s.extractFromDir(path)
- if err != nil {
- return err
- }
- if len(pods) == 0 {
- // Emit an update with an empty PodList to allow FileSource to be marked as seen
- s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
- return nil
- }
- return s.replaceStore(pods...)
- case statInfo.Mode().IsRegular():
- pod, err := s.extractFromFile(path)
- if err != nil {
- return err
- }
- return s.replaceStore(pod)
- default:
- return fmt.Errorf("path is not a directory or file")
- }
- }
- // Get as many pod manifests as we can from a directory. Return an error if and only if something
- // prevented us from reading anything at all. Do not return an error if only some files
- // were problematic.
- func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
- dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
- if err != nil {
- return nil, fmt.Errorf("glob failed: %v", err)
- }
- pods := make([]*v1.Pod, 0)
- if len(dirents) == 0 {
- return pods, nil
- }
- sort.Strings(dirents)
- for _, path := range dirents {
- statInfo, err := os.Stat(path)
- if err != nil {
- klog.Errorf("Can't get metadata for %q: %v", path, err)
- continue
- }
- switch {
- case statInfo.Mode().IsDir():
- klog.Errorf("Not recursing into manifest path %q", path)
- case statInfo.Mode().IsRegular():
- pod, err := s.extractFromFile(path)
- if err != nil {
- if !os.IsNotExist(err) {
- klog.Errorf("Can't process manifest file %q: %v", path, err)
- }
- } else {
- pods = append(pods, pod)
- }
- default:
- klog.Errorf("Manifest path %q is not a directory or file: %v", path, statInfo.Mode())
- }
- }
- return pods, nil
- }
- // extractFromFile parses a file for Pod configuration information.
- func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
- klog.V(3).Infof("Reading config file %q", filename)
- defer func() {
- if err == nil && pod != nil {
- objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
- if keyErr != nil {
- err = keyErr
- return
- }
- s.fileKeyMapping[filename] = objKey
- }
- }()
- file, err := os.Open(filename)
- if err != nil {
- return pod, err
- }
- defer file.Close()
- data, err := utilio.ReadAtMost(file, maxConfigLength)
- if err != nil {
- return pod, err
- }
- defaultFn := func(pod *api.Pod) error {
- return s.applyDefaults(pod, filename)
- }
- parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
- if parsed {
- if podErr != nil {
- return pod, podErr
- }
- return pod, nil
- }
- return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
- }
- func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
- objs := []interface{}{}
- for _, pod := range pods {
- objs = append(objs, pod)
- }
- return s.store.Replace(objs, "")
- }
|