probe.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package flexvolume
  14. import (
  15. "fmt"
  16. "os"
  17. "path/filepath"
  18. "strings"
  19. "sync"
  20. "github.com/fsnotify/fsnotify"
  21. "k8s.io/apimachinery/pkg/util/errors"
  22. "k8s.io/klog"
  23. utilfs "k8s.io/kubernetes/pkg/util/filesystem"
  24. "k8s.io/kubernetes/pkg/volume"
  25. "k8s.io/utils/exec"
  26. utilstrings "k8s.io/utils/strings"
  27. )
  28. type flexVolumeProber struct {
  29. mutex sync.Mutex
  30. pluginDir string // Flexvolume driver directory
  31. runner exec.Interface // Interface to use for execing flex calls
  32. watcher utilfs.FSWatcher
  33. factory PluginFactory
  34. fs utilfs.Filesystem
  35. probeAllNeeded bool
  36. eventsMap map[string]volume.ProbeOperation // the key is the driver directory path, the value is the coresponding operation
  37. }
  38. // GetDynamicPluginProber creates dynamic plugin prober
  39. func GetDynamicPluginProber(pluginDir string, runner exec.Interface) volume.DynamicPluginProber {
  40. return &flexVolumeProber{
  41. pluginDir: pluginDir,
  42. watcher: utilfs.NewFsnotifyWatcher(),
  43. factory: pluginFactory{},
  44. runner: runner,
  45. fs: &utilfs.DefaultFs{},
  46. }
  47. }
  48. func (prober *flexVolumeProber) Init() error {
  49. prober.testAndSetProbeAllNeeded(true)
  50. prober.eventsMap = map[string]volume.ProbeOperation{}
  51. if err := prober.createPluginDir(); err != nil {
  52. return err
  53. }
  54. if err := prober.initWatcher(); err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. // If probeAllNeeded is true, probe all pluginDir
  60. // else probe events in eventsMap
  61. func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
  62. if prober.probeAllNeeded {
  63. prober.testAndSetProbeAllNeeded(false)
  64. return prober.probeAll()
  65. }
  66. return prober.probeMap()
  67. }
  68. func (prober *flexVolumeProber) probeMap() (events []volume.ProbeEvent, err error) {
  69. // TODO use a concurrent map to avoid Locking the entire map
  70. prober.mutex.Lock()
  71. defer prober.mutex.Unlock()
  72. probeEvents := []volume.ProbeEvent{}
  73. allErrs := []error{}
  74. for driverDirPathAbs, op := range prober.eventsMap {
  75. driverDirName := filepath.Base(driverDirPathAbs) // e.g. driverDirName = vendor~cifs
  76. probeEvent, pluginErr := prober.newProbeEvent(driverDirName, op)
  77. if pluginErr != nil {
  78. allErrs = append(allErrs, pluginErr)
  79. continue
  80. }
  81. probeEvents = append(probeEvents, probeEvent)
  82. delete(prober.eventsMap, driverDirPathAbs)
  83. }
  84. return probeEvents, errors.NewAggregate(allErrs)
  85. }
  86. func (prober *flexVolumeProber) probeAll() (events []volume.ProbeEvent, err error) {
  87. probeEvents := []volume.ProbeEvent{}
  88. allErrs := []error{}
  89. files, err := prober.fs.ReadDir(prober.pluginDir)
  90. if err != nil {
  91. return nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err)
  92. }
  93. for _, f := range files {
  94. // only directories with names that do not begin with '.' are counted as plugins
  95. // and pluginDir/dirname/dirname should be an executable
  96. // unless dirname contains '~' for escaping namespace
  97. // e.g. dirname = vendor~cifs
  98. // then, executable will be pluginDir/dirname/cifs
  99. if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
  100. probeEvent, pluginErr := prober.newProbeEvent(f.Name(), volume.ProbeAddOrUpdate)
  101. if pluginErr != nil {
  102. allErrs = append(allErrs, pluginErr)
  103. continue
  104. }
  105. probeEvents = append(probeEvents, probeEvent)
  106. }
  107. }
  108. return probeEvents, errors.NewAggregate(allErrs)
  109. }
  110. func (prober *flexVolumeProber) newProbeEvent(driverDirName string, op volume.ProbeOperation) (volume.ProbeEvent, error) {
  111. probeEvent := volume.ProbeEvent{
  112. Op: op,
  113. }
  114. if op == volume.ProbeAddOrUpdate {
  115. plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, driverDirName, prober.runner)
  116. if pluginErr != nil {
  117. pluginErr = fmt.Errorf(
  118. "Error creating Flexvolume plugin from directory %s, skipping. Error: %s",
  119. driverDirName, pluginErr)
  120. return probeEvent, pluginErr
  121. }
  122. probeEvent.Plugin = plugin
  123. probeEvent.PluginName = plugin.GetPluginName()
  124. } else if op == volume.ProbeRemove {
  125. driverName := utilstrings.UnescapeQualifiedName(driverDirName)
  126. probeEvent.PluginName = flexVolumePluginNamePrefix + driverName
  127. } else {
  128. return probeEvent, fmt.Errorf("Unknown Operation on directory: %s. ", driverDirName)
  129. }
  130. return probeEvent, nil
  131. }
  132. func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
  133. // event.Name is the watched path.
  134. if filepath.Base(event.Name)[0] == '.' {
  135. // Ignore files beginning with '.'
  136. return nil
  137. }
  138. eventPathAbs, err := filepath.Abs(event.Name)
  139. if err != nil {
  140. return err
  141. }
  142. parentPathAbs := filepath.Dir(eventPathAbs)
  143. pluginDirAbs, err := filepath.Abs(prober.pluginDir)
  144. if err != nil {
  145. return err
  146. }
  147. // event of pluginDirAbs
  148. if eventPathAbs == pluginDirAbs {
  149. // If the Flexvolume plugin directory is removed, need to recreate it
  150. // in order to keep it under watch.
  151. if eventOpIs(event, fsnotify.Remove) {
  152. if err := prober.createPluginDir(); err != nil {
  153. return err
  154. }
  155. if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
  156. return err
  157. }
  158. }
  159. return nil
  160. }
  161. // watch newly added subdirectories inside a driver directory
  162. if eventOpIs(event, fsnotify.Create) {
  163. if err := prober.addWatchRecursive(eventPathAbs); err != nil {
  164. return err
  165. }
  166. }
  167. eventRelPathToPluginDir, err := filepath.Rel(pluginDirAbs, eventPathAbs)
  168. if err != nil {
  169. return err
  170. }
  171. // event inside specific driver dir
  172. if len(eventRelPathToPluginDir) > 0 {
  173. driverDirName := strings.Split(eventRelPathToPluginDir, string(os.PathSeparator))[0]
  174. driverDirAbs := filepath.Join(pluginDirAbs, driverDirName)
  175. // executable is removed, will trigger ProbeRemove event
  176. if eventOpIs(event, fsnotify.Remove) && (eventRelPathToPluginDir == getExecutablePathRel(driverDirName) || parentPathAbs == pluginDirAbs) {
  177. prober.updateEventsMap(driverDirAbs, volume.ProbeRemove)
  178. } else {
  179. prober.updateEventsMap(driverDirAbs, volume.ProbeAddOrUpdate)
  180. }
  181. }
  182. return nil
  183. }
  // getExecutablePathRel returns the path of a flex driver's executable
  // relative to the plugin directory: the driver directory name joined with the
  // part of that name following its last '~' (or the whole name when it has no
  // '~'), e.g. vendor~cifs -> vendor~cifs/cifs.
  func getExecutablePathRel(driverDirName string) string {
  	// LastIndex yields -1 when there is no '~', so the slice starts at 0.
  	executable := driverDirName[strings.LastIndex(driverDirName, "~")+1:]
  	return filepath.Join(driverDirName, executable)
  }
  189. func (prober *flexVolumeProber) updateEventsMap(eventDirAbs string, op volume.ProbeOperation) {
  190. prober.mutex.Lock()
  191. defer prober.mutex.Unlock()
  192. if prober.probeAllNeeded {
  193. return
  194. }
  195. prober.eventsMap[eventDirAbs] = op
  196. }
  197. // Recursively adds to watch all directories inside and including the file specified by the given filename.
  198. // If the file is a symlink to a directory, it will watch the symlink but not any of the subdirectories.
  199. //
  200. // Each file or directory change triggers two events: one from the watch on itself, another from the watch
  201. // on its parent directory.
  202. func (prober *flexVolumeProber) addWatchRecursive(filename string) error {
  203. addWatch := func(path string, info os.FileInfo, err error) error {
  204. if err == nil && info.IsDir() {
  205. if err := prober.watcher.AddWatch(path); err != nil {
  206. klog.Errorf("Error recursively adding watch: %v", err)
  207. }
  208. }
  209. return nil
  210. }
  211. return prober.fs.Walk(filename, addWatch)
  212. }
  213. // Creates a new filesystem watcher and adds watches for the plugin directory
  214. // and all of its subdirectories.
  215. func (prober *flexVolumeProber) initWatcher() error {
  216. err := prober.watcher.Init(func(event fsnotify.Event) {
  217. if err := prober.handleWatchEvent(event); err != nil {
  218. klog.Errorf("Flexvolume prober watch: %s", err)
  219. }
  220. }, func(err error) {
  221. klog.Errorf("Received an error from watcher: %s", err)
  222. })
  223. if err != nil {
  224. return fmt.Errorf("Error initializing watcher: %s", err)
  225. }
  226. if err := prober.addWatchRecursive(prober.pluginDir); err != nil {
  227. return fmt.Errorf("Error adding watch on Flexvolume directory: %s", err)
  228. }
  229. prober.watcher.Run()
  230. return nil
  231. }
  232. // Creates the plugin directory, if it doesn't already exist.
  233. func (prober *flexVolumeProber) createPluginDir() error {
  234. if _, err := prober.fs.Stat(prober.pluginDir); os.IsNotExist(err) {
  235. klog.Warningf("Flexvolume plugin directory at %s does not exist. Recreating.", prober.pluginDir)
  236. err := prober.fs.MkdirAll(prober.pluginDir, 0755)
  237. if err != nil {
  238. return fmt.Errorf("Error (re-)creating driver directory: %s", err)
  239. }
  240. }
  241. return nil
  242. }
  243. func (prober *flexVolumeProber) testAndSetProbeAllNeeded(newval bool) (oldval bool) {
  244. prober.mutex.Lock()
  245. defer prober.mutex.Unlock()
  246. oldval, prober.probeAllNeeded = prober.probeAllNeeded, newval
  247. return
  248. }
  249. func eventOpIs(event fsnotify.Event, op fsnotify.Op) bool {
  250. return event.Op&op == op
  251. }