file.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Reads the pod configuration from file or a directory of files.
  14. package config
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "path/filepath"
  20. "sort"
  21. "strings"
  22. "time"
  23. "k8s.io/klog"
  24. "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/client-go/tools/cache"
  27. api "k8s.io/kubernetes/pkg/apis/core"
  28. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  29. )
  // podEventType identifies the kind of change a watchEvent reports for a
  // manifest file.
  type podEventType int

  // Event kinds carried by watchEvent.eventType.
  const (
  	podAdd podEventType = iota
  	podModify
  	podDelete
  )

  // eventBufferLen is the capacity of the buffered watchEvents channel.
  const eventBufferLen = 10
  37. type watchEvent struct {
  38. fileName string
  39. eventType podEventType
  40. }
  41. type sourceFile struct {
  42. path string
  43. nodeName types.NodeName
  44. period time.Duration
  45. store cache.Store
  46. fileKeyMapping map[string]string
  47. updates chan<- interface{}
  48. watchEvents chan *watchEvent
  49. }
  50. func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
  51. // "github.com/sigma/go-inotify" requires a path without trailing "/"
  52. path = strings.TrimRight(path, string(os.PathSeparator))
  53. config := newSourceFile(path, nodeName, period, updates)
  54. klog.V(1).Infof("Watching path %q", path)
  55. config.run()
  56. }
  57. func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
  58. send := func(objs []interface{}) {
  59. var pods []*v1.Pod
  60. for _, o := range objs {
  61. pods = append(pods, o.(*v1.Pod))
  62. }
  63. updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
  64. }
  65. store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
  66. return &sourceFile{
  67. path: path,
  68. nodeName: nodeName,
  69. period: period,
  70. store: store,
  71. fileKeyMapping: map[string]string{},
  72. updates: updates,
  73. watchEvents: make(chan *watchEvent, eventBufferLen),
  74. }
  75. }
  76. func (s *sourceFile) run() {
  77. listTicker := time.NewTicker(s.period)
  78. go func() {
  79. // Read path immediately to speed up startup.
  80. if err := s.listConfig(); err != nil {
  81. klog.Errorf("Unable to read config path %q: %v", s.path, err)
  82. }
  83. for {
  84. select {
  85. case <-listTicker.C:
  86. if err := s.listConfig(); err != nil {
  87. klog.Errorf("Unable to read config path %q: %v", s.path, err)
  88. }
  89. case e := <-s.watchEvents:
  90. if err := s.consumeWatchEvent(e); err != nil {
  91. klog.Errorf("Unable to process watch event: %v", err)
  92. }
  93. }
  94. }
  95. }()
  96. s.startWatch()
  97. }
  98. func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
  99. return applyDefaults(pod, source, true, s.nodeName)
  100. }
  101. func (s *sourceFile) listConfig() error {
  102. path := s.path
  103. statInfo, err := os.Stat(path)
  104. if err != nil {
  105. if !os.IsNotExist(err) {
  106. return err
  107. }
  108. // Emit an update with an empty PodList to allow FileSource to be marked as seen
  109. s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
  110. return fmt.Errorf("path does not exist, ignoring")
  111. }
  112. switch {
  113. case statInfo.Mode().IsDir():
  114. pods, err := s.extractFromDir(path)
  115. if err != nil {
  116. return err
  117. }
  118. if len(pods) == 0 {
  119. // Emit an update with an empty PodList to allow FileSource to be marked as seen
  120. s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
  121. return nil
  122. }
  123. return s.replaceStore(pods...)
  124. case statInfo.Mode().IsRegular():
  125. pod, err := s.extractFromFile(path)
  126. if err != nil {
  127. return err
  128. }
  129. return s.replaceStore(pod)
  130. default:
  131. return fmt.Errorf("path is not a directory or file")
  132. }
  133. }
  134. // Get as many pod manifests as we can from a directory. Return an error if and only if something
  135. // prevented us from reading anything at all. Do not return an error if only some files
  136. // were problematic.
  137. func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
  138. dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
  139. if err != nil {
  140. return nil, fmt.Errorf("glob failed: %v", err)
  141. }
  142. pods := make([]*v1.Pod, 0)
  143. if len(dirents) == 0 {
  144. return pods, nil
  145. }
  146. sort.Strings(dirents)
  147. for _, path := range dirents {
  148. statInfo, err := os.Stat(path)
  149. if err != nil {
  150. klog.Errorf("Can't get metadata for %q: %v", path, err)
  151. continue
  152. }
  153. switch {
  154. case statInfo.Mode().IsDir():
  155. klog.Errorf("Not recursing into manifest path %q", path)
  156. case statInfo.Mode().IsRegular():
  157. pod, err := s.extractFromFile(path)
  158. if err != nil {
  159. if !os.IsNotExist(err) {
  160. klog.Errorf("Can't process manifest file %q: %v", path, err)
  161. }
  162. } else {
  163. pods = append(pods, pod)
  164. }
  165. default:
  166. klog.Errorf("Manifest path %q is not a directory or file: %v", path, statInfo.Mode())
  167. }
  168. }
  169. return pods, nil
  170. }
  171. func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
  172. klog.V(3).Infof("Reading config file %q", filename)
  173. defer func() {
  174. if err == nil && pod != nil {
  175. objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
  176. if keyErr != nil {
  177. err = keyErr
  178. return
  179. }
  180. s.fileKeyMapping[filename] = objKey
  181. }
  182. }()
  183. file, err := os.Open(filename)
  184. if err != nil {
  185. return pod, err
  186. }
  187. defer file.Close()
  188. data, err := ioutil.ReadAll(file)
  189. if err != nil {
  190. return pod, err
  191. }
  192. defaultFn := func(pod *api.Pod) error {
  193. return s.applyDefaults(pod, filename)
  194. }
  195. parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
  196. if parsed {
  197. if podErr != nil {
  198. return pod, podErr
  199. }
  200. return pod, nil
  201. }
  202. return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file.\n", filename, podErr)
  203. }
  204. func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
  205. objs := []interface{}{}
  206. for _, pod := range pods {
  207. objs = append(objs, pod)
  208. }
  209. return s.store.Replace(objs, "")
  210. }