plugin_watcher.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginwatcher
  14. import (
  15. "fmt"
  16. "os"
  17. "runtime"
  18. "strings"
  19. "time"
  20. "github.com/fsnotify/fsnotify"
  21. "k8s.io/klog"
  22. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  23. "k8s.io/kubernetes/pkg/kubelet/util"
  24. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  25. )
  26. // Watcher is the plugin watcher
  27. type Watcher struct {
  28. path string
  29. fs utilfs.Filesystem
  30. fsWatcher *fsnotify.Watcher
  31. stopped chan struct{}
  32. desiredStateOfWorld cache.DesiredStateOfWorld
  33. }
  34. // NewWatcher provides a new watcher for socket registration
  35. func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
  36. return &Watcher{
  37. path: sockDir,
  38. fs: &utilfs.DefaultFs{},
  39. desiredStateOfWorld: desiredStateOfWorld,
  40. }
  41. }
  42. // Start watches for the creation and deletion of plugin sockets at the path
  43. func (w *Watcher) Start(stopCh <-chan struct{}) error {
  44. klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
  45. w.stopped = make(chan struct{})
  46. // Creating the directory to be watched if it doesn't exist yet,
  47. // and walks through the directory to discover the existing plugins.
  48. if err := w.init(); err != nil {
  49. return err
  50. }
  51. fsWatcher, err := fsnotify.NewWatcher()
  52. if err != nil {
  53. return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
  54. }
  55. w.fsWatcher = fsWatcher
  56. // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
  57. if err := w.traversePluginDir(w.path); err != nil {
  58. klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
  59. }
  60. go func(fsWatcher *fsnotify.Watcher) {
  61. defer close(w.stopped)
  62. for {
  63. select {
  64. case event := <-fsWatcher.Events:
  65. //TODO: Handle errors by taking corrective measures
  66. if event.Op&fsnotify.Create == fsnotify.Create {
  67. err := w.handleCreateEvent(event)
  68. if err != nil {
  69. klog.Errorf("error %v when handling create event: %s", err, event)
  70. }
  71. } else if event.Op&fsnotify.Remove == fsnotify.Remove {
  72. w.handleDeleteEvent(event)
  73. }
  74. continue
  75. case err := <-fsWatcher.Errors:
  76. if err != nil {
  77. klog.Errorf("fsWatcher received error: %v", err)
  78. }
  79. continue
  80. case <-stopCh:
  81. // In case of plugin watcher being stopped by plugin manager, stop
  82. // probing the creation/deletion of plugin sockets.
  83. // Also give all pending go routines a chance to complete
  84. select {
  85. case <-w.stopped:
  86. case <-time.After(11 * time.Second):
  87. klog.Errorf("timeout on stopping watcher")
  88. }
  89. w.fsWatcher.Close()
  90. return
  91. }
  92. }
  93. }(fsWatcher)
  94. return nil
  95. }
  96. func (w *Watcher) init() error {
  97. klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
  98. if err := w.fs.MkdirAll(w.path, 0755); err != nil {
  99. return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
  100. }
  101. return nil
  102. }
  103. // Walks through the plugin directory discover any existing plugin sockets.
  104. // Ignore all errors except root dir not being walkable
  105. func (w *Watcher) traversePluginDir(dir string) error {
  106. return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
  107. if err != nil {
  108. if path == dir {
  109. return fmt.Errorf("error accessing path: %s error: %v", path, err)
  110. }
  111. klog.Errorf("error accessing path: %s error: %v", path, err)
  112. return nil
  113. }
  114. switch mode := info.Mode(); {
  115. case mode.IsDir():
  116. if err := w.fsWatcher.Add(path); err != nil {
  117. return fmt.Errorf("failed to watch %s, err: %v", path, err)
  118. }
  119. case mode&os.ModeSocket != 0:
  120. event := fsnotify.Event{
  121. Name: path,
  122. Op: fsnotify.Create,
  123. }
  124. //TODO: Handle errors by taking corrective measures
  125. if err := w.handleCreateEvent(event); err != nil {
  126. klog.Errorf("error %v when handling create event: %s", err, event)
  127. }
  128. default:
  129. klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
  130. }
  131. return nil
  132. })
  133. }
  134. // Handle filesystem notify event.
  135. // Files names:
  136. // - MUST NOT start with a '.'
  137. func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
  138. klog.V(6).Infof("Handling create event: %v", event)
  139. fi, err := os.Stat(event.Name)
  140. if err != nil {
  141. return fmt.Errorf("stat file %s failed: %v", event.Name, err)
  142. }
  143. if strings.HasPrefix(fi.Name(), ".") {
  144. klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
  145. return nil
  146. }
  147. if !fi.IsDir() {
  148. isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name))
  149. if err != nil {
  150. return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
  151. }
  152. if !isSocket {
  153. klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
  154. return nil
  155. }
  156. return w.handlePluginRegistration(event.Name)
  157. }
  158. return w.traversePluginDir(event.Name)
  159. }
  160. func (w *Watcher) handlePluginRegistration(socketPath string) error {
  161. if runtime.GOOS == "windows" {
  162. socketPath = util.NormalizePath(socketPath)
  163. }
  164. //TODO: Implement rate limiting to mitigate any DOS kind of attacks.
  165. // Update desired state of world list of plugins
  166. // If the socket path does exist in the desired world cache, there's still
  167. // a possibility that it has been deleted and recreated again before it is
  168. // removed from the desired world cache, so we still need to call AddOrUpdatePlugin
  169. // in this case to update the timestamp
  170. klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
  171. err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
  172. if err != nil {
  173. return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
  174. }
  175. return nil
  176. }
  177. func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
  178. klog.V(6).Infof("Handling delete event: %v", event)
  179. socketPath := event.Name
  180. klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
  181. w.desiredStateOfWorld.RemovePlugin(socketPath)
  182. }