file.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package config
  14. import (
  15. "fmt"
  16. "os"
  17. "path/filepath"
  18. "sort"
  19. "strings"
  20. "time"
  21. "k8s.io/klog"
  22. "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/client-go/tools/cache"
  25. api "k8s.io/kubernetes/pkg/apis/core"
  26. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  27. utilio "k8s.io/utils/io"
  28. )
  29. type podEventType int
  30. const (
  31. podAdd podEventType = iota
  32. podModify
  33. podDelete
  34. eventBufferLen = 10
  35. )
  36. type watchEvent struct {
  37. fileName string
  38. eventType podEventType
  39. }
  40. type sourceFile struct {
  41. path string
  42. nodeName types.NodeName
  43. period time.Duration
  44. store cache.Store
  45. fileKeyMapping map[string]string
  46. updates chan<- interface{}
  47. watchEvents chan *watchEvent
  48. }
  49. // NewSourceFile watches a config file for changes.
  50. func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
  51. // "github.com/sigma/go-inotify" requires a path without trailing "/"
  52. path = strings.TrimRight(path, string(os.PathSeparator))
  53. config := newSourceFile(path, nodeName, period, updates)
  54. klog.V(1).Infof("Watching path %q", path)
  55. config.run()
  56. }
  57. func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
  58. send := func(objs []interface{}) {
  59. var pods []*v1.Pod
  60. for _, o := range objs {
  61. pods = append(pods, o.(*v1.Pod))
  62. }
  63. updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
  64. }
  65. store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
  66. return &sourceFile{
  67. path: path,
  68. nodeName: nodeName,
  69. period: period,
  70. store: store,
  71. fileKeyMapping: map[string]string{},
  72. updates: updates,
  73. watchEvents: make(chan *watchEvent, eventBufferLen),
  74. }
  75. }
  76. func (s *sourceFile) run() {
  77. listTicker := time.NewTicker(s.period)
  78. go func() {
  79. // Read path immediately to speed up startup.
  80. if err := s.listConfig(); err != nil {
  81. klog.Errorf("Unable to read config path %q: %v", s.path, err)
  82. }
  83. for {
  84. select {
  85. case <-listTicker.C:
  86. if err := s.listConfig(); err != nil {
  87. klog.Errorf("Unable to read config path %q: %v", s.path, err)
  88. }
  89. case e := <-s.watchEvents:
  90. if err := s.consumeWatchEvent(e); err != nil {
  91. klog.Errorf("Unable to process watch event: %v", err)
  92. }
  93. }
  94. }
  95. }()
  96. s.startWatch()
  97. }
  98. func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
  99. return applyDefaults(pod, source, true, s.nodeName)
  100. }
  101. func (s *sourceFile) listConfig() error {
  102. path := s.path
  103. statInfo, err := os.Stat(path)
  104. if err != nil {
  105. if !os.IsNotExist(err) {
  106. return err
  107. }
  108. // Emit an update with an empty PodList to allow FileSource to be marked as seen
  109. s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
  110. return fmt.Errorf("path does not exist, ignoring")
  111. }
  112. switch {
  113. case statInfo.Mode().IsDir():
  114. pods, err := s.extractFromDir(path)
  115. if err != nil {
  116. return err
  117. }
  118. if len(pods) == 0 {
  119. // Emit an update with an empty PodList to allow FileSource to be marked as seen
  120. s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
  121. return nil
  122. }
  123. return s.replaceStore(pods...)
  124. case statInfo.Mode().IsRegular():
  125. pod, err := s.extractFromFile(path)
  126. if err != nil {
  127. return err
  128. }
  129. return s.replaceStore(pod)
  130. default:
  131. return fmt.Errorf("path is not a directory or file")
  132. }
  133. }
  134. // Get as many pod manifests as we can from a directory. Return an error if and only if something
  135. // prevented us from reading anything at all. Do not return an error if only some files
  136. // were problematic.
  137. func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
  138. dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
  139. if err != nil {
  140. return nil, fmt.Errorf("glob failed: %v", err)
  141. }
  142. pods := make([]*v1.Pod, 0)
  143. if len(dirents) == 0 {
  144. return pods, nil
  145. }
  146. sort.Strings(dirents)
  147. for _, path := range dirents {
  148. statInfo, err := os.Stat(path)
  149. if err != nil {
  150. klog.Errorf("Can't get metadata for %q: %v", path, err)
  151. continue
  152. }
  153. switch {
  154. case statInfo.Mode().IsDir():
  155. klog.Errorf("Not recursing into manifest path %q", path)
  156. case statInfo.Mode().IsRegular():
  157. pod, err := s.extractFromFile(path)
  158. if err != nil {
  159. if !os.IsNotExist(err) {
  160. klog.Errorf("Can't process manifest file %q: %v", path, err)
  161. }
  162. } else {
  163. pods = append(pods, pod)
  164. }
  165. default:
  166. klog.Errorf("Manifest path %q is not a directory or file: %v", path, statInfo.Mode())
  167. }
  168. }
  169. return pods, nil
  170. }
  171. // extractFromFile parses a file for Pod configuration information.
  172. func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
  173. klog.V(3).Infof("Reading config file %q", filename)
  174. defer func() {
  175. if err == nil && pod != nil {
  176. objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
  177. if keyErr != nil {
  178. err = keyErr
  179. return
  180. }
  181. s.fileKeyMapping[filename] = objKey
  182. }
  183. }()
  184. file, err := os.Open(filename)
  185. if err != nil {
  186. return pod, err
  187. }
  188. defer file.Close()
  189. data, err := utilio.ReadAtMost(file, maxConfigLength)
  190. if err != nil {
  191. return pod, err
  192. }
  193. defaultFn := func(pod *api.Pod) error {
  194. return s.applyDefaults(pod, filename)
  195. }
  196. parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
  197. if parsed {
  198. if podErr != nil {
  199. return pod, podErr
  200. }
  201. return pod, nil
  202. }
  203. return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
  204. }
  205. func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
  206. objs := []interface{}{}
  207. for _, pod := range pods {
  208. objs = append(objs, pod)
  209. }
  210. return s.store.Replace(objs, "")
  211. }