plugin_watcher.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginwatcher
  14. import (
  15. "fmt"
  16. "os"
  17. "path/filepath"
  18. "strings"
  19. "time"
  20. "github.com/fsnotify/fsnotify"
  21. "k8s.io/klog"
  22. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  23. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  24. )
  25. // Watcher is the plugin watcher
  26. type Watcher struct {
  27. path string
  28. deprecatedPath string
  29. fs utilfs.Filesystem
  30. fsWatcher *fsnotify.Watcher
  31. stopped chan struct{}
  32. desiredStateOfWorld cache.DesiredStateOfWorld
  33. }
  34. // NewWatcher provides a new watcher
  35. // deprecatedSockDir refers to a pre-GA directory that was used by older plugins
  36. // for socket registration. New plugins should not use this directory.
  37. func NewWatcher(sockDir string, deprecatedSockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
  38. return &Watcher{
  39. path: sockDir,
  40. deprecatedPath: deprecatedSockDir,
  41. fs: &utilfs.DefaultFs{},
  42. desiredStateOfWorld: desiredStateOfWorld,
  43. }
  44. }
  45. // Start watches for the creation and deletion of plugin sockets at the path
  46. func (w *Watcher) Start(stopCh <-chan struct{}) error {
  47. klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
  48. w.stopped = make(chan struct{})
  49. // Creating the directory to be watched if it doesn't exist yet,
  50. // and walks through the directory to discover the existing plugins.
  51. if err := w.init(); err != nil {
  52. return err
  53. }
  54. fsWatcher, err := fsnotify.NewWatcher()
  55. if err != nil {
  56. return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
  57. }
  58. w.fsWatcher = fsWatcher
  59. // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
  60. if err := w.traversePluginDir(w.path); err != nil {
  61. klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
  62. }
  63. // Traverse deprecated plugin dir, if specified.
  64. if len(w.deprecatedPath) != 0 {
  65. if err := w.traversePluginDir(w.deprecatedPath); err != nil {
  66. klog.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
  67. }
  68. }
  69. go func(fsWatcher *fsnotify.Watcher) {
  70. defer close(w.stopped)
  71. for {
  72. select {
  73. case event := <-fsWatcher.Events:
  74. //TODO: Handle errors by taking corrective measures
  75. if event.Op&fsnotify.Create == fsnotify.Create {
  76. err := w.handleCreateEvent(event)
  77. if err != nil {
  78. klog.Errorf("error %v when handling create event: %s", err, event)
  79. }
  80. } else if event.Op&fsnotify.Remove == fsnotify.Remove {
  81. err := w.handleDeleteEvent(event)
  82. if err != nil {
  83. klog.Errorf("error %v when handling delete event: %s", err, event)
  84. }
  85. }
  86. continue
  87. case err := <-fsWatcher.Errors:
  88. if err != nil {
  89. klog.Errorf("fsWatcher received error: %v", err)
  90. }
  91. continue
  92. case <-stopCh:
  93. // In case of plugin watcher being stopped by plugin manager, stop
  94. // probing the creation/deletion of plugin sockets.
  95. // Also give all pending go routines a chance to complete
  96. select {
  97. case <-w.stopped:
  98. case <-time.After(11 * time.Second):
  99. klog.Errorf("timeout on stopping watcher")
  100. }
  101. w.fsWatcher.Close()
  102. return
  103. }
  104. }
  105. }(fsWatcher)
  106. return nil
  107. }
  108. func (w *Watcher) init() error {
  109. klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
  110. if err := w.fs.MkdirAll(w.path, 0755); err != nil {
  111. return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
  112. }
  113. return nil
  114. }
  115. // Walks through the plugin directory discover any existing plugin sockets.
  116. // Ignore all errors except root dir not being walkable
  117. func (w *Watcher) traversePluginDir(dir string) error {
  118. return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
  119. if err != nil {
  120. if path == dir {
  121. return fmt.Errorf("error accessing path: %s error: %v", path, err)
  122. }
  123. klog.Errorf("error accessing path: %s error: %v", path, err)
  124. return nil
  125. }
  126. switch mode := info.Mode(); {
  127. case mode.IsDir():
  128. if w.containsBlacklistedDir(path) {
  129. return filepath.SkipDir
  130. }
  131. if err := w.fsWatcher.Add(path); err != nil {
  132. return fmt.Errorf("failed to watch %s, err: %v", path, err)
  133. }
  134. case mode&os.ModeSocket != 0:
  135. event := fsnotify.Event{
  136. Name: path,
  137. Op: fsnotify.Create,
  138. }
  139. //TODO: Handle errors by taking corrective measures
  140. if err := w.handleCreateEvent(event); err != nil {
  141. klog.Errorf("error %v when handling create event: %s", err, event)
  142. }
  143. default:
  144. klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
  145. }
  146. return nil
  147. })
  148. }
  149. // Handle filesystem notify event.
  150. // Files names:
  151. // - MUST NOT start with a '.'
  152. func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
  153. klog.V(6).Infof("Handling create event: %v", event)
  154. if w.containsBlacklistedDir(event.Name) {
  155. return nil
  156. }
  157. fi, err := os.Stat(event.Name)
  158. if err != nil {
  159. return fmt.Errorf("stat file %s failed: %v", event.Name, err)
  160. }
  161. if strings.HasPrefix(fi.Name(), ".") {
  162. klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
  163. return nil
  164. }
  165. if !fi.IsDir() {
  166. if fi.Mode()&os.ModeSocket == 0 {
  167. klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
  168. return nil
  169. }
  170. return w.handlePluginRegistration(event.Name)
  171. }
  172. return w.traversePluginDir(event.Name)
  173. }
  174. func (w *Watcher) handlePluginRegistration(socketPath string) error {
  175. //TODO: Implement rate limiting to mitigate any DOS kind of attacks.
  176. // Update desired state of world list of plugins
  177. // If the socket path does exist in the desired world cache, there's still
  178. // a possibility that it has been deleted and recreated again before it is
  179. // removed from the desired world cache, so we still need to call AddOrUpdatePlugin
  180. // in this case to update the timestamp
  181. klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
  182. err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath, w.foundInDeprecatedDir(socketPath))
  183. if err != nil {
  184. return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
  185. }
  186. return nil
  187. }
  188. func (w *Watcher) handleDeleteEvent(event fsnotify.Event) error {
  189. klog.V(6).Infof("Handling delete event: %v", event)
  190. socketPath := event.Name
  191. klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
  192. w.desiredStateOfWorld.RemovePlugin(socketPath)
  193. return nil
  194. }
  195. // While deprecated dir is supported, to add extra protection around #69015
  196. // we will explicitly blacklist kubernetes.io directory.
  197. func (w *Watcher) containsBlacklistedDir(path string) bool {
  198. return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
  199. path == w.deprecatedPath+"/kubernetes.io"
  200. }
  201. func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
  202. if len(w.deprecatedPath) != 0 {
  203. if socketPath == w.deprecatedPath {
  204. return true
  205. }
  206. deprecatedPath := w.deprecatedPath
  207. if !strings.HasSuffix(deprecatedPath, "/") {
  208. deprecatedPath = deprecatedPath + "/"
  209. }
  210. if strings.HasPrefix(socketPath, deprecatedPath) {
  211. return true
  212. }
  213. }
  214. return false
  215. }